On Thu, May 28, 2020 at 3:11 PM Daniel Borkmann <daniel@xxxxxxxxxxxxx> wrote: > > Hey Andrii, > > On 5/26/20 8:32 AM, Andrii Nakryiko wrote: > [...] > > +static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node) > > +{ > > + const gfp_t flags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_NOWARN | > > + __GFP_ZERO; > > + int nr_meta_pages = RINGBUF_PGOFF + RINGBUF_POS_PAGES; > > + int nr_data_pages = data_sz >> PAGE_SHIFT; > > + int nr_pages = nr_meta_pages + nr_data_pages; > > + struct page **pages, *page; > > + size_t array_size; > > + void *addr; > > + int i; > > + > > + /* Each data page is mapped twice to allow "virtual" > > + * continuous read of samples wrapping around the end of ring > > + * buffer area: > > + * ------------------------------------------------------ > > + * | meta pages | real data pages | same data pages | > > + * ------------------------------------------------------ > > + * | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 | > > + * ------------------------------------------------------ > > + * | | TA DA | TA DA | > > + * ------------------------------------------------------ > > + * ^^^^^^^ > > + * | > > + * Here, no need to worry about special handling of wrapped-around > > + * data due to double-mapped data pages. This works both in kernel and > > + * when mmap()'ed in user-space, simplifying both kernel and > > + * user-space implementations significantly. > > + */ > > + array_size = (nr_meta_pages + 2 * nr_data_pages) * sizeof(*pages); > > + if (array_size > PAGE_SIZE) > > + pages = vmalloc_node(array_size, numa_node); > > + else > > + pages = kmalloc_node(array_size, flags, numa_node); > > + if (!pages) > > + return NULL; > > + > > + for (i = 0; i < nr_pages; i++) { > > + page = alloc_pages_node(numa_node, flags, 0); > > + if (!page) { > > + nr_pages = i; > > + goto err_free_pages; > > + } > > + pages[i] = page; > > + if (i >= nr_meta_pages) > > + pages[nr_data_pages + i] = page; > > + } > > + > > + addr = vmap(pages, nr_meta_pages + 2 * nr_data_pages, > > + VM_ALLOC | VM_USERMAP, PAGE_KERNEL); > > + if (addr) > > + return addr; > > Does this need an explicit vunmap() as well in bpf_ringbuf_free()? I can see that the > __vfree() calls __vunmap(addr, 1) which does the deallocation, but how does this stand Good catch. I think kvfree(rb) below will free the first page only. I think I need to store array of page pointers (pages above) and iterate page by page and free pages as well. I'll fix it. > with the case of kmalloc_node()? (And does it even make sense to support < PAGE_SIZE > array size here?) Pages is just an array of pointers, it's passed into vmap and could be discarded immediately afterwards. So if ringbuf needs less than 512 pages, I need only 1 page to keep those pointers, which means I can use kmalloc. But if ringbuf is bigger, then I might need more than one, at which point kmalloc() might fail to find enough contiguous pages. Or you mean to just drop kmalloc() optimization and always do vmalloc? > > > +err_free_pages: > > + for (i = 0; i < nr_pages; i++) > > + free_page((unsigned long)pages[i]); > > + kvfree(pages); > > + return NULL; > > +} > > + > > +static void bpf_ringbuf_notify(struct irq_work *work) > > +{ > > + struct bpf_ringbuf *rb = container_of(work, struct bpf_ringbuf, work); > > + > > + wake_up_all(&rb->waitq); > > +} > > + > > +static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node) > > +{ > > + struct bpf_ringbuf *rb; > > + > > + if (!data_sz || !PAGE_ALIGNED(data_sz)) > > + return ERR_PTR(-EINVAL); > > + > > +#ifdef CONFIG_64BIT > > + /* on 32-bit arch, it's impossible to overflow record's hdr->pgoff */ > > + if (data_sz > RINGBUF_MAX_DATA_SZ) > > + return ERR_PTR(-E2BIG); > > +#endif > > + > > + rb = bpf_ringbuf_area_alloc(data_sz, numa_node); > > + if (!rb) > > + return ERR_PTR(-ENOMEM); > > + > > + spin_lock_init(&rb->spinlock); > > + init_waitqueue_head(&rb->waitq); > > + init_irq_work(&rb->work, bpf_ringbuf_notify); > > + > > + rb->mask = data_sz - 1; > > + rb->consumer_pos = 0; > > + rb->producer_pos = 0; > > + > > + return rb; > > +} > > + > > +static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr) > > +{ > > + struct bpf_ringbuf_map *rb_map; > > + u64 cost; > > + int err; > > + > > + if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK) > > + return ERR_PTR(-EINVAL); > > + > > + if (attr->key_size || attr->value_size || > > + attr->max_entries == 0 || !PAGE_ALIGNED(attr->max_entries)) > > + return ERR_PTR(-EINVAL); > > + > > + rb_map = kzalloc(sizeof(*rb_map), GFP_USER); > > + if (!rb_map) > > + return ERR_PTR(-ENOMEM); > > + > > + bpf_map_init_from_attr(&rb_map->map, attr); > > + > > + cost = sizeof(struct bpf_ringbuf_map) + > > + sizeof(struct bpf_ringbuf) + > > + attr->max_entries; > > + err = bpf_map_charge_init(&rb_map->map.memory, cost); > > + if (err) > > + goto err_free_map; > > + > > + rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node); > > + if (IS_ERR(rb_map->rb)) { > > + err = PTR_ERR(rb_map->rb); > > + goto err_uncharge; > > + } > > + > > + return &rb_map->map; > > + > > +err_uncharge: > > + bpf_map_charge_finish(&rb_map->map.memory); > > +err_free_map: > > + kfree(rb_map); > > + return ERR_PTR(err); > > +} > > + > > +static void bpf_ringbuf_free(struct bpf_ringbuf *ringbuf) > > +{ > > + kvfree(ringbuf); > > ... here. Yep, this is buggy, instead of kvfree, we need vunmap + kvfree each page individually. Thanks for catching! > > > +} > > + > > +static void ringbuf_map_free(struct bpf_map *map) > > +{ > > + struct bpf_ringbuf_map *rb_map; > > + > > + /* at this point bpf_prog->aux->refcnt == 0 and this map->refcnt == 0, > > + * so the programs (can be more than one that used this map) were > > + * disconnected from events. Wait for outstanding critical sections in > > + * these programs to complete > > + */ > > + synchronize_rcu(); > > + > > + rb_map = container_of(map, struct bpf_ringbuf_map, map); > > + bpf_ringbuf_free(rb_map->rb); > > + kfree(rb_map); > > +} > > + > > +static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key) > > +{ > > + return ERR_PTR(-ENOTSUPP); > > +} > > + > > +static int ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value, > > + u64 flags) > > +{ > > + return -ENOTSUPP; > > +} > > + > > +static int ringbuf_map_delete_elem(struct bpf_map *map, void *key) > > +{ > > + return -ENOTSUPP; > > +} > > + > > +static int ringbuf_map_get_next_key(struct bpf_map *map, void *key, > > + void *next_key) > > +{ > > + return -ENOTSUPP; > > +} > > One use-case we'd have that would be quite interesting to resolve as well is > to implement ->map_push_elem() callback from here and have a bpf_ringbuf_output() > like way to feed also data from user space into the same ring buffer that BPF > programs do. In our case we use perf RB for all sort of event messages from BPF > side and we also have trace events from our golang agent, both get "merged" into > an event stream together and then exported. I think it might be really useful to > allow this here natively. Feels a bit round-about way to do user-space -> kernel ringbuf -> another user-space, but I don't see any problems to support this. Sort of bpf_ringbuf_event_output() interface for user-space, right? But let's land this patch set first, and then extend it with this and what you request below. > > > +static size_t bpf_ringbuf_mmap_page_cnt(const struct bpf_ringbuf *rb) > > +{ > > + size_t data_pages = (rb->mask + 1) >> PAGE_SHIFT; > > + > > + /* consumer page + producer page + 2 x data pages */ > > + return RINGBUF_POS_PAGES + 2 * data_pages; > > +} > > + > > +static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma) > > +{ > > + struct bpf_ringbuf_map *rb_map; > > + size_t mmap_sz; > > + > > + rb_map = container_of(map, struct bpf_ringbuf_map, map); > > + mmap_sz = bpf_ringbuf_mmap_page_cnt(rb_map->rb) << PAGE_SHIFT; > > + > > + if (vma->vm_pgoff * PAGE_SIZE + (vma->vm_end - vma->vm_start) > mmap_sz) > > + return -EINVAL; > > + > > + return remap_vmalloc_range(vma, rb_map->rb, > > + vma->vm_pgoff + RINGBUF_PGOFF); > > +} > > + > > +static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb) > > +{ > > + unsigned long cons_pos, prod_pos; > > + > > + cons_pos = smp_load_acquire(&rb->consumer_pos); > > + prod_pos = smp_load_acquire(&rb->producer_pos); > > + return prod_pos - cons_pos; > > +} > > + > > +static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp, > > + struct poll_table_struct *pts) > > +{ > > + struct bpf_ringbuf_map *rb_map; > > + > > + rb_map = container_of(map, struct bpf_ringbuf_map, map); > > + poll_wait(filp, &rb_map->rb->waitq, pts); > > + > > + if (ringbuf_avail_data_sz(rb_map->rb)) > > + return EPOLLIN | EPOLLRDNORM; > > + return 0; > > +} > > + > > +const struct bpf_map_ops ringbuf_map_ops = { > > + .map_alloc = ringbuf_map_alloc, > > + .map_free = ringbuf_map_free, > > + .map_mmap = ringbuf_map_mmap, > > + .map_poll = ringbuf_map_poll, > > + .map_lookup_elem = ringbuf_map_lookup_elem, > > + .map_update_elem = ringbuf_map_update_elem, > > + .map_delete_elem = ringbuf_map_delete_elem, > > + .map_get_next_key = ringbuf_map_get_next_key, > > +}; > > + > > +/* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself, > > + * calculate offset from record metadata to ring buffer in pages, rounded > > + * down. This page offset is stored as part of record metadata and allows to > > + * restore struct bpf_ringbuf * from record pointer. This page offset is > > + * stored at offset 4 of record metadata header. > > + */ > > +static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf *rb, > > + struct bpf_ringbuf_hdr *hdr) > > +{ > > + return ((void *)hdr - (void *)rb) >> PAGE_SHIFT; > > +} > > + > > +/* Given pointer to ring buffer record header, restore pointer to struct > > + * bpf_ringbuf itself by using page offset stored at offset 4 > > + */ > > +static struct bpf_ringbuf * > > +bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr) > > +{ > > + unsigned long addr = (unsigned long)(void *)hdr; > > + unsigned long off = (unsigned long)hdr->pg_off << PAGE_SHIFT; > > + > > + return (void*)((addr & PAGE_MASK) - off); > > +} > > + > > +static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size) > > +{ > > + unsigned long cons_pos, prod_pos, new_prod_pos, flags; > > + u32 len, pg_off; > > + struct bpf_ringbuf_hdr *hdr; > > + > > + if (unlikely(size > RINGBUF_MAX_RECORD_SZ)) > > + return NULL; > > + > > + len = round_up(size + BPF_RINGBUF_HDR_SZ, 8); > > + cons_pos = smp_load_acquire(&rb->consumer_pos); > > + > > + if (in_nmi()) { > > + if (!spin_trylock_irqsave(&rb->spinlock, flags)) > > + return NULL; > > + } else { > > + spin_lock_irqsave(&rb->spinlock, flags); > > Should this side probe with trylock as well to avoid potential blockage? I don't think so, otherwise we will drop records if we happen to contend with another CPU very briefly. I had to do this for NMI context, of course, but in other contexts we should do straight lock and not drop records. Overall, reserve itself is very lightweight, so it's not a problem, as benchmarks show. > > > + } > > + > > + prod_pos = rb->producer_pos; > > + new_prod_pos = prod_pos + len; > > + > > + /* check for out of ringbuf space by ensuring producer position > > + * doesn't advance more than (ringbuf_size - 1) ahead > > + */ > > + if (new_prod_pos - cons_pos > rb->mask) { > > + spin_unlock_irqrestore(&rb->spinlock, flags); > > + return NULL; > > + } > > + > > + hdr = (void *)rb->data + (prod_pos & rb->mask); > > + pg_off = bpf_ringbuf_rec_pg_off(rb, hdr); > > + hdr->len = size | BPF_RINGBUF_BUSY_BIT; > > + hdr->pg_off = pg_off; > > + > > + /* pairs with consumer's smp_load_acquire() */ > > + smp_store_release(&rb->producer_pos, new_prod_pos); > > + > > + spin_unlock_irqrestore(&rb->spinlock, flags); > > + > > + return (void *)hdr + BPF_RINGBUF_HDR_SZ; > > +} > > + > > +BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags) > > +{ > > + struct bpf_ringbuf_map *rb_map; > > + > > + if (unlikely(flags)) > > + return 0; > > + > > + rb_map = container_of(map, struct bpf_ringbuf_map, map); > > + return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size); > > +} > > + > > +const struct bpf_func_proto bpf_ringbuf_reserve_proto = { > > + .func = bpf_ringbuf_reserve, > > + .ret_type = RET_PTR_TO_ALLOC_MEM_OR_NULL, > > + .arg1_type = ARG_CONST_MAP_PTR, > > + .arg2_type = ARG_CONST_ALLOC_SIZE_OR_ZERO, > > + .arg3_type = ARG_ANYTHING, > > +}; > > + > > +static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard) > > +{ > > + unsigned long rec_pos, cons_pos; > > + struct bpf_ringbuf_hdr *hdr; > > + struct bpf_ringbuf *rb; > > + u32 new_len; > > + > > + hdr = sample - BPF_RINGBUF_HDR_SZ; > > + rb = bpf_ringbuf_restore_from_rec(hdr); > > + new_len = hdr->len ^ BPF_RINGBUF_BUSY_BIT; > > + if (discard) > > + new_len |= BPF_RINGBUF_DISCARD_BIT; > > + > > + /* update record header with correct final size prefix */ > > + xchg(&hdr->len, new_len); > > + > > + /* if consumer caught up and is waiting for our record, notify about > > + * new data availability > > + */ > > + rec_pos = (void *)hdr - (void *)rb->data; > > + cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask; > > + > > + if (flags & BPF_RB_FORCE_WAKEUP) > > + irq_work_queue(&rb->work); > > + else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP)) > > + irq_work_queue(&rb->work); > > +} > > + > > +BPF_CALL_2(bpf_ringbuf_submit, void *, sample, u64, flags) > > +{ > > + bpf_ringbuf_commit(sample, flags, false /* discard */); > > + return 0; > > +} > > + > > +const struct bpf_func_proto bpf_ringbuf_submit_proto = { > > + .func = bpf_ringbuf_submit, > > + .ret_type = RET_VOID, > > + .arg1_type = ARG_PTR_TO_ALLOC_MEM, > > + .arg2_type = ARG_ANYTHING, > > +}; > > + > > +BPF_CALL_2(bpf_ringbuf_discard, void *, sample, u64, flags) > > +{ > > + bpf_ringbuf_commit(sample, flags, true /* discard */); > > + return 0; > > +} > > + > > +const struct bpf_func_proto bpf_ringbuf_discard_proto = { > > + .func = bpf_ringbuf_discard, > > + .ret_type = RET_VOID, > > + .arg1_type = ARG_PTR_TO_ALLOC_MEM, > > + .arg2_type = ARG_ANYTHING, > > +}; > > + > > +BPF_CALL_4(bpf_ringbuf_output, struct bpf_map *, map, void *, data, u64, size, > > + u64, flags) > > +{ > > + struct bpf_ringbuf_map *rb_map; > > + void *rec; > > + > > + if (unlikely(flags & ~(BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP))) > > + return -EINVAL; > > + > > + rb_map = container_of(map, struct bpf_ringbuf_map, map); > > + rec = __bpf_ringbuf_reserve(rb_map->rb, size); > > + if (!rec) > > + return -EAGAIN; > > + > > + memcpy(rec, data, size); > > As discussed, (non-linear) skb capture would be needed as well. Is the plan to > integrate this here into the same record (data + skb capture as one)? Yes. I think we can do variant of bpf_ringbuf_output() helper that will accept user-provided "prefix" and will append 4-byte length "sub-prefix" + data after it. I'm not sure if we need to support multiple data "attachments"? E.g., something like (ignore namings): bpf_ringbuf_output_with_extra(my_data, my_data_size, BPF_RB_EXTRA_SKB_DATA | BPF_RB_EXTRA_SOME_OTHER_DATA); Then in ringbuf we'll have something like: <record length> <my_data bytes> <skb data len (4 bytes)> <skb data bytes> <some other data len (4 bytes)> <some other data bytes> In this case, record length will be my_data_size + skb_data size + that some_other_data size, and user-space will just know how to parse this. I'm not super happy about this, but it would cover your use-cases. It doesn't solve the case KP brought up, where they want to read a bunch of variable-length strings and append them, though. Anyways, this seems a bit orthogonal to ringbuf itself, as it's a simple len + payload approach, which can be built upon. So let's have this discussion separately after this lands. > > > + bpf_ringbuf_commit(rec, flags, false /* discard */); > > + return 0; > > +} > > + > > +const struct bpf_func_proto bpf_ringbuf_output_proto = { > > + .func = bpf_ringbuf_output, > > + .ret_type = RET_INTEGER, > > + .arg1_type = ARG_CONST_MAP_PTR, > > + .arg2_type = ARG_PTR_TO_MEM, > > + .arg3_type = ARG_CONST_SIZE_OR_ZERO, > > + .arg4_type = ARG_ANYTHING, > > +}; > > + > > +BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags) > > +{ > > + struct bpf_ringbuf *rb; > > + > > + rb = container_of(map, struct bpf_ringbuf_map, map)->rb; > > + > > + switch (flags) { > > + case BPF_RB_AVAIL_DATA: > > + return ringbuf_avail_data_sz(rb); > > + case BPF_RB_RING_SIZE: > > + return rb->mask + 1; > > + case BPF_RB_CONS_POS: > > + return smp_load_acquire(&rb->consumer_pos); > > + case BPF_RB_PROD_POS: > > + return smp_load_acquire(&rb->producer_pos); > > Do you have an example where the latter two are needed/useful for non-debugging? > Maybe leave out for now if only used for debugging? One example would be the logic to do something (not necessarily ringbuf event notification) every 1MB of data emitted. You can do it based on producer or consumer position, even if you do not use manual notification strategy. Debugging and logging is another case as well. It feels useful and costs nothing, so I'd rather keep it. Another advanced case would be to self-pace event production based on speed of consumption. E.g., if consumer is overwhelmed and doesn't make enough progress, BPF side might decide to do more aggressive sampling until consumer recovers. > > > + default: > > + return 0; > > + } > > +} > > + > > +const struct bpf_func_proto bpf_ringbuf_query_proto = { > > + .func = bpf_ringbuf_query, > > + .ret_type = RET_INTEGER, > > + .arg1_type = ARG_CONST_MAP_PTR, > > + .arg2_type = ARG_ANYTHING, > > +};