[PATCH RFC v3 08/17] fuse: {uring} Handle SQEs - register commands

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This adds basic support for ring SQEs (with opcode=IORING_OP_URING_CMD).
For now only FUSE_URING_REQ_FETCH is handled to register queue entries.

Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
 fs/fuse/dev.c             |   3 +
 fs/fuse/dev_uring.c       | 231 ++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h     |  60 ++++++++++++
 include/uapi/linux/fuse.h |  38 ++++++++
 4 files changed, 332 insertions(+)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index fec995818a9e..998027825481 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2477,6 +2477,9 @@ const struct file_operations fuse_dev_operations = {
 	.fasync		= fuse_dev_fasync,
 	.unlocked_ioctl = fuse_dev_ioctl,
 	.compat_ioctl   = compat_ptr_ioctl,
+#ifdef CONFIG_FUSE_IO_URING
+	.uring_cmd	= fuse_uring_cmd,
+#endif
 };
 EXPORT_SYMBOL_GPL(fuse_dev_operations);
 
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 4dcb4972242e..46c2274193bf 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -29,6 +29,30 @@
 #include <linux/topology.h>
 #include <linux/io_uring/cmd.h>
 
+static int fuse_ring_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
+{
+	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
+		return -EIO;
+
+	ent->state = FRRS_COMMIT;
+	list_del_init(&ent->list);
+
+	return 0;
+}
+
+/* Derive fc->max_background from the ring's queue count and async limit */
+static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
+{
+	struct fuse_conn *fc = ring->fc;
+
+	/*
+	 * This is not ideal, as multiplying by nr_queues assumes the limit
+	 * is only reached when all queues are in use, but even a single
+	 * queue might reach the limit.
+	 */
+	WRITE_ONCE(fc->max_background, ring->nr_queues * ring->max_nr_async);
+}
+
 static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 				 struct fuse_ring *ring)
 {
@@ -37,6 +61,11 @@ static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 	queue->qid = qid;
 	queue->ring = ring;
 
+	spin_lock_init(&queue->lock);
+
+	INIT_LIST_HEAD(&queue->sync_ent_avail_queue);
+	INIT_LIST_HEAD(&queue->async_ent_avail_queue);
+
 	for (tag = 0; tag < ring->queue_depth; tag++) {
 		struct fuse_ring_ent *ent = &queue->ring_ent[tag];
 
@@ -44,6 +73,8 @@ static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 		ent->tag = tag;
 
 		ent->state = FRRS_INIT;
+
+		INIT_LIST_HEAD(&ent->list);
 	}
 }
 
@@ -141,3 +172,203 @@ int fuse_uring_conn_cfg(struct file *file, void __user *argp)
 	kvfree(ring);
 	return res;
 }
