Now that all of the logic is in place in the kernel to support user-space produced ringbuffers, we can add the user-space logic to libbpf. Signed-off-by: David Vernet <void@xxxxxxxxxxxxx> --- kernel/bpf/ringbuf.c | 7 +- tools/lib/bpf/libbpf.c | 10 +- tools/lib/bpf/libbpf.h | 19 +++ tools/lib/bpf/libbpf.map | 6 + tools/lib/bpf/libbpf_probes.c | 1 + tools/lib/bpf/ringbuf.c | 216 ++++++++++++++++++++++++++++++++++ 6 files changed, 256 insertions(+), 3 deletions(-) diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c index fc589fc8eb7c..a10558e79ec8 100644 --- a/kernel/bpf/ringbuf.c +++ b/kernel/bpf/ringbuf.c @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample, if (!atomic_try_cmpxchg(&rb->busy, &busy, 1)) return -EBUSY; - /* Synchronizes with smp_store_release() in user-space. */ + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() + * in user-space. + */ prod_pos = smp_load_acquire(&rb->producer_pos); /* Synchronizes with smp_store_release() in * __bpf_user_ringbuf_sample_release(). @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, /* To release the ringbuffer, just increment the producer position to * signal that a new sample can be consumed. The busy bit is cleared by * userspace when posting a new sample to the ringbuffer. + * + * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve() + * in user-space. */ smp_store_release(&rb->consumer_pos, rb->consumer_pos + size + BPF_RINGBUF_HDR_SZ); diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c index 9c1f2d09f44e..f7fe09dce643 100644 --- a/tools/lib/bpf/libbpf.c +++ b/tools/lib/bpf/libbpf.c @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz) return sz; } +static bool map_is_ringbuf(const struct bpf_map *map) +{ + return map->def.type == BPF_MAP_TYPE_RINGBUF || + map->def.type == BPF_MAP_TYPE_USER_RINGBUF; +} + static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def) { map->def.type = def->map_type; @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def map->btf_value_type_id = def->value_type_id; /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ - if (map->def.type == BPF_MAP_TYPE_RINGBUF) + if (map_is_ringbuf(map)) map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); if (def->parts & MAP_DEF_MAP_TYPE) @@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries) map->def.max_entries = max_entries; /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ - if (map->def.type == BPF_MAP_TYPE_RINGBUF) + if (map_is_ringbuf(map)) map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); return 0; diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h index 61493c4cddac..6d1d0539b08d 100644 --- a/tools/lib/bpf/libbpf.h +++ b/tools/lib/bpf/libbpf.h @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook, /* Ring buffer APIs */ struct ring_buffer; +struct ring_buffer_user; typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); @@ -1028,6 +1029,24 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); +struct ring_buffer_user_opts { + size_t sz; /* size of this struct, for forward/backward compatibility */ +}; + +#define 
ring_buffer_user_opts__last_field sz + +LIBBPF_API struct ring_buffer_user * +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts); +LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb, + uint32_t size); +LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb, + uint32_t size, int timeout_ms); +LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb, + void *sample); +LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb, + void *sample); +LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb); + /* Perf buffer APIs */ struct perf_buffer; diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map index 119e6e1ea7f1..8db11040df1b 100644 --- a/tools/lib/bpf/libbpf.map +++ b/tools/lib/bpf/libbpf.map @@ -365,4 +365,10 @@ LIBBPF_1.0.0 { libbpf_bpf_map_type_str; libbpf_bpf_prog_type_str; perf_buffer__buffer; + ring_buffer_user__discard; + ring_buffer_user__free; + ring_buffer_user__new; + ring_buffer_user__poll; + ring_buffer_user__reserve; + ring_buffer_user__submit; }; diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c index 6d495656f554..f3a8e8e74eb8 100644 --- a/tools/lib/bpf/libbpf_probes.c +++ b/tools/lib/bpf/libbpf_probes.c @@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type) return btf_fd; break; case BPF_MAP_TYPE_RINGBUF: + case BPF_MAP_TYPE_USER_RINGBUF: key_size = 0; value_size = 0; max_entries = 4096; diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c index 8bc117bcc7bc..86e3c11d8486 100644 --- a/tools/lib/bpf/ringbuf.c +++ b/tools/lib/bpf/ringbuf.c @@ -39,6 +39,17 @@ struct ring_buffer { int ring_cnt; }; +struct ring_buffer_user { + struct epoll_event event; + unsigned long *consumer_pos; + unsigned long *producer_pos; + void *data; + unsigned long mask; + size_t page_size; + int map_fd; + int epoll_fd; +}; + static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r) { if (r->consumer_pos) { @@ -300,3 +311,208 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb) { return rb->epoll_fd; } + +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb) +{ + if (rb->consumer_pos) { + munmap(rb->consumer_pos, rb->page_size); + rb->consumer_pos = NULL; + } + if (rb->producer_pos) { + munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1)); + rb->producer_pos = NULL; + } +} + +void ring_buffer_user__free(struct ring_buffer_user *rb) +{ + if (!rb) + return; + + __user_ringbuf_unmap_ring(rb); + + if (rb->epoll_fd >= 0) + close(rb->epoll_fd); + + free(rb); +} + +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd) +{ + + struct bpf_map_info info; + __u32 len = sizeof(info); + void *tmp; + struct epoll_event *rb_epoll; + int err; + + memset(&info, 0, sizeof(info)); + + err = bpf_obj_get_info_by_fd(map_fd, &info, &len); + if (err) { + err = -errno; + pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + + if (info.type != BPF_MAP_TYPE_USER_RINGBUF) { + pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", + map_fd); + return libbpf_err(-EINVAL); + } + + rb->map_fd = map_fd; + rb->mask = info.max_entries - 1; + + /* Map read-only consumer page */ + tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0); + if (tmp == MAP_FAILED) { + err = -errno; + pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + rb->consumer_pos = tmp; + + /* Map 
read-write the producer page and data pages. We map the data + * region as twice the total size of the ringbuffer to allow the simple + * reading and writing of samples that wrap around the end of the + * buffer. See the kernel implementation for details. + */ + tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, + PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size); + if (tmp == MAP_FAILED) { + err = -errno; + pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + + rb->producer_pos = tmp; + rb->data = tmp + rb->page_size; + + rb_epoll = &rb->event; + rb_epoll->events = EPOLLOUT; + if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) { + err = -errno; + pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + + return 0; +} + +struct ring_buffer_user * +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts) +{ + struct ring_buffer_user *rb; + int err; + + if (!OPTS_VALID(opts, ring_buffer_opts)) + return errno = EINVAL, NULL; + + rb = calloc(1, sizeof(*rb)); + if (!rb) + return errno = ENOMEM, NULL; + + rb->page_size = getpagesize(); + + rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (rb->epoll_fd < 0) { + err = -errno; + pr_warn("user ringbuf: failed to create epoll instance: %d\n", + err); + goto err_out; + } + + err = __ring_buffer_user_map(rb, map_fd); + if (err) + goto err_out; + + return rb; + +err_out: + ring_buffer_user__free(rb); + return errno = -err, NULL; +} + +static void __ring_buffer_user__commit(struct ring_buffer_user *rb) +{ + uint32_t *hdr; + uint32_t total_len; + unsigned long prod_pos; + + prod_pos = *rb->producer_pos; + hdr = rb->data + (prod_pos & rb->mask); + + total_len = *hdr + BPF_RINGBUF_HDR_SZ; + + /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in + * the kernel. + */ + smp_store_release(rb->producer_pos, prod_pos + total_len); +} + +/* Discard a previously reserved sample into the ring buffer. Because the user + * ringbuffer is assumed to be single producer, this can simply be a no-op, and + * the producer pointer is left in the same place as when it was reserved. + */ +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample) +{} + +/* Submit a previously reserved sample into the ring buffer. + */ +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample) +{ + __ring_buffer_user__commit(rb); +} + +/* Reserve a pointer to a sample in the user ring buffer. This function is *not* + * thread safe, and the ring-buffer supports only a single producer. + */ +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size) +{ + uint32_t *hdr; + /* 64-bit to avoid overflow in case of extreme application behavior */ + size_t avail_size, total_size, max_size; + unsigned long cons_pos, prod_pos; + + /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in + * the kernel. + */ + cons_pos = smp_load_acquire(rb->consumer_pos); + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() + */ + prod_pos = smp_load_acquire(rb->producer_pos); + + max_size = rb->mask + 1; + avail_size = max_size - (prod_pos - cons_pos); + total_size = size + BPF_RINGBUF_HDR_SZ; + + if (total_size > max_size || avail_size < total_size) + return NULL; + + hdr = rb->data + (prod_pos & rb->mask); + *hdr = size; + + /* Producer pos is updated when a sample is submitted. 
*/ + + return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask); +} + +/* Poll for available space in the ringbuffer, and reserve a record when it + * becomes available. + */ +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size, + int timeout_ms) +{ + int cnt; + + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms); + if (cnt < 0) + return NULL; + + return ring_buffer_user__reserve(rb, size); +} -- 2.30.2
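For illustration, a minimal user-space producer driving the API added by this
patch might look as follows. This is a sketch only: the map fd is assumed to
come from an already-loaded BPF object containing a BPF_MAP_TYPE_USER_RINGBUF
map, and my_sample, produce_one(), and run_producer() are hypothetical names
used for the example, not part of this series.

	#include <errno.h>
	#include <string.h>
	#include <bpf/libbpf.h>

	/* Hypothetical fixed-size sample layout shared with the BPF program. */
	struct my_sample {
		int pid;
		char comm[16];
	};

	/* Produce a single sample, waiting up to 100ms for free space. */
	static int produce_one(struct ring_buffer_user *rb)
	{
		struct my_sample *sample;

		/* Wait for space in the ring buffer, then reserve a sample slot. */
		sample = ring_buffer_user__poll(rb, sizeof(*sample), 100);
		if (!sample)
			/* Timed out, or not enough contiguous space available. */
			return -1;

		sample->pid = 42;
		strcpy(sample->comm, "example");

		/* Publish the sample; the store-release of producer_pos in
		 * __ring_buffer_user__commit() makes it visible to the kernel.
		 */
		ring_buffer_user__submit(rb, sample);
		return 0;
	}

	int run_producer(int map_fd)
	{
		struct ring_buffer_user *rb;
		int err;

		/* NULL opts is accepted; defaults are used. */
		rb = ring_buffer_user__new(map_fd, NULL);
		if (!rb)
			return -errno;

		err = produce_one(rb);

		ring_buffer_user__free(rb);
		return err;
	}

If a reserved sample turns out not to be needed, ring_buffer_user__discard()
can be called on it instead of ring_buffer_user__submit(); since the ring
buffer is single-producer, the producer position is simply left where it was
at reservation time.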