Hi,

On 8/25/2023 6:09 AM, Andrew Werner wrote:
> Before this patch, the producer position could overflow `unsigned
> long`, in which case libbpf would forever stop processing new writes to
> the ringbuf. Similarly, overflows of the producer position could result
> in __bpf_user_ringbuf_peek not discovering available data. This patch
> addresses that bug by using the signed delta between the
> consumer and producer position to determine if data is available; the
> delta computation is robust to overflow.
>
> A more defensive check could be to ensure that the delta is within
> the allowed range, but such defensive checks are neither present in
> the kernel side code nor in libbpf. The overflow that this patch
> handles can occur while the producer and consumer follow a correct
> protocol.
>
> Secondarily, the type used to represent the positions in the
> user_ring_buffer functions in both libbpf and the kernel has been
> changed from u64 to unsigned long to match the type used in the
> kernel's representation of the structure. The change occurs in the
> same patch because it's required to align the data availability
> calculations between the userspace producing ringbuf and the bpf
> producing ringbuf.

Because the patch changes both the ring buffer and the user ring buffer,
I think it would be better to split it into three patches to ease
backporting: one patch for the ring buffer change in libbpf, and two
more patches for the user ring buffer changes in libbpf and in the kernel.

>
> Not included in this patch, a selftest was written to demonstrate the
> bug, and indeed this patch allows the test to continue to make progress
> past the overflow. The shape of the self test was as follows:
>
> a) Set up ringbuf of 2GB size (the maximum permitted size).
> b) reserve+commit maximum-sized records (ULONG_MAX/4) constantly as
> fast as possible.

Should ULONG_MAX be UINT_MAX here?
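
As a rough sanity check of that question (a sketch only, not part of the patch): assuming a 64-bit unsigned long, so that the producer position wraps after 2^64 bytes, and assuming the largest record is UINT_MAX/4 bytes because the top two bits of the 32-bit length are flags, the quoted rate of 1 million records per second gives roughly the 4.7 hours mentioned in the commit message. The helper name hours_until_wrap is hypothetical.

```c
#include <assert.h>
#include <limits.h>

/*
 * Hypothetical helper: hours until a 64-bit byte position wraps when
 * records of record_bytes are produced at records_per_sec.
 * Assumption: unsigned long is 64 bits wide, so the wrap point is 2^64.
 */
static inline double hours_until_wrap(double record_bytes, double records_per_sec)
{
	const double wrap_bytes = 18446744073709551616.0;	/* 2^64 */

	assert(record_bytes > 0.0 && records_per_sec > 0.0);
	return wrap_bytes / record_bytes / records_per_sec / 3600.0;
}
```

Calling hours_until_wrap((double)(UINT_MAX / 4), 1e6) yields about 4.77, which matches the estimate only when the record size is derived from UINT_MAX rather than ULONG_MAX.
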
>
> With 1 million records per second repro time should be about 4.7 hours.
> Such a test duration is impractical to run, hence the omission.
>
> Additionally, this patch adds commentary around a separate point to note
> that the modular arithmetic is valid in the face of overflows, as that
> fact may not be obvious to future readers.
>
> v2->v3:
> - Changed the representation of the consumer and producer positions
> from u64 to unsigned long in user_ring_buffer functions.
> - Addressed overflow in __bpf_user_ringbuf_peek.
> - Changed data availability computations to use the signed delta
> between the consumer and producer positions rather than merely
> checking whether their values were unequal.
> v1->v2:
> - Fixed comment grammar.
> - Properly formatted subject line.
>
> Signed-off-by: Andrew Werner <awerner32@xxxxxxxxx>
> ---
> kernel/bpf/ringbuf.c | 11 ++++++++---
> tools/lib/bpf/ringbuf.c | 16 +++++++++++++---
> 2 files changed, 21 insertions(+), 6 deletions(-)

Otherwise, these changes look good to me:

Acked-by: Hou Tao <houtao1@xxxxxxxxxx>

>
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index f045fde632e5..0c48673520fb 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -658,7 +658,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
>  {
>  	int err;
>  	u32 hdr_len, sample_len, total_len, flags, *hdr;
> -	u64 cons_pos, prod_pos;
> +	unsigned long cons_pos, prod_pos;
>
>  	/* Synchronizes with smp_store_release() in user-space producer. */
>  	prod_pos = smp_load_acquire(&rb->producer_pos);
> @@ -667,7 +667,12 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
>
>  	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
>  	cons_pos = smp_load_acquire(&rb->consumer_pos);
> -	if (cons_pos >= prod_pos)
> +
> +	/* Check if there's data available by computing the signed delta between
> +	 * cons_pos and prod_pos; a negative delta indicates that the consumer has
> +	 * not caught up. This formulation is robust to prod_pos wrapping around.
> +	 */
> +	if ((long)(cons_pos - prod_pos) >= 0)
>  		return -ENODATA;
>
>  	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> @@ -711,7 +716,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
>
>  static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
>  {
> -	u64 consumer_pos;
> +	unsigned long consumer_pos;
>  	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
>
>  	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 02199364db13..141030a89370 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -237,7 +237,13 @@ static int64_t ringbuf_process_ring(struct ring *r)
>  	do {
>  		got_new_data = false;
>  		prod_pos = smp_load_acquire(r->producer_pos);
> -		while (cons_pos < prod_pos) {
> +
> +		/* Check if there's data available by computing the signed delta
> +		 * between cons_pos and prod_pos; a negative delta indicates that the
> +		 * consumer has not caught up. This formulation is robust to prod_pos
> +		 * wrapping around.
> +		 */
> +		while ((long)(cons_pos - prod_pos) < 0) {
>  			len_ptr = r->data + (cons_pos & r->mask);
>  			len = smp_load_acquire(len_ptr);
>
> @@ -482,8 +488,7 @@ void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
>  void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
>  {
>  	__u32 avail_size, total_size, max_size;
> -	/* 64-bit to avoid overflow in case of extreme application behavior */
> -	__u64 cons_pos, prod_pos;
> +	unsigned long cons_pos, prod_pos;
>  	struct ringbuf_hdr *hdr;
>
>  	/* The top two bits are used as special flags */
> @@ -498,6 +503,11 @@ void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
>  	prod_pos = smp_load_acquire(rb->producer_pos);
>
>  	max_size = rb->mask + 1;
> +
> +	/* Note that this formulation is valid in the face of overflow of
> +	 * prod_pos so long as the delta between prod_pos and cons_pos is
> +	 * no greater than max_size.
> +	 */
>  	avail_size = max_size - (prod_pos - cons_pos);
>  	/* Round up total size to a multiple of 8. */
>  	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;
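
For readers following the thread, the difference between the old comparison and the signed-delta comparison used in __bpf_user_ringbuf_peek and ringbuf_process_ring can be sketched in isolation. This is a standalone illustration, not code from the patch: the helper names are hypothetical, and it assumes the compiler converts an out-of-range unsigned value to long by wrapping modulo 2^N, which GCC and Clang document but older ISO C leaves implementation-defined.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>

/* Old-style check: stops reporting data once prod_pos wraps past ULONG_MAX. */
static inline bool has_data_naive(unsigned long cons_pos, unsigned long prod_pos)
{
	return cons_pos < prod_pos;
}

/*
 * Signed-delta check, same shape as in the patch: the unsigned subtraction
 * wraps modulo 2^N, and a negative signed result means the consumer is
 * still behind the producer.
 */
static inline bool has_data_delta(unsigned long cons_pos, unsigned long prod_pos)
{
	return (long)(cons_pos - prod_pos) < 0;
}

/* Hypothetical self-check exercising a producer position that has wrapped. */
static inline void check_wraparound(void)
{
	unsigned long cons_pos = ULONG_MAX - 7;
	unsigned long prod_pos = cons_pos + 16;	/* wraps around to 8 */

	assert(prod_pos == 8);
	assert(!has_data_naive(cons_pos, prod_pos));	/* misses 16 pending bytes */
	assert(has_data_delta(cons_pos, prod_pos));	/* sees the pending bytes */
	assert(!has_data_delta(prod_pos, prod_pos));	/* consumer caught up */
}
```

The delta form stays correct as long as the true distance between the two positions is below LONG_MAX, which holds here because a ringbuf is at most 2GB.
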