This adds basic support for ring SQEs (with opcode=IORING_OP_URING_CMD).
For now only FUSE_URING_REQ_FETCH is handled to register queue entries.

Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
 fs/fuse/dev.c             |   3 +
 fs/fuse/dev_uring.c       | 231 ++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h     |  60 ++++++++++++
 include/uapi/linux/fuse.h |  38 ++++++++
 4 files changed, 332 insertions(+)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index fec995818a9e..998027825481 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2477,6 +2477,9 @@ const struct file_operations fuse_dev_operations = {
 	.fasync		= fuse_dev_fasync,
 	.unlocked_ioctl = fuse_dev_ioctl,
 	.compat_ioctl   = compat_ptr_ioctl,
+#ifdef CONFIG_FUSE_IO_URING
+	.uring_cmd	= fuse_uring_cmd,
+#endif
 };
 EXPORT_SYMBOL_GPL(fuse_dev_operations);
 
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 4dcb4972242e..46c2274193bf 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -29,6 +29,30 @@
 #include <linux/topology.h>
 #include <linux/io_uring/cmd.h>
 
+static int fuse_ring_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
+{
+	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
+		return -EIO;
+
+	ent->state = FRRS_COMMIT;
+	list_del_init(&ent->list);
+
+	return 0;
+}
+
+/* Update conn limits according to ring values */
+static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
+{
+	struct fuse_conn *fc = ring->fc;
+
+	/*
+	 * This is not ideal, as multiplication with nr_queues assumes the
+	 * limit gets reached when all queues are used, but even a single
+	 * queue might reach the limit.
+	 */
+	WRITE_ONCE(fc->max_background, ring->nr_queues * ring->max_nr_async);
+}
+
 static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 				 struct fuse_ring *ring)
 {
@@ -37,6 +61,11 @@ static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 	queue->qid = qid;
 	queue->ring = ring;
 
+	spin_lock_init(&queue->lock);
+
+	INIT_LIST_HEAD(&queue->sync_ent_avail_queue);
+	INIT_LIST_HEAD(&queue->async_ent_avail_queue);
+
 	for (tag = 0; tag < ring->queue_depth; tag++) {
 		struct fuse_ring_ent *ent = &queue->ring_ent[tag];
 
@@ -44,6 +73,8 @@ static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 		ent->tag = tag;
 
 		ent->state = FRRS_INIT;
+
+		INIT_LIST_HEAD(&ent->list);
 	}
 }
 
@@ -141,3 +172,203 @@ int fuse_uring_conn_cfg(struct file *file, void __user *argp)
 	kvfree(ring);
 	return res;
 }
+
+/*
+ * Put a ring request on hold, it is no longer used for now.
+ */
+static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
+				 struct fuse_ring_queue *queue)
+	__must_hold(&queue->lock)
+{
+	struct fuse_ring *ring = queue->ring;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* unsets all previous flags - basically resets */
+	pr_devel("%s ring=%p qid=%d tag=%d state=%d async=%d\n", __func__,
+		 ring, ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
+		 ring_ent->async);
+
+	if (WARN_ON(ring_ent->state != FRRS_COMMIT)) {
+		pr_warn("%s qid=%d tag=%d state=%d async=%d\n", __func__,
+			ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
+			ring_ent->async);
+		return;
+	}
+
+	WARN_ON_ONCE(!list_empty(&ring_ent->list));
+
+	if (ring_ent->async)
+		list_add(&ring_ent->list, &queue->async_ent_avail_queue);
+	else
+		list_add(&ring_ent->list, &queue->sync_ent_avail_queue);
+
+	ring_ent->state = FRRS_WAIT;
+}
+
+/*
+ * fuse_uring_req_fetch command handling
+ */
+static int _fuse_uring_fetch(struct fuse_ring_ent *ring_ent,
+			     struct io_uring_cmd *cmd, unsigned int issue_flags)
+__must_hold(ring_ent->queue->lock)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct fuse_ring *ring = queue->ring;
+	int nr_ring_sqe;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* register entries for foreground requests first, then background */
+	if (queue->nr_req_sync >= ring->max_nr_sync) {
+		queue->nr_req_async++;
+		ring_ent->async = 1;
+	} else
+		queue->nr_req_sync++;
+
+	fuse_uring_ent_avail(ring_ent, queue);
+
+	if (WARN_ON_ONCE(queue->nr_req_sync +
+			 queue->nr_req_async > ring->queue_depth)) {
+		/* should have been caught by the ring state and queue
+		 * depth checks before
+		 */
+		pr_info("qid=%d tag=%d req cnt (fg=%d async=%d) exceeds depth=%zu\n",
+			queue->qid, ring_ent->tag, queue->nr_req_sync,
+			queue->nr_req_async, ring->queue_depth);
+		return -ERANGE;
+	}
+
+	WRITE_ONCE(ring_ent->cmd, cmd);
+
+	nr_ring_sqe = ring->queue_depth * ring->nr_queues;
+	if (atomic_inc_return(&ring->nr_sqe_init) == nr_ring_sqe) {
+		fuse_uring_conn_cfg_limits(ring);
+		ring->ready = 1;
+	}
+
+	return 0;
+}
+
+static int fuse_uring_fetch(struct fuse_ring_ent *ring_ent,
+			    struct io_uring_cmd *cmd, unsigned int issue_flags)
+	__releases(ring_ent->queue->lock)
+{
+	struct fuse_ring *ring = ring_ent->queue->ring;
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	int ret;
+
+	/* No other bit must be set here */
+	ret = -EINVAL;
+	if (ring_ent->state != FRRS_INIT)
+		goto err;
+
+	/*
+	 * FUSE_URING_REQ_FETCH is an initialization exception, needs
+	 * state override
+	 */
+	ring_ent->state = FRRS_USERSPACE;
+	ret = fuse_ring_ring_ent_unset_userspace(ring_ent);
+	if (ret != 0) {
+		pr_info_ratelimited(
+			"qid=%d tag=%d register req state %d expected %d\n",
+			queue->qid, ring_ent->tag, ring_ent->state,
+			FRRS_INIT);
+		goto err;
+	}
+
+	ret = _fuse_uring_fetch(ring_ent, cmd, issue_flags);
+	if (ret)
+		goto err;
+
+	/*
+	 * The ring entry is registered now and needs to be handled
+	 * for shutdown.
+	 */
+	atomic_inc(&ring->queue_refs);
+err:
+	spin_unlock(&queue->lock);
+	return ret;
+}
+
+/**
+ * Entry function from io_uring to handle the given passthrough command
+ * (op code IORING_OP_URING_CMD)
+ */
+int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
+{
+	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+	struct fuse_dev *fud;
+	struct fuse_conn *fc;
+	struct fuse_ring *ring;
+	struct fuse_ring_queue *queue;
+	struct fuse_ring_ent *ring_ent = NULL;
+	u32 cmd_op = cmd->cmd_op;
+	int ret = 0;
+
+	ret = -ENODEV;
+	fud = fuse_get_dev(cmd->file);
+	if (!fud)
+		goto out;
+	fc = fud->fc;
+
+	ring = fc->ring;
+	if (!ring)
+		goto out;
+
+	queue = fud->ring_q;
+	if (!queue)
+		goto out;
+
+	ret = -EINVAL;
+	if (queue->qid != cmd_req->qid)
+		goto out;
+
+	ret = -ERANGE;
+	if (cmd_req->tag >= ring->queue_depth)
+		goto out;
+
+	ring_ent = &queue->ring_ent[cmd_req->tag];
+
+	pr_devel("%s:%d received: cmd op %d qid %d (%p) tag %d (%p)\n",
+		 __func__, __LINE__, cmd_op, cmd_req->qid, queue, cmd_req->tag,
+		 ring_ent);
+
+	spin_lock(&queue->lock);
+	ret = -ENOTCONN;
+	if (unlikely(fc->aborted || queue->stopped))
+		goto err_unlock;
+
+	switch (cmd_op) {
+	case FUSE_URING_REQ_FETCH:
+		ret = fuse_uring_fetch(ring_ent, cmd, issue_flags);
+		break;
+	default:
+		ret = -EINVAL;
+		pr_devel("Unknown uring command %d\n", cmd_op);
+		goto err_unlock;
+	}
+out:
+	pr_devel("uring cmd op=%d, qid=%d tag=%d ret=%d\n", cmd_op,
+		 cmd_req->qid, cmd_req->tag, ret);
+
+	if (ret < 0) {
+		if (ring_ent != NULL) {
+			pr_info_ratelimited("error: uring cmd op=%d, qid=%d tag=%d ret=%d\n",
+					    cmd_op, cmd_req->qid, cmd_req->tag,
+					    ret);
+
+			/* must not change the entry state, as userspace
+			 * might have sent random data, but valid requests
+			 * might be registered already - don't confuse those.
+			 */
+		}
+		io_uring_cmd_done(cmd, ret, 0, issue_flags);
+	}
+
+	return -EIOCBQUEUED;
+
+err_unlock:
+	spin_unlock(&queue->lock);
+	goto out;
+}
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 26266f923321..6561f4178cac 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -19,6 +19,15 @@ enum fuse_ring_req_state {
 
 	/* request is basially initialized */
 	FRRS_INIT,
+
+	/* ring entry received from userspace and it is being processed */
+	FRRS_COMMIT,
+
+	/* The ring request waits for a new fuse request */
+	FRRS_WAIT,
+
+	/* request is in or on the way to user space */
+	FRRS_USERSPACE,
 };
 
 /* A fuse ring entry, part of the ring queue */
@@ -31,6 +40,13 @@ struct fuse_ring_ent {
 
 	/* state the request is currently in */
 	enum fuse_ring_req_state state;
+
+	/* is this an async or sync entry */
+	unsigned int async : 1;
+
+	struct list_head list;
+
+	struct io_uring_cmd *cmd;
 };
 
 struct fuse_ring_queue {
@@ -43,6 +59,30 @@ struct fuse_ring_queue {
 	/* queue id, typically also corresponds to the cpu core */
 	unsigned int qid;
 
+	/*
+	 * queue lock, taken when any value in the queue changes _and_ also
+	 * when a ring entry state changes.
+	 */
+	spinlock_t lock;
+
+	/* available ring entries (struct fuse_ring_ent) */
+	struct list_head async_ent_avail_queue;
+	struct list_head sync_ent_avail_queue;
+
+	/*
+	 * available number of sync requests,
+	 * loosely bound to fuse foreground requests
+	 */
+	int nr_req_sync;
+
+	/*
+	 * available number of async requests
+	 * loosely bound to fuse background requests
+	 */
+	int nr_req_async;
+
+	unsigned int stopped : 1;
+
 	/* size depends on queue depth */
 	struct fuse_ring_ent ring_ent[] ____cacheline_aligned_in_smp;
 };
@@ -79,11 +119,21 @@ struct fuse_ring {
 	/* numa aware memory allocation */
 	unsigned int numa_aware : 1;
 
+	/* Is the ring ready to take requests */
+	unsigned int ready : 1;
+
+	/* number of SQEs initialized */
+	atomic_t nr_sqe_init;
+
+	/* Used to release the ring on stop */
+	atomic_t queue_refs;
+
 	struct fuse_ring_queue queues[] ____cacheline_aligned_in_smp;
 };
 
 void fuse_uring_abort_end_requests(struct fuse_ring *ring);
 int fuse_uring_conn_cfg(struct file *file, void __user *argp);
+int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
 static inline void fuse_uring_conn_destruct(struct fuse_conn *fc)
 {
@@ -113,6 +163,11 @@ static inline bool fuse_uring_configured(struct fuse_conn *fc)
 	return false;
 }
 
+static inline bool fuse_per_core_queue(struct fuse_conn *fc)
+{
+	return fc->ring && fc->ring->per_core_queue;
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -131,6 +186,11 @@ static inline bool fuse_uring_configured(struct fuse_conn *fc)
 	return false;
 }
 
+static inline bool fuse_per_core_queue(struct fuse_conn *fc)
+{
+	return false;
+}
+
 #endif /* CONFIG_FUSE_IO_URING */
 
 #endif /* _FS_FUSE_DEV_URING_I_H */
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index 143ed3c1c7b3..586358e9992c 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -1247,6 +1247,12 @@ struct fuse_supp_groups {
 #define FUSE_RING_HEADER_BUF_SIZE 4096
 #define FUSE_RING_MIN_IN_OUT_ARG_SIZE 4096
 
+/*
+ * Request is background type. Daemon side is free to use this information
+ * to handle foreground/background CQEs with different priorities.
+ */
+#define FUSE_RING_REQ_FLAG_ASYNC (1ull << 0)
+
 /**
  * This structure mapped onto the
  */
@@ -1272,4 +1278,36 @@ struct fuse_ring_req {
 	char in_out_arg[];
 };
 
+/**
+ * SQE commands to the kernel
+ */
+enum fuse_uring_cmd {
+	FUSE_URING_REQ_INVALID = 0,
+
+	/* submit sqe to kernel to get a request */
+	FUSE_URING_REQ_FETCH = 1,
+
+	/* commit result and fetch next request */
+	FUSE_URING_REQ_COMMIT_AND_FETCH = 2,
+};
+
+/**
+ * In the 80B command area of the SQE.
+ */
+struct fuse_uring_cmd_req {
+	/* User buffer */
+	uint64_t buf_ptr;
+
+	/* length of the user buffer */
+	uint32_t buf_len;
+
+	/* queue the command is for (queue index) */
+	uint16_t qid;
+
+	/* queue entry (array index) */
+	uint16_t tag;
+
+	uint32_t flags;
+};
+
 #endif /* _LINUX_FUSE_H */
-- 
2.43.0
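
For reference, a rough and untested userspace sketch of how a daemon might
register one queue entry with FUSE_URING_REQ_FETCH, based only on the UAPI
added above. It assumes liburing, an io_uring created with
IORING_SETUP_SQE128 (for the 80B command area), and that fuse_fd is the
/dev/fuse file descriptor associated with the given queue; the helper name
and the buffer handling are illustrative only and not part of this patch:

/* Illustrative sketch only -- not part of this patch. */
#include <liburing.h>
#include <linux/fuse.h>
#include <stdint.h>
#include <string.h>

static int queue_register_entry(struct io_uring *uring, int fuse_fd,
				uint16_t qid, uint16_t tag,
				void *req_buf, uint32_t req_buf_len)
{
	/* The 80B command area needs a ring set up with IORING_SETUP_SQE128 */
	struct io_uring_sqe *sqe = io_uring_get_sqe(uring);
	struct fuse_uring_cmd_req *req;

	if (!sqe)
		return -EAGAIN;

	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->fd = fuse_fd;
	sqe->cmd_op = FUSE_URING_REQ_FETCH;

	/* struct fuse_uring_cmd_req lives in the SQE command area */
	req = (struct fuse_uring_cmd_req *)sqe->cmd;
	memset(req, 0, sizeof(*req));
	req->buf_ptr = (uint64_t)(uintptr_t)req_buf;
	req->buf_len = req_buf_len;
	req->qid = qid;
	req->tag = tag;
	req->flags = 0;

	/* CQE is posted once the kernel has a request for this entry */
	return io_uring_submit(uring);
}

The completion for such an SQE only arrives once the kernel has a request
for that entry; committing the result would use
FUSE_URING_REQ_COMMIT_AND_FETCH, which this patch does not handle yet.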