[PATCH] rbd: rework rbd_request_fn()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



While it was never a good idea to sleep in request_fn(), commit
34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
a *bad* idea.  mutex_lock() since 3.15 may reschedule *before* putting
task on the mutex wait queue, which for tasks in !TASK_RUNNING state
means block forever.  request_fn() may be called with !TASK_RUNNING on
the way to schedule() in io_schedule().

Offload request handling to a workqueue, one per rbd device, to avoid
calling blocking primitives from rbd_request_fn().

Cc: stable@xxxxxxxxxxxxxxx # 3.15+
Signed-off-by: Ilya Dryomov <ilya.dryomov@xxxxxxxxxxx>
---
 drivers/block/rbd.c |  195 +++++++++++++++++++++++++++++++--------------------
 1 file changed, 118 insertions(+), 77 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e3412317d3f9..e07d9d71b187 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -42,6 +42,7 @@
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
 
@@ -332,7 +333,10 @@ struct rbd_device {
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
+	struct workqueue_struct	*rq_wq;
+	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -3176,102 +3180,129 @@ out:
 	return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-		__releases(q->queue_lock) __acquires(q->queue_lock)
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 {
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
+	struct rbd_img_request *img_request;
+	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+	u64 length = blk_rq_bytes(rq);
+	bool wr = rq_data_dir(rq) == WRITE;
 	int result;
 
-	while ((rq = blk_fetch_request(q))) {
-		bool write_request = rq_data_dir(rq) == WRITE;
-		struct rbd_img_request *img_request;
-		u64 offset;
-		u64 length;
+	/* Ignore/skip any zero-length requests */
 
-		/* Ignore any non-FS requests that filter through. */
+	if (!length) {
+		dout("%s: zero-length request\n", __func__);
+		result = 0;
+		goto err_rq;
+	}
 
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
+	/* Disallow writes to a read-only device */
+
+	if (wr) {
+		if (rbd_dev->mapping.read_only) {
+			result = -EROFS;
+			goto err_rq;
 		}
+		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+	}
 
-		/* Ignore/skip any zero-length requests */
+	/*
+	 * Quit early if the mapped snapshot no longer exists.  It's
+	 * still possible the snapshot will have disappeared by the
+	 * time our request arrives at the osd, but there's no sense in
+	 * sending it if we already know.
+	 */
+	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+		dout("request for non-existent snapshot");
+		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+		result = -ENXIO;
+		goto err_rq;
+	}
 
-		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-		length = (u64) blk_rq_bytes(rq);
+	if (offset && length > U64_MAX - offset + 1) {
+		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+			 length);
+		result = -EINVAL;
+		goto err_rq;	/* Shouldn't happen */
+	}
 
-		if (!length) {
-			dout("%s: zero-length request\n", __func__);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
+	if (offset + length > rbd_dev->mapping.size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+			 length, rbd_dev->mapping.size);
+		result = -EIO;
+		goto err_rq;
+	}
 
-		spin_unlock_irq(q->queue_lock);
+	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err_rq;
+	}
+	img_request->rq = rq;
 
-		/* Disallow writes to a read-only device */
+	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+	if (result)
+		goto err_img_request;
 
-		if (write_request) {
-			result = -EROFS;
-			if (rbd_dev->mapping.read_only)
-				goto end_request;
-			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-		}
+	result = rbd_img_request_submit(img_request);
+	if (result)
+		goto err_img_request;
 
-		/*
-		 * Quit early if the mapped snapshot no longer
-		 * exists.  It's still possible the snapshot will
-		 * have disappeared by the time our request arrives
-		 * at the osd, but there's no sense in sending it if
-		 * we already know.
-		 */
-		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-			dout("request for non-existent snapshot");
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			result = -ENXIO;
-			goto end_request;
-		}
+	return;
 
-		result = -EINVAL;
-		if (offset && length > U64_MAX - offset + 1) {
-			rbd_warn(rbd_dev, "bad request range (%llu~%llu)",
-				offset, length);
-			goto end_request;	/* Shouldn't happen */
-		}
+err_img_request:
+	rbd_img_request_put(img_request);
+err_rq:
+	if (result)
+		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+			 wr ? "write" : "read", length, offset, result);
+	blk_end_request_all(rq, result);
+}
 
-		result = -EIO;
-		if (offset + length > rbd_dev->mapping.size) {
-			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)",
-				offset, length, rbd_dev->mapping.size);
-			goto end_request;
-		}
+static void rbd_request_workfn(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev =
+	    container_of(work, struct rbd_device, rq_work);
+	struct request *rq, *next;
+	LIST_HEAD(requests);
 
-		result = -ENOMEM;
-		img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request);
-		if (!img_request)
-			goto end_request;
+	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+	list_splice_init(&rbd_dev->rq_queue, &requests);
+	spin_unlock_irq(&rbd_dev->lock);
 
-		img_request->rq = rq;
+	list_for_each_entry_safe(rq, next, &requests, queuelist) {
+		list_del_init(&rq->queuelist);
+		rbd_handle_request(rbd_dev, rq);
+	}
+}
 
-		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						rq->bio);
-		if (!result)
-			result = rbd_img_request_submit(img_request);
-		if (result)
-			rbd_img_request_put(img_request);
-end_request:
-		spin_lock_irq(q->queue_lock);
-		if (result < 0) {
-			rbd_warn(rbd_dev, "%s %llx at %llx result %d",
-				write_request ? "write" : "read",
-				length, offset, result);
-
-			__blk_end_request_all(rq, result);
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	int queued = 0;
+
+	rbd_assert(rbd_dev);
+
+	while ((rq = blk_fetch_request(q))) {
+		/* Ignore any non-FS requests that filter through. */
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			dout("%s: non-fs request type %d\n", __func__,
+				(int) rq->cmd_type);
+			__blk_end_request_all(rq, 0);
+			continue;
 		}
+
+		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+		queued++;
 	}
+
+	if (queued)
+		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
 }
 
 /*
@@ -3847,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->rq_queue);
+	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -5050,12 +5083,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	ret = rbd_dev_mapping_set(rbd_dev);
 	if (ret)
 		goto err_out_disk;
+
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+	rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+	if (!rbd_dev->rq_wq)
+		goto err_out_mapping;
+
 	ret = rbd_bus_add_dev(rbd_dev);
 	if (ret)
-		goto err_out_mapping;
+		goto err_out_workqueue;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
@@ -5067,6 +5105,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
 	return ret;
 
+err_out_workqueue:
+	destroy_workqueue(rbd_dev->rq_wq);
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5313,6 +5353,7 @@ static void rbd_dev_device_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+	destroy_workqueue(rbd_dev->rq_wq);
 	rbd_free_disk(rbd_dev);
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	rbd_dev_mapping_clear(rbd_dev);
-- 
1.7.10.4

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux