Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, Aug 11, 2022 at 04:22:43PM -0700, Andrii Nakryiko wrote:
> On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@xxxxxxxxxxxxx> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> 
> This commit message is a bit too laconic. There is a lot of
> implications of various parts of this patch, it would be great to
> highlight most important ones. Please consider elaborating a bit more.

Argh, sent my last email too early that only responded to this too early.
I'll do this in v3, as mentioned in my other email.

> > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > index a341f877b230..ca125648d7fd 100644
> > --- a/include/uapi/linux/bpf.h
> > +++ b/include/uapi/linux/bpf.h
> > @@ -5332,6 +5332,13 @@ union bpf_attr {
> >   *             **-EACCES** if the SYN cookie is not valid.
> >   *
> >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > + *
> > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > + *     Description
> > + *             Drain samples from the specified user ringbuffer, and invoke the
> > + *             provided callback for each such sample.
> 
> please specify what's the expected signature of callback_fn

Will do, unfortunatley we're inconsistent in doing this in other helper
functions, so it wasn't clear from context.

> > + *     Return
> > + *             An error if a sample could not be drained.
> 
> Negative error, right? And might be worth it briefly describing what
> are the situation(s) when you won't be able to drain a sample?
> 
> Also please clarify if having no sample to drain is an error or not?
> 
> It's also important to specify that if no error (and -ENODATA
> shouldn't be an error, actually) occurred then we get number of
> consumed samples back.

Agreed, I'll add more data here in the next version.

[...]

> > +
> > +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> > +                                     struct poll_table_struct *pts)
> > +{
> > +       return ringbuf_map_poll(map, filp, pts, true);
> > +}
> > +
> > +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> > +                                     struct poll_table_struct *pts)
> > +{
> > +       return ringbuf_map_poll(map, filp, pts, false);
> >  }
> 
> This is an even stronger case where I think we should keep two
> implementations completely separate. Please keep existing
> ringbuf_map_poll as is and just add a user variant as a separate
> implementation

Agreed, I'll split both this and  into separate functions (and the mmaps)
into separate functions.

> >  BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> > @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
> >         .map_alloc = ringbuf_map_alloc,
> >         .map_free = ringbuf_map_free,
> >         .map_mmap = ringbuf_map_mmap_kern,
> > -       .map_poll = ringbuf_map_poll,
> > +       .map_poll = ringbuf_map_poll_kern,
> >         .map_lookup_elem = ringbuf_map_lookup_elem,
> >         .map_update_elem = ringbuf_map_update_elem,
> >         .map_delete_elem = ringbuf_map_delete_elem,
> > @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
> >         .map_alloc = ringbuf_map_alloc,
> >         .map_free = ringbuf_map_free,
> >         .map_mmap = ringbuf_map_mmap_user,
> > +       .map_poll = ringbuf_map_poll_user,
> >         .map_lookup_elem = ringbuf_map_lookup_elem,
> >         .map_update_elem = ringbuf_map_update_elem,
> >         .map_delete_elem = ringbuf_map_delete_elem,
> > @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
> >         .arg1_type      = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
> >         .arg2_type      = ARG_ANYTHING,
> >  };
> > +
> > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > +                                  u32 *size)
> 
> "poll" part is quite confusing as it has nothing to do with epoll and
> the other poll implementation. Maybe "peek"? It peek into next sample
> without consuming it, seems appropriate
> 
> nit: this declaration can also stay on single line

Yeah, I agree that "poll" is confusing. I think "peek" is a good option. I
was also considering "consume", but I don't think that makes sense given
that we're not actually done consuming the sample until we release it.

