On Fri, Aug 25, 2023 at 11:28 AM Daniel Borkmann <daniel@xxxxxxxxxxxxx> wrote:
>
> On 8/25/23 12:09 AM, Andrew Werner wrote:
> > Before this patch, the producer position could overflow `unsigned
> > long`, in which case libbpf would forever stop processing new writes
> > to the ringbuf. Similarly, overflows of the producer position could
> > result in __bpf_user_ringbuf_peek not discovering available data.
> > This patch addresses that bug by computing the signed delta between
> > the consumer and producer positions to determine whether data is
> > available; the delta computation is robust to overflow.
> >
> > A more defensive check could be to ensure that the delta is within
> > the allowed range, but such defensive checks are present neither in
> > the kernel-side code nor in libbpf. The overflow that this patch
> > handles can occur while the producer and consumer follow a correct
> > protocol.
> >
> > Secondarily, the type used to represent the positions in the
> > user_ring_buffer functions in both libbpf and the kernel has been
> > changed from u64 to unsigned long to match the type used in the
> > kernel's representation of the structure. The change occurs in the
>
> Hm, but won't this mismatch for a 64-bit kernel and 32-bit user space?
> Why not fixate both on u64 instead, so the types are consistent?

Sure. It feels like if we do that, then we'd break existing 32-bit
big-endian clients, though I am not sure those exist. Concretely, the
request here would be to change the kernel structure and all library
usages to use u64, right?

> > same patch because it's required to align the data availability
> > calculations between the userspace-producing ringbuf and the
> > bpf-producing ringbuf.
> >
> > Not included in this patch, a selftest was written to demonstrate the
> > bug, and indeed this patch allows the test to continue to make
> > progress past the overflow.
> > The shape of the selftest was as follows:
> >
> >  a) Set up a ringbuf of 2GB size (the maximum permitted size).
> >  b) Reserve+commit maximum-sized records (ULONG_MAX/4) constantly,
> >     as fast as possible.
> >
> > With 1 million records per second, the repro time should be about
> > 4.7 hours. Such a test duration is impractical to run, hence the
> > omission.
> >
> > Additionally, this patch adds commentary around a separate point to
> > note that the modular arithmetic is valid in the face of overflows,
> > as that fact may not be obvious to future readers.
> >
> > v2->v3:
> >  - Changed the representation of the consumer and producer positions
> >    from u64 to unsigned long in the user_ring_buffer functions.
> >  - Addressed overflow in __bpf_user_ringbuf_peek.
> >  - Changed data availability computations to use the signed delta
> >    between the consumer and producer positions rather than merely
> >    checking whether their values were unequal.
> > v1->v2:
> >  - Fixed comment grammar.
> >  - Properly formatted the subject line.
> >
> > Signed-off-by: Andrew Werner <awerner32@xxxxxxxxx>
> > ---
> >  kernel/bpf/ringbuf.c    | 11 ++++++++---
> >  tools/lib/bpf/ringbuf.c | 16 +++++++++++++---
> >  2 files changed, 21 insertions(+), 6 deletions(-)
> >
> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index f045fde632e5..0c48673520fb 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> > @@ -658,7 +658,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
> >  {
> >  	int err;
> >  	u32 hdr_len, sample_len, total_len, flags, *hdr;
> > -	u64 cons_pos, prod_pos;
> > +	unsigned long cons_pos, prod_pos;
> >
> >  	/* Synchronizes with smp_store_release() in user-space producer.
> >  	 */
> >  	prod_pos = smp_load_acquire(&rb->producer_pos);
> > @@ -667,7 +667,12 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
> >
> >  	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
> >  	cons_pos = smp_load_acquire(&rb->consumer_pos);
> > -	if (cons_pos >= prod_pos)
> > +
> > +	/* Check if there's data available by computing the signed delta between
> > +	 * cons_pos and prod_pos; a negative delta indicates that the consumer has
> > +	 * not caught up. This formulation is robust to prod_pos wrapping around.
> > +	 */
> > +	if ((long)(cons_pos - prod_pos) >= 0)
> >  		return -ENODATA;
> >
> >  	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> > @@ -711,7 +716,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
> >
> >  static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
> >  {
> > -	u64 consumer_pos;
> > +	unsigned long consumer_pos;
> >  	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
> >
> >  	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
> >
> > diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> > index 02199364db13..141030a89370 100644
> > --- a/tools/lib/bpf/ringbuf.c
> > +++ b/tools/lib/bpf/ringbuf.c
> > @@ -237,7 +237,13 @@ static int64_t ringbuf_process_ring(struct ring *r)
> >  	do {
> >  		got_new_data = false;
> >  		prod_pos = smp_load_acquire(r->producer_pos);
> > -		while (cons_pos < prod_pos) {
> > +
> > +		/* Check if there's data available by computing the signed delta
> > +		 * between cons_pos and prod_pos; a negative delta indicates that the
> > +		 * consumer has not caught up. This formulation is robust to prod_pos
> > +		 * wrapping around.
> > +		 */
> > +		while ((long)(cons_pos - prod_pos) < 0) {
> >  			len_ptr = r->data + (cons_pos & r->mask);
> >  			len = smp_load_acquire(len_ptr);
> >
> > @@ -482,8 +488,7 @@ void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
> >  void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
> >  {
> >  	__u32 avail_size, total_size, max_size;
> > -	/* 64-bit to avoid overflow in case of extreme application behavior */
> > -	__u64 cons_pos, prod_pos;
> > +	unsigned long cons_pos, prod_pos;
> >  	struct ringbuf_hdr *hdr;
> >
> >  	/* The top two bits are used as special flags */
> > @@ -498,6 +503,11 @@ void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
> >  	prod_pos = smp_load_acquire(rb->producer_pos);
> >
> >  	max_size = rb->mask + 1;
> > +
> > +	/* Note that this formulation is valid in the face of overflow of
> > +	 * prod_pos so long as the delta between prod_pos and cons_pos is
> > +	 * no greater than max_size.
> > +	 */
> >  	avail_size = max_size - (prod_pos - cons_pos);
> >  	/* Round up total size to a multiple of 8. */
> >  	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;
> >