On 2019-12-09 09:56, Magnus Karlsson wrote: > Currently, the xsk ring code has two cached producer pointers: > prod_head and prod_tail. This patch consolidates these two into a > single one called cached_prod to make the code simpler and easier to > maintain. This will be in line with the user space part of the the > code found in libbpf, that only uses a single cached pointer. > > The Rx path only uses the two top level functions > xskq_produce_batch_desc and xskq_produce_flush_desc and they both use > prod_head and never prod_tail. So just move them over to > cached_prod. > > The Tx XDP_DRV path uses xskq_produce_addr_lazy and > xskq_produce_flush_addr_n and unnecessarily operates on both prod_tail > and prod_cons, so move them over to just use cached_prod by skipping > the intermediate step of updating prod_tail. > > The Tx path in XDP_SKB mode uses xskq_reserve_addr and > xskq_produce_addr. They currently use both cached pointers, but we can > operate on the global producer pointer in xskq_produce_addr since it > has to be updated anyway, thus eliminating the use of both cached > pointers. We can also remove the xskq_nb_free in xskq_produce_addr > since it is already called in xskq_reserve_addr. No need to do it > twice. > > When there is only one cached producer pointer, we can also simplify > xskq_nb_free by removing one argument. > > Signed-off-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx> > --- > net/xdp/xsk_queue.h | 49 ++++++++++++++++++++++--------------------------- > 1 file changed, 22 insertions(+), 27 deletions(-) > > diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h > index a2f0ba6..d88e1a0 100644 > --- a/net/xdp/xsk_queue.h > +++ b/net/xdp/xsk_queue.h > @@ -35,8 +35,7 @@ struct xsk_queue { > u64 size; > u32 ring_mask; > u32 nentries; > - u32 prod_head; > - u32 prod_tail; > + u32 cached_prod; > u32 cons_head; > u32 cons_tail; > struct xdp_ring *ring; > @@ -94,39 +93,39 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) > > static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) > { > - u32 entries = q->prod_tail - q->cons_tail; > + u32 entries = q->cached_prod - q->cons_tail; > > if (entries == 0) { > /* Refresh the local pointer */ > - q->prod_tail = READ_ONCE(q->ring->producer); > - entries = q->prod_tail - q->cons_tail; > + q->cached_prod = READ_ONCE(q->ring->producer); > + entries = q->cached_prod - q->cons_tail; > } > > return (entries > dcnt) ? dcnt : entries; > } > > -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) > +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt) > { > - u32 free_entries = q->nentries - (producer - q->cons_tail); > + u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail); > > if (free_entries >= dcnt) > return free_entries; > > /* Refresh the local tail pointer */ > q->cons_tail = READ_ONCE(q->ring->consumer); > - return q->nentries - (producer - q->cons_tail); > + return q->nentries - (q->cached_prod - q->cons_tail); > } > > static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt) > { > - u32 entries = q->prod_tail - q->cons_tail; > + u32 entries = q->cached_prod - q->cons_tail; > > if (entries >= cnt) > return true; > > /* Refresh the local pointer. */ > - q->prod_tail = READ_ONCE(q->ring->producer); > - entries = q->prod_tail - q->cons_tail; > + q->cached_prod = READ_ONCE(q->ring->producer); > + entries = q->cached_prod - q->cons_tail; > > return entries >= cnt; > } > @@ -220,17 +219,15 @@ static inline void xskq_discard_addr(struct xsk_queue *q) > static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) > { > struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; > - > - if (xskq_nb_free(q, q->prod_tail, 1) == 0) > - return -ENOSPC; > + unsigned int idx = q->ring->producer; > > /* A, matches D */ > - ring->desc[q->prod_tail++ & q->ring_mask] = addr; > + ring->desc[idx++ & q->ring_mask] = addr; > > /* Order producer and data */ > smp_wmb(); /* B, matches C */ > > - WRITE_ONCE(q->ring->producer, q->prod_tail); > + WRITE_ONCE(q->ring->producer, idx); > return 0; > } > > @@ -238,11 +235,11 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) > { > struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; > > - if (xskq_nb_free(q, q->prod_head, 1) == 0) > + if (xskq_nb_free(q, 1) == 0) > return -ENOSPC; > > /* A, matches D */ > - ring->desc[q->prod_head++ & q->ring_mask] = addr; > + ring->desc[q->cached_prod++ & q->ring_mask] = addr; > return 0; > } > > @@ -252,17 +249,16 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, > /* Order producer and data */ > smp_wmb(); /* B, matches C */ > > - q->prod_tail += nb_entries; > - WRITE_ONCE(q->ring->producer, q->prod_tail); > + WRITE_ONCE(q->ring->producer, q->ring->producer + nb_entries); > } > > static inline int xskq_reserve_addr(struct xsk_queue *q) > { > - if (xskq_nb_free(q, q->prod_head, 1) == 0) > + if (xskq_nb_free(q, 1) == 0) > return -ENOSPC; > > /* A, matches D */ > - q->prod_head++; > + q->cached_prod++; > return 0; > } > > @@ -340,11 +336,11 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q, > struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; > unsigned int idx; > > - if (xskq_nb_free(q, q->prod_head, 1) == 0) > + if (xskq_nb_free(q, 1) == 0) > return -ENOSPC; > > /* A, matches D */ > - idx = (q->prod_head++) & q->ring_mask; > + idx = q->cached_prod++ & q->ring_mask; > ring->desc[idx].addr = addr; > ring->desc[idx].len = len; > > @@ -356,8 +352,7 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q) > /* Order producer and data */ > smp_wmb(); /* B, matches C */ > > - q->prod_tail = q->prod_head; > - WRITE_ONCE(q->ring->producer, q->prod_tail); > + WRITE_ONCE(q->ring->producer, q->cached_prod); > } > > static inline bool xskq_full_desc(struct xsk_queue *q) > @@ -367,7 +362,7 @@ static inline bool xskq_full_desc(struct xsk_queue *q) > > static inline bool xskq_empty_desc(struct xsk_queue *q) > { > - return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries; > + return xskq_nb_free(q, q->nentries) == q->nentries; I don't think this change is correct. The old code checked the number of free items against prod_tail (== producer). The new code changes it to prod_head (which is now cached_prod). xskq_nb_free is used in xsk_poll to set EPOLLIN. After this change EPOLLIN will be set right after xskq_produce_batch_desc, but it should only be set after xskq_produce_flush_desc, just as before, otherwise the application will wake up before the data is available, and it will just waste CPU cycles. > } > > void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask); >