Re: [PATCH bpf-next v3] libbpf: handle producer position overflow

Hi,

On 8/25/2023 6:09 AM, Andrew Werner wrote:
> Before this patch, the producer position could overflow `unsigned
> long`, in which case libbpf would forever stop processing new writes to
> the ringbuf. Similarly, overflows of the producer position could result
> in __bpf_user_ringbuf_peek not discovering available data. This patch
> addresses that bug by using the signed delta between the
> consumer and producer position to determine if data is available; the
> delta computation is robust to overflow.
>
> A more defensive check could be to ensure that the delta is within
> the allowed range, but such defensive checks are neither present in
> the kernel side code nor in libbpf. The overflow that this patch
> handles can occur while the producer and consumer follow a correct
> protocol.
>
> Secondarily, the type used to represent the positions in the
> user_ring_buffer functions in both libbpf and the kernel has been
> changed from u64 to unsigned long to match the type used in the
> kernel's representation of the structure. The change occurs in the
> same patch because it's required to align the data availability
> calculations between the userspace producing ringbuf and the bpf
> producing ringbuf.
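
To illustrate why the signed delta survives wraparound, here is a minimal
sketch. The helper names are hypothetical and exist only for this example;
they are not part of libbpf or the kernel. It assumes the usual
two's-complement conversion from unsigned long to long, as provided by gcc
and clang.

```c
#include <limits.h>
#include <stdbool.h>

/* Hypothetical helper: true when the consumer still has data to read.
 * The unsigned subtraction wraps modulo 2^N, and the cast reinterprets
 * the result as a signed distance between the two positions.
 */
static bool data_available(unsigned long cons_pos, unsigned long prod_pos)
{
	return (long)(cons_pos - prod_pos) < 0;
}

/* The pre-patch comparison, kept here only for contrast. */
static bool data_available_naive(unsigned long cons_pos, unsigned long prod_pos)
{
	return cons_pos < prod_pos;
}
```

With cons_pos = ULONG_MAX - 7 and prod_pos = 8 (the producer has wrapped),
data_available() reports pending data, while data_available_naive() does not.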

Because the patch changes both the ring buffer and the user ring buffer,
I think it would be better to split it into three patches to ease
backporting: one patch for the ring buffer change in libbpf, and two
more patches for the user ring buffer changes in libbpf and in the
kernel.
>
> Not included in this patch, a selftest was written to demonstrate the
> bug, and indeed this patch allows the test to continue to make progress
> past the overflow. The shape of the self test was as follows:
>
>  a) Set up ringbuf of 2GB size (the maximum permitted size).
>  b) reserve+commit maximum-sized records (ULONG_MAX/4) constantly as
>     fast as possible.

Should this be UINT_MAX/4 rather than ULONG_MAX/4?
>
> With 1 million records per second repro time should be about 4.7 hours.
> Such a test duration is impractical to run, hence the omission.
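
As a rough check of that estimate, the sketch below computes how long a
64-bit position takes to wrap. The helper name is hypothetical, and the
inputs (a 64-bit unsigned long, records of about UINT_MAX/4 bytes, one
million records per second) are assumptions taken from the description
above.

```c
#include <stdint.h>

/* Hypothetical helper: seconds until a 64-bit byte position wraps, given
 * a record size in bytes and a production rate in records per second.
 */
static double seconds_to_wrap(uint64_t record_bytes, double records_per_sec)
{
	/* 2^64 bytes must be produced before the position wraps. */
	const double total_bytes = 18446744073709551616.0;

	return total_bytes / ((double)record_bytes * records_per_sec);
}
```

seconds_to_wrap(UINT32_MAX / 4, 1e6) / 3600.0 comes out at roughly 4.77
hours, which is consistent with the figure quoted above when the record
size is UINT_MAX/4. With records of ULONG_MAX/4 bytes on a 64-bit system,
the position would wrap after about four records.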
>
> Additionally, this patch adds commentary around a separate point to note
> that the modular arithmetic is valid in the face of overflows, as that
> fact may not be obvious to future readers.
>
> v2->v3:
>   - Changed the representation of the consumer and producer positions
>     from u64 to unsigned long in user_ring_buffer functions. 
>   - Addressed overflow in __bpf_user_ringbuf_peek.
>   - Changed data availability computations to use the signed delta
>     between the consumer and producer positions rather than merely
>     checking whether their values were unequal.
> v1->v2:
>   - Fixed comment grammar.
>   - Properly formatted subject line.
>
> Signed-off-by: Andrew Werner <awerner32@xxxxxxxxx>
> ---
>  kernel/bpf/ringbuf.c    | 11 ++++++++---
>  tools/lib/bpf/ringbuf.c | 16 +++++++++++++---
>  2 files changed, 21 insertions(+), 6 deletions(-)

Otherwise, these changes look good to me:

Acked-by: Hou Tao <houtao1@xxxxxxxxxx>

>
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index f045fde632e5..0c48673520fb 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -658,7 +658,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
>  {
>  	int err;
>  	u32 hdr_len, sample_len, total_len, flags, *hdr;
> -	u64 cons_pos, prod_pos;
> +	unsigned long cons_pos, prod_pos;
>  
>  	/* Synchronizes with smp_store_release() in user-space producer. */
>  	prod_pos = smp_load_acquire(&rb->producer_pos);
> @@ -667,7 +667,12 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
>  
>  	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
>  	cons_pos = smp_load_acquire(&rb->consumer_pos);
> -	if (cons_pos >= prod_pos)
> +
> +	/* Check if there's data available by computing the signed delta between
> +	 * cons_pos and prod_pos; a negative delta indicates that the consumer has
> +	 * not caught up. This formulation is robust to prod_pos wrapping around.
> +	 */
> +	if ((long)(cons_pos - prod_pos) >= 0)
>  		return -ENODATA;
>  
>  	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> @@ -711,7 +716,7 @@ static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *s
>  
>  static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
>  {
> -	u64 consumer_pos;
> +	unsigned long consumer_pos;
>  	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
>  
>  	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 02199364db13..141030a89370 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -237,7 +237,13 @@ static int64_t ringbuf_process_ring(struct ring *r)
>  	do {
>  		got_new_data = false;
>  		prod_pos = smp_load_acquire(r->producer_pos);
> -		while (cons_pos < prod_pos) {
> +
> +		/* Check if there's data available by computing the signed delta
> +		 * between cons_pos and prod_pos; a negative delta indicates that the
> +		 * consumer has not caught up. This formulation is robust to prod_pos
> +		 * wrapping around.
> +		 */
> +		while ((long)(cons_pos - prod_pos) < 0) {
>  			len_ptr = r->data + (cons_pos & r->mask);
>  			len = smp_load_acquire(len_ptr);
>  
> @@ -482,8 +488,7 @@ void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
>  void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
>  {
>  	__u32 avail_size, total_size, max_size;
> -	/* 64-bit to avoid overflow in case of extreme application behavior */
> -	__u64 cons_pos, prod_pos;
> +	unsigned long cons_pos, prod_pos;
>  	struct ringbuf_hdr *hdr;
>  
>  	/* The top two bits are used as special flags */
> @@ -498,6 +503,11 @@ void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
>  	prod_pos = smp_load_acquire(rb->producer_pos);
>  
>  	max_size = rb->mask + 1;
> +
> +	/* Note that this formulation is valid in the face of overflow of
> +	 * prod_pos so long as the delta between prod_pos and cons_pos is
> +	 * no greater than max_size.
> +	 */
>  	avail_size = max_size - (prod_pos - cons_pos);
>  	/* Round up total size to a multiple of 8. */
>  	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;
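
The avail_size comment can be checked the same way. This is a minimal
stand-alone sketch, not the libbpf code itself: the function is a
hypothetical copy of the computation, and it assumes the delta between
prod_pos and cons_pos never exceeds max_size.

```c
#include <limits.h>
#include <stdint.h>

/* Hypothetical stand-alone version of the avail_size computation.
 * prod_pos - cons_pos is evaluated modulo 2^N, so it yields the number
 * of unconsumed bytes even after prod_pos has wrapped past cons_pos.
 */
static uint32_t avail_size(unsigned long cons_pos, unsigned long prod_pos,
			   uint32_t max_size)
{
	return max_size - (uint32_t)(prod_pos - cons_pos);
}
```

For example, with cons_pos = ULONG_MAX - 15, prod_pos = 16 and
max_size = 4096, the producer is 32 bytes ahead of the consumer and
avail_size() returns 4064.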




