From: Björn Töpel <bjorn.topel@xxxxxxxxx>

Currently, the AF_XDP rings use smp_{r,w,}mb() fences on the kernel
side. By updating the rings to use load-acquire/store-release
semantics, the full barrier on the consumer side can be dropped, with
improved performance as a nice side-effect.

Note that this change does *not* require similar changes on the
libbpf/userland side; they are, however, recommended [1].

On x86-64 systems, removing the smp_mb() on the Rx and Tx side
increases the l2fwd AF_XDP xdpsock sample performance by 1%.
Weakly ordered platforms, such as ARM64, might benefit even more.

[1] https://lore.kernel.org/bpf/20200316184423.GA14143@willie-the-truck/

Signed-off-by: Björn Töpel <bjorn.topel@xxxxxxxxx>
---
 net/xdp/xsk_queue.h | 27 +++++++++++----------------
 1 file changed, 11 insertions(+), 16 deletions(-)

diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 2823b7c3302d..e24279d8d845 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -47,19 +47,18 @@ struct xsk_queue {
 	u64 queue_empty_descs;
 };
 
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings is a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
  *
  * producer                         consumer
  *
- * if (LOAD ->consumer) {           LOAD ->producer
- *                    (A)           smp_rmb()       (C)
+ * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
  *    STORE $data                   LOAD $data
- *    smp_wmb()       (B)           smp_mb()        (D)
- *    STORE ->producer              STORE ->consumer
+ *    STORE.rel ->producer (B)      STORE.rel ->consumer (D)
  * }
  *
  * (A) pairs with (D), and (B) pairs with (C).
@@ -227,15 +226,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
 {
-	smp_mb(); /* D, matches A */
-	WRITE_ONCE(q->ring->consumer, q->cached_cons);
+	smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matches A */
 }
 
 static inline void __xskq_cons_peek(struct xsk_queue *q)
 {
 	/* Refresh the local pointer */
-	q->cached_prod = READ_ONCE(q->ring->producer);
-	smp_rmb(); /* C, matches B */
+	q->cached_prod = smp_load_acquire(&q->ring->producer); /* C, matches B */
 }
 
 static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +394,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 
 static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-	smp_wmb(); /* B, matches C */
-
-	WRITE_ONCE(q->ring->producer, idx);
+	smp_store_release(&q->ring->producer, idx); /* B, matches C */
 }
 
 static inline void xskq_prod_submit(struct xsk_queue *q)
-- 
2.27.0
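
As a reference for the (A)/(B)/(C)/(D) pairing described in the comment
above, below is a minimal userspace sketch of the same single-producer/
single-consumer scheme, expressed with C11 atomics rather than the
kernel's smp_load_acquire()/smp_store_release() helpers. It is
illustrative only and not part of the patch; the names ring_produce(),
ring_consume() and RING_SIZE are made up for the example. One
difference: the producer's read of ->consumer (A) uses an acquire load
here for portability, whereas the kernel side keeps it as a plain
READ_ONCE() and relies on the control dependency.

/* Illustrative only: SPSC ring with the (A)/(B)/(C)/(D) pairing from
 * the comment above, using C11 atomics. memory_order_release on the
 * index stores corresponds to smp_store_release(), memory_order_acquire
 * on the index loads corresponds to smp_load_acquire().
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define RING_SIZE 256u	/* assumed power of two */

struct ring {
	_Atomic uint32_t producer;	/* written by producer, read by consumer */
	_Atomic uint32_t consumer;	/* written by consumer, read by producer */
	uint64_t desc[RING_SIZE];	/* payload slots */
};

/* Producer: (A) load ->consumer, (B) release-store ->producer. */
static bool ring_produce(struct ring *r, uint64_t val)
{
	uint32_t prod = atomic_load_explicit(&r->producer, memory_order_relaxed);
	/* (A): acquire here; the kernel uses READ_ONCE() plus a control dependency. */
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_acquire);

	if (prod - cons == RING_SIZE)
		return false;			/* ring full */

	r->desc[prod & (RING_SIZE - 1)] = val;	/* STORE $data */
	/* (B): publish the data before the new producer index. */
	atomic_store_explicit(&r->producer, prod + 1, memory_order_release);
	return true;
}

/* Consumer: (C) acquire-load ->producer, (D) release-store ->consumer. */
static bool ring_consume(struct ring *r, uint64_t *val)
{
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_relaxed);
	/* (C): see all data stored before the observed producer index. */
	uint32_t prod = atomic_load_explicit(&r->producer, memory_order_acquire);

	if (cons == prod)
		return false;			/* ring empty */

	*val = r->desc[cons & (RING_SIZE - 1)];	/* LOAD $data */
	/* (D): only hand the slot back after the data has been read. */
	atomic_store_explicit(&r->consumer, cons + 1, memory_order_release);
	return true;
}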