On Tue, Jan 07 2025, Bernd Schubert wrote:

> On teardown struct file_operations::uring_cmd requests
> need to be completed by calling io_uring_cmd_done().
> Not completing all ring entries would result in busy io-uring
> tasks giving warning messages in intervals and unreleased
> struct file.
>
> Additionally the fuse connection and with that the ring can
> only get released when all io-uring commands are completed.
>
> Completion is done with ring entries that are
> a) in waiting state for new fuse requests - io_uring_cmd_done
> is needed
>
> b) already in userspace - io_uring_cmd_done through teardown
> is not needed, the request can just get released. If fuse server
> is still active and commits such a ring entry, fuse_uring_cmd()
> already checks if the connection is active and then complete the
> io-uring itself with -ENOTCONN. I.e. special handling is not
> needed.
>
> This scheme is basically represented by the ring entry state
> FRRS_WAIT and FRRS_USERSPACE.
>
> Entries in state:
>   - FRRS_INIT: No action needed, do not contribute to
>     ring->queue_refs yet
>   - All other states: Are currently processed by other tasks,
>     async teardown is needed and it has to wait for the two
>     states above. It could be also solved without an async
>     teardown task, but would require additional if conditions
>     in hot code paths. Also in my personal opinion the code
>     looks cleaner with async teardown.
>
> Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
> ---
>  fs/fuse/dev.c         |   9 +++
>  fs/fuse/dev_uring.c   | 198 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  fs/fuse/dev_uring_i.h |  51 +++++++++++++
>  3 files changed, 258 insertions(+)
>
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index aa33eba51c51dff6af2cdcf60bed9c3f6b4bc0d0..1c21e491e891196c77c7f6135cdc2aece785d399 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -6,6 +6,7 @@
>    See the file COPYING.
>  */
>
> +#include "dev_uring_i.h"
>  #include "fuse_i.h"
>  #include "fuse_dev_i.h"
>
> @@ -2291,6 +2292,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
>                  spin_unlock(&fc->lock);
>
>                  fuse_dev_end_requests(&to_end);
> +
> +                /*
> +                 * fc->lock must not be taken to avoid conflicts with io-uring
> +                 * locks
> +                 */
> +                fuse_uring_abort(fc);
>          } else {
>                  spin_unlock(&fc->lock);
>          }
> @@ -2302,6 +2309,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
>          /* matches implicit memory barrier in fuse_drop_waiting() */
>          smp_mb();
>          wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
> +
> +        fuse_uring_wait_stopped_queues(fc);
>  }
>
>  int fuse_dev_release(struct inode *inode, struct file *file)
> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
> index f44e66a7ea577390da87e9ac7d118a9416898c28..01a908b2ef9ada14b759ca047eab40b4c4431d89 100644
> --- a/fs/fuse/dev_uring.c
> +++ b/fs/fuse/dev_uring.c
> @@ -39,6 +39,37 @@ static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err,
>          ring_ent->fuse_req = NULL;
>  }
>
> +/* Abort all list queued request on the given ring queue */
> +static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
> +{
> +        struct fuse_req *req;
> +        LIST_HEAD(req_list);
> +
> +        spin_lock(&queue->lock);
> +        list_for_each_entry(req, &queue->fuse_req_queue, list)
> +                clear_bit(FR_PENDING, &req->flags);
> +        list_splice_init(&queue->fuse_req_queue, &req_list);
> +        spin_unlock(&queue->lock);
> +
> +        /* must not hold queue lock to avoid order issues with fi->lock */
> +        fuse_dev_end_requests(&req_list);
> +}
> +
> +void fuse_uring_abort_end_requests(struct fuse_ring *ring)
> +{
> +        int qid;
> +        struct fuse_ring_queue *queue;
> +
> +        for (qid = 0; qid < ring->nr_queues; qid++) {
> +                queue = READ_ONCE(ring->queues[qid]);
> +                if (!queue)
> +                        continue;
> +
> +                queue->stopped = true;
> +                fuse_uring_abort_end_queue_requests(queue);
> +        }
> +}
> +
>  void fuse_uring_destruct(struct fuse_conn *fc)
>  {
>          struct fuse_ring *ring = fc->ring;
> @@ -98,10 +129,13 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
>                  goto out_err;
>          }
>
> +        init_waitqueue_head(&ring->stop_waitq);
> +
>          fc->ring = ring;
>          ring->nr_queues = nr_queues;
>          ring->fc = fc;
>          ring->max_payload_sz = max_payload_size;
> +        atomic_set(&ring->queue_refs, 0);
>
>          spin_unlock(&fc->lock);
>          return ring;
> @@ -158,6 +192,166 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
>          return queue;
>  }
>
> +static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
> +{
> +        struct fuse_req *req = ent->fuse_req;
> +
> +        /* remove entry from fuse_pqueue->processing */
> +        list_del_init(&req->list);
> +        ent->fuse_req = NULL;
> +        clear_bit(FR_SENT, &req->flags);
> +        req->out.h.error = -ECONNABORTED;
> +        fuse_request_end(req);
> +}
> +
> +/*
> + * Release a request/entry on connection tear down
> + */
> +static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent)
> +{
> +        if (ent->cmd) {
> +                io_uring_cmd_done(ent->cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED);
> +                ent->cmd = NULL;
> +        }
> +
> +        if (ent->fuse_req)
> +                fuse_uring_stop_fuse_req_end(ent);
> +
> +        list_del_init(&ent->list);
> +        kfree(ent);
> +}
> +
> +static void fuse_uring_stop_list_entries(struct list_head *head,
> +                                         struct fuse_ring_queue *queue,
> +                                         enum fuse_ring_req_state exp_state)
> +{
> +        struct fuse_ring *ring = queue->ring;
> +        struct fuse_ring_ent *ent, *next;
> +        ssize_t queue_refs = SSIZE_MAX;
> +        LIST_HEAD(to_teardown);
> +
> +        spin_lock(&queue->lock);
> +        list_for_each_entry_safe(ent, next, head, list) {
> +                if (ent->state != exp_state) {
> +                        pr_warn("entry teardown qid=%d state=%d expected=%d",
> +                                queue->qid, ent->state, exp_state);
> +                        continue;
> +                }
> +
> +                list_move(&ent->list, &to_teardown);
> +        }
> +        spin_unlock(&queue->lock);
> +
> +        /* no queue lock to avoid lock order issues */
> +        list_for_each_entry_safe(ent, next, &to_teardown, list) {
> +                fuse_uring_entry_teardown(ent);
> +                queue_refs = atomic_dec_return(&ring->queue_refs);
> +                WARN_ON_ONCE(queue_refs < 0);
> +        }
> +}
> +
> +static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue)
> +{
> +        fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue,
> +                                     FRRS_USERSPACE);
> +        fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue,
> +                                     FRRS_AVAILABLE);
> +}
> +
> +/*
> + * Log state debug info
> + */
> +static void fuse_uring_log_ent_state(struct fuse_ring *ring)
> +{
> +        int qid;
> +        struct fuse_ring_ent *ent;
> +
> +        for (qid = 0; qid < ring->nr_queues; qid++) {
> +                struct fuse_ring_queue *queue = ring->queues[qid];
> +
> +                if (!queue)
> +                        continue;
> +
> +                spin_lock(&queue->lock);
> +                /*
> +                 * Log entries from the intermediate queue, the other queues
> +                 * should be empty
> +                 */
> +                list_for_each_entry(ent, &queue->ent_w_req_queue, list) {
> +                        pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",
> +                                ring, qid, ent, ent->state);
> +                }
> +                list_for_each_entry(ent, &queue->ent_commit_queue, list) {
> +                        pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",

Probably a copy&paste: the string 'ent-req-queue' above should be
'ent-commit-queue' or something similar.
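I.e., assuming the intent is simply to mirror the ent_w_req_queue message
above, something like:

                        pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n",
                                ring, qid, ent, ent->state);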
> +                                ring, qid, ent, ent->state);
> +                }
> +                spin_unlock(&queue->lock);
> +        }
> +        ring->stop_debug_log = 1;
> +}
> +
> +static void fuse_uring_async_stop_queues(struct work_struct *work)
> +{
> +        int qid;
> +        struct fuse_ring *ring =
> +                container_of(work, struct fuse_ring, async_teardown_work.work);
> +
> +        /* XXX code dup */

Yeah, I guess the delayed work callback could simply call
fuse_uring_stop_queues(), which would then do different things depending
on the value of ring->teardown_time (0 vs. a real jiffies timestamp).
That could also be confusing, though.
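Completely untested, and just to illustrate the idea (reusing the helpers
and fields introduced by this patch): the callback would shrink to

static void fuse_uring_async_stop_queues(struct work_struct *work)
{
        struct fuse_ring *ring =
                container_of(work, struct fuse_ring, async_teardown_work.work);

        fuse_uring_stop_queues(ring);
}

and fuse_uring_stop_queues() would carry the part that depends on whether
teardown has already started:

void fuse_uring_stop_queues(struct fuse_ring *ring)
{
        int qid;

        for (qid = 0; qid < ring->nr_queues; qid++) {
                struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);

                if (!queue)
                        continue;

                fuse_uring_teardown_entries(queue);
        }

        /* all entries released, nothing left to wait for */
        if (atomic_read(&ring->queue_refs) == 0) {
                wake_up_all(&ring->stop_waitq);
                return;
        }

        if (ring->teardown_time == 0) {
                /* first call, remember when teardown started */
                ring->teardown_time = jiffies;
                INIT_DELAYED_WORK(&ring->async_teardown_work,
                                  fuse_uring_async_stop_queues);
        } else if (time_after(jiffies, ring->teardown_time +
                                       FUSE_URING_TEARDOWN_TIMEOUT)) {
                /* entries look stuck, log their state */
                fuse_uring_log_ent_state(ring);
        }

        schedule_delayed_work(&ring->async_teardown_work,
                              FUSE_URING_TEARDOWN_INTERVAL);
}

Using 0 as "teardown not started yet" is a bit fragile, so a separate flag
might be cleaner. Anyway, just a thought.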
>
> +        for (qid = 0; qid < ring->nr_queues; qid++) {
> +                struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
> +
> +                if (!queue)
> +                        continue;
> +
> +                fuse_uring_teardown_entries(queue);
> +        }
> +
> +        /*
> +         * Some ring entries are might be in the middle of IO operations,

nit: remove extra 'are'.

> +         * i.e. in process to get handled by file_operations::uring_cmd
> +         * or on the way to userspace - we could handle that with conditions in
> +         * run time code, but easier/cleaner to have an async tear down handler
> +         * If there are still queue references left
> +         */
> +        if (atomic_read(&ring->queue_refs) > 0) {
> +                if (time_after(jiffies,
> +                               ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
> +                        fuse_uring_log_ent_state(ring);
> +
> +                schedule_delayed_work(&ring->async_teardown_work,
> +                                      FUSE_URING_TEARDOWN_INTERVAL);
> +        } else {
> +                wake_up_all(&ring->stop_waitq);
> +        }
> +}
> +
> +/*
> + * Stop the ring queues
> + */
> +void fuse_uring_stop_queues(struct fuse_ring *ring)
> +{
> +        int qid;
> +
> +        for (qid = 0; qid < ring->nr_queues; qid++) {
> +                struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
> +
> +                if (!queue)
> +                        continue;
> +
> +                fuse_uring_teardown_entries(queue);
> +        }
> +
> +        if (atomic_read(&ring->queue_refs) > 0) {
> +                ring->teardown_time = jiffies;
> +                INIT_DELAYED_WORK(&ring->async_teardown_work,
> +                                  fuse_uring_async_stop_queues);
> +                schedule_delayed_work(&ring->async_teardown_work,
> +                                      FUSE_URING_TEARDOWN_INTERVAL);
> +        } else {
> +                wake_up_all(&ring->stop_waitq);
> +        }
> +}
> +
>  /*
>   * Checks for errors and stores it into the request
>   */
> @@ -538,6 +732,9 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
>                  return err;
>          fpq = &queue->fpq;
>
> +        if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped))
> +                return err;
> +
>          spin_lock(&queue->lock);
>          /* Find a request based on the unique ID of the fuse request
>           * This should get revised, as it needs a hash calculation and list
> @@ -667,6 +864,7 @@ fuse_uring_create_ring_ent(struct io_uring_cmd *cmd,
>                  return ERR_PTR(err);
>          }
>
> +        atomic_inc(&ring->queue_refs);
>          return ent;
>  }
>
> diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
> index 80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85..ee5aeccae66caaf9a4dccbbbc785820836182668 100644
> --- a/fs/fuse/dev_uring_i.h
> +++ b/fs/fuse/dev_uring_i.h
> @@ -11,6 +11,9 @@
>
>  #ifdef CONFIG_FUSE_IO_URING
>
> +#define FUSE_URING_TEARDOWN_TIMEOUT (5 * HZ)
> +#define FUSE_URING_TEARDOWN_INTERVAL (HZ/20)
> +
>  enum fuse_ring_req_state {
>          FRRS_INVALID = 0,
>
> @@ -83,6 +86,8 @@ struct fuse_ring_queue {
>          struct list_head fuse_req_queue;
>
>          struct fuse_pqueue fpq;
> +
> +        bool stopped;
>  };
>
>  /**
> @@ -100,12 +105,51 @@ struct fuse_ring {
>          size_t max_payload_sz;
>
>          struct fuse_ring_queue **queues;
> +        /*
> +         * Log ring entry states onces on stop when entries cannot be

typo: "once"

> +         * released
> +         */
> +        unsigned int stop_debug_log : 1;
> +
> +        wait_queue_head_t stop_waitq;
> +
> +        /* async tear down */
> +        struct delayed_work async_teardown_work;
> +
> +        /* log */
> +        unsigned long teardown_time;
> +
> +        atomic_t queue_refs;
>  };
>
>  bool fuse_uring_enabled(void);
>  void fuse_uring_destruct(struct fuse_conn *fc);
> +void fuse_uring_stop_queues(struct fuse_ring *ring);
> +void fuse_uring_abort_end_requests(struct fuse_ring *ring);
>  int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
>
> +static inline void fuse_uring_abort(struct fuse_conn *fc)
> +{
> +        struct fuse_ring *ring = fc->ring;
> +
> +        if (ring == NULL)
> +                return;
> +
> +        if (atomic_read(&ring->queue_refs) > 0) {
> +                fuse_uring_abort_end_requests(ring);
> +                fuse_uring_stop_queues(ring);
> +        }
> +}
> +
> +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
> +{
> +        struct fuse_ring *ring = fc->ring;
> +
> +        if (ring)
> +                wait_event(ring->stop_waitq,
> +                           atomic_read(&ring->queue_refs) == 0);
> +}
> +
>  #else /* CONFIG_FUSE_IO_URING */
>
>  struct fuse_ring;
> @@ -123,6 +167,13 @@ static inline bool fuse_uring_enabled(void)
>          return false;
>  }
>
> +static inline void fuse_uring_abort(struct fuse_conn *fc)
> +{
> +}
> +
> +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
> +{
> +}
>  #endif /* CONFIG_FUSE_IO_URING */
>
>  #endif /* _FS_FUSE_DEV_URING_I_H */
>
> --
> 2.43.0
>

-- 
Luís