Re: [PATCH] pipe_read: don't wake up the writer if the pipe is still full

Hello Oleg,

On 1/2/2025 7:37 PM, Oleg Nesterov wrote:
wake_up(pipe->wr_wait) makes no sense if pipe_full() is still true after
the reading, the writer sleeping in wait_event(wr_wait, pipe_writable())
will check the pipe_writable() == !pipe_full() condition and sleep again.

Only wake the writer if we actually released a pipe buf, and the pipe was
full before we did so.


We saw a hang in hackbench during our weekly regression testing on the mainline kernel. Bisection pointed to this commit.

This patch avoids the unnecessary writer wakeup, but I think there may be a subtle race that leaves the writer never woken up in certain cases.

On a Zen5 system with 2 sockets of 192C/384T each, I ran hackbench with 16 groups or 32 groups. In about 1 out of 20 runs the race occurs: the writer is not woken up and the benchmark hangs. With this commit reverted, hackbench runs fine again.

I also tried the patch at
https://lore.kernel.org/all/20250210114039.GA3588@xxxxxxxxxx/. With it applied, the hang frequency drops to about 1 in 100 runs, but the hang still occurs.

Whenever I look at a case where was_full would have been set but wake_writer is not, I see the following pattern:

ret = 100 (Read was successful)
pipe_full() = 1
total_len = 0
buf->len != 0

total_len is computed using iov_iter_count(), while buf->len is the length of the buffer at the tail (pipe->bufs[tail & mask].len). Looking at pipe_write(), there seems to be a case where the writer can make progress when (chars && !was_empty), which only looks at iov_iter_count(). Could it be that there is still room in the buffer but we are not waking up the writer?
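To make the question concrete, here is a rough userspace model of the two decisions involved. It is only a sketch and not kernel code: the model_* names, the 16-slot ring and the 4 KiB page size are assumptions made for illustration, and only the head - tail >= limit arithmetic is meant to mirror pipe_full(). It models a single read from the tail buffer and the merge precondition at the top of pipe_write(); it says nothing about whether a writer that is already sleeping ever re-evaluates that merge path.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MODEL_PAGE_SIZE 4096u	/* assumption: 4 KiB pages */
#define MODEL_SLOTS     16u	/* assumption: 16-slot ring, power of two */

/* Hypothetical, stripped-down stand-ins for pipe_buffer / pipe_inode_info. */
struct model_buf { unsigned int offset, len; bool can_merge; };
struct model_pipe {
	unsigned int head, tail, max_usage;
	struct model_buf bufs[MODEL_SLOTS];
};
struct model_read_result { bool old_wake, new_wake; };

/* Same arithmetic as pipe_full(): occupancy is head - tail. */
static bool model_pipe_full(unsigned int head, unsigned int tail,
			    unsigned int limit)
{
	return head - tail >= limit;
}

/* Fill the ring: full pages everywhere, last_len bytes in the newest slot. */
static void model_fill(struct model_pipe *p, unsigned int last_len)
{
	for (unsigned int i = 0; i < MODEL_SLOTS; i++)
		p->bufs[i] = (struct model_buf){ 0, MODEL_PAGE_SIZE, true };
	p->bufs[MODEL_SLOTS - 1].len = last_len;
	p->tail = 0;
	p->head = MODEL_SLOTS;
	p->max_usage = MODEL_SLOTS;
}

/* Read nbytes from the tail buffer and report both wakeup rules. */
static struct model_read_result model_read(struct model_pipe *p,
					   unsigned int nbytes)
{
	struct model_buf *buf = &p->bufs[p->tail & (MODEL_SLOTS - 1)];
	struct model_read_result r;

	assert(nbytes <= buf->len);
	/* Old rule: sample fullness once, before reading anything. */
	r.old_wake = model_pipe_full(p->head, p->tail, p->max_usage);
	r.new_wake = false;

	buf->offset += nbytes;
	buf->len -= nbytes;
	if (!buf->len) {
		/* New rule: wake only if a slot is released from a full ring. */
		r.new_wake |= model_pipe_full(p->head, p->tail, p->max_usage);
		p->tail++;
	}
	return r;
}

/* Merge precondition as in pipe_write(): append to the newest buffer. */
static bool model_can_merge(const struct model_pipe *p, size_t total_len)
{
	unsigned int chars = total_len & (MODEL_PAGE_SIZE - 1);
	const struct model_buf *buf;

	if (!chars || p->head == p->tail)	/* chars && !was_empty */
		return false;
	buf = &p->bufs[(p->head - 1) & (MODEL_SLOTS - 1)];
	return buf->can_merge &&
	       buf->offset + buf->len + chars <= MODEL_PAGE_SIZE;
}
```

With a full ring whose newest buffer holds 1000 bytes, model_read(&p, 100) reports old_wake = true and new_wake = false while the ring stays full, which matches the pattern above (ret = 100, pipe_full() = 1, buf->len != 0). In the same state model_can_merge(&p, 100) is true, i.e. a small write would still fit into the last buffer.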

Signed-off-by: Oleg Nesterov <oleg@xxxxxxxxxx>
---
  fs/pipe.c | 19 ++++++++++---------
  1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..82fede0f2111 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
  	size_t total_len = iov_iter_count(to);
  	struct file *filp = iocb->ki_filp;
  	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool wake_writer = false, wake_next_reader = false;
  	ssize_t ret;

  	/* Null read succeeds. */
@@ -264,14 +264,13 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
  	mutex_lock(&pipe->mutex);

  	/*
-	 * We only wake up writers if the pipe was full when we started
-	 * reading in order to avoid unnecessary wakeups.
+	 * We only wake up writers if the pipe was full when we started reading
+	 * and it is no longer full after reading to avoid unnecessary wakeups.
  	 *
  	 * But when we do wake up writers, we do so using a sync wakeup
  	 * (WF_SYNC), because we want them to get going and generate more
  	 * data for us.
  	 */
-	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
  	for (;;) {
  		/* Read ->head with a barrier vs post_one_notification() */
  		unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
  				buf->len = 0;
  			}

-			if (!buf->len)
+			if (!buf->len) {
+				wake_writer |= pipe_full(head, tail, pipe->max_usage);
  				tail = pipe_update_tail(pipe, buf, tail);
+			}
  			total_len -= chars;
  			if (!total_len)
  				break;	/* common path: read succeeded */
@@ -377,7 +378,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
  		 * _very_ unlikely case that the pipe was full, but we got
  		 * no data.
  		 */
-		if (unlikely(was_full))
+		if (unlikely(wake_writer))
  			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
  		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
@@ -390,15 +391,15 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
  		if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
  			return -ERESTARTSYS;

-		mutex_lock(&pipe->mutex);
-		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
+		wake_writer = false;
  		wake_next_reader = true;
+		mutex_lock(&pipe->mutex);
  	}
  	if (pipe_empty(pipe->head, pipe->tail))
  		wake_next_reader = false;
  	mutex_unlock(&pipe->mutex);

-	if (was_full)
+	if (wake_writer)
  		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
  	if (wake_next_reader)
  		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
--
Thanks and Regards,
Swapnil



