On Thu, Jun 20, 2024 at 2:34 PM Daniel Borkmann <daniel@xxxxxxxxxxxxx> wrote:
>
> The BPF ring buffer is internally implemented as a power-of-2 sized circular
> buffer, with two logical and ever-increasing counters: consumer_pos is the
> consumer counter showing up to which logical position the consumer has
> consumed the data, and producer_pos is the producer counter denoting the
> amount of data reserved by all producers.
>
> Each time a record is reserved, the producer that "owns" the record will
> successfully advance the producer counter. In user space, each time a record
> is read, the consumer of the data advances the consumer counter once it has
> finished processing. Both counters are stored in separate pages so that from
> user space the producer counter is read-only and the consumer counter is
> read-write.
>
> One aspect that simplifies and thus speeds up the implementation of both
> producers and consumers is how the data area is mapped twice contiguously
> back-to-back in virtual memory, allowing us to avoid any special measures
> for samples that have to wrap around at the end of the circular buffer data
> area: the next page after the last data page is the first data page again,
> and thus the sample still appears completely contiguous in virtual memory.
>
> Each record has a struct bpf_ringbuf_hdr { u32 len; u32 pg_off; } header for
> book-keeping the length and offset, which is inaccessible to the BPF program.
> Helpers like bpf_ringbuf_reserve() return `(void *)hdr + BPF_RINGBUF_HDR_SZ`
> for the BPF program to use. Bing-Jhong and Muhammad reported that it is,
> however, possible to make a second allocated memory chunk overlap with the
> first chunk and, as a result, the BPF program is then able to edit the first
> chunk's header.
>
> For example, consider the creation of a BPF_MAP_TYPE_RINGBUF map with a size
> of 0x4000. Next, the consumer_pos is modified to 0x3000 /before/ a call to
> bpf_ringbuf_reserve() is made. This will allocate a chunk A, which is in
> [0x0,0x3008], and the BPF program is able to edit [0x8,0x3008]. Now, let's
> allocate a chunk B of size 0x3000. This will succeed because consumer_pos
> was edited ahead of time to pass the `new_prod_pos - cons_pos > rb->mask`
> check. Chunk B will be in range [0x3008,0x6010], and the BPF program is able
> to edit [0x3010,0x6010]. Due to the ring buffer memory layout mentioned
> earlier, the ranges [0x0,0x4000] and [0x4000,0x8000] point to the same data
> pages. This means that chunk B at [0x4000,0x4008] is chunk A's header.
> bpf_ringbuf_submit() / bpf_ringbuf_discard() use the header's pg_off to then
> locate the bpf_ringbuf itself via bpf_ringbuf_restore_from_rec(). Once chunk
> B has modified chunk A's header, bpf_ringbuf_commit() refers to the wrong
> page and could cause a crash.
>
> Fix it by calculating the oldest pending_pos and checking whether the range
> from the oldest outstanding record to the newest would span beyond the ring
> buffer size. If that is the case, reject the request. We've tested with the
> ring buffer benchmark in BPF selftests (./benchs/run_bench_ringbufs.sh)
> before/after the fix, and while it seems a bit slower on some benchmarks,
> the difference is not significant enough to matter.
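The overlap arithmetic from the example is easy to replay outside the kernel.
Here is a small stand-alone user-space sketch of mine (untested, not kernel
code; it only mirrors the example's constants, data_sz = 0x4000 and
BPF_RINGBUF_HDR_SZ = 8, against the pre-fix space check):

/* replay of the chunk A / chunk B positions with the old check only */
#include <stdio.h>

#define HDR_SZ	8UL
#define DATA_SZ	0x4000UL

static const unsigned long mask = DATA_SZ - 1;
static unsigned long cons_pos = 0x3000;	/* forged from user space */
static unsigned long prod_pos;

/* mimics the pre-fix __bpf_ringbuf_reserve() space check; returns the
 * record (header) start position, or -1UL if the reservation is rejected
 */
static unsigned long reserve(unsigned long size)
{
	unsigned long len = (size + HDR_SZ + 7) & ~7UL;	/* round_up(.., 8) */
	unsigned long new_prod_pos = prod_pos + len;

	if (new_prod_pos - cons_pos > mask)	/* only check before the fix */
		return -1UL;
	prod_pos = new_prod_pos;
	return new_prod_pos - len;
}

int main(void)
{
	unsigned long a = reserve(0x3000);	/* A: hdr at 0x0, data [0x8,0x3008) */
	unsigned long b = reserve(0x3000);	/* B: hdr at 0x3008, data [0x3010,0x6010) */

	/* B's data range covers 0x4000..0x4008, which aliases offset 0x0 of
	 * the double-mapped data area, i.e. chunk A's header.
	 */
	printf("A hdr @ %#lx, B data [%#lx,%#lx)\n",
	       a & mask, b + HDR_SZ, b + HDR_SZ + 0x3000);
	return 0;
}

Both reservations pass the old check, and offset 0x4000 inside B's payload
aliases offset 0x0, i.e. A's header, which is exactly the overlap that the
pending_pos check in the fix below rejects.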
>
> Fixes: 457f44363a88 ("bpf: Implement BPF ring buffer and verifier support for it")
> Reported-by: Bing-Jhong Billy Jheng <billy@xxxxxxxxxxx>
> Reported-by: Muhammad Ramdhan <ramdhan@xxxxxxxxxxx>
> Co-developed-by: Bing-Jhong Billy Jheng <billy@xxxxxxxxxxx>
> Signed-off-by: Bing-Jhong Billy Jheng <billy@xxxxxxxxxxx>
> Co-developed-by: Andrii Nakryiko <andrii@xxxxxxxxxx>
> Signed-off-by: Andrii Nakryiko <andrii@xxxxxxxxxx>
> Signed-off-by: Daniel Borkmann <daniel@xxxxxxxxxxxxx>
> ---
>  kernel/bpf/ringbuf.c | 28 +++++++++++++++++++++++-----
>  1 file changed, 23 insertions(+), 5 deletions(-)
>
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index 0ee653a936ea..7f82116b46ec 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -29,6 +29,7 @@ struct bpf_ringbuf {
>  	u64 mask;
>  	struct page **pages;
>  	int nr_pages;
> +	unsigned long pending_pos;

let's put it right after producer_pos, as that one is also updated in the
same reserve step, so cache line will be exclusive. By putting it into
read-mostly parts of bpf_ringbuf struct we'll induce unnecessary cache line
bouncing

>  	spinlock_t spinlock ____cacheline_aligned_in_smp;
>  	/* For user-space producer ring buffers, an atomic_t busy bit is used
>  	 * to synchronize access to the ring buffers in the kernel, rather than
> @@ -179,6 +180,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
>  	rb->mask = data_sz - 1;
>  	rb->consumer_pos = 0;
>  	rb->producer_pos = 0;
> +	rb->pending_pos = 0;
>
>  	return rb;
>  }
> @@ -404,9 +406,10 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
>
>  static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
>  {
> -	unsigned long cons_pos, prod_pos, new_prod_pos, flags;
> -	u32 len, pg_off;
> +	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
>  	struct bpf_ringbuf_hdr *hdr;
> +	u32 len, pg_off;
> +	u64 tmp_size;
>
>  	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
>  		return NULL;
> @@ -424,13 +427,28 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
>  		spin_lock_irqsave(&rb->spinlock, flags);
>  	}
>
> +	pend_pos = rb->pending_pos;
>  	prod_pos = rb->producer_pos;
>  	new_prod_pos = prod_pos + len;
>
> -	/* check for out of ringbuf space by ensuring producer position
> -	 * doesn't advance more than (ringbuf_size - 1) ahead
> +	while (pend_pos < prod_pos) {
> +		hdr = (void *)rb->data + (pend_pos & rb->mask);
> +		if (hdr->len & BPF_RINGBUF_BUSY_BIT)
> +			break;
> +		tmp_size = hdr->len & ~BPF_RINGBUF_DISCARD_BIT;

it feels right to have hdr_len = READ_ONCE(hdr->len) and then using that for
bit checks and manipulations, WDYT?

> +		tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
> +		pend_pos += tmp_size;
> +	}
> +	rb->pending_pos = pend_pos;
> +
> +	/* check for out of ringbuf space:
> +	 * - by ensuring producer position doesn't advance more than
> +	 *   (ringbuf_size - 1) ahead
> +	 * - by ensuring oldest not yet committed record until newest
> +	 *   record does not span more than (ringbuf_size - 1)
>  	 */
> -	if (new_prod_pos - cons_pos > rb->mask) {
> +	if ((new_prod_pos - cons_pos > rb->mask) ||
> +	    (new_prod_pos - pend_pos > rb->mask)) {
>  		spin_unlock_irqrestore(&rb->spinlock, flags);
>  		return NULL;
>  	}
> --
> 2.43.0
>
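To spell out the READ_ONCE() comment above, the shape I have in mind is
roughly the following (untested sketch against this patch, only the loop
body changes, same fields and helpers as in the diff):

	while (pend_pos < prod_pos) {
		u32 hdr_len;

		hdr = (void *)rb->data + (pend_pos & rb->mask);
		/* load hdr->len once, do all bit tests on the local copy */
		hdr_len = READ_ONCE(hdr->len);
		if (hdr_len & BPF_RINGBUF_BUSY_BIT)
			break;
		tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
		tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
		pend_pos += tmp_size;
	}
	rb->pending_pos = pend_pos;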