[PATCH bpf-next 4/6] libbpf: add BPF ring buffer support

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Declaring and instantiating BPF ring buffer doesn't require any changes to
libbpf, as it's just another type of maps. So using existing BTF-defined maps
syntax with __uint(type, BPF_MAP_TYPE_RINGBUF) and __uint(max_elemenetns,
<size-of-ring-buf>) is all that's necessary to create and use BPF ring buffer.

This patch adds BPF ring buffer consumer to libbpf, similar to perf_buffer
implementation for perf ring buffer, but also attempts to fix some minor
problems and inconveniences with existing perf_buffer API.

ring_buffer support both single ring buffer use case (with just using
ring_buffer__new()), as well as allows to add more ring buffers, each with its
own callback and context. This allows to efficiently poll and consume
multiple, and potentially completely independent, ring buffers, using single
epoll instance.

The latter is actually a problem in practice for applications
that are using multiple sets of perf buffers. They have to create multiple
instances for struct perf_buffer and poll them independently or in a loop,
each approach having its own problems (e.g., inability to use a common poll
timeout). struct ring_buffer eliminates this problem by aggregating many
independent ring buffer instances under the single "ring buffer manager".

Second, perf_buffer's callback can't return error, so applications that need
to stop polling due to error in data or data signalling the end, have to use
extra mechanisms to signal that polling has to stop. ring_buffer's callback
can return error, which will be passed through back to user code and can be
acted upon appropariately.

Signed-off-by: Andrii Nakryiko <andriin@xxxxxx>
---
 tools/lib/bpf/Build           |   2 +-
 tools/lib/bpf/libbpf.h        |  20 +++
 tools/lib/bpf/libbpf.map      |   4 +
 tools/lib/bpf/libbpf_probes.c |   5 +
 tools/lib/bpf/ringbuf.c       | 264 ++++++++++++++++++++++++++++++++++
 5 files changed, 294 insertions(+), 1 deletion(-)
 create mode 100644 tools/lib/bpf/ringbuf.c

diff --git a/tools/lib/bpf/Build b/tools/lib/bpf/Build
index e3962cfbc9a6..190366d05588 100644
--- a/tools/lib/bpf/Build
+++ b/tools/lib/bpf/Build
@@ -1,3 +1,3 @@
 libbpf-y := libbpf.o bpf.o nlattr.o btf.o libbpf_errno.o str_error.o \
 	    netlink.o bpf_prog_linfo.o libbpf_probes.o xsk.o hashmap.o \
-	    btf_dump.o
+	    btf_dump.o ringbuf.o
diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
index 8ea69558f0a8..037b764ca85f 100644
--- a/tools/lib/bpf/libbpf.h
+++ b/tools/lib/bpf/libbpf.h
@@ -478,6 +478,26 @@ LIBBPF_API int bpf_get_link_xdp_id(int ifindex, __u32 *prog_id, __u32 flags);
 LIBBPF_API int bpf_get_link_xdp_info(int ifindex, struct xdp_link_info *info,
 				     size_t info_size, __u32 flags);
 
