Re: [PATCH bpf-next v3] libbpf: handle producer position overflow

On Fri, Aug 25, 2023 at 11:28 AM Daniel Borkmann <daniel@xxxxxxxxxxxxx> wrote:
>
> On 8/25/23 12:09 AM, Andrew Werner wrote:
> > Before this patch, the producer position could overflow `unsigned
> > long`, in which case libbpf would forever stop processing new writes to
> > the ringbuf. Similarly, overflows of the producer position could result
> > in __bpf_user_ringbuf_peek not discovering available data. This patch
> > addresses that bug by using the signed delta between the
> > consumer and producer positions to determine if data is available; the
> > delta computation is robust to overflow.
> >
> > A more defensive check could be to ensure that the delta is within
> > the allowed range, but such defensive checks are neither present in
> > the kernel side code nor in libbpf. The overflow that this patch
> > handles can occur while the producer and consumer follow a correct
> > protocol.
> >
> > Secondarily, the type used to represent the positions in the
> > user_ring_buffer functions in both libbpf and the kernel has been
> > changed from u64 to unsigned long to match the type used in the
> > kernel's representation of the structure. The change occurs in the
>
> Hm, but won't this mismatch for 64bit kernel and 32bit user space? Why
> not fixate both on u64 instead so types are consistent?

Sure. My concern is that doing so would break existing 32-bit
big-endian clients, though I am not sure any exist. Concretely, the
request here is to change the kernel structure and all library
usages to u64, right?
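
To make the big-endian concern concrete, here is a rough standalone
sketch. It assumes a 32-bit client built against the current layout
loads the position as a 4-byte value from the start of the field, while
the field itself has become a u64. The helper names are made up for
illustration and do not exist in libbpf or the kernel; byte order is
emulated by hand so the sketch behaves the same on any host.

```c
#include <stdint.h>

/* Hypothetical helper: lay out pos the way a big-endian u64 field would
 * appear in the shared page (most significant byte first).
 */
static void store_be64(unsigned char buf[8], uint64_t pos)
{
	for (int i = 0; i < 8; i++)
		buf[i] = (unsigned char)(pos >> (56 - 8 * i));
}

/* Hypothetical helper: same field on a little-endian machine. */
static void store_le64(unsigned char buf[8], uint64_t pos)
{
	for (int i = 0; i < 8; i++)
		buf[i] = (unsigned char)(pos >> (8 * i));
}

/* What a 32-bit big-endian client would see when loading a 4-byte
 * position from offset 0 of the field: the HIGH half of the u64.
 */
uint32_t be_client_view(uint64_t pos)
{
	unsigned char buf[8];

	store_be64(buf, pos);
	return ((uint32_t)buf[0] << 24) | ((uint32_t)buf[1] << 16) |
	       ((uint32_t)buf[2] << 8) | (uint32_t)buf[3];
}

/* What a 32-bit little-endian client would see for the same load: the
 * LOW half of the u64, which is what it saw before the type change.
 */
uint32_t le_client_view(uint64_t pos)
{
	unsigned char buf[8];

	store_le64(buf, pos);
	return (uint32_t)buf[0] | ((uint32_t)buf[1] << 8) |
	       ((uint32_t)buf[2] << 16) | ((uint32_t)buf[3] << 24);
}
```

Under those assumptions a little-endian 32-bit client keeps reading the
low half and does not notice the change, while a big-endian one starts
reading the high half, which stays at zero until the position passes
4GB.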

>
> > same patch because it's required to align the data availability
> > calculations between the userspace producing ringbuf and the bpf
> > producing ringbuf.
> >
> > Not included in this patch, a selftest was written to demonstrate the
> > bug, and indeed this patch allows the test to continue to make progress
> > past the overflow. The shape of the self test was as follows:
> >
> >   a) Set up ringbuf of 2GB size (the maximum permitted size).
> >   b) reserve+commit maximum-sized records (ULONG_MAX/4) constantly as
> >      fast as possible.
> >
> > With 1 million records per second repro time should be about 4.7 hours.
> > Such a test duration is impractical to run, hence the omission.
> >
> > Additionally, this patch adds commentary around a separate point to note
> > that the modular arithmetic is valid in the face of overflows, as that
> > fact may not be obvious to future readers.
> >
> > v2->v3:
> >    - Changed the representation of the consumer and producer positions
> >      from u64 to unsigned long in user_ring_buffer functions.
> >    - Addressed overflow in __bpf_user_ringbuf_peek.
> >    - Changed data availability computations to use the signed delta
> >      between the consumer and producer positions rather than merely
> >      checking whether their values were unequal.
> > v1->v2:
> >    - Fixed comment grammar.
> >    - Properly formatted subject line.
> >
> > Signed-off-by: Andrew Werner <awerner32@xxxxxxxxx>
> > ---
> >   kernel/bpf/ringbuf.c    | 11 ++++++++---
> >   tools/lib/bpf/ringbuf.c | 16 +++++++++++++---
> >   2 files changed, 21 insertions(+), 6 deletions(-)
> >
> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index f045fde632e5..0c48673520fb 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> > @@ -658,7 +658,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
> >   {
> >       int err;
> >       u32 hdr_len, sample_len, total_len, flags, *hdr;
> > -     u64 cons_pos, prod_pos;
> > +     unsigned long cons_pos, prod_pos;
> >
> >       /* Synchronizes with smp_store_release() in user-space producer. */
> >       prod_pos = smp_load_acquire(&rb->producer_pos);
> > @@ -667,7 +667,12 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
> >
> >       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
> >       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > -     if (cons_pos >= prod_pos)
> > +
> > +     /* Check if there's data available by computing the signed delta between
> > +      * cons_pos and prod_pos; a negative delta indicates that the consumer has
> > +      * not caught up. This formulation is robust to prod_pos wrapping around.
> > +      */
> > +     if ((long)(cons_pos - prod_pos) >= 0)
> >               return -ENODATA;
> >
> >       hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> > @@ -711,7 +716,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
> >
> >   static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
> >   {
> > -     u64 consumer_pos;
> > +     unsigned long consumer_pos;
> >       u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
> >
> >       /* Using smp_load_acquire() is unnecessary here, as the busy-bit
> > diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> > index 02199364db13..141030a89370 100644
> > --- a/tools/lib/bpf/ringbuf.c
> > +++ b/tools/lib/bpf/ringbuf.c
> > @@ -237,7 +237,13 @@ static int64_t ringbuf_process_ring(struct ring *r)
> >       do {
> >               got_new_data = false;
> >               prod_pos = smp_load_acquire(r->producer_pos);
> > -             while (cons_pos < prod_pos) {
> > +
> > +             /* Check if there's data available by computing the signed delta
> > +              * between cons_pos and prod_pos; a negative delta indicates that the
> > +              * consumer has not caught up. This formulation is robust to prod_pos
> > +              * wrapping around.
> > +              */
> > +             while ((long)(cons_pos - prod_pos) < 0) {
> >                       len_ptr = r->data + (cons_pos & r->mask);
> >                       len = smp_load_acquire(len_ptr);
> >
> > @@ -482,8 +488,7 @@ void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
> >   void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
> >   {
> >       __u32 avail_size, total_size, max_size;
> > -     /* 64-bit to avoid overflow in case of extreme application behavior */
> > -     __u64 cons_pos, prod_pos;
> > +     unsigned long cons_pos, prod_pos;
> >       struct ringbuf_hdr *hdr;
> >
> >       /* The top two bits are used as special flags */
> > @@ -498,6 +503,11 @@ void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
> >       prod_pos = smp_load_acquire(rb->producer_pos);
> >
> >       max_size = rb->mask + 1;
> > +
> > +     /* Note that this formulation is valid in the face of overflow of
> > +      * prod_pos so long as the delta between prod_pos and cons_pos is
> > +      * no greater than max_size.
> > +      */
> >       avail_size = max_size - (prod_pos - cons_pos);
> >       /* Round up total size to a multiple of 8. */
> >       total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;
> >
>
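
For anyone following along, here is a minimal standalone sketch of the
two calculations discussed in the patch: the signed-delta availability
check and the available-space computation. The function names are
hypothetical and exist only for this illustration; the expressions
mirror the ones in the diff. It assumes the usual two's complement
conversion from unsigned long to long.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>

/* Hypothetical helper: the check from the patch. A negative signed delta
 * means the consumer has not caught up with the producer.
 */
static bool ringbuf_has_data(unsigned long cons_pos, unsigned long prod_pos)
{
	return (long)(cons_pos - prod_pos) < 0;
}

/* Hypothetical helper: the pre-patch check, kept only for comparison. */
static bool ringbuf_has_data_old(unsigned long cons_pos, unsigned long prod_pos)
{
	return cons_pos < prod_pos;
}

/* Hypothetical helper: free space as computed in
 * user_ring_buffer__reserve(). Unsigned subtraction wraps, so the result
 * is correct as long as prod_pos - cons_pos does not exceed max_size.
 */
static unsigned long ringbuf_avail(unsigned long max_size,
				   unsigned long cons_pos,
				   unsigned long prod_pos)
{
	return max_size - (prod_pos - cons_pos);
}

void wraparound_demo(void)
{
	unsigned long cons_pos = ULONG_MAX - 7; /* 8 bytes below the wrap point */
	unsigned long prod_pos = cons_pos + 16; /* wraps around to 8 */

	assert(prod_pos == 8);
	assert(!ringbuf_has_data_old(cons_pos, prod_pos)); /* data is missed */
	assert(ringbuf_has_data(cons_pos, prod_pos));      /* data is seen */
	assert(!ringbuf_has_data(prod_pos, prod_pos));     /* caught up */
	assert(ringbuf_avail(4096, cons_pos, prod_pos) == 4080);
}
```

The old comparison stops seeing data once prod_pos wraps and cons_pos
has not, which is the stall described in the commit message; the signed
delta and the unsigned subtraction both give the expected answers across
the wrap.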




