Re: Pending splice(file -> FIFO) excludes all other FIFO operations forever (was: ... always blocks read(FIFO), regardless of O_NONBLOCK on read side?)

On Fri, 7 Jul 2023 at 10:21, Christian Brauner <brauner@xxxxxxxxxx> wrote:
>
> Forgot to say, fwiw, I've been running this through the LTP splice,
> pipe, and ipc tests without issues. A hanging reader can be signaled
> away cleanly with this.

So that patch still has a couple of "wait for this" cases remaining.

In particular, when we do a read, and we do have pipe buffers, both
the read() system call and a number of internal splice functions will
go "Ahh, I have data", and then do pipe_buf_confirm() and read it.

Which then results in pipe_buf_confirm() blocking. It now blocks
interruptibly, which is much nicer, but several of these users *could*
just do a non-blocking confirmation instead, and wait for pipe
readability.

HOWEVER, that's slightly less trivial than you'd expect, because the
"wait for readability" needs to be done without the pipe lock held -
so you can't actually check the pipe buffer state at that point (since
you need the pipe lock to look up the buffer).

That's true even of "trivial" cases like actual user-space "read()
with O_NONBLOCK and poll()" situations.
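
For reference, the user-space pattern in question looks roughly like the
sketch below. It is an illustration only and not part of the patch:
set_nonblocking() and read_when_ready() are made-up helper names. The idea
is that a reader with O_NONBLOCK set gets -EAGAIN from read() and then
sleeps in poll() until the pipe is reported readable.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: put fd into non-blocking mode (0 on success). */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags < 0)
		return -1;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Hypothetical helper: read from a non-blocking fd, sleeping in poll()
 * whenever read() reports EAGAIN.
 * Returns bytes read, 0 on EOF, -1 on error or timeout.
 */
static ssize_t read_when_ready(int fd, void *buf, size_t len, int timeout_ms)
{
	assert(buf != NULL && len > 0);

	for (;;) {
		struct pollfd pfd = { .fd = fd, .events = POLLIN };
		ssize_t n = read(fd, buf, len);
		int rc;

		if (n >= 0)
			return n;
		if (errno == EINTR)
			continue;
		if (errno != EAGAIN)
			return -1;

		/* Nothing usable yet: wait outside of read() */
		rc = poll(&pfd, 1, timeout_ms);
		if (rc < 0 && errno == EINTR)
			continue;
		if (rc <= 0)
			return -1;	/* poll error or timeout */
	}
}
```

A caller would do set_nonblocking(fd) once and then call
read_when_ready(fd, buf, sizeof(buf), timeout) in its loop. The point is
that poll() must not report the pipe readable while the data still cannot
be confirmed, or this loop spins.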

Now, the solution to all this is *fairly* straightforward:

 (a) don't use "!pipe_empty()" for a readability check.

     We already have "pipe_readable()", but it's hidden in fs/pipe.c,
     so all the splice() code ended up writing the "does this pipe have
     data" check using "!pipe_empty()" instead.

 (b) make "pipe_buf_confirm()" take a "non-blocking" boolean argument,
     and if it is non-blocking but hits one of those blocked pages, set
     "pipe->not_ready", and return -EAGAIN.

     This is ok, because "pipe_buf_confirm()" is always under the pipe
     lock, and we'll just clear "pipe->not_ready" under the pipe lock
     after finalizing all those pages (and before waking up readers).

 (c) make "pipe_wait_readable()" and "poll()" know about this all, so
     that we wait properly for a pipe that was not ready to become
     ready.
This all makes *most* users deal properly with these blocking events.
In particular, things like splice_to_socket() can now do the whole
proper "wait without holding the pipe lock" sequence, even when the
pipe is not empty, just in this blocked state.

This *may* also make all the cases Jens had with io_uring and splicing
JustWork(tm).

NOTE! NOTE! NOTE! Once more, this "feels right to me", and I'd argue
that the basic approach is fairly straightforward. The patch is also
not horrendous. It all makes a fair amount of sense. BUT! I haven't
tested this, and like the previous patch, I really would want people
to think about this a lot.

Comments? Jens?

                Linus
 fs/fuse/dev.c             |  4 +--
 fs/pipe.c                 | 23 ++++---------
 fs/splice.c               | 82 +++++++++++++++++++++++++++++++++++------------
 include/linux/pipe_fs_i.h | 25 +++++++++++++--
 4 files changed, 93 insertions(+), 41 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 1a8f82f478cb..b891468dee06 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -700,7 +700,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
 		struct pipe_buffer *buf = cs->pipebufs;
 
 		if (!cs->write) {
-			err = pipe_buf_confirm(cs->pipe, buf);
+			err = pipe_buf_confirm(cs->pipe, buf, false);
 			if (err)
 				return err;
 
@@ -800,7 +800,7 @@ static int fuse_try_move_page(struct fuse_copy_state *cs, struct page **pagep)
 
 	fuse_copy_finish(cs);
 
-	err = pipe_buf_confirm(cs->pipe, buf);
+	err = pipe_buf_confirm(cs->pipe, buf, false);
 	if (err)
 		goto out_put_old;
 
diff --git a/fs/pipe.c b/fs/pipe.c
index 71942d240c98..25abf0c5c169 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -217,23 +217,13 @@ static const struct pipe_buf_operations anon_pipe_buf_ops = {
 	.get		= generic_pipe_buf_get,
 };
 
-/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
-static inline bool pipe_readable(const struct pipe_inode_info *pipe)
-{
-	unsigned int head = READ_ONCE(pipe->head);
-	unsigned int tail = READ_ONCE(pipe->tail);
-	unsigned int writers = READ_ONCE(pipe->writers);
-
-	return !pipe_empty(head, tail) || !writers;
-}
-
 static ssize_t
 pipe_read(struct kiocb *iocb, struct iov_iter *to)
 {
 	size_t total_len = iov_iter_count(to);
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool non_blocking, was_full, wake_next_reader = false;
 	ssize_t ret;
 
 	/* Null read succeeds. */
@@ -252,6 +242,8 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	 * data for us.
 	 */
 	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
+	non_blocking = (filp->f_flags & O_NONBLOCK) ||
+			(iocb->ki_flags & IOCB_NOWAIT);
 	for (;;) {
 		/* Read ->head with a barrier vs post_one_notification() */
 		unsigned int head = smp_load_acquire(&pipe->head);
@@ -287,7 +279,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			size_t chars, written;
 			int error;
 
-			error = pipe_buf_confirm(pipe, buf);
+			error = pipe_buf_confirm(pipe, buf, non_blocking);
 			if (error) {
 				if (!ret)
 					ret = error;
@@ -342,8 +334,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			break;
 		if (ret)
 			break;
-		if ((filp->f_flags & O_NONBLOCK) ||
-		    (iocb->ki_flags & IOCB_NOWAIT)) {
+		if (non_blocking) {
 			ret = -EAGAIN;
 			break;
 		}
@@ -462,7 +453,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
 
 		if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) &&
 		    offset + chars <= PAGE_SIZE) {
-			ret = pipe_buf_confirm(pipe, buf);
+			ret = pipe_buf_confirm(pipe, buf, false);
 			if (ret)
 				goto out;
 
@@ -678,7 +669,7 @@ pipe_poll(struct file *filp, poll_table *wait)
 
 	mask = 0;
 	if (filp->f_mode & FMODE_READ) {
-		if (!pipe_empty(head, tail))
+		if (!pipe_empty(head, tail) && !READ_ONCE(pipe->not_ready))
 			mask |= EPOLLIN | EPOLLRDNORM;
 		if (!pipe->writers && filp->f_version != pipe->w_counter)
 			mask |= EPOLLHUP;
diff --git a/fs/splice.c b/fs/splice.c
index 503f7eff41b6..49139413457d 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -116,9 +116,13 @@ static void page_cache_pipe_buf_release(struct pipe_inode_info *pipe,
 /*
  * Check whether the contents of buf is OK to access. Since the content
  * is a page cache page, IO may be in flight.
+ *
+ * Note: we don't react to 'non_blocking', because we have no wakeup
+ * event for any polling.
  */
 static int page_cache_pipe_buf_confirm(struct pipe_inode_info *pipe,
-				       struct pipe_buffer *buf)
+				       struct pipe_buffer *buf,
+				       bool non_blocking)
 {
 	struct page *page = buf->page;
 	int err;
@@ -307,12 +311,31 @@ static void finalize_pipe_buf(struct pipe_buffer *buf, unsigned int chunk)
 	unlock_page(buf->page);
 }
 
+/*
+ * This is called with the pipe locked by the read path, so will
+ * not race with 'finalize_pipe_buf()'.
+ *
+ * If it finds a locked page, and we're doing a non-blocking read,
+ * just set 'pipe->not_ready' and return -EAGAIN. We will clear that
+ * and wake up readers after finalizing the pending IO.
+ *
+ * NOTE! We have to do it this indirect way because 'pipe_poll()'
+ * is run without any pipe locking, so we can't check the buffer
+ * state at polling time.
+ */
 static int busy_pipe_buf_confirm(struct pipe_inode_info *pipe,
-				 struct pipe_buffer *buf)
+				 struct pipe_buffer *buf,
+				 bool non_blocking)
 {
-	struct page *page = buf->page;
+	struct folio *folio = page_folio(buf->page);
 
-	if (folio_wait_bit_interruptible(page_folio(page), PG_locked))
+	if (!folio_test_locked(folio))
+		return 0;
+	if (non_blocking) {
+		pipe->not_ready = true;
+		return -EAGAIN;
+	}
+	if (folio_wait_bit_interruptible(folio, PG_locked))
 		return -EINTR;
 	return 0;
 }
@@ -453,6 +476,14 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
 	}
 
 	pipe_lock(pipe);
+	/*
+	 * Now that we have finalized any pending pipe buffers
+	 * and re-taken the pipe lock, make sure to tell poll()
+	 * and pipe_readable() that the buffers are usable again.
+	 *
+	 * The caller will be doing the pipe->rd_wait wakeup.
+	 */
+	pipe->not_ready = false;
 	kfree(bv);
 	return ret;
 }
@@ -510,7 +541,11 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
 	while (!pipe_empty(head, tail)) {
 		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 
-		ret = pipe_buf_confirm(pipe, buf);
+		/*
+		 * Do this as a non-blocking confirm. We'll wait for
+		 * readability if required in splice_from_pipe_next().
+		 */
+		ret = pipe_buf_confirm(pipe, buf, true);
 		if (unlikely(ret)) {
 			if (ret == -ENODATA)
 				ret = 0;
@@ -584,10 +619,7 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des
 		return -ERESTARTSYS;
 
 repeat:
-	while (pipe_empty(pipe->head, pipe->tail)) {
-		if (!pipe->writers)
-			return 0;
-
+	while (!pipe_readable(pipe)) {
 		if (sd->num_spliced)
 			return 0;
 
@@ -608,6 +640,9 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des
 	if (eat_empty_buffer(pipe))
 		goto repeat;
 
+	if (!pipe->writers)
+		return 0;
+
 	return 1;
 }
 
@@ -772,7 +807,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			size_t this_len;
 
-			ret = pipe_buf_confirm(pipe, buf);
+			ret = pipe_buf_confirm(pipe, buf, false);
 			if (unlikely(ret)) {
 				if (ret == -ENODATA)
 					ret = 0;
@@ -863,6 +898,7 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out,
 		unsigned int head, tail, mask, bc = 0;
 		size_t remain = len;
 
+repeat:
 		/*
 		 * Check for signal early to make process killable when there
 		 * are always buffers available
@@ -871,11 +907,8 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out,
 		if (signal_pending(current))
 			break;
 
-		while (pipe_empty(pipe->head, pipe->tail)) {
+		while (!pipe_readable(pipe)) {
 			ret = 0;
-			if (!pipe->writers)
-				goto out;
-
 			if (spliced)
 				goto out;
 
@@ -894,17 +927,28 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out,
 
 			pipe_wait_readable(pipe);
 		}
+		ret = 0;
+		if (!pipe->writers)
+			goto out;
+
 
 		head = pipe->head;
 		tail = pipe->tail;
 		mask = pipe->ring_size - 1;
 
-		while (!pipe_empty(head, tail)) {
+		while (pipe_readable(pipe)) {
 			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			size_t seg;
 
-			ret = pipe_buf_confirm(pipe, buf);
+			ret = pipe_buf_confirm(pipe, buf, true);
 			if (unlikely(ret)) {
+				if (ret == -EAGAIN) {
+					ret = 0;
+					if (bc || spliced)
+						break;
+					goto repeat;
+				}
+
 				if (ret == -ENODATA)
 					ret = 0;
 				break;
@@ -1658,19 +1702,17 @@ static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 	 * Check the pipe occupancy without the inode lock first. This function
 	 * is speculative anyways, so missing one is ok.
 	 */
-	if (!pipe_empty(pipe->head, pipe->tail))
+	if (pipe_readable(pipe))
 		return 0;
 
 	ret = 0;
 	pipe_lock(pipe);
 
-	while (pipe_empty(pipe->head, pipe->tail)) {
+	while (!pipe_readable(pipe)) {
 		if (signal_pending(current)) {
 			ret = -ERESTARTSYS;
 			break;
 		}
-		if (!pipe->writers)
-			break;
 		if (flags & SPLICE_F_NONBLOCK) {
 			ret = -EAGAIN;
 			break;
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index 02e0086b10f6..7759779a3561 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -41,6 +41,7 @@ struct pipe_buffer {
  *	@note_loss: The next read() should insert a data-lost message
  *	@max_usage: The maximum number of slots that may be used in the ring
  *	@ring_size: total number of buffers (should be a power of 2)
+ *	@not_ready: pipe has buffers, but poll shouldn't return success yet
  *	@nr_accounted: The amount this pipe accounts for in user->pipe_bufs
  *	@tmp_page: cached released page
  *	@readers: number of current readers of this pipe
@@ -62,6 +63,7 @@ struct pipe_inode_info {
 	unsigned int tail;
 	unsigned int max_usage;
 	unsigned int ring_size;
+	bool not_ready;
 #ifdef CONFIG_WATCH_QUEUE
 	bool note_loss;
 #endif
@@ -100,7 +102,7 @@ struct pipe_buf_operations {
 	 * hook. Returns 0 for good, or a negative error value in case of
 	 * error.  If not present all pages are considered good.
 	 */
-	int (*confirm)(struct pipe_inode_info *, struct pipe_buffer *);
+	int (*confirm)(struct pipe_inode_info *, struct pipe_buffer *, bool);
 
 	/*
 	 * When the contents of this pipe buffer has been completely
@@ -156,6 +158,22 @@ static inline bool pipe_full(unsigned int head, unsigned int tail,
 	return pipe_occupancy(head, tail) >= limit;
 }
 
+/**
+ * pipe_readable - Return true if there is data to be read
+ * @pipe: The pipe to check
+ *
+ * This can be done while waiting without holding the pipe lock - thus the READ_ONCE()
+ */
+static inline bool pipe_readable(const struct pipe_inode_info *pipe)
+{
+	unsigned int head = READ_ONCE(pipe->head);
+	unsigned int tail = READ_ONCE(pipe->tail);
+	unsigned int writers = READ_ONCE(pipe->writers);
+	bool ready = !READ_ONCE(pipe->not_ready);
+
+	return (ready && !pipe_empty(head, tail)) || !writers;
+}
+
 /**
  * pipe_buf - Return the pipe buffer for the specified slot in the pipe ring
  * @pipe: The pipe to access
@@ -209,11 +227,12 @@ static inline void pipe_buf_release(struct pipe_inode_info *pipe,
  * @buf:	the buffer to confirm
  */
 static inline int pipe_buf_confirm(struct pipe_inode_info *pipe,
-				   struct pipe_buffer *buf)
+				   struct pipe_buffer *buf,
+				   bool non_blocking)
 {
 	if (!buf->ops->confirm)
 		return 0;
-	return buf->ops->confirm(pipe, buf);
+	return buf->ops->confirm(pipe, buf, non_blocking);
 }
 
 /**
