On Thu, Dec 19, 2019 at 3:35 PM Maxim Mikityanskiy <maxtram95@xxxxxxxxx> wrote:
>
> On Mon, Dec 16, 2019 at 10:46 AM Magnus Karlsson
> <magnus.karlsson@xxxxxxxxx> wrote:
> >
> > On Fri, Dec 13, 2019 at 7:04 PM Maxim Mikityanskiy <maximmi@xxxxxxxxxxxx> wrote:
> > >
> > > On 2019-12-09 09:56, Magnus Karlsson wrote:
> > > > Currently, the xsk ring code has two cached producer pointers:
> > > > prod_head and prod_tail. This patch consolidates these two into a
> > > > single one called cached_prod to make the code simpler and easier to
> > > > maintain. This will be in line with the user space part of the
> > > > code found in libbpf, that only uses a single cached pointer.
> > > >
> > > > The Rx path only uses the two top level functions
> > > > xskq_produce_batch_desc and xskq_produce_flush_desc and they both use
> > > > prod_head and never prod_tail. So just move them over to
> > > > cached_prod.
> > > >
> > > > The Tx XDP_DRV path uses xskq_produce_addr_lazy and
> > > > xskq_produce_flush_addr_n and unnecessarily operates on both prod_tail
> > > > and prod_head, so move them over to just use cached_prod by skipping
> > > > the intermediate step of updating prod_tail.
> > > >
> > > > The Tx path in XDP_SKB mode uses xskq_reserve_addr and
> > > > xskq_produce_addr. They currently use both cached pointers, but we can
> > > > operate on the global producer pointer in xskq_produce_addr since it
> > > > has to be updated anyway, thus eliminating the use of both cached
> > > > pointers. We can also remove the xskq_nb_free in xskq_produce_addr
> > > > since it is already called in xskq_reserve_addr. No need to do it
> > > > twice.
> > > >
> > > > When there is only one cached producer pointer, we can also simplify
> > > > xskq_nb_free by removing one argument.
> > > >
> > > > Signed-off-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx>
> > > > ---
> > > >  net/xdp/xsk_queue.h | 49 ++++++++++++++++++++++---------------------------
> > > >  1 file changed, 22 insertions(+), 27 deletions(-)
> > > >
> > > > diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
> > > > index a2f0ba6..d88e1a0 100644
> > > > --- a/net/xdp/xsk_queue.h
> > > > +++ b/net/xdp/xsk_queue.h
> > > > @@ -35,8 +35,7 @@ struct xsk_queue {
> > > >  	u64 size;
> > > >  	u32 ring_mask;
> > > >  	u32 nentries;
> > > > -	u32 prod_head;
> > > > -	u32 prod_tail;
> > > > +	u32 cached_prod;
> > > >  	u32 cons_head;
> > > >  	u32 cons_tail;
> > > >  	struct xdp_ring *ring;
> > > > @@ -94,39 +93,39 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
> > > >
> > > >  static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
> > > >  {
> > > > -	u32 entries = q->prod_tail - q->cons_tail;
> > > > +	u32 entries = q->cached_prod - q->cons_tail;
> > > >
> > > >  	if (entries == 0) {
> > > >  		/* Refresh the local pointer */
> > > > -		q->prod_tail = READ_ONCE(q->ring->producer);
> > > > -		entries = q->prod_tail - q->cons_tail;
> > > > +		q->cached_prod = READ_ONCE(q->ring->producer);
> > > > +		entries = q->cached_prod - q->cons_tail;
> > > >  	}
> > > >
> > > >  	return (entries > dcnt) ? dcnt : entries;
> > > >  }
> > > >
> > > > -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
> > > > +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt)
> > > >  {
> > > > -	u32 free_entries = q->nentries - (producer - q->cons_tail);
> > > > +	u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);
> > > >
> > > >  	if (free_entries >= dcnt)
> > > >  		return free_entries;
> > > >
> > > >  	/* Refresh the local tail pointer */
> > > >  	q->cons_tail = READ_ONCE(q->ring->consumer);
> > > > -	return q->nentries - (producer - q->cons_tail);
> > > > +	return q->nentries - (q->cached_prod - q->cons_tail);
> > > >  }
> > > >
> > > >  static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
> > > >  {
> > > > -	u32 entries = q->prod_tail - q->cons_tail;
> > > > +	u32 entries = q->cached_prod - q->cons_tail;
> > > >
> > > >  	if (entries >= cnt)
> > > >  		return true;
> > > >
> > > >  	/* Refresh the local pointer. */
> > > > -	q->prod_tail = READ_ONCE(q->ring->producer);
> > > > -	entries = q->prod_tail - q->cons_tail;
> > > > +	q->cached_prod = READ_ONCE(q->ring->producer);
> > > > +	entries = q->cached_prod - q->cons_tail;
> > > >
> > > >  	return entries >= cnt;
> > > >  }
> > > > @@ -220,17 +219,15 @@ static inline void xskq_discard_addr(struct xsk_queue *q)
> > > >  static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
> > > >  {
> > > >  	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> > > > -
> > > > -	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
> > > > -		return -ENOSPC;
> > > > +	unsigned int idx = q->ring->producer;
> > > >
> > > >  	/* A, matches D */
> > > > -	ring->desc[q->prod_tail++ & q->ring_mask] = addr;
> > > > +	ring->desc[idx++ & q->ring_mask] = addr;
> > > >
> > > >  	/* Order producer and data */
> > > >  	smp_wmb(); /* B, matches C */
> > > >
> > > > -	WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > > +	WRITE_ONCE(q->ring->producer, idx);
> > > >  	return 0;
> > > >  }
> > > >
> > > > @@ -238,11 +235,11 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
> > > >  {
> > > >  	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> > > >
> > > > -	if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > > +	if (xskq_nb_free(q, 1) == 0)
> > > >  		return -ENOSPC;
> > > >
> > > >  	/* A, matches D */
> > > > -	ring->desc[q->prod_head++ & q->ring_mask] = addr;
> > > > +	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
> > > >  	return 0;
> > > >  }
> > > >
> > > > @@ -252,17 +249,16 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
> > > >  	/* Order producer and data */
> > > >  	smp_wmb(); /* B, matches C */
> > > >
> > > > -	q->prod_tail += nb_entries;
> > > > -	WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > > +	WRITE_ONCE(q->ring->producer, q->ring->producer + nb_entries);
> > > >  }
> > > >
> > > >  static inline int xskq_reserve_addr(struct xsk_queue *q)
> > > >  {
> > > > -	if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > > +	if (xskq_nb_free(q, 1) == 0)
> > > >  		return -ENOSPC;
> > > >
> > > >  	/* A, matches D */
> > > > -	q->prod_head++;
> > > > +	q->cached_prod++;
> > > >  	return 0;
> > > >  }
> > > >
> > > > @@ -340,11 +336,11 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
> > > >  	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
> > > >  	unsigned int idx;
> > > >
> > > > -	if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > > +	if (xskq_nb_free(q, 1) == 0)
> > > >  		return -ENOSPC;
> > > >
> > > >  	/* A, matches D */
> > > > -	idx = (q->prod_head++) & q->ring_mask;
> > > > +	idx = q->cached_prod++ & q->ring_mask;
> > > >  	ring->desc[idx].addr = addr;
> > > >  	ring->desc[idx].len = len;
> > > >
> > > > @@ -356,8 +352,7 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
> > > >  	/* Order producer and data */
> > > >  	smp_wmb(); /* B, matches C */
> > > >
> > > > -	q->prod_tail = q->prod_head;
> > > > -	WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > > +	WRITE_ONCE(q->ring->producer, q->cached_prod);
> > > >  }
> > > >
> > > >  static inline bool xskq_full_desc(struct xsk_queue *q)
> > > > @@ -367,7 +362,7 @@ static inline bool xskq_full_desc(struct xsk_queue *q)
> > > >
> > > >  static inline bool xskq_empty_desc(struct xsk_queue *q)
> > > >  {
> > > > -	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
> > > > +	return xskq_nb_free(q, q->nentries) == q->nentries;
> > >
> > > I don't think this change is correct. The old code checked the number of
> > > free items against prod_tail (== producer). The new code changes it to
> > > prod_head (which is now cached_prod). xskq_nb_free is used in xsk_poll
> > > to set EPOLLIN. After this change EPOLLIN will be set right after
> > > xskq_produce_batch_desc, but it should only be set after
> > > xskq_produce_flush_desc, just as before, otherwise the application will
> > > wake up before the data is available, and it will just waste CPU cycles.
> >
> > That is correct. It will be inefficient during patch 2 and 3 as this
> > gets fixed in patch 4.
>
> Looking at patch 4, I see it still uses cached_prod, not producer.
> However, I see you changed it in the v2, so it should be fine now.
>
> > I chose this as I thought the patch progression
> > and simplification process would be clearer this way. So what to do
> > about it? Some options:
> >
> > * Document this in patch 2 and keep the current order
> >
> > * Put patch 4 before patch 2 so that the code is always efficient.
> > This is doable, but I have the feeling it will be somewhat less clear
> > from a simplification perspective. The advantage, on the other hand,
> > is that the poll code is always efficient during the whole patch set.
>
> I'm sorry that it takes long for me to answer, I'm on vacation now.
> Anyway, either option looks good to me, as long as xskq_prod_is_empty
> has the correct check (as in v2 patch 2).

Thanks for taking a look at the patch during your vacation, Maxim. But
now, please go and enjoy the holidays :-)!

/Magnus

> > /Magnus
> >
> > > >  }
> > > >
> > > >  void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
> > > >
> > >
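
To make the point about the empty check concrete, here is a minimal
user-space sketch of why checking against cached_prod reports data earlier
than checking against the published producer index. It is only an
illustration: struct demo_queue and the demo_* helpers are hypothetical
names, not code from net/xdp/xsk_queue.h, and the memory barriers and
READ_ONCE/WRITE_ONCE of the real code are deliberately left out.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical single-threaded model; not the kernel's struct xsk_queue. */
struct demo_queue {
	uint32_t nentries;    /* ring size */
	uint32_t cached_prod; /* local producer index, advanced when staging */
	uint32_t producer;    /* published producer index seen by the consumer */
	uint32_t consumer;    /* published consumer index */
};

/* Stage one entry locally (the role xskq_produce_batch_desc plays). */
static inline int demo_produce(struct demo_queue *q)
{
	if (q->cached_prod - q->consumer == q->nentries)
		return -1; /* ring full */
	q->cached_prod++;
	return 0;
}

/* Publish all staged entries (the role xskq_produce_flush_desc plays). */
static inline void demo_flush(struct demo_queue *q)
{
	q->producer = q->cached_prod;
}

/* Empty check on the local index: turns false as soon as an entry is staged. */
static inline bool demo_empty_cached(const struct demo_queue *q)
{
	return q->cached_prod == q->consumer;
}

/* Empty check on the published index: turns false only after the flush. */
static inline bool demo_empty_published(const struct demo_queue *q)
{
	return q->producer == q->consumer;
}
```

Between demo_produce() and demo_flush(), demo_empty_cached() already says
"not empty" while demo_empty_published() still says "empty". A poll
implementation built on the first check would wake the application before
the entry is visible to it, which is the wasted wakeup described above.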