Re: [PATCH bpf-next 02/12] xsk: consolidate to one single cached producer pointer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, Dec 19, 2019 at 3:35 PM Maxim Mikityanskiy <maxtram95@xxxxxxxxx> wrote:
>
> On Mon, Dec 16, 2019 at 10:46 AM Magnus Karlsson
> <magnus.karlsson@xxxxxxxxx> wrote:
> >
> > On Fri, Dec 13, 2019 at 7:04 PM Maxim Mikityanskiy <maximmi@xxxxxxxxxxxx> wrote:
> > >
> > > On 2019-12-09 09:56, Magnus Karlsson wrote:
> > > > Currently, the xsk ring code has two cached producer pointers:
> > > > prod_head and prod_tail. This patch consolidates these two into a
> > > > single one called cached_prod to make the code simpler and easier to
> > > > maintain. This will be in line with the user space part of the the
> > > > code found in libbpf, that only uses a single cached pointer.
> > > >
> > > > The Rx path only uses the two top level functions
> > > > xskq_produce_batch_desc and xskq_produce_flush_desc and they both use
> > > > prod_head and never prod_tail. So just move them over to
> > > > cached_prod.
> > > >
> > > > The Tx XDP_DRV path uses xskq_produce_addr_lazy and
> > > > xskq_produce_flush_addr_n and unnecessarily operates on both prod_tail
> > > > and prod_cons, so move them over to just use cached_prod by skipping
> > > > the intermediate step of updating prod_tail.
> > > >
> > > > The Tx path in XDP_SKB mode uses xskq_reserve_addr and
> > > > xskq_produce_addr. They currently use both cached pointers, but we can
> > > > operate on the global producer pointer in xskq_produce_addr since it
> > > > has to be updated anyway, thus eliminating the use of both cached
> > > > pointers. We can also remove the xskq_nb_free in xskq_produce_addr
> > > > since it is already called in xskq_reserve_addr. No need to do it
> > > > twice.
> > > >
> > > > When there is only one cached producer pointer, we can also simplify
> > > > xskq_nb_free by removing one argument.
> > > >
> > > > Signed-off-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx>
> > > > ---
> > > >   net/xdp/xsk_queue.h | 49 ++++++++++++++++++++++---------------------------
> > > >   1 file changed, 22 insertions(+), 27 deletions(-)
> > > >
> > > > diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
> > > > index a2f0ba6..d88e1a0 100644
> > > > --- a/net/xdp/xsk_queue.h
> > > > +++ b/net/xdp/xsk_queue.h
> > > > @@ -35,8 +35,7 @@ struct xsk_queue {
> > > >       u64 size;
> > > >       u32 ring_mask;
> > > >       u32 nentries;
> > > > -     u32 prod_head;
> > > > -     u32 prod_tail;
> > > > +     u32 cached_prod;
> > > >       u32 cons_head;
> > > >       u32 cons_tail;
> > > >       struct xdp_ring *ring;
> > > > @@ -94,39 +93,39 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
> > > >
> > > >   static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
> > > >   {
> > > > -     u32 entries = q->prod_tail - q->cons_tail;
> > > > +     u32 entries = q->cached_prod - q->cons_tail;
> > > >
> > > >       if (entries == 0) {
> > > >               /* Refresh the local pointer */
> > > > -             q->prod_tail = READ_ONCE(q->ring->producer);
> > > > -             entries = q->prod_tail - q->cons_tail;
> > > > +             q->cached_prod = READ_ONCE(q->ring->producer);
> > > > +             entries = q->cached_prod - q->cons_tail;
> > > >       }
> > > >
> > > >       return (entries > dcnt) ? dcnt : entries;
> > > >   }
> > > >
> > > > -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
> > > > +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt)
> > > >   {
> > > > -     u32 free_entries = q->nentries - (producer - q->cons_tail);
> > > > +     u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);
> > > >
> > > >       if (free_entries >= dcnt)
> > > >               return free_entries;
> > > >
> > > >       /* Refresh the local tail pointer */
> > > >       q->cons_tail = READ_ONCE(q->ring->consumer);
> > > > -     return q->nentries - (producer - q->cons_tail);
> > > > +     return q->nentries - (q->cached_prod - q->cons_tail);
> > > >   }
> > > >
> > > >   static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
> > > >   {
> > > > -     u32 entries = q->prod_tail - q->cons_tail;
> > > > +     u32 entries = q->cached_prod - q->cons_tail;
> > > >
> > > >       if (entries >= cnt)
> > > >               return true;
> > > >
> > > >       /* Refresh the local pointer. */
> > > > -     q->prod_tail = READ_ONCE(q->ring->producer);
> > > > -     entries = q->prod_tail - q->cons_tail;
> > > > +     q->cached_prod = READ_ONCE(q->ring->producer);
> > > > +     entries = q->cached_prod - q->cons_tail;
> > > >
> > > >       return entries >= cnt;
> > > >   }
> > > > @@ -220,17 +219,15 @@ static inline void xskq_discard_addr(struct xsk_queue *q)
> > > >   static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
> > > >   {
> > > >       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> > > > -
> > > > -     if (xskq_nb_free(q, q->prod_tail, 1) == 0)
> > > > -             return -ENOSPC;
> > > > +     unsigned int idx = q->ring->producer;
> > > >
> > > >       /* A, matches D */
> > > > -     ring->desc[q->prod_tail++ & q->ring_mask] = addr;
> > > > +     ring->desc[idx++ & q->ring_mask] = addr;
> > > >
> > > >       /* Order producer and data */
> > > >       smp_wmb(); /* B, matches C */
> > > >
> > > > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > > +     WRITE_ONCE(q->ring->producer, idx);
> > > >       return 0;
> > > >   }
> > > >
> > > > @@ -238,11 +235,11 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
> > > >   {
> > > >       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> > > >
> > > > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > > +     if (xskq_nb_free(q, 1) == 0)
> > > >               return -ENOSPC;
> > > >
> > > >       /* A, matches D */
> > > > -     ring->desc[q->prod_head++ & q->ring_mask] = addr;
> > > > +     ring->desc[q->cached_prod++ & q->ring_mask] = addr;
> > > >       return 0;
> > > >   }
> > > >
> > > > @@ -252,17 +249,16 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
> > > >       /* Order producer and data */
> > > >       smp_wmb(); /* B, matches C */
> > > >
> > > > -     q->prod_tail += nb_entries;
> > > > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > > +     WRITE_ONCE(q->ring->producer, q->ring->producer + nb_entries);
> > > >   }
> > > >
> > > >   static inline int xskq_reserve_addr(struct xsk_queue *q)
> > > >   {
> > > > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > > +     if (xskq_nb_free(q, 1) == 0)
> > > >               return -ENOSPC;
> > > >
> > > >       /* A, matches D */
> > > > -     q->prod_head++;
> > > > +     q->cached_prod++;
> > > >       return 0;
> > > >   }
> > > >
> > > > @@ -340,11 +336,11 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
> > > >       struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
> > > >       unsigned int idx;
> > > >
> > > > -     if (xskq_nb_free(q, q->prod_head, 1) == 0)
> > > > +     if (xskq_nb_free(q, 1) == 0)
> > > >               return -ENOSPC;
> > > >
> > > >       /* A, matches D */
> > > > -     idx = (q->prod_head++) & q->ring_mask;
> > > > +     idx = q->cached_prod++ & q->ring_mask;
> > > >       ring->desc[idx].addr = addr;
> > > >       ring->desc[idx].len = len;
> > > >
> > > > @@ -356,8 +352,7 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
> > > >       /* Order producer and data */
> > > >       smp_wmb(); /* B, matches C */
> > > >
> > > > -     q->prod_tail = q->prod_head;
> > > > -     WRITE_ONCE(q->ring->producer, q->prod_tail);
> > > > +     WRITE_ONCE(q->ring->producer, q->cached_prod);
> > > >   }
> > > >
> > > >   static inline bool xskq_full_desc(struct xsk_queue *q)
> > > > @@ -367,7 +362,7 @@ static inline bool xskq_full_desc(struct xsk_queue *q)
> > > >
> > > >   static inline bool xskq_empty_desc(struct xsk_queue *q)
> > > >   {
> > > > -     return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
> > > > +     return xskq_nb_free(q, q->nentries) == q->nentries;
> > >
> > > I don't think this change is correct. The old code checked the number of
> > > free items against prod_tail (== producer). The new code changes it to
> > > prod_head (which is now cached_prod). xskq_nb_free is used in xsk_poll
> > > to set EPOLLIN. After this change EPOLLIN will be set right after
> > > xskq_produce_batch_desc, but it should only be set after
> > > xskq_produce_flush_desc, just as before, otherwise the application will
> > > wake up before the data is available, and it will just waste CPU cycles.
> >
> > That is correct. It will be inefficient during patch 2 and 3 as this
> > gets fixed in patch 4.
>
> Looking at patch 4, I see it still uses cached_prod, not producer.
> However, I see you changed it in the v2, so it should be fine now.
>
> > I chose this as I thought the patch progression
> > and simplification process would be clearer this way. So what to do
> > about it? Some options:
> >
> > * Document this in patch 2 and keep the current order
> >
> > *  Put patch 4 before patch 2 so that the code is always efficient.
> > This is doable, but I have the feeling it will be somewhat less clear
> > from a simplification perspective. The advantage, on the other hand,
> > is that the poll code is always efficient during the whole patch set.
>
> I'm sorry that it takes long for me to answer, I'm on vacation now.
> Anyway, either option looks good to me, as long as xskq_prod_is_empty
> has the correct check (as in v2 patch 2).

Thanks for taking a look at the patch during your vacation Maxim. But
now, please go and enjoy the holidays :-)!

/Magnus

> > /Magnus
> >
> > > >   }
> > > >
> > > >   void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
> > > >
> > >



[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux