Re: [PATCH bpf-next 02/12] xsk: consolidate to one single cached producer pointer

On Mon, Dec 16, 2019 at 10:46 AM Magnus Karlsson
<magnus.karlsson@xxxxxxxxx> wrote:
>
> On Fri, Dec 13, 2019 at 7:04 PM Maxim Mikityanskiy <maximmi@xxxxxxxxxxxx> wrote:
> >
> > On 2019-12-09 09:56, Magnus Karlsson wrote:
> > > Currently, the xsk ring code has two cached producer pointers:
> > > prod_head and prod_tail. This patch consolidates these two into a
> > > single one called cached_prod to make the code simpler and easier to
> > > maintain. This will be in line with the user space part of the
> > > code found in libbpf, which only uses a single cached pointer.
> > >
> > > The Rx path only uses the two top level functions
> > > xskq_produce_batch_desc and xskq_produce_flush_desc and they both use
> > > prod_head and never prod_tail. So just move them over to
> > > cached_prod.
> > >
> > > The Tx XDP_DRV path uses xskq_produce_addr_lazy and
> > > xskq_produce_flush_addr_n and unnecessarily operates on both prod_tail
> > > and prod_head, so move them over to just use cached_prod by skipping
> > > the intermediate step of updating prod_tail.
> > >
> > > The Tx path in XDP_SKB mode uses xskq_reserve_addr and
> > > xskq_produce_addr. They currently use both cached pointers, but we can
> > > operate on the global producer pointer in xskq_produce_addr since it
> > > has to be updated anyway, thus eliminating the use of both cached
> > > pointers. We can also remove the xskq_nb_free in xskq_produce_addr
> > > since it is already called in xskq_reserve_addr. No need to do it
> > > twice.
> > >
> > > When there is only one cached producer pointer, we can also simplify
> > > xskq_nb_free by removing one argument.
> > >
> > > Signed-off-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx>
> > > ---
> > >   net/xdp/xsk_queue.h | 49 ++++++++++++++++++++++---------------------------
> > >   1 file changed, 22 insertions(+), 27 deletions(-)
> > >
> > > diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
> > > index a2f0ba6..d88e1a0 100644
> > > --- a/net/xdp/xsk_queue.h
> > > +++ b/net/xdp/xsk_queue.h
> > > @@ -35,8 +35,7 @@ struct xsk_queue {
> > >       u64 size;
> > >       u32 ring_mask;
> > >       u32 nentries;
> > > -     u32 prod_head;
> > > -     u32 prod_tail;
> > > +     u32 cached_prod;
> > >       u32 cons_head;
> > >       u32 cons_tail;
> > >       struct xdp_ring *ring;
> > > @@ -94,39 +93,39 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
> > >
> > >   static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
> > >   {
> > > -     u32 entries = q->prod_tail - q->cons_tail;
> > > +     u32 entries = q->cached_prod - q->cons_tail;
> > >
> > >       if (entries == 0) {
> > >               /* Refresh the local pointer */
> > > -             q->prod_tail = READ_ONCE(q->ring->producer);
> > > -             entries = q->prod_tail - q->cons_tail;
> > > +             q->cached_prod = READ_ONCE(q->ring->producer);
> > > +             entries = q->cached_prod - q->cons_tail;
> > >       }
> > >
> > >       return (entries > dcnt) ? dcnt : entries;
> > >   }
> > >
> > > -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
> > > +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt)
> > >   {
> > > -     u32 free_entries = q->nentries - (producer - q->cons_tail);
> > > +     u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);
> > >
> > >       if (free_entries >= dcnt)
> > >               return free_entries;
> > >
> > >       /* Refresh the local tail pointer */
> > >       q->cons_tail = READ_ONCE(q->ring->consumer);
> > > -     return q->nentries - (producer - q->cons_tail);
> > > +     return q->nentries - (q->cached_prod - q->cons_tail);
> > >   }
> > >
> > >   static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
> > >   {
> > > -     u32 entries = q->prod_tail - q->cons_tail;
> > > +     u32 entries = q->cached_prod - q->cons_tail;
> > >
> > >       if (entries >= cnt)
> > >               return true;
> > >
> > >       /* Refresh the local pointer. */
> > > -     q->prod_tail = READ_ONCE(q->ring->producer);
> > > -     entries = q->prod_tail - q->cons_tail;
> > > +     q->cached_prod = READ_ONCE(q->ring->producer);
> > > +     entries = q->cached_prod - q->cons_tail;
> > >
> > >       return entries >= cnt;
> > >   }
> > > @@ -220,17 +219,15 @@ static inline void xskq_discard_addr(struct xsk_queue *q)
> > >   static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
> > >   {
> > >       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> > > -
> > > -     if (xskq_nb_free(q, q->prod_tail, 1) == 0)
> > > -             return -ENOSPC;
> > > +     unsigned int idx = q->ring->producer;
> > >
> > >       /* A, matches D */
> > > -     ring->desc[q->prod_tail++ & q->ring_mask] = addr;
> > > +     ring->desc[idx++ & q->ring_mask] = addr;
> > >
> > >       /* Order producer and data */
> > >       smp_wmb(); /* B, matches C */
> > >
> > > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > +     WRITE_ONCE(q->ring->producer, idx);
> > >       return 0;
> > >   }
> > >
> > > @@ -238,11 +235,11 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
> > >   {
> > >       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> > >
> > > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > +     if (xskq_nb_free(q, 1) == 0)
> > >               return -ENOSPC;
> > >
> > >       /* A, matches D */
> > > -     ring->desc[q->prod_head++ & q->ring_mask] = addr;
> > > +     ring->desc[q->cached_prod++ & q->ring_mask] = addr;
> > >       return 0;
> > >   }
> > >
> > > @@ -252,17 +249,16 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
> > >       /* Order producer and data */
> > >       smp_wmb(); /* B, matches C */
> > >
> > > -     q->prod_tail += nb_entries;
> > > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > +     WRITE_ONCE(q->ring->producer, q->ring->producer + nb_entries);
> > >   }
> > >
> > >   static inline int xskq_reserve_addr(struct xsk_queue *q)
> > >   {
> > > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > +     if (xskq_nb_free(q, 1) == 0)
> > >               return -ENOSPC;
> > >
> > >       /* A, matches D */
> > > -     q->prod_head++;
> > > +     q->cached_prod++;
> > >       return 0;
> > >   }
> > >
> > > @@ -340,11 +336,11 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
> > >       struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
> > >       unsigned int idx;
> > >
> > > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > +     if (xskq_nb_free(q, 1) == 0)
> > >               return -ENOSPC;
> > >
> > >       /* A, matches D */
> > > -     idx = (q->prod_head++) & q->ring_mask;
> > > +     idx = q->cached_prod++ & q->ring_mask;
> > >       ring->desc[idx].addr = addr;
> > >       ring->desc[idx].len = len;
> > >
> > > @@ -356,8 +352,7 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
> > >       /* Order producer and data */
> > >       smp_wmb(); /* B, matches C */
> > >
> > > -     q->prod_tail = q->prod_head;
> > > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > +     WRITE_ONCE(q->ring->producer, q->cached_prod);
> > >   }
> > >
> > >   static inline bool xskq_full_desc(struct xsk_queue *q)
> > > @@ -367,7 +362,7 @@ static inline bool xskq_full_desc(struct xsk_queue *q)
> > >
> > >   static inline bool xskq_empty_desc(struct xsk_queue *q)
> > >   {
> > > -     return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
> > > +     return xskq_nb_free(q, q->nentries) == q->nentries;
> >
> > I don't think this change is correct. The old code checked the number of
> > free items against prod_tail (== producer). The new code changes it to
> > prod_head (which is now cached_prod). xskq_nb_free is used in xsk_poll
> > to set EPOLLIN. After this change EPOLLIN will be set right after
> > xskq_produce_batch_desc, but it should only be set after
> > xskq_produce_flush_desc, just as before, otherwise the application will
> > wake up before the data is available, and it will just waste CPU cycles.
>
> That is correct. It will be inefficient during patches 2 and 3, as this
> gets fixed in patch 4.

Looking at patch 4, I see it still uses cached_prod, not producer.
However, I see you changed it in v2, so it should be fine now.

> I chose this as I thought the patch progression
> and simplification process would be clearer this way. So what to do
> about it? Some options:
>
> * Document this in patch 2 and keep the current order
>
> * Put patch 4 before patch 2 so that the code is always efficient.
> This is doable, but I have the feeling it will be somewhat less clear
> from a simplification perspective. The advantage, on the other hand,
> is that the poll code is always efficient during the whole patch set.

Sorry for the slow reply; I'm on vacation at the moment.
Anyway, either option looks good to me, as long as xskq_prod_is_empty
has the correct check (as in v2 patch 2).
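
To make the concern about the check concrete, here is a minimal
user-space sketch. It is not kernel code: the toy_* names are made up
for illustration, and the barriers and READ_ONCE/WRITE_ONCE are left
out on purpose. It only models a cached producer index that runs ahead
of the published one between "produce" and "flush", and compares an
emptiness check against each of them.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for struct xsk_queue; not the kernel definition. */
struct toy_queue {
	uint32_t nentries;    /* ring size */
	uint32_t cached_prod; /* local producer index, advanced before publishing */
	uint32_t producer;    /* published producer index, seen by the consumer */
	uint32_t consumer;    /* published consumer index */
};

/* Stage one entry locally (models the "produce" step; nothing is published). */
static int toy_produce(struct toy_queue *q)
{
	/* Unsigned subtraction keeps the count correct across index wraparound. */
	uint32_t free_entries = q->nentries - (q->cached_prod - q->consumer);

	if (free_entries == 0)
		return -1; /* ring full */
	q->cached_prod++;
	return 0;
}

/* Publish everything staged so far (models the "flush" step). */
static void toy_flush(struct toy_queue *q)
{
	q->producer = q->cached_prod;
}

/* Emptiness judged from the cached index: changes as soon as an entry is staged. */
static bool toy_empty_cached(const struct toy_queue *q)
{
	return q->cached_prod == q->consumer;
}

/* Emptiness judged from the published index: changes only after the flush. */
static bool toy_empty_published(const struct toy_queue *q)
{
	return q->producer == q->consumer;
}

/* Walks through stage-then-flush; returns 0 if every step behaves as described. */
int toy_demo(void)
{
	struct toy_queue q = { .nentries = 4 };

	assert(toy_empty_cached(&q) && toy_empty_published(&q));

	assert(toy_produce(&q) == 0);
	/* Staged but not flushed: the cached check already says "not empty"... */
	assert(!toy_empty_cached(&q));
	/* ...while the published check still says "empty", as the consumer sees it. */
	assert(toy_empty_published(&q));

	toy_flush(&q);
	assert(!toy_empty_published(&q));
	return 0;
}
```

In this model, a poll-style wakeup keyed on toy_empty_cached() would
fire after toy_produce() but before toy_flush(), which is the early
wakeup described above. Keying it on toy_empty_published() avoids that.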

> /Magnus
>
> > >   }
> > >
> > >   void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
> > >
> >


