Re: [PATCH bpf-next 02/12] xsk: consolidate to one single cached producer pointer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri, Dec 13, 2019 at 7:04 PM Maxim Mikityanskiy <maximmi@xxxxxxxxxxxx> wrote:
>
> On 2019-12-09 09:56, Magnus Karlsson wrote:
> > Currently, the xsk ring code has two cached producer pointers:
> > prod_head and prod_tail. This patch consolidates these two into a
> > single one called cached_prod to make the code simpler and easier to
> > maintain. This will be in line with the user space part of the
> > code found in libbpf, that only uses a single cached pointer.
> >
> > The Rx path only uses the two top level functions
> > xskq_produce_batch_desc and xskq_produce_flush_desc and they both use
> > prod_head and never prod_tail. So just move them over to
> > cached_prod.
> >
> > The Tx XDP_DRV path uses xskq_produce_addr_lazy and
> > xskq_produce_flush_addr_n and unnecessarily operates on both prod_tail
> > and prod_head, so move them over to just use cached_prod by skipping
> > the intermediate step of updating prod_tail.
> >
> > The Tx path in XDP_SKB mode uses xskq_reserve_addr and
> > xskq_produce_addr. They currently use both cached pointers, but we can
> > operate on the global producer pointer in xskq_produce_addr since it
> > has to be updated anyway, thus eliminating the use of both cached
> > pointers. We can also remove the xskq_nb_free in xskq_produce_addr
> > since it is already called in xskq_reserve_addr. No need to do it
> > twice.
> >
> > When there is only one cached producer pointer, we can also simplify
> > xskq_nb_free by removing one argument.
> >
> > Signed-off-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx>
> > ---
> >   net/xdp/xsk_queue.h | 49 ++++++++++++++++++++++---------------------------
> >   1 file changed, 22 insertions(+), 27 deletions(-)
> >
> > diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
> > index a2f0ba6..d88e1a0 100644
> > --- a/net/xdp/xsk_queue.h
> > +++ b/net/xdp/xsk_queue.h
> > @@ -35,8 +35,7 @@ struct xsk_queue {
> >       u64 size;
> >       u32 ring_mask;
> >       u32 nentries;
> > -     u32 prod_head;
> > -     u32 prod_tail;
> > +     u32 cached_prod;
> >       u32 cons_head;
> >       u32 cons_tail;
> >       struct xdp_ring *ring;
> > @@ -94,39 +93,39 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
> >
> >   static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
> >   {
> > -     u32 entries = q->prod_tail - q->cons_tail;
> > +     u32 entries = q->cached_prod - q->cons_tail;
> >
> >       if (entries == 0) {
> >               /* Refresh the local pointer */
> > -             q->prod_tail = READ_ONCE(q->ring->producer);
> > -             entries = q->prod_tail - q->cons_tail;
> > +             q->cached_prod = READ_ONCE(q->ring->producer);
> > +             entries = q->cached_prod - q->cons_tail;
> >       }
> >
> >       return (entries > dcnt) ? dcnt : entries;
> >   }
> >
> > -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
> > +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt)
> >   {
> > -     u32 free_entries = q->nentries - (producer - q->cons_tail);
> > +     u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);
> >
> >       if (free_entries >= dcnt)
> >               return free_entries;
> >
> >       /* Refresh the local tail pointer */
> >       q->cons_tail = READ_ONCE(q->ring->consumer);
> > -     return q->nentries - (producer - q->cons_tail);
> > +     return q->nentries - (q->cached_prod - q->cons_tail);
> >   }
> >
> >   static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
> >   {
> > -     u32 entries = q->prod_tail - q->cons_tail;
> > +     u32 entries = q->cached_prod - q->cons_tail;
> >
> >       if (entries >= cnt)
> >               return true;
> >
> >       /* Refresh the local pointer. */
> > -     q->prod_tail = READ_ONCE(q->ring->producer);
> > -     entries = q->prod_tail - q->cons_tail;
> > +     q->cached_prod = READ_ONCE(q->ring->producer);
> > +     entries = q->cached_prod - q->cons_tail;
> >
> >       return entries >= cnt;
> >   }
> > @@ -220,17 +219,15 @@ static inline void xskq_discard_addr(struct xsk_queue *q)
> >   static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
> >   {
> >       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> > -
> > -     if (xskq_nb_free(q, q->prod_tail, 1) == 0)
> > -             return -ENOSPC;
> > +     unsigned int idx = q->ring->producer;
> >
> >       /* A, matches D */
> > -     ring->desc[q->prod_tail++ & q->ring_mask] = addr;
> > +     ring->desc[idx++ & q->ring_mask] = addr;
> >
> >       /* Order producer and data */
> >       smp_wmb(); /* B, matches C */
> >
> > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > +     WRITE_ONCE(q->ring->producer, idx);
> >       return 0;
> >   }
> >
> > @@ -238,11 +235,11 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
> >   {
> >       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> >
> > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > +     if (xskq_nb_free(q, 1) == 0)
> >               return -ENOSPC;
> >
> >       /* A, matches D */
> > -     ring->desc[q->prod_head++ & q->ring_mask] = addr;
> > +     ring->desc[q->cached_prod++ & q->ring_mask] = addr;
> >       return 0;
> >   }
> >
> > @@ -252,17 +249,16 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
> >       /* Order producer and data */
> >       smp_wmb(); /* B, matches C */
> >
> > -     q->prod_tail += nb_entries;
> > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > +     WRITE_ONCE(q->ring->producer, q->ring->producer + nb_entries);
> >   }
> >
> >   static inline int xskq_reserve_addr(struct xsk_queue *q)
> >   {
> > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > +     if (xskq_nb_free(q, 1) == 0)
> >               return -ENOSPC;
> >
> >       /* A, matches D */
> > -     q->prod_head++;
> > +     q->cached_prod++;
> >       return 0;
> >   }
> >
> > @@ -340,11 +336,11 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
> >       struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
> >       unsigned int idx;
> >
> > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > +     if (xskq_nb_free(q, 1) == 0)
> >               return -ENOSPC;
> >
> >       /* A, matches D */
> > -     idx = (q->prod_head++) & q->ring_mask;
> > +     idx = q->cached_prod++ & q->ring_mask;
> >       ring->desc[idx].addr = addr;
> >       ring->desc[idx].len = len;
> >
> > @@ -356,8 +352,7 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
> >       /* Order producer and data */
> >       smp_wmb(); /* B, matches C */
> >
> > -     q->prod_tail = q->prod_head;
> > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > +     WRITE_ONCE(q->ring->producer, q->cached_prod);
> >   }
> >
> >   static inline bool xskq_full_desc(struct xsk_queue *q)
> > @@ -367,7 +362,7 @@ static inline bool xskq_full_desc(struct xsk_queue *q)
> >
> >   static inline bool xskq_empty_desc(struct xsk_queue *q)
> >   {
> > -     return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
> > +     return xskq_nb_free(q, q->nentries) == q->nentries;
>
> I don't think this change is correct. The old code checked the number of
> free items against prod_tail (== producer). The new code changes it to
> prod_head (which is now cached_prod). xskq_nb_free is used in xsk_poll
> to set EPOLLIN. After this change, EPOLLIN will be set right after
> xskq_produce_batch_desc, but it should only be set after
> xskq_produce_flush_desc, just as before. Otherwise, the application will
> wake up before the data is available and just waste CPU cycles.

That is correct. It will be inefficient during patches 2 and 3, as this
gets fixed in patch 4. I chose this ordering because I thought the patch
progression and simplification process would be clearer this way. So what
to do about it? Some options:

* Document this in patch 2 and keep the current order.

* Put patch 4 before patch 2 so that the code is always efficient.
This is doable, but I have the feeling it will be somewhat less clear
from a simplification perspective. The advantage, on the other hand,
is that the poll code stays efficient throughout the whole patch set.

/Magnus

> >   }
> >
> >   void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
> >
>



[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux