Re: [PATCH v9 11/17] fuse: {io-uring} Handle teardown of ring entries

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Jan 07 2025, Bernd Schubert wrote:

> On teardown struct file_operations::uring_cmd requests
> need to be completed by calling io_uring_cmd_done().
> Not completing all ring entries would result in busy io-uring
> tasks giving warning messages in intervals and unreleased
> struct file.
>
> Additionally the fuse connection and with that the ring can
> only get released when all io-uring commands are completed.
>
> Completion is done with ring entries that are
> a) in waiting state for new fuse requests - io_uring_cmd_done
> is needed
>
> b) already in userspace - io_uring_cmd_done through teardown
> is not needed, the request can just get released. If fuse server
> is still active and commits such a ring entry, fuse_uring_cmd()
> already checks if the connection is active and then complete the
> io-uring itself with -ENOTCONN. I.e. special handling is not
> needed.
>
> This scheme is basically represented by the ring entry state
> FRRS_WAIT and FRRS_USERSPACE.
>
> Entries in state:
> - FRRS_INIT: No action needed, do not contribute to
>   ring->queue_refs yet
> - All other states: Are currently processed by other tasks,
>   async teardown is needed and it has to wait for the two
>   states above. It could be also solved without an async
>   teardown task, but would require additional if conditions
>   in hot code paths. Also in my personal opinion the code
>   looks cleaner with async teardown.
>
> Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
> ---
>  fs/fuse/dev.c         |   9 +++
>  fs/fuse/dev_uring.c   | 198 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  fs/fuse/dev_uring_i.h |  51 +++++++++++++
>  3 files changed, 258 insertions(+)
>
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index aa33eba51c51dff6af2cdcf60bed9c3f6b4bc0d0..1c21e491e891196c77c7f6135cdc2aece785d399 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -6,6 +6,7 @@
>    See the file COPYING.
>  */
>  
> +#include "dev_uring_i.h"
>  #include "fuse_i.h"
>  #include "fuse_dev_i.h"
>  
> @@ -2291,6 +2292,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
>  		spin_unlock(&fc->lock);
>  
>  		fuse_dev_end_requests(&to_end);
> +
> +		/*
> +		 * fc->lock must not be taken to avoid conflicts with io-uring
> +		 * locks
> +		 */
> +		fuse_uring_abort(fc);
>  	} else {
>  		spin_unlock(&fc->lock);
>  	}
> @@ -2302,6 +2309,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
>  	/* matches implicit memory barrier in fuse_drop_waiting() */
>  	smp_mb();
>  	wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
> +
> +	fuse_uring_wait_stopped_queues(fc);
>  }
>  
>  int fuse_dev_release(struct inode *inode, struct file *file)
> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
> index f44e66a7ea577390da87e9ac7d118a9416898c28..01a908b2ef9ada14b759ca047eab40b4c4431d89 100644
> --- a/fs/fuse/dev_uring.c
> +++ b/fs/fuse/dev_uring.c
> @@ -39,6 +39,37 @@ static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err,
>  	ring_ent->fuse_req = NULL;
>  }
>  
> +/* Abort all list queued request on the given ring queue */
> +static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
> +{
> +	struct fuse_req *req;
> +	LIST_HEAD(req_list);
> +
> +	spin_lock(&queue->lock);
> +	list_for_each_entry(req, &queue->fuse_req_queue, list)
> +		clear_bit(FR_PENDING, &req->flags);
> +	list_splice_init(&queue->fuse_req_queue, &req_list);
> +	spin_unlock(&queue->lock);
> +
> +	/* must not hold queue lock to avoid order issues with fi->lock */
> +	fuse_dev_end_requests(&req_list);
> +}
> +
> +void fuse_uring_abort_end_requests(struct fuse_ring *ring)
> +{
> +	int qid;
> +	struct fuse_ring_queue *queue;
> +
> +	for (qid = 0; qid < ring->nr_queues; qid++) {
> +		queue = READ_ONCE(ring->queues[qid]);
> +		if (!queue)
> +			continue;
> +
> +		queue->stopped = true;
> +		fuse_uring_abort_end_queue_requests(queue);
> +	}
> +}
> +
>  void fuse_uring_destruct(struct fuse_conn *fc)
>  {
>  	struct fuse_ring *ring = fc->ring;
> @@ -98,10 +129,13 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
>  		goto out_err;
>  	}
>  
> +	init_waitqueue_head(&ring->stop_waitq);
> +
>  	fc->ring = ring;
>  	ring->nr_queues = nr_queues;
>  	ring->fc = fc;
>  	ring->max_payload_sz = max_payload_size;
> +	atomic_set(&ring->queue_refs, 0);
>  
>  	spin_unlock(&fc->lock);
>  	return ring;
> @@ -158,6 +192,166 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
>  	return queue;
>  }
>  
> +static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
> +{
> +	struct fuse_req *req = ent->fuse_req;
> +
> +	/* remove entry from fuse_pqueue->processing */
> +	list_del_init(&req->list);
> +	ent->fuse_req = NULL;
> +	clear_bit(FR_SENT, &req->flags);
> +	req->out.h.error = -ECONNABORTED;
> +	fuse_request_end(req);
> +}
> +
> +/*
> + * Release a request/entry on connection tear down
> + */
> +static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent)
> +{
> +	if (ent->cmd) {
> +		io_uring_cmd_done(ent->cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED);
> +		ent->cmd = NULL;
> +	}
> +
> +	if (ent->fuse_req)
> +		fuse_uring_stop_fuse_req_end(ent);
> +
> +	list_del_init(&ent->list);
> +	kfree(ent);
> +}
> +
> +static void fuse_uring_stop_list_entries(struct list_head *head,
> +					 struct fuse_ring_queue *queue,
> +					 enum fuse_ring_req_state exp_state)
> +{
> +	struct fuse_ring *ring = queue->ring;
> +	struct fuse_ring_ent *ent, *next;
> +	ssize_t queue_refs = SSIZE_MAX;
> +	LIST_HEAD(to_teardown);
> +
> +	spin_lock(&queue->lock);
> +	list_for_each_entry_safe(ent, next, head, list) {
> +		if (ent->state != exp_state) {
> +			pr_warn("entry teardown qid=%d state=%d expected=%d",
> +				queue->qid, ent->state, exp_state);
> +			continue;
> +		}
> +
> +		list_move(&ent->list, &to_teardown);
> +	}
> +	spin_unlock(&queue->lock);
> +
> +	/* no queue lock to avoid lock order issues */
> +	list_for_each_entry_safe(ent, next, &to_teardown, list) {
> +		fuse_uring_entry_teardown(ent);
> +		queue_refs = atomic_dec_return(&ring->queue_refs);
> +		WARN_ON_ONCE(queue_refs < 0);
> +	}
> +}
> +
> +static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue)
> +{
> +	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue,
> +				     FRRS_USERSPACE);
> +	fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue,
> +				     FRRS_AVAILABLE);
> +}
> +
> +/*
> + * Log state debug info
> + */
> +static void fuse_uring_log_ent_state(struct fuse_ring *ring)
> +{
> +	int qid;
> +	struct fuse_ring_ent *ent;
> +
> +	for (qid = 0; qid < ring->nr_queues; qid++) {
> +		struct fuse_ring_queue *queue = ring->queues[qid];
> +
> +		if (!queue)
> +			continue;
> +
> +		spin_lock(&queue->lock);
> +		/*
> +		 * Log entries from the intermediate queue, the other queues
> +		 * should be empty
> +		 */
> +		list_for_each_entry(ent, &queue->ent_w_req_queue, list) {
> +			pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",
> +				ring, qid, ent, ent->state);
> +		}
> +		list_for_each_entry(ent, &queue->ent_commit_queue, list) {
> +			pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",

Probably a copy&paste error: the string 'ent-req-queue' above should be
'ent-commit-queue' or something similar.

> +				ring, qid, ent, ent->state);
> +		}
> +		spin_unlock(&queue->lock);
> +	}
> +	ring->stop_debug_log = 1;
> +}
> +
> +static void fuse_uring_async_stop_queues(struct work_struct *work)
> +{
> +	int qid;
> +	struct fuse_ring *ring =
> +		container_of(work, struct fuse_ring, async_teardown_work.work);
> +
> +	/* XXX code dup */

Yeah, I guess the delayed work callback could simply call
fuse_uring_stop_queues(), which would then do different things depending
on the value of ring->teardown_time (0 or jiffies) -- although that could
also be confusing.

> 
> +	for (qid = 0; qid < ring->nr_queues; qid++) {
> +		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
> +
> +		if (!queue)
> +			continue;
> +
> +		fuse_uring_teardown_entries(queue);
> +	}
> +
> +	/*
> +	 * Some ring entries are might be in the middle of IO operations,

nit: remove extra 'are'.

> +	 * i.e. in process to get handled by file_operations::uring_cmd
> +	 * or on the way to userspace - we could handle that with conditions in
> +	 * run time code, but easier/cleaner to have an async tear down handler
> +	 * If there are still queue references left
> +	 */
> +	if (atomic_read(&ring->queue_refs) > 0) {
> +		if (time_after(jiffies,
> +			       ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
> +			fuse_uring_log_ent_state(ring);
> +
> +		schedule_delayed_work(&ring->async_teardown_work,
> +				      FUSE_URING_TEARDOWN_INTERVAL);
> +	} else {
> +		wake_up_all(&ring->stop_waitq);
> +	}
> +}
> +
> +/*
> + * Stop the ring queues
> + */
> +void fuse_uring_stop_queues(struct fuse_ring *ring)
> +{
> +	int qid;
> +
> +	for (qid = 0; qid < ring->nr_queues; qid++) {
> +		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
> +
> +		if (!queue)
> +			continue;
> +
> +		fuse_uring_teardown_entries(queue);
> +	}
> +
> +	if (atomic_read(&ring->queue_refs) > 0) {
> +		ring->teardown_time = jiffies;
> +		INIT_DELAYED_WORK(&ring->async_teardown_work,
> +				  fuse_uring_async_stop_queues);
> +		schedule_delayed_work(&ring->async_teardown_work,
> +				      FUSE_URING_TEARDOWN_INTERVAL);
> +	} else {
> +		wake_up_all(&ring->stop_waitq);
> +	}
> +}
> +
>  /*
>   * Checks for errors and stores it into the request
>   */
> @@ -538,6 +732,9 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
>  		return err;
>  	fpq = &queue->fpq;
>  
> +	if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped))
> +		return err;
> +
>  	spin_lock(&queue->lock);
>  	/* Find a request based on the unique ID of the fuse request
>  	 * This should get revised, as it needs a hash calculation and list
> @@ -667,6 +864,7 @@ fuse_uring_create_ring_ent(struct io_uring_cmd *cmd,
>  		return ERR_PTR(err);
>  	}
>  
> +	atomic_inc(&ring->queue_refs);
>  	return ent;
>  }
>  
> diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
> index 80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85..ee5aeccae66caaf9a4dccbbbc785820836182668 100644
> --- a/fs/fuse/dev_uring_i.h
> +++ b/fs/fuse/dev_uring_i.h
> @@ -11,6 +11,9 @@
>  
>  #ifdef CONFIG_FUSE_IO_URING
>  
> +#define FUSE_URING_TEARDOWN_TIMEOUT (5 * HZ)
> +#define FUSE_URING_TEARDOWN_INTERVAL (HZ/20)
> +
>  enum fuse_ring_req_state {
>  	FRRS_INVALID = 0,
>  
> @@ -83,6 +86,8 @@ struct fuse_ring_queue {
>  	struct list_head fuse_req_queue;
>  
>  	struct fuse_pqueue fpq;
> +
> +	bool stopped;
>  };
>  
>  /**
> @@ -100,12 +105,51 @@ struct fuse_ring {
>  	size_t max_payload_sz;
>  
>  	struct fuse_ring_queue **queues;
> +	/*
> +	 * Log ring entry states onces on stop when entries cannot be

typo: "onces" should be "once"

> +	 * released
> +	 */
> +	unsigned int stop_debug_log : 1;
> +
> +	wait_queue_head_t stop_waitq;
> +
> +	/* async tear down */
> +	struct delayed_work async_teardown_work;
> +
> +	/* log */
> +	unsigned long teardown_time;
> +
> +	atomic_t queue_refs;
>  };
>  
>  bool fuse_uring_enabled(void);
>  void fuse_uring_destruct(struct fuse_conn *fc);
> +void fuse_uring_stop_queues(struct fuse_ring *ring);
> +void fuse_uring_abort_end_requests(struct fuse_ring *ring);
>  int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
>  
> +static inline void fuse_uring_abort(struct fuse_conn *fc)
> +{
> +	struct fuse_ring *ring = fc->ring;
> +
> +	if (ring == NULL)
> +		return;
> +
> +	if (atomic_read(&ring->queue_refs) > 0) {
> +		fuse_uring_abort_end_requests(ring);
> +		fuse_uring_stop_queues(ring);
> +	}
> +}
> +
> +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
> +{
> +	struct fuse_ring *ring = fc->ring;
> +
> +	if (ring)
> +		wait_event(ring->stop_waitq,
> +			   atomic_read(&ring->queue_refs) == 0);
> +}
> +
>  #else /* CONFIG_FUSE_IO_URING */
>  
>  struct fuse_ring;
> @@ -123,6 +167,13 @@ static inline bool fuse_uring_enabled(void)
>  	return false;
>  }
>  
> +static inline void fuse_uring_abort(struct fuse_conn *fc)
> +{
> +}
> +
> +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
> +{
> +}
>  #endif /* CONFIG_FUSE_IO_URING */
>  
>  #endif /* _FS_FUSE_DEV_URING_I_H */
>
> -- 
> 2.43.0
>
>

-- 
Luís





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux