Re: krbd blk-mq support ?

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Alexandre,

Can you try the patch below instead of the previous three patches?
This one uses a per-request work struct to allow for more concurrency.

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 0a54c58..b981096 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -38,6 +38,7 @@
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
+#include <linux/blk-mq.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
 #include <linux/slab.h>
@@ -343,7 +344,6 @@ struct rbd_device {
 	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
 	struct workqueue_struct	*rq_wq;
-	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -361,6 +361,9 @@ struct rbd_device {
 	atomic_t		parent_ref;
 	struct rbd_device	*parent;
 
+	/* Block layer tags. */
+	struct blk_mq_tag_set	tag_set;
+
 	/* protects updating the header */
 	struct rw_semaphore     header_rwsem;
 
@@ -1816,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
-	 * passed to blk_end_request(), which takes an unsigned int.
+	 * passed to the block layer, which just supports a 32-bit
+	 * length field.
 	 */
 	obj_request->xferred = osd_req->r_reply_op_len[0];
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
@@ -2280,7 +2284,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 		more = obj_request->which < img_request->obj_request_count - 1;
 	} else {
 		rbd_assert(img_request->rq != NULL);
-		more = blk_end_request(img_request->rq, result, xferred);
+	
+		more = blk_update_request(img_request->rq, result, xferred);
+		if (!more)
+			__blk_mq_end_request(img_request->rq, result);
 	}
 
 	return more;
@@ -3305,8 +3312,10 @@ out:
 	return ret;
 }
 
-static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+static void rbd_queue_workfn(struct work_struct *work)
 {
+	struct request *rq = blk_mq_rq_from_pdu(work);
+	struct rbd_device *rbd_dev = rq->q->queuedata;
 	struct rbd_img_request *img_request;
 	struct ceph_snap_context *snapc = NULL;
 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
@@ -3314,6 +3323,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 	enum obj_operation_type op_type;
 	u64 mapping_size;
 	int result;
+		
+	if (rq->cmd_type != REQ_TYPE_FS) {
+		dout("%s: non-fs request type %d\n", __func__,
+			(int) rq->cmd_type);
+		result = -EIO;
+		goto err;
+	}
 
 	if (rq->cmd_flags & REQ_DISCARD)
 		op_type = OBJ_OP_DISCARD;
@@ -3353,6 +3369,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 		goto err_rq;
 	}
 
+	blk_mq_start_request(rq);
+
 	if (offset && length > U64_MAX - offset + 1) {
 		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
 			 length);
@@ -3406,53 +3424,18 @@ err_rq:
 			 obj_op_name(op_type), length, offset, result);
 	if (snapc)
 		ceph_put_snap_context(snapc);
-	blk_end_request_all(rq, result);
+err:
+	blk_mq_end_request(rq, result);
 }
 
-static void rbd_request_workfn(struct work_struct *work)
+static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *rq,
+		bool last)
 {
-	struct rbd_device *rbd_dev =
-	    container_of(work, struct rbd_device, rq_work);
-	struct request *rq, *next;
-	LIST_HEAD(requests);
-
-	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
-	list_splice_init(&rbd_dev->rq_queue, &requests);
-	spin_unlock_irq(&rbd_dev->lock);
-
-	list_for_each_entry_safe(rq, next, &requests, queuelist) {
-		list_del_init(&rq->queuelist);
-		rbd_handle_request(rbd_dev, rq);
-	}
-}
+	struct rbd_device *rbd_dev = rq->q->queuedata;
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
 
-/*
- * Called with q->queue_lock held and interrupts disabled, possibly on
- * the way to schedule().  Do not sleep here!
- */
-static void rbd_request_fn(struct request_queue *q)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int queued = 0;
-
-	rbd_assert(rbd_dev);
-
-	while ((rq = blk_fetch_request(q))) {
-		/* Ignore any non-FS requests that filter through. */
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
-		queued++;
-	}
-
-	if (queued)
-		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
+	queue_work(rbd_dev->rq_wq, work);
+	return 0;
 }
 
 /*
@@ -3513,6 +3496,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 		del_gendisk(disk);
 		if (disk->queue)
 			blk_cleanup_queue(disk->queue);
+		blk_mq_free_tag_set(&rbd_dev->tag_set);
 	}
 	put_disk(disk);
 }
@@ -3724,11 +3708,28 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	return 0;
 }
 
+static int rbd_init_request(void *data, struct request *rq,
+		unsigned int hctx_idx, unsigned int request_idx,
+		unsigned int numa_node)
+{
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
+
+	INIT_WORK(work, rbd_queue_workfn);
+	return 0;
+}
+
+static struct blk_mq_ops rbd_mq_ops = {
+	.queue_rq	= rbd_queue_rq,
+	.map_queue	= blk_mq_map_queue,
+	.init_request	= rbd_init_request,
+};
+
 static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
 	struct gendisk *disk;
 	struct request_queue *q;
 	u64 segment_size;
+	int err;
 
 	/* create gendisk info */
 	disk = alloc_disk(single_major ?
@@ -3746,10 +3747,24 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
-	if (!q)
+	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
+	rbd_dev->tag_set.ops = &rbd_mq_ops;
+	rbd_dev->tag_set.queue_depth = 128; //
+	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
+	rbd_dev->tag_set.flags =
+		BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+	rbd_dev->tag_set.nr_hw_queues = 1;
+	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
+
+	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
+	if (err)
 		goto out_disk;
 
+	err = -ENOMEM;
+	q = blk_mq_init_queue(&rbd_dev->tag_set);
+	if (!q)
+		goto out_tag_set;
+
 	/* We use the default size, but let's be explicit about it. */
 	blk_queue_physical_block_size(q, SECTOR_SIZE);
 
@@ -3775,10 +3790,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	rbd_dev->disk = disk;
 
 	return 0;
+out_tag_set:
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
 out_disk:
 	put_disk(disk);
-
-	return -ENOMEM;
+	return err;
 }
 
 /*
@@ -4036,7 +4052,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 	spin_lock_init(&rbd_dev->lock);
 	INIT_LIST_HEAD(&rbd_dev->rq_queue);
-	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
-- 
1.9.1

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux