Re: [PATCH bpf-next 1/6] bpf: implement BPF ring buffer and verifier support for it

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, May 14, 2020 at 12:06 PM Alexei Starovoitov
<alexei.starovoitov@xxxxxxxxx> wrote:
>
> On Wed, May 13, 2020 at 12:25:27PM -0700, Andrii Nakryiko wrote:
> > +
> > +/* Given pointer to ring buffer record metadata, restore pointer to struct
> > + * bpf_ringbuf itself by using page offset stored at offset 4
> > + */
> > +static struct bpf_ringbuf *bpf_ringbuf_restore_from_rec(void *meta_ptr)
> > +{
> > +     unsigned long addr = (unsigned long)meta_ptr;
> > +     unsigned long off = *(u32 *)(meta_ptr + 4) << PAGE_SHIFT;
>
> Looking at the further code it seems this one should be READ_ONCE, but...
>

This will be called from commit(), which will be called after
successful reserve(), which dose spin_unlock at the end. So in terms
of memory barriers, everything should be fine? Or am I missing some
trickier aspect?

> > +
> > +     return (void*)((addr & PAGE_MASK) - off);
> > +}
> > +
> > +static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
> > +{
> > +     unsigned long cons_pos, prod_pos, new_prod_pos, flags;
> > +     u32 len, pg_off;
> > +     void *meta_ptr;
> > +
> > +     if (unlikely(size > UINT_MAX))
> > +             return NULL;
> > +
> > +     len = round_up(size + RINGBUF_META_SZ, 8);
>
> it may overflow despite the check above.

As Jonathan pointed out, max size should be enforced about to be at
most (UINT_MAX/4), due to 2 bits reserved for BUSY and DISCARD. So it
shouldn't overflow.

>
> > +     cons_pos = READ_ONCE(rb->consumer_pos);
> > +
> > +     if (in_nmi()) {
> > +             if (!spin_trylock_irqsave(&rb->spinlock, flags))
> > +                     return NULL;
> > +     } else {
> > +             spin_lock_irqsave(&rb->spinlock, flags);
> > +     }
> > +
> > +     prod_pos = rb->producer_pos;
> > +     new_prod_pos = prod_pos + len;
> > +
> > +     /* check for out of ringbuf space by ensuring producer position
> > +      * doesn't advance more than (ringbuf_size - 1) ahead
> > +      */
> > +     if (new_prod_pos - cons_pos > rb->mask) {
> > +             spin_unlock_irqrestore(&rb->spinlock, flags);
> > +             return NULL;
> > +     }
> > +
> > +     meta_ptr = rb->data + (prod_pos & rb->mask);
> > +     pg_off = bpf_ringbuf_rec_pg_off(rb, meta_ptr);
> > +
> > +     WRITE_ONCE(*(u32 *)meta_ptr, RINGBUF_BUSY_BIT | size);
> > +     WRITE_ONCE(*(u32 *)(meta_ptr + 4), pg_off);
>
> it doens't match to few other places where normal read is done.
> But why WRITE_ONCE here?
> How does it race with anything?
> producer_pos is updated later.

Yeah, I think you are right. I left it as WRITE_ONCE from my initial
lockless variant, but this could be just a normal store, will fix.

>
> > +
> > +     /* ensure length prefix is written before updating producer positions */
> > +     smp_wmb();
>
> this barrier is enough to make sure meta_ptr and meta_ptr+4 init
> is visible before producer_pos is updated below.

yep, 100% agree


>
> > +     WRITE_ONCE(rb->producer_pos, new_prod_pos);

consumer reads this with smp_load_acquire, I guess for consistency
I'll switch this to smp_store_release() then, right?

> > +
> > +     spin_unlock_irqrestore(&rb->spinlock, flags);
> > +
> > +     return meta_ptr + RINGBUF_META_SZ;
> > +}
> > +
> > +BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
> > +{
> > +     struct bpf_ringbuf_map *rb_map;
> > +
> > +     if (unlikely(flags))
> > +             return -EINVAL;
> > +
> > +     rb_map = container_of(map, struct bpf_ringbuf_map, map);
> > +     return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
> > +}
> > +
> > +const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
> > +     .func           = bpf_ringbuf_reserve,
> > +     .ret_type       = RET_PTR_TO_ALLOC_MEM_OR_NULL,
> > +     .arg1_type      = ARG_CONST_MAP_PTR,
> > +     .arg2_type      = ARG_CONST_ALLOC_SIZE_OR_ZERO,
> > +     .arg3_type      = ARG_ANYTHING,
> > +};
> > +
> > +static void bpf_ringbuf_commit(void *sample, bool discard)
> > +{
> > +     unsigned long rec_pos, cons_pos;
> > +     u32 new_meta, old_meta;
> > +     void *meta_ptr;
> > +     struct bpf_ringbuf *rb;
> > +
> > +     meta_ptr = sample - RINGBUF_META_SZ;
> > +     rb = bpf_ringbuf_restore_from_rec(meta_ptr);
> > +     old_meta = *(u32 *)meta_ptr;
>
> I think this one will race with user space and should be READ_ONCE.

This is never modified by user-space, only by previous reserve(). Is
that still considered a race by some tooling?

>
> > +     new_meta = old_meta ^ RINGBUF_BUSY_BIT;
> > +     if (discard)
> > +             new_meta |= RINGBUF_DISCARD_BIT;
> > +
> > +     /* update metadata header with correct final size prefix */
> > +     xchg((u32 *)meta_ptr, new_meta);
> > +
> > +     /* if consumer caught up and is waiting for our record, notify about
> > +      * new data availability
> > +      */
> > +     rec_pos = (void *)meta_ptr - (void *)rb->data;
> > +     cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;
>
> hmm. Earlier WRITE_ONCE(rb->producer_pos) is used, but here it's load_acquire.
> Please be consistent with pairing.

wait, it's *consumer* vs *producer* positions. consumer_pos is updated
by consumer using smp_store_release(), so it's paired. But I mentioned
above that producer_pos has to be written with smp_store_release().
Does that address your concern?


>
> > +     if (cons_pos == rec_pos)
> > +             wake_up_all(&rb->waitq);
>
> Is it legal to do from preempt_disabled region?

Don't know, couldn't find anything about this aspect. But looking at
perf buffer code, all the wakeups are done after irq_work_queue(), so
I guess it's not safe then? I'll add irq_work_queue() then.



[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux