On Fri, Sep 2, 2022 at 4:43 PM David Vernet <void@xxxxxxxxxxxxx> wrote:
>
> Now that all of the logic is in place in the kernel to support user-space
> produced ring buffers, we can add the user-space logic to libbpf. This
> patch therefore adds the following public symbols to libbpf:
>
> struct user_ring_buffer *
> user_ring_buffer__new(int map_fd,
>                       const struct user_ring_buffer_opts *opts);
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
> void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__free(struct user_ring_buffer *rb);
>
> A user-space producer must first create a struct user_ring_buffer * object
> with user_ring_buffer__new(), and can then reserve samples in the
> ring buffer using one of the following two symbols:
>
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
>
> With user_ring_buffer__reserve(), a pointer to a 'size' region of the ring
> buffer will be returned if sufficient space is available in the buffer.
> user_ring_buffer__reserve_blocking() provides similar semantics, but will
> block for up to 'timeout_ms' in epoll_wait if there is insufficient space
> in the buffer. This function has the guarantee from the kernel that it will
> receive at least one event-notification per invocation to
> bpf_ringbuf_drain(), provided that at least one sample is drained, and the
> BPF program did not pass the BPF_RB_NO_WAKEUP flag to bpf_ringbuf_drain().
>
> Once a sample is reserved, it must either be committed to the ring buffer
> with user_ring_buffer__submit(), or discarded with
> user_ring_buffer__discard().
>
> Signed-off-by: David Vernet <void@xxxxxxxxxxxxx>
> ---
>  tools/lib/bpf/libbpf.c         |  10 +-
>  tools/lib/bpf/libbpf.h         | 105 +++++++++++++
>  tools/lib/bpf/libbpf.map       |  10 ++
>  tools/lib/bpf/libbpf_probes.c  |   1 +
>  tools/lib/bpf/libbpf_version.h |   2 +-
>  tools/lib/bpf/ringbuf.c        | 270 +++++++++++++++++++++++++++++++++
>  6 files changed, 395 insertions(+), 3 deletions(-)
>

Thanks for adding nice doc comments! See below for a few minor nits, but
the ns_elapsed issue I think is pretty bad and should be fixed.
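
Just to make the intended flow concrete, a minimal producer would look
roughly like the below. This is only an untested sketch, not code from the
patch: the struct my_sample layout and the way map_fd is obtained are made
up, and error handling is reduced to the NULL + errno convention the new
APIs use.

#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <bpf/libbpf.h>

/* hypothetical sample layout, shared with the consuming BPF program */
struct my_sample {
	int pid;
	char comm[16];
};

/* map_fd is the fd of a BPF_MAP_TYPE_USER_RINGBUF map, obtained however
 * the application loads its BPF object (skeleton, bpf_object__* APIs, ...)
 */
static int produce_one(int map_fd)
{
	struct user_ring_buffer *rb;
	struct my_sample *sample;
	int err = 0;

	rb = user_ring_buffer__new(map_fd, NULL /* default opts */);
	if (!rb)
		return -errno;

	/* block for up to 500ms waiting for the kernel to drain space */
	sample = user_ring_buffer__reserve_blocking(rb, sizeof(*sample), 500);
	if (!sample) {
		err = -errno;
		goto out;
	}

	sample->pid = getpid();
	strncpy(sample->comm, "producer", sizeof(sample->comm));

	/* post the sample so the next bpf_ringbuf_drain() can consume it */
	user_ring_buffer__submit(rb, sample);
out:
	user_ring_buffer__free(rb);
	return err;
}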
> @@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
>
>  /* Ring buffer APIs */
>  struct ring_buffer;
> +struct user_ring_buffer;
>
>  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> @@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
>  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
>  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> +struct user_ring_buffer_opts {
> +	size_t sz; /* size of this struct, for forward/backward compatibility */
> +};
> +
> +#define user_ring_buffer_opts__last_field sz
> +
> +/* @brief **user_ring_buffer__new()** creates a new instance of a user ring
> + * buffer.
> + *
> + * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map.

typo: USER_RINGBUF

> + * @param opts Options for how the ring buffer should be created.
> + * @return A user ring buffer on success; NULL and errno being set on a
> + * failure.
> + */
> +LIBBPF_API struct user_ring_buffer *
> +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> +
> +/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the
> + * user ring buffer.
> + * @param rb A pointer to a user ring buffer.
> + * @param size The size of the sample, in bytes.
> + * @return A pointer to a reserved region of the user ring buffer; NULL, and
> + * errno being set if a sample could not be reserved.
> + *
> + * This function is *not* thread safe, and callers must synchronize accessing
> + * this function if there are multiple producers. If a size is requested that
> + * is larger than the size of the entire ring buffer, errno will be set to
> + * E2BIG and NULL is returned. If the ring buffer could accommodate the size,
> + * but currently does not have enough space, errno is set to ENOSPC and NULL is
> + * returned.

we might want to mention that the returned pointer is 8-byte aligned

> + *
> + * After initializing the sample, callers must invoke
> + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise,
> + * the sample must be freed with **user_ring_buffer__discard()**.
> + */
> +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> +

[...]

> +	err = user_ringbuf_map(rb, map_fd);
> +	if (err)
> +		goto err_out;
> +
> +	return rb;
> +
> +err_out:
> +	user_ring_buffer__free(rb);
> +	return errno = -err, NULL;
> +}
> +
> +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)

small nit if you are going to resubmit: we stopped using double underscore
naming for internal static functions, so this should be called
user_ringbuf_commit

> +{
> +	__u32 new_len;
> +	struct ringbuf_hdr *hdr;
> +	uintptr_t hdr_offset;
> +

[...]

> +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> +{
> +	int err;
> +	struct timespec start;
> +	__u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns;
> +
> +	if (timeout_ms < 0 && timeout_ms != -1)
> +		return errno = EINVAL, NULL;
> +
> +	if (timeout_ms != -1) {
> +		err = clock_gettime(CLOCK_MONOTONIC, &start);
> +		if (err)
> +			return NULL;
> +	}
> +
> +	timeout_ns = timeout_ms * ns_per_ms;
> +	do {
> +		__u64 ns_remaining = timeout_ns - ns_elapsed;
> +		int cnt, ms_remaining;
> +		void *sample;
> +		struct timespec curr;
> +
> +		sample = user_ring_buffer__reserve(rb, size);
> +		if (sample)
> +			return sample;
> +		else if (errno != ENOSPC)
> +			return NULL;
> +
> +		ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;

ok, so you've special-cased timeout_ms == -1 but still didn't do
max(0, ns_remaining). Can you prove that ns_elapsed will never be bigger
than timeout_ns due to various delays in waking up this thread? If not,
let's please have max(0), otherwise we can accidentally epoll_wait(-1).
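
I.e., something like this for the ns_remaining computation at the top of
the loop (just a sketch, open-coding the clamp):

		/* clamp: a late wakeup can make ns_elapsed exceed timeout_ns,
		 * and the unsigned subtraction would then underflow and feed
		 * a bogus (possibly negative) timeout to epoll_wait() below
		 */
		__u64 ns_remaining = ns_elapsed < timeout_ns ? timeout_ns - ns_elapsed : 0;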
> +		/* The kernel guarantees at least one event notification
> +		 * delivery whenever at least one sample is drained from the
> +		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
> +		 * additional events may be delivered at any time, but only one
> +		 * event is guaranteed per bpf_ringbuf_drain() invocation,
> +		 * provided that a sample is drained, and the BPF program did
> +		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> +		 */
> +		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> +		if (cnt < 0)
> +			return NULL;
> +
> +		if (timeout_ms == -1)
> +			continue;
> +
> +		err = clock_gettime(CLOCK_MONOTONIC, &curr);
> +		if (err)
> +			return NULL;
> +
> +		ns_elapsed = ns_elapsed_timespec(&start, &curr);

nit: if you move the re-calculation of ms_remaining and ns_remaining to
here, I think the overall loop logic will be even more straightforward. You
can initialize ms_remaining to -1 if timeout_ms < 0 and never recalculate
it, right? Note that you can also do the ns_elapsed conversion to ms right
here and then keep everything else in ms (so no need for timeout_ns,
ns_remaining, etc). There is a rough sketch of what I mean at the bottom of
this mail.

> +	} while (ns_elapsed <= timeout_ns);
> +
> +	errno = ENOSPC;
> +	return NULL;
> +}
> --
> 2.37.1
>
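
Putting the max(0) point and the ms-only suggestion together, here is
roughly the shape I have in mind for the blocking loop. It is completely
untested and ignores the parts snipped with [...] above, so treat it as a
sketch rather than a drop-in replacement:

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	/* -1 means "no timeout" and is passed to epoll_wait() unchanged */
	int ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1 && clock_gettime(CLOCK_MONOTONIC, &start))
		return NULL;

	do {
		struct timespec curr;
		void *sample;
		long ms_elapsed;
		int cnt;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification per
		 * bpf_ringbuf_drain() invocation that drains a sample, unless
		 * BPF_RB_NO_WAKEUP was passed, so blocking here is safe.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		if (clock_gettime(CLOCK_MONOTONIC, &curr))
			return NULL;

		/* everything stays in ms; clamping to 0 means a late wakeup
		 * can never turn the timeout into a blocking epoll_wait(-1)
		 */
		ms_elapsed = (curr.tv_sec - start.tv_sec) * 1000 +
			     (curr.tv_nsec - start.tv_nsec) / 1000000;
		ms_remaining = timeout_ms > ms_elapsed ? timeout_ms - ms_elapsed : 0;
	} while (ms_remaining);

	errno = ENOSPC;
	return NULL;
}

With that, timeout_ns, ns_remaining and the ns_elapsed_timespec() helper
can go away entirely.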