Re: [PATCH] pipe_read: don't wake up the writer if the pipe is still full

Hi Oleg,

On 2/28/2025 10:03 PM, Oleg Nesterov wrote:
> And... I know, I know you already hate me ;)

Not at all :)

> but if you have time, could you check if this patch (with or without the
> previous debugging patch) makes any difference? Just to be sure.

Sure, I will give this a try.

But in the meantime, Prateek and I tried some experiments over the weekend.
We were able to reproduce this issue on a third-generation EPYC system as well
as on an Intel Emerald Rapids system (2 x INTEL(R) XEON(R) PLATINUM 8592+).

We tried a heavily instrumented tracing approach over the weekend on top of your
debug patch. I have attached the debug patch below. With tracing, we found the
following case for pipe_writable():

  hackbench-118768  [206] .....  1029.550601: pipe_write: 000000005eea28ff: 0: 37 38 16: 1

Here,

head = 37
tail = 38
max_usage = 16
and pipe_full() returns 1, so the waiting writer sees the pipe as full.
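
For reference, pipe_occupancy() is just "head - tail" in unsigned arithmetic,
and pipe_full() compares that against the limit, so a stale head snapshot that
the tail has overtaken wraps around to a huge occupancy. A minimal userspace
sketch of the arithmetic (not kernel code; values taken from the trace above):

#include <stdio.h>

/* Mirrors the helpers in include/linux/pipe_fs_i.h. */
static unsigned int pipe_occupancy(unsigned int head, unsigned int tail)
{
	return head - tail;
}

static int pipe_full(unsigned int head, unsigned int tail,
		     unsigned int limit)
{
	return pipe_occupancy(head, tail) >= limit;
}

int main(void)
{
	unsigned int head = 37, tail = 38, max_usage = 16;

	/* 37 - 38 wraps to UINT_MAX, which is >= 16, so the pipe
	 * looks full even though it is not. */
	printf("occupancy = %u, full = %d\n",
	       pipe_occupancy(head, tail),
	       pipe_full(head, tail, max_usage));
	return 0;
}

This prints "occupancy = 4294967295, full = 1".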

Between the read of the head and the later read of the tail, the tail seems to
have moved ahead of the head, leading to a wraparound. With the following
change applied, I have not yet run into the hang on the original machine where
I first saw it:

diff --git a/fs/pipe.c b/fs/pipe.c
index ce1af7592780..a1931c817822 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -417,9 +417,19 @@ static inline int is_packetized(struct file *file)
 /* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
 static inline bool pipe_writable(const struct pipe_inode_info *pipe)
 {
-	unsigned int head = READ_ONCE(pipe->head);
-	unsigned int tail = READ_ONCE(pipe->tail);
 	unsigned int max_usage = READ_ONCE(pipe->max_usage);
+	unsigned int head, tail;
+
+	tail = READ_ONCE(pipe->tail);
+	/*
+	 * Since the unsigned arithmetic in this lockless preemptible context
+	 * relies on the fact that the tail can never be ahead of head, read
+	 * the head after the tail to ensure we've not missed any updates to
+	 * the head. Reordering the reads can cause wraparounds and give the
+	 * illusion that the pipe is full.
+	 */
+	smp_rmb();
+	head = READ_ONCE(pipe->head);
 
 	return !pipe_full(head, tail, max_usage) ||
 		!READ_ONCE(pipe->readers);
---

smp_rmb() is a nop on x86, and even without the barrier (with just the two
reads reordered) we were not able to reproduce the hang after 10000 iterations.
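
To spell out the interleaving we think explains the bad snapshot (a sketch
based on our reading of the trace, not a verified schedule):

/*
 * W is the writer sleeping in wait_event_interruptible_exclusive(),
 * evaluating pipe_writable() locklessly; writer X and reader R make
 * progress under pipe->mutex in the meantime.
 *
 *   W: head = READ_ONCE(pipe->head);   // snapshots head == 37
 *   X: pipe->head = head + 1;          // head moves to 38 (or beyond)
 *   R: pipe->tail = ++tail;            // tail catches up to 38
 *   W: tail = READ_ONCE(pipe->tail);   // snapshots tail == 38
 *   W: pipe_full(37, 38, 16)           // 37 - 38 wraps: looks "full"
 *
 * W then goes back to sleep even though the pipe has space, and if no
 * further wakeup arrives it sleeps forever.
 */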

If you think this is a genuine bug fix, I will send a patch for this.

Thanks to Prateek, who was actively involved in this debugging.

--
Thanks and Regards,
Swapnil

> Oleg.
> ---
>
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 4336b8cccf84..524b8845523e 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -445,7 +445,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>  		return 0;
>
>  	mutex_lock(&pipe->mutex);
> -
> +again:
>  	if (!pipe->readers) {
>  		send_sig(SIGPIPE, current, 0);
>  		ret = -EPIPE;
> @@ -467,20 +467,24 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>  		unsigned int mask = pipe->ring_size - 1;
>  		struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
>  		int offset = buf->offset + buf->len;
> +		int xxx;
>
>  		if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) &&
>  		    offset + chars <= PAGE_SIZE) {
> -			ret = pipe_buf_confirm(pipe, buf);
> -			if (ret)
> +			xxx = pipe_buf_confirm(pipe, buf);
> +			if (xxx) {
> +				if (!ret) ret = xxx;
>  				goto out;
> +			}
>
> -			ret = copy_page_from_iter(buf->page, offset, chars, from);
> -			if (unlikely(ret < chars)) {
> -				ret = -EFAULT;
> +			xxx = copy_page_from_iter(buf->page, offset, chars, from);
> +			if (unlikely(xxx < chars)) {
> +				if (!ret) ret = -EFAULT;
>  				goto out;
>  			}
>
> -			buf->len += ret;
> +			ret += xxx;
> +			buf->len += xxx;
>  			if (!iov_iter_count(from))
>  				goto out;
>  		}
> @@ -567,6 +571,7 @@ atomic_inc(&WR_SLEEP);
>  		mutex_lock(&pipe->mutex);
>  		was_empty = pipe_empty(pipe->head, pipe->tail);
>  		wake_next_writer = true;
> +		goto again;
>  	}
>  out:
>  	if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))

diff --git a/fs/pipe.c b/fs/pipe.c
index 82fede0f2111..a0b737a8b8f9 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -217,6 +217,20 @@ static inline bool pipe_readable(const struct pipe_inode_info *pipe)
 	return !pipe_empty(head, tail) || !writers;
 }
 
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
+static inline bool pipe_readable_sleep_check(const struct pipe_inode_info *pipe)
+{
+	unsigned int head = READ_ONCE(pipe->head);
+	unsigned int tail = READ_ONCE(pipe->tail);
+	unsigned int writers = READ_ONCE(pipe->writers);
+	bool empty = pipe_empty(head, tail);
+	bool ret = !empty || !writers;
+
+	trace_printk("%p: %d: %u %u: %d\n", (void*)pipe, ret, head, tail, empty);
+
+	return ret;
+}
+
 static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe,
 					    struct pipe_buffer *buf,
 					    unsigned int tail)
@@ -243,6 +257,7 @@ static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe,
 	 * Without a watch_queue, we can simply increment the tail
 	 * without the spinlock - the mutex is enough.
 	 */
+	trace_printk("%p: t: %u -> %u\n", (void*)pipe, pipe->tail, pipe->tail + 1);
 	pipe->tail = ++tail;
 	return tail;
 }
@@ -388,7 +403,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		 * since we've done any required wakeups and there's no need
 		 * to mark anything accessed. And we've dropped the lock.
 		 */
-		if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
+		if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable_sleep_check(pipe)) < 0)
 			return -ERESTARTSYS;
 
 		wake_writer = false;
@@ -397,6 +412,8 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	}
 	if (pipe_empty(pipe->head, pipe->tail))
 		wake_next_reader = false;
+	if (ret > 0)
+		pipe->r_cnt++;
 	mutex_unlock(&pipe->mutex);
 
 	if (wake_writer)
@@ -425,6 +442,19 @@ static inline bool pipe_writable(const struct pipe_inode_info *pipe)
 		!READ_ONCE(pipe->readers);
 }
 
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
+static inline bool pipe_writable_sleep_check(const struct pipe_inode_info *pipe)
+{
+	unsigned int head = READ_ONCE(pipe->head);
+	unsigned int tail = READ_ONCE(pipe->tail);
+	unsigned int max_usage = READ_ONCE(pipe->max_usage);
+	bool full = pipe_full(head, tail, max_usage);
+	bool ret = !full || !READ_ONCE(pipe->readers);
+
+	trace_printk("%p: %d: %u %u %u: %d\n", (void*)pipe, ret, head, tail, max_usage, full);
+	return ret;
+}
+
 static ssize_t
 pipe_write(struct kiocb *iocb, struct iov_iter *from)
 {
@@ -490,6 +520,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
 			}
 
 			buf->len += ret;
+			trace_printk("%p: m: %u\n", (void*)pipe, head);
 			if (!iov_iter_count(from))
 				goto out;
 		}
@@ -525,6 +556,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
 			 * be there for the next write.
 			 */
 			pipe->head = head + 1;
+			trace_printk("%p: h: %u -> %u\n", (void*)pipe, head, head + 1);
 
 			/* Insert it into the buffer array */
 			buf = &pipe->bufs[head & mask];
@@ -577,7 +609,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
 		if (was_empty)
 			wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
-		wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
+		wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable_sleep_check(pipe));
 		mutex_lock(&pipe->mutex);
 		was_empty = pipe_empty(pipe->head, pipe->tail);
 		wake_next_writer = true;
@@ -585,6 +617,8 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
 out:
 	if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
 		wake_next_writer = false;
+	if (ret > 0)
+		pipe->w_cnt++;
 	mutex_unlock(&pipe->mutex);
 
 	/*
@@ -705,6 +739,50 @@ pipe_poll(struct file *filp, poll_table *wait)
 	return mask;
 }
 
+static DEFINE_MUTEX(PI_MUTEX);
+static LIST_HEAD(PI_LIST);
+
+void pi_dump(void);
+void pi_dump(void)
+{
+	struct pipe_inode_info *pipe;
+
+	pr_crit("---------- DUMP START ----------\n");
+	mutex_lock(&PI_MUTEX);
+	list_for_each_entry(pipe, &PI_LIST, pi_list) {
+		unsigned head, tail;
+
+		mutex_lock(&pipe->mutex);
+		head = pipe->head;
+		tail = pipe->tail;
+		pr_crit("inode: %p\n", (void*)pipe);
+		pr_crit("E=%d F=%d; W=%d R=%d\n",
+			pipe_empty(head, tail), pipe_full(head, tail, pipe->max_usage),
+			pipe->w_cnt, pipe->r_cnt);
+
+// INCOMPLETE
+pr_crit("RD=%d WR=%d\n", waitqueue_active(&pipe->rd_wait), waitqueue_active(&pipe->wr_wait));
+
+		if (pipe_empty(head, tail) && waitqueue_active(&pipe->rd_wait) && waitqueue_active(&pipe->wr_wait)) {
+			pr_crit("RD waiters:\n");
+			__wait_queue_traverse_print_tasks(&pipe->rd_wait);
+			pr_crit("WR waiters:\n");
+			__wait_queue_traverse_print_tasks(&pipe->wr_wait);
+		}
+
+		for (; tail < head; tail++) {
+			struct pipe_buffer *buf = pipe_buf(pipe, tail);
+			WARN_ON(buf->ops != &anon_pipe_buf_ops);
+			pr_crit("buf: o=%d l=%d\n", buf->offset, buf->len);
+		}
+		pr_crit("\n");
+
+		mutex_unlock(&pipe->mutex);
+	}
+	mutex_unlock(&PI_MUTEX);
+	pr_crit("---------- DUMP END ------------\n");
+}
+
 static void put_pipe_info(struct inode *inode, struct pipe_inode_info *pipe)
 {
 	int kill = 0;
@@ -716,8 +794,14 @@ static void put_pipe_info(struct inode *inode, struct pipe_inode_info *pipe)
 	}
 	spin_unlock(&inode->i_lock);
 
-	if (kill)
+	if (kill) {
+		if (!list_empty(&pipe->pi_list)) {
+			mutex_lock(&PI_MUTEX);
+			list_del_init(&pipe->pi_list);
+			mutex_unlock(&PI_MUTEX);
+		}
 		free_pipe_info(pipe);
+	}
 }
 
 static int
@@ -800,6 +884,13 @@ struct pipe_inode_info *alloc_pipe_info(void)
 	if (pipe == NULL)
 		goto out_free_uid;
 
+	INIT_LIST_HEAD(&pipe->pi_list);
+	if (!strcmp(current->comm, "hackbench")) {
+		mutex_lock(&PI_MUTEX);
+		list_add_tail(&pipe->pi_list, &PI_LIST);
+		mutex_unlock(&PI_MUTEX);
+	}
+
 	if (pipe_bufs * PAGE_SIZE > max_size && !capable(CAP_SYS_RESOURCE))
 		pipe_bufs = max_size >> PAGE_SHIFT;
 
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index 8ff23bf5a819..48d9bf5171dc 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -80,6 +80,9 @@ struct pipe_inode_info {
 #ifdef CONFIG_WATCH_QUEUE
 	struct watch_queue *watch_queue;
 #endif
+
+	struct list_head pi_list;
+	unsigned w_cnt, r_cnt;
 };
 
 /*
diff --git a/include/linux/wait.h b/include/linux/wait.h
index 6d90ad974408..2c37517f6a05 100644
--- a/include/linux/wait.h
+++ b/include/linux/wait.h
@@ -215,6 +215,7 @@ void __wake_up_locked_sync_key(struct wait_queue_head *wq_head, unsigned int mod
 void __wake_up_locked(struct wait_queue_head *wq_head, unsigned int mode, int nr);
 void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode);
 void __wake_up_pollfree(struct wait_queue_head *wq_head);
+void __wait_queue_traverse_print_tasks(struct wait_queue_head *wq_head);
 
 #define wake_up(x)			__wake_up(x, TASK_NORMAL, 1, NULL)
 #define wake_up_nr(x, nr)		__wake_up(x, TASK_NORMAL, nr, NULL)
diff --git a/kernel/sched/wait.c b/kernel/sched/wait.c
index 51e38f5f4701..8f33da87a219 100644
--- a/kernel/sched/wait.c
+++ b/kernel/sched/wait.c
@@ -174,6 +174,32 @@ void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
 }
 EXPORT_SYMBOL_GPL(__wake_up_sync_key);
 
+void __wait_queue_traverse_print_tasks(struct wait_queue_head *wq_head)
+{
+	wait_queue_entry_t *curr, *next;
+	unsigned long flags;
+
+	if (unlikely(!wq_head))
+		return;
+
+	spin_lock_irqsave(&wq_head->lock, flags);
+	curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);
+
+	if (&curr->entry == &wq_head->head) {
+		/* Empty queue: drop the lock before bailing out. */
+		spin_unlock_irqrestore(&wq_head->lock, flags);
+		return;
+	}
+
+	list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
+		struct task_struct *tsk = (struct task_struct *)curr->private;
+
+		pr_crit("%d(%s)\n", tsk->pid, tsk->comm);
+	}
+	spin_unlock_irqrestore(&wq_head->lock, flags);
+}
+EXPORT_SYMBOL_GPL(__wait_queue_traverse_print_tasks);
+
 /**
  * __wake_up_locked_sync_key - wake up a thread blocked on a locked waitqueue.
  * @wq_head: the waitqueue
diff --git a/kernel/sys.c b/kernel/sys.c
index c4c701c6f0b4..676e623d491d 100644
--- a/kernel/sys.c
+++ b/kernel/sys.c
@@ -2477,6 +2477,11 @@ SYSCALL_DEFINE5(prctl, int, option, unsigned long, arg2, unsigned long, arg3,
 
 	error = 0;
 	switch (option) {
+	case 666: {
+		extern void pi_dump(void);
+		pi_dump();
+		break;
+	}
 	case PR_SET_PDEATHSIG:
 		if (!valid_signal(arg2)) {
 			error = -EINVAL;

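For anyone trying to reproduce this: with the debug patch applied, the pipe
state dump can be triggered from userspace through the ad-hoc prctl hook
above. A minimal sketch (option 666 is the debug-only value added by the
patch, not a real prctl option):

#include <stdio.h>
#include <sys/prctl.h>

int main(void)
{
	/* The unused prctl arguments are passed as zero; the dump
	 * itself lands in the kernel log via pr_crit(). */
	if (prctl(666, 0, 0, 0, 0) != 0)
		perror("prctl");
	return 0;
}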