On Wed, Aug 24, 2022 at 02:58:31PM -0700, Andrii Nakryiko wrote:

[...]

> > +LIBBPF_API struct user_ring_buffer *
> > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> > +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb,
> > +                                           __u32 size);
> > +
> > +LIBBPF_API void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
> > +                                                    __u32 size,
> > +                                                    int timeout_ms);
> > +LIBBPF_API void user_ring_buffer__submit(struct user_ring_buffer *rb,
> > +                                         void *sample);
> > +LIBBPF_API void user_ring_buffer__discard(struct user_ring_buffer *rb,
> > +                                          void *sample);
> > +LIBBPF_API void user_ring_buffer__free(struct user_ring_buffer *rb);
> > +
>
> Let's make sure that all the relevant comments and description of
> inputs/outputs/errors are documented here. These doccomments go to
> https://libbpf.readthedocs.io/en/latest/api.html

No problem, I'll add these to the docs.

> also, please make sure that declarations that fit within 100
> characters stay on single line, it's much more readable that way

My mistake, will fix.

> >  /* Perf buffer APIs */
> >  struct perf_buffer;
> >
> > diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
> > index 2b928dc21af0..40c83563f90a 100644
> > --- a/tools/lib/bpf/libbpf.map
> > +++ b/tools/lib/bpf/libbpf.map
> > @@ -367,4 +367,10 @@ LIBBPF_1.0.0 {
>
> now that 1.0 is released, this will have to go into a new LIBBPF_1.1.0
> section (which inherits from LIBBPF_1.0.0)

Sounds good, I'll do that in v4.

[...]

> > +        /* Map read-write the producer page and data pages. We map the data
> > +         * region as twice the total size of the ringbuffer to allow the simple
> > +         * reading and writing of samples that wrap around the end of the
> > +         * buffer. See the kernel implementation for details.
> > +         */
> > +        tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
> > +                   PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
> > +        if (tmp == MAP_FAILED) {
> > +                err = -errno;
> > +                pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
> > +                        map_fd, err);
> > +                return libbpf_err(err);
> > +        }
> > +
> > +        rb->producer_pos = tmp;
> > +        rb->data = tmp + rb->page_size;
> > +
> > +        rb_epoll = &rb->event;
> > +        rb_epoll->events = EPOLLOUT;
> > +        if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
> > +                err = -errno;
> > +                pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
> > +                return libbpf_err(err);
>
> this is internal helper function, so there is no need to use
> libbpf_err() helpers, just return errors directly. Only user-facing
> functions should make sure to set both errno and return error

Will fix.

> > +        }
> > +
> > +        return 0;
> > +}
> > +
> > +struct user_ring_buffer *
> > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
> > +{
> > +        struct user_ring_buffer *rb;
> > +        int err;
> > +
> > +        if (!OPTS_VALID(opts, ring_buffer_opts))
>
> user_ring_buffer_opts

Good catch, will fix.

> > +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
> > +{
> > +        __u32 new_len;
> > +        struct ringbuf_hdr *hdr;
> > +
> > +        /* All samples are aligned to 8 bytes, so the header will only ever
> > +         * wrap around the back of the ringbuffer if the sample is at the
> > +         * very beginning of the ringbuffer.
> > +         */
> > +        if (sample == rb->data)
> > +                hdr = rb->data + (rb->mask - BPF_RINGBUF_HDR_SZ + 1);
> > +        else
> > +                hdr = sample - BPF_RINGBUF_HDR_SZ;
>
> let's avoid extra if in a hot path?
>
> hdr = rb->data + ((rb->mask + 1 + (sample - rb->data) -
> BPF_RINGBUF_HDR_SZ) & rb->mask);

Nice idea, will do.
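Just to sanity check the math on that: with a hypothetical 4KB data area
(so rb->mask == 4095), the masked expression covers both the wrapping and
non-wrapping cases:

        /* Sample at the very start of the data area (sample == rb->data):
         * the header wraps to the last 8 bytes of the buffer, matching the
         * old "if" branch:
         *
         *   (4096 + 0 - 8) & 4095 == 4088 == rb->mask - BPF_RINGBUF_HDR_SZ + 1
         *
         * Sample at offset 512: the header sits directly before the sample:
         *
         *   (4096 + 512 - 8) & 4095 == 504
         */
        hdr = rb->data + ((rb->mask + 1 + (sample - rb->data) -
                           BPF_RINGBUF_HDR_SZ) & rb->mask);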
> > +
> > +        new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
> > +        if (discard)
> > +                new_len |= BPF_RINGBUF_DISCARD_BIT;
> > +
> > +        /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
> > +         * the kernel.
> > +         */
> > +        __atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
> > +}
> > +
> > +/* Discard a previously reserved sample into the ring buffer. It is not
> > + * necessary to synchronize amongst multiple producers when invoking this
> > + * function.
> > + */
> > +void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
> > +{
> > +        user_ringbuf__commit(rb, sample, true);
> > +}
> > +
> > +/* Submit a previously reserved sample into the ring buffer. It is not
> > + * necessary to synchronize amongst multiple producers when invoking this
> > + * function.
> > + */
> > +void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
> > +{
> > +        user_ringbuf__commit(rb, sample, false);
> > +}
> > +
> > +/* Reserve a pointer to a sample in the user ring buffer. This function is
> > + * *not* thread safe, and callers must synchronize accessing this function if
> > + * there are multiple producers.
> > + *
> > + * If a size is requested that is larger than the size of the entire
> > + * ringbuffer, errno is set to E2BIG and NULL is returned. If the ringbuffer
> > + * could accommodate the size, but currently does not have enough space, errno
> > + * is set to ENODATA and NULL is returned.
>
> ENOSPC seems more appropriate for such a situation?

For the latter? Hmmm, yeah I suppose I agree, I'll make that adjustment.

> > + *
> > + * Otherwise, a pointer to the sample is returned. After initializing the
> > + * sample, callers must invoke user_ring_buffer__submit() to post the sample to
> > + * the kernel. Otherwise, the sample must be freed with
> > + * user_ring_buffer__discard().
> > + */
>
> usual complaints about "ringbuffer", feels like a typo

No problem, I'll fix that here and in the rest of the patch-set.

> > +void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
> > +{
> > +        __u32 avail_size, total_size, max_size;
> > +        /* 64-bit to avoid overflow in case of extreme application behavior */
> > +        __u64 cons_pos, prod_pos;
> > +        struct ringbuf_hdr *hdr;
> > +
> > +        /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
> > +         * the kernel.
> > +         */
> > +        cons_pos = smp_load_acquire(rb->consumer_pos);
> > +        /* Synchronizes with smp_store_release() in user_ringbuf__commit() */
> > +        prod_pos = smp_load_acquire(rb->producer_pos);
> > +
> > +        /* Round up size to a multiple of 8. */
> > +        size = (size + 7) / 8 * 8;
> > +        max_size = rb->mask + 1;
> > +        avail_size = max_size - (prod_pos - cons_pos);
> > +        total_size = size + BPF_RINGBUF_HDR_SZ;
> > +
> > +        if (total_size > max_size)
> > +                return errno = E2BIG, NULL;
> > +
> > +        if (avail_size < total_size)
> > +                return errno = ENODATA, NULL;
> > +
> > +        hdr = rb->data + (prod_pos & rb->mask);
> > +        hdr->len = size | BPF_RINGBUF_BUSY_BIT;
>
> so I double-checked what kernel ringbuf is doing with size. We still
> record exact user-requested size in header, but all the logic knows
> that it has to be rounded up to closest 8. I think that's best
> behavior because it preserves user-supplied information exactly. So if
> I wanted to reserve and communicate 4 byte sample to my consumers,
> they should see that there are 4 bytes of data available, not 8. So
> let's do the same here?

No problem, I'll do this in v4.

> We still should validate that all the positions are multiples of 8, of
> course, as you do in this revision.

Ack.
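In other words, for v4 the reserve path would keep the caller's exact size
in the header and round only the footprint used for the position
bookkeeping. A rough sketch (modulo the ENOSPC rename agreed above; the
final shape may differ):

        /* Keep the exact user-requested size in the header, as the kernel
         * ring buffer does; only the total footprint used to advance the
         * producer position is rounded up to a multiple of 8.
         */
        total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

        if (total_size > max_size)
                return errno = E2BIG, NULL;

        if (avail_size < total_size)
                return errno = ENOSPC, NULL;

        hdr = rb->data + (prod_pos & rb->mask);
        /* Exact size; the busy bit is cleared by user_ringbuf__commit(). */
        hdr->len = size | BPF_RINGBUF_BUSY_BIT;

Since user_ringbuf__commit() publishes hdr->len with the busy bit cleared,
a consumer draining the sample would then see the exact 4 bytes from the
example above rather than 8.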
> > +        hdr->pad = 0;
> > +
> > +        /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
> > +         * the kernel.
> > +         */
> > +        smp_store_release(rb->producer_pos, prod_pos + total_size);
> > +
> > +        return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
> > +}
> > +
> > +static int ms_elapsed_timespec(const struct timespec *start, const struct timespec *end)
> > +{
> > +        int total, ns_per_ms = 1000000, ns_per_s = ns_per_ms * 1000;
> > +
> > +        if (end->tv_sec > start->tv_sec) {
> > +                total = 1000 * (end->tv_sec - start->tv_sec);
> > +                total += (end->tv_nsec + (ns_per_s - start->tv_nsec)) / ns_per_ms;
> > +        } else {
> > +                total = (end->tv_nsec - start->tv_nsec) / ns_per_ms;
> > +        }
> > +
>
> hm... this seems overengineered, tbh
>
> u64 start_ns = (u64)start->tv_sec * 1000000000 + start->tv_nsec;
> u64 end_ns = (u64)end->tv_sec * 1000000000 + end->tv_nsec;
>
> return (end_ns - start_ns) / 1000000;
>
> ?

Yeah, this is much simpler. Thanks for the suggestion.
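With that, the whole helper collapses to something like this (a sketch,
assuming the existing function name is kept):

static int ms_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
        __u64 start_ns, end_ns, ns_per_ms = 1000000, ns_per_s = 1000000000;

        start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
        end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

        /* Callers pass bounded ms timeouts, so the delta fits in an int. */
        return (end_ns - start_ns) / ns_per_ms;
}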
> > +        return total;
> > +}
> > +
> > +/* Reserve a record in the ringbuffer, possibly blocking for up to @timeout_ms
> > + * until a sample becomes available. This function is *not* thread safe, and
> > + * callers must synchronize accessing this function if there are multiple
> > + * producers.
> > + *
> > + * If @timeout_ms is -1, the function will block indefinitely until a sample
> > + * becomes available. Otherwise, @timeout_ms must be non-negative, or errno
> > + * will be set to EINVAL, and NULL will be returned. If @timeout_ms is 0,
> > + * no blocking will occur and the function will return immediately after
> > + * attempting to reserve a sample.
> > + *
> > + * If @size is larger than the size of the entire ringbuffer, errno is set to
> > + * E2BIG and NULL is returned. If the ringbuffer could accommodate @size, but
> > + * currently does not have enough space, the caller will block until at most
> > + * @timeout_ms has elapsed. If insufficient space is available at that time,
> > + * errno will be set to ENODATA, and NULL will be returned.
>
> ENOSPC?

Ack.

> > + *
> > + * The kernel guarantees that it will wake up this thread to check if
> > + * sufficient space is available in the ringbuffer at least once per invocation
> > + * of the bpf_ringbuf_drain() helper function, provided that at least one
> > + * sample is consumed, and the BPF program did not invoke the function with
> > + * BPF_RB_NO_WAKEUP. A wakeup may occur sooner than that, but the kernel does
> > + * not guarantee this.
> > + *
> > + * When a sample of size @size is found within @timeout_ms, a pointer to the
> > + * sample is returned. After initializing the sample, callers must invoke
> > + * user_ring_buffer__submit() to post the sample to the ringbuffer. Otherwise,
> > + * the sample must be freed with user_ring_buffer__discard().
> > + */
>
> so comments like this should go into doccomments for these functions in libbpf.h

Ack.

> > +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> > +{
> > +        int ms_elapsed = 0, err;
> > +        struct timespec start;
> > +
> > +        if (timeout_ms < 0 && timeout_ms != -1)
> > +                return errno = EINVAL, NULL;
> > +
> > +        if (timeout_ms != -1) {
> > +                err = clock_gettime(CLOCK_MONOTONIC, &start);
> > +                if (err)
> > +                        return NULL;
> > +        }
> > +
> > +        do {
> > +                int cnt, ms_remaining = timeout_ms - ms_elapsed;
>
> let's max(0, timeout_ms - ms_elapsed) to avoid negative ms_remaining
> in some edge timing cases

We actually want to have a negative ms_remaining if timeout_ms is -1. -1 in
epoll_wait() specifies an infinite timeout. If we were to round up to 0, it
wouldn't block at all.

> > +                void *sample;
> > +                struct timespec curr;
> > +
> > +                sample = user_ring_buffer__reserve(rb, size);
> > +                if (sample)
> > +                        return sample;
> > +                else if (errno != ENODATA)
> > +                        return NULL;
> > +
> > +                /* The kernel guarantees at least one event notification
> > +                 * delivery whenever at least one sample is drained from the
> > +                 * ringbuffer in an invocation to bpf_ringbuf_drain(). Other
> > +                 * additional events may be delivered at any time, but only one
> > +                 * event is guaranteed per bpf_ringbuf_drain() invocation,
> > +                 * provided that a sample is drained, and the BPF program did
> > +                 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> > +                 */
> > +                cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> > +                if (cnt < 0)
> > +                        return NULL;
> > +
> > +                if (timeout_ms == -1)
> > +                        continue;
> > +
> > +                err = clock_gettime(CLOCK_MONOTONIC, &curr);
> > +                if (err)
> > +                        return NULL;
> > +
> > +                ms_elapsed = ms_elapsed_timespec(&start, &curr);
> > +        } while (ms_elapsed <= timeout_ms);
>
> let's simplify all the time keeping to use nanosecond timestamps and
> only convert to ms when calling epoll_wait()? Then you can just have a
> tiny helper to convert timespec to nanosecond ts ((u64)ts.tv_sec *
> 1000000000 + ts.tv_nsec) and compare u64s directly. WDYT?

Sounds like an improvement to me!

Thanks,
David