Hi Oleg,

just FYI, I did some quick tests with:

- your changes to fs/pipe.c
- my change to skip locking in wake-up (plus some smp_mb())
- statistics, to check how often wake_up is called and how often the
  wait queue is actually empty

Known issues: the statistic printing every 10 seconds doesn't work, it
prints at erratic times. And the comment in __wake_up is still wrong:
the memory barrier would pair with the smp_mb() after updating
wq_head->head.

Results (all on a 2-core/4-thread i3, otherwise fully idle system):

- your change has no impact on 'find /proc /sys | grep doesnotexist'
  (using busybox)
- running your test app for around 100 seconds:
  - 3 wakeups with a non-empty queue
  - 26 wakeups with an empty queue
  - 2107 __add_wait_queue calls
- find|grep produces insane numbers of wakeups: I've seen 20k, and now
  50k wake_up calls, with just around 2k __add_wait_queue calls, ...

Thus, at least for pipe: should we add the missing memory barriers and
switch to waitqueue_active() checks in front of all wake_up calls?
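To make the question concrete, the pattern I have in mind for e.g. the
writer wakeup in pipe_read() would be roughly the sketch below (untested;
wake_writer_if_needed() is just a made-up helper name for illustration).
It relies on the sleeper providing a full barrier after queueing itself
and before re-checking the condition - either the smp_mb() from
set_current_state() in prepare_to_wait(), or the explicit smp_mb() that
the hack below adds to __add_wait_queue():

#include <linux/wait.h>
#include <linux/poll.h>
#include <linux/pipe_fs_i.h>

/* Hypothetical helper, only to illustrate the proposed fast path. */
static void wake_writer_if_needed(struct pipe_inode_info *pipe)
{
	/*
	 * Pairs with the full barrier on the sleeper side, executed
	 * after it added itself to pipe->wr_wait and before it
	 * re-checks pipe_full(): either we observe the queued entry
	 * here, or the sleeper observes the new pipe state and does
	 * not go to sleep.
	 */
	smp_mb();

	/* Skip the wq_head->lock round trip if nobody is waiting. */
	if (waitqueue_active(&pipe->wr_wait))
		wake_up_interruptible_sync_poll(&pipe->wr_wait,
						EPOLLOUT | EPOLLWRNORM);
}

Given the numbers above (tens of thousands of wake_up calls vs. around
2k __add_wait_queue calls for find|grep), the waitqueue_active() check
would skip the waitqueue spinlock in the vast majority of cases; the
open question is whether the barrier pairing is right on every sleeper
path.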
---
 fs/pipe.c            | 13 +++++++------
 include/linux/wait.h |  5 +++++
 kernel/sched/wait.c  | 45 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..27ffb650f131 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	size_t total_len = iov_iter_count(to);
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool wake_writer = false, wake_next_reader = false;
 	ssize_t ret;
 
 	/* Null read succeeds. */
@@ -271,7 +271,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	 * (WF_SYNC), because we want them to get going and generate more
 	 * data for us.
 	 */
-	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 	for (;;) {
 		/* Read ->head with a barrier vs post_one_notification() */
 		unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 				buf->len = 0;
 			}
 
-			if (!buf->len)
+			if (!buf->len) {
+				wake_writer |= pipe_full(head, tail, pipe->max_usage);
 				tail = pipe_update_tail(pipe, buf, tail);
+			}
 			total_len -= chars;
 			if (!total_len)
 				break;	/* common path: read succeeded */
@@ -377,7 +378,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		 * _very_ unlikely case that the pipe was full, but we got
 		 * no data.
 		 */
-		if (unlikely(was_full))
+		if (unlikely(wake_writer))
 			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 
@@ -391,14 +392,14 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			return -ERESTARTSYS;
 
 		mutex_lock(&pipe->mutex);
-		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 		wake_next_reader = true;
+		wake_writer = false;
 	}
 	if (pipe_empty(pipe->head, pipe->tail))
 		wake_next_reader = false;
 	mutex_unlock(&pipe->mutex);
 
-	if (was_full)
+	if (wake_writer)
 		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 	if (wake_next_reader)
 		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
diff --git a/include/linux/wait.h b/include/linux/wait.h
index 6d90ad974408..0fdad3c3c513 100644
--- a/include/linux/wait.h
+++ b/include/linux/wait.h
@@ -166,6 +166,7 @@ extern void add_wait_queue_exclusive(struct wait_queue_head *wq_head, struct wai
 extern void add_wait_queue_priority(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry);
 extern void remove_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry);
 
+extern atomic_t g_add_count;
 static inline void __add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
 {
 	struct list_head *head = &wq_head->head;
@@ -177,6 +178,8 @@ static inline void __add_wait_queue(struct wait_queue_head *wq_head, struct wait
 			head = &wq->entry;
 	}
 	list_add(&wq_entry->entry, head);
+	smp_mb();
+	atomic_inc(&g_add_count);
 }
 
 /*
@@ -192,6 +195,8 @@ __add_wait_queue_exclusive(struct wait_queue_head *wq_head, struct wait_queue_en
 static inline void __add_wait_queue_entry_tail(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
 {
 	list_add_tail(&wq_entry->entry, &wq_head->head);
+	smp_mb();
+	atomic_inc(&g_add_count);
 }
 
 static inline void
diff --git a/kernel/sched/wait.c b/kernel/sched/wait.c
index 51e38f5f4701..07487429dddf 100644
--- a/kernel/sched/wait.c
+++ b/kernel/sched/wait.c
@@ -110,6 +110,10 @@ static int __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int m
 	return nr_exclusive - remaining;
 }
 
+#if 1
+atomic_t g_add_count = ATOMIC_INIT(0);
+#endif
+
 /**
  * __wake_up - wake up threads blocked on a waitqueue.
  * @wq_head: the waitqueue
@@ -124,6 +128,47 @@ static int __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int m
 int __wake_up(struct wait_queue_head *wq_head, unsigned int mode,
 	      int nr_exclusive, void *key)
 {
+#if 1
+static atomic_t g_slow = ATOMIC_INIT(0);
+static atomic_t g_mid = ATOMIC_INIT(0);
+static atomic_t g_fast = ATOMIC_INIT(0);
+static u64 printtime = 10*HZ;
+#endif
+	if (list_empty(&wq_head->head)) {
+		struct list_head *pn;
+
+		/*
+		 * pairs with spin_unlock_irqrestore(&wq_head->lock);
+		 * We actually do not need to acquire wq_head->lock, we just
+		 * need to be sure that there is no prepare_to_wait() that
+		 * completed on any CPU before __wake_up was called.
+		 * Thus instead of load_acquiring the spinlock and dropping
+		 * it again, we load_acquire the next list entry and check
+		 * that the list is not empty.
+		 */
+		pn = smp_load_acquire(&wq_head->head.next);
+
+		if(pn == &wq_head->head) {
+#if 1
+			atomic_inc(&g_fast);
+#endif
+			return 0;
+		} else {
+#if 1
+			atomic_inc(&g_mid);
+#endif
+		}
+	} else {
+#if 1
+		atomic_inc(&g_slow);
+#endif
+	}
+#if 1
+	if (get_jiffies_64() > printtime) {
+		printtime = get_jiffies_64() + 10*HZ;
+		pr_info("__wakeup: slow/obvious: %d, mid/nearly raced: %d, fast: %d, add: %d.\n", atomic_read(&g_slow), atomic_read(&g_mid), atomic_read(&g_fast), atomic_read(&g_add_count));
+	}
+#endif
 	return __wake_up_common_lock(wq_head, mode, nr_exclusive, 0, key);
 }
 EXPORT_SYMBOL(__wake_up);
-- 
2.47.1