On Mon, Aug 15, 2022 at 02:23:04PM -0700, Hao Luo wrote:

Hi Hao,

Thanks for the review.

> On Thu, Aug 11, 2022 at 4:50 PM David Vernet <void@xxxxxxxxxxxxx> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> >
> > Signed-off-by: David Vernet <void@xxxxxxxxxxxxx>
> > ---
> [...]
> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index c0f3bca4bb09..73fa6ed12052 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> [...]
> > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > +				   u32 *size)
> > +{
> > +	unsigned long cons_pos, prod_pos;
> > +	u32 sample_len, total_len;
> > +	u32 *hdr;
> > +	int err;
> > +	int busy = 0;
> > +
> > +	/* If another consumer is already consuming a sample, wait for them to
> > +	 * finish.
> > +	 */
> > +	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +		return -EBUSY;
> > +
> > +	/* Synchronizes with smp_store_release() in user-space. */
> > +	prod_pos = smp_load_acquire(&rb->producer_pos);
> > +	/* Synchronizes with smp_store_release() in
> > +	 * __bpf_user_ringbuf_sample_release().
> > +	 */
> > +	cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +	if (cons_pos >= prod_pos) {
> > +		atomic_set(&rb->busy, 0);
> > +		return -ENODATA;
> > +	}
> > +
> > +	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> > +	sample_len = *hdr;
> >
> rb->data and rb->mask better be protected by READ_ONCE.

Could you please clarify what behavior you're protecting against here?
We're just calculating an offset from rb->data, and both rb->data and
rb->mask are set only once, when the ringbuffer is first created in
bpf_ringbuf_area_alloc(). I'm not following what we'd be protecting
against by making these reads volatile, though I freely admit that I
may be missing some weird possible behavior in the compiler.

For what it's worth, in a follow-on version of the patch, I've updated
this read of the sample length to be an smp_load_acquire() to
accommodate Andrii's suggestion [0] that we should support using the
busy bit and discard bit in the header from the get-go, as we do with
BPF_MAP_TYPE_RINGBUF ringbuffers.

[0]: https://lore.kernel.org/all/CAEf4BzYVLgd=rHaxzZjyv0WJBzBpMqGSStgVhXG9XOHpB7qDRQ@xxxxxxxxxxxxxx/
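Concretely, the header read in that follow-on version would slot into
__bpf_user_ringbuf_poll() roughly as below, reusing the
BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT flags that
BPF_MAP_TYPE_RINGBUF already defines in the uapi header. This is just a
sketch to illustrate the idea, untested and not the final code:

	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
	/* Acquire ordering ensures that we don't read the sample payload
	 * before the user-space producer has finished writing it and has
	 * published the header.
	 */
	sample_len = smp_load_acquire(hdr);

	/* The sample is still being written by the user-space producer. */
	if (sample_len & BPF_RINGBUF_BUSY_BIT) {
		atomic_set(&rb->busy, 0);
		return -EBUSY;
	}

	/* Strip the flag bits to recover the actual sample length. A sample
	 * with the discard bit set would be released and skipped rather than
	 * handed to the caller (elided here).
	 */
	sample_len &= ~(BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);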
> > +	/* Check that the sample can fit into a dynptr. */
> > +	err = bpf_dynptr_check_size(sample_len);
> > +	if (err) {
> > +		atomic_set(&rb->busy, 0);
> > +		return err;
> > +	}
> > +
> > +	/* Check that the sample fits within the region advertised by the
> > +	 * consumer position.
> > +	 */
> > +	total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> > +	if (total_len > prod_pos - cons_pos) {
> > +		atomic_set(&rb->busy, 0);
> > +		return -E2BIG;
> > +	}
> > +
> > +	/* Check that the sample fits within the data region of the ring
> > +	 * buffer.
> > +	 */
> > +	if (total_len > rb->mask + 1) {
> > +		atomic_set(&rb->busy, 0);
> > +		return -E2BIG;
> > +	}
> > +
> > +	/* consumer_pos is updated when the sample is released.
> > +	 */
> > +
> > +	*sample = (void *)((uintptr_t)rb->data +
> > +			   (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > +	*size = sample_len;
> > +
> > +	return 0;
> > +}
> > +
> > +static void
> > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > +				  u64 flags)
> > +{
> > +	/* To release the ringbuffer, just increment the consumer position to
> > +	 * signal that a new sample can be consumed. The busy bit is cleared by
> > +	 * userspace when posting a new sample to the ringbuffer.
> > +	 */
> > +	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > +			  BPF_RINGBUF_HDR_SZ);
> > +
> > +	if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> > +		irq_work_queue(&rb->work);
> > +
> > +	atomic_set(&rb->busy, 0);
> > +}
>
> atomic_set() doesn't imply barrier, so it could be observed before
> smp_store_release(). So the paired smp_load_acquire could observe
> rb->busy == 0 while seeing the old consumer_pos. At least, you need
> smp_mb__before_atomic() as a barrier before atomic_set. Or smp_wmb()
> to ensure all _writes_ complete when see rb->busy == 0.

Thanks for catching this. I should have been more careful not to assume
the semantics of atomic_set(), and I see now that you're of course
correct that it's just a WRITE_ONCE() and has no implications at all
w.r.t. memory or compiler barriers. I'll fix this in the follow-on
version, and will give memory-barriers.txt and atomic_t.txt another,
closer read. A rough sketch of the fixed-up release path is included at
the bottom of this mail.

> Similarly rb->work could be observed before smp_store_release.

Yes, in the follow-on version I'll move the atomic_set() to before the
irq_work_queue() invocation (per Andrii's comment in [1], though that
discussion is still ongoing), and will add the missing
smp_mb__before_atomic(). Thanks again for catching this.

[1]: https://lore.kernel.org/all/CAEf4BzZ-m-AUX+1+CGr7nMxMDnT=fjkn8DP9nP21Uts1y7fMyg@xxxxxxxxxxxxxx/

> Is it possible for __bpf_user_ringbuf_sample_release to be called
> concurrently? If yes, there are races. Because the load of
> rb->consumer_pos is not protected by smp_load_acquire, they are not
> synchronized with this smp_store_release. Concurrently calling
> __bpf_user_ringbuf_sample_release may cause both threads getting stale
> consumer_pos values.

If we add smp_mb__before_atomic() per your proposed fix above, I don't
believe this is an issue. __bpf_user_ringbuf_sample_release() should
only be invoked when a caller has an unreleased sample, and that can
only happen in a serial context due to the protection afforded by the
atomic busy bit. A single caller will not see a stale value, as they
must first invoke __bpf_user_ringbuf_poll(), and then invoke
__bpf_user_ringbuf_sample_release() with the sample they received. The
consumer_pos they read in __bpf_user_ringbuf_sample_release() was
already smp_load_acquire()'d in __bpf_user_ringbuf_poll(), so they
shouldn't see a stale value there. We could certainly add another
smp_load_acquire() here for safety, but it would be redundant.

Thanks,
David
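---

For reference, here's the rough sketch of the release path with both of
the fixes discussed above applied. It's untested and for illustration
only:

static void
__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
				  u64 flags)
{
	/* To release the ringbuffer, just increment the consumer position to
	 * signal that a new sample can be consumed.
	 */
	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
			  BPF_RINGBUF_HDR_SZ);

	/* atomic_set() is just a WRITE_ONCE() with no ordering of its own,
	 * so a barrier is required to prevent the clearing of the busy bit
	 * from being reordered before the consumer_pos update above.
	 */
	smp_mb__before_atomic();
	atomic_set(&rb->busy, 0);

	/* The wakeup is queued only after the busy bit has been cleared,
	 * per Andrii's comment in [1].
	 */
	if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);
}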