On Fri, Sep 09, 2022 at 04:42:07PM -0700, Andrii Nakryiko wrote:
> On Fri, Sep 2, 2022 at 4:43 PM David Vernet <void@xxxxxxxxxxxxx> wrote:
> >
> > In a prior change, we added a new BPF_MAP_TYPE_USER_RINGBUF map type which
> > will allow user-space applications to publish messages to a ring buffer
> > that is consumed by a BPF program in kernel-space. In order for this
> > map-type to be useful, it will require a BPF helper function that BPF
> > programs can invoke to drain samples from the ring buffer, and invoke
> > callbacks on those samples. This change adds that capability via a new BPF
> > helper function:
> >
> > bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx,
> >                        u64 flags)
> >
> > BPF programs may invoke this function to run callback_fn() on a series of
> > samples in the ring buffer. callback_fn() has the following signature:
> >
> > long callback_fn(struct bpf_dynptr *dynptr, void *context);
> >
> > Samples are provided to the callback in the form of struct bpf_dynptr *'s,
> > which the program can read using BPF helper functions for querying
> > struct bpf_dynptr's.
> >
> > In order to support bpf_ringbuf_drain(), a new PTR_TO_DYNPTR register
> > type is added to the verifier to reflect a dynptr that was allocated by
> > a helper function and passed to a BPF program. Unlike PTR_TO_STACK
> > dynptrs which are allocated on the stack by a BPF program, PTR_TO_DYNPTR
> > dynptrs need not use reference tracking, as the BPF helper is trusted to
> > properly free the dynptr before returning. The verifier currently only
> > supports PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL.
> >
> > Note that while the corresponding user-space libbpf logic will be added in
> > a subsequent patch, this patch does contain an implementation of the
> > .map_poll() callback for BPF_MAP_TYPE_USER_RINGBUF maps. This .map_poll()
> > callback guarantees that an epoll-waiting user-space producer will
> > receive at least one event notification whenever at least one sample is
> > drained in an invocation of bpf_user_ringbuf_drain(), provided that the
> > function is not invoked with the BPF_RB_NO_WAKEUP flag.
> >
> > Sending an event notification for every sample is not an option, as it
> > could cause the system to hang due to invoking irq_work_queue() in
> > too-frequent succession. So as to try and optimize for the common case,
> > however, bpf_user_ringbuf_drain() will also send an event notification
> > whenever a sample being drained causes the ring buffer to no longer be
> > full. This heuristic may not help some user-space producers, as a
> > producer can publish samples of varying size, and there may not be
> > enough space in the ring buffer after the first sample is drained which
> > causes it to no longer be full. In this case, the producer may have to
> > wait until bpf_ringbuf_drain() returns to receive an event notification.
>
> most of this paragraph should be removed now?

Sorry, good catch. Will remove in v6.

> I mentioned few things I didn't see last time (or that were added),
> but overall looks good to me.

Hmm, let me know if you remember what this is. After reading through
this, I can't think of anything else that should be added in this
commit summary.
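In case it's useful for anyone trying out the series, here is a minimal
sketch of how I expect a BPF program to consume from the map once the
libbpf plumbing lands. The map name, sample layout, and attach point
below are made up purely for illustration, and it assumes a
bpf_helper_defs.h that already declares the new helper:

#include <vmlinux.h>
#include <bpf/bpf_helpers.h>

char LICENSE[] SEC("license") = "GPL";

/* Hypothetical sample layout agreed upon with the user-space producer. */
struct sample {
	int pid;
	long value;
};

struct {
	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
	__uint(max_entries, 256 * 1024);
} user_rb SEC(".maps");

static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
{
	struct sample s;

	/* Copy the sample out of the dynptr handed to us by the helper. */
	if (bpf_dynptr_read(&s, sizeof(s), dynptr, 0, 0))
		return 0;

	bpf_printk("drained sample: pid=%d value=%ld", s.pid, s.value);
	return 0;
}

SEC("tracepoint/syscalls/sys_enter_getpgid")
int drain_samples(void *ctx)
{
	/* Run handle_sample() on up to BPF_MAX_USER_RINGBUF_SAMPLES samples. */
	bpf_user_ringbuf_drain(&user_rb, handle_sample, NULL, 0);
	return 0;
}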
[...]

> > +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> > +				      struct poll_table_struct *pts)
> > +{
> > +	struct bpf_ringbuf_map *rb_map;
> > +
> > +	rb_map = container_of(map, struct bpf_ringbuf_map, map);
> > +	poll_wait(filp, &rb_map->rb->waitq, pts);
> > +
> > +	if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb))
> > +		return  EPOLLOUT | EPOLLWRNORM;
>
> nit: extra space after return

Ack.

[...]

> > +static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)
> > +{
> > +	int err, busy = 0;
> > +	u32 hdr_len, sample_len, total_len, flags, *hdr;
> > +	u64 cons_pos, prod_pos;
> > +
> > +	/* If another consumer is already consuming a sample, wait for them to finish. */
> > +	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +		return -EBUSY;
> > +
> > +	/* Synchronizes with smp_store_release() in user-space producer. */
> > +	prod_pos = smp_load_acquire(&rb->producer_pos);
> > +	if (prod_pos % 8) {
> > +		err = -EINVAL;
> > +		goto err_unlock;
> > +	}
> > +
> > +	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
> > +	cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +	if (cons_pos >= prod_pos) {
> > +		err = -ENOSPC;
>
> funny, this one actually feels like -ENODATA (there is no data to peek
> at, right?). It's different from ENOSPC, as we are not trying to
> enqueue anything. I probably missed it before?

Ah, yeah, that makes sense. I think the value should be -ENODATA for a
BPF program trying to read samples when the ring buffer is empty, and
-ENOSPC for a user-space producer trying to post a sample to a ring
buffer that doesn't have enough room for it. I'll update this to be the
case in v6.

[...]

> > +static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
> > +{
> > +	u64 consumer_pos;
> > +	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
> > +
> > +	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
> > +	 * prevents another task from writing to consumer_pos after it was read
> > +	 * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
> > +	 */
> > +	consumer_pos = rb->consumer_pos;
> > +	/* Synchronizes with smp_load_acquire() in user-space producer. */
> > +	smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size);
> > +
> > +	/* Prevent the clearing of the busy-bit from being reordered before the
> > +	 * storing of the updated rb->consumer_pos value.
> > +	 */
> > +	smp_mb__before_atomic();
> > +	atomic_set(&rb->busy, 0);
> > +
> > +	if (flags & BPF_RB_FORCE_WAKEUP)
> > +		irq_work_queue(&rb->work);
>
> I think this part is new: you decided that FORCE_WAKEUP sends a wakeup
> after every single consumed sample? I have no strong opinion on this,
> tbh, just wondering whether it wouldn't be enough to do it once after
> drain?

I didn't have a strong reason for doing this, other than that it more
closely matches the behavior of BPF_MAP_TYPE_RINGBUF (which invokes
irq_work_queue() after every call to bpf_ringbuf_commit() if
BPF_RB_FORCE_WAKEUP is passed). Let's just match that behavior unless
we have a good reason not to? I think that will be more intuitive for
users.
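As an aside, since the libbpf side isn't posted yet: to make the
acquire/release pairing in the comments above concrete, the user-space
producer's commit path has to look roughly like the sketch below. It's
hand-rolled with compiler builtins, omits the reserve/commit header
busy-bit handling, and all the names (producer_commit(),
write_hdr_and_payload(), etc.) are illustrative only; the real logic
will live in libbpf:

#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Both positions live in the ring buffer's mmap()ed metadata pages. */
static uint64_t *consumer_pos;	/* written by the kernel, read here */
static uint64_t *producer_pos;	/* written here, read by the kernel */

/* Stand-in for writing the 8-byte length header plus the payload at
 * offset (prod & (ringbuf_sz - 1)) in the mmap()ed data area.
 */
static void write_hdr_and_payload(void *data_area, uint64_t prod,
				  const void *sample, uint32_t len);

static int producer_commit(void *data_area, size_t ringbuf_sz,
			   const void *sample, uint32_t len)
{
	uint64_t cons, prod;
	/* 8-byte header, total size rounded up to a multiple of 8. */
	uint32_t total_len = (len + 8 + 7) & ~7u;

	/* Pairs with smp_store_release() of rb->consumer_pos in
	 * __bpf_user_ringbuf_sample_release().
	 */
	cons = __atomic_load_n(consumer_pos, __ATOMIC_ACQUIRE);
	prod = *producer_pos;	/* only this thread ever writes it */

	if (prod + total_len - cons > ringbuf_sz)
		return -ENOSPC;

	write_hdr_and_payload(data_area, prod, sample, len);

	/* Pairs with smp_load_acquire() of rb->producer_pos in
	 * __bpf_user_ringbuf_peek(): the payload must be visible to the
	 * kernel before the new producer position is.
	 */
	__atomic_store_n(producer_pos, prod + total_len, __ATOMIC_RELEASE);
	return 0;
}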
> > +}
> > +
> > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > +	   void *, callback_fn, void *, callback_ctx, u64, flags)
> > +{
> > +	struct bpf_ringbuf *rb;
> > +	long samples, discarded_samples = 0, ret = 0;
> > +	bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > +	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> > +
> > +	if (unlikely(flags & ~wakeup_flags))
> > +		return -EINVAL;
> > +
> > +	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > +	for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) {
> > +		int err;
> > +		u32 size;
> > +		void *sample;
> > +		struct bpf_dynptr_kern dynptr;
> > +
> > +		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
>
> So I also just realized that ringbuf_peek will keep setting/resetting
> the busy flag, and in practically all cases that's useless work, as we
> don't intend to have competing consumers, right? So maybe move the
> busy bit handling into drain itself and document that peek expects
> busy to already be taken care of?
>
> This should be noticeably faster when there are multiple records
> consumed in one drain.

Great idea, I'll do this in v6.
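For completeness, here's roughly what I have in mind (untested sketch,
folding in the -ENODATA change discussed above); __bpf_user_ringbuf_peek()
would lose its atomic_try_cmpxchg() and just assume the busy bit is
already held, and __bpf_user_ringbuf_sample_release() would stop
clearing it:

BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
	   void *, callback_fn, void *, callback_ctx, u64, flags)
{
	struct bpf_ringbuf *rb;
	long samples, discarded_samples = 0, ret = 0;
	bpf_callback_t callback = (bpf_callback_t)callback_fn;
	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
	int busy = 0;

	if (unlikely(flags & ~wakeup_flags))
		return -EINVAL;

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

	/* Take the busy bit once for the whole drain rather than once per
	 * peeked sample.
	 */
	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
		return -EBUSY;

	for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) {
		int err;
		u32 size;
		void *sample;
		struct bpf_dynptr_kern dynptr;

		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
		if (err) {
			if (err != -ENODATA) {
				ret = err;
				goto schedule_work_return;
			}
			break;
		}

		bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
		ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx, 0, 0, 0);
		__bpf_user_ringbuf_sample_release(rb, size, flags);
	}
	ret = samples - discarded_samples;

schedule_work_return:
	/* Prevent the clearing of rb->busy from being reordered before the
	 * release of the last consumed sample.
	 */
	smp_mb__before_atomic();
	atomic_set(&rb->busy, 0);

	if (!(flags & BPF_RB_NO_WAKEUP) && samples > 0)
		irq_work_queue(&rb->work);

	return ret;
}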