Refactor post_one_notification() so that the pipe can be locked using a sleepable lock. Signed-off-by: Hongchen Zhang <zhanghongchen@xxxxxxxxxxx> --- fs/pipe.c | 5 +++- include/linux/watch_queue.h | 14 ++++++++++- kernel/watch_queue.c | 47 +++++++++++++++++++++++++++---------- 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/fs/pipe.c b/fs/pipe.c index 2d88f73f585a..5c6b3daed938 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -834,8 +834,11 @@ void free_pipe_info(struct pipe_inode_info *pipe) unsigned int i; #ifdef CONFIG_WATCH_QUEUE - if (pipe->watch_queue) + if (pipe->watch_queue) { watch_queue_clear(pipe->watch_queue); + smp_cond_load_relaxed(&pipe->watch_queue->state, + (VAL & WATCH_QUEUE_POST_CNT_MASK) == 0); + } #endif (void) account_pipe_buffers(pipe->user, pipe->nr_accounted, 0); diff --git a/include/linux/watch_queue.h b/include/linux/watch_queue.h index fc6bba20273b..1db3eee2137a 100644 --- a/include/linux/watch_queue.h +++ b/include/linux/watch_queue.h @@ -35,6 +35,7 @@ struct watch_filter { struct watch_type_filter filters[]; }; +#define WATCH_QUEUE_POST_CNT_MASK GENMASK(30, 0) struct watch_queue { struct rcu_head rcu; struct watch_filter __rcu *filter; @@ -46,7 +47,18 @@ struct watch_queue { spinlock_t lock; unsigned int nr_notes; /* Number of notes */ unsigned int nr_pages; /* Number of pages in notes[] */ - bool defunct; /* T when queues closed */ + union { + struct { +#ifdef __LITTLE_ENDIAN + u32 post_cnt:31; /* How many threads are posting notification */ + u32 defunct:1; /* T when queues closed */ +#else + u32 defunct:1; /* T when queues closed */ + u32 post_cnt:31; /* How many threads are posting notification */ +#endif + }; + u32 state; + }; }; /* diff --git a/kernel/watch_queue.c b/kernel/watch_queue.c index e91cb4c2833f..bd14f054ffb8 100644 --- a/kernel/watch_queue.c +++ b/kernel/watch_queue.c @@ -33,6 +33,8 @@ MODULE_AUTHOR("Red Hat, Inc."); #define WATCH_QUEUE_NOTE_SIZE 128 #define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / 
WATCH_QUEUE_NOTE_SIZE) +static void put_watch(struct watch *watch); + /* * This must be called under the RCU read-lock, which makes * sure that the wqueue still exists. It can then take the lock, @@ -88,24 +90,40 @@ static const struct pipe_buf_operations watch_queue_pipe_buf_ops = { }; /* - * Post a notification to a watch queue. - * - * Must be called with the RCU lock for reading, and the - * watch_queue lock held, which guarantees that the pipe - * hasn't been released. + * Post a notification to a watch queue with RCU lock held. */ -static bool post_one_notification(struct watch_queue *wqueue, +static bool post_one_notification(struct watch *watch, struct watch_notification *n) { void *p; - struct pipe_inode_info *pipe = wqueue->pipe; + struct watch_queue *wqueue; + struct pipe_inode_info *pipe; struct pipe_buffer *buf; struct page *page; unsigned int head, tail, mask, note, offset, len; bool done = false; + u32 state; + + if (!kref_get_unless_zero(&watch->usage)) + return false; + wqueue = rcu_dereference(watch->queue); + + pipe = wqueue->pipe; - if (!pipe) + if (!pipe) { + put_watch(watch); return false; + } + + do { + if (wqueue->defunct) { + put_watch(watch); + return false; + } + state = wqueue->state; + } while (cmpxchg(&wqueue->state, state, state + 1) != state); + + rcu_read_unlock(); spin_lock_irq(&pipe->rd_wait.lock); @@ -145,6 +163,12 @@ static bool post_one_notification(struct watch_queue *wqueue, out: spin_unlock_irq(&pipe->rd_wait.lock); + do { + state = wqueue->state; + } while (cmpxchg(&wqueue->state, state, state - 1) != state); + + rcu_read_lock(); + put_watch(watch); if (done) kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); return done; @@ -224,10 +248,7 @@ void __post_watch_notification(struct watch_list *wlist, if (security_post_notification(watch->cred, cred, n) < 0) continue; - if (lock_wqueue(wqueue)) { - post_one_notification(wqueue, n); - unlock_wqueue(wqueue); - } + post_one_notification(watch, n); } rcu_read_unlock(); @@ -560,8 
+581,8 @@ int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq, wqueue = rcu_dereference(watch->queue); + post_one_notification(watch, &n.watch); if (lock_wqueue(wqueue)) { - post_one_notification(wqueue, &n.watch); if (!hlist_unhashed(&watch->queue_node)) { hlist_del_init_rcu(&watch->queue_node); base-commit: 6995e2de6891c724bfeb2db33d7b87775f913ad1 -- 2.33.0