Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
 fs/fuse/dev.c         |  10 +++
 fs/fuse/dev_uring.c   | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h |  67 +++++++++++++++++
 3 files changed, 271 insertions(+)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index a7d26440de39..6ffd216b27c8 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2202,6 +2202,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		fc->connected = 0;
 		spin_unlock(&fc->bg_lock);
 
+		fuse_uring_set_stopped(fc);
+
 		fuse_set_initialized(fc);
 		list_for_each_entry(fud, &fc->devices, entry) {
 			struct fuse_pqueue *fpq = &fud->pq;
@@ -2245,6 +2247,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		spin_unlock(&fc->lock);
 
 		fuse_dev_end_requests(&to_end);
+
+		/*
+		 * fc->lock must not be held here, to avoid conflicts with
+		 * io-uring locks
+		 */
+		fuse_uring_abort(fc);
 	} else {
 		spin_unlock(&fc->lock);
 	}
@@ -2256,6 +2264,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
 	/* matches implicit memory barrier in fuse_drop_waiting() */
 	smp_mb();
 	wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
+
+	fuse_uring_wait_stopped_queues(fc);
 }
 
 int fuse_dev_release(struct inode *inode, struct file *file)
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 5269b3f8891e..6001ba4d6e82 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -48,6 +48,44 @@ fuse_uring_async_send_to_ring(struct io_uring_cmd *cmd,
 	io_uring_cmd_done(cmd, 0, 0, issue_flags);
 }
 
