[PATCH 06/13] fuse: Add an interval ring stop worker/monitor

This adds a delayed work item that runs at an interval to check
whether the ring needs to be stopped, and stops it if so. Fuse
connection abort now waits for this worker to complete.
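
For illustration, the delayed-work pattern the monitor uses is
roughly the following (a minimal sketch with shortened, hypothetical
names; the real fields live in struct fuse_conn in this patch):

    #include <linux/workqueue.h>

    #define MON_PERIOD (5 * HZ)

    struct ring {
            struct delayed_work stop_monitor;
            bool queues_stopped;
    };

    static void ring_stop_mon(struct work_struct *work)
    {
            struct ring *r = container_of(work, struct ring,
                                          stop_monitor.work);

            /* check stop conditions and stop queues if needed */

            /* re-arm until the ring is fully stopped */
            if (!r->queues_stopped)
                    schedule_delayed_work(&r->stop_monitor, MON_PERIOD);
    }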

On stop, the worker iterates over all queues and their ring
entries and releases every entry that is in the right state
(the per-entry decision is sketched after the list):

FRRS_INIT - the ring entry is not in use yet; nothing needs to
be done.

FRRS_FUSE_WAIT - a CQE needs to be sent. This is important, as
io_uring otherwise keeps workers in D state and prints a warning.

FRRS_USERSPACE bit set - a CQE was already sent and must not be
sent again from the shutdown code, but typically a fuse request
still needs to be completed.

Any other state - the ring entry is currently being worked on;
shutdown has to wait until that work has completed.
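
Expressed as a single condition, the per-entry decision is roughly
the following (a sketch mirroring fuse_uring_shutdown_release_ent()
in the patch below; ent_may_release() is a hypothetical helper, not
part of the patch):

    static bool ent_may_release(u64 state, bool queue_aborted)
    {
            if (state & FRRS_FREED)
                    return false;   /* released earlier already */

            return state == FRRS_INIT ||      /* never used */
                   state == FRRS_FUSE_WAIT || /* CQE still to send */
                   ((state & FRRS_USERSPACE) && queue_aborted);
    }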

Also, the queue lock is now held for any queue entry state change;
shutdown handling is the main reason for that.
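
That is, every entry state transition follows this pattern (sketch):

    spin_lock(&queue->lock);
    ent->state |= FRRS_USERSPACE;   /* example transition */
    spin_unlock(&queue->lock);

so the stop worker, which takes the same lock, always observes a
consistent entry state and never races with a half-done transition.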

Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
cc: Miklos Szeredi <miklos@xxxxxxxxxx>
cc: linux-fsdevel@xxxxxxxxxxxxxxx
cc: Amir Goldstein <amir73il@xxxxxxxxx>
cc: fuse-devel@xxxxxxxxxxxxxxxxxxxxx
---
 fs/fuse/dev.c         |  12 ++-
 fs/fuse/dev_uring.c   | 168 ++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h |   2 +-
 fs/fuse/inode.c       |   3 +
 4 files changed, 183 insertions(+), 2 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 07323b041377..d9c40d782c94 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2174,9 +2174,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		fuse_dev_end_requests(&to_end);
 
 		mutex_lock(&fc->ring.start_stop_lock);
-		if (fc->ring.configured && !fc->ring.queues_stopped)
+		if (fc->ring.configured && !fc->ring.queues_stopped) {
 			fuse_uring_end_requests(fc);
+			schedule_delayed_work(&fc->ring.stop_monitor, 0);
+		}
 		mutex_unlock(&fc->ring.start_stop_lock);
+
 	} else {
 		spin_unlock(&fc->lock);
 	}
@@ -2187,7 +2190,14 @@ void fuse_wait_aborted(struct fuse_conn *fc)
 {
 	/* matches implicit memory barrier in fuse_drop_waiting() */
 	smp_mb();
+
 	wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
+
+	/* XXX use struct completion? */
+	if (fc->ring.daemon != NULL) {
+		schedule_delayed_work(&fc->ring.stop_monitor, 0);
+		wait_event(fc->ring.stop_waitq, fc->ring.queues_stopped == 1);
+	}
 }
 
 int fuse_dev_release(struct inode *inode, struct file *file)
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 12fd21526b2b..44ff23ce5ebf 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -26,6 +26,9 @@
 #include <linux/io_uring.h>
 #include <linux/topology.h>
 
+/* default monitor interval for a dying daemon */
+#define FURING_DAEMON_MON_PERIOD (5 * HZ)
+
 static bool __read_mostly enable_uring;
 module_param(enable_uring, bool, 0644);
 MODULE_PARM_DESC(enable_uring,
@@ -44,6 +47,15 @@ fuse_uring_get_queue(struct fuse_conn *fc, int qid)
 	return (struct fuse_ring_queue *)(ptr + qid * fc->ring.queue_size);
 }
 
+/* dummy function will be replaced in later commits */
+static void fuse_uring_bit_set(struct fuse_ring_ent *ent, bool is_bg,
+			       const char *str)
+{
+	(void)ent;
+	(void)is_bg;
+	(void)str;
+}
+
 /* Abort all list queued request on the given ring queue */
 static void fuse_uring_end_queue_requests(struct fuse_ring_queue *queue)
 {
@@ -69,6 +81,156 @@ void fuse_uring_end_requests(struct fuse_conn *fc)
 	}
 }
 
+/*
+ * Simplified ring-entry release function, for shutdown only
+ */
+static void _fuse_uring_shutdown_release_ent(struct fuse_ring_ent *ent)
+__must_hold(&queue->lock)
+{
+	bool is_bg = !!(ent->rreq->flags & FUSE_RING_REQ_FLAG_BACKGROUND);
+
+	ent->state |= FRRS_FUSE_REQ_END;
+	ent->need_req_end = 0;
+	fuse_request_end(ent->fuse_req);
+	ent->fuse_req = NULL;
+	fuse_uring_bit_set(ent, is_bg, __func__);
+}
+
+/*
+ * Release a request/entry on connection shutdown
+ */
+static void fuse_uring_shutdown_release_ent(struct fuse_ring_ent *ent)
+__must_hold(&fc->ring.start_stop_lock)
+__must_hold(&queue->lock)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+	struct fuse_conn *fc = queue->fc;
+	bool may_release = false;
+	int state;
+
+	pr_devel("%s fc=%p qid=%d tag=%d state=%llu\n",
+		 __func__, fc, queue->qid, ent->tag, ent->state);
+
+	if (ent->state & FRRS_FREED)
+		goto out; /* no work left, freed before */
+
+	state = ent->state;
+
+	if (state == FRRS_INIT || state == FRRS_FUSE_WAIT ||
+	    ((state & FRRS_USERSPACE) && queue->aborted)) {
+		ent->state |= FRRS_FREED;
+
+		if (ent->need_cmd_done) {
+			pr_devel("qid=%d tag=%d sending cmd_done\n",
+				queue->qid, ent->tag);
+			io_uring_cmd_done(ent->cmd, -ENOTCONN, 0);
+			ent->need_cmd_done = 0;
+		}
+
+		if (ent->need_req_end)
+			_fuse_uring_shutdown_release_ent(ent);
+		may_release = true;
+	} else {
+		/* entry is in some in-between state; another thread is
+		 * currently handling it
+		 */
+		pr_devel("%s qid=%d tag=%d state=%llu\n",
+			 __func__, queue->qid, ent->tag, ent->state);
+	}
+
+out:
+	/* might free the queue - needs to have the queue waitq lock released */
+	if (may_release) {
+		int refs = --fc->ring.queue_refs;
+
+		pr_devel("free-req fc=%p qid=%d tag=%d refs=%d\n",
+			 fc, queue->qid, ent->tag, refs);
+		if (refs == 0) {
+			fc->ring.queues_stopped = 1;
+			wake_up_all(&fc->ring.stop_waitq);
+		}
+	}
+}
+
+static void fuse_uring_stop_queue(struct fuse_ring_queue *queue)
+__must_hold(&fc->ring.start_stop_lock)
+__must_hold(&queue->lock)
+{
+	struct fuse_conn *fc = queue->fc;
+	int tag;
+	bool empty =
+		(list_empty(&queue->fg_queue) && list_empty(&queue->bg_queue));
+
+	if (!empty && !queue->aborted)
+		return;
+
+	for (tag = 0; tag < fc->ring.queue_depth; tag++) {
+		struct fuse_ring_ent *ent = &queue->ring_ent[tag];
+
+		fuse_uring_shutdown_release_ent(ent);
+	}
+}
+
+/*
+ *  Stop the ring queues
+ */
+static void fuse_uring_stop_queues(struct fuse_conn *fc)
+__must_hold(&fc->ring.start_stop_lock)
+{
+	int qid;
+
+	if (fc->ring.daemon == NULL)
+		return;
+
+	fc->ring.stop_requested = 1;
+	fc->ring.ready = 0;
+
+	for (qid = 0; qid < fc->ring.nr_queues; qid++) {
+		struct fuse_ring_queue *queue =
+			fuse_uring_get_queue(fc, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		fuse_uring_stop_queue(queue);
+		spin_unlock(&queue->lock);
+	}
+}
+
+/*
+ * Monitoring function to check whether the fuse connection should be
+ * torn down; runs as delayed work
+ */
+static void fuse_uring_stop_mon(struct work_struct *work)
+{
+	struct fuse_conn *fc = container_of(work, struct fuse_conn,
+					    ring.stop_monitor.work);
+	struct fuse_iqueue *fiq = &fc->iq;
+
+	pr_devel("fc=%p running stop-mon, queues-stopped=%u queue-refs=%d\n",
+		fc, fc->ring.queues_stopped, fc->ring.queue_refs);
+
+	mutex_lock(&fc->ring.start_stop_lock);
+
+	if (!fiq->connected || fc->ring.stop_requested ||
+	    (fc->ring.daemon->flags & PF_EXITING)) {
+		pr_devel("%s Stopping queues connected=%d stop-req=%d exit=%d\n",
+			__func__, fiq->connected, fc->ring.stop_requested,
+			(fc->ring.daemon->flags & PF_EXITING));
+		fuse_uring_stop_queues(fc);
+	}
+
+	if (!fc->ring.queues_stopped)
+		schedule_delayed_work(&fc->ring.stop_monitor,
+				      FURING_DAEMON_MON_PERIOD);
+	else
+		pr_devel("Not scheduling work queues-stopped=%u queue-refs=%d.\n",
+			fc->ring.queues_stopped, fc->ring.queue_refs);
+
+	mutex_unlock(&fc->ring.start_stop_lock);
+}
+
 /**
  * use __vmalloc_node_range() (needs to be
  * exported?) or add a new (exported) function vm_alloc_user_node()
@@ -127,6 +289,11 @@ __must_hold(fc->ring.stop_waitq.lock)
 	fc->ring.daemon = current;
 	get_task_struct(fc->ring.daemon);
 
+	INIT_DELAYED_WORK(&fc->ring.stop_monitor,
+			  fuse_uring_stop_mon);
+	schedule_delayed_work(&fc->ring.stop_monitor,
+			      FURING_DAEMON_MON_PERIOD);
+
 	fc->ring.nr_queues = cfg->nr_queues;
 	fc->ring.per_core_queue = cfg->nr_queues > 1;
 
@@ -298,6 +465,7 @@ void fuse_uring_ring_destruct(struct fuse_conn *fc)
 		return;
 	}
 
+	cancel_delayed_work_sync(&fc->ring.stop_monitor);
 	put_task_struct(fc->ring.daemon);
 	fc->ring.daemon = NULL;
 
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 4ab440ee00f2..d5cb9bdca64e 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -10,8 +10,8 @@
 #include "fuse_i.h"
 
 void fuse_uring_end_requests(struct fuse_conn *fc);
-void fuse_uring_ring_destruct(struct fuse_conn *fc);
 int fuse_uring_ioctl(struct file *file, struct fuse_uring_cfg *cfg);
+void fuse_uring_ring_destruct(struct fuse_conn *fc);
 #endif
 
 
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 3f765e65a7b0..91c912793dca 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -856,6 +856,7 @@ void fuse_conn_init(struct fuse_conn *fc, struct fuse_mount *fm,
 	fc->max_pages = FUSE_DEFAULT_MAX_PAGES_PER_REQ;
 	fc->max_pages_limit = FUSE_MAX_MAX_PAGES;
 
+	init_waitqueue_head(&fc->ring.stop_waitq);
 	mutex_init(&fc->ring.start_stop_lock);
 	fc->ring.daemon = NULL;
 
@@ -1792,6 +1793,8 @@ void fuse_conn_destroy(struct fuse_mount *fm)
 
 	if (fc->ring.daemon != NULL)
 		fuse_uring_ring_destruct(fc);
+
+	mutex_destroy(&fc->ring.start_stop_lock);
 }
 EXPORT_SYMBOL_GPL(fuse_conn_destroy);
 
-- 
2.37.2



