[PATCH v5 1/2] watch_queue: refactor post_one_notification

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Refactor post_one_notification so that we can lock pipe using
sleepable lock.

Signed-off-by: Hongchen Zhang <zhanghongchen@xxxxxxxxxxx>
---
 fs/pipe.c                   |  5 +++-
 include/linux/watch_queue.h | 14 ++++++++++-
 kernel/watch_queue.c        | 47 +++++++++++++++++++++++++++----------
 3 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 2d88f73f585a..5c6b3daed938 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -834,8 +834,11 @@ void free_pipe_info(struct pipe_inode_info *pipe)
 	unsigned int i;
 
 #ifdef CONFIG_WATCH_QUEUE
-	if (pipe->watch_queue)
+	if (pipe->watch_queue) {
 		watch_queue_clear(pipe->watch_queue);
+		smp_cond_load_relaxed(&pipe->watch_queue->state,
+				(VAL & WATCH_QUEUE_POST_CNT_MASK) == 0);
+	}
 #endif
 
 	(void) account_pipe_buffers(pipe->user, pipe->nr_accounted, 0);
diff --git a/include/linux/watch_queue.h b/include/linux/watch_queue.h
index fc6bba20273b..1db3eee2137a 100644
--- a/include/linux/watch_queue.h
+++ b/include/linux/watch_queue.h
@@ -35,6 +35,7 @@ struct watch_filter {
 	struct watch_type_filter filters[];
 };
 
+#define WATCH_QUEUE_POST_CNT_MASK GENMASK(30, 0)
 struct watch_queue {
 	struct rcu_head		rcu;
 	struct watch_filter __rcu *filter;
@@ -46,7 +47,18 @@ struct watch_queue {
 	spinlock_t		lock;
 	unsigned int		nr_notes;	/* Number of notes */
 	unsigned int		nr_pages;	/* Number of pages in notes[] */
-	bool			defunct;	/* T when queues closed */
+	union {
+		struct {
+#ifdef __LITTLE_ENDIAN
+			u32	post_cnt:31;	/* How many threads are posting notification */
+			u32	defunct:1;	/* T when queues closed */
+#else
+			u32	defunct:1;	/* T when queues closed */
+			u32	post_cnt:31;	/* How many threads are posting notification */
+#endif
+		};
+		u32	state;
+	};
 };
 
 /*
diff --git a/kernel/watch_queue.c b/kernel/watch_queue.c
index e91cb4c2833f..bd14f054ffb8 100644
--- a/kernel/watch_queue.c
+++ b/kernel/watch_queue.c
@@ -33,6 +33,8 @@ MODULE_AUTHOR("Red Hat, Inc.");
 #define WATCH_QUEUE_NOTE_SIZE 128
 #define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)
 
+static void put_watch(struct watch *watch);
+
 /*
  * This must be called under the RCU read-lock, which makes
  * sure that the wqueue still exists. It can then take the lock,
@@ -88,24 +90,40 @@ static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
 };
 
 /*
- * Post a notification to a watch queue.
- *
- * Must be called with the RCU lock for reading, and the
- * watch_queue lock held, which guarantees that the pipe
- * hasn't been released.
+ * Post a notification to a watch queue with RCU lock held.
  */
-static bool post_one_notification(struct watch_queue *wqueue,
+static bool post_one_notification(struct watch *watch,
 				  struct watch_notification *n)
 {
 	void *p;
-	struct pipe_inode_info *pipe = wqueue->pipe;
+	struct watch_queue *wqueue;
+	struct pipe_inode_info *pipe;
 	struct pipe_buffer *buf;
 	struct page *page;
 	unsigned int head, tail, mask, note, offset, len;
 	bool done = false;
+	u32 state;
+
+	if (!kref_get_unless_zero(&watch->usage))
+		return false;
+	wqueue = rcu_dereference(watch->queue);
+
+	pipe = wqueue->pipe;
 
-	if (!pipe)
+	if (!pipe) {
+		put_watch(watch);
 		return false;
+	}
+
+	do {
+		if (wqueue->defunct) {
+			put_watch(watch);
+			return false;
+		}
+		state = wqueue->state;
+	} while (cmpxchg(&wqueue->state, state, state + 1) != state);
+
+	rcu_read_unlock();
 
 	spin_lock_irq(&pipe->rd_wait.lock);
 
@@ -145,6 +163,12 @@ static bool post_one_notification(struct watch_queue *wqueue,
 
 out:
 	spin_unlock_irq(&pipe->rd_wait.lock);
+	do {
+		state = wqueue->state;
+	} while (cmpxchg(&wqueue->state, state, state - 1) != state);
+
+	rcu_read_lock();
+	put_watch(watch);
 	if (done)
 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 	return done;
@@ -224,10 +248,7 @@ void __post_watch_notification(struct watch_list *wlist,
 		if (security_post_notification(watch->cred, cred, n) < 0)
 			continue;
 
-		if (lock_wqueue(wqueue)) {
-			post_one_notification(wqueue, n);
-			unlock_wqueue(wqueue);
-		}
+		post_one_notification(watch, n);
 	}
 
 	rcu_read_unlock();
@@ -560,8 +581,8 @@ int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
 
 	wqueue = rcu_dereference(watch->queue);
 
+	post_one_notification(watch, &n.watch);
 	if (lock_wqueue(wqueue)) {
-		post_one_notification(wqueue, &n.watch);
 
 		if (!hlist_unhashed(&watch->queue_node)) {
 			hlist_del_init_rcu(&watch->queue_node);

base-commit: 6995e2de6891c724bfeb2db33d7b87775f913ad1
-- 
2.33.0




[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [NTFS 3]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [NTFS 3]     [Samba]     [Device Mapper]     [CEPH Development]

  Powered by Linux