On Fri, 7 Jul 2023 at 10:21, Christian Brauner <brauner@xxxxxxxxxx> wrote: > > Forgot to say, fwiw, I've been running this through the LTP splice, > pipe, and ipc tests without issues. A hanging reader can be signaled > away cleanly with this. So that patch still has a couple of "wait for this" cases remaining. In particular, when we do a read, and we do have pipe buffers, both the read() system call and a number of internal splice functions will go "Ahh, I have data", and then do pipe_buf_confirm() and read it. Which then results in pipe_buf_confirm() blocking. It now blocks interruptibly, which is much nicer, but several of these users *could* just do a non-blocking confirmation instead, and wait for pipe readability. HOWEVER, that's slightly less trivial than you'd expect, because the "wait for readability" needs to be done without the pipe lock held - so you can't actually check the pipe buffer state at that point (since you need the pipe lock to look up the buffer). That's true even of "trivial" cases like actual user-space "read() with O_NONBLOCK and poll()" situations. Now, the solution to all this is *fairly* straightforward: (a) don't use "!pipe_empty()" for a readability check. We already have "pipe_readable()", but it's hidden in fs/pipe.c, so all the splice() code ended up writing the "does this pipe have data" using "!pipe_empty()" instead. (b) make "pipe_buf_confirm()" take a "non-blocking" boolean argument, and if it is non-blocking but hits one of those blocked pages, set "pipe->not_ready", and return -EAGAIN. This is ok, because "pipe_buf_confirm()" is always under the pipe lock, and we'll just clear "pipe->not_ready" under the pipe lock after finalizing all those pages (and before waking up readers) (c) make "pipe_wait_readable()" and "poll()" know about this all, so that we wait properly for a pipe that was not ready to become ready This all makes *most* users deal properly with these blocking events. 
In particular, things like splice_to_socket() can now do the whole proper "wait without holding the pipe lock" sequence, even when the pipe is not empty, just in this blocked state. This *may* also make all the cases Jens had with io_uring and splicing JustWork(tm). NOTE! NOTE! NOTE! Once more, this "feels right to me", and I'd argue that the basic approach is fairly straightforward. The patch is also not horrendous. It all makes a fair amount of sense. BUT! I haven't tested this, and like the previous patch, I really would want people to think about this a lot. Comments? Jens? Linus
fs/fuse/dev.c | 4 +-- fs/pipe.c | 23 ++++--------- fs/splice.c | 82 +++++++++++++++++++++++++++++++++++------------ include/linux/pipe_fs_i.h | 25 +++++++++++++-- 4 files changed, 93 insertions(+), 41 deletions(-) diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index 1a8f82f478cb..b891468dee06 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -700,7 +700,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs) struct pipe_buffer *buf = cs->pipebufs; if (!cs->write) { - err = pipe_buf_confirm(cs->pipe, buf); + err = pipe_buf_confirm(cs->pipe, buf, false); if (err) return err; @@ -800,7 +800,7 @@ static int fuse_try_move_page(struct fuse_copy_state *cs, struct page **pagep) fuse_copy_finish(cs); - err = pipe_buf_confirm(cs->pipe, buf); + err = pipe_buf_confirm(cs->pipe, buf, false); if (err) goto out_put_old; diff --git a/fs/pipe.c b/fs/pipe.c index 71942d240c98..25abf0c5c169 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -217,23 +217,13 @@ static const struct pipe_buf_operations anon_pipe_buf_ops = { .get = generic_pipe_buf_get, }; -/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */ -static inline bool pipe_readable(const struct pipe_inode_info *pipe) -{ - unsigned int head = READ_ONCE(pipe->head); - unsigned int tail = READ_ONCE(pipe->tail); - unsigned int writers = READ_ONCE(pipe->writers); - - return !pipe_empty(head, tail) || !writers; -} - static ssize_t pipe_read(struct kiocb *iocb, struct iov_iter *to) { size_t total_len = iov_iter_count(to); struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; - bool was_full, wake_next_reader = false; + bool non_blocking, was_full, wake_next_reader = false; ssize_t ret; /* Null read succeeds. */ @@ -252,6 +242,8 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) * data for us. 
*/ was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage); + non_blocking = (filp->f_flags & O_NONBLOCK) || + (iocb->ki_flags & IOCB_NOWAIT); for (;;) { /* Read ->head with a barrier vs post_one_notification() */ unsigned int head = smp_load_acquire(&pipe->head); @@ -287,7 +279,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) size_t chars, written; int error; - error = pipe_buf_confirm(pipe, buf); + error = pipe_buf_confirm(pipe, buf, non_blocking); if (error) { if (!ret) ret = error; @@ -342,8 +334,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) break; if (ret) break; - if ((filp->f_flags & O_NONBLOCK) || - (iocb->ki_flags & IOCB_NOWAIT)) { + if (non_blocking) { ret = -EAGAIN; break; } @@ -462,7 +453,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) && offset + chars <= PAGE_SIZE) { - ret = pipe_buf_confirm(pipe, buf); + ret = pipe_buf_confirm(pipe, buf, false); if (ret) goto out; @@ -678,7 +669,7 @@ pipe_poll(struct file *filp, poll_table *wait) mask = 0; if (filp->f_mode & FMODE_READ) { - if (!pipe_empty(head, tail)) + if (!pipe_empty(head, tail) && !READ_ONCE(pipe->not_ready)) mask |= EPOLLIN | EPOLLRDNORM; if (!pipe->writers && filp->f_version != pipe->w_counter) mask |= EPOLLHUP; diff --git a/fs/splice.c b/fs/splice.c index 503f7eff41b6..49139413457d 100644 --- a/fs/splice.c +++ b/fs/splice.c @@ -116,9 +116,13 @@ static void page_cache_pipe_buf_release(struct pipe_inode_info *pipe, /* * Check whether the contents of buf is OK to access. Since the content * is a page cache page, IO may be in flight. + * + * Note: we don't react to 'non_blocking', because we have no wakeup + * event for any polling. 
*/ static int page_cache_pipe_buf_confirm(struct pipe_inode_info *pipe, - struct pipe_buffer *buf) + struct pipe_buffer *buf, + bool non_blocking) { struct page *page = buf->page; int err; @@ -307,12 +311,31 @@ static void finalize_pipe_buf(struct pipe_buffer *buf, unsigned int chunk) unlock_page(buf->page); } +/* + * This is called with the pipe locked by the read path, so will + * not race with 'finalize_pipe_buf()'. + * + * If it finds a locked page, and we're doing a non-blocking read, + * just set 'pipe->not_ready' and return -EAGAIN. We will clear that + * and wake up readers after finalizing the pending IO. + * + * NOTE! We have to do it this indirect way because 'pipe_poll()' + * is run without any pipe locking, so we can't check the buffer + * state at polling time. + */ static int busy_pipe_buf_confirm(struct pipe_inode_info *pipe, - struct pipe_buffer *buf) + struct pipe_buffer *buf, + bool non_blocking) { - struct page *page = buf->page; + struct folio *folio = page_folio(buf->page); - if (folio_wait_bit_interruptible(page_folio(page), PG_locked)) + if (!folio_test_locked(folio)) + return 0; + if (non_blocking) { + pipe->not_ready = true; + return -EAGAIN; + } + if (folio_wait_bit_interruptible(folio, PG_locked)) return -EINTR; return 0; } @@ -453,6 +476,14 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos, } pipe_lock(pipe); + /* + * Now that we have finalized any pending pipe buffers + * and re-taken the pipe lock, make sure to tell poll() + * and pipe_readable() that the buffers are usable again. + * + * The caller will be doing the pipe->rd_wait wakeup. + */ + pipe->not_ready = 0; kfree(bv); return ret; } @@ -510,7 +541,11 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des while (!pipe_empty(head, tail)) { struct pipe_buffer *buf = &pipe->bufs[tail & mask]; - ret = pipe_buf_confirm(pipe, buf); + /* + * Do this as a non-blocking confirm. We'll wait for + * readability if required in splice_from_pipe_next(). 
+ */ + ret = pipe_buf_confirm(pipe, buf, true); if (unlikely(ret)) { if (ret == -ENODATA) ret = 0; @@ -584,10 +619,7 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des return -ERESTARTSYS; repeat: - while (pipe_empty(pipe->head, pipe->tail)) { - if (!pipe->writers) - return 0; - + while (!pipe_readable(pipe)) { if (sd->num_spliced) return 0; @@ -608,6 +640,9 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des if (eat_empty_buffer(pipe)) goto repeat; + if (!pipe->writers) + return 0; + return 1; } @@ -772,7 +807,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out, struct pipe_buffer *buf = &pipe->bufs[tail & mask]; size_t this_len; - ret = pipe_buf_confirm(pipe, buf); + ret = pipe_buf_confirm(pipe, buf, false); if (unlikely(ret)) { if (ret == -ENODATA) ret = 0; @@ -863,6 +898,7 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out, unsigned int head, tail, mask, bc = 0; size_t remain = len; +repeat: /* * Check for signal early to make process killable when there * are always buffers available @@ -871,11 +907,8 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out, if (signal_pending(current)) break; - while (pipe_empty(pipe->head, pipe->tail)) { + while (!pipe_readable(pipe)) { ret = 0; - if (!pipe->writers) - goto out; - if (spliced) goto out; @@ -894,17 +927,28 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out, pipe_wait_readable(pipe); } + ret = 0; + if (!pipe->writers) + goto out; + head = pipe->head; tail = pipe->tail; mask = pipe->ring_size - 1; - while (!pipe_empty(head, tail)) { + while (pipe_readable(pipe)) { struct pipe_buffer *buf = &pipe->bufs[tail & mask]; size_t seg; - ret = pipe_buf_confirm(pipe, buf); + ret = pipe_buf_confirm(pipe, buf, true); if (unlikely(ret)) { + if (ret == -EAGAIN) { + ret = 0; + if (bc || spliced) + break; + goto repeat; + } + if (ret == -ENODATA) ret = 0; break; @@ -1658,19 
+1702,17 @@ static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags) * Check the pipe occupancy without the inode lock first. This function * is speculative anyways, so missing one is ok. */ - if (!pipe_empty(pipe->head, pipe->tail)) + if (pipe_readable(pipe)) return 0; ret = 0; pipe_lock(pipe); - while (pipe_empty(pipe->head, pipe->tail)) { + while (!pipe_readable(pipe)) { if (signal_pending(current)) { ret = -ERESTARTSYS; break; } - if (!pipe->writers) - break; if (flags & SPLICE_F_NONBLOCK) { ret = -EAGAIN; break; diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h index 02e0086b10f6..7759779a3561 100644 --- a/include/linux/pipe_fs_i.h +++ b/include/linux/pipe_fs_i.h @@ -41,6 +41,7 @@ struct pipe_buffer { * @note_loss: The next read() should insert a data-lost message * @max_usage: The maximum number of slots that may be used in the ring * @ring_size: total number of buffers (should be a power of 2) + * @not_ready: pipe has buffers, but poll shouldn't return success yet * @nr_accounted: The amount this pipe accounts for in user->pipe_bufs * @tmp_page: cached released page * @readers: number of current readers of this pipe @@ -62,6 +63,7 @@ struct pipe_inode_info { unsigned int tail; unsigned int max_usage; unsigned int ring_size; + bool not_ready; #ifdef CONFIG_WATCH_QUEUE bool note_loss; #endif @@ -100,7 +102,7 @@ struct pipe_buf_operations { * hook. Returns 0 for good, or a negative error value in case of * error. If not present all pages are considered good. 
*/ - int (*confirm)(struct pipe_inode_info *, struct pipe_buffer *); + int (*confirm)(struct pipe_inode_info *, struct pipe_buffer *, bool); /* * When the contents of this pipe buffer has been completely @@ -156,6 +158,22 @@ static inline bool pipe_full(unsigned int head, unsigned int tail, return pipe_occupancy(head, tail) >= limit; } +/** + * pipe_readable - Return true if there is data to be read + * @pipe: The pipe to check + * + * This can be done while waiting without holding the pipe lock - thus the READ_ONCE() + */ +static inline bool pipe_readable(const struct pipe_inode_info *pipe) +{ + unsigned int head = READ_ONCE(pipe->head); + unsigned int tail = READ_ONCE(pipe->tail); + unsigned int writers = READ_ONCE(pipe->writers); + bool ready = !READ_ONCE(pipe->not_ready); + + return (ready && !pipe_empty(head, tail)) || !writers; +} + /** * pipe_buf - Return the pipe buffer for the specified slot in the pipe ring * @pipe: The pipe to access @@ -209,11 +227,12 @@ static inline void pipe_buf_release(struct pipe_inode_info *pipe, * @buf: the buffer to confirm */ static inline int pipe_buf_confirm(struct pipe_inode_info *pipe, - struct pipe_buffer *buf) + struct pipe_buffer *buf, + bool non_blocking) { if (!buf->ops->confirm) return 0; - return buf->ops->confirm(pipe, buf); + return buf->ops->confirm(pipe, buf, non_blocking); } /**