[PATCH RFC v2 13/19] fuse: {uring} Handle uring shutdown

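On connection abort all fuse-uring queues have to be shut down: fuse
requests queued on the ring queues are ended with -ECONNABORTED and the
ring entries are released, completing pending io_uring commands with
-ENOTCONN. As ring entries may still be held in user space, teardown is
retried from delayed work until the last queue reference is gone;
fuse_wait_aborted() then waits for the queues to be stopped.
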
Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
 fs/fuse/dev.c         |  10 +++
 fs/fuse/dev_uring.c   | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h |  67 +++++++++++++++++
 3 files changed, 271 insertions(+)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index a7d26440de39..6ffd216b27c8 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2202,6 +2202,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		fc->connected = 0;
 		spin_unlock(&fc->bg_lock);
 
+		fuse_uring_set_stopped(fc);
+
 		fuse_set_initialized(fc);
 		list_for_each_entry(fud, &fc->devices, entry) {
 			struct fuse_pqueue *fpq = &fud->pq;
@@ -2245,6 +2247,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		spin_unlock(&fc->lock);
 
 		fuse_dev_end_requests(&to_end);
+
+		/*
+		 * fc->lock must not be taken to avoid conflicts with io-uring
+		 * locks
+		 */
+		fuse_uring_abort(fc);
 	} else {
 		spin_unlock(&fc->lock);
 	}
@@ -2256,6 +2264,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
 	/* matches implicit memory barrier in fuse_drop_waiting() */
 	smp_mb();
 	wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
+
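+	/* Wait until all fuse-uring queues are stopped */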
+	fuse_uring_wait_stopped_queues(fc);
 }
 
 int fuse_dev_release(struct inode *inode, struct file *file)
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 5269b3f8891e..6001ba4d6e82 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -48,6 +48,44 @@ fuse_uring_async_send_to_ring(struct io_uring_cmd *cmd,
 	io_uring_cmd_done(cmd, 0, 0, issue_flags);
 }
 
+/* Abort all requests queued on the lists of the given ring queue */
+static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
+{
+	struct fuse_req *req;
+	LIST_HEAD(sync_list);
+	LIST_HEAD(async_list);
+
+	spin_lock(&queue->lock);
+
+	list_for_each_entry(req, &queue->sync_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+	list_for_each_entry(req, &queue->async_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+
+	list_splice_init(&queue->sync_fuse_req_queue, &sync_list);
+	list_splice_init(&queue->async_fuse_req_queue, &async_list);
+
+	spin_unlock(&queue->lock);
+
+	/* must not hold queue lock to avoid order issues with fi->lock */
+	fuse_dev_end_requests(&sync_list);
+	fuse_dev_end_requests(&async_list);
+}
+
+void fuse_uring_abort_end_requests(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		fuse_uring_abort_end_queue_requests(queue);
+	}
+}
+
 /* Update conn limits according to ring values */
 static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
 {
@@ -361,6 +399,162 @@ int fuse_uring_queue_cfg(struct fuse_ring *ring,
 	return 0;
 }
 
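+/*
+ * End the fuse request assigned to a ring entry on shutdown, the request
+ * is completed with -ECONNABORTED
+ */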
+static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
+{
+	struct fuse_req *req = ent->fuse_req;
+
+	ent->fuse_req = NULL;
+	clear_bit(FRRS_FUSE_REQ, &ent->state);
+	clear_bit(FR_SENT, &req->flags);
+	req->out.h.error = -ECONNABORTED;
+	fuse_request_end(req);
+}
+
+/*
+ * Release a request/entry on connection shutdown
+ */
+static bool fuse_uring_try_entry_stop(struct fuse_ring_ent *ent,
+				      bool need_cmd_done)
+	__must_hold(ent->queue->lock)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+	bool released = false;
+
+	if (test_bit(FRRS_FREED, &ent->state))
+		goto out; /* entry was already released */
+
+	if (ent->state == BIT(FRRS_INIT) || test_bit(FRRS_WAIT, &ent->state) ||
+	    test_bit(FRRS_USERSPACE, &ent->state)) {
+		set_bit(FRRS_FREED, &ent->state);
+
+		if (need_cmd_done) {
+			pr_devel("qid=%d tag=%d sending cmd_done\n", queue->qid,
+				 ent->tag);
+
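+			/*
+			 * io_uring_cmd_done() must not be called with the
+			 * queue lock held, to avoid conflicts with io-uring
+			 * locks
+			 */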
+			spin_unlock(&queue->lock);
+			io_uring_cmd_done(ent->cmd, -ENOTCONN, 0,
+					  IO_URING_F_UNLOCKED);
+			spin_lock(&queue->lock);
+		}
+
+		if (ent->fuse_req)
+			fuse_uring_stop_fuse_req_end(ent);
+		released = true;
+	}
+out:
+	return released;
+}
+
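+/*
+ * Release all entries on the given list; if need_cmd_done is set the
+ * io_uring command of each entry is completed with -ENOTCONN
+ */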
+static void fuse_uring_stop_list_entries(struct list_head *head,
+					 struct fuse_ring_queue *queue,
+					 bool need_cmd_done)
+{
+	struct fuse_ring *ring = queue->ring;
+	struct fuse_ring_ent *ent, *next;
+	ssize_t queue_refs = SSIZE_MAX;
+
+	list_for_each_entry_safe(ent, next, head, list) {
+		if (fuse_uring_try_entry_stop(ent, need_cmd_done)) {
+			queue_refs = atomic_dec_return(&ring->queue_refs);
+			list_del_init(&ent->list);
+		}
+
+		if (WARN_ON_ONCE(queue_refs < 0))
+			pr_warn("qid=%d queue_refs=%zd\n", queue->qid,
+				queue_refs);
+	}
+}
+
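+/* Release all entries of a ring queue on connection shutdown */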
+static void fuse_uring_stop_queue(struct fuse_ring_queue *queue)
+	__must_hold(&queue->lock)
+{
+	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue, false);
+	fuse_uring_stop_list_entries(&queue->async_ent_avail_queue, queue, true);
+	fuse_uring_stop_list_entries(&queue->sync_ent_avail_queue, queue, true);
+}
+
+/*
+ * Log the state of ring entries that have not been released yet
+ */
+static void fuse_uring_stop_ent_state(struct fuse_ring *ring)
+{
+	int qid, tag;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		for (tag = 0; tag < ring->queue_depth; tag++) {
+			struct fuse_ring_ent *ent = &queue->ring_ent[tag];
+
+			if (!test_bit(FRRS_FREED, &ent->state))
+				pr_info("ring=%p qid=%d tag=%d state=%lu\n",
+					ring, qid, tag, ent->state);
+		}
+	}
+	ring->stop_debug_log = 1;
+}
+
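+/*
+ * Delayed work to retry stopping the ring queues until all entry
+ * references are released
+ */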
+static void fuse_uring_async_stop_queues(struct work_struct *work)
+{
+	int qid;
+	struct fuse_ring *ring =
+		container_of(work, struct fuse_ring, stop_work.work);
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		fuse_uring_stop_queue(queue);
+		spin_unlock(&queue->lock);
+	}
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		if (time_after(jiffies,
+			       ring->stop_time + FUSE_URING_STOP_WARN_TIMEOUT))
+			fuse_uring_stop_ent_state(ring);
+
+		pr_info("ring=%p scheduling periodic queue stop\n", ring);
+
+		schedule_delayed_work(&ring->stop_work,
+				      FUSE_URING_STOP_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
+/*
+ * Stop the ring queues
+ */
+void fuse_uring_stop_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		fuse_uring_stop_queue(queue);
+		spin_unlock(&queue->lock);
+	}
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		pr_info("ring=%p scheduling periodic queue stop\n", ring);
+		ring->stop_time = jiffies;
+		INIT_DELAYED_WORK(&ring->stop_work,
+				  fuse_uring_async_stop_queues);
+		schedule_delayed_work(&ring->stop_work,
+				      FUSE_URING_STOP_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
 /*
  * Checks for errors and stores it into the request
  */
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index b2be67bb2fa7..e5fc84e2f3ea 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -16,6 +16,9 @@
 /* IORING_MAX_ENTRIES */
 #define FUSE_URING_MAX_QUEUE_DEPTH 32768
 
+#define FUSE_URING_STOP_WARN_TIMEOUT (5 * HZ)
+#define FUSE_URING_STOP_INTERVAL (HZ / 20)
+
 enum fuse_ring_req_state {
 
 	/* request is basically initialized */
@@ -203,6 +206,7 @@ int fuse_uring_mmap(struct file *filp, struct vm_area_struct *vma);
 int fuse_uring_queue_cfg(struct fuse_ring *ring,
 			 struct fuse_ring_queue_config *qcfg);
 void fuse_uring_ring_destruct(struct fuse_ring *ring);
+void fuse_uring_stop_queues(struct fuse_ring *ring);
 int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
 static inline void fuse_uring_conn_init(struct fuse_ring *ring,
@@ -275,6 +279,58 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return fc->ring && fc->ring->per_core_queue;
 }
 
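+/* Set the stopped flag on all configured queues */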
+static inline void fuse_uring_set_stopped_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		queue->stopped = 1;
+		spin_unlock(&queue->lock);
+	}
+}
+
+/*
+ * Mark the ring as not ready and set the per-queue stopped flag
+ */
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+	__must_hold(fc->lock)
+{
+	if (!fc->ring)
+		return;
+
+	fc->ring->ready = false;
+
+	fuse_uring_set_stopped_queues(fc->ring);
+}
+
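+/*
+ * Abort the ring on connection shutdown, ends queued requests and releases
+ * ring entries; must be called without holding fc->lock (see
+ * fuse_abort_conn())
+ */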
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (!ring)
+		return;
+
+	if (ring->configured && atomic_read(&ring->queue_refs) > 0) {
+		fuse_uring_abort_end_requests(ring);
+		fuse_uring_stop_queues(ring);
+	}
+}
+
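+/* Wait until all queue references are released */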
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (ring && ring->configured)
+		wait_event(ring->stop_waitq,
+			   atomic_read(&ring->queue_refs) == 0);
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -298,6 +354,17 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return false;
 }
 
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+}
 
 #endif /* CONFIG_FUSE_IO_URING */
 

-- 
2.40.1




