On Sun, Jan 19, 2025 at 4:33 PM Bernd Schubert <bernd@xxxxxxxxxxx> wrote: > > Hi Joanne, > > sorry for my late reply, I was occupied all week. > > On 1/13/25 23:44, Joanne Koong wrote: > > On Mon, Jan 6, 2025 at 4:25 PM Bernd Schubert <bschubert@xxxxxxx> wrote: > >> > >> This adds support for fuse request completion through ring SQEs > >> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing > >> the ring entry it becomes available for new fuse requests. > >> Handling of requests through the ring (SQE/CQE handling) > >> is complete now. > >> > >> Fuse request data are copied through the mmaped ring buffer, > >> there is no support for any zero copy yet. > >> > >> Signed-off-by: Bernd Schubert <bschubert@xxxxxxx> > >> --- > >> fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++ > >> fs/fuse/dev_uring_i.h | 12 ++ > >> fs/fuse/fuse_i.h | 4 + > >> 3 files changed, 466 insertions(+) > >> > >> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c > >> index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 > >> --- a/fs/fuse/dev_uring.c > >> +++ b/fs/fuse/dev_uring.c > >> @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) > >> return enable_uring; > >> } > >> > >> +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err, > >> + int error) > >> +{ > >> + struct fuse_req *req = ring_ent->fuse_req; > >> + > >> + if (set_err) > >> + req->out.h.error = error; > > > > I think we could get away with not having the "bool set_err" as an > > argument if we do "if (error)" directly. AFAICT, we can use the value > > of error directly since it always returns zero on success and any > > non-zero value is considered an error. 
> > I had done this because of fuse_uring_commit() > > > err = fuse_uring_out_header_has_err(&req->out.h, req, fc); > if (err) { > /* req->out.h.error already set */ > goto out; > } > > > In fuse_uring_out_header_has_err() the header might already have the > error code, but there are other errors as well. Well, setting an > existing error code saves us a few lines and conditions, so you are > probably right and I removed that argument now. > > > > > >> + > >> + clear_bit(FR_SENT, &req->flags); > >> + fuse_request_end(ring_ent->fuse_req); > >> + ring_ent->fuse_req = NULL; > >> +} > >> + > >> void fuse_uring_destruct(struct fuse_conn *fc) > >> { > >> struct fuse_ring *ring = fc->ring; > >> @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc) > >> continue; > >> > >> WARN_ON(!list_empty(&queue->ent_avail_queue)); > >> + WARN_ON(!list_empty(&queue->ent_w_req_queue)); > >> WARN_ON(!list_empty(&queue->ent_commit_queue)); > >> + WARN_ON(!list_empty(&queue->ent_in_userspace)); > >> > >> + kfree(queue->fpq.processing); > >> kfree(queue); > >> ring->queues[qid] = NULL; > >> } > >> @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > >> { > >> struct fuse_conn *fc = ring->fc; > >> struct fuse_ring_queue *queue; > >> + struct list_head *pq; > >> > >> queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); > >> if (!queue) > >> return NULL; > >> + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); > >> + if (!pq) { > >> + kfree(queue); > >> + return NULL; > >> + } > >> + > >> queue->qid = qid; > >> queue->ring = ring; > >> spin_lock_init(&queue->lock); > >> > >> INIT_LIST_HEAD(&queue->ent_avail_queue); > >> INIT_LIST_HEAD(&queue->ent_commit_queue); > >> + INIT_LIST_HEAD(&queue->ent_w_req_queue); > >> + INIT_LIST_HEAD(&queue->ent_in_userspace); > >> + INIT_LIST_HEAD(&queue->fuse_req_queue); > >> + > >> + queue->fpq.processing = pq; > >> + fuse_pqueue_init(&queue->fpq); > >> > >> spin_lock(&fc->lock); 
> >> if (ring->queues[qid]) { > >> spin_unlock(&fc->lock); > >> + kfree(queue->fpq.processing); > >> kfree(queue); > >> return ring->queues[qid]; > >> } > >> @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > >> return queue; > >> } > >> > >> +/* > >> + * Checks for errors and stores it into the request > >> + */ > >> +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, > >> + struct fuse_req *req, > >> + struct fuse_conn *fc) > >> +{ > >> + int err; > >> + > >> + err = -EINVAL; > >> + if (oh->unique == 0) { > >> + /* Not supportd through io-uring yet */ > >> + pr_warn_once("notify through fuse-io-uring not supported\n"); > >> + goto seterr; > >> + } > >> + > >> + err = -EINVAL; > >> + if (oh->error <= -ERESTARTSYS || oh->error > 0) > >> + goto seterr; > >> + > >> + if (oh->error) { > >> + err = oh->error; > >> + goto err; > >> + } > >> + > >> + err = -ENOENT; > >> + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { > >> + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", > >> + req->in.h.unique, > >> + oh->unique & ~FUSE_INT_REQ_BIT); > >> + goto seterr; > >> + } > >> + > >> + /* > >> + * Is it an interrupt reply ID? > >> + * XXX: Not supported through fuse-io-uring yet, it should not even > >> + * find the request - should not happen. 
> >> + */ > >> + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); > >> + > >> + return 0; > >> + > >> +seterr: > >> + oh->error = err; > >> +err: > >> + return err; > >> +} > >> + > >> +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, > >> + struct fuse_req *req, > >> + struct fuse_ring_ent *ent) > >> +{ > >> + struct fuse_copy_state cs; > >> + struct fuse_args *args = req->args; > >> + struct iov_iter iter; > >> + int err, res; > >> + struct fuse_uring_ent_in_out ring_in_out; > >> + > >> + res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, > >> + sizeof(ring_in_out)); > >> + if (res) > >> + return -EFAULT; > >> + > >> + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, > >> + &iter); > >> + if (err) > >> + return err; > >> + > >> + fuse_copy_init(&cs, 0, &iter); > >> + cs.is_uring = 1; > >> + cs.req = req; > >> + > >> + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); > >> +} > >> + > >> + /* > >> + * Copy data from the req to the ring buffer > >> + */ > >> +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req, > >> + struct fuse_ring_ent *ent) > >> +{ > >> + struct fuse_copy_state cs; > >> + struct fuse_args *args = req->args; > >> + struct fuse_in_arg *in_args = args->in_args; > >> + int num_args = args->in_numargs; > >> + int err, res; > >> + struct iov_iter iter; > >> + struct fuse_uring_ent_in_out ent_in_out = { > >> + .flags = 0, > >> + .commit_id = ent->commit_id, > >> + }; > >> + > >> + if (WARN_ON(ent_in_out.commit_id == 0)) > >> + return -EINVAL; > >> + > >> + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); > >> + if (err) { > >> + pr_info_ratelimited("fuse: Import of user buffer failed\n"); > >> + return err; > >> + } > >> + > >> + fuse_copy_init(&cs, 1, &iter); > >> + cs.is_uring = 1; > >> + cs.req = req; > >> + > >> + if (num_args > 0) { > >> + /* > >> + * Expectation is that the first argument is the per op header. 
> >> + * Some op code have that as zero. > >> + */ > >> + if (args->in_args[0].size > 0) { > >> + res = copy_to_user(&ent->headers->op_in, in_args->value, > >> + in_args->size); > >> + err = res > 0 ? -EFAULT : res; > >> + if (err) { > >> + pr_info_ratelimited( > >> + "Copying the header failed.\n"); > >> + return err; > >> + } > >> + } > >> + in_args++; > >> + num_args--; > >> + } > >> + > >> + /* copy the payload */ > >> + err = fuse_copy_args(&cs, num_args, args->in_pages, > >> + (struct fuse_arg *)in_args, 0); > >> + if (err) { > >> + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); > >> + return err; > >> + } > >> + > >> + ent_in_out.payload_sz = cs.ring.copied_sz; > >> + res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, > >> + sizeof(ent_in_out)); > >> + err = res > 0 ? -EFAULT : res; > >> + if (err) > >> + return err; > >> + > >> + return 0; > >> +} > >> + > >> +static int > >> +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent) > >> +{ > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + struct fuse_ring *ring = queue->ring; > >> + struct fuse_req *req = ring_ent->fuse_req; > >> + int err, res; > >> + > >> + err = -EIO; > >> + if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) { > >> + pr_err("qid=%d ring-req=%p invalid state %d on send\n", > >> + queue->qid, ring_ent, ring_ent->state); > >> + err = -EIO; > >> + goto err; > >> + } > >> + > >> + /* copy the request */ > >> + err = fuse_uring_copy_to_ring(ring, req, ring_ent); > >> + if (unlikely(err)) { > >> + pr_info_ratelimited("Copy to ring failed: %d\n", err); > >> + goto err; > >> + } > >> + > >> + /* copy fuse_in_header */ > >> + res = copy_to_user(&ring_ent->headers->in_out, &req->in.h, > >> + sizeof(req->in.h)); > >> + err = res > 0 ? 
-EFAULT : res; > >> + if (err) > >> + goto err; > >> + > >> + set_bit(FR_SENT, &req->flags); > >> + return 0; > >> + > >> +err: > >> + fuse_uring_req_end(ring_ent, true, err); > >> + return err; > >> +} > >> + > >> +/* > >> + * Write data to the ring buffer and send the request to userspace, > >> + * userspace will read it > >> + * This is comparable with classical read(/dev/fuse) > >> + */ > >> +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, > >> + unsigned int issue_flags) > >> +{ > >> + int err = 0; > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + > >> + err = fuse_uring_prepare_send(ring_ent); > >> + if (err) > >> + goto err; > >> + > >> + spin_lock(&queue->lock); > >> + ring_ent->state = FRRS_USERSPACE; > >> + list_move(&ring_ent->list, &queue->ent_in_userspace); > >> + spin_unlock(&queue->lock); > >> + > >> + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); > >> + ring_ent->cmd = NULL; > >> + return 0; > >> + > >> +err: > >> + return err; > >> +} > >> + > >> /* > >> * Make a ring entry available for fuse_req assignment > >> */ > >> @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent, > >> ring_ent->state = FRRS_AVAILABLE; > >> } > >> > >> +/* Used to find the request on SQE commit */ > >> +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent, > >> + struct fuse_req *req) > >> +{ > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + struct fuse_pqueue *fpq = &queue->fpq; > >> + unsigned int hash; > >> + > >> + /* commit_id is the unique id of the request */ > >> + ring_ent->commit_id = req->in.h.unique; > >> + > >> + req->ring_entry = ring_ent; > >> + hash = fuse_req_hash(ring_ent->commit_id); > >> + list_move_tail(&req->list, &fpq->processing[hash]); > >> +} > >> + > >> +/* > >> + * Assign a fuse queue entry to the given entry > >> + */ > >> +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent, > >> + struct fuse_req *req) > >> +{ > >> + 
struct fuse_ring_queue *queue = ring_ent->queue; > >> + > >> + lockdep_assert_held(&queue->lock); > >> + > >> + if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE && > >> + ring_ent->state != FRRS_COMMIT)) { > >> + pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid, > >> + ring_ent->state); > >> + } > >> + list_del_init(&req->list); > >> + clear_bit(FR_PENDING, &req->flags); > >> + ring_ent->fuse_req = req; > >> + ring_ent->state = FRRS_FUSE_REQ; > >> + list_move(&ring_ent->list, &queue->ent_w_req_queue); > >> + fuse_uring_add_to_pq(ring_ent, req); > >> +} > >> + > >> +/* > >> + * Release the ring entry and fetch the next fuse request if available > >> + * > >> + * @return true if a new request has been fetched > >> + */ > >> +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent) > >> + __must_hold(&queue->lock) > >> +{ > >> + struct fuse_req *req; > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + struct list_head *req_queue = &queue->fuse_req_queue; > >> + > >> + lockdep_assert_held(&queue->lock); > >> + > >> + /* get and assign the next entry while it is still holding the lock */ > >> + req = list_first_entry_or_null(req_queue, struct fuse_req, list); > >> + if (req) { > >> + fuse_uring_add_req_to_ring_ent(ring_ent, req); > >> + return true; > >> + } > >> + > >> + return false; > >> +} > >> + > >> +/* > >> + * Read data from the ring buffer, which user space has written to > >> + * This is comparible with handling of classical write(/dev/fuse). > >> + * Also make the ring request available again for new fuse requests. 
> >> + */ > >> +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent, > >> + unsigned int issue_flags) > >> +{ > >> + struct fuse_ring *ring = ring_ent->queue->ring; > >> + struct fuse_conn *fc = ring->fc; > >> + struct fuse_req *req = ring_ent->fuse_req; > >> + ssize_t err = 0; > >> + bool set_err = false; > >> + > >> + err = copy_from_user(&req->out.h, &ring_ent->headers->in_out, > >> + sizeof(req->out.h)); > >> + if (err) { > >> + req->out.h.error = err; > >> + goto out; > >> + } > >> + > >> + err = fuse_uring_out_header_has_err(&req->out.h, req, fc); > >> + if (err) { > >> + /* req->out.h.error already set */ > >> + goto out; > >> + } > >> + > >> + err = fuse_uring_copy_from_ring(ring, req, ring_ent); > >> + if (err) > >> + set_err = true; > >> + > >> +out: > >> + fuse_uring_req_end(ring_ent, set_err, err); > >> +} > >> + > >> +/* > >> + * Get the next fuse req and send it > >> + */ > >> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, > >> + struct fuse_ring_queue *queue, > >> + unsigned int issue_flags) > >> +{ > >> + int err; > >> + bool has_next; > >> + > >> +retry: > >> + spin_lock(&queue->lock); > >> + fuse_uring_ent_avail(ring_ent, queue); > >> + has_next = fuse_uring_ent_assign_req(ring_ent); > >> + spin_unlock(&queue->lock); > >> + > >> + if (has_next) { > >> + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); > >> + if (err) > >> + goto retry; > >> + } > >> +} > >> + > >> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) > >> +{ > >> + struct fuse_ring_queue *queue = ent->queue; > >> + > >> + lockdep_assert_held(&queue->lock); > >> + > >> + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) > >> + return -EIO; > >> + > >> + ent->state = FRRS_COMMIT; > >> + list_move(&ent->list, &queue->ent_commit_queue); > >> + > >> + return 0; > >> +} > >> + > >> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ > >> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, > >> + struct fuse_conn 
*fc) > >> +{ > >> + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); > >> + struct fuse_ring_ent *ring_ent; > >> + int err; > >> + struct fuse_ring *ring = fc->ring; > >> + struct fuse_ring_queue *queue; > >> + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); > >> + unsigned int qid = READ_ONCE(cmd_req->qid); > >> + struct fuse_pqueue *fpq; > >> + struct fuse_req *req; > >> + > >> + err = -ENOTCONN; > >> + if (!ring) > >> + return err; > >> + > >> + if (qid >= ring->nr_queues) > >> + return -EINVAL; > >> + > >> + queue = ring->queues[qid]; > >> + if (!queue) > >> + return err; > >> + fpq = &queue->fpq; > >> + > >> + spin_lock(&queue->lock); > >> + /* Find a request based on the unique ID of the fuse request > >> + * This should get revised, as it needs a hash calculation and list > >> + * search. And full struct fuse_pqueue is needed (memory overhead). > >> + * As well as the link from req to ring_ent. > >> + */ > > > > imo, the hash calculation and list search seems ok. I can't think of a > > more optimal way of doing it. Instead of using the full struct > > fuse_pqueue, I think we could just have the "struct list_head > > *processing" defined inside "struct fuse_ring_queue" and change > > fuse_request_find() to take in a list_head. I don't think we need a > > dedicated spinlock for the list either. We can just reuse queue->lock, > > as that's (currently) always held already when the processing list is > > accessed. > > > Please see the attached patch, which uses xarray. Totally untested, though. > I actually found an issue while writing this patch - FR_PENDING was cleared > without holding fiq->lock, but that is important for request_wait_answer(). > If something removes req from the list, we entirely lose the ring entry - > can never be used anymore. Personally I think the attached patch is safer. 
> > > > > > > >> + req = fuse_request_find(fpq, commit_id); > >> + err = -ENOENT; > >> + if (!req) { > >> + pr_info("qid=%d commit_id %llu not found\n", queue->qid, > >> + commit_id); > >> + spin_unlock(&queue->lock); > >> + return err; > >> + } > >> + list_del_init(&req->list); > >> + ring_ent = req->ring_entry; > >> + req->ring_entry = NULL; > > > > Do we need to set this to NULL, given that the request will be cleaned > > up later in fuse_uring_req_end() anyways? > > It is not explicitly set to NULL in that function. Would you mind to keep > it safe? > > > > >> + > >> + err = fuse_ring_ent_set_commit(ring_ent); > >> + if (err != 0) { > >> + pr_info_ratelimited("qid=%d commit_id %llu state %d", > >> + queue->qid, commit_id, ring_ent->state); > >> + spin_unlock(&queue->lock); > >> + return err; > >> + } > >> + > >> + ring_ent->cmd = cmd; > >> + spin_unlock(&queue->lock); > >> + > >> + /* without the queue lock, as other locks are taken */ > >> + fuse_uring_commit(ring_ent, issue_flags); > >> + > >> + /* > >> + * Fetching the next request is absolutely required as queued > >> + * fuse requests would otherwise not get processed - committing > >> + * and fetching is done in one step vs legacy fuse, which has separated > >> + * read (fetch request) and write (commit result). > >> + */ > >> + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); > > > > If there's no request ready to read next, then no request will be > > fetched and this will return. However, as I understand it, once the > > uring is registered, userspace should only be interacting with the > > uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case > > where no request was ready to read, it seems like userspace would have > > nothing to commit when it wants to fetch the next request? > > We have > > FUSE_IO_URING_CMD_REGISTER > FUSE_IO_URING_CMD_COMMIT_AND_FETCH > > > After _CMD_REGISTER the corresponding ring-entry is ready to get fuse > requests and waiting. 
After it gets a request assigned and handles it > by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly > miss that _CMD_REGISTER will already have it waiting? > Sorry for the late reply. After _CMD_REGISTER and _COMMIT_AND_FETCH, it seems possible that there is no fuse request waiting until a later time? This is the scenario I'm envisioning: a) uring registers successfully and fetches request through _CMD_REGISTER b) server replies to request and fetches new request through _COMMIT_AND_FETCH c) server replies to request, tries to fetch new request but no request is ready, through _COMMIT_AND_FETCH maybe I'm missing something in my reading of the code, but how will the server then fetch the next request once the request is ready? It will have to commit something in order to fetch it since there's only _COMMIT_AND_FETCH which requires a commit, no? Thanks, Joanne > > > > > A more general question though: I imagine the most common use case > > from the server side is waiting / polling until there is a request to > > fetch. Could we not just do that here in the kernel instead with > > adding a waitqueue mechanism and having fuse_uring_next_fuse_req() > > only return when there is a request available? It seems like that > > would reduce the amount of overhead instead of doing the > > waiting/checking from the server side? > > The io-uring interface says that we should return -EIOCBQUEUED. If we > would wait here, other SQEs that are submitted in parallel by > fuse-server couldn't be handled anymore, as we wouldn't return > to io-uring (all of this is in io-uring task context). 
> > > > >> + return 0; > >> +} > >> + > >> /* > >> * fuse_uring_req_fetch command handling > >> */ > >> @@ -325,6 +767,14 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd, > >> return err; > >> } > >> break; > >> + case FUSE_IO_URING_CMD_COMMIT_AND_FETCH: > >> + err = fuse_uring_commit_fetch(cmd, issue_flags, fc); > >> + if (err) { > >> + pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n", > >> + err); > >> + return err; > >> + } > >> + break; > >> default: > >> return -EINVAL; > >> } > >> diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h > >> index 4e46dd65196d26dabc62dada33b17de9aa511c08..80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85 100644 > >> --- a/fs/fuse/dev_uring_i.h > >> +++ b/fs/fuse/dev_uring_i.h > >> @@ -20,6 +20,9 @@ enum fuse_ring_req_state { > >> /* The ring entry is waiting for new fuse requests */ > >> FRRS_AVAILABLE, > >> > >> + /* The ring entry got assigned a fuse req */ > >> + FRRS_FUSE_REQ, > >> + > >> /* The ring entry is in or on the way to user space */ > >> FRRS_USERSPACE, > >> }; > >> @@ -70,7 +73,16 @@ struct fuse_ring_queue { > >> * entries in the process of being committed or in the process > >> * to be sent to userspace > >> */ > >> + struct list_head ent_w_req_queue; > > > > What does the w in this stand for? I find the name ambiguous here. > > "entry-with-request-queue". Do you have another naming suggestion? > > > Thanks, > Bernd >