Hello, Joel!

Sorry for the late review; a few comments below:

> In the synchronize_rcu() common case, we will have less than
> SR_MAX_USERS_WAKE_FROM_GP number of users per GP. Waking up the kworker
> is pointless just to free the last injected wait head since at that point,
> all the users have already been awakened.
>
> Introduce a new counter to track this and prevent the wakeup in the
> common case.
>
> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
> ---
> Rebased on paul/dev of today.
>
>  kernel/rcu/tree.c | 36 +++++++++++++++++++++++++++++++-----
>  kernel/rcu/tree.h |  1 +
>  2 files changed, 32 insertions(+), 5 deletions(-)
>
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 9fbb5ab57c84..bd29fe3c76bf 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -96,6 +96,7 @@ static struct rcu_state rcu_state = {
>  	.ofl_lock = __ARCH_SPIN_LOCK_UNLOCKED,
>  	.srs_cleanup_work = __WORK_INITIALIZER(rcu_state.srs_cleanup_work,
>  		rcu_sr_normal_gp_cleanup_work),
> +	.srs_cleanups_pending = ATOMIC_INIT(0),
>  };
>
>  /* Dump rcu_node combining tree at boot to verify correct setup. */
> @@ -1642,8 +1643,11 @@ static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
>  	 * the done tail list manipulations are protected here.
>  	 */
>  	done = smp_load_acquire(&rcu_state.srs_done_tail);
> -	if (!done)
> +	if (!done) {
> +		/* See comments below. */
> +		atomic_dec_return_release(&rcu_state.srs_cleanups_pending);
>  		return;
> +	}
>
>  	WARN_ON_ONCE(!rcu_sr_is_wait_head(done));
>  	head = done->next;
> @@ -1666,6 +1670,9 @@ static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
>
>  		rcu_sr_put_wait_head(rcu);
>  	}
> +
> +	/* Order list manipulations with atomic access. */
> +	atomic_dec_return_release(&rcu_state.srs_cleanups_pending);
>  }
>
>  /*
> @@ -1673,7 +1680,7 @@ static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
>   */
>  static void rcu_sr_normal_gp_cleanup(void)
>  {
> -	struct llist_node *wait_tail, *next, *rcu;
> +	struct llist_node *wait_tail, *next = NULL, *rcu = NULL;
>  	int done = 0;
>
>  	wait_tail = rcu_state.srs_wait_tail;
> @@ -1699,16 +1706,35 @@ static void rcu_sr_normal_gp_cleanup(void)
>  		break;
>  	}
>
> -	// concurrent sr_normal_gp_cleanup work might observe this update.
> -	smp_store_release(&rcu_state.srs_done_tail, wait_tail);
> +	/*
> +	 * Fast path, no more users to process. Remove the last wait head
> +	 * if no inflight-workers. If there are in-flight workers, let them
> +	 * remove the last wait head.
> +	 */
> +	WARN_ON_ONCE(!rcu);

This assumption is not correct: "rcu" can in fact be NULL here.

>  	ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_done_tail);
>
> +	if (rcu && rcu_sr_is_wait_head(rcu) && rcu->next == NULL &&
> +	    /* Order atomic access with list manipulation. */
> +	    !atomic_read_acquire(&rcu_state.srs_cleanups_pending)) {
> +		wait_tail->next = NULL;
> +		rcu_sr_put_wait_head(rcu);
> +		smp_store_release(&rcu_state.srs_done_tail, wait_tail);
> +		return;
> +	}
> +
> +	/* Concurrent sr_normal_gp_cleanup work might observe this update. */
> +	smp_store_release(&rcu_state.srs_done_tail, wait_tail);
> +
>  	/*
>  	 * We schedule a work in order to perform a final processing
>  	 * of outstanding users(if still left) and releasing wait-heads
>  	 * added by rcu_sr_normal_gp_init() call.
>  	 */
> -	queue_work(sync_wq, &rcu_state.srs_cleanup_work);
> +	atomic_inc(&rcu_state.srs_cleanups_pending);
> +	if (!queue_work(sync_wq, &rcu_state.srs_cleanup_work)) {
> +		atomic_dec(&rcu_state.srs_cleanups_pending);
> +	}

There is no need for the extra "{}" pair around the single statement.
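I.e., something like:

	atomic_inc(&rcu_state.srs_cleanups_pending);
	if (!queue_work(sync_wq, &rcu_state.srs_cleanup_work))
		atomic_dec(&rcu_state.srs_cleanups_pending);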
>
>  /*
> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> index bae7925c497f..affcb92a358c 100644
> --- a/kernel/rcu/tree.h
> +++ b/kernel/rcu/tree.h
> @@ -420,6 +420,7 @@ struct rcu_state {
>  	struct llist_node *srs_done_tail; /* ready for GP users. */
>  	struct sr_wait_node srs_wait_nodes[SR_NORMAL_GP_WAIT_HEAD_MAX];
>  	struct work_struct srs_cleanup_work;
> +	atomic_t srs_cleanups_pending; /* srs inflight worker cleanups. */
>  };
>
>  /* Values for rcu_state structure's gp_flags field. */
> --
> 2.34.1
>
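One more note on the ordering, for other readers of the thread: the
releasing decrement in the worker pairs with the acquiring read in the
fast path, so that observing srs_cleanups_pending == 0 guarantees the
worker's list manipulations are visible before the last wait head is
freed. A minimal standalone sketch of that pairing in C11 atomics (a
userspace illustration with made-up names, not the kernel primitives):

	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdio.h>

	static atomic_int cleanups_pending;
	static int list_state;	/* stands in for the llist manipulations */

	static void cleanup_worker(void)
	{
		list_state = 1;	/* the worker's "list manipulations" */
		/* Release: the list writes above are ordered before the dec. */
		atomic_fetch_sub_explicit(&cleanups_pending, 1,
					  memory_order_release);
	}

	static bool may_free_last_wait_head(void)
	{
		/* Acquire: observing zero makes the worker's writes visible. */
		return atomic_load_explicit(&cleanups_pending,
					    memory_order_acquire) == 0;
	}

	int main(void)
	{
		atomic_fetch_add_explicit(&cleanups_pending, 1,
					  memory_order_relaxed);
		cleanup_worker();
		printf("may free last wait head: %d\n",
		       may_free_last_wait_head());
		return 0;
	}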