When going through the lazy-rcu work, I noticed that rcu_barrier_entrain() does not really wake up the rcuog GP thread in any path after entraining. This means it is possible the GP thread is not awakened soon (say there were no CBs in the cblist after entraining time). Further, nothing appears to be calling the rcu_barrier callback directly in the case the ->cblist was empty, which means that if the IPI gets delayed long enough for the ->cblist to become empty and it turns out to be the last CPU holding, then nothing completes rcu_state.barrier_completion. Fix both these issues. A note on the wakeup: there are 3 cases AFAICS after the call to rcu_nocb_flush_bypass(): 1. The rdp->cblist has pending CBs. 2. The rdp->cblist has all done CBs. 3. The rdp->cblist has no CBs at all (say the IPI took a long time to arrive and some other path dequeued them in the meanwhile). For #3, entraining a CB is not needed and we should bail. For #1, the wakeup is not needed. But for #2 it is needed. Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx> --- I only build tested this and wanted to post it in advance for discussions. I will test it more soon. Thanks. kernel/rcu/tree.c | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index 18f07e167d5e..65d439286757 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -3904,10 +3904,11 @@ static void rcu_barrier_callback(struct rcu_head *rhp) /* * If needed, entrain an rcu_barrier() callback on rdp->cblist. 
*/ -static void rcu_barrier_entrain(struct rcu_data *rdp) +static void rcu_barrier_entrain(struct rcu_data *rdp, unsigned long flags) { unsigned long gseq = READ_ONCE(rcu_state.barrier_sequence); unsigned long lseq = READ_ONCE(rdp->barrier_seq_snap); + bool was_alldone; lockdep_assert_held(&rcu_state.barrier_lock); if (rcu_seq_state(lseq) || !rcu_seq_state(gseq) || rcu_seq_ctr(lseq) != rcu_seq_ctr(gseq)) @@ -3916,14 +3917,20 @@ static void rcu_barrier_entrain(struct rcu_data *rdp) rdp->barrier_head.func = rcu_barrier_callback; debug_rcu_head_queue(&rdp->barrier_head); rcu_nocb_lock(rdp); + was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); + if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) { atomic_inc(&rcu_state.barrier_cpu_count); + __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */ } else { + /* rdp->cblist is empty so directly call the callback. */ + atomic_inc(&rcu_state.barrier_cpu_count); + rcu_barrier_callback(&rdp->barrier_head); debug_rcu_head_unqueue(&rdp->barrier_head); rcu_barrier_trace(TPS("IRQNQ"), -1, rcu_state.barrier_sequence); + rcu_nocb_unlock(rdp); } - rcu_nocb_unlock(rdp); smp_store_release(&rdp->barrier_seq_snap, gseq); } @@ -3932,15 +3939,16 @@ static void rcu_barrier_entrain(struct rcu_data *rdp) */ static void rcu_barrier_handler(void *cpu_in) { + unsigned long flags; uintptr_t cpu = (uintptr_t)cpu_in; struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); lockdep_assert_irqs_disabled(); WARN_ON_ONCE(cpu != rdp->cpu); WARN_ON_ONCE(cpu != smp_processor_id()); - raw_spin_lock(&rcu_state.barrier_lock); - rcu_barrier_entrain(rdp); - raw_spin_unlock(&rcu_state.barrier_lock); + raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags); + rcu_barrier_entrain(rdp, flags); + raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags); } /** @@ -4007,7 +4015,7 @@ void rcu_barrier(void) continue; } if (!rcu_rdp_cpu_online(rdp)) { - rcu_barrier_entrain(rdp); + 
rcu_barrier_entrain(rdp, flags); WARN_ON_ONCE(READ_ONCE(rdp->barrier_seq_snap) != gseq); raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags); rcu_barrier_trace(TPS("OfflineNoCBQ"), cpu, rcu_state.barrier_sequence); @@ -4333,7 +4341,7 @@ void rcutree_migrate_callbacks(int cpu) raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags); WARN_ON_ONCE(rcu_rdp_cpu_online(rdp)); - rcu_barrier_entrain(rdp); + rcu_barrier_entrain(rdp, flags); my_rdp = this_cpu_ptr(&rcu_data); my_rnp = my_rdp->mynode; rcu_nocb_lock(my_rdp); /* irqs already disabled. */ -- 2.37.3.968.ga6b4b080e4-goog