[PATCH RFC v2 14/19] fuse: {uring} Allow to queue to the ring

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This enables enqueuing requests through fuse uring queues.

For initial simplicity requests are always allocated the normal way
then added to ring queues lists and only then copied to ring queue
entries. Later on the allocation and adding the requests to a list
can be avoided, by directly using a ring entry. This introduces
some code complexity and is therefore not done for now.

Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
 fs/fuse/dev.c         | 80 +++++++++++++++++++++++++++++++++++++++-----
 fs/fuse/dev_uring.c   | 92 ++++++++++++++++++++++++++++++++++++++++++---------
 fs/fuse/dev_uring_i.h | 17 ++++++++++
 3 files changed, 165 insertions(+), 24 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 6ffd216b27c8..c7fd3849a105 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -218,13 +218,29 @@ const struct fuse_iqueue_ops fuse_dev_fiq_ops = {
 };
 EXPORT_SYMBOL_GPL(fuse_dev_fiq_ops);
 
-static void queue_request_and_unlock(struct fuse_iqueue *fiq,
-				     struct fuse_req *req)
+
+static void queue_request_and_unlock(struct fuse_conn *fc,
+				     struct fuse_req *req, bool allow_uring)
 __releases(fiq->lock)
 {
+	struct fuse_iqueue *fiq = &fc->iq;
+
 	req->in.h.len = sizeof(struct fuse_in_header) +
 		fuse_len_args(req->args->in_numargs,
 			      (struct fuse_arg *) req->args->in_args);
+
+	if (allow_uring && fuse_uring_ready(fc)) {
+		int res;
+
+		/* this lock is not needed at all for ring req handling */
+		spin_unlock(&fiq->lock);
+		res = fuse_uring_queue_fuse_req(fc, req);
+		if (!res)
+			return;
+
+		/* fallthrough, handled through /dev/fuse read/write */
+	}
+
 	list_add_tail(&req->list, &fiq->pending);
 	fiq->ops->wake_pending_and_unlock(fiq);
 }
@@ -261,7 +277,7 @@ static void flush_bg_queue(struct fuse_conn *fc)
 		fc->active_background++;
 		spin_lock(&fiq->lock);
 		req->in.h.unique = fuse_get_unique(fiq);
-		queue_request_and_unlock(fiq, req);
+		queue_request_and_unlock(fc, req, true);
 	}
 }
 
@@ -405,7 +421,8 @@ static void request_wait_answer(struct fuse_req *req)
 
 static void __fuse_request_send(struct fuse_req *req)
 {
-	struct fuse_iqueue *fiq = &req->fm->fc->iq;
+	struct fuse_conn *fc = req->fm->fc;
+	struct fuse_iqueue *fiq = &fc->iq;
 
 	BUG_ON(test_bit(FR_BACKGROUND, &req->flags));
 	spin_lock(&fiq->lock);
@@ -417,7 +434,7 @@ static void __fuse_request_send(struct fuse_req *req)
 		/* acquire extra reference, since request is still needed
 		   after fuse_request_end() */
 		__fuse_get_request(req);
-		queue_request_and_unlock(fiq, req);
+		queue_request_and_unlock(fc, req, true);
 
 		request_wait_answer(req);
 		/* Pairs with smp_wmb() in fuse_request_end() */
@@ -487,6 +504,10 @@ ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
 	if (args->force) {
 		atomic_inc(&fc->num_waiting);
 		req = fuse_request_alloc(fm, GFP_KERNEL | __GFP_NOFAIL);
+		if (unlikely(!req)) {
+			ret = -ENOTCONN;
+			goto err;
+		}
 
 		if (!args->nocreds)
 			fuse_force_creds(req);
@@ -514,16 +535,55 @@ ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
 	}
 	fuse_put_request(req);
 
+err:
 	return ret;
 }
 
-static bool fuse_request_queue_background(struct fuse_req *req)
+static bool fuse_request_queue_background_uring(struct fuse_conn *fc,
+					       struct fuse_req *req)
+{
+	struct fuse_iqueue *fiq = &fc->iq;
+	int err;
+
+	req->in.h.unique = fuse_get_unique(fiq);
+	req->in.h.len = sizeof(struct fuse_in_header) +
+		fuse_len_args(req->args->in_numargs,
+			      (struct fuse_arg *) req->args->in_args);
+
+	err = fuse_uring_queue_fuse_req(fc, req);
+	if (!err) {
+		/* XXX remove and lets the users of that use per queue values -
+		 * avoid the shared spin lock...
+		 * Is this needed at all?
+		 */
+		spin_lock(&fc->bg_lock);
+		fc->num_background++;
+		fc->active_background++;
+
+
+		/* XXX block when per ring queues get occupied */
+		if (fc->num_background == fc->max_background)
+			fc->blocked = 1;
+		spin_unlock(&fc->bg_lock);
+	}
+
+	return err ? false : true;
+}
+
+/*
+ * @return true if queued
+ */
+static int fuse_request_queue_background(struct fuse_req *req)
 {
 	struct fuse_mount *fm = req->fm;
 	struct fuse_conn *fc = fm->fc;
 	bool queued = false;
 
 	WARN_ON(!test_bit(FR_BACKGROUND, &req->flags));
+
+	if (fuse_uring_ready(fc))
+		return fuse_request_queue_background_uring(fc, req);
+
 	if (!test_bit(FR_WAITING, &req->flags)) {
 		__set_bit(FR_WAITING, &req->flags);
 		atomic_inc(&fc->num_waiting);
@@ -576,7 +636,8 @@ static int fuse_simple_notify_reply(struct fuse_mount *fm,
 				    struct fuse_args *args, u64 unique)
 {
 	struct fuse_req *req;
-	struct fuse_iqueue *fiq = &fm->fc->iq;
+	struct fuse_conn *fc = fm->fc;
+	struct fuse_iqueue *fiq = &fc->iq;
 	int err = 0;
 
 	req = fuse_get_req(fm, false);
@@ -590,7 +651,8 @@ static int fuse_simple_notify_reply(struct fuse_mount *fm,
 
 	spin_lock(&fiq->lock);
 	if (fiq->connected) {
-		queue_request_and_unlock(fiq, req);
+		/* uring for notify not supported yet */
+		queue_request_and_unlock(fc, req, false);
 	} else {
 		err = -ENODEV;
 		spin_unlock(&fiq->lock);
@@ -2205,6 +2267,7 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		fuse_uring_set_stopped(fc);
 
 		fuse_set_initialized(fc);
+
 		list_for_each_entry(fud, &fc->devices, entry) {
 			struct fuse_pqueue *fpq = &fud->pq;
 
@@ -2478,6 +2541,7 @@ static long fuse_uring_ioctl(struct file *file, __u32 __user *argp)
 		if (res != 0)
 			return res;
 		break;
+
 		case FUSE_URING_IOCTL_CMD_QUEUE_CFG:
 			fud->uring_dev = 1;
 			res = fuse_uring_queue_cfg(fc->ring, &cfg.qconf);
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 6001ba4d6e82..fe80e66150c3 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -32,8 +32,7 @@
 #include <linux/io_uring/cmd.h>
 
 static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
-					    bool set_err, int error,
-					    unsigned int issue_flags);
+					    bool set_err, int error);
 
 static void fuse_ring_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
 {
@@ -683,8 +682,7 @@ static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
  * userspace will read it
  * This is comparable with classical read(/dev/fuse)
  */
-static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
-				    unsigned int issue_flags, bool send_in_task)
+static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent)
 {
 	struct fuse_ring *ring = ring_ent->queue->ring;
 	struct fuse_ring_req *rreq = ring_ent->rreq;
@@ -721,20 +719,17 @@ static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
 	rreq->in = req->in.h;
 	set_bit(FR_SENT, &req->flags);
 
-	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu issue_flags=%u\n",
+	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu\n",
 		 __func__, ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
-		 rreq->in.opcode, rreq->in.unique, issue_flags);
+		 rreq->in.opcode, rreq->in.unique);
 
-	if (send_in_task)
-		io_uring_cmd_complete_in_task(ring_ent->cmd,
-					      fuse_uring_async_send_to_ring);
-	else
-		io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
+	io_uring_cmd_complete_in_task(ring_ent->cmd,
+				      fuse_uring_async_send_to_ring);
 
 	return;
 
 err:
-	fuse_uring_req_end_and_get_next(ring_ent, true, err, issue_flags);
+	fuse_uring_req_end_and_get_next(ring_ent, true, err);
 }
 
 /*
@@ -811,8 +806,7 @@ static bool fuse_uring_ent_release_and_fetch(struct fuse_ring_ent *ring_ent)
  * has lock/unlock/lock to avoid holding the lock on calling fuse_request_end
  */
 static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
-					    bool set_err, int error,
-					    unsigned int issue_flags)
+					    bool set_err, int error)
 {
 	struct fuse_req *req = ring_ent->fuse_req;
 	int has_next;
@@ -828,7 +822,7 @@ static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
 	has_next = fuse_uring_ent_release_and_fetch(ring_ent);
 	if (has_next) {
 		/* called within uring context - use provided flags */
-		fuse_uring_send_to_ring(ring_ent, issue_flags, false);
+		fuse_uring_send_to_ring(ring_ent);
 	}
 }
 
@@ -863,7 +857,7 @@ static void fuse_uring_commit_and_release(struct fuse_dev *fud,
 out:
 	pr_devel("%s:%d ret=%zd op=%d req-ret=%d\n", __func__, __LINE__, err,
 		 req->args->opcode, req->out.h.error);
-	fuse_uring_req_end_and_get_next(ring_ent, set_err, err, issue_flags);
+	fuse_uring_req_end_and_get_next(ring_ent, set_err, err);
 }
 
 /*
@@ -1101,3 +1095,69 @@ int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
 	goto out;
 }
 
+int fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req)
+{
+	struct fuse_ring *ring = fc->ring;
+	struct fuse_ring_queue *queue;
+	int qid = 0;
+	struct fuse_ring_ent *ring_ent = NULL;
+	int res;
+	bool async = test_bit(FR_BACKGROUND, &req->flags);
+	struct list_head *req_queue, *ent_queue;
+
+	if (ring->per_core_queue) {
+		/*
+		 * async requests are best handled on another core, the current
+		 * core can do application/page handling, while the async request
+		 * is handled on another core in userspace.
+		 * For sync request the application has to wait - no processing, so
+		 * the request should continue on the current core and avoid context
+		 * switches.
+		 * XXX This should be on the same numa node and not busy - is there
+		 * a scheduler function available  that could make this decision?
+		 * It should also not persistently switch between cores - makes
+		 * it hard for the scheduler.
+		 */
+		qid = task_cpu(current);
+
+		if (unlikely(qid >= ring->nr_queues)) {
+			WARN_ONCE(1,
+				  "Core number (%u) exceeds nr ueues (%zu)\n",
+				  qid, ring->nr_queues);
+			qid = 0;
+		}
+	}
+
+	queue = fuse_uring_get_queue(ring, qid);
+	req_queue = async ? &queue->async_fuse_req_queue :
+			    &queue->sync_fuse_req_queue;
+	ent_queue = async ? &queue->async_ent_avail_queue :
+			    &queue->sync_ent_avail_queue;
+
+	spin_lock(&queue->lock);
+
+	if (unlikely(queue->stopped)) {
+		res = -ENOTCONN;
+		goto err_unlock;
+	}
+
+	if (list_empty(ent_queue)) {
+		list_add_tail(&req->list, req_queue);
+	} else {
+		ring_ent =
+			list_first_entry(ent_queue, struct fuse_ring_ent, list);
+		list_del(&ring_ent->list);
+		fuse_uring_add_req_to_ring_ent(ring_ent, req);
+	}
+	spin_unlock(&queue->lock);
+
+	if (ring_ent != NULL)
+		fuse_uring_send_to_ring(ring_ent);
+
+	return 0;
+
+err_unlock:
+	spin_unlock(&queue->lock);
+	return res;
+}
+
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index e5fc84e2f3ea..5d7e1e6e7a82 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -208,6 +208,7 @@ int fuse_uring_queue_cfg(struct fuse_ring *ring,
 void fuse_uring_ring_destruct(struct fuse_ring *ring);
 void fuse_uring_stop_queues(struct fuse_ring *ring);
 int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
+int fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req);
 
 static inline void fuse_uring_conn_init(struct fuse_ring *ring,
 					struct fuse_conn *fc)
@@ -331,6 +332,11 @@ static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
 			   atomic_read(&ring->queue_refs) == 0);
 }
 
+static inline bool fuse_uring_ready(struct fuse_conn *fc)
+{
+	return fc->ring && fc->ring->ready;
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -366,6 +372,17 @@ static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
 {
 }
 
+static inline bool fuse_uring_ready(struct fuse_conn *fc)
+{
+	return false;
+}
+
+static inline int
+fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req)
+{
+	return -EPFNOSUPPORT;
+}
+
 #endif /* CONFIG_FUSE_IO_URING */
 
 #endif /* _FS_FUSE_DEV_URING_I_H */

-- 
2.40.1





[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [NTFS 3]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [NTFS 3]     [Samba]     [Device Mapper]     [CEPH Development]

  Powered by Linux