Now that the request function has been replaced by one using the new request management data structures the old one can go away. Deleting it makes rbd_dev_do_request() no longer needed, and deleting that makes other functions unneeded, and so on. Signed-off-by: Alex Elder <elder@xxxxxxxxxxx> --- drivers/block/rbd.c | 319 --------------------------------------------------- 1 file changed, 319 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 46a61dd..7caddea 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -621,18 +621,6 @@ static void rbd_put_client(struct rbd_client *rbdc) kref_put(&rbdc->kref, rbd_client_release); } -/* - * Destroy requests collection - */ -static void rbd_coll_release(struct kref *kref) -{ - struct rbd_req_coll *coll = - container_of(kref, struct rbd_req_coll, kref); - - dout("rbd_coll_release %p\n", coll); - kfree(coll); -} - static bool rbd_image_format_valid(u32 image_format) { return image_format == 1 || image_format == 2; @@ -876,28 +864,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev, return length; } -static int rbd_get_num_segments(struct rbd_image_header *header, - u64 ofs, u64 len) -{ - u64 start_seg; - u64 end_seg; - u64 result; - - if (!len) - return 0; - if (len - 1 > U64_MAX - ofs) - return -ERANGE; - - start_seg = ofs >> header->obj_order; - end_seg = (ofs + len - 1) >> header->obj_order; - - result = end_seg - start_seg + 1; - if (result > (u64) INT_MAX) - return -ERANGE; - - return (int) result; -} - /* * returns the size of an object in the image */ @@ -1216,52 +1182,6 @@ static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op) kfree(op); } -static void rbd_coll_end_req_index(struct request *rq, - struct rbd_req_coll *coll, - int index, - s32 ret, u64 len) -{ - struct request_queue *q; - int min, max, i; - - dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n", - coll, index, (int)ret, (unsigned long long)len); - - if (!rq) - return; - - if (!coll) { - blk_end_request(rq, ret, len); - return; - } - - q = rq->q; - - spin_lock_irq(q->queue_lock); - coll->status[index].done = 1; - coll->status[index].rc = ret; - coll->status[index].bytes = len; - max = min = coll->num_done; - while (max < coll->total && coll->status[max].done) - max++; - - for (i = min; i<max; i++) { - __blk_end_request(rq, (int)coll->status[i].rc, - coll->status[i].bytes); - coll->num_done++; - kref_put(&coll->kref, rbd_coll_release); - } - spin_unlock_irq(q->queue_lock); -} - -static void rbd_coll_end_req(struct rbd_request *rbd_req, - s32 ret, u64 len) -{ - rbd_coll_end_req_index(rbd_req->rq, - rbd_req->coll, rbd_req->coll_index, - ret, len); -} - /* * Send ceph osd request */ @@ -1361,46 +1281,6 @@ done_osd_req: return ret; } -/* - * Ceph osd op callback - */ -static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg) -{ - struct rbd_request *rbd_req = osd_req->r_priv; - struct ceph_osd_reply_head *replyhead; - struct ceph_osd_op *op; - s32 rc; - u64 bytes; - int read_op; - - /* parse reply */ - replyhead = msg->front.iov_base; - WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); - op = (void *)(replyhead + 1); - rc = (s32)le32_to_cpu(replyhead->result); - bytes = le64_to_cpu(op->extent.length); - read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ); - - dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n", - (unsigned long long) bytes, read_op, (int) rc); - - if (rc == (s32)-ENOENT && read_op) { - zero_bio_chain(rbd_req->bio, 0); - rc = 0; - } else if (rc == 0 && read_op && bytes < rbd_req->len) { - zero_bio_chain(rbd_req->bio, bytes); - bytes = rbd_req->len; - } - - rbd_coll_end_req(rbd_req, rc, bytes); - - if (rbd_req->bio) - bio_chain_put(rbd_req->bio); - - ceph_osdc_put_request(osd_req); - kfree(rbd_req); -} - static void rbd_simple_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg) { @@ -1448,70 +1328,6 @@ done: return ret; } -/* - * Do an asynchronous ceph osd operation - */ -static int rbd_do_op(struct request *rq, - struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - u64 ofs, u64 len, - struct bio *bio, - struct rbd_req_coll *coll, - int coll_index) -{ - const char *seg_name; - u64 seg_ofs; - u64 seg_len; - int ret; - struct ceph_osd_req_op *op; - int opcode; - int flags; - u64 snapid; - - seg_name = rbd_segment_name(rbd_dev, ofs); - if (!seg_name) - return -ENOMEM; - seg_len = rbd_segment_length(rbd_dev, ofs, len); - seg_ofs = rbd_segment_offset(rbd_dev, ofs); - - if (rq_data_dir(rq) == WRITE) { - opcode = CEPH_OSD_OP_WRITE; - flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; - snapid = CEPH_NOSNAP; - } else { - opcode = CEPH_OSD_OP_READ; - flags = CEPH_OSD_FLAG_READ; - rbd_assert(!snapc); - snapid = rbd_dev->spec->snap_id; - } - - ret = -ENOMEM; - op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len); - if (!op) - goto done; - - /* we've taken care of segment sizes earlier when we - cloned the bios. We should never have a segment - truncated at this point */ - rbd_assert(seg_len == len); - - ret = rbd_do_request(rq, rbd_dev, snapc, snapid, - seg_name, seg_ofs, seg_len, - bio, - NULL, 0, - flags, - op, - coll, coll_index, - rbd_req_cb, NULL); - if (ret < 0) - rbd_coll_end_req_index(rq, coll, coll_index, - (s32)ret, seg_len); - rbd_osd_req_op_destroy(op); -done: - kfree(seg_name); - return ret; -} - static int rbd_obj_request_submit(struct ceph_osd_client *osdc, struct rbd_obj_request *obj_request) { @@ -1683,78 +1499,6 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev, return ret; } -static struct rbd_req_coll *rbd_alloc_coll(int num_reqs) -{ - struct rbd_req_coll *coll = - kzalloc(sizeof(struct rbd_req_coll) + - sizeof(struct rbd_req_status) * num_reqs, - GFP_ATOMIC); - - if (!coll) - return NULL; - coll->total = num_reqs; - kref_init(&coll->kref); - return coll; -} - -static int rbd_dev_do_request(struct request *rq, - struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - u64 ofs, unsigned int size, - struct bio *bio_chain) -{ - int num_segs; - struct rbd_req_coll *coll; - unsigned int bio_offset; - int cur_seg = 0; - - dout("%s 0x%x bytes at 0x%llx\n", - rq_data_dir(rq) == WRITE ? "write" : "read", - size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); - - num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); - if (num_segs <= 0) - return num_segs; - - coll = rbd_alloc_coll(num_segs); - if (!coll) - return -ENOMEM; - - bio_offset = 0; - do { - u64 limit = rbd_segment_length(rbd_dev, ofs, size); - unsigned int clone_size; - struct bio *bio_clone; - - BUG_ON(limit > (u64)UINT_MAX); - clone_size = (unsigned int)limit; - dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt); - - kref_get(&coll->kref); - - /* Pass a cloned bio chain via an osd request */ - - bio_clone = bio_chain_clone_range(&bio_chain, - &bio_offset, clone_size, - GFP_ATOMIC); - if (bio_clone) - (void)rbd_do_op(rq, rbd_dev, snapc, - ofs, clone_size, - bio_clone, coll, cur_seg); - else - rbd_coll_end_req_index(rq, coll, cur_seg, - (s32)-ENOMEM, - clone_size); - size -= clone_size; - ofs += clone_size; - - cur_seg++; - } while (size > 0); - kref_put(&coll->kref, rbd_coll_release); - - return 0; -} - static void rbd_osd_read_callback(struct rbd_obj_request *obj_request, struct ceph_osd_op *op) { @@ -2237,68 +1981,6 @@ end_request: } /* - * block device queue callback - */ -static void rbd_rq_fn(struct request_queue *q) -{ - struct rbd_device *rbd_dev = q->queuedata; - bool read_only = rbd_dev->mapping.read_only; - struct request *rq; - - while ((rq = blk_fetch_request(q))) { - struct ceph_snap_context *snapc = NULL; - unsigned int size = 0; - int result; - - dout("fetched request\n"); - - /* Filter out block requests we don't understand */ - - if ((rq->cmd_type != REQ_TYPE_FS)) { - __blk_end_request_all(rq, 0); - continue; - } - spin_unlock_irq(q->queue_lock); - - /* Write requests need a reference to the snapshot context */ - - if (rq_data_dir(rq) == WRITE) { - result = -EROFS; - if (read_only) /* Can't write to a read-only device */ - goto out_end_request; - - /* - * Note that each osd request will take its - * own reference to the snapshot context - * supplied. The reference we take here - * just guarantees the one we provide stays - * valid. - */ - down_read(&rbd_dev->header_rwsem); - snapc = ceph_get_snap_context(rbd_dev->header.snapc); - up_read(&rbd_dev->header_rwsem); - rbd_assert(snapc != NULL); - } else if (!atomic_read(&rbd_dev->exists)) { - rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); - dout("request for non-existent snapshot"); - result = -ENXIO; - goto out_end_request; - } - - size = blk_rq_bytes(rq); - result = rbd_dev_do_request(rq, rbd_dev, snapc, - blk_rq_pos(rq) * SECTOR_SIZE, - size, rq->bio); -out_end_request: - if (snapc) - ceph_put_snap_context(snapc); - spin_lock_irq(q->queue_lock); - if (!size || result < 0) - __blk_end_request_all(rq, result); - } -} - -/* * a queue callback. Makes sure that we don't create a bio that spans across * multiple osd objects. One exception would be with a single page bios, * which we handle later at bio_chain_clone_range() @@ -2547,7 +2229,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) disk->fops = &rbd_bd_ops; disk->private_data = rbd_dev; - (void) rbd_rq_fn; /* avoid a warning */ q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); if (!q) goto out_disk; -- 1.7.9.5 -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html