Re: [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper

On Mon, Aug 15, 2022 at 02:23:04PM -0700, Hao Luo wrote:

Hi Hao,

Thanks for the review.

> On Thu, Aug 11, 2022 at 4:50 PM David Vernet <void@xxxxxxxxxxxxx> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> >
> > Signed-off-by: David Vernet <void@xxxxxxxxxxxxx>
> > ---
> [...]
> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index c0f3bca4bb09..73fa6ed12052 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> [...]
> > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > +                                  u32 *size)
> > +{
> > +       unsigned long cons_pos, prod_pos;
> > +       u32 sample_len, total_len;
> > +       u32 *hdr;
> > +       int err;
> > +       int busy = 0;
> > +
> > +       /* If another consumer is already consuming a sample, wait for them to
> > +        * finish.
> > +        */
> > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +               return -EBUSY;
> > +
> > +       /* Synchronizes with smp_store_release() in user-space. */
> > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> > +       /* Synchronizes with smp_store_release() in
> > +        * __bpf_user_ringbuf_sample_release().
> > +        */
> > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +       if (cons_pos >= prod_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -ENODATA;
> > +       }
> > +
> > +       hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> > +       sample_len = *hdr;
> > +
>
> rb->data and rb->mask had better be protected by READ_ONCE().

Could you please clarify the behavior you're protecting against here?
We're just calculating an offset from rb->data, and both rb->data and
rb->mask are set only once when the ringbuffer is first created in
bpf_ringbuf_area_alloc(). I'm not following what we'd be protecting against
by making these volatile, though I freely admit that I may be missing some
weird possible behavior in the compiler.

For what it's worth, in a follow-on version of the patch, I've updated this
read of the sample length to be an smp_load_acquire(), to accommodate
Andrii's suggestion [0] that we support the busy bit and discard bit in the
sample header from the get-go, as BPF_MAP_TYPE_RINGBUF ring buffers do. A
rough sketch of what that looks like follows below.

[0]: https://lore.kernel.org/all/CAEf4BzYVLgd=rHaxzZjyv0WJBzBpMqGSStgVhXG9XOHpB7qDRQ@xxxxxxxxxxxxxx/
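
To illustrate, the peek path would become something like the following
(a rough sketch only -- the exact busy / discard handling is still being
worked out, and 'discarded' is just an illustrative local):

	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
	/* Synchronizes with smp_store_release() in user-space, so we also
	 * observe any busy / discard bits the producer set in the header.
	 */
	sample_len = smp_load_acquire(hdr);

	/* The producer hasn't committed this sample yet. */
	if (sample_len & BPF_RINGBUF_BUSY_BIT) {
		atomic_set(&rb->busy, 0);
		return -EBUSY;
	}

	discarded = sample_len & BPF_RINGBUF_DISCARD_BIT;
	sample_len &= ~(BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);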

> > +       /* Check that the sample can fit into a dynptr. */
> > +       err = bpf_dynptr_check_size(sample_len);
> > +       if (err) {
> > +               atomic_set(&rb->busy, 0);
> > +               return err;
> > +       }
> > +
> > +       /* Check that the sample fits within the region advertised by the
> > +        * consumer position.
> > +        */
> > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> > +       if (total_len > prod_pos - cons_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* Check that the sample fits within the data region of the ring buffer.
> > +        */
> > +       if (total_len > rb->mask + 1) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* consumer_pos is updated when the sample is released.
> > +        */
> > +
> > +       *sample = (void *)((uintptr_t)rb->data +
> > +                          (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > +       *size = sample_len;
> > +
> > +       return 0;
> > +}
> > +
> > +static void
> > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > +                                 u64 flags)
> > +{
> > +       /* To release the sample, just increment the consumer position to
> > +        * signal that it has been consumed. The busy bit is cleared by
> > +        * atomic_set() below, once the wakeup has been queued.
> > +        */
> > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > +                         BPF_RINGBUF_HDR_SZ);
> > +
> > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> > +               irq_work_queue(&rb->work);
> > +
> > +       atomic_set(&rb->busy, 0);
> > +}
> 
> atomic_set() doesn't imply a barrier, so it could be observed before
> the smp_store_release(). The paired smp_load_acquire() could then
> observe rb->busy == 0 while still seeing the old consumer_pos. At a
> minimum, you need smp_mb__before_atomic() as a barrier before
> atomic_set(), or smp_wmb() to ensure all _writes_ have completed by
> the time rb->busy == 0 is observed.

Thanks for catching this. I should have been more careful not to assume the
semantics of atomic_set(), and I see now that you're of course correct:
it's just a WRITE_ONCE(), with no implications at all w.r.t. memory or
compiler barriers. I'll fix this in the follow-on version, and will take
another, closer pass over memory-barriers.txt and atomic_t.txt.

> Similarly, rb->work could be observed before the smp_store_release().

Yes, in the follow-on version I'll move the atomic_set() to before the
irq_work_queue() invocation (per Andrii's comment in [1], though that
discussion is still ongoing), and will add the missing
smp_mb__before_atomic(); a sketch of what I have in mind follows below.
Thanks again for catching this.

[1]: https://lore.kernel.org/all/CAEf4BzZ-m-AUX+1+CGr7nMxMDnT=fjkn8DP9nP21Uts1y7fMyg@xxxxxxxxxxxxxx/
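
Concretely, I'm planning on something like the following (illustrative
only -- the final ordering may still change based on that discussion):

static void
__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
				  u64 flags)
{
	/* To release the sample, just increment the consumer position to
	 * signal that it has been consumed.
	 */
	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
			  BPF_RINGBUF_HDR_SZ);

	/* atomic_set() is just a WRITE_ONCE(), so an explicit barrier is
	 * needed to order the consumer_pos update before clearing busy.
	 */
	smp_mb__before_atomic();
	atomic_set(&rb->busy, 0);

	if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);
}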

> Is it possible for __bpf_user_ringbuf_sample_release to be called
> concurrently? If so, there are races: because the load of
> rb->consumer_pos is not protected by smp_load_acquire, it is not
> synchronized with this smp_store_release. Concurrent calls to
> __bpf_user_ringbuf_sample_release may therefore both get stale
> consumer_pos values.

If we add the smp_mb__before_atomic() per your proposed fix above, I don't
believe this is an issue. __bpf_user_ringbuf_sample_release() should only
be invoked when a caller has an unreleased sample, and that can only happen
in a serial context due to the protection afforded by the atomic busy bit.

A single caller will not see a stale value either, as they must first
invoke __bpf_user_ringbuf_poll(), and then invoke
__bpf_user_ringbuf_sample_release() with the sample they received. The
consumer_pos they read in __bpf_user_ringbuf_sample_release() was already
smp_load_acquire()'d in __bpf_user_ringbuf_poll(), so they shouldn't see a
stale value there. We could certainly add another smp_load_acquire() here
for safety, but it would be redundant. The simplified drain loop below
illustrates the pairing.
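
For reference, the drain path looks roughly like this (simplified --
'callback', 'callback_ctx' and 'num_samples' are illustrative names, and
the dynptr setup is elided):

	for (i = 0; i < num_samples; i++) {
		/* Sets rb->busy, so poll() / sample_release() pairs can
		 * never interleave across consumers.
		 */
		err = __bpf_user_ringbuf_poll(rb, &sample, &size);
		if (err)
			break;

		/* ... initialize a LOCAL dynptr over the sample ... */
		ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx,
			       0, 0, 0);

		/* Clears rb->busy again. */
		__bpf_user_ringbuf_sample_release(rb, size, flags);
		if (ret)
			break;
	}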

Thanks,
David


