[Adding some of my colleagues who were part of the original submission to the CC list for their information.]
On 2025/1/2 22:07, Oleg Nesterov wrote:
wake_up(pipe->wr_wait) makes no sense if pipe_full() is still true after the reading, the writer sleeping in wait_event(wr_wait, pipe_writable()) will check the pipe_writable() == !pipe_full() condition and sleep again. Only wake the writer if we actually released a pipe buf, and the pipe was full before we did so.
As Linus said, for fs/pipe, he "want any patches to be very clearly documented," perhaps we should include a link to the original discussion here.
Link: https://lore.kernel.org/all/75B06EE0B67747ED+20241225094202.597305-1-wangyuli@xxxxxxxxxxxxx/
Link:
https://lore.kernel.org/all/20241229135737.GA3293@xxxxxxxxxx/
Signed-off-by: Oleg Nesterov <oleg@xxxxxxxxxx>
Reported-by: WangYuli <wangyuli@xxxxxxxxxxxxx>
I'm happy to provide more test results for this patch if it's not
too late.
--- fs/pipe.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/fs/pipe.c b/fs/pipe.c index 12b22c2723b7..82fede0f2111 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) size_t total_len = iov_iter_count(to); struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; - bool was_full, wake_next_reader = false; + bool wake_writer = false, wake_next_reader = false; ssize_t ret; /* Null read succeeds. */ @@ -264,14 +264,13 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) mutex_lock(&pipe->mutex); /* - * We only wake up writers if the pipe was full when we started - * reading in order to avoid unnecessary wakeups. + * We only wake up writers if the pipe was full when we started reading + * and it is no longer full after reading to avoid unnecessary wakeups. * * But when we do wake up writers, we do so using a sync wakeup * (WF_SYNC), because we want them to get going and generate more * data for us. */ - was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage); for (;;) { /* Read ->head with a barrier vs post_one_notification() */ unsigned int head = smp_load_acquire(&pipe->head); @@ -340,8 +339,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) buf->len = 0; } - if (!buf->len) + if (!buf->len) { + wake_writer |= pipe_full(head, tail, pipe->max_usage); tail = pipe_update_tail(pipe, buf, tail); + } total_len -= chars; if (!total_len) break; /* common path: read succeeded */ @@ -377,7 +378,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) * _very_ unlikely case that the pipe was full, but we got * no data. */ - if (unlikely(was_full)) + if (unlikely(wake_writer)) wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM); kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT); @@ -390,15 +391,15 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0) return -ERESTARTSYS; - mutex_lock(&pipe->mutex); - was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage); + wake_writer = false; wake_next_reader = true; + mutex_lock(&pipe->mutex); } if (pipe_empty(pipe->head, pipe->tail)) wake_next_reader = false; mutex_unlock(&pipe->mutex); - if (was_full) + if (wake_writer) wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM); if (wake_next_reader) wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
Hmm..
Initially, the sole purpose of our
original patch was to simply check if there were any waiting
processes in the process wait queue to avoid unnecessary wake-ups,
for both reads and writes.
And then, sincerely thank you all for
taking the time to review it!
While your patch and ours share some
little similarities, our primary goals may vary slightly. Do you
have any suggestions on how we could better achieve our original
objective?
Thanks,
--
WangYuli
WangYuli
Attachment:
OpenPGP_0xC5DA1F3046F40BEE.asc
Description: OpenPGP public key
Attachment:
OpenPGP_signature.asc
Description: OpenPGP digital signature