+
+/*
+ * Make a committed ring entry available again: FRRS_COMMIT -> FRRS_WAIT.
+ */
+static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
+				 struct fuse_ring_queue *queue)
+	__must_hold(&queue->lock)
+{
+	struct fuse_ring *ring = queue->ring;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* debug trace of the entry before it is put on an avail list */
+	pr_devel("%s ring=%p qid=%d tag=%d state=%d async=%d\n", __func__,
+		 ring, ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
+		 ring_ent->async);
+
+	if (WARN_ON(ring_ent->state != FRRS_COMMIT)) {
+		pr_warn("%s qid=%d tag=%d state=%d async=%d\n", __func__,
+			ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
+			ring_ent->async);
+		return;
+	}
+
+	WARN_ON_ONCE(!list_empty(&ring_ent->list));
+
+	if (ring_ent->async)
+		list_add(&ring_ent->list, &queue->async_ent_avail_queue);
+	else
+		list_add(&ring_ent->list, &queue->sync_ent_avail_queue);
+
+	ring_ent->state = FRRS_WAIT;
+}
+
+/*
+ * FUSE_URING_REQ_FETCH handling: account the entry and make it available.
+ */
+static int _fuse_uring_fetch(struct fuse_ring_ent *ring_ent,
+			    struct io_uring_cmd *cmd, unsigned int issue_flags)
+__must_hold(ring_ent->queue->lock)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct fuse_ring *ring = queue->ring;
+	int nr_ring_sqe;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* entries fill the sync quota first, later ones become async */
+	if (queue->nr_req_sync >= ring->max_nr_sync) {
+		queue->nr_req_async++;
+		ring_ent->async = 1;
+	} else
+		queue->nr_req_sync++;
+
+	fuse_uring_ent_avail(ring_ent, queue);
+
+	if (WARN_ON_ONCE(queue->nr_req_sync +
+			 queue->nr_req_async > ring->queue_depth)) {
+		/* should already be caught by the ring entry state check
+		 * and the queue depth check in the callers
+		 */
+		pr_info("qid=%d tag=%d req cnt (fg=%d async=%d) exceeds depth=%zu\n",
+			queue->qid, ring_ent->tag, queue->nr_req_sync,
+			queue->nr_req_async, ring->queue_depth);
+		return -ERANGE;
+	}
+
+	WRITE_ONCE(ring_ent->cmd, cmd);
+	/* the ring becomes ready once every entry of every queue registered */
+	nr_ring_sqe = ring->queue_depth * ring->nr_queues;
+	if (atomic_inc_return(&ring->nr_sqe_init) == nr_ring_sqe) {
+		fuse_uring_conn_cfg_limits(ring);
+		ring->ready = 1;
+	}
+
+	return 0;
+}
+
+/* Register a ring entry (FUSE_URING_REQ_FETCH); drops queue->lock on return */
+static int fuse_uring_fetch(struct fuse_ring_ent *ring_ent,
+			    struct io_uring_cmd *cmd, unsigned int issue_flags)
+	__releases(ring_ent->queue->lock)
+{
+	struct fuse_ring *ring = ring_ent->queue->ring;
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	int ret = -EINVAL;
+
+	/* Only an entry that is still in FRRS_INIT may be registered */
+	if (ring_ent->state != FRRS_INIT)
+		goto err;
+
+	/*
+	 * FUSE_URING_REQ_FETCH is an initialization exception, needs
+	 * state override
+	 */
+	ring_ent->state = FRRS_USERSPACE;
+	ret = fuse_ring_ring_ent_unset_userspace(ring_ent);
+	if (ret != 0) {
+		pr_info_ratelimited(
+			"qid=%d tag=%d register req state %d expected %d\n",
+			queue->qid, ring_ent->tag, ring_ent->state,
+			FRRS_USERSPACE);
+		goto err;
+	}
+
+	ret = _fuse_uring_fetch(ring_ent, cmd, issue_flags);
+	if (ret)
+		goto err;
+
+	/*
+	 * The ring entry is registered now and needs to be handled
+	 * for shutdown.
+	 */
+	atomic_inc(&ring->queue_refs);
+err:
+	spin_unlock(&queue->lock);
+	return ret;
+}
+
+/*
+ * Entry point from io_uring for IORING_OP_URING_CMD SQEs on the fuse device.
+ * Errors are reported via io_uring_cmd_done(); always returns -EIOCBQUEUED.
+ */
+int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
+{
+	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+	struct fuse_dev *fud;
+	struct fuse_conn *fc;
+	struct fuse_ring *ring;
+	struct fuse_ring_queue *queue;
+	struct fuse_ring_ent *ring_ent = NULL;
+	u32 cmd_op = cmd->cmd_op;
+	int ret = 0;
+
+	ret = -ENODEV;
+	fud = fuse_get_dev(cmd->file);
+	if (!fud)
+		goto out;
+	fc = fud->fc;
+
+	ring = fc->ring;
+	if (!ring)
+		goto out;
+
+	queue = fud->ring_q;
+	if (!queue)
+		goto out;
+
+	ret = -EINVAL;
+	if (queue->qid != cmd_req->qid)
+		goto out;
+	/* tag indexes ring_ent[], valid values are 0..queue_depth - 1 */
+	ret = -ERANGE;
+	if (cmd_req->tag >= ring->queue_depth)
+		goto out;
+
+	ring_ent = &queue->ring_ent[cmd_req->tag];
+
+	pr_devel("%s:%d received: cmd op %d qid %d (%p) tag %d  (%p)\n",
+		 __func__, __LINE__, cmd_op, cmd_req->qid, queue, cmd_req->tag,
+		 ring_ent);
+
+	spin_lock(&queue->lock);
+	ret = -ENOTCONN;
+	if (unlikely(fc->aborted || queue->stopped))
+		goto err_unlock;
+
+	switch (cmd_op) {
+	case FUSE_URING_REQ_FETCH:
+		ret = fuse_uring_fetch(ring_ent, cmd, issue_flags);
+		break;
+	default:
+		ret = -EINVAL;
+		pr_devel("Unknown uring command %d\n", cmd_op);
+		goto err_unlock;
+	}
+out:
+	pr_devel("uring cmd op=%d, qid=%d tag=%d ret=%d\n", cmd_op,
+		 cmd_req->qid, cmd_req->tag, ret);
+
+	if (ret < 0) {
+		if (ring_ent != NULL) {
+			pr_info_ratelimited("error: uring cmd op=%d, qid=%d tag=%d ret=%d\n",
+					    cmd_op, cmd_req->qid, cmd_req->tag,
+					    ret);
+
+			/* must not change the entry state, as userspace
+			 * might have sent random data, but valid requests
+			 * might be registered already - don't confuse those.
+			 */
+		}
+		io_uring_cmd_done(cmd, ret, 0, issue_flags);
+	}
+
+	return -EIOCBQUEUED;
+
+err_unlock:
+	spin_unlock(&queue->lock);
+	goto out;
+}
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 26266f923321..6561f4178cac 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -19,6 +19,15 @@ enum fuse_ring_req_state {
 
 	/* request is basially initialized */
 	FRRS_INIT,
+
+	/* ring entry received from userspace and is being processed */
+	FRRS_COMMIT,
+
+	/* The ring request waits for a new fuse request */
+	FRRS_WAIT,
+
+	/* request is in or on the way to user space */
+	FRRS_USERSPACE,
 };
 
 /* A fuse ring entry, part of the ring queue */
