On Fri, Dec 13, 2019 at 7:04 PM Maxim Mikityanskiy <maximmi@xxxxxxxxxxxx> wrote: > > On 2019-12-09 09:56, Magnus Karlsson wrote: > > Currently, the xsk ring code has two cached producer pointers: > > prod_head and prod_tail. This patch consolidates these two into a > > single one called cached_prod to make the code simpler and easier to > > maintain. This will be in line with the user space part of the the > > code found in libbpf, that only uses a single cached pointer. > > > > The Rx path only uses the two top level functions > > xskq_produce_batch_desc and xskq_produce_flush_desc and they both use > > prod_head and never prod_tail. So just move them over to > > cached_prod. > > > > The Tx XDP_DRV path uses xskq_produce_addr_lazy and > > xskq_produce_flush_addr_n and unnecessarily operates on both prod_tail > > and prod_cons, so move them over to just use cached_prod by skipping > > the intermediate step of updating prod_tail. > > > > The Tx path in XDP_SKB mode uses xskq_reserve_addr and > > xskq_produce_addr. They currently use both cached pointers, but we can > > operate on the global producer pointer in xskq_produce_addr since it > > has to be updated anyway, thus eliminating the use of both cached > > pointers. We can also remove the xskq_nb_free in xskq_produce_addr > > since it is already called in xskq_reserve_addr. No need to do it > > twice. > > > > When there is only one cached producer pointer, we can also simplify > > xskq_nb_free by removing one argument. > > > > Signed-off-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx> > > --- > > net/xdp/xsk_queue.h | 49 ++++++++++++++++++++++--------------------------- > > 1 file changed, 22 insertions(+), 27 deletions(-) > > > > diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h > > index a2f0ba6..d88e1a0 100644 > > --- a/net/xdp/xsk_queue.h > > +++ b/net/xdp/xsk_queue.h > > @@ -35,8 +35,7 @@ struct xsk_queue { > > u64 size; > > u32 ring_mask; > > u32 nentries; > > - u32 prod_head; > > - u32 prod_tail; > > + u32 cached_prod; > > u32 cons_head; > > u32 cons_tail; > > struct xdp_ring *ring; > > @@ -94,39 +93,39 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) > > > > static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) > > { > > - u32 entries = q->prod_tail - q->cons_tail; > > + u32 entries = q->cached_prod - q->cons_tail; > > > > if (entries == 0) { > > /* Refresh the local pointer */ > > - q->prod_tail = READ_ONCE(q->ring->producer); > > - entries = q->prod_tail - q->cons_tail; > > + q->cached_prod = READ_ONCE(q->ring->producer); > > + entries = q->cached_prod - q->cons_tail; > > } > > > > return (entries > dcnt) ? dcnt : entries; > > } > > > > -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) > > +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt) > > { > > - u32 free_entries = q->nentries - (producer - q->cons_tail); > > + u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail); > > > > if (free_entries >= dcnt) > > return free_entries; > > > > /* Refresh the local tail pointer */ > > q->cons_tail = READ_ONCE(q->ring->consumer); > > - return q->nentries - (producer - q->cons_tail); > > + return q->nentries - (q->cached_prod - q->cons_tail); > > } > > > > static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt) > > { > > - u32 entries = q->prod_tail - q->cons_tail; > > + u32 entries = q->cached_prod - q->cons_tail; > > > > if (entries >= cnt) > > return true; > > > > /* Refresh the local pointer. */ > > - q->prod_tail = READ_ONCE(q->ring->producer); > > - entries = q->prod_tail - q->cons_tail; > > + q->cached_prod = READ_ONCE(q->ring->producer); > > + entries = q->cached_prod - q->cons_tail; > > > > return entries >= cnt; > > } > > @@ -220,17 +219,15 @@ static inline void xskq_discard_addr(struct xsk_queue *q) > > static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) > > { > > struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; > > - > > - if (xskq_nb_free(q, q->prod_tail, 1) == 0) > > - return -ENOSPC; > > + unsigned int idx = q->ring->producer; > > > > /* A, matches D */ > > - ring->desc[q->prod_tail++ & q->ring_mask] = addr; > > + ring->desc[idx++ & q->ring_mask] = addr; > > > > /* Order producer and data */ > > smp_wmb(); /* B, matches C */ > > > > - WRITE_ONCE(q->ring->producer, q->prod_tail); > > + WRITE_ONCE(q->ring->producer, idx); > > return 0; > > } > > > > @@ -238,11 +235,11 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) > > { > > struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; > > > > - if (xskq_nb_free(q, q->prod_head, 1) == 0) > > + if (xskq_nb_free(q, 1) == 0) > > return -ENOSPC; > > > > /* A, matches D */ > > - ring->desc[q->prod_head++ & q->ring_mask] = addr; > > + ring->desc[q->cached_prod++ & q->ring_mask] = addr; > > return 0; > > } > > > > @@ -252,17 +249,16 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, > > /* Order producer and data */ > > smp_wmb(); /* B, matches C */ > > > > - q->prod_tail += nb_entries; > > - WRITE_ONCE(q->ring->producer, q->prod_tail); > > + WRITE_ONCE(q->ring->producer, q->ring->producer + nb_entries); > > } > > > > static inline int xskq_reserve_addr(struct xsk_queue *q) > > { > > - if (xskq_nb_free(q, q->prod_head, 1) == 0) > > + if (xskq_nb_free(q, 1) == 0) > > return -ENOSPC; > > > > /* A, matches D */ > > - q->prod_head++; > > + q->cached_prod++; > > return 0; > > } > > > > @@ -340,11 +336,11 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q, > > struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; > > unsigned int idx; > > > > - if (xskq_nb_free(q, q->prod_head, 1) == 0) > > + if (xskq_nb_free(q, 1) == 0) > > return -ENOSPC; > > > > /* A, matches D */ > > - idx = (q->prod_head++) & q->ring_mask; > > + idx = q->cached_prod++ & q->ring_mask; > > ring->desc[idx].addr = addr; > > ring->desc[idx].len = len; > > > > @@ -356,8 +352,7 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q) > > /* Order producer and data */ > > smp_wmb(); /* B, matches C */ > > > > - q->prod_tail = q->prod_head; > > - WRITE_ONCE(q->ring->producer, q->prod_tail); > > + WRITE_ONCE(q->ring->producer, q->cached_prod); > > } > > > > static inline bool xskq_full_desc(struct xsk_queue *q) > > @@ -367,7 +362,7 @@ static inline bool xskq_full_desc(struct xsk_queue *q) > > > > static inline bool xskq_empty_desc(struct xsk_queue *q) > > { > > - return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries; > > + return xskq_nb_free(q, q->nentries) == q->nentries; > > I don't think this change is correct. The old code checked the number of > free items against prod_tail (== producer). The new code changes it to > prod_head (which is now cached_prod). xskq_nb_free is used in xsk_poll > to set EPOLLIN. After this change EPOLLIN will be set right after > xskq_produce_batch_desc, but it should only be set after > xskq_produce_flush_desc, just as before, otherwise the application will > wake up before the data is available, and it will just waste CPU cycles. That is correct. It will be inefficient during patch 2 and 3 as this gets fixed in patch 4. I chose this as I thought the patch progression and simplification process would be clearer this way. So what to do about it? Some options: * Document this in patch 2 and keep the current order * Put patch 4 before patch 2 so that the code is always efficient. This is doable, but I have the feeling it will be somewhat less clear from a simplification perspective. The advantage, on the other hand, is that the poll code is always efficient during the whole patch set. /Magnus > > } > > > > void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask); > > >