If the BPF ring buffer is overwritable, ringbuf_process_overwritable_ring() is called to handle data consumption. All available data is consumed, subject to the following checks: * do not re-read data that was already read; if there is no new data, nothing happens. * do not read more than the buffer size. * do not read invalid data, by checking that it fits within the buffer size. Signed-off-by: Francis Laniel <flaniel@xxxxxxxxxxxxxxxxxxx> --- tools/include/uapi/linux/bpf.h | 3 + tools/lib/bpf/ringbuf.c | 106 +++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h index 59a217ca2dfd..cd73a89e8ead 100644 --- a/tools/include/uapi/linux/bpf.h +++ b/tools/include/uapi/linux/bpf.h @@ -1227,6 +1227,9 @@ enum { /* Create a map that is suitable to be an inner map with dynamic max entries */ BPF_F_INNER_MAP = (1U << 12), + +/* Create an overwritable BPF_RINGBUF */ + BFP_F_RB_OVERWRITABLE = (1U << 13), }; /* Flags for BPF_PROG_QUERY. 
*/ diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c index 8bc117bcc7bc..2362a6280fc5 100644 --- a/tools/lib/bpf/ringbuf.c +++ b/tools/lib/bpf/ringbuf.c @@ -23,6 +23,8 @@ struct ring { ring_buffer_sample_fn sample_cb; + __u8 overwritable: 1, + __reserved: 7; void *ctx; void *data; unsigned long *consumer_pos; @@ -51,6 +53,11 @@ static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r) } } +static inline bool is_overwritable(struct ring *r) +{ + return !!r->overwritable; +} + /* Add extra RINGBUF maps to this ring buffer manager */ int ring_buffer__add(struct ring_buffer *rb, int map_fd, ring_buffer_sample_fn sample_cb, void *ctx) @@ -95,6 +102,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, r->sample_cb = sample_cb; r->ctx = ctx; r->mask = info.max_entries - 1; + r->overwritable = !!(info.map_flags & BFP_F_RB_OVERWRITABLE); /* Map writable consumer page */ tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, @@ -202,6 +210,101 @@ static inline int roundup_len(__u32 len) return (len + 7) / 8 * 8; } + +static int64_t ringbuf_process_overwritable_ring(struct ring *r) +{ + /* 64-bit to avoid overflow in case of extreme application behavior */ + int64_t cnt = 0; + unsigned long read_pos, prod_pos, previous_prod_pos; + + prod_pos = smp_load_acquire(r->producer_pos); + previous_prod_pos = smp_load_acquire(r->consumer_pos); + + /* + * For overwritable ring buffer, we use consumer_pos as the previous + * producer_pos. + * So, if between two calls to this function, the prod_pos did not move, + * it means there is no new data, so we can return right now rather than + * dealing with data we already processed. + * NOTE the kernel space does not care about consumer_pos to reserve() + * in overwritable ring buffers, hence we can hijack this field. + */ + if (previous_prod_pos == prod_pos) + return 0; + + /* + * BPF ring buffer is overwritable, we start reading from + * producer position. 
+ */ + read_pos = prod_pos; + while (read_pos - prod_pos < r->mask) { + int *len_ptr, len; + + len_ptr = r->data + (read_pos & r->mask); + len = smp_load_acquire(len_ptr); + + /* sample not committed yet, bail out for now */ + if (len & BPF_RINGBUF_BUSY_BIT) + break; + + /* + * If len is 0, it means we read all the data + * available in the buffer and reached zeroed data: + * + * prod_pos read_pos + * | | + * V V + * +---+------+----------+-------+------+ + * | |D....D|C........C|B.....B|A....A| + * +---+------+----------+-------+------+ + */ + if (!len) + break; + + /* + * If adding the event len to the current + * read position makes us wrap the buffer, + * it means we already did "one loop" around the + * buffer. + * So, the pointed data would not be usable: + * + * prod_pos + * read_pos----+ | + * | | + * V V + * +---+------+----------+-------+---+--+ + * |..E|D....D|C........C|B.....B|A..|E.| + * +---+------+----------+-------+---+--+ + */ + if (read_pos - prod_pos + len > r->mask) + break; + + read_pos += roundup_len(len); + + if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) { + void *sample; + int err; + + sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ; + err = r->sample_cb(r->ctx, sample, len); + if (err < 0) { + /* update consumer pos and bail out */ + smp_store_release(r->consumer_pos, + prod_pos); + return err; + } + cnt++; + } + + /* This prevents reading data we already processed. */ + if (previous_prod_pos && read_pos >= previous_prod_pos) + break; + } + + smp_store_release(r->consumer_pos, prod_pos); + return cnt; +} + static int64_t ringbuf_process_ring(struct ring* r) { int *len_ptr, len, err; @@ -211,6 +314,9 @@ static int64_t ringbuf_process_ring(struct ring* r) bool got_new_data; void *sample; + if (is_overwritable(r)) + return ringbuf_process_overwritable_ring(r); + cons_pos = smp_load_acquire(r->consumer_pos); do { got_new_data = false; -- 2.25.1