Re: [PATCH v5 2/4] bpf: Add bpf_user_ringbuf_drain() helper

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri, Sep 09, 2022 at 04:42:07PM -0700, Andrii Nakryiko wrote:
> On Fri, Sep 2, 2022 at 4:43 PM David Vernet <void@xxxxxxxxxxxxx> wrote:
> >
> > In a prior change, we added a new BPF_MAP_TYPE_USER_RINGBUF map type which
> > will allow user-space applications to publish messages to a ring buffer
> > that is consumed by a BPF program in kernel-space. In order for this
> > map-type to be useful, it will require a BPF helper function that BPF
> > programs can invoke to drain samples from the ring buffer, and invoke
> > callbacks on those samples. This change adds that capability via a new BPF
> > helper function:
> >
> > bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx,
> >                        u64 flags)
> >
> > BPF programs may invoke this function to run callback_fn() on a series of
> > samples in the ring buffer. callback_fn() has the following signature:
> >
> > long callback_fn(struct bpf_dynptr *dynptr, void *context);
> >
> > Samples are provided to the callback in the form of struct bpf_dynptr *'s,
> > which the program can read using BPF helper functions for querying
> > struct bpf_dynptr's.
> >
> > In order to support bpf_ringbuf_drain(), a new PTR_TO_DYNPTR register
> > type is added to the verifier to reflect a dynptr that was allocated by
> > a helper function and passed to a BPF program. Unlike PTR_TO_STACK
> > dynptrs which are allocated on the stack by a BPF program, PTR_TO_DYNPTR
> > dynptrs need not use reference tracking, as the BPF helper is trusted to
> > properly free the dynptr before returning. The verifier currently only
> > supports PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL.
> >
> > Note that while the corresponding user-space libbpf logic will be added in
> > a subsequent patch, this patch does contain an implementation of the
> > .map_poll() callback for BPF_MAP_TYPE_USER_RINGBUF maps. This .map_poll()
> > callback guarantees that an epoll-waiting user-space producer will
> > receive at least one event notification whenever at least one sample is
> > drained in an invocation of bpf_user_ringbuf_drain(), provided that the
> > function is not invoked with the BPF_RB_NO_WAKEUP flag.
> >
> > Sending an event notification for every sample is not an option, as it
> > could cause the system to hang due to invoking irq_work_queue() in
> > too-frequent succession. So as to try and optimize for the common case,
> > however, bpf_user_ringbuf_drain() will also send an event notification
> > whenever a sample being drained causes the ring buffer to no longer be
> > full. This heuristic may not help some user-space producers, as a
> > producer can publish samples of varying size, and there may not be
> > enough space in the ring buffer after the first sample is drained which
> > causes it to no longer be full. In this case, the producer may have to
> > wait until bpf_ringbuf_drain() returns to receive an event notification.
> 
> most of this paragraph should be removed now?

Sorry, good catch. Will remove in v6.

> I mentioned few things I didn't see last time (or that were added),
> but overall looks good to me.

Hmm, let me know if you remember what this is. After reading through this,
I can't think of anything else that should be added in this commit summary.

[...]

> > +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> > +                                     struct poll_table_struct *pts)
> > +{
> > +       struct bpf_ringbuf_map *rb_map;
> > +
> > +       rb_map = container_of(map, struct bpf_ringbuf_map, map);
> > +       poll_wait(filp, &rb_map->rb->waitq, pts);
> > +
> > +       if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb))
> > +               return  EPOLLOUT | EPOLLWRNORM;
> 
> nit: extra space after return

Ack.

[...]

> > +static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)
> > +{
> > +       int err, busy = 0;
> > +       u32 hdr_len, sample_len, total_len, flags, *hdr;
> > +       u64 cons_pos, prod_pos;
> > +
> > +       /* If another consumer is already consuming a sample, wait for them to finish. */
> > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +               return -EBUSY;
> > +
> > +       /* Synchronizes with smp_store_release() in user-space producer. */
> > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> > +       if (prod_pos % 8) {
> > +               err = -EINVAL;
> > +               goto err_unlock;
> > +       }
> > +
> > +       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
> > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +       if (cons_pos >= prod_pos) {
> > +               err = -ENOSPC;
> 
> funny, this one actually feels like -ENODATA (there is no data to peek
> at, right?). It's different from ENOSPC, as we are not trying to
> enqueue anything. I probably missed it before?

Ah, yeah, that makes sense. I think the value should be ENODATA for a BPF
program trying to read samples when the ring buffer is empty, and ENOSPC
for a user space program trying to post a sample to a ring buffer that
doesn't have enough room. I'll update this to be the case in v6.

[...]

> > +static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
> > +{
> > +       u64 consumer_pos;
> > +       u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
> > +
> > +       /* Using smp_load_acquire() is unnecessary here, as the busy-bit
> > +        * prevents another task from writing to consumer_pos after it was read
> > +        * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
> > +        */
> > +       consumer_pos = rb->consumer_pos;
> > +        /* Synchronizes with smp_load_acquire() in user-space producer. */
> > +       smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size);
> > +
> > +       /* Prevent the clearing of the busy-bit from being reordered before the
> > +        * storing of the updated rb->consumer_pos value.
> > +        */
> > +       smp_mb__before_atomic();
> > +       atomic_set(&rb->busy, 0);
> > +
> > +       if (flags & BPF_RB_FORCE_WAKEUP)
> > +               irq_work_queue(&rb->work);
> 
> I think this part is new, you decided to define that FORCE_WAKEUP
> sends wakeup after every single consumed sample? I have no strong
> opinion on this, tbh, just wonder if it wasn't enough to do it once
> after drain?

I didn't have a strong reason for doing this other than that I think it
more closely matches the behavior for BPF_MAP_TYPE_RINGBUF (which invokes
irq_work_queue() after every call to bpf_ringbuf_commit() if
BPF_RB_FORCE_WAKEUP is passed). Let's just match that behavior unless we
have a good reason not to? I think that will be more intuitive for users.

> > +}
> > +
> > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > +{
> > +       struct bpf_ringbuf *rb;
> > +       long samples, discarded_samples = 0, ret = 0;
> > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> > +
> > +       if (unlikely(flags & ~wakeup_flags))
> > +               return -EINVAL;
> > +
> > +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > +       for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) {
> > +               int err;
> > +               u32 size;
> > +               void *sample;
> > +               struct bpf_dynptr_kern dynptr;
> > +
> > +               err = __bpf_user_ringbuf_peek(rb, &sample, &size);
> 
> so I also just realized that ringbuf_peek will keep setting/resetting
> busy flag, and in like all the practical case it's a completely
> useless work as we don't intend to have competing consumers, right? So
> maybe move busy bit handling into drain itself and document that peek
> expect busy taken care of?
> 
> This should be noticeable faster when there are multiple records
> consumed in one drain.

Great idea, I'll do this in v6.



[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux