Hi Oleg,
On 2/28/2025 10:03 PM, Oleg Nesterov wrote:
> And... I know, I know you already hate me ;)
Not at all :)
> but if you have time, could you check if this patch (with or without the
> previous debugging patch) makes any difference? Just to be sure.
Sure, I will give this a try.
But in the meantime, Prateek and I tried some experiments over the weekend.
We were able to reproduce this issue on a third-generation EPYC system as well as
on an Intel Emerald Rapids system (2 x INTEL(R) XEON(R) PLATINUM 8592+).
We tried a heavy-handed tracing approach on top of your debug patch.
I have attached the debug patch below. With the tracing we found the following case for
pipe_writable():
hackbench-118768 [206] ..... 1029.550601: pipe_write: 000000005eea28ff: 0: 37 38 16: 1
Here (the trace fields after the pipe pointer are ret, head, tail, max_usage, full),
head = 37
tail = 38
max_usage = 16
pipe_full() returns 1.
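To spell the arithmetic out: pipe_full() compares head - tail against max_usage, so a stale
head behind a newer tail makes the unsigned subtraction wrap. A minimal user-space sketch,
mirroring the pipe_occupancy()/pipe_full() helpers from include/linux/pipe_fs_i.h, with the
values from the trace above:

#include <stdio.h>

/* Same unsigned arithmetic as pipe_occupancy()/pipe_full(). */
static unsigned int occupancy(unsigned int head, unsigned int tail)
{
	return head - tail;
}

static int full(unsigned int head, unsigned int tail, unsigned int limit)
{
	return occupancy(head, tail) >= limit;
}

int main(void)
{
	unsigned int head = 37, tail = 38, max_usage = 16;

	/* 37u - 38u wraps to UINT_MAX, which is >= 16, so the pipe looks full. */
	printf("occupancy = %u, full = %d\n",
	       occupancy(head, tail), full(head, tail, max_usage));
	return 0;
}

This prints occupancy = 4294967295, full = 1.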
Between the read of the head and the later read of the tail, the tail appears to have moved ahead
of the (stale) head value, leading to the wraparound. With the following change applied I have not
yet run into a hang on the original machine where I first saw it:
diff --git a/fs/pipe.c b/fs/pipe.c
index ce1af7592780..a1931c817822 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -417,9 +417,19 @@ static inline int is_packetized(struct file *file)
/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
static inline bool pipe_writable(const struct pipe_inode_info *pipe)
{
- unsigned int head = READ_ONCE(pipe->head);
- unsigned int tail = READ_ONCE(pipe->tail);
unsigned int max_usage = READ_ONCE(pipe->max_usage);
+ unsigned int head, tail;
+
+ tail = READ_ONCE(pipe->tail);
+ /*
+ * Since the unsigned arithmetic in this lockless preemptible context
+ * relies on the fact that the tail can never be ahead of head, read
+ * the head after the tail to ensure we've not missed any updates to
+ * the head. Reordering the reads can cause wraparounds and give the
+ * illusion that the pipe is full.
+ */
+ smp_rmb();
+ head = READ_ONCE(pipe->head);
return !pipe_full(head, tail, max_usage) ||
!READ_ONCE(pipe->readers);
---
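For reference, the interleaving we think explains the trace above - a hand-written sketch of the
suspected race, not something taken from the trace itself:

/*
 * waiting writer (no pipe->mutex held)      other tasks (taking pipe->mutex in turn)
 *
 * head = READ_ONCE(pipe->head);  // 37
 *                                            a writer appends and advances pipe->head to 38+
 *                                            a reader drains and advances pipe->tail to 38
 * tail = READ_ONCE(pipe->tail);  // 38
 *
 * pipe_full(37, 38, 16): 37u - 38u == UINT_MAX >= 16, so the pipe looks full
 * and the waiter keeps sleeping even though there is room again.
 *
 * Reading the tail first (with the reads ordered by smp_rmb()) avoids this:
 * the tail can never be ahead of the head that is read afterwards, so the
 * subtraction cannot wrap.
 */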
smp_rmb() is a no-op on x86 (it reduces to a compiler barrier), and even without the barrier,
with just the reads reordered, we were not able to reproduce the hang after 10000 iterations.
If you think this is a genuine bug fix, I will send a proper patch for it.
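In case it is useful to anyone else playing with the debug patch: the pi_dump() state dump it
wires up can be triggered from user space through the prctl() hook it adds, along these lines:

#include <sys/prctl.h>

int main(void)
{
	/* Option 666 is the ad-hoc hook the debug patch adds to prctl();
	 * it calls pi_dump(), which dumps the state of the tracked
	 * hackbench pipes (empty/full, r/w counts, waiters, buffers)
	 * to the kernel log via pr_crit(). */
	return prctl(666, 0, 0, 0, 0);
}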
Thanks to Prateek, who was actively involved in this debugging.
--
Thanks and Regards,
Swapnil
Oleg.
---
diff --git a/fs/pipe.c b/fs/pipe.c
index 4336b8cccf84..524b8845523e 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -445,7 +445,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
return 0;
mutex_lock(&pipe->mutex);
-
+again:
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
@@ -467,20 +467,24 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
unsigned int mask = pipe->ring_size - 1;
struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
int offset = buf->offset + buf->len;
+ int xxx;
if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) &&
offset + chars <= PAGE_SIZE) {
- ret = pipe_buf_confirm(pipe, buf);
- if (ret)
+ xxx = pipe_buf_confirm(pipe, buf);
+ if (xxx) {
+ if (!ret) ret = xxx;
goto out;
+ }
- ret = copy_page_from_iter(buf->page, offset, chars, from);
- if (unlikely(ret < chars)) {
- ret = -EFAULT;
+ xxx = copy_page_from_iter(buf->page, offset, chars, from);
+ if (unlikely(xxx < chars)) {
+ if (!ret) ret = -EFAULT;
goto out;
}
- buf->len += ret;
+ ret += xxx;
+ buf->len += xxx;
if (!iov_iter_count(from))
goto out;
}
@@ -567,6 +571,7 @@ atomic_inc(&WR_SLEEP);
mutex_lock(&pipe->mutex);
was_empty = pipe_empty(pipe->head, pipe->tail);
wake_next_writer = true;
+ goto again;
}
out:
if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
diff --git a/fs/pipe.c b/fs/pipe.c
index 82fede0f2111..a0b737a8b8f9 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -217,6 +217,20 @@ static inline bool pipe_readable(const struct pipe_inode_info *pipe)
return !pipe_empty(head, tail) || !writers;
}
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
+static inline bool pipe_readable_sleep_check(const struct pipe_inode_info *pipe)
+{
+ unsigned int head = READ_ONCE(pipe->head);
+ unsigned int tail = READ_ONCE(pipe->tail);
+ unsigned int writers = READ_ONCE(pipe->writers);
+ bool empty = pipe_empty(head, tail);
+ bool ret = !empty || !writers;
+
+ trace_printk("%p: %d: %u %u: %d\n", (void*)pipe, ret, head, tail, empty);
+
+ return ret;
+}
+
static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe,
struct pipe_buffer *buf,
unsigned int tail)
@@ -243,6 +257,7 @@ static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe,
* Without a watch_queue, we can simply increment the tail
* without the spinlock - the mutex is enough.
*/
+ trace_printk("%p: t: %u -> %u\n", (void*)pipe, pipe->tail, pipe->tail + 1);
pipe->tail = ++tail;
return tail;
}
@@ -388,7 +403,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
* since we've done any required wakeups and there's no need
* to mark anything accessed. And we've dropped the lock.
*/
- if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
+ if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable_sleep_check(pipe)) < 0)
return -ERESTARTSYS;
wake_writer = false;
@@ -397,6 +412,8 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
}
if (pipe_empty(pipe->head, pipe->tail))
wake_next_reader = false;
+ if (ret > 0)
+ pipe->r_cnt++;
mutex_unlock(&pipe->mutex);
if (wake_writer)
@@ -425,6 +442,19 @@ static inline bool pipe_writable(const struct pipe_inode_info *pipe)
!READ_ONCE(pipe->readers);
}
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
+static inline bool pipe_writable_sleep_check(const struct pipe_inode_info *pipe)
+{
+ unsigned int head = READ_ONCE(pipe->head);
+ unsigned int tail = READ_ONCE(pipe->tail);
+ unsigned int max_usage = READ_ONCE(pipe->max_usage);
+ bool full = pipe_full(head, tail, max_usage);
+ bool ret = !full || !READ_ONCE(pipe->readers);
+
+ trace_printk("%p: %d: %u %u %u: %d\n", (void*)pipe, ret, head, tail, max_usage, full);
+ return ret;
+}
+
static ssize_t
pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
@@ -490,6 +520,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
}
buf->len += ret;
+ trace_printk("%p: m: %u\n", (void*)pipe, head);
if (!iov_iter_count(from))
goto out;
}
@@ -525,6 +556,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
* be there for the next write.
*/
pipe->head = head + 1;
+ trace_printk("%p: h: %u -> %u\n", (void*)pipe, head, head + 1);
/* Insert it into the buffer array */
buf = &pipe->bufs[head & mask];
@@ -577,7 +609,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
if (was_empty)
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
- wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
+ wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable_sleep_check(pipe));
mutex_lock(&pipe->mutex);
was_empty = pipe_empty(pipe->head, pipe->tail);
wake_next_writer = true;
@@ -585,6 +617,8 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
out:
if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
wake_next_writer = false;
+ if (ret > 0)
+ pipe->w_cnt++;
mutex_unlock(&pipe->mutex);
/*
@@ -705,6 +739,50 @@ pipe_poll(struct file *filp, poll_table *wait)
return mask;
}
+static DEFINE_MUTEX(PI_MUTEX);
+static LIST_HEAD(PI_LIST);
+
+void pi_dump(void);
+void pi_dump(void)
+{
+ struct pipe_inode_info *pipe;
+
+ pr_crit("---------- DUMP START ----------\n");
+ mutex_lock(&PI_MUTEX);
+ list_for_each_entry(pipe, &PI_LIST, pi_list) {
+ unsigned head, tail;
+
+ mutex_lock(&pipe->mutex);
+ head = pipe->head;
+ tail = pipe->tail;
+ pr_crit("inode: %p\n", (void*)pipe);
+ pr_crit("E=%d F=%d; W=%d R=%d\n",
+ pipe_empty(head, tail), pipe_full(head, tail, pipe->max_usage),
+ pipe->w_cnt, pipe->r_cnt);
+
+// INCOMPLETE
+pr_crit("RD=%d WR=%d\n", waitqueue_active(&pipe->rd_wait), waitqueue_active(&pipe->wr_wait));
+
+ if (pipe_empty(head, tail) && waitqueue_active(&pipe->rd_wait) && waitqueue_active(&pipe->wr_wait)) {
+ pr_crit("RD waiters:\n");
+ __wait_queue_traverse_print_tasks(&pipe->rd_wait);
+ pr_crit("WR waiters:\n");
+ __wait_queue_traverse_print_tasks(&pipe->wr_wait);
+ }
+
+ for (; tail < head; tail++) {
+ struct pipe_buffer *buf = pipe_buf(pipe, tail);
+ WARN_ON(buf->ops != &anon_pipe_buf_ops);
+ pr_crit("buf: o=%d l=%d\n", buf->offset, buf->len);
+ }
+ pr_crit("\n");
+
+ mutex_unlock(&pipe->mutex);
+ }
+ mutex_unlock(&PI_MUTEX);
+ pr_crit("---------- DUMP END ------------\n");
+}
+
static void put_pipe_info(struct inode *inode, struct pipe_inode_info *pipe)
{
int kill = 0;
@@ -716,8 +794,14 @@ static void put_pipe_info(struct inode *inode, struct pipe_inode_info *pipe)
}
spin_unlock(&inode->i_lock);
- if (kill)
+ if (kill) {
+ if (!list_empty(&pipe->pi_list)) {
+ mutex_lock(&PI_MUTEX);
+ list_del_init(&pipe->pi_list);
+ mutex_unlock(&PI_MUTEX);
+ }
free_pipe_info(pipe);
+ }
}
static int
@@ -800,6 +884,13 @@ struct pipe_inode_info *alloc_pipe_info(void)
if (pipe == NULL)
goto out_free_uid;
+ INIT_LIST_HEAD(&pipe->pi_list);
+ if (!strcmp(current->comm, "hackbench")) {
+ mutex_lock(&PI_MUTEX);
+ list_add_tail(&pipe->pi_list, &PI_LIST);
+ mutex_unlock(&PI_MUTEX);
+ }
+
if (pipe_bufs * PAGE_SIZE > max_size && !capable(CAP_SYS_RESOURCE))
pipe_bufs = max_size >> PAGE_SHIFT;
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index 8ff23bf5a819..48d9bf5171dc 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -80,6 +80,9 @@ struct pipe_inode_info {
#ifdef CONFIG_WATCH_QUEUE
struct watch_queue *watch_queue;
#endif
+
+ struct list_head pi_list;
+ unsigned w_cnt, r_cnt;
};
/*
diff --git a/include/linux/wait.h b/include/linux/wait.h
index 6d90ad974408..2c37517f6a05 100644
--- a/include/linux/wait.h
+++ b/include/linux/wait.h
@@ -215,6 +215,7 @@ void __wake_up_locked_sync_key(struct wait_queue_head *wq_head, unsigned int mod
void __wake_up_locked(struct wait_queue_head *wq_head, unsigned int mode, int nr);
void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode);
void __wake_up_pollfree(struct wait_queue_head *wq_head);
+void __wait_queue_traverse_print_tasks(struct wait_queue_head *wq_head);
#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL)
#define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL)
diff --git a/kernel/sched/wait.c b/kernel/sched/wait.c
index 51e38f5f4701..8f33da87a219 100644
--- a/kernel/sched/wait.c
+++ b/kernel/sched/wait.c
@@ -174,6 +174,31 @@ void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
}
EXPORT_SYMBOL_GPL(__wake_up_sync_key);
+void __wait_queue_traverse_print_tasks(struct wait_queue_head *wq_head)
+{
+ wait_queue_entry_t *curr, *next;
+ unsigned long flags;
+
+ if (unlikely(!wq_head))
+ return;
+
+ spin_lock_irqsave(&wq_head->lock, flags);
+ curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);
+
+ if (&curr->entry == &wq_head->head) {
+ spin_unlock_irqrestore(&wq_head->lock, flags);
+ return;
+ }
+
+ list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
+ struct task_struct *tsk = (struct task_struct *)curr->private;
+
+ pr_crit("%d(%s)\n", tsk->pid, tsk->comm);
+ }
+ spin_unlock_irqrestore(&wq_head->lock, flags);
+}
+EXPORT_SYMBOL_GPL(__wait_queue_traverse_print_tasks);
+
/**
* __wake_up_locked_sync_key - wake up a thread blocked on a locked waitqueue.
* @wq_head: the waitqueue
diff --git a/kernel/sys.c b/kernel/sys.c
index c4c701c6f0b4..676e623d491d 100644
--- a/kernel/sys.c
+++ b/kernel/sys.c
@@ -2477,6 +2477,11 @@ SYSCALL_DEFINE5(prctl, int, option, unsigned long, arg2, unsigned long, arg3,
error = 0;
switch (option) {
+ case 666: {
+ extern void pi_dump(void);
+ pi_dump();
+ break;
+ }
case PR_SET_PDEATHSIG:
if (!valid_signal(arg2)) {
error = -EINVAL;