Hello Oleg,
On 1/2/2025 7:37 PM, Oleg Nesterov wrote:
wake_up(pipe->wr_wait) makes no sense if pipe_full() is still true after
the reading, the writer sleeping in wait_event(wr_wait, pipe_writable())
will check the pipe_writable() == !pipe_full() condition and sleep again.
Only wake the writer if we actually released a pipe buf, and the pipe was
full before we did so.
We saw hang in hackbench in our weekly regression testing on mainline
kernel. The bisect pointed to this commit.
This patch avoids the unnecessary writer wakeup but I think there may be
a subtle race due to which the writer is never woken up in certain cases.
On zen5 system with 2 sockets with 192C/384T each, I ran hackbench with
16 groups or 32 groups. In 1 out of 20 runs, the race condition is
occurring where the writer is not getting woken up and the benchmarks
hangs. I tried reverting this commit and it again started working fine.
I also tried with
https://lore.kernel.org/all/20250210114039.GA3588@xxxxxxxxxx/. After
applying this patch, the frequency of hang is reduced to 1 in 100 times,
but hang still
exists.
Whenever I compare the case where was_full would have been set but
wake_writer was not set, I see the following pattern:
ret = 100 (Read was successful)
pipe_full() = 1
total_len = 0
buf->len != 0
total_len is computed using iov_iter_count() while the buf->len is the
length of the buffer corresponding to tail(pipe->bufs[tail & mask].len).
Looking at pipe_write(), there seems to be a case where the writer can
make progress when (chars && !was_empty) which only looks at
iov_iter_count(). Could it be the case that there is still room in the
buffer but we are not waking up the writer?
Signed-off-by: Oleg Nesterov <oleg@xxxxxxxxxx>
---
fs/pipe.c | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..82fede0f2111 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
size_t total_len = iov_iter_count(to);
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
- bool was_full, wake_next_reader = false;
+ bool wake_writer = false, wake_next_reader = false;
ssize_t ret;
/* Null read succeeds. */
@@ -264,14 +264,13 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
mutex_lock(&pipe->mutex);
/*
- * We only wake up writers if the pipe was full when we started
- * reading in order to avoid unnecessary wakeups.
+ * We only wake up writers if the pipe was full when we started reading
+ * and it is no longer full after reading to avoid unnecessary wakeups.
*
* But when we do wake up writers, we do so using a sync wakeup
* (WF_SYNC), because we want them to get going and generate more
* data for us.
*/
- was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
for (;;) {
/* Read ->head with a barrier vs post_one_notification() */
unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
buf->len = 0;
}
- if (!buf->len)
+ if (!buf->len) {
+ wake_writer |= pipe_full(head, tail, pipe->max_usage);
tail = pipe_update_tail(pipe, buf, tail);
+ }
total_len -= chars;
if (!total_len)
break; /* common path: read succeeded */
@@ -377,7 +378,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
* _very_ unlikely case that the pipe was full, but we got
* no data.
*/
- if (unlikely(was_full))
+ if (unlikely(wake_writer))
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
@@ -390,15 +391,15 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
return -ERESTARTSYS;
- mutex_lock(&pipe->mutex);
- was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
+ wake_writer = false;
wake_next_reader = true;
+ mutex_lock(&pipe->mutex);
}
if (pipe_empty(pipe->head, pipe->tail))
wake_next_reader = false;
mutex_unlock(&pipe->mutex);
- if (was_full)
+ if (wake_writer)
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
if (wake_next_reader)
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
--
Thanks and Regards,
Swapnil