On Mon, 15 Mar 2021 11:10:18 +0800 Yunsheng Lin wrote: > @@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = { > */ > struct pfifo_fast_priv { > struct skb_array q[PFIFO_FAST_BANDS]; > + > + /* protect against data race between enqueue/dequeue and > + * qdisc->empty setting > + */ > + spinlock_t lock; > }; > > static inline struct skb_array *band2list(struct pfifo_fast_priv *priv, > @@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc, > unsigned int pkt_len = qdisc_pkt_len(skb); > int err; > > - err = skb_array_produce(q, skb); > + spin_lock(&priv->lock); > + err = __ptr_ring_produce(&q->ring, skb); > + WRITE_ONCE(qdisc->empty, false); > + spin_unlock(&priv->lock); > > if (unlikely(err)) { > if (qdisc_is_percpu_stats(qdisc)) > @@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc) > struct sk_buff *skb = NULL; > int band; > > + spin_lock(&priv->lock); > for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) { > struct skb_array *q = band2list(priv, band); > > @@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc) > } else { > WRITE_ONCE(qdisc->empty, true); > } > + spin_unlock(&priv->lock); > > return skb; > } I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?