Before this patch, the producer position could overflow `unsigned long`,
in which case libbpf would forever stop processing new writes to the
ringbuf. Similarly, overflows of the producer position could result in
__bpf_user_ringbuf_peek not discovering available data. This patch
addresses that bug by using the signed delta between the consumer and
producer positions to determine if data is available; the delta
computation is robust to overflow.

A more defensive check could be to ensure that the delta is within the
allowed range, but such defensive checks are present neither in the
kernel-side code nor in libbpf. The overflow that this patch handles can
occur while the producer and consumer follow a correct protocol.

Secondarily, the type used to represent the positions in the
user_ring_buffer functions in both libbpf and the kernel has been changed
from u64 to unsigned long to match the type used in the kernel's
representation of the structure. The change occurs in the same patch
because it's required to align the data availability calculations between
the userspace producing ringbuf and the bpf producing ringbuf.

A selftest, not included in this patch, was written to demonstrate the
bug, and indeed this patch allows the test to continue to make progress
past the overflow. The shape of the selftest was as follows:

 a) Set up ringbuf of 2GB size (the maximum permitted size).
 b) reserve+commit maximum-sized records (ULONG_MAX/4) constantly as
    fast as possible.

With 1 million records per second repro time should be about 4.7 hours.
Such a test duration is impractical to run, hence the omission.

Additionally, this patch adds commentary around a separate point to note
that the modular arithmetic is valid in the face of overflows, as that
fact may not be obvious to future readers.

v2->v3:
  - Changed the representation of the consumer and producer positions
    from u64 to unsigned long in user_ring_buffer functions.
  - Addressed overflow in __bpf_user_ringbuf_peek.
  - Changed data availability computations to use the signed delta
    between the consumer and producer positions rather than merely
    checking whether their values were unequal.

v1->v2:
  - Fixed comment grammar.
  - Properly formatted subject line.

Signed-off-by: Andrew Werner <awerner32@xxxxxxxxx>
---
 kernel/bpf/ringbuf.c    | 11 ++++++++---
 tools/lib/bpf/ringbuf.c | 16 +++++++++++++---
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index f045fde632e5..0c48673520fb 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -658,7 +658,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
 {
 	int err;
 	u32 hdr_len, sample_len, total_len, flags, *hdr;
-	u64 cons_pos, prod_pos;
+	unsigned long cons_pos, prod_pos;
 
 	/* Synchronizes with smp_store_release() in user-space producer. */
 	prod_pos = smp_load_acquire(&rb->producer_pos);
@@ -667,7 +667,12 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
 
 	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
-	if (cons_pos >= prod_pos)
+
+	/* Check if there's data available by computing the signed delta between
+	 * cons_pos and prod_pos; a negative delta indicates that the consumer has
+	 * not caught up. This formulation is robust to prod_pos wrapping around.
+	 */
+	if ((long)(cons_pos - prod_pos) >= 0)
 		return -ENODATA;
 
 	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
@@ -711,7 +716,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
 
 static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
 {
-	u64 consumer_pos;
+	unsigned long consumer_pos;
 	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
 
 	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 02199364db13..141030a89370 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -237,7 +237,13 @@ static int64_t ringbuf_process_ring(struct ring *r)
 	do {
 		got_new_data = false;
 		prod_pos = smp_load_acquire(r->producer_pos);
-		while (cons_pos < prod_pos) {
+
+		/* Check if there's data available by computing the signed delta
+		 * between cons_pos and prod_pos; a negative delta indicates that the
+		 * consumer has not caught up. This formulation is robust to prod_pos
+		 * wrapping around.
+		 */
+		while ((long)(cons_pos - prod_pos) < 0) {
 			len_ptr = r->data + (cons_pos & r->mask);
 			len = smp_load_acquire(len_ptr);
 
@@ -482,8 +488,7 @@ void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
 void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
 {
 	__u32 avail_size, total_size, max_size;
-	/* 64-bit to avoid overflow in case of extreme application behavior */
-	__u64 cons_pos, prod_pos;
+	unsigned long cons_pos, prod_pos;
 	struct ringbuf_hdr *hdr;
 
 	/* The top two bits are used as special flags */
@@ -498,6 +503,11 @@ void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
 	prod_pos = smp_load_acquire(rb->producer_pos);
 
 	max_size = rb->mask + 1;
+
+	/* Note that this formulation is valid in the face of overflow of
+	 * prod_pos so long as the delta between prod_pos and cons_pos is
+	 * no greater than max_size.
+	 */
 	avail_size = max_size - (prod_pos - cons_pos);
 	/* Round up total size to a multiple of 8. */
 	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;
-- 
2.39.2
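
For reviewers who want to convince themselves of the wrap-around behaviour without the multi-hour repro, here is a minimal standalone sketch. It is not part of the patch: the helper names has_data_naive(), has_data_delta() and check_wraparound() are hypothetical and exist only for illustration. It assumes that converting an out-of-range unsigned long to long wraps in two's complement, which is what GCC and Clang do and what the checks in the diff rely on.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>

/* Pre-patch style check (hypothetical helper): plain unsigned comparison.
 * It reports "no data" forever once prod_pos has wrapped past ULONG_MAX.
 */
static inline bool has_data_naive(unsigned long cons_pos, unsigned long prod_pos)
{
	return cons_pos < prod_pos;
}

/* Post-patch style check (hypothetical helper): signed delta.
 * The unsigned subtraction is modular, so the delta stays correct across
 * a wrap as long as the true distance fits in a long.
 */
static inline bool has_data_delta(unsigned long cons_pos, unsigned long prod_pos)
{
	return (long)(cons_pos - prod_pos) < 0;
}

/* Illustrative self-check: the producer wraps while the consumer has not. */
static inline void check_wraparound(void)
{
	unsigned long cons_pos = ULONG_MAX - 7;	/* 8 bytes before the wrap */
	unsigned long prod_pos = cons_pos + 16;	/* wraps around to 8 */

	assert(prod_pos == 8);
	assert(!has_data_naive(cons_pos, prod_pos));	/* misses the data */
	assert(has_data_delta(cons_pos, prod_pos));	/* sees 16 bytes pending */
	assert(!has_data_delta(prod_pos, prod_pos));	/* caught up: no data */
}
```

Calling check_wraparound() from a small test main() exercises the one case that the unsigned comparison gets wrong and the signed delta gets right.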