On Thu, May 14, 2020 at 12:06 PM Alexei Starovoitov
<alexei.starovoitov@xxxxxxxxx> wrote:
>
> On Wed, May 13, 2020 at 12:25:27PM -0700, Andrii Nakryiko wrote:
> > +
> > +/* Given pointer to ring buffer record metadata, restore pointer to struct
> > + * bpf_ringbuf itself by using page offset stored at offset 4
> > + */
> > +static struct bpf_ringbuf *bpf_ringbuf_restore_from_rec(void *meta_ptr)
> > +{
> > +        unsigned long addr = (unsigned long)meta_ptr;
> > +        unsigned long off = *(u32 *)(meta_ptr + 4) << PAGE_SHIFT;
>
> Looking at the further code it seems this one should be READ_ONCE, but...

This will be called from commit(), which will be called after
successful reserve(), which does spin_unlock at the end. So in terms of
memory barriers, everything should be fine? Or am I missing some
trickier aspect?

> > +
> > +        return (void*)((addr & PAGE_MASK) - off);
> > +}
> > +
> > +static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
> > +{
> > +        unsigned long cons_pos, prod_pos, new_prod_pos, flags;
> > +        u32 len, pg_off;
> > +        void *meta_ptr;
> > +
> > +        if (unlikely(size > UINT_MAX))
> > +                return NULL;
> > +
> > +        len = round_up(size + RINGBUF_META_SZ, 8);
>
> it may overflow despite the check above.

As Jonathan pointed out, max size should be enforced to be at most
(UINT_MAX/4), due to 2 bits reserved for BUSY and DISCARD. So it
shouldn't overflow.

> > +        cons_pos = READ_ONCE(rb->consumer_pos);
> > +
> > +        if (in_nmi()) {
> > +                if (!spin_trylock_irqsave(&rb->spinlock, flags))
> > +                        return NULL;
> > +        } else {
> > +                spin_lock_irqsave(&rb->spinlock, flags);
> > +        }
> > +
> > +        prod_pos = rb->producer_pos;
> > +        new_prod_pos = prod_pos + len;
> > +
> > +        /* check for out of ringbuf space by ensuring producer position
> > +         * doesn't advance more than (ringbuf_size - 1) ahead
> > +         */
> > +        if (new_prod_pos - cons_pos > rb->mask) {
> > +                spin_unlock_irqrestore(&rb->spinlock, flags);
> > +                return NULL;
> > +        }
> > +
> > +        meta_ptr = rb->data + (prod_pos & rb->mask);
> > +        pg_off = bpf_ringbuf_rec_pg_off(rb, meta_ptr);
> > +
> > +        WRITE_ONCE(*(u32 *)meta_ptr, RINGBUF_BUSY_BIT | size);
> > +        WRITE_ONCE(*(u32 *)(meta_ptr + 4), pg_off);
>
> it doesn't match to few other places where normal read is done.
> But why WRITE_ONCE here?
> How does it race with anything?
> producer_pos is updated later.

Yeah, I think you are right. I left it as WRITE_ONCE from my initial
lockless variant, but this could be just a normal store, will fix.

> > +
> > +        /* ensure length prefix is written before updating producer positions */
> > +        smp_wmb();
>
> this barrier is enough to make sure meta_ptr and meta_ptr+4 init
> is visible before producer_pos is updated below.

yep, 100% agree

> > +        WRITE_ONCE(rb->producer_pos, new_prod_pos);

consumer reads this with smp_load_acquire, I guess for consistency I'll
switch this to smp_store_release() then, right?
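To spell out what I mean by that pairing, roughly this on the producer
side (just a sketch, not the final patch):

        /* pairs with smp_load_acquire(&rb->producer_pos) on the
         * consumer side; makes the record header stores above visible
         * before the consumer observes the new producer position
         */
        smp_store_release(&rb->producer_pos, new_prod_pos);

With the release store in place, the explicit smp_wmb() above should
become redundant, since the release already orders the header writes
before the position update.
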
> > +
> > +        spin_unlock_irqrestore(&rb->spinlock, flags);
> > +
> > +        return meta_ptr + RINGBUF_META_SZ;
> > +}
> > +
> > +BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
> > +{
> > +        struct bpf_ringbuf_map *rb_map;
> > +
> > +        if (unlikely(flags))
> > +                return -EINVAL;
> > +
> > +        rb_map = container_of(map, struct bpf_ringbuf_map, map);
> > +        return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
> > +}
> > +
> > +const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
> > +        .func           = bpf_ringbuf_reserve,
> > +        .ret_type       = RET_PTR_TO_ALLOC_MEM_OR_NULL,
> > +        .arg1_type      = ARG_CONST_MAP_PTR,
> > +        .arg2_type      = ARG_CONST_ALLOC_SIZE_OR_ZERO,
> > +        .arg3_type      = ARG_ANYTHING,
> > +};
> > +
> > +static void bpf_ringbuf_commit(void *sample, bool discard)
> > +{
> > +        unsigned long rec_pos, cons_pos;
> > +        u32 new_meta, old_meta;
> > +        void *meta_ptr;
> > +        struct bpf_ringbuf *rb;
> > +
> > +        meta_ptr = sample - RINGBUF_META_SZ;
> > +        rb = bpf_ringbuf_restore_from_rec(meta_ptr);
> > +        old_meta = *(u32 *)meta_ptr;
>
> I think this one will race with user space and should be READ_ONCE.

This is never modified by user-space, only by previous reserve(). Is
that still considered a race by some tooling?

> > +        new_meta = old_meta ^ RINGBUF_BUSY_BIT;
> > +        if (discard)
> > +                new_meta |= RINGBUF_DISCARD_BIT;
> > +
> > +        /* update metadata header with correct final size prefix */
> > +        xchg((u32 *)meta_ptr, new_meta);
> > +
> > +        /* if consumer caught up and is waiting for our record, notify about
> > +         * new data availability
> > +         */
> > +        rec_pos = (void *)meta_ptr - (void *)rb->data;
> > +        cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;
>
> hmm. Earlier WRITE_ONCE(rb->producer_pos) is used, but here it's load_acquire.
> Please be consistent with pairing.

wait, it's *consumer* vs *producer* positions. consumer_pos is updated
by consumer using smp_store_release(), so it's paired. But I mentioned
above that producer_pos has to be written with smp_store_release().
Does that address your concern?

> > +        if (cons_pos == rec_pos)
> > +                wake_up_all(&rb->waitq);
>
> Is it legal to do from preempt_disabled region?

Don't know, couldn't find anything about this aspect. But looking at
perf buffer code, all the wakeups are done after irq_work_queue(), so I
guess it's not safe then? I'll add irq_work_queue() then.
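Something along these lines, mirroring what perf buffer does (a sketch
only; the callback name and the new irq_work member are placeholders
until I write the actual patch). It assumes a 'struct irq_work work;'
field added to struct bpf_ringbuf:

/* deferred wakeup, so commit() can run from restricted (preempt-disabled
 * or NMI) context
 */
static void bpf_ringbuf_notify(struct irq_work *work)
{
        struct bpf_ringbuf *rb = container_of(work, struct bpf_ringbuf, work);

        wake_up_all(&rb->waitq);
}

/* at ringbuf allocation time */
init_irq_work(&rb->work, bpf_ringbuf_notify);

/* in bpf_ringbuf_commit(), instead of the direct wake_up_all() */
if (cons_pos == rec_pos)
        irq_work_queue(&rb->work);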