Commit a194dfe6e6f6 ("pipe: Rearrange sequence in pipe_write() to preallocate slot") changed pipe_write() to increment pipe->head in advance. IIUC, this was done to avoid the race with post_one_notification()-like code, which can add another buffer under pipe->rd_wait.lock without holding pipe->mutex. This is no longer necessary after c73be61cede5 ("pipe: Add general notification queue support"): pipe_write() checks pipe_has_watch_queue() and returns -EXDEV at the start. And it can't help in any case, because pipe_write() no longer takes the rd_wait.lock spinlock. Change pipe_write() to call copy_page_from_iter() first and do nothing if it fails. This way pipe_write() can't add a zero-sized buffer, and we can simplify pipe_read(), which currently has to take care of this very unlikely case. Also, with this patch we can probably kill eat_empty_buffer() and more "is this buffer empty" checks in fs/splice.c later. Link: https://lore.kernel.org/all/20250209150718.GA17013@xxxxxxxxxx/ Signed-off-by: Oleg Nesterov <oleg@xxxxxxxxxx> Tested-by: K Prateek Nayak <kprateek.nayak@xxxxxxx> --- fs/pipe.c | 45 +++++++++------------------------------------ 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/fs/pipe.c b/fs/pipe.c index 2ae75adfba64..b0641f75b1ba 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -360,29 +360,9 @@ anon_pipe_read(struct kiocb *iocb, struct iov_iter *to) break; } mutex_unlock(&pipe->mutex); - /* * We only get here if we didn't actually read anything. * - * However, we could have seen (and removed) a zero-sized - * pipe buffer, and might have made space in the buffers - * that way. - * - * You can't make zero-sized pipe buffers by doing an empty - * write (not even in packet mode), but they can happen if - * the writer gets an EFAULT when trying to fill a buffer - * that already got allocated and inserted in the buffer - * array. - * - * So we still need to wake up any pending writers in the - * _very_ unlikely case that the pipe was full, but we got - * no data. 
- */ - if (unlikely(wake_writer)) - wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM); - kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT); - - /* * But because we didn't read anything, at this point we can * just return directly with -ERESTARTSYS if we're interrupted, * since we've done any required wakeups and there's no need @@ -391,7 +371,6 @@ anon_pipe_read(struct kiocb *iocb, struct iov_iter *to) if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0) return -ERESTARTSYS; - wake_writer = false; wake_next_reader = true; mutex_lock(&pipe->mutex); } @@ -526,33 +505,27 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) pipe->tmp_page = page; } - /* Allocate a slot in the ring in advance and attach an - * empty buffer. If we fault or otherwise fail to use - * it, either the reader will consume it or it'll still - * be there for the next write. - */ - pipe->head = head + 1; + copied = copy_page_from_iter(page, 0, PAGE_SIZE, from); + if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) { + if (!ret) + ret = -EFAULT; + break; + } + pipe->head = head + 1; + pipe->tmp_page = NULL; /* Insert it into the buffer array */ buf = &pipe->bufs[head & mask]; buf->page = page; buf->ops = &anon_pipe_buf_ops; buf->offset = 0; - buf->len = 0; if (is_packetized(filp)) buf->flags = PIPE_BUF_FLAG_PACKET; else buf->flags = PIPE_BUF_FLAG_CAN_MERGE; - pipe->tmp_page = NULL; - copied = copy_page_from_iter(page, 0, PAGE_SIZE, from); - if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) { - if (!ret) - ret = -EFAULT; - break; - } - ret += copied; buf->len = copied; + ret += copied; if (!iov_iter_count(from)) break; -- 2.25.1.362.g51ebf55