On Thu, Sep 01, 2022 at 10:17:08PM +0000, Joel Fernandes (Google) wrote: > Implement timer-based RCU lazy callback batching. The batch is flushed > whenever a certain amount of time has passed, or the batch on a > particular CPU grows too big. Also memory pressure will flush it in a > future patch. > > To handle several corner cases automagically (such as rcu_barrier() and > hotplug), we re-use bypass lists to handle lazy CBs. The bypass list > length has the lazy CB length included in it. A separate lazy CB length > counter is also introduced to keep track of the number of lazy CBs. > > Suggested-by: Paul McKenney <paulmck@xxxxxxxxxx> > Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx> I got this from TREE01 and TREE04: kernel/rcu/tree_nocb.h:439:46: error: ‘struct rcu_data’ has no member named ‘lazy_len’ So I moved the lazy_len field out from under CONFIG_RCU_LAZY. Thanx, Paul > --- > include/linux/rcupdate.h | 6 ++ > kernel/rcu/Kconfig | 8 ++ > kernel/rcu/rcu.h | 11 +++ > kernel/rcu/rcu_segcblist.c | 2 +- > kernel/rcu/tree.c | 130 +++++++++++++++--------- > kernel/rcu/tree.h | 13 ++- > kernel/rcu/tree_nocb.h | 198 ++++++++++++++++++++++++++++++------- > 7 files changed, 280 insertions(+), 88 deletions(-) > > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h > index 08605ce7379d..82e8a07e0856 100644 > --- a/include/linux/rcupdate.h > +++ b/include/linux/rcupdate.h > @@ -108,6 +108,12 @@ static inline int rcu_preempt_depth(void) > > #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ > > +#ifdef CONFIG_RCU_LAZY > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func); > +#else > +#define call_rcu_lazy(head, func) call_rcu(head, func) > +#endif > + > /* Internal to kernel */ > void rcu_init(void); > extern int rcu_scheduler_active; > diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig > index d471d22a5e21..3128d01427cb 100644 > --- a/kernel/rcu/Kconfig > +++ b/kernel/rcu/Kconfig > @@ -311,4 +311,12 @@ config TASKS_TRACE_RCU_READ_MB > 
Say N here if you hate read-side memory barriers. > Take the default if you are unsure. > > +config RCU_LAZY > + bool "RCU callback lazy invocation functionality" > + depends on RCU_NOCB_CPU > + default n > + help > + To save power, batch RCU callbacks and flush after delay, memory > + pressure or callback list growing too big. > + > endmenu # "RCU Subsystem" > diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h > index be5979da07f5..94675f14efe8 100644 > --- a/kernel/rcu/rcu.h > +++ b/kernel/rcu/rcu.h > @@ -474,6 +474,14 @@ enum rcutorture_type { > INVALID_RCU_FLAVOR > }; > > +#if defined(CONFIG_RCU_LAZY) > +unsigned long rcu_lazy_get_jiffies_till_flush(void); > +void rcu_lazy_set_jiffies_till_flush(unsigned long j); > +#else > +static inline unsigned long rcu_lazy_get_jiffies_till_flush(void) { return 0; } > +static inline void rcu_lazy_set_jiffies_till_flush(unsigned long j) { } > +#endif > + > #if defined(CONFIG_TREE_RCU) > void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags, > unsigned long *gp_seq); > @@ -483,6 +491,8 @@ void do_trace_rcu_torture_read(const char *rcutorturename, > unsigned long c_old, > unsigned long c); > void rcu_gp_set_torture_wait(int duration); > +void rcu_force_call_rcu_to_lazy(bool force); > + > #else > static inline void rcutorture_get_gp_data(enum rcutorture_type test_type, > int *flags, unsigned long *gp_seq) > @@ -501,6 +511,7 @@ void do_trace_rcu_torture_read(const char *rcutorturename, > do { } while (0) > #endif > static inline void rcu_gp_set_torture_wait(int duration) { } > +static inline void rcu_force_call_rcu_to_lazy(bool force) { } > #endif > > #if IS_ENABLED(CONFIG_RCU_TORTURE_TEST) || IS_MODULE(CONFIG_RCU_TORTURE_TEST) > diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c > index c54ea2b6a36b..55b50e592986 100644 > --- a/kernel/rcu/rcu_segcblist.c > +++ b/kernel/rcu/rcu_segcblist.c > @@ -38,7 +38,7 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp) > * element of 
the second rcu_cblist structure, but ensuring that the second > * rcu_cblist structure, if initially non-empty, always appears non-empty > * throughout the process. If rdp is NULL, the second rcu_cblist structure > - * is instead initialized to empty. > + * is instead initialized to empty. Also account for lazy_len for lazy CBs. > */ > void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > struct rcu_cblist *srclp, > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c > index 9fe581be8696..aaced29a0a71 100644 > --- a/kernel/rcu/tree.c > +++ b/kernel/rcu/tree.c > @@ -2728,47 +2728,8 @@ static void check_cb_ovld(struct rcu_data *rdp) > raw_spin_unlock_rcu_node(rnp); > } > > -/** > - * call_rcu() - Queue an RCU callback for invocation after a grace period. > - * @head: structure to be used for queueing the RCU updates. > - * @func: actual callback function to be invoked after the grace period > - * > - * The callback function will be invoked some time after a full grace > - * period elapses, in other words after all pre-existing RCU read-side > - * critical sections have completed. However, the callback function > - * might well execute concurrently with RCU read-side critical sections > - * that started after call_rcu() was invoked. > - * > - * RCU read-side critical sections are delimited by rcu_read_lock() > - * and rcu_read_unlock(), and may be nested. In addition, but only in > - * v5.0 and later, regions of code across which interrupts, preemption, > - * or softirqs have been disabled also serve as RCU read-side critical > - * sections. This includes hardware interrupt handlers, softirq handlers, > - * and NMI handlers. > - * > - * Note that all CPUs must agree that the grace period extended beyond > - * all pre-existing RCU read-side critical section. 
On systems with more > - * than one CPU, this means that when "func()" is invoked, each CPU is > - * guaranteed to have executed a full memory barrier since the end of its > - * last RCU read-side critical section whose beginning preceded the call > - * to call_rcu(). It also means that each CPU executing an RCU read-side > - * critical section that continues beyond the start of "func()" must have > - * executed a memory barrier after the call_rcu() but before the beginning > - * of that RCU read-side critical section. Note that these guarantees > - * include CPUs that are offline, idle, or executing in user mode, as > - * well as CPUs that are executing in the kernel. > - * > - * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the > - * resulting RCU callback function "func()", then both CPU A and CPU B are > - * guaranteed to execute a full memory barrier during the time interval > - * between the call to call_rcu() and the invocation of "func()" -- even > - * if CPU A and CPU B are the same CPU (but again only if the system has > - * more than one CPU). > - * > - * Implementation of these memory-ordering guarantees is described here: > - * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst. > - */ > -void call_rcu(struct rcu_head *head, rcu_callback_t func) > +static void > +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy) > { > static atomic_t doublefrees; > unsigned long flags; > @@ -2818,7 +2779,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > trace_rcu_callback(rcu_state.name, head, > rcu_segcblist_n_cbs(&rdp->cblist)); > > - if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags)) > + if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy)) > return; // Enqueued onto ->nocb_bypass, so just leave. > // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock. 
> rcu_segcblist_enqueue(&rdp->cblist, head); > @@ -2833,8 +2794,86 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > local_irq_restore(flags); > } > } > -EXPORT_SYMBOL_GPL(call_rcu); > > +#ifdef CONFIG_RCU_LAZY > +/** > + * call_rcu_lazy() - Lazily queue RCU callback for invocation after grace period. > + * @head: structure to be used for queueing the RCU updates. > + * @func: actual callback function to be invoked after the grace period > + * > + * The callback function will be invoked some time after a full grace > + * period elapses, in other words after all pre-existing RCU read-side > + * critical sections have completed. > + * > + * Use this API instead of call_rcu() if you don't mind the callback being > + * invoked after very long periods of time on systems without memory pressure > + * and on systems which are lightly loaded or mostly idle. > + * > + * Other than the extra delay in callbacks being invoked, this function is > + * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more > + * details about memory ordering and other functionality. > + */ > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func) > +{ > + return __call_rcu_common(head, func, true); > +} > +EXPORT_SYMBOL_GPL(call_rcu_lazy); > +#endif > + > +static bool force_call_rcu_to_lazy; > + > +void rcu_force_call_rcu_to_lazy(bool force) > +{ > + if (IS_ENABLED(CONFIG_RCU_SCALE_TEST)) > + WRITE_ONCE(force_call_rcu_to_lazy, force); > +} > +EXPORT_SYMBOL_GPL(rcu_force_call_rcu_to_lazy); > + > +/** > + * call_rcu() - Queue an RCU callback for invocation after a grace period. > + * @head: structure to be used for queueing the RCU updates. > + * @func: actual callback function to be invoked after the grace period > + * > + * The callback function will be invoked some time after a full grace > + * period elapses, in other words after all pre-existing RCU read-side > + * critical sections have completed. 
However, the callback function > + * might well execute concurrently with RCU read-side critical sections > + * that started after call_rcu() was invoked. > + * > + * RCU read-side critical sections are delimited by rcu_read_lock() > + * and rcu_read_unlock(), and may be nested. In addition, but only in > + * v5.0 and later, regions of code across which interrupts, preemption, > + * or softirqs have been disabled also serve as RCU read-side critical > + * sections. This includes hardware interrupt handlers, softirq handlers, > + * and NMI handlers. > + * > + * Note that all CPUs must agree that the grace period extended beyond > + * all pre-existing RCU read-side critical section. On systems with more > + * than one CPU, this means that when "func()" is invoked, each CPU is > + * guaranteed to have executed a full memory barrier since the end of its > + * last RCU read-side critical section whose beginning preceded the call > + * to call_rcu(). It also means that each CPU executing an RCU read-side > + * critical section that continues beyond the start of "func()" must have > + * executed a memory barrier after the call_rcu() but before the beginning > + * of that RCU read-side critical section. Note that these guarantees > + * include CPUs that are offline, idle, or executing in user mode, as > + * well as CPUs that are executing in the kernel. > + * > + * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the > + * resulting RCU callback function "func()", then both CPU A and CPU B are > + * guaranteed to execute a full memory barrier during the time interval > + * between the call to call_rcu() and the invocation of "func()" -- even > + * if CPU A and CPU B are the same CPU (but again only if the system has > + * more than one CPU). > + * > + * Implementation of these memory-ordering guarantees is described here: > + * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst. 
> + */ > +void call_rcu(struct rcu_head *head, rcu_callback_t func) > +{ > + return __call_rcu_common(head, func, force_call_rcu_to_lazy); > +} > +EXPORT_SYMBOL_GPL(call_rcu); > > /* Maximum number of jiffies to wait before draining a batch. */ > #define KFREE_DRAIN_JIFFIES (5 * HZ) > @@ -3904,7 +3943,8 @@ static void rcu_barrier_entrain(struct rcu_data *rdp) > rdp->barrier_head.func = rcu_barrier_callback; > debug_rcu_head_queue(&rdp->barrier_head); > rcu_nocb_lock(rdp); > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false, > + /* wake gp thread */ true)); > if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) { > atomic_inc(&rcu_state.barrier_cpu_count); > } else { > @@ -4325,7 +4365,7 @@ void rcutree_migrate_callbacks(int cpu) > my_rdp = this_cpu_ptr(&rcu_data); > my_rnp = my_rdp->mynode; > rcu_nocb_lock(my_rdp); /* irqs already disabled. */ > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies)); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false, false)); > raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ > /* Leverage recent GPs and set GP for new callbacks. */ > needwake = rcu_advance_cbs(my_rnp, rdp) || > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h > index d4a97e40ea9c..946d819b23fc 100644 > --- a/kernel/rcu/tree.h > +++ b/kernel/rcu/tree.h > @@ -263,14 +263,18 @@ struct rcu_data { > unsigned long last_fqs_resched; /* Time of last rcu_resched(). */ > unsigned long last_sched_clock; /* Jiffies of last rcu_sched_clock_irq(). */ > > +#ifdef CONFIG_RCU_LAZY > + long lazy_len; /* Length of buffered lazy callbacks. */ > +#endif > int cpu; > }; > > /* Values for nocb_defer_wakeup field in struct rcu_data. 
*/ > #define RCU_NOCB_WAKE_NOT 0 > #define RCU_NOCB_WAKE_BYPASS 1 > -#define RCU_NOCB_WAKE 2 > -#define RCU_NOCB_WAKE_FORCE 3 > +#define RCU_NOCB_WAKE_LAZY 2 > +#define RCU_NOCB_WAKE 3 > +#define RCU_NOCB_WAKE_FORCE 4 > > #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500)) > /* For jiffies_till_first_fqs and */ > @@ -440,9 +444,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); > static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); > static void rcu_init_one_nocb(struct rcu_node *rnp); > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j); > + unsigned long j, bool lazy, bool wakegp); > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - bool *was_alldone, unsigned long flags); > + bool *was_alldone, unsigned long flags, > + bool lazy); > static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, > unsigned long flags); > static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level); > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h > index 31068dd31315..7e97a7b6e046 100644 > --- a/kernel/rcu/tree_nocb.h > +++ b/kernel/rcu/tree_nocb.h > @@ -256,6 +256,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force) > return __wake_nocb_gp(rdp_gp, rdp, force, flags); > } > > +/* > + * LAZY_FLUSH_JIFFIES decides the maximum amount of time that > + * can elapse before lazy callbacks are flushed. Lazy callbacks > + * could be flushed much earlier for a number of other reasons > + * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are > + * left unsubmitted to RCU after those many jiffies. > + */ > +#define LAZY_FLUSH_JIFFIES (10 * HZ) > +unsigned long jiffies_till_flush = LAZY_FLUSH_JIFFIES; > + > +#ifdef CONFIG_RCU_LAZY > +// To be called only from test code. 
> +void rcu_lazy_set_jiffies_till_flush(unsigned long jif) > +{ > + jiffies_till_flush = jif; > +} > +EXPORT_SYMBOL(rcu_lazy_set_jiffies_till_flush); > + > +unsigned long rcu_lazy_get_jiffies_till_flush(void) > +{ > + return jiffies_till_flush; > +} > +EXPORT_SYMBOL(rcu_lazy_get_jiffies_till_flush); > +#endif > + > /* > * Arrange to wake the GP kthread for this NOCB group at some future > * time when it is safe to do so. > @@ -265,23 +290,39 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > { > unsigned long flags; > struct rcu_data *rdp_gp = rdp->nocb_gp_rdp; > + unsigned long mod_jif = 0; > > raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags); > > /* > - * Bypass wakeup overrides previous deferments. In case > - * of callback storm, no need to wake up too early. > + * Bypass and lazy wakeup overrides previous deferments. In case of > + * callback storm, no need to wake up too early. > */ > - if (waketype == RCU_NOCB_WAKE_BYPASS) { > - mod_timer(&rdp_gp->nocb_timer, jiffies + 2); > - WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); > - } else { > + switch (waketype) { > + case RCU_NOCB_WAKE_LAZY: > + if (rdp->nocb_defer_wakeup != RCU_NOCB_WAKE_LAZY) > + mod_jif = jiffies_till_flush; > + break; > + > + case RCU_NOCB_WAKE_BYPASS: > + mod_jif = 2; > + break; > + > + case RCU_NOCB_WAKE: > + case RCU_NOCB_WAKE_FORCE: > + // For these, make it wake up the soonest if we > + // were in a bypass or lazy sleep before. 
> if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE) > - mod_timer(&rdp_gp->nocb_timer, jiffies + 1); > - if (rdp_gp->nocb_defer_wakeup < waketype) > - WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); > + mod_jif = 1; > + break; > } > > + if (mod_jif) > + mod_timer(&rdp_gp->nocb_timer, jiffies + mod_jif); > + > + if (rdp_gp->nocb_defer_wakeup < waketype) > + WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); > + > raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags); > > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); > @@ -293,10 +334,13 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > * proves to be initially empty, just return false because the no-CB GP > * kthread may need to be awakened in this case. > * > + * Return true if there was something to be flushed and it succeeded, otherwise > + * false. > + * > * Note that this function always returns true if rhp is NULL. > */ > static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j) > + unsigned long j, bool lazy) > { > struct rcu_cblist rcl; > > @@ -310,7 +354,18 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ > if (rhp) > rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ > - rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); > + > + /* > + * If the new CB requested was a lazy one, queue it onto the main > + * ->cblist so we can take advantage of a sooner grace period. 
> + */ > + if (lazy && rhp) { > + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, NULL); > + rcu_cblist_enqueue(&rcl, rhp); > + } else { > + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); > + } > + > rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); > WRITE_ONCE(rdp->nocb_bypass_first, j); > rcu_nocb_bypass_unlock(rdp); > @@ -326,13 +381,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > * Note that this function always returns true if rhp is NULL. > */ > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j) > + unsigned long j, bool lazy, bool wake_gp) > { > + bool ret; > + > if (!rcu_rdp_is_offloaded(rdp)) > return true; > rcu_lockdep_assert_cblist_protected(rdp); > rcu_nocb_bypass_lock(rdp); > - return rcu_nocb_do_flush_bypass(rdp, rhp, j); > + ret = rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy); > + > + if (wake_gp) > + wake_nocb_gp(rdp, true); > + > + return ret; > } > > /* > @@ -345,7 +407,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > if (!rcu_rdp_is_offloaded(rdp) || > !rcu_nocb_bypass_trylock(rdp)) > return; > - WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j)); > + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false)); > } > > /* > @@ -367,12 +429,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > * there is only one CPU in operation. 
> */ > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - bool *was_alldone, unsigned long flags) > + bool *was_alldone, unsigned long flags, > + bool lazy) > { > unsigned long c; > unsigned long cur_gp_seq; > unsigned long j = jiffies; > long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len)); > > lockdep_assert_irqs_disabled(); > > @@ -417,23 +481,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > // If there hasn't yet been all that many ->cblist enqueues > // this jiffy, tell the caller to enqueue onto ->cblist. But flush > // ->nocb_bypass first. > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > + // Lazy CBs throttle this back and do immediate bypass queuing. > + if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) { > rcu_nocb_lock(rdp); > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > if (*was_alldone) > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > TPS("FirstQ")); > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > + > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false, false)); > + > WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); > return false; // Caller must enqueue the callback. > } > > // If ->nocb_bypass has been used too long or is too full, > // flush ->nocb_bypass to ->cblist. 
> - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || > + if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) || > + (ncbs && bypass_is_lazy && > + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush))) || > ncbs >= qhimark) { > rcu_nocb_lock(rdp); > - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { > + > + if (!rcu_nocb_flush_bypass(rdp, rhp, j, lazy, false)) { > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > if (*was_alldone) > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > @@ -460,16 +530,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > // We need to use the bypass. > rcu_nocb_wait_contended(rdp); > rcu_nocb_bypass_lock(rdp); > + > ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ > rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); > + > + if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy) > + WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1); > + > if (!ncbs) { > WRITE_ONCE(rdp->nocb_bypass_first, j); > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ")); > } > + > rcu_nocb_bypass_unlock(rdp); > smp_mb(); /* Order enqueue before wake. */ > - if (ncbs) { > + > + // We had CBs in the bypass list before. There is nothing else to do if: > + // There were only non-lazy CBs before, in this case, the bypass timer > + // or GP-thread will handle the CBs including any new lazy ones. > + // Or, the new CB is lazy and the old bypass-CBs were also lazy. In this > + // case the old lazy timer would have been setup. When that expires, > + // the new lazy one will be handled. > + if (ncbs && (!bypass_is_lazy || lazy)) { > local_irq_restore(flags); > } else { > // No-CBs GP kthread might be indefinitely asleep, if so, wake. 
> @@ -478,6 +561,10 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > TPS("FirstBQwake")); > __call_rcu_nocb_wake(rdp, true, flags); > + } else if (bypass_is_lazy && !lazy) { > + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > + TPS("FirstBQwakeLazy2Non")); > + __call_rcu_nocb_wake(rdp, true, flags); > } else { > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > TPS("FirstBQnoWake")); > @@ -499,7 +586,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, > { > unsigned long cur_gp_seq; > unsigned long j; > - long len; > + long len, lazy_len, bypass_len; > struct task_struct *t; > > // If we are being polled or there is no kthread, just leave. > @@ -512,9 +599,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, > } > // Need to actually to a wakeup. > len = rcu_segcblist_n_cbs(&rdp->cblist); > + bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + lazy_len = READ_ONCE(rdp->lazy_len); > if (was_alldone) { > rdp->qlen_last_fqs_check = len; > - if (!irqs_disabled_flags(flags)) { > + // Only lazy CBs in bypass list > + if (lazy_len && bypass_len == lazy_len) { > + rcu_nocb_unlock_irqrestore(rdp, flags); > + wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY, > + TPS("WakeLazy")); > + } else if (!irqs_disabled_flags(flags)) { > /* ... if queue was empty ... */ > rcu_nocb_unlock_irqrestore(rdp, flags); > wake_nocb_gp(rdp, false); > @@ -604,8 +698,8 @@ static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu) > */ > static void nocb_gp_wait(struct rcu_data *my_rdp) > { > - bool bypass = false; > - long bypass_ncbs; > + bool bypass = false, lazy = false; > + long bypass_ncbs, lazy_ncbs; > int __maybe_unused cpu = my_rdp->cpu; > unsigned long cur_gp_seq; > unsigned long flags; > @@ -640,24 +734,41 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) > * won't be ignored for long. 
> */ > list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) { > + bool flush_bypass = false; > + > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check")); > rcu_nocb_lock_irqsave(rdp, flags); > lockdep_assert_held(&rdp->nocb_lock); > bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > - if (bypass_ncbs && > + lazy_ncbs = READ_ONCE(rdp->lazy_len); > + > + if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) && > + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush) || > + bypass_ncbs > 2 * qhimark)) { > + flush_bypass = true; > + } else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) && > (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) || > bypass_ncbs > 2 * qhimark)) { > - // Bypass full or old, so flush it. > - (void)rcu_nocb_try_flush_bypass(rdp, j); > - bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + flush_bypass = true; > } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) { > rcu_nocb_unlock_irqrestore(rdp, flags); > continue; /* No callbacks here, try next. */ > } > + > + if (flush_bypass) { > + // Bypass full or old, so flush it. > + (void)rcu_nocb_try_flush_bypass(rdp, j); > + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + lazy_ncbs = READ_ONCE(rdp->lazy_len); > + } > + > if (bypass_ncbs) { > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > - TPS("Bypass")); > - bypass = true; > + bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass")); > + if (bypass_ncbs == lazy_ncbs) > + lazy = true; > + else > + bypass = true; > } > rnp = rdp->mynode; > > @@ -705,12 +816,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) > my_rdp->nocb_gp_gp = needwait_gp; > my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0; > > - if (bypass && !rcu_nocb_poll) { > - // At least one child with non-empty ->nocb_bypass, so set > - // timer in order to avoid stranding its callbacks. 
> - wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, > - TPS("WakeBypassIsDeferred")); > + // At least one child with non-empty ->nocb_bypass, so set > + // timer in order to avoid stranding its callbacks. > + if (!rcu_nocb_poll) { > + // If bypass list only has lazy CBs, add a deferred > + // lazy wake up. > + if (lazy && !bypass) { > + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY, > + TPS("WakeLazyIsDeferred")); > + // Otherwise add a deferred bypass wake up. > + } else if (bypass) { > + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, > + TPS("WakeBypassIsDeferred")); > + } > } > + > if (rcu_nocb_poll) { > /* Polling, so trace if first poll in the series. */ > if (gotcbs) > @@ -1036,7 +1156,7 @@ static long rcu_nocb_rdp_deoffload(void *arg) > * return false, which means that future calls to rcu_nocb_try_bypass() > * will refuse to put anything into the bypass. > */ > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false, false)); > /* > * Start with invoking rcu_core() early. This way if the current thread > * happens to preempt an ongoing call to rcu_core() in the middle, > @@ -1290,6 +1410,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) > raw_spin_lock_init(&rdp->nocb_gp_lock); > timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0); > rcu_cblist_init(&rdp->nocb_bypass); > + WRITE_ONCE(rdp->lazy_len, 0); > mutex_init(&rdp->nocb_gp_kthread_mutex); > } > > @@ -1571,13 +1692,14 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) > } > > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j) > + unsigned long j, bool lazy, bool wakegp) > { > return true; > } > > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - bool *was_alldone, unsigned long flags) > + bool *was_alldone, unsigned long flags, > + bool lazy) > { > return false; > } > -- > 2.37.2.789.g6183377224-goog >