+/* Abort all queued requests on the given ring queue */
+static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
+{
+	struct fuse_req *req;
+	LIST_HEAD(sync_list);
+	LIST_HEAD(async_list);
+
+	spin_lock(&queue->lock);
+
+	list_for_each_entry(req, &queue->sync_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+	list_for_each_entry(req, &queue->async_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+
+	list_splice_init(&queue->async_fuse_req_queue, &async_list);
+	list_splice_init(&queue->sync_fuse_req_queue, &sync_list);
+
+	spin_unlock(&queue->lock);
+
+	/* must not hold queue lock to avoid order issues with fi->lock */
+	fuse_dev_end_requests(&sync_list);
+	fuse_dev_end_requests(&async_list);
+}
+
+void fuse_uring_abort_end_requests(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		fuse_uring_abort_end_queue_requests(queue);
+	}
+}
+
 /* Update conn limits according to ring values */
 static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
 {
@@ -361,6 +399,162 @@ int fuse_uring_queue_cfg(struct fuse_ring *ring,
 	return 0;
 }
 
+static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
+{
+	struct fuse_req *req = ent->fuse_req;
+
+	ent->fuse_req = NULL;
+	clear_bit(FRRS_FUSE_REQ, &ent->state);
+	clear_bit(FR_SENT, &req->flags);
+	req->out.h.error = -ECONNABORTED;
+	fuse_request_end(req);
+}
+
+/*
+ * Release a request/entry on connection shutdown
+ */
+static bool fuse_uring_try_entry_stop(struct fuse_ring_ent *ent,
+				      bool need_cmd_done)
+	__must_hold(ent->queue->lock)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+	bool released = false;
+
+	if (test_bit(FRRS_FREED, &ent->state))
+		goto out; /* no work left, freed before */
+
+	if (ent->state == BIT(FRRS_INIT) || test_bit(FRRS_WAIT, &ent->state) ||
+	    test_bit(FRRS_USERSPACE, &ent->state)) {
+		set_bit(FRRS_FREED, &ent->state);
+
+		if (need_cmd_done) {
+			pr_devel("qid=%d tag=%d sending cmd_done\n", queue->qid,
+				 ent->tag);
+
+			spin_unlock(&queue->lock);
+			io_uring_cmd_done(ent->cmd, -ENOTCONN, 0,
+					  IO_URING_F_UNLOCKED);
+			spin_lock(&queue->lock);
+		}
+
+		if (ent->fuse_req)
+			fuse_uring_stop_fuse_req_end(ent);
+		released = true;
+	}
+out:
+	return released;
+}
+
+static void fuse_uring_stop_list_entries(struct list_head *head,
+					 struct fuse_ring_queue *queue,
+					 bool need_cmd_done)
+{
+	struct fuse_ring *ring = queue->ring;
+	struct fuse_ring_ent *ent, *next;
+	ssize_t queue_refs = SSIZE_MAX;
+
+	list_for_each_entry_safe(ent, next, head, list) {
+		if (fuse_uring_try_entry_stop(ent, need_cmd_done)) {
+			queue_refs = atomic_dec_return(&ring->queue_refs);
+			list_del_init(&ent->list);
+		}
+
+		if (WARN_ON_ONCE(queue_refs < 0))
+			pr_warn("qid=%d queue_refs=%zd\n", queue->qid,
+				queue_refs);
+	}
+}
+
+static void fuse_uring_stop_queue(struct fuse_ring_queue *queue)
+	__must_hold(&queue->lock)
+{
+	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue, false);
+	fuse_uring_stop_list_entries(&queue->async_ent_avail_queue, queue, true);
+	fuse_uring_stop_list_entries(&queue->sync_ent_avail_queue, queue, true);
+}
+
+/*
+ * Log state debug info
+ */
+static void fuse_uring_stop_ent_state(struct fuse_ring *ring)
+{
+	int qid, tag;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		for (tag = 0; tag < ring->queue_depth; tag++) {
+			struct fuse_ring_ent *ent = &queue->ring_ent[tag];
+
+			if (!test_bit(FRRS_FREED, &ent->state))
+				pr_info("ring=%p qid=%d tag=%d state=%lu\n",
+					ring, qid, tag, ent->state);
+		}
+	}
+	ring->stop_debug_log = 1;
+}
+
+static void fuse_uring_async_stop_queues(struct work_struct *work)
+{
+	int qid;
+	struct fuse_ring *ring =
+		container_of(work, struct fuse_ring, stop_work.work);
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		fuse_uring_stop_queue(queue);
+		spin_unlock(&queue->lock);
+	}
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		if (time_after(jiffies,
+			       ring->stop_time + FUSE_URING_STOP_WARN_TIMEOUT))
+			fuse_uring_stop_ent_state(ring);
+
+		pr_info("ring=%p scheduling periodic queue stop\n", ring);
+
+		schedule_delayed_work(&ring->stop_work,
+				      FUSE_URING_STOP_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
+/*
+ * Stop the ring queues
+ */
+void fuse_uring_stop_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		fuse_uring_stop_queue(queue);
+		spin_unlock(&queue->lock);
+	}
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		pr_info("ring=%p scheduling periodic queue stop\n", ring);
+		ring->stop_time = jiffies;
+		INIT_DELAYED_WORK(&ring->stop_work,
+				  fuse_uring_async_stop_queues);
+		schedule_delayed_work(&ring->stop_work,
+				      FUSE_URING_STOP_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
 /*
  * Checks for errors and stores it into the request
  */
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index b2be67bb2fa7..e5fc84e2f3ea 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -16,6 +16,9 @@
 /* IORING_MAX_ENTRIES */
 #define FUSE_URING_MAX_QUEUE_DEPTH 32768
 
+#define FUSE_URING_STOP_WARN_TIMEOUT (5 * HZ)
+#define FUSE_URING_STOP_INTERVAL (HZ / 20)
+
 enum fuse_ring_req_state {
 
 	/* request is basially initialized */
@@ -203,6 +206,7 @@ int fuse_uring_mmap(struct file *filp, struct vm_area_struct *vma);
 int fuse_uring_queue_cfg(struct fuse_ring *ring,
 			 struct fuse_ring_queue_config *qcfg);
 void fuse_uring_ring_destruct(struct fuse_ring *ring);
+void fuse_uring_stop_queues(struct fuse_ring *ring);
 int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
 static inline void fuse_uring_conn_init(struct fuse_ring *ring,
@@ -275,6 +279,58 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return fc->ring && fc->ring->per_core_queue;
 }
 
+static inline void fuse_uring_set_stopped_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		queue->stopped = 1;
+		spin_unlock(&queue->lock);
+	}
+}
+
+/*
+ * Mark the ring as not ready and set the stopped flag on all queues
+ */
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+	__must_hold(fc->lock)
+{
+	if (fc->ring == NULL)
+		return;
+
+	fc->ring->ready = false;
+
+	fuse_uring_set_stopped_queues(fc->ring);
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (ring == NULL)
+		return;
+
+	if (ring->configured && atomic_read(&ring->queue_refs) > 0) {
+		fuse_uring_abort_end_requests(ring);
+		fuse_uring_stop_queues(ring);
+	}
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (ring && ring->configured)
+		wait_event(ring->stop_waitq,
+			   atomic_read(&ring->queue_refs) == 0);
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -298,6 +354,17 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return false;
 }
 
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+}
 #endif /* CONFIG_FUSE_IO_URING */
-- 
2.40.1