On 02/03, K Prateek Nayak wrote:
>
> With the below patch on mainline, I see more improvements for a
> modified version of sched-messaging (sched-messaging is same as
> hackbench as you noted on the parallel thread) that uses
> pipe2(O_NOATIME)

Thanks,

> The original regression is still noticeable despite the improvements
> but if folks believe this is a corner case with the original changes
> exhibited by sched-messaging, I'll just continue further testing with
> the new baseline.

I still don't know if we should worry or not...

But if we want to try to improve the wake_writer logic, then I think it
makes sense to clean up this code first. IMO the (untested) patch below
makes sense regardless, I am going to send it after I grep fs/splice.c
a bit more.

a194dfe6e6f6f ("pipe: Rearrange sequence in pipe_write() to preallocate
slot") changed pipe_write() to increment pipe->head in advance. IIUC,
this was to avoid the race with the post_one_notification()-like code
which can add another buffer under pipe->rd_wait.lock without
pipe->mutex.

This is no longer necessary after c73be61cede ("pipe: Add general
notification queue support"): pipe_write() checks pipe_has_watch_queue()
and returns -EXDEV at the start. And it can't help in any case, since
pipe_write() no longer takes this spinlock.

Change pipe_write() to call copy_page_from_iter() first and do nothing
if it fails. This way pipe_write() can't add a zero-sized buffer and we
can simplify pipe_read() which currently has to handle this very
unlikely case.

Oleg.

diff --git a/fs/pipe.c b/fs/pipe.c
index baaa8c0817f1..0816070a5e7a 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -312,6 +312,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			size_t written;
 			int error;
 
+			WARN_ON_ONCE(chars == 0);
 			if (chars > total_len) {
 				if (buf->flags & PIPE_BUF_FLAG_WHOLE) {
 					if (ret == 0)
@@ -365,29 +366,9 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			break;
 		}
 		mutex_unlock(&pipe->mutex);
-
 		/*
 		 * We only get here if we didn't actually read anything.
 		 *
-		 * However, we could have seen (and removed) a zero-sized
-		 * pipe buffer, and might have made space in the buffers
-		 * that way.
-		 *
-		 * You can't make zero-sized pipe buffers by doing an empty
-		 * write (not even in packet mode), but they can happen if
-		 * the writer gets an EFAULT when trying to fill a buffer
-		 * that already got allocated and inserted in the buffer
-		 * array.
-		 *
-		 * So we still need to wake up any pending writers in the
-		 * _very_ unlikely case that the pipe was full, but we got
-		 * no data.
-		 */
-		if (unlikely(wake_writer))
-			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
-		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
-
-		/*
 		 * But because we didn't read anything, at this point we can
 		 * just return directly with -ERESTARTSYS if we're interrupted,
 		 * since we've done any required wakeups and there's no need
@@ -396,7 +377,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
 			return -ERESTARTSYS;
 
-		wake_writer = false;
 		wake_next_reader = true;
 		mutex_lock(&pipe->mutex);
 	}
@@ -524,31 +504,25 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
 				pipe->tmp_page = page;
 			}
 
-			/* Allocate a slot in the ring in advance and attach an
-			 * empty buffer. If we fault or otherwise fail to use
-			 * it, either the reader will consume it or it'll still
-			 * be there for the next write.
-			 */
-			pipe->head = head + 1;
+			copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
+			if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
+				if (!ret)
+					ret = -EFAULT;
+				break;
+			}
 
+			pipe->head = head + 1;
+			pipe->tmp_page = NULL;
 			/* Insert it into the buffer array */
 			buf = &pipe->bufs[head & mask];
 			buf->page = page;
 			buf->ops = &anon_pipe_buf_ops;
 			buf->offset = 0;
-			buf->len = 0;
 			if (is_packetized(filp))
 				buf->flags = PIPE_BUF_FLAG_PACKET;
 			else
 				buf->flags = PIPE_BUF_FLAG_CAN_MERGE;
-			pipe->tmp_page = NULL;
 
-			copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
-			if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
-				if (!ret)
-					ret = -EFAULT;
-				break;
-			}
 			ret += copied;
 			buf->len = copied;
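
For illustration only, the "copy first, publish after" ordering can be sketched as a userspace toy. This is not kernel code and is not part of the patch: struct ring, toy_copy(), toy_write(), toy_read() and the fault_at parameter are all hypothetical names invented for this sketch, with toy_copy() standing in for copy_page_from_iter() and fault_at simulating the point at which the user copy would fault. Locking and wakeups are left out entirely.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define RING_SIZE 4u	/* hypothetical ring size, must be a power of two */
#define SLOT_SIZE 16u	/* hypothetical stand-in for PAGE_SIZE */

struct slot {
	char data[SLOT_SIZE];
	size_t len;
};

struct ring {
	unsigned int head;	/* next slot to fill (free-running) */
	unsigned int tail;	/* next slot to consume (free-running) */
	struct slot bufs[RING_SIZE];
};

/*
 * Toy stand-in for copy_page_from_iter(): copies at most SLOT_SIZE
 * bytes and stops early after fault_at bytes to simulate a fault.
 */
static size_t toy_copy(char *dst, const char *src, size_t n, size_t fault_at)
{
	size_t want = n < SLOT_SIZE ? n : SLOT_SIZE;
	size_t got = want < fault_at ? want : fault_at;

	memcpy(dst, src, got);
	return got;
}

/*
 * Fill the slot first and advance head only if the copy fully
 * succeeded. Returns the number of bytes written, 0 if the ring is
 * full or n is 0, and -1 (toy -EFAULT) if the copy was short.
 */
static long toy_write(struct ring *r, const char *src, size_t n, size_t fault_at)
{
	size_t want = n < SLOT_SIZE ? n : SLOT_SIZE;
	struct slot *s;
	size_t copied;

	if (want == 0 || r->head - r->tail >= RING_SIZE)
		return 0;

	s = &r->bufs[r->head & (RING_SIZE - 1)];
	copied = toy_copy(s->data, src, n, fault_at);
	if (copied < want)
		return -1;	/* head untouched: the reader never sees this slot */

	s->len = copied;
	r->head++;		/* publish only a slot that holds data */
	return (long)copied;
}

/*
 * Consume one whole slot into dst, which must have room for SLOT_SIZE
 * bytes. Returns the slot length, or 0 if the ring is empty.
 */
static size_t toy_read(struct ring *r, char *dst)
{
	struct slot *s;

	if (r->head == r->tail)
		return 0;

	s = &r->bufs[r->tail & (RING_SIZE - 1)];
	assert(s->len != 0);	/* plays the role of WARN_ON_ONCE(chars == 0) */
	memcpy(dst, s->data, s->len);
	r->tail++;
	return s->len;
}
```

With this ordering a failed toy_write() leaves head where it was, so toy_read() has no empty slot to skip over and needs no special "made space without reading data" wakeup path.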