On Fri, Sep 09, 2022 at 04:59:27PM -0700, Andrii Nakryiko wrote: > > @@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook, > > > > /* Ring buffer APIs */ > > struct ring_buffer; > > +struct user_ring_buffer; > > > > typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); > > > > @@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); > > LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); > > LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); > > > > +struct user_ring_buffer_opts { > > + size_t sz; /* size of this struct, for forward/backward compatibility */ > > +}; > > + > > +#define user_ring_buffer_opts__last_field sz > > + > > +/* @brief **user_ring_buffer__new()** creates a new instance of a user ring > > + * buffer. > > + * > > + * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map. > > typo: USER_RINGBUF Good catch. > > + * @param opts Options for how the ring buffer should be created. > > + * @return A user ring buffer on success; NULL and errno being set on a > > + * failure. > > + */ > > +LIBBPF_API struct user_ring_buffer * > > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts); > > + > > +/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the > > + * user ring buffer. > > + * @param rb A pointer to a user ring buffer. > > + * @param size The size of the sample, in bytes. > > + * @return A pointer to a reserved region of the user ring buffer; NULL, and > > + * errno being set if a sample could not be reserved. > > + * > > + * This function is *not* thread safe, and callers must synchronize accessing > > + * this function if there are multiple producers. If a size is requested that > > + * is larger than the size of the entire ring buffer, errno will be set to > > + * E2BIG and NULL is returned. 
If the ring buffer could accommodate the size, > > + * but currently does not have enough space, errno is set to ENOSPC and NULL is > > + * returned. > > we might want to mention that returned pointer is 8-byte aligned Will do. > > + * > > + * After initializing the sample, callers must invoke > > + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise, > > + * the sample must be freed with **user_ring_buffer__discard()**. > > + */ > > +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size); > > + > > [...] > > > + err = user_ringbuf_map(rb, map_fd); > > + if (err) > > + goto err_out; > > + > > + return rb; > > + > > +err_out: > > + user_ring_buffer__free(rb); > > + return errno = -err, NULL; > > +} > > + > > +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard) > > small nit if you are going to resubmit: we stopped using double > underscore naming for internal static functions, so this should be > called user_ringbuf_commit Will do. > > +{ > > + __u32 new_len; > > + struct ringbuf_hdr *hdr; > > + uintptr_t hdr_offset; > > + > > [...] > > > +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms) > > +{ > > + int err; > > + struct timespec start; > > + __u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns; > > + > > + if (timeout_ms < 0 && timeout_ms != -1) > > + return errno = EINVAL, NULL; > > + > > + if (timeout_ms != -1) { > > + err = clock_gettime(CLOCK_MONOTONIC, &start); > > + if (err) > > + return NULL; > > + } > > + > > + timeout_ns = timeout_ms * ns_per_ms; > > + do { > > + __u64 ns_remaining = timeout_ns - ns_elapsed; > > + int cnt, ms_remaining; > > + void *sample; > > + struct timespec curr; > > + > > + sample = user_ring_buffer__reserve(rb, size); > > + if (sample) > > + return sample; > > + else if (errno != ENOSPC) > > + return NULL; > > + > > + ms_remaining = timeout_ms == -1 ? 
             -1 : ns_remaining / ns_per_ms; > > ok, so you've special-cased timeout_ms == -1 but still didn't do > max(0, ns_remaining). Can you prove that ns_elapsed will never be > bigger than timeout_ns due to various delays in waking up this thread? > If not, let's please have max(0) otherwise we can accidentally > epoll_wait(-1). Yes you're right, this was an oversight. Thanks for catching this! > > + /* The kernel guarantees at least one event notification > > + * delivery whenever at least one sample is drained from the > > + * ring buffer in an invocation to bpf_ringbuf_drain(). Other > > + * additional events may be delivered at any time, but only one > > + * event is guaranteed per bpf_ringbuf_drain() invocation, > > + * provided that a sample is drained, and the BPF program did > > + * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). > > + */ > > + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining); > > + if (cnt < 0) > > + return NULL; > > + > > + if (timeout_ms == -1) > > + continue; > > + > > + err = clock_gettime(CLOCK_MONOTONIC, &curr); > > + if (err) > > + return NULL; > > + > > + ns_elapsed = ns_elapsed_timespec(&start, &curr); > > nit: if you move re-calculation of ms_remaining and ns_remaining to > here, I think overall loop logic will be even more straightforward. You > can initialize ms_remaining to -1 if timeout_ms < 0 and never > recalculate it, right? Note that you can also do ns_elapsed conversion > to ms right here and then keep everything else in ms (so no need for > timeout_ns, ns_remaining, etc). Sounds good, let me give this a shot in v6. Thanks for another detailed review!