> > +{
> > +       unsigned long cons_pos, prod_pos;
> > +       u32 sample_len, total_len;
> > +       u32 *hdr;
> > +       int err;
> > +       int busy = 0;
> 
> nit: combine matching types:
> 
> u32 sample_len, total_len, *hdr;
> int err, busy = 0;

Ack.

> > +
> > +       /* If another consumer is already consuming a sample, wait for them to
> > +        * finish.
> > +        */
> > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +               return -EBUSY;
> > +
> > +       /* Synchronizes with smp_store_release() in user-space. */
> > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> 
> I think we should enforce that prod_pos is a multiple of 8

Agreed, I'll add a check and selftest for this.

> > +       /* Synchronizes with smp_store_release() in
> > +        * __bpf_user_ringbuf_sample_release().
> > +        */
> > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +       if (cons_pos >= prod_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -ENODATA;
> > +       }
> > +
> > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > +       sample_len = *hdr;
> 
> do we need smp_load_acquire() here? libbpf's ring_buffer
> implementation uses load_acquire here

I thought about this when I was first adding the logic, but I can't
convince myself that it's necessary and wasn't able to figure out why we
did a load acquire on the len in libbpf. The kernel doesn't do a store
release on the header, so I'm not sure what the load acquire in libbpf
actually accomplishes. I could certainly be missing something, but I
_think_ the important thing is that we have load-acquire / store-release
pairs for the consumer and producer positions.

> > +       /* Check that the sample can fit into a dynptr. */
> > +       err = bpf_dynptr_check_size(sample_len);
> > +       if (err) {
> > +               atomic_set(&rb->busy, 0);
> > +               return err;
> > +       }
> > +
> > +       /* Check that the sample fits within the region advertised by the
> > +        * consumer position.
> > +        */
> > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> 
> round up to closest multiple of 8? All the pointers into ringbuf data
> area should be 8-byte aligned

Will do.

> > +       if (total_len > prod_pos - cons_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* Check that the sample fits within the data region of the ring buffer.
> > +        */
> > +       if (total_len > rb->mask + 1) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* consumer_pos is updated when the sample is released.
> > +        */
> > +
> 
> nit: unnecessary empty line
> 
> and please keep single-line comments as single-line, no */ on separate
> line in such case

Will do.

> > +       *sample = (void *)((uintptr_t)rb->data +
> > +                          ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > +       *size = sample_len;
> > +
> > +       return 0;
> > +}
> > +
> > +static void
> > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > +                                 u64 flags)
> 
> try to keep single lines if they are under 100 characters

Ack, this seems to really differ by subsystem. I'll follow this norm for
BPF.

> > +{
> > +
> > +
> 
> empty lines, why?

Apologies, thanks for catching this.

> > +       /* To release the ringbuffer, just increment the producer position to
> > +        * signal that a new sample can be consumed. The busy bit is cleared by
> > +        * userspace when posting a new sample to the ringbuffer.
> > +        */
> > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > +                         BPF_RINGBUF_HDR_SZ);
> > +
> > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> 
> please use () around bit operator expressions

Ack.

> > +               irq_work_queue(&rb->work);
> > +
> > +       atomic_set(&rb->busy, 0);
> 
> set busy before scheduling irq_work? why delaying?

Yeah, I thought about this. I don't think there's any problem with clearing
busy before we schedule the irq_work_queue(). I elected to do this to err
on the side of simpler logic until we observed contention, but yeah, let me
just do the more performant thing here.

> > +}
> > +
> > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > +{
> > +       struct bpf_ringbuf *rb;
> > +       long num_samples = 0, ret = 0;
> > +       int err;
> > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> > +
> > +       if (unlikely(flags & ~wakeup_flags))
> > +               return -EINVAL;
> > +
> > +       /* The two wakeup flags are mutually exclusive. */
> > +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> > +               return -EINVAL;
> 
> we don't check this for existing ringbuf, so maybe let's keep it
> consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP

Ack.

> > +
> > +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > +       do {
> > +               u32 size;
> > +               void *sample;
> > +
> > +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> > +
> 
> nit: don't keep empty line between function call and error check

Ack.

> > +               if (!err) {
> 
> so -ENODATA is a special error and you should stop if you get that.
> But for any other error we should propagate error back, not just
> silently consuming it
> 
> maybe
> 
> err = ...
> if (err) {
>     if (err == -ENODATA)
>       break;
>     return err;
> }
> 
> ?

Agreed, I'll fix this.

> > +                       struct bpf_dynptr_kern dynptr;
> > +
> > +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> > +                                       0, size);
> 
> we should try to avoid unnecessary re-initialization of dynptr, let's
> initialize it once and then just update data and size field inside the
> loop?

Hmm ok, let me give that a try. 

> > +                       ret = callback((u64)&dynptr,
> > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > +
> > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > +                       num_samples++;
> > +               }
> > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > +
> 
> 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> seems somewhat low, tbh...
> 
> ideally we'd cond_resched() from time to time, but that would require
> BPF program to be sleepable, so we can't do that :(

Yeah, I knew this would come up in discussion. I would love to do
cond_resched() here, but as you said, I don't think it's an option :-/ And
given the fact that we're calling back into the BPF program, we have to be
cognizant of things taking a while and clogging up the CPU. What do you
think is a more reasonable number than 4096?

[...]

> >         case ARG_PTR_TO_DYNPTR:
> > +               /* We only need to check for initialized / uninitialized helper
> > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > +                * is that if it is, that a helper function initialized the
> > +                * dynptr on behalf of the BPF program.
> > +                */
> > +               if (reg->type & MEM_ALLOC)
> 
> Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> Normal dynptr created and used inside BPF program on the stack are
> actually PTR_TO_STACK, so that should be enough distinction? Or am I
> missing something?

I think this would also work in the current state of the codebase, but IIUC
it relies on PTR_TO_STACK being the only way that a BPF program could ever
allocate a dynptr. I was trying to guard against the case of a helper being
added in the future that e.g. returned a dynamically allocated dynptr that
the caller would eventually need to release in another helper call.
MEM_ALLOC seems like the correct modifier to more generally denote that the
dynptr was externally allocated.  If you think this is overkill I'm totally
fine with removing MEM_ALLOC. We can always add it down the road if we add
a new helper that requires it.

[...]

> > @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> >                 /* Find the id of the dynptr we're acquiring a reference to */
> >                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
> >                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> > +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> > +
> >                                 if (dynptr_id) {
> >                                         verbose(env, "verifier internal error: multiple dynptr args in func\n");
> >                                         return -EFAULT;
> >                                 }
> > -                               dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> > +
> > +                               if (!(reg->type & MEM_ALLOC))
> > +                                       dynptr_id = stack_slot_get_id(env, reg);
> 
> this part has changed in bpf-next

Yeah, this is all rewired in the new version I sent out in v2 (and will
send out in v3 when I apply your other suggestions).

[...]

Thanks for the thorough review!

- David



[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux