Re: krbd blk-mq support ?

If you're willing to experiment, give the patches below a try.  Note that
I don't have a ceph test cluster available, so the conversion is
untested.

From 00668f00afc6f0cfbce05d1186116469c1f3f9b3 Mon Sep 17 00:00:00 2001
From: Christoph Hellwig <hch@xxxxxx>
Date: Fri, 24 Oct 2014 11:53:36 +0200
Subject: blk-mq: handle single queue case in blk_mq_hctx_next_cpu

Don't duplicate the code that handles the not-CPU-bound (single hw queue)
case in the callers; handle it inside blk_mq_hctx_next_cpu instead.

Signed-off-by: Christoph Hellwig <hch@xxxxxx>
---
 block/blk-mq.c | 34 +++++++++++++---------------------
 1 file changed, 13 insertions(+), 21 deletions(-)

diff --git a/block/blk-mq.c b/block/blk-mq.c
index 68929ba..eaaedea 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -760,10 +760,11 @@ static void __blk_mq_run_hw_queue(struct blk_mq_hw_ctx *hctx)
  */
 static int blk_mq_hctx_next_cpu(struct blk_mq_hw_ctx *hctx)
 {
-	int cpu = hctx->next_cpu;
+	if (hctx->queue->nr_hw_queues == 1)
+		return WORK_CPU_UNBOUND;
 
 	if (--hctx->next_cpu_batch <= 0) {
-		int next_cpu;
+		int cpu = hctx->next_cpu, next_cpu;
 
 		next_cpu = cpumask_next(hctx->next_cpu, hctx->cpumask);
 		if (next_cpu >= nr_cpu_ids)
@@ -771,9 +772,11 @@ static int blk_mq_hctx_next_cpu(struct blk_mq_hw_ctx *hctx)
 
 		hctx->next_cpu = next_cpu;
 		hctx->next_cpu_batch = BLK_MQ_CPU_WORK_BATCH;
+
+		return cpu;
 	}
 
-	return cpu;
+	return hctx->next_cpu;
 }
 
 void blk_mq_run_hw_queue(struct blk_mq_hw_ctx *hctx, bool async)
@@ -781,16 +784,13 @@ void blk_mq_run_hw_queue(struct blk_mq_hw_ctx *hctx, bool async)
 	if (unlikely(test_bit(BLK_MQ_S_STOPPED, &hctx->state)))
 		return;
 
-	if (!async && cpumask_test_cpu(smp_processor_id(), hctx->cpumask))
+	if (!async && cpumask_test_cpu(smp_processor_id(), hctx->cpumask)) {
 		__blk_mq_run_hw_queue(hctx);
-	else if (hctx->queue->nr_hw_queues == 1)
-		kblockd_schedule_delayed_work(&hctx->run_work, 0);
-	else {
-		unsigned int cpu;
-
-		cpu = blk_mq_hctx_next_cpu(hctx);
-		kblockd_schedule_delayed_work_on(cpu, &hctx->run_work, 0);
+		return;
 	}
+
+	kblockd_schedule_delayed_work_on(blk_mq_hctx_next_cpu(hctx),
+			&hctx->run_work, 0);
 }
 
 void blk_mq_run_queues(struct request_queue *q, bool async)
@@ -888,16 +888,8 @@ static void blk_mq_delay_work_fn(struct work_struct *work)
 
 void blk_mq_delay_queue(struct blk_mq_hw_ctx *hctx, unsigned long msecs)
 {
-	unsigned long tmo = msecs_to_jiffies(msecs);
-
-	if (hctx->queue->nr_hw_queues == 1)
-		kblockd_schedule_delayed_work(&hctx->delay_work, tmo);
-	else {
-		unsigned int cpu;
-
-		cpu = blk_mq_hctx_next_cpu(hctx);
-		kblockd_schedule_delayed_work_on(cpu, &hctx->delay_work, tmo);
-	}
+	kblockd_schedule_delayed_work_on(blk_mq_hctx_next_cpu(hctx),
+			&hctx->delay_work, msecs_to_jiffies(msecs));
 }
 EXPORT_SYMBOL(blk_mq_delay_queue);
 
-- 
1.9.1
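
For reference, the net result of the two hunks in blk_mq_hctx_next_cpu() is
roughly the function below.  This is a sketch reconstructed from the diff
plus the elided wrap-around line (assumed to be cpumask_first()), not
compiled against a tree:

static int blk_mq_hctx_next_cpu(struct blk_mq_hw_ctx *hctx)
{
	/* single hw queue: let the workqueue run the work on any CPU */
	if (hctx->queue->nr_hw_queues == 1)
		return WORK_CPU_UNBOUND;

	if (--hctx->next_cpu_batch <= 0) {
		int cpu = hctx->next_cpu, next_cpu;

		/* advance to the next CPU in the hctx mask, wrapping around */
		next_cpu = cpumask_next(hctx->next_cpu, hctx->cpumask);
		if (next_cpu >= nr_cpu_ids)
			next_cpu = cpumask_first(hctx->cpumask);

		hctx->next_cpu = next_cpu;
		hctx->next_cpu_batch = BLK_MQ_CPU_WORK_BATCH;

		return cpu;
	}

	return hctx->next_cpu;
}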

From 6002e20c4d2b150fcbe82a7bc45c90d30cb61b78 Mon Sep 17 00:00:00 2001
From: Christoph Hellwig <hch@xxxxxx>
Date: Fri, 24 Oct 2014 12:04:07 +0200
Subject: blk-mq: allow direct dispatch to a driver specific workqueue

We have various block drivers that need to perform long-running, blocking
operations during I/O submission, such as file system or network I/O.

Currently these drivers just queue up work to an internal workqueue
from their request_fn.  With blk-mq we can make sure they always get
called on their own workqueue directly for I/O submission by:

 1) adding a flag to prevent inline submission of I/O, and
 2) allowing the driver to pass in a workqueue in the tag_set that
    will be used instead of kblockd.

Signed-off-by: Christoph Hellwig <hch@xxxxxx>
---
 block/blk-core.c       |  2 +-
 block/blk-mq.c         | 12 +++++++++---
 block/blk.h            |  1 +
 include/linux/blk-mq.h |  4 ++++
 4 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/block/blk-core.c b/block/blk-core.c
index 0421b53..7f7249f 100644
--- a/block/blk-core.c
+++ b/block/blk-core.c
@@ -61,7 +61,7 @@ struct kmem_cache *blk_requestq_cachep;
 /*
  * Controlling structure to kblockd
  */
-static struct workqueue_struct *kblockd_workqueue;
+struct workqueue_struct *kblockd_workqueue;
 
 void blk_queue_congestion_threshold(struct request_queue *q)
 {
diff --git a/block/blk-mq.c b/block/blk-mq.c
index eaaedea..cea2f96 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -784,12 +784,13 @@ void blk_mq_run_hw_queue(struct blk_mq_hw_ctx *hctx, bool async)
 	if (unlikely(test_bit(BLK_MQ_S_STOPPED, &hctx->state)))
 		return;
 
-	if (!async && cpumask_test_cpu(smp_processor_id(), hctx->cpumask)) {
+	if (!async && !(hctx->flags & BLK_MQ_F_WORKQUEUE) &&
+	    cpumask_test_cpu(smp_processor_id(), hctx->cpumask)) {
 		__blk_mq_run_hw_queue(hctx);
 		return;
 	}
 
-	kblockd_schedule_delayed_work_on(blk_mq_hctx_next_cpu(hctx),
+	queue_delayed_work_on(blk_mq_hctx_next_cpu(hctx), hctx->wq,
 			&hctx->run_work, 0);
 }
 
@@ -888,7 +889,7 @@ static void blk_mq_delay_work_fn(struct work_struct *work)
 
 void blk_mq_delay_queue(struct blk_mq_hw_ctx *hctx, unsigned long msecs)
 {
-	kblockd_schedule_delayed_work_on(blk_mq_hctx_next_cpu(hctx),
+	queue_delayed_work_on(blk_mq_hctx_next_cpu(hctx), hctx->wq,
 			&hctx->delay_work, msecs_to_jiffies(msecs));
 }
 EXPORT_SYMBOL(blk_mq_delay_queue);
@@ -1551,6 +1552,11 @@ static int blk_mq_init_hctx(struct request_queue *q,
 	hctx->flags = set->flags;
 	hctx->cmd_size = set->cmd_size;
 
+	if (set->wq)
+		hctx->wq = set->wq;
+	else
+		hctx->wq = kblockd_workqueue;
+
 	blk_mq_init_cpu_notifier(&hctx->cpu_notifier,
 					blk_mq_hctx_notify, hctx);
 	blk_mq_register_cpu_notifier(&hctx->cpu_notifier);
diff --git a/block/blk.h b/block/blk.h
index 43b0361..fb46ad0 100644
--- a/block/blk.h
+++ b/block/blk.h
@@ -25,6 +25,7 @@ struct blk_flush_queue {
 	spinlock_t		mq_flush_lock;
 };
 
+extern struct workqueue_struct *kblockd_workqueue;
 extern struct kmem_cache *blk_requestq_cachep;
 extern struct kmem_cache *request_cachep;
 extern struct kobj_type blk_queue_ktype;
diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h
index c9be158..d61ecfe 100644
--- a/include/linux/blk-mq.h
+++ b/include/linux/blk-mq.h
@@ -37,6 +37,8 @@ struct blk_mq_hw_ctx {
 	unsigned int		queue_num;
 	struct blk_flush_queue	*fq;
 
+	struct workqueue_struct	*wq;
+
 	void			*driver_data;
 
 	struct blk_mq_ctxmap	ctx_map;
@@ -64,6 +66,7 @@ struct blk_mq_hw_ctx {
 
 struct blk_mq_tag_set {
 	struct blk_mq_ops	*ops;
+	struct workqueue_struct	*wq;
 	unsigned int		nr_hw_queues;
 	unsigned int		queue_depth;	/* max hw supported */
 	unsigned int		reserved_tags;
@@ -140,6 +143,7 @@ enum {
 	BLK_MQ_F_TAG_SHARED	= 1 << 1,
 	BLK_MQ_F_SG_MERGE	= 1 << 2,
 	BLK_MQ_F_SYSFS_UP	= 1 << 3,
+	BLK_MQ_F_WORKQUEUE	= 1 << 4,
 
 	BLK_MQ_S_STOPPED	= 0,
 	BLK_MQ_S_TAG_ACTIVE	= 1,
-- 
1.9.1
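
To illustrate how a driver would opt into this, here is a hypothetical usage
sketch of the new tag_set->wq field and BLK_MQ_F_WORKQUEUE flag.  It is not
taken from any real driver; the my_* names are made up, the queue depth is
arbitrary, and the ops are left empty for brevity:

#include <linux/blk-mq.h>
#include <linux/blkdev.h>
#include <linux/workqueue.h>

struct my_dev {
	struct workqueue_struct	*submit_wq;
	struct blk_mq_tag_set	tag_set;
	struct request_queue	*queue;
};

static struct blk_mq_ops my_mq_ops;	/* .queue_rq/.map_queue omitted here */

static int my_dev_init_queue(struct my_dev *dev)
{
	int err;

	/* private submission workqueue; ->queue_rq may block when run from it */
	dev->submit_wq = alloc_workqueue("my_dev_submit", WQ_MEM_RECLAIM, 0);
	if (!dev->submit_wq)
		return -ENOMEM;

	memset(&dev->tag_set, 0, sizeof(dev->tag_set));
	dev->tag_set.ops = &my_mq_ops;
	dev->tag_set.nr_hw_queues = 1;
	dev->tag_set.queue_depth = 128;		/* arbitrary */
	dev->tag_set.numa_node = NUMA_NO_NODE;
	dev->tag_set.wq = dev->submit_wq;	/* used instead of kblockd */
	dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE |
			     BLK_MQ_F_WORKQUEUE;	/* never dispatch inline */

	err = blk_mq_alloc_tag_set(&dev->tag_set);
	if (err)
		goto out_wq;

	dev->queue = blk_mq_init_queue(&dev->tag_set);
	if (!dev->queue) {
		err = -ENOMEM;
		goto out_tag_set;
	}
	dev->queue->queuedata = dev;
	return 0;

out_tag_set:
	blk_mq_free_tag_set(&dev->tag_set);
out_wq:
	destroy_workqueue(dev->submit_wq);
	return err;
}

With BLK_MQ_F_WORKQUEUE set, blk_mq_run_hw_queue() above never calls
__blk_mq_run_hw_queue() inline, so ->queue_rq only ever runs from
dev->submit_wq and is free to sleep.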

From 135c8e415d3800f33142debd93d64af246ccaa57 Mon Sep 17 00:00:00 2001
From: Christoph Hellwig <hch@xxxxxx>
Date: Fri, 24 Oct 2014 12:46:40 +0200
Subject: rbd: WIP conversion to blk-mq

---
 drivers/block/rbd.c | 106 ++++++++++++++++++++++++----------------------------
 1 file changed, 49 insertions(+), 57 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 0a54c58..9321f35 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -38,6 +38,7 @@
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
+#include <linux/blk-mq.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
 #include <linux/slab.h>
@@ -343,7 +344,6 @@ struct rbd_device {
 	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
 	struct workqueue_struct	*rq_wq;
-	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -361,6 +361,9 @@ struct rbd_device {
 	atomic_t		parent_ref;
 	struct rbd_device	*parent;
 
+	/* Block layer tags. */
+	struct blk_mq_tag_set	tag_set;
+
 	/* protects updating the header */
 	struct rw_semaphore     header_rwsem;
 
@@ -1816,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
-	 * passed to blk_end_request(), which takes an unsigned int.
+	 * passed to the block layer, which just supports a 32-bit
+	 * length field.
 	 */
 	obj_request->xferred = osd_req->r_reply_op_len[0];
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
@@ -2280,7 +2284,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 		more = obj_request->which < img_request->obj_request_count - 1;
 	} else {
 		rbd_assert(img_request->rq != NULL);
-		more = blk_end_request(img_request->rq, result, xferred);
+
+		more = blk_update_request(img_request->rq, result, xferred);
+		if (!more)
+			__blk_mq_end_request(img_request->rq, result);
 	}
 
 	return more;
@@ -3305,8 +3312,10 @@ out:
 	return ret;
 }
 
-static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *rq,
+		bool last)
 {
+	struct rbd_device *rbd_dev = rq->q->queuedata;
 	struct rbd_img_request *img_request;
 	struct ceph_snap_context *snapc = NULL;
 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
@@ -3314,6 +3323,12 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 	enum obj_operation_type op_type;
 	u64 mapping_size;
 	int result;
+
+	if (rq->cmd_type != REQ_TYPE_FS) {
+		dout("%s: non-fs request type %d\n", __func__,
+			(int) rq->cmd_type);
+		return BLK_MQ_RQ_QUEUE_ERROR;
+	}
 
 	if (rq->cmd_flags & REQ_DISCARD)
 		op_type = OBJ_OP_DISCARD;
@@ -3353,6 +3368,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 		goto err_rq;
 	}
 
+	blk_mq_start_request(rq);
+
 	if (offset && length > U64_MAX - offset + 1) {
 		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
 			 length);
@@ -3396,7 +3413,7 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 	if (result)
 		goto err_img_request;
 
-	return;
+	return 0;
 
 err_img_request:
 	rbd_img_request_put(img_request);
@@ -3406,53 +3423,8 @@ err_rq:
 			 obj_op_name(op_type), length, offset, result);
 	if (snapc)
 		ceph_put_snap_context(snapc);
-	blk_end_request_all(rq, result);
-}
-
-static void rbd_request_workfn(struct work_struct *work)
-{
-	struct rbd_device *rbd_dev =
-	    container_of(work, struct rbd_device, rq_work);
-	struct request *rq, *next;
-	LIST_HEAD(requests);
-
-	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
-	list_splice_init(&rbd_dev->rq_queue, &requests);
-	spin_unlock_irq(&rbd_dev->lock);
-
-	list_for_each_entry_safe(rq, next, &requests, queuelist) {
-		list_del_init(&rq->queuelist);
-		rbd_handle_request(rbd_dev, rq);
-	}
-}
-
-/*
- * Called with q->queue_lock held and interrupts disabled, possibly on
- * the way to schedule().  Do not sleep here!
- */
-static void rbd_request_fn(struct request_queue *q)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int queued = 0;
-
-	rbd_assert(rbd_dev);
-
-	while ((rq = blk_fetch_request(q))) {
-		/* Ignore any non-FS requests that filter through. */
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
-		queued++;
-	}
-
-	if (queued)
-		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
+	blk_mq_end_request(rq, result);
+	return 0;
 }
 
 /*
@@ -3513,6 +3485,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 		del_gendisk(disk);
 		if (disk->queue)
 			blk_cleanup_queue(disk->queue);
+		blk_mq_free_tag_set(&rbd_dev->tag_set);
 	}
 	put_disk(disk);
 }
@@ -3724,11 +3697,17 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	return 0;
 }
 
+static struct blk_mq_ops rbd_mq_ops = {
+	.queue_rq	= rbd_queue_rq,
+	.map_queue	= blk_mq_map_queue,
+};
+
 static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
 	struct gendisk *disk;
 	struct request_queue *q;
 	u64 segment_size;
+	int err;
 
 	/* create gendisk info */
 	disk = alloc_disk(single_major ?
@@ -3746,10 +3725,23 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
-	if (!q)
+	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
+	rbd_dev->tag_set.ops = &rbd_mq_ops;
+	rbd_dev->tag_set.queue_depth = 128;
+	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
+	rbd_dev->tag_set.flags =
+		BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+	rbd_dev->tag_set.nr_hw_queues = 1;
+
+	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
+	if (err)
 		goto out_disk;
 
+	err = -ENOMEM;
+	q = blk_mq_init_queue(&rbd_dev->tag_set);
+	if (!q)
+		goto out_tag_set;
+
 	/* We use the default size, but let's be explicit about it. */
 	blk_queue_physical_block_size(q, SECTOR_SIZE);
 
@@ -3775,10 +3767,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	rbd_dev->disk = disk;
 
 	return 0;
+out_tag_set:
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
 out_disk:
 	put_disk(disk);
-
-	return -ENOMEM;
+	return err;
 }
 
 /*
@@ -4036,7 +4029,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 	spin_lock_init(&rbd_dev->lock);
 	INIT_LIST_HEAD(&rbd_dev->rq_queue);
-	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
-- 
1.9.1
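
Not part of the patches above, but a possible follow-up that ties the rbd
conversion to the driver-workqueue mechanism from the second patch: if rbd
wanted ->queue_rq to always run from its own rq_wq workqueue (and thus be
allowed to block), the tag_set setup in rbd_init_disk() could be extended
roughly as below.  This is an untested sketch and assumes rbd_dev->rq_wq has
already been allocated at this point (its allocation may need to move ahead
of rbd_init_disk()):

	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
	rbd_dev->tag_set.ops = &rbd_mq_ops;
	rbd_dev->tag_set.queue_depth = 128;
	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
	rbd_dev->tag_set.nr_hw_queues = 1;
	rbd_dev->tag_set.wq = rbd_dev->rq_wq;	/* rbd's own workqueue */
	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE |
				 BLK_MQ_F_WORKQUEUE;	/* no inline submission */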