+/* Ring buffer APIs */
+struct ring_buffer;
+
+typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
+
+struct ring_buffer_opts {
+	size_t sz; /* size of this struct, for forward/backward compatiblity */
+};
+
+#define ring_buffer_opts__last_field sz
+
+LIBBPF_API struct ring_buffer *
+ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
+		 const struct ring_buffer_opts *opts);
+LIBBPF_API void ring_buffer__free(struct ring_buffer *rb);
+LIBBPF_API int ring_buffer__add(struct ring_buffer *rb, int map_fd,
+				ring_buffer_sample_fn sample_cb, void *ctx);
+LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
+
+/* Perf buffer APIs */
 struct perf_buffer;
 
 typedef void (*perf_buffer_sample_fn)(void *ctx, int cpu,
diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
index 0133d469d30b..0f32a9ba5bb5 100644
--- a/tools/lib/bpf/libbpf.map
+++ b/tools/lib/bpf/libbpf.map
@@ -262,4 +262,8 @@ LIBBPF_0.0.9 {
 		bpf_link_get_fd_by_id;
 		bpf_link_get_next_id;
 		bpf_program__attach_iter;
+		ring_buffer__add;
+		ring_buffer__free;
+		ring_buffer__new;
+		ring_buffer__poll;
 } LIBBPF_0.0.8;
diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
index 2c92059c0c90..10cd8d1891f5 100644
--- a/tools/lib/bpf/libbpf_probes.c
+++ b/tools/lib/bpf/libbpf_probes.c
@@ -238,6 +238,11 @@ bool bpf_probe_map_type(enum bpf_map_type map_type, __u32 ifindex)
 		if (btf_fd < 0)
 			return false;
 		break;
+	case BPF_MAP_TYPE_RINGBUF:
+		key_size = 0;
+		value_size = 0;
+		max_entries = 4096;
+		break;
 	case BPF_MAP_TYPE_UNSPEC:
 	case BPF_MAP_TYPE_HASH:
 	case BPF_MAP_TYPE_ARRAY:
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
new file mode 100644
index 000000000000..b55eef8317b2
--- /dev/null
+++ b/tools/lib/bpf/ringbuf.c
@@ -0,0 +1,264 @@
+// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
+/*
+ * Ring buffer operations.
+ *
+ * Copyright (C) 2020 Facebook, Inc.
+ */
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <unistd.h>
+#include <linux/err.h>
+#include <linux/bpf.h>
+#include <asm/barrier.h>
+#include <sys/mman.h>
+#include <sys/epoll.h>
+#include <tools/libc_compat.h>
+
+#include "libbpf.h"
+#include "libbpf_internal.h"
+#include "bpf.h"
+
+/* make sure libbpf doesn't use kernel-only integer typedefs */
+#pragma GCC poison u8 u16 u32 u64 s8 s16 s32 s64
+
+struct ring {
+	ring_buffer_sample_fn sample_cb;
+	void *ctx;
+	void *data;
+	__u64 *consumer_pos;
+	__u64 *producer_pos;
+	__u64 mask;
+	int map_fd;
+};
+
+struct ring_buffer {
+	struct epoll_event *events;
+	struct ring *rings;
+	size_t page_size;
+	int epoll_fd;
+	int ring_cnt;
+};
+
+static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
+{
+	if (r->consumer_pos) {
+		munmap(r->consumer_pos, rb->page_size);
+		r->consumer_pos = NULL;
+	}
+	if (r->producer_pos) {
+		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
+		r->producer_pos = NULL;
+	}
+}
+
+/* Add extra RINGBUF maps to this ring buffer manager */
+int ring_buffer__add(struct ring_buffer *rb, int map_fd,
+		     ring_buffer_sample_fn sample_cb, void *ctx)
+{
+	struct bpf_map_info info;
+	__u32 len = sizeof(info);
+	struct epoll_event *e;
+	struct ring *r;
+	void *tmp;
+	int err;
+
+	memset(&info, 0, sizeof(info));
+
+	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
+	if (err) {
+		err = -errno;
+		pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
+			map_fd, err);
+		return err;
+	}
+
+	if (info.type != BPF_MAP_TYPE_RINGBUF) {
+		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
+			map_fd);
+		return -EINVAL;
+	}
+
+	tmp = reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
+	if (!tmp)
+		return -ENOMEM;
+	rb->rings = tmp;
+
+	tmp = reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
+	if (!tmp)
+		return -ENOMEM;
+	rb->events = tmp;
+
+	r = &rb->rings[rb->ring_cnt];
+	memset(r, 0, sizeof(*r));
+
+	r->map_fd = map_fd;
+	r->sample_cb = sample_cb;
+	r->ctx = ctx;
+	r->mask = info.max_entries - 1;
+
+	/* Map writable consumer page */
+	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
+		   map_fd, 0);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
+			map_fd, err);
+		return err;
+	}
+	r->consumer_pos = tmp;
+
+	/* Map read-only producer page and data pages. We map twice as big
+	 * data size to allow simple reading of samples that wrap around the
+	 * end of a ring buffer. See kernel implementation for details.
+	 * */
+	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, PROT_READ,
+		   MAP_SHARED, map_fd, rb->page_size);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		ringbuf_unmap_ring(rb, r);
+		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
+			map_fd, err);
+		return err;
+	}
+	r->producer_pos = tmp;
+	r->data = tmp + rb->page_size;
+
+	e = &rb->events[rb->ring_cnt];
+	memset(e, 0, sizeof(*e));
+
+	e->events = EPOLLIN;
+	e->data.fd = rb->ring_cnt;
+	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
+		err = -errno;
+		ringbuf_unmap_ring(rb, r);
+		pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
+			map_fd, err);
+		return err;
+	}
+
+	rb->ring_cnt++;
+	return 0;
+}
+
+void ring_buffer__free(struct ring_buffer *rb)
+{
+	int i;
+
+	if (!rb)
+		return;
+
+	for (i = 0; i < rb->ring_cnt; ++i)
+		ringbuf_unmap_ring(rb, &rb->rings[i]);
+	if (rb->epoll_fd >= 0)
+		close(rb->epoll_fd);
+
+	free(rb->events);
+	free(rb->rings);
+	free(rb);
+}
+
+struct ring_buffer *
+ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
+		 const struct ring_buffer_opts *opts)
+{
+	struct ring_buffer *rb;
+	int err;
+
+	if (!OPTS_VALID(opts, ring_buffer_opts))
+		return NULL;
+
+	rb = calloc(1, sizeof(*rb));
+	if (!rb)
+		return NULL;
+
+	rb->page_size = getpagesize();
+
+	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (rb->epoll_fd < 0) {
+		err = -errno;
+		pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
+		goto err_out;
+	}
+
+	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
+	if (err)
+		goto err_out;
+
+	return rb;
+
+err_out:
+	ring_buffer__free(rb);
+	return NULL;
+}
+
+#define RINGBUF_BUSY_BIT (1 << 31)
+#define RINGBUF_DISCARD_BIT (1 << 30)
+#define RINGBUF_META_LEN 8
+
+static inline int roundup_len(__u32 len)
+{
+	/* clear out top 2 bits */
+	len <<= 2;
+	len >>= 2;
+	/* add length prefix */
+	len += RINGBUF_META_LEN;
+	/* round up to 8 byte alignment */
+	return (len + 7) / 8 * 8;
+}
+
+static int ringbuf_process_ring(struct ring* r)
+{
+	__u64 cons_pos, prod_pos;
+	int *len_ptr, len, err;
+	bool got_new_data;
+	void *sample;
+
+	cons_pos = *r->consumer_pos;
+	do {
+		got_new_data = false;
+		prod_pos = smp_load_acquire(r->producer_pos);
+		while (cons_pos < prod_pos) {
+			len_ptr = r->data + (cons_pos & r->mask);
+			len = smp_load_acquire(len_ptr);
+
+			/* sample not committed yet, bail out for now */
+			if (len & RINGBUF_BUSY_BIT)
+				goto done;
+
+			got_new_data = true;
+			cons_pos += roundup_len(len);
+
+			if ((len & RINGBUF_DISCARD_BIT) == 0) {
+				sample = (void *)len_ptr + RINGBUF_META_LEN;
+				err = r->sample_cb(r->ctx, sample, len);
+				if (err) {
+					/* update consumer pos and bail out */
+					smp_store_release(r->consumer_pos,
+							  cons_pos);
+					return err;
+				}
+			}
+
+			smp_store_release(r->consumer_pos, cons_pos);
+		}
+	} while (got_new_data);
+done:
+	return 0;
+}
+
+int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
+{
+	int i, cnt, err;
+
+	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
+	for (i = 0; i < cnt; i++) {
+		__u32 ring_id = rb->events[i].data.fd;
+		struct ring *ring = &rb->rings[ring_id];
+
+		err = ringbuf_process_ring(ring);
+		if (err)
+			return err;
+	}
+	return cnt < 0 ? -errno : cnt;
+}
-- 
2.24.1





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux