In some cases, instead of always consuming all items from ring buffers in a greedy way, we may want to consume up to a certain number of items, for example when we need to copy items from the BPF ring buffer to a limited user buffer. This change allows setting an upper limit on the number of items consumed from one or more ring buffers. Link: https://lore.kernel.org/lkml/20240310154726.734289-1-andrea.righi@xxxxxxxxxxxxx/T Signed-off-by: Andrea Righi <andrea.righi@xxxxxxxxxxxxx> --- tools/lib/bpf/ringbuf.c | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c index aacb64278a01..81df535040d1 100644 --- a/tools/lib/bpf/ringbuf.c +++ b/tools/lib/bpf/ringbuf.c @@ -231,7 +231,7 @@ static inline int roundup_len(__u32 len) return (len + 7) / 8 * 8; } -static int64_t ringbuf_process_ring(struct ring *r) +static int64_t ringbuf_process_ring(struct ring *r, int64_t max_items) { int *len_ptr, len, err; /* 64-bit to avoid overflow in case of extreme application behavior */ @@ -264,7 +264,14 @@ static int64_t ringbuf_process_ring(struct ring *r) cons_pos); return err; } - cnt++; + if (++cnt >= max_items) { + /* update consumer pos and return the + * total amount of items consumed. 
+ */ + smp_store_release(r->consumer_pos, + cons_pos); + goto done; + } } smp_store_release(r->consumer_pos, cons_pos); @@ -281,19 +288,18 @@ static int64_t ringbuf_process_ring(struct ring *r) */ int ring_buffer__consume(struct ring_buffer *rb) { - int64_t err, res = 0; + int64_t err, res = 0, max_items = INT_MAX; int i; for (i = 0; i < rb->ring_cnt; i++) { struct ring *ring = rb->rings[i]; - err = ringbuf_process_ring(ring); + err = ringbuf_process_ring(ring, max_items); if (err < 0) return libbpf_err(err); res += err; + max_items -= err; } - if (res > INT_MAX) - return INT_MAX; return res; } @@ -304,7 +310,7 @@ int ring_buffer__consume(struct ring_buffer *rb) int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms) { int i, cnt; - int64_t err, res = 0; + int64_t err, res = 0, max_items = INT_MAX; cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms); if (cnt < 0) @@ -314,13 +320,12 @@ int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms) __u32 ring_id = rb->events[i].data.fd; struct ring *ring = rb->rings[ring_id]; - err = ringbuf_process_ring(ring); + err = ringbuf_process_ring(ring, max_items); if (err < 0) return libbpf_err(err); res += err; + max_items -= err; } - if (res > INT_MAX) - return INT_MAX; return res; } @@ -375,11 +380,11 @@ int ring__consume(struct ring *r) { int64_t res; - res = ringbuf_process_ring(r); + res = ringbuf_process_ring(r, INT_MAX); if (res < 0) return libbpf_err(res); - return res > INT_MAX ? INT_MAX : res; + return res; } static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb) -- 2.43.0