Re: [PATCH v5 3/4] bpf: Add libbpf logic for user-space ring buffer

On Fri, Sep 2, 2022 at 4:43 PM David Vernet <void@xxxxxxxxxxxxx> wrote:
>
> Now that all of the logic is in place in the kernel to support user-space
> produced ring buffers, we can add the user-space logic to libbpf. This
> patch therefore adds the following public symbols to libbpf:
>
> struct user_ring_buffer *
> user_ring_buffer__new(int map_fd,
>                       const struct user_ring_buffer_opts *opts);
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
> void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__free(struct user_ring_buffer *rb);
>
> A user-space producer must first create a struct user_ring_buffer * object
> with user_ring_buffer__new(), and can then reserve samples in the
> ring buffer using one of the following two symbols:
>
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
>
> With user_ring_buffer__reserve(), a pointer to a 'size' region of the ring
> buffer will be returned if sufficient space is available in the buffer.
> user_ring_buffer__reserve_blocking() provides similar semantics, but will
> block for up to 'timeout_ms' in epoll_wait if there is insufficient space
> in the buffer. This function has the guarantee from the kernel that it will
> receive at least one event-notification per invocation to
> bpf_ringbuf_drain(), provided that at least one sample is drained, and the
> BPF program did not pass the BPF_RB_NO_WAKEUP flag to bpf_ringbuf_drain().
>
> Once a sample is reserved, it must either be committed to the ring buffer
> with user_ring_buffer__submit(), or discarded with
> user_ring_buffer__discard().
>
> Signed-off-by: David Vernet <void@xxxxxxxxxxxxx>
> ---
>  tools/lib/bpf/libbpf.c         |  10 +-
>  tools/lib/bpf/libbpf.h         | 105 +++++++++++++
>  tools/lib/bpf/libbpf.map       |  10 ++
>  tools/lib/bpf/libbpf_probes.c  |   1 +
>  tools/lib/bpf/libbpf_version.h |   2 +-
>  tools/lib/bpf/ringbuf.c        | 270 +++++++++++++++++++++++++++++++++
>  6 files changed, 395 insertions(+), 3 deletions(-)
>

Thanks for adding nice doc comments! See below for a few minor nits, but
I think the ns_elapsed issue is pretty bad and should be fixed.

> @@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
>
>  /* Ring buffer APIs */
>  struct ring_buffer;
> +struct user_ring_buffer;
>
>  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> @@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
>  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
>  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> +struct user_ring_buffer_opts {
> +       size_t sz; /* size of this struct, for forward/backward compatibility */
> +};
> +
> +#define user_ring_buffer_opts__last_field sz
> +
> +/* @brief **user_ring_buffer__new()** creates a new instance of a user ring
> + * buffer.
> + *
> + * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map.

typo: this should be BPF_MAP_TYPE_USER_RINGBUF

> + * @param opts Options for how the ring buffer should be created.
> + * @return A user ring buffer on success; NULL and errno being set on a
> + * failure.
> + */
> +LIBBPF_API struct user_ring_buffer *
> +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> +
> +/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the
> + * user ring buffer.
> + * @param rb A pointer to a user ring buffer.
> + * @param size The size of the sample, in bytes.
> + * @return A pointer to a reserved region of the user ring buffer; NULL, and
> + * errno being set if a sample could not be reserved.
> + *
> + * This function is *not* thread safe, and callers must synchronize accessing
> + * this function if there are multiple producers.  If a size is requested that
> + * is larger than the size of the entire ring buffer, errno will be set to
> + * E2BIG and NULL is returned. If the ring buffer could accommodate the size,
> + * but currently does not have enough space, errno is set to ENOSPC and NULL is
> + * returned.

We might want to mention that the returned pointer is 8-byte aligned.
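
For context, here is a rough sketch of why that alignment would hold. It
assumes each record is an 8-byte header followed by the payload, with the
total rounded up to a multiple of 8, and that the data area itself starts
8-byte aligned. HDR_SZ and both helper names are made up for illustration
and are not taken from the patch:

```c
#include <assert.h>
#include <stdint.h>

#define HDR_SZ 8u /* assumed size of the per-record header */

/* Total bytes a record occupies: header plus payload, rounded up to 8. */
static uint32_t record_total_size(uint32_t payload_size)
{
	return (payload_size + HDR_SZ + 7u) / 8u * 8u;
}

/* Offset of the payload for a record whose header starts at hdr_offset.
 * Headers only ever start at multiples of 8 under the assumptions above.
 */
static uint32_t payload_offset(uint32_t hdr_offset)
{
	assert(hdr_offset % 8 == 0);
	return hdr_offset + HDR_SZ;
}
```

Since every record length is a multiple of 8, each header (and therefore
each payload pointer handed back to the caller) lands on an 8-byte boundary.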

> + *
> + * After initializing the sample, callers must invoke
> + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise,
> + * the sample must be freed with **user_ring_buffer__discard()**.
> + */
> +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> +

[...]

> +       err = user_ringbuf_map(rb, map_fd);
> +       if (err)
> +               goto err_out;
> +
> +       return rb;
> +
> +err_out:
> +       user_ring_buffer__free(rb);
> +       return errno = -err, NULL;
> +}
> +
> +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)

Small nit, if you are going to resubmit: we stopped using double-underscore
naming for internal static functions, so this should be called
user_ringbuf_commit.

> +{
> +       __u32 new_len;
> +       struct ringbuf_hdr *hdr;
> +       uintptr_t hdr_offset;
> +

[...]

> +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> +{
> +       int err;
> +       struct timespec start;
> +       __u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns;
> +
> +       if (timeout_ms < 0 && timeout_ms != -1)
> +               return errno = EINVAL, NULL;
> +
> +       if (timeout_ms != -1) {
> +               err = clock_gettime(CLOCK_MONOTONIC, &start);
> +               if (err)
> +                       return NULL;
> +       }
> +
> +       timeout_ns = timeout_ms * ns_per_ms;
> +       do {
> +               __u64 ns_remaining = timeout_ns - ns_elapsed;
> +               int cnt, ms_remaining;
> +               void *sample;
> +               struct timespec curr;
> +
> +               sample = user_ring_buffer__reserve(rb, size);
> +               if (sample)
> +                       return sample;
> +               else if (errno != ENOSPC)
> +                       return NULL;
> +
> +               ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;

OK, so you've special-cased timeout_ms == -1 but still didn't do
max(0, ns_remaining). Can you prove that ns_elapsed will never be
bigger than timeout_ns due to various delays in waking up this thread?
If not, let's please have max(0); otherwise we can accidentally call
epoll_wait() with a timeout of -1.
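
Something along these lines is what I have in mind. This is an untested
sketch, and the helper name ms_remaining_clamped() and the NS_PER_MS macro
are made up for illustration. Note that since ns_remaining is a __u64, the
subtraction wraps around rather than going negative, so the comparison has
to happen before subtracting:

```c
#include <assert.h>
#include <stdint.h>

#define NS_PER_MS 1000000ULL

/* Hypothetical helper, not part of the patch: returns the timeout to hand
 * to epoll_wait(), clamped to 0 once the deadline has passed.
 */
static int ms_remaining_clamped(int timeout_ms, uint64_t ns_elapsed)
{
	uint64_t timeout_ns;

	/* The caller is assumed to have rejected values below -1 already. */
	assert(timeout_ms >= -1);

	if (timeout_ms == -1)
		return -1; /* caller explicitly asked to block indefinitely */

	timeout_ns = (uint64_t)timeout_ms * NS_PER_MS;
	if (ns_elapsed >= timeout_ns)
		return 0; /* deadline passed: poll without blocking */

	return (int)((timeout_ns - ns_elapsed) / NS_PER_MS);
}
```

With this, a thread that wakes up late gets a non-blocking epoll_wait()
instead of an unbounded one.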

> +               /* The kernel guarantees at least one event notification
> +                * delivery whenever at least one sample is drained from the
> +                * ring buffer in an invocation to bpf_ringbuf_drain(). Other
> +                * additional events may be delivered at any time, but only one
> +                * event is guaranteed per bpf_ringbuf_drain() invocation,
> +                * provided that a sample is drained, and the BPF program did
> +                * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> +                */
> +               cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> +               if (cnt < 0)
> +                       return NULL;
> +
> +               if (timeout_ms == -1)
> +                       continue;
> +
> +               err = clock_gettime(CLOCK_MONOTONIC, &curr);
> +               if (err)
> +                       return NULL;
> +
> +               ns_elapsed = ns_elapsed_timespec(&start, &curr);

nit: if you move the recalculation of ms_remaining and ns_remaining to
here, I think the overall loop logic will be even more straightforward. You
can initialize ms_remaining to -1 if timeout_ms < 0 and never
recalculate it, right? Note that you can also do the ns_elapsed conversion
to ms right here and then keep everything else in ms (so no need for
timeout_ns, ns_remaining, etc).
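
Roughly like the following. This is only an untested sketch of the loop
shape: the reserve and wait steps are passed in as callbacks (hypothetical
try_reserve_fn and wait_fn types standing in for user_ring_buffer__reserve()
and epoll_wait()) so that it compiles without libbpf, and the fake_* helpers
exist only to exercise it:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical stand-ins for user_ring_buffer__reserve() and epoll_wait(). */
typedef void *(*try_reserve_fn)(void *ctx);
typedef int (*wait_fn)(void *ctx, int timeout_ms);

/* Whole milliseconds between two CLOCK_MONOTONIC readings. */
static int64_t ms_elapsed_timespec(const struct timespec *start,
				   const struct timespec *end)
{
	int64_t ns = (int64_t)(end->tv_sec - start->tv_sec) * 1000000000LL +
		     (end->tv_nsec - start->tv_nsec);

	return ns / 1000000;
}

static void *reserve_blocking_sketch(void *ctx, try_reserve_fn try_reserve,
				     wait_fn wait_for_space, int timeout_ms)
{
	struct timespec start, curr;
	int ms_remaining = timeout_ms; /* stays -1 for "block forever" */

	if (timeout_ms < -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1 && clock_gettime(CLOCK_MONOTONIC, &start))
		return NULL;

	do {
		int64_t ms_elapsed;
		void *sample;

		sample = try_reserve(ctx);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		if (wait_for_space(ctx, ms_remaining) < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		if (clock_gettime(CLOCK_MONOTONIC, &curr))
			return NULL;

		/* Clamp at 0 so a late wakeup never yields a negative timeout. */
		ms_elapsed = ms_elapsed_timespec(&start, &curr);
		ms_remaining = ms_elapsed >= timeout_ms ? 0 : timeout_ms - (int)ms_elapsed;
	} while (ms_remaining != 0);

	errno = ENOSPC;
	return NULL;
}

/* Toy stand-ins used only to exercise the loop above. */
struct fake_rb { int attempts_until_space; };

static void *fake_reserve(void *ctx)
{
	struct fake_rb *rb = ctx;

	if (rb->attempts_until_space-- > 0)
		return errno = ENOSPC, NULL;
	return rb;
}

static int fake_wait(void *ctx, int timeout_ms)
{
	(void)ctx;
	assert(timeout_ms >= -1); /* the loop must never pass less than -1 */
	return 0; /* pretend an event arrived right away */
}
```

The only time state left is ms_remaining, and the -1 case never touches the
clock at all.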

> +       } while (ns_elapsed <= timeout_ns);
> +
> +       errno = ENOSPC;
> +       return NULL;
> +}
> --
> 2.37.1
>


