This patch permits using the overwritable feature of the BPF ring buffer
from userspace. When a ring buffer map was created with
BPF_F_RB_OVER_WRITABLE, the libbpf consumer now detects that the producer
wrapped around the buffer and skips the data that was already overwritten.

Signed-off-by: Francis Laniel <flaniel@xxxxxxxxxxxxxxxxxxx>
---
 tools/include/uapi/linux/bpf.h |  3 +++
 tools/lib/bpf/ringbuf.c        | 35 ++++++++++++++++++++++++++++++++++-
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index ef78e0e1a754..19c7039265d8 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -1226,6 +1226,9 @@ enum {
 
 /* Create a map that is suitable to be an inner map with dynamic max entries */
 	BPF_F_INNER_MAP		= (1U << 12),
+
+/* Create an overwritable BPF_RINGBUF */
+	BPF_F_RB_OVER_WRITABLE	= (1U << 13),
 };
 
 /* Flags for BPF_PROG_QUERY. */
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 8bc117bcc7bc..2bd584f7250b 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -23,6 +23,8 @@ struct ring {
 	ring_buffer_sample_fn sample_cb;
+	__u8 over_writable: 1,
+	     __reserved: 7;
 	void *ctx;
 	void *data;
 	unsigned long *consumer_pos;
@@ -95,6 +97,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
 	r->sample_cb = sample_cb;
 	r->ctx = ctx;
 	r->mask = info.max_entries - 1;
+	r->over_writable = !!(info.map_flags & BPF_F_RB_OVER_WRITABLE);
 
 	/* Map writable consumer page */
 	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
@@ -202,6 +205,11 @@ static inline int roundup_len(__u32 len)
 	return (len + 7) / 8 * 8;
 }
 
+static inline bool is_over_writable(struct ring *r)
+{
+	return !!r->over_writable;
+}
+
 static int64_t ringbuf_process_ring(struct ring* r)
 {
 	int *len_ptr, len, err;
@@ -209,12 +217,25 @@ static int64_t ringbuf_process_ring(struct ring* r)
 	int64_t cnt = 0;
 	unsigned long cons_pos, prod_pos;
 	bool got_new_data;
+	int rounded_len;
 	void *sample;
 
 	cons_pos = smp_load_acquire(r->consumer_pos);
 	do {
 		got_new_data = false;
 		prod_pos = smp_load_acquire(r->producer_pos);
+
+		/*
+		 * If the difference between the producer position and that of
+		 * the consumer is larger than the buffer size, it means the
+		 * producer already looped over the buffer.
+		 * So, the data at the consumer position was already
+		 * overwritten. We can then bump the consumer position to the
+		 * producer position minus the buffer size.
+		 */
+		if (is_over_writable(r) && prod_pos - cons_pos > r->mask)
+			cons_pos = prod_pos - (r->mask + 1);
+
 		while (cons_pos < prod_pos) {
 			len_ptr = r->data + (cons_pos & r->mask);
 			len = smp_load_acquire(len_ptr);
@@ -224,7 +245,19 @@ static int64_t ringbuf_process_ring(struct ring* r)
 				goto done;
 
 			got_new_data = true;
-			cons_pos += roundup_len(len);
+			rounded_len = roundup_len(len);
+			cons_pos += rounded_len;
+
+			/*
+			 * rounded_len is a multiple of 8, but a length that is
+			 * a multiple of 8 is not necessarily a multiple of
+			 * 4096. So, we need to round again to avoid ending up
+			 * at the wrong position.
+			 * See the kernel implementation for more details.
+			 */
+			if (is_over_writable(r)) {
+				cons_pos -= (cons_pos & r->mask) % rounded_len;
+			}
 
 			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
 				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
-- 
2.25.1
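
For reference, a minimal, untested usage sketch (not part of the patch): it
drains an overwritable ring buffer with the existing libbpf consumer API.
The map declaration in the comment, the map name "events", and the helper
name are illustrative; only the BPF_F_RB_OVER_WRITABLE flag comes from this
series.

/*
 * Sketch: consuming an overwritable ring buffer from userspace. The map is
 * assumed to be declared on the BPF side with the new flag, e.g.:
 *
 *	struct {
 *		__uint(type, BPF_MAP_TYPE_RINGBUF);
 *		__uint(max_entries, 256 * 1024);
 *		__uint(map_flags, BPF_F_RB_OVER_WRITABLE);
 *	} events SEC(".maps");
 */
#include <bpf/libbpf.h>

static int handle_event(void *ctx, void *data, size_t len)
{
	/* Process one sample; data points past the record header. */
	return 0;
}

static int drain_overwritable(int map_fd)
{
	struct ring_buffer *rb;
	int err;

	rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
	if (!rb)
		return -1;

	/*
	 * With this patch, ringbuf_process_ring() skips records the producer
	 * already overwrote instead of reading stale data.
	 */
	err = ring_buffer__consume(rb);

	ring_buffer__free(rb);
	return err < 0 ? err : 0;
}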