On Thu, 6 May 2021 09:57:42 +0800 Yunsheng Lin wrote:
> @@ -159,8 +160,37 @@ static inline bool qdisc_is_empty(const struct Qdisc *qdisc)
>  static inline bool qdisc_run_begin(struct Qdisc *qdisc)
>  {
>  	if (qdisc->flags & TCQ_F_NOLOCK) {
> +		bool dont_retry = test_bit(__QDISC_STATE_MISSED,
> +					   &qdisc->state);
> +
> +		if (spin_trylock(&qdisc->seqlock))
> +			goto nolock_empty;
> +
> +		/* If the flag is set before doing the spin_trylock() and
> +		 * the above spin_trylock() return false, it means other cpu
> +		 * holding the lock will do dequeuing for us, or it wil see

s/wil/will/

> +		 * the flag set after releasing lock and reschedule the
> +		 * net_tx_action() to do the dequeuing.

I don't understand why MISSED is checked before the trylock. Could you
explain why it can't be tested directly here? (Sketch at the end of
this mail.)

> +		 */
> +		if (dont_retry)
> +			return false;
> +
> +		/* We could do set_bit() before the first spin_trylock(),
> +		 * and avoid doing second spin_trylock() completely, then
> +		 * we could have multi cpus doing the set_bit(). Here use
> +		 * dont_retry to avoid doing the set_bit() and the second
> +		 * spin_trylock(), which has 5% performance improvement than
> +		 * doing the set_bit() before the first spin_trylock().
> +		 */
> +		set_bit(__QDISC_STATE_MISSED, &qdisc->state);
> +
> +		/* Retry again in case other CPU may not see the new flag
> +		 * after it releases the lock at the end of qdisc_run_end().
> +		 */
>  		if (!spin_trylock(&qdisc->seqlock))
>  			return false;
> +
> +nolock_empty:
>  		WRITE_ONCE(qdisc->empty, false);
>  	} else if (qdisc_is_running(qdisc)) {
>  		return false;
> @@ -176,8 +206,13 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
>  static inline void qdisc_run_end(struct Qdisc *qdisc)
>  {
>  	write_seqcount_end(&qdisc->running);
> -	if (qdisc->flags & TCQ_F_NOLOCK)
> +	if (qdisc->flags & TCQ_F_NOLOCK) {
>  		spin_unlock(&qdisc->seqlock);
> +
> +		if (unlikely(test_bit(__QDISC_STATE_MISSED,
> +				      &qdisc->state)))
> +			__netif_schedule(qdisc);
> +	}
>  }
>
>  static inline bool qdisc_may_bulk(const struct Qdisc *qdisc)
> diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
> index 44991ea..9bc73ea 100644
> --- a/net/sched/sch_generic.c
> +++ b/net/sched/sch_generic.c
> @@ -640,8 +640,10 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>  {
>  	struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
>  	struct sk_buff *skb = NULL;
> +	bool need_retry = true;
>  	int band;
>
> +retry:
>  	for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
>  		struct skb_array *q = band2list(priv, band);
>
> @@ -652,6 +654,16 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>  	}
>  	if (likely(skb)) {
>  		qdisc_update_stats_at_dequeue(qdisc, skb);
> +	} else if (need_retry &&
> +		   test_and_clear_bit(__QDISC_STATE_MISSED,
> +				      &qdisc->state)) {

Why test_and_clear_bit() here? AFAICT this is the only place the bit
is cleared, so the test and the clear do not have to be atomic. To my
limited understanding, on x86 test_bit() is never a locked operation,
while test_and_clear_bit() is always locked. So we'd save an atomic
operation in the uncontended case if we tested first and then cleared.

> +		/* do another dequeuing after clearing the flag to
> +		 * avoid calling __netif_schedule().
> +		 */
> +		smp_mb__after_atomic();

test_and_clear_bit() which returned true implies a memory barrier,
AFAIU, so the barrier is not needed with the code as is. It will be
needed if we switch to test_bit() + clear_bit(), but please clarify
what it is pairing with.
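FWIW, what I have in mind with test + clear is roughly the following
(completely untested sketch, just to show the shape; whether the
explicit barrier is actually correct here depends on what it is
supposed to pair with):

	} else if (need_retry &&
		   test_bit(__QDISC_STATE_MISSED, &qdisc->state)) {
		/* This is the only place the bit is cleared, so the
		 * test and the clear don't need to be one atomic op.
		 */
		clear_bit(__QDISC_STATE_MISSED, &qdisc->state);

		/* Order the clear_bit() before the retry dequeue. */
		smp_mb__after_atomic();

		need_retry = false;
		goto retry;
	}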
> +		need_retry = false;
> +
> +		goto retry;
>  	} else {
>  		WRITE_ONCE(qdisc->empty, true);
>  	}
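And to make the "tested directly" question above concrete: what I was
wondering is why qdisc_run_begin() can't read the flag only after the
first trylock fails, something like this (again completely untested,
only meant to illustrate the question):

	if (qdisc->flags & TCQ_F_NOLOCK) {
		if (spin_trylock(&qdisc->seqlock))
			goto nolock_empty;

		/* Lock owner will either dequeue for us, or see the
		 * flag after unlocking and reschedule net_tx_action().
		 */
		if (test_bit(__QDISC_STATE_MISSED, &qdisc->state))
			return false;

		set_bit(__QDISC_STATE_MISSED, &qdisc->state);

		/* Retry in case the owner released the lock before
		 * the new flag became visible to it.
		 */
		if (!spin_trylock(&qdisc->seqlock))
			return false;

nolock_empty:
		WRITE_ONCE(qdisc->empty, false);
	}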