On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@xxxxxxxxxxxxx> wrote:
>
> Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> helper function that allows BPF programs to drain samples from the ring
> buffer, and invoke a callback for each. This patch adds a new
> bpf_user_ringbuf_drain() helper that provides this abstraction.
>
> In order to support this, we needed to also add a new PTR_TO_DYNPTR
> register type to reflect a dynptr that was allocated by a helper function
> and passed to a BPF program. The verifier currently only supports
> PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.

This commit message is a bit too laconic. There are a lot of implications
in various parts of this patch; it would be great to highlight the most
important ones. Please consider elaborating a bit more.

>
> Signed-off-by: David Vernet <void@xxxxxxxxxxxxx>
> ---
> include/linux/bpf.h | 6 +-
> include/uapi/linux/bpf.h | 8 ++
> kernel/bpf/helpers.c | 2 +
> kernel/bpf/ringbuf.c | 157 +++++++++++++++++++++++++++++++--
> kernel/bpf/verifier.c | 57 +++++++++++-
> tools/include/uapi/linux/bpf.h | 8 ++
> 6 files changed, 228 insertions(+), 10 deletions(-)
>
> diff --git a/include/linux/bpf.h b/include/linux/bpf.h
> index 20c26aed7896..4d0d5f2a3ac8 100644
> --- a/include/linux/bpf.h
> +++ b/include/linux/bpf.h
> @@ -401,7 +401,7 @@ enum bpf_type_flag {
> /* DYNPTR points to memory local to the bpf program. */
> DYNPTR_TYPE_LOCAL = BIT(8 + BPF_BASE_TYPE_BITS),
>
> - /* DYNPTR points to a ringbuf record. */
> + /* DYNPTR points to a kernel-produced ringbuf record. */
> DYNPTR_TYPE_RINGBUF = BIT(9 + BPF_BASE_TYPE_BITS),
>
> /* Size is known at compile time. */
> @@ -606,6 +606,7 @@ enum bpf_reg_type {
> PTR_TO_MEM, /* reg points to valid memory region */
> PTR_TO_BUF, /* reg points to a read/write buffer */
> PTR_TO_FUNC, /* reg points to a bpf program function */
> + PTR_TO_DYNPTR, /* reg points to a dynptr */
> __BPF_REG_TYPE_MAX,
>
> /* Extended reg_types. */
> @@ -2410,6 +2411,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
> extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
> extern const struct bpf_func_proto bpf_set_retval_proto;
> extern const struct bpf_func_proto bpf_get_retval_proto;
> +extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
>
> const struct bpf_func_proto *tracing_prog_func_proto(
> enum bpf_func_id func_id, const struct bpf_prog *prog);
> @@ -2554,7 +2556,7 @@ enum bpf_dynptr_type {
> BPF_DYNPTR_TYPE_INVALID,
> /* Points to memory that is local to the bpf program */
> BPF_DYNPTR_TYPE_LOCAL,
> - /* Underlying data is a ringbuf record */
> + /* Underlying data is a kernel-produced ringbuf record */
> BPF_DYNPTR_TYPE_RINGBUF,
> };
>
> diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> index a341f877b230..ca125648d7fd 100644
> --- a/include/uapi/linux/bpf.h
> +++ b/include/uapi/linux/bpf.h
> @@ -5332,6 +5332,13 @@ union bpf_attr {
> * **-EACCES** if the SYN cookie is not valid.
> *
> * **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> + *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + * Description
> + * Drain samples from the specified user ringbuffer, and invoke the
> + * provided callback for each such sample.

Please specify the expected signature of callback_fn.

> + * Return
> + * An error if a sample could not be drained.

Negative error, right? It might also be worth briefly describing the
situation(s) in which a sample can't be drained. Please also clarify
whether having no sample to drain is an error or not. It's also important
to specify that if no error occurred (and -ENODATA shouldn't actually be
an error), we get the number of consumed samples back.
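To make the ask concrete, I'd expect the final UAPI comment to read roughly
along these lines (wording is mine, based on how the callback registers are
set up in set_user_ringbuf_callback_state() later in this patch; adjust as
needed):

 * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
 *	Description
 *		Drain samples from the specified user ring buffer, and invoke
 *		the provided callback for each such sample:
 *
 *		long (*callback_fn)(struct bpf_dynptr *dynptr, void *ctx);
 *
 *		If **callback_fn** returns 0, the helper continues with the
 *		next sample; a non-zero return value stops iteration early.
 *	Return
 *		The number of drained samples if no error was encountered
 *		while draining samples, or a negative error code on failure.
 *		Draining no samples is not an error.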
> */
> #define __BPF_FUNC_MAPPER(FN) \
> FN(unspec), \
> @@ -5542,6 +5549,7 @@ union bpf_attr {
> FN(tcp_raw_gen_syncookie_ipv6), \
> FN(tcp_raw_check_syncookie_ipv4), \
> FN(tcp_raw_check_syncookie_ipv6), \
> + FN(user_ringbuf_drain), \
> /* */
>
> /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
> index 1f961f9982d2..b8097876014c 100644
> --- a/kernel/bpf/helpers.c
> +++ b/kernel/bpf/helpers.c
> @@ -1647,6 +1647,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
> return &bpf_dynptr_write_proto;
> case BPF_FUNC_dynptr_data:
> return &bpf_dynptr_data_proto;
> + case BPF_FUNC_user_ringbuf_drain:
> + return &bpf_user_ringbuf_drain_proto;
> default:
> break;
> }
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index 29e2de42df15..fc589fc8eb7c 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -291,16 +291,33 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
> }
>
> static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
> - struct poll_table_struct *pts)
> + struct poll_table_struct *pts,
> + bool kernel_producer)
> {
> struct bpf_ringbuf_map *rb_map;
> + bool buffer_empty;
>
> rb_map = container_of(map, struct bpf_ringbuf_map, map);
> poll_wait(filp, &rb_map->rb->waitq, pts);
>
> - if (ringbuf_avail_data_sz(rb_map->rb))
> - return EPOLLIN | EPOLLRDNORM;
> - return 0;
> + buffer_empty = !ringbuf_avail_data_sz(rb_map->rb);
> +
> + if (kernel_producer)
> + return buffer_empty ? 0 : EPOLLIN | EPOLLRDNORM;
> + else
> + return buffer_empty ? EPOLLOUT | EPOLLWRNORM : 0;
> +}
> +
> +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> + struct poll_table_struct *pts)
> +{
> + return ringbuf_map_poll(map, filp, pts, true);
> +}
> +
> +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> + struct poll_table_struct *pts)
> +{
> + return ringbuf_map_poll(map, filp, pts, false);
> }

This is an even stronger case where I think we should keep the two
implementations completely separate. Please keep the existing
ringbuf_map_poll() as-is and just add a user variant as a separate
implementation.
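Something like this is what I have in mind (untested sketch, keeping the
exact "buffer empty means writable" semantics from this patch):

static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
				      struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	/* For a user-producer ring buffer, "ready" means user-space can
	 * publish another sample, i.e. there is no unconsumed data left.
	 */
	if (!ringbuf_avail_data_sz(rb_map->rb))
		return EPOLLOUT | EPOLLWRNORM;
	return 0;
}

and then the existing kernel-producer ringbuf_map_poll() doesn't need to
change at all.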
>
> BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
> .map_alloc = ringbuf_map_alloc,
> .map_free = ringbuf_map_free,
> .map_mmap = ringbuf_map_mmap_kern,
> - .map_poll = ringbuf_map_poll,
> + .map_poll = ringbuf_map_poll_kern,
> .map_lookup_elem = ringbuf_map_lookup_elem,
> .map_update_elem = ringbuf_map_update_elem,
> .map_delete_elem = ringbuf_map_delete_elem,
> @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
> .map_alloc = ringbuf_map_alloc,
> .map_free = ringbuf_map_free,
> .map_mmap = ringbuf_map_mmap_user,
> + .map_poll = ringbuf_map_poll_user,
> .map_lookup_elem = ringbuf_map_lookup_elem,
> .map_update_elem = ringbuf_map_update_elem,
> .map_delete_elem = ringbuf_map_delete_elem,
> @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
> .arg1_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
> .arg2_type = ARG_ANYTHING,
> };
> +
> +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> + u32 *size)

The "poll" part of the name is quite confusing, as it has nothing to do
with epoll or the other poll implementation. Maybe "peek"? It peeks into
the next sample without consuming it, which seems appropriate.

nit: this declaration can also stay on a single line

> +{
> + unsigned long cons_pos, prod_pos;
> + u32 sample_len, total_len;
> + u32 *hdr;
> + int err;
> + int busy = 0;

nit: combine matching types:

u32 sample_len, total_len, *hdr;
int err, busy = 0;

> +
> + /* If another consumer is already consuming a sample, wait for them to
> + * finish.
> + */
> + if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> + return -EBUSY;
> +
> + /* Synchronizes with smp_store_release() in user-space. */
> + prod_pos = smp_load_acquire(&rb->producer_pos);

I think we should enforce that prod_pos is a multiple of 8.

> + /* Synchronizes with smp_store_release() in
> + * __bpf_user_ringbuf_sample_release().
> + */
> + cons_pos = smp_load_acquire(&rb->consumer_pos);
> + if (cons_pos >= prod_pos) {
> + atomic_set(&rb->busy, 0);
> + return -ENODATA;
> + }
> +
> + hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> + sample_len = *hdr;

Do we need smp_load_acquire() here? libbpf's ring_buffer implementation
uses a load-acquire here.

> +
> + /* Check that the sample can fit into a dynptr. */
> + err = bpf_dynptr_check_size(sample_len);
> + if (err) {
> + atomic_set(&rb->busy, 0);
> + return err;
> + }
> +
> + /* Check that the sample fits within the region advertised by the
> + * consumer position.
> + */
> + total_len = sample_len + BPF_RINGBUF_HDR_SZ;

Round up to the closest multiple of 8? All the pointers into the ringbuf
data area should be 8-byte aligned.
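Concretely, I'm thinking of something along these lines (sketch only,
covering both this and the producer_pos alignment check mentioned above;
the error code choice is mine):

	/* All positions and record pointers in the ring buffer data area are
	 * expected to be 8-byte aligned, so reject a misaligned producer
	 * position outright.
	 */
	if (prod_pos % 8) {
		atomic_set(&rb->busy, 0);
		return -EINVAL;
	}

	...

	/* Consume the header plus sample rounded up to a multiple of 8. */
	total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8);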
> + if (total_len > prod_pos - cons_pos) {
> + atomic_set(&rb->busy, 0);
> + return -E2BIG;
> + }
> +
> + /* Check that the sample fits within the data region of the ring buffer.
> + */
> + if (total_len > rb->mask + 1) {
> + atomic_set(&rb->busy, 0);
> + return -E2BIG;
> + }
> +
> + /* consumer_pos is updated when the sample is released.
> + */
> +

nit: unnecessary empty line; and please keep single-line comments
single-line, with no */ on a separate line in that case.

> + *sample = (void *)((uintptr_t)rb->data +
> + ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> + *size = sample_len;
> +
> + return 0;
> +}
> +
> +static void
> +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> + u64 flags)

Try to keep things on a single line if they are under 100 characters.

> +{
> +
> +

empty lines, why?

> + /* To release the ringbuffer, just increment the producer position to
> + * signal that a new sample can be consumed. The busy bit is cleared by
> + * userspace when posting a new sample to the ringbuffer.
> + */
> + smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> + BPF_RINGBUF_HDR_SZ);
> +
> + if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))

Please use () around bitwise operator expressions.

> + irq_work_queue(&rb->work);
> +
> + atomic_set(&rb->busy, 0);

Set busy before scheduling the irq_work? Why the delay?

> +}
> +
> +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> + void *, callback_fn, void *, callback_ctx, u64, flags)
> +{
> + struct bpf_ringbuf *rb;
> + long num_samples = 0, ret = 0;
> + int err;
> + bpf_callback_t callback = (bpf_callback_t)callback_fn;
> + u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> +
> + if (unlikely(flags & ~wakeup_flags))
> + return -EINVAL;
> +
> + /* The two wakeup flags are mutually exclusive. */
> + if (unlikely((flags & wakeup_flags) == wakeup_flags))
> + return -EINVAL;

We don't check this for the existing ringbuf, so maybe let's keep it
consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP.

> +
> + rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> + do {
> + u32 size;
> + void *sample;
> +
> + err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> +

nit: don't keep an empty line between the function call and the error
check.

> + if (!err) {

So -ENODATA is a special error and you should stop if you get it. But any
other error should be propagated back, not just silently consumed. Maybe:

err = ...
if (err) {
	if (err == -ENODATA)
		break;
	return err;
}

?

> + struct bpf_dynptr_kern dynptr;
> +
> + bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> + 0, size);

We should try to avoid unnecessarily re-initializing the dynptr; let's
initialize it once and then just update the data and size fields inside
the loop?

> + ret = callback((u64)&dynptr,
> + (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> +
> + __bpf_user_ringbuf_sample_release(rb, size, flags);
> + num_samples++;
> + }
> + } while (err == 0 && num_samples < 4096 && ret == 0);
> +

4096 is pretty arbitrary. Definitely worth noting it somewhere, and it
seems somewhat low, tbh... Ideally we'd cond_resched() from time to time,
but that would require the BPF program to be sleepable, so we can't do
that :(
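Putting the error handling and the cap together, I'd expect the main loop
to look roughly like this (sketch only; it uses the "peek" name suggested
above, the constant name is made up, and I've left the dynptr
re-initialization question aside):

/* e.g. near the top of ringbuf.c; upper bound on samples per call */
#define BPF_MAX_USER_RINGBUF_SAMPLES 4096

	for (num_samples = 0; num_samples < BPF_MAX_USER_RINGBUF_SAMPLES; num_samples++) {
		struct bpf_dynptr_kern dynptr;
		u32 size;
		void *sample;

		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
		if (err) {
			if (err == -ENODATA)
				break;		/* fully drained, not an error */
			return err;		/* propagate real errors */
		}

		bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
		ret = callback((u64)(uintptr_t)&dynptr, (u64)(uintptr_t)callback_ctx, 0, 0, 0);
		__bpf_user_ringbuf_sample_release(rb, size, flags);
		if (ret)
			break;
	}

	return num_samples;

Whether to return the error or the partial count when some samples were
already drained is worth spelling out in the UAPI documentation as well.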
> + return num_samples;
> +}
> +
> +const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
> + .func = bpf_user_ringbuf_drain,
> + .ret_type = RET_INTEGER,
> + .arg1_type = ARG_CONST_MAP_PTR,
> + .arg2_type = ARG_PTR_TO_FUNC,
> + .arg3_type = ARG_PTR_TO_STACK_OR_NULL,
> + .arg4_type = ARG_ANYTHING,
> +};
> diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
> index 4a9562c62af0..99dfdc3ed187 100644
> --- a/kernel/bpf/verifier.c
> +++ b/kernel/bpf/verifier.c
> @@ -555,6 +555,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env,
> [PTR_TO_BUF] = "buf",
> [PTR_TO_FUNC] = "func",
> [PTR_TO_MAP_KEY] = "map_key",
> + [PTR_TO_DYNPTR] = "dynptr_ptr",
> };
>
> if (type & PTR_MAYBE_NULL) {
> @@ -5656,6 +5657,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
> static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
> static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
> static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
> +static const struct bpf_reg_types dynptr_types = {
> + .types = {
> + PTR_TO_STACK,
> + PTR_TO_DYNPTR | MEM_ALLOC | DYNPTR_TYPE_LOCAL,
> + }
> +};
>
> static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
> [ARG_PTR_TO_MAP_KEY] = &map_key_value_types,
> @@ -5682,7 +5689,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
> [ARG_PTR_TO_CONST_STR] = &const_str_ptr_types,
> [ARG_PTR_TO_TIMER] = &timer_types,
> [ARG_PTR_TO_KPTR] = &kptr_types,
> - [ARG_PTR_TO_DYNPTR] = &stack_ptr_types,
> + [ARG_PTR_TO_DYNPTR] = &dynptr_types,
> };
>
> static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
> @@ -6025,6 +6032,13 @@ static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
> err = check_mem_size_reg(env, reg, regno, true, meta);
> break;
> case ARG_PTR_TO_DYNPTR:
> + /* We only need to check for initialized / uninitialized helper
> + * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> + * is that if it is, that a helper function initialized the
> + * dynptr on behalf of the BPF program.
> + */
> + if (reg->type & MEM_ALLOC)

Isn't PTR_TO_DYNPTR enough of an indication? Do we need the MEM_ALLOC
modifier? A normal dynptr created and used inside a BPF program lives on
the stack and is actually PTR_TO_STACK, so that should be enough of a
distinction? Or am I missing something?
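Just to spell out the two cases I have in mind from the BPF program side
(purely illustrative; the maps, section name, and callback body are made
up, and this obviously relies on the new map type and helper from this
series):

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_RINGBUF);
	__uint(max_entries, 4096);
} kern_ringbuf SEC(".maps");

struct {
	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
	__uint(max_entries, 4096);
} user_ringbuf SEC(".maps");

/* Only ever receives the dynptr that bpf_user_ringbuf_drain() initialized
 * in the kernel, i.e. what the verifier tracks as PTR_TO_DYNPTR.
 */
static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
{
	return 0;
}

SEC("tp/syscalls/sys_enter_getpgid")
int drain_prog(void *ctx)
{
	struct bpf_dynptr kern_ptr;

	/* Lives in this program's stack frame, so when it's passed to a
	 * dynptr-taking helper the verifier sees PTR_TO_STACK.
	 */
	bpf_ringbuf_reserve_dynptr(&kern_ringbuf, 8, 0, &kern_ptr);
	bpf_ringbuf_discard_dynptr(&kern_ptr, 0);

	bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, NULL, 0);
	return 0;
}

char _license[] SEC("license") = "GPL";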
> + break;
> if (arg_type & MEM_UNINIT) {
> if (!is_dynptr_reg_valid_uninit(env, reg)) {
> verbose(env, "Dynptr has to be an uninitialized dynptr\n");
> @@ -6197,7 +6211,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
> goto error;
> break;
> case BPF_MAP_TYPE_USER_RINGBUF:
> - goto error;
> + if (func_id != BPF_FUNC_user_ringbuf_drain)
> + goto error;
> + break;
> case BPF_MAP_TYPE_STACK_TRACE:
> if (func_id != BPF_FUNC_get_stackid)
> goto error;
> @@ -6317,6 +6333,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
> if (map->map_type != BPF_MAP_TYPE_RINGBUF)
> goto error;
> break;
> + case BPF_FUNC_user_ringbuf_drain:
> + if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
> + goto error;
> + break;
> case BPF_FUNC_get_stackid:
> if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
> goto error;
> @@ -6901,6 +6921,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env,
> return 0;
> }
>
> +static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
> + struct bpf_func_state *caller,
> + struct bpf_func_state *callee,
> + int insn_idx)
> +{
> + /* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
> + * callback_ctx, u64 flags);
> + * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
> + */
> + __mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
> + callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL | MEM_ALLOC;
> + __mark_reg_known_zero(&callee->regs[BPF_REG_1]);
> + callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
> +
> + /* unused */
> + __mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
> + __mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
> + __mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
> +
> + callee->in_callback_fn = true;
> + return 0;
> +}
> +
> static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
> {
> struct bpf_verifier_state *state = env->cur_state;
> @@ -7345,6 +7388,10 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> }
> }
> break;
> + case BPF_FUNC_user_ringbuf_drain:
> + err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
> + set_user_ringbuf_callback_state);
> + break;
> }
>
> if (err)
> @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> /* Find the id of the dynptr we're acquiring a reference to */
> for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
> if (arg_type_is_dynptr(fn->arg_type[i])) {
> + struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> +
> if (dynptr_id) {
> verbose(env, "verifier internal error: multiple dynptr args in func\n");
> return -EFAULT;
> }
> - dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> +
> + if (!(reg->type & MEM_ALLOC))
> + dynptr_id = stack_slot_get_id(env, reg);

This part has changed in bpf-next.

> }
> }
> /* For release_reference() */
> diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
> index ce0ce713faf9..e5e4ab12177c 100644
> --- a/tools/include/uapi/linux/bpf.h
> +++ b/tools/include/uapi/linux/bpf.h
> @@ -5332,6 +5332,13 @@ union bpf_attr {
> * **-EACCES** if the SYN cookie is not valid.
> *
> * **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> + *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + * Description
> + * Drain samples from the specified user ringbuffer, and invoke the
> + * provided callback for each such sample.
> + * Return
> + * An error if a sample could not be drained.
> */
> #define __BPF_FUNC_MAPPER(FN) \
> FN(unspec), \
> @@ -5542,6 +5549,7 @@ union bpf_attr {
> FN(tcp_raw_gen_syncookie_ipv6), \
> FN(tcp_raw_check_syncookie_ipv4), \
> FN(tcp_raw_check_syncookie_ipv6), \
> + FN(bpf_user_ringbuf_drain), \
> /* */
>
> /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> --
> 2.30.2
>