On Wed, Apr 10, 2019 at 6:03 AM Jesper Dangaard Brouer <brouer@xxxxxxxxxx> wrote:
>
> Move the ptr_ring dequeue outside the loop that allocates SKBs and calls
> the network stack, as these operations can take some time. The ptr_ring
> is a communication channel between CPUs, where we want to reduce/limit
> any cacheline bouncing.
>
> Do a concentrated bulk dequeue via ptr_ring_consume_batched, to shorten
> the period in which, and the number of times, the remote cacheline in the
> ptr_ring is read.
>
> Batch size 8 is chosen both to (1) limit the BH-disable period, and (2)
> consume one cacheline on 64-bit archs. After reducing the BH-disable
> section further we can consider changing this, while still keeping the
> L1 cacheline size in mind.
>
> Signed-off-by: Jesper Dangaard Brouer <brouer@xxxxxxxxxx>

Acked-by: Song Liu <songliubraving@xxxxxx>

> ---
>  kernel/bpf/cpumap.c |   21 +++++++++++----------
>  1 file changed, 11 insertions(+), 10 deletions(-)
>
> diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
> index 3c18260403dd..430103e182a0 100644
> --- a/kernel/bpf/cpumap.c
> +++ b/kernel/bpf/cpumap.c
> @@ -240,6 +240,8 @@ static void put_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
>         }
>  }
>
> +#define CPUMAP_BATCH 8
> +
>  static int cpu_map_kthread_run(void *data)
>  {
>         struct bpf_cpu_map_entry *rcpu = data;
> @@ -252,8 +254,9 @@ static int cpu_map_kthread_run(void *data)
>          * kthread_stop signal until queue is empty.
>          */
>         while (!kthread_should_stop() || !__ptr_ring_empty(rcpu->queue)) {
> -               unsigned int processed = 0, drops = 0, sched = 0;
> -               struct xdp_frame *xdpf;
> +               unsigned int drops = 0, sched = 0;
> +               void *frames[CPUMAP_BATCH];
> +               int i, n;
>
>                 /* Release CPU reschedule checks */
>                 if (__ptr_ring_empty(rcpu->queue)) {
> @@ -269,14 +272,16 @@ static int cpu_map_kthread_run(void *data)
>                         sched = cond_resched();
>                 }
>
> -               /* Process packets in rcpu->queue */
> -               local_bh_disable();
>                 /*
>                  * The bpf_cpu_map_entry is single consumer, with this
>                  * kthread CPU pinned. Lockless access to ptr_ring
>                  * consume side valid as no-resize allowed of queue.
>                  */
> -               while ((xdpf = __ptr_ring_consume(rcpu->queue))) {
> +               n = ptr_ring_consume_batched(rcpu->queue, frames, CPUMAP_BATCH);
> +
> +               local_bh_disable();
> +               for (i = 0; i < n; i++) {
> +                       struct xdp_frame *xdpf = frames[i];
>                         struct sk_buff *skb;
>                         int ret;
>
> @@ -290,13 +295,9 @@ static int cpu_map_kthread_run(void *data)
>                         ret = netif_receive_skb_core(skb);
>                         if (ret == NET_RX_DROP)
>                                 drops++;
> -
> -                       /* Limit BH-disable period */
> -                       if (++processed == 8)
> -                               break;
>                 }
>                 /* Feedback loop via tracepoint */
> -               trace_xdp_cpumap_kthread(rcpu->map_id, processed, drops, sched);
> +               trace_xdp_cpumap_kthread(rcpu->map_id, n, drops, sched);

btw: can we do the tracepoint after local_bh_enable()?

>
>                 local_bh_enable(); /* resched point, may call do_softirq() */
>         }
>
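
For anyone skimming the thread, here is a minimal standalone sketch of the
consumer-side pattern the patch switches to (the function name and the
stripped-down processing loop are mine, not the actual cpumap code): drain
up to CPUMAP_BATCH pointers with one ptr_ring_consume_batched() call, so the
shared ring cacheline is touched once per batch, then process the local
array under a short BH-disabled section.

/*
 * Simplified sketch, assuming a single consumer pinned to one CPU,
 * mirroring the pattern in cpu_map_kthread_run() after this patch.
 */
#include <linux/ptr_ring.h>
#include <linux/bottom_half.h>

#define CPUMAP_BATCH 8	/* 8 pointers == one 64-byte cacheline on 64-bit */

static void consume_batch(struct ptr_ring *queue)
{
	void *frames[CPUMAP_BATCH];
	int i, n;

	/* Batched lockless dequeue: read the remote cacheline in the
	 * ptr_ring once per batch instead of once per element.
	 */
	n = ptr_ring_consume_batched(queue, frames, CPUMAP_BATCH);

	/* Only the processing of the local array runs with BH disabled,
	 * keeping the BH-disable period bounded by the batch size.
	 */
	local_bh_disable();
	for (i = 0; i < n; i++) {
		/* process frames[i], e.g. build an SKB and hand it to
		 * the network stack (elided here)
		 */
	}
	local_bh_enable();	/* resched point, may run softirqs */
}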