@@ -31,6 +40,13 @@ struct fuse_ring_ent {
 
 	/* state the request is currently in */
 	enum fuse_ring_req_state state;
+
+	/* is this an async or sync entry */
+	unsigned int async : 1;
+
+	struct list_head list;
+
+	struct io_uring_cmd *cmd;
 };
 
 struct fuse_ring_queue {
@@ -43,6 +59,30 @@ struct fuse_ring_queue {
 	/* queue id, typically also corresponds to the cpu core */
 	unsigned int qid;
 
+	/*
+	 * queue lock, taken when any value in the queue changes _and_ also
+	 * a ring entry state changes.
+	 */
+	spinlock_t lock;
+
+	/* available ring entries (struct fuse_ring_ent) */
+	struct list_head async_ent_avail_queue;
+	struct list_head sync_ent_avail_queue;
+
+	/*
+	 * available number of sync requests,
+	 * loosely bound to fuse foreground requests
+	 */
+	int nr_req_sync;
+
+	/*
+	 * available number of async requests
+	 * loosely bound to fuse background requests
+	 */
+	int nr_req_async;
+
+	unsigned int stopped : 1;
+
 	/* size depends on queue depth */
 	struct fuse_ring_ent ring_ent[] ____cacheline_aligned_in_smp;
 };
@@ -79,11 +119,21 @@ struct fuse_ring {
 	/* numa aware memory allocation */
 	unsigned int numa_aware : 1;
 
+	/* Is the ring ready to take requests */
+	unsigned int ready : 1;
+
+	/* number of SQEs initialized */
+	atomic_t nr_sqe_init;
+
+	/* Used to release the ring on stop */
+	atomic_t queue_refs;
+
 	struct fuse_ring_queue queues[] ____cacheline_aligned_in_smp;
 };
 
 void fuse_uring_abort_end_requests(struct fuse_ring *ring);
 int fuse_uring_conn_cfg(struct file *file, void __user *argp);
+int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
 static inline void fuse_uring_conn_destruct(struct fuse_conn *fc)
 {
@@ -113,6 +163,11 @@ static inline bool fuse_uring_configured(struct fuse_conn *fc)
 	return false;
 }
 
+static inline bool fuse_per_core_queue(struct fuse_conn *fc)
+{
+	return fc->ring && fc->ring->per_core_queue;
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -131,6 +186,11 @@ static inline bool fuse_uring_configured(struct fuse_conn *fc)
 	return false;
 }
 
+static inline bool fuse_per_core_queue(struct fuse_conn *fc)
+{
+	return false;
+}
+
 #endif /* CONFIG_FUSE_IO_URING */
 
 #endif /* _FS_FUSE_DEV_URING_I_H */
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index 143ed3c1c7b3..586358e9992c 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -1247,6 +1247,12 @@ struct fuse_supp_groups {
 #define FUSE_RING_HEADER_BUF_SIZE 4096
 #define FUSE_RING_MIN_IN_OUT_ARG_SIZE 4096
 
+/*
+ * Request is background type. Daemon side is free to use this information
+ * to handle foreground/background CQEs with different priorities.
+ */
+#define FUSE_RING_REQ_FLAG_ASYNC (1ull << 0)
+
 /**
  * This structure mapped onto the
  */
@@ -1272,4 +1278,36 @@ struct fuse_ring_req {
 	char in_out_arg[];
 };
 
+/**
+ * sqe commands to the kernel
+ */
+enum fuse_uring_cmd {
+	FUSE_URING_REQ_INVALID = 0,
+
+	/* submit sqe to kernel to get a request */
+	FUSE_URING_REQ_FETCH = 1,
+
+	/* commit result and fetch next request */
+	FUSE_URING_REQ_COMMIT_AND_FETCH = 2,
+};
+
+/**
+ * In the 80B command area of the SQE.
+ */
+struct fuse_uring_cmd_req {
+	/* User buffer */
+	uint64_t buf_ptr;
+
+	/* length of the user buffer */
+	uint32_t buf_len;
+
+	/* queue the command is for (queue index) */
+	uint16_t qid;
+
+	/* queue entry (array index) */
+	uint16_t tag;
+
+	uint32_t flags;
+};
+
 #endif /* _LINUX_FUSE_H */

-- 
2.43.0





[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [NTFS 3]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [NTFS 3]     [Samba]     [Device Mapper]     [CEPH Development]

  Powered by Linux