> On Sep 24, 2022, at 5:11 PM, Paul E. McKenney <paulmck@xxxxxxxxxx> wrote: > > On Sat, Sep 24, 2022 at 12:20:00PM -0400, Joel Fernandes wrote: >> Hi Paul, >> Let’s see whether my iPhone can handle replies ;-) > > Now let's see if it can handle replies to replies! Are you using a > keyboard, or small-screen finger gestures? Haha here it comes. It’s the usual onscreen keyboard, which has finger gesture support and is quite fast to type with ;-). It’s also connected to my Apple Watch so I can reply from there, but I’ll take it one step at a time ;-) >> More below: >> >>>> On Sep 23, 2022, at 5:44 PM, Paul E. McKenney <paulmck@xxxxxxxxxx> wrote: >>> >>> On Thu, Sep 22, 2022 at 10:01:01PM +0000, Joel Fernandes (Google) wrote: >>>> Implement timer-based RCU lazy callback batching. The batch is flushed >>>> whenever a certain amount of time has passed, or the batch on a >>>> particular CPU grows too big. Also memory pressure will flush it in a >>>> future patch. >>>> >>>> To handle several corner cases automagically (such as rcu_barrier() and >>>> hotplug), we re-use bypass lists to handle lazy CBs. The bypass list >>>> length has the lazy CB length included in it. A separate lazy CB length >>>> counter is also introduced to keep track of the number of lazy CBs. >>>> >>>> v5->v6: >>>> >>>> [ Frederic Weisbec: Program the lazy timer only if WAKE_NOT, since other >>>> deferral levels wake much earlier so for those it is not needed. ] >>>> >>>> [ Frederic Weisbec: Use flush flags to keep bypass API code clean. ] >>>> >>>> [ Frederic Weisbec: Make rcu_barrier() wake up only if main list empty. ] >>>> >>>> [ Frederic Weisbec: Remove extra 'else if' branch in rcu_nocb_try_bypass(). ] >>>> >>>> [ Joel: Fix issue where I was not resetting lazy_len after moving it to rdp ] >>>> >>>> [ Paul/Thomas/Joel: Make call_rcu() default lazy so users don't mess up. ] >>>> >>>> Suggested-by: Paul McKenney <paulmck@xxxxxxxxxx> >>>> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx> >>> >>> I am going to put these on a local branch for testing, but a few comments >>> and questions interspersed below. >>> >>> Thanx, Paul >> >> Ok I replied below, thank you! >> >>>> include/linux/rcupdate.h | 7 ++ >>>> kernel/rcu/Kconfig | 8 ++ >>>> kernel/rcu/rcu.h | 8 ++ >>>> kernel/rcu/tiny.c | 2 +- >>>> kernel/rcu/tree.c | 133 ++++++++++++++++++---------- >>>> kernel/rcu/tree.h | 17 +++- >>>> kernel/rcu/tree_exp.h | 2 +- >>>> kernel/rcu/tree_nocb.h | 184 ++++++++++++++++++++++++++++++++------- >>>> 8 files changed, 277 insertions(+), 84 deletions(-) >>>> >>>> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h >>>> index 08605ce7379d..40ae36904825 100644 >>>> --- a/include/linux/rcupdate.h >>>> +++ b/include/linux/rcupdate.h >>>> @@ -108,6 +108,13 @@ static inline int rcu_preempt_depth(void) >>>> >>>> #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ >>>> >>>> +#ifdef CONFIG_RCU_LAZY >>>> +void call_rcu_flush(struct rcu_head *head, rcu_callback_t func); >>>> +#else >>>> +static inline void call_rcu_flush(struct rcu_head *head, >>>> + rcu_callback_t func) { call_rcu(head, func); } >>>> +#endif >>>> + >>>> /* Internal to kernel */ >>>> void rcu_init(void); >>>> extern int rcu_scheduler_active; >>>> diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig >>>> index f53ad63b2bc6..edd632e68497 100644 >>>> --- a/kernel/rcu/Kconfig >>>> +++ b/kernel/rcu/Kconfig >>>> @@ -314,4 +314,12 @@ config TASKS_TRACE_RCU_READ_MB >>>> Say N here if you hate read-side memory barriers. >>>> Take the default if you are unsure. 
>>>> >>>> +config RCU_LAZY >>>> + bool "RCU callback lazy invocation functionality" >>>> + depends on RCU_NOCB_CPU >>>> + default n >>>> + help >>>> + To save power, batch RCU callbacks and flush after delay, memory >>>> + pressure or callback list growing too big. >>>> + >>>> endmenu # "RCU Subsystem" >>>> diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h >>>> index be5979da07f5..65704cbc9df7 100644 >>>> --- a/kernel/rcu/rcu.h >>>> +++ b/kernel/rcu/rcu.h >>>> @@ -474,6 +474,14 @@ enum rcutorture_type { >>>> INVALID_RCU_FLAVOR >>>> }; >>>> >>>> +#if defined(CONFIG_RCU_LAZY) >>>> +unsigned long rcu_lazy_get_jiffies_till_flush(void); >>>> +void rcu_lazy_set_jiffies_till_flush(unsigned long j); >>>> +#else >>>> +static inline unsigned long rcu_lazy_get_jiffies_till_flush(void) { return 0; } >>>> +static inline void rcu_lazy_set_jiffies_till_flush(unsigned long j) { } >>>> +#endif >>>> + >>>> #if defined(CONFIG_TREE_RCU) >>>> void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags, >>>> unsigned long *gp_seq); >>>> diff --git a/kernel/rcu/tiny.c b/kernel/rcu/tiny.c >>>> index a33a8d4942c3..810479cf17ba 100644 >>>> --- a/kernel/rcu/tiny.c >>>> +++ b/kernel/rcu/tiny.c >>>> @@ -44,7 +44,7 @@ static struct rcu_ctrlblk rcu_ctrlblk = { >>>> >>>> void rcu_barrier(void) >>>> { >>>> - wait_rcu_gp(call_rcu); >>>> + wait_rcu_gp(call_rcu_flush); >>>> } >>>> EXPORT_SYMBOL(rcu_barrier); >>>> >>>> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c >>>> index 5ec97e3f7468..736d0d724207 100644 >>>> --- a/kernel/rcu/tree.c >>>> +++ b/kernel/rcu/tree.c >>>> @@ -2728,47 +2728,8 @@ static void check_cb_ovld(struct rcu_data *rdp) >>>> raw_spin_unlock_rcu_node(rnp); >>>> } >>>> >>>> -/** >>>> - * call_rcu() - Queue an RCU callback for invocation after a grace period. >>>> - * @head: structure to be used for queueing the RCU updates. >>>> - * @func: actual callback function to be invoked after the grace period >>>> - * >>>> - * The callback function will be invoked some time after a full grace >>>> - * period elapses, in other words after all pre-existing RCU read-side >>>> - * critical sections have completed. However, the callback function >>>> - * might well execute concurrently with RCU read-side critical sections >>>> - * that started after call_rcu() was invoked. >>>> - * >>>> - * RCU read-side critical sections are delimited by rcu_read_lock() >>>> - * and rcu_read_unlock(), and may be nested. In addition, but only in >>>> - * v5.0 and later, regions of code across which interrupts, preemption, >>>> - * or softirqs have been disabled also serve as RCU read-side critical >>>> - * sections. This includes hardware interrupt handlers, softirq handlers, >>>> - * and NMI handlers. >>>> - * >>>> - * Note that all CPUs must agree that the grace period extended beyond >>>> - * all pre-existing RCU read-side critical section. On systems with more >>>> - * than one CPU, this means that when "func()" is invoked, each CPU is >>>> - * guaranteed to have executed a full memory barrier since the end of its >>>> - * last RCU read-side critical section whose beginning preceded the call >>>> - * to call_rcu(). It also means that each CPU executing an RCU read-side >>>> - * critical section that continues beyond the start of "func()" must have >>>> - * executed a memory barrier after the call_rcu() but before the beginning >>>> - * of that RCU read-side critical section. 
Note that these guarantees >>>> - * include CPUs that are offline, idle, or executing in user mode, as >>>> - * well as CPUs that are executing in the kernel. >>>> - * >>>> - * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the >>>> - * resulting RCU callback function "func()", then both CPU A and CPU B are >>>> - * guaranteed to execute a full memory barrier during the time interval >>>> - * between the call to call_rcu() and the invocation of "func()" -- even >>>> - * if CPU A and CPU B are the same CPU (but again only if the system has >>>> - * more than one CPU). >>>> - * >>>> - * Implementation of these memory-ordering guarantees is described here: >>>> - * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst. >>>> - */ >>>> -void call_rcu(struct rcu_head *head, rcu_callback_t func) >>>> +static void >>>> +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy) >>>> { >>>> static atomic_t doublefrees; >>>> unsigned long flags; >>>> @@ -2809,7 +2770,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) >>>> } >>>> >>>> check_cb_ovld(rdp); >>>> - if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags)) >>>> + if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy)) >>>> return; // Enqueued onto ->nocb_bypass, so just leave. >>>> // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock. >>>> rcu_segcblist_enqueue(&rdp->cblist, head); >>>> @@ -2831,8 +2792,84 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) >>>> local_irq_restore(flags); >>>> } >>>> } >>>> -EXPORT_SYMBOL_GPL(call_rcu); >>>> >>>> +#ifdef CONFIG_RCU_LAZY >>>> +/** >>>> + * call_rcu_flush() - Queue RCU callback for invocation after grace period, and >>>> + * flush all lazy callbacks (including the new one) to the main ->cblist while >>>> + * doing so. >>>> + * >>>> + * @head: structure to be used for queueing the RCU updates. >>>> + * @func: actual callback function to be invoked after the grace period >>>> + * >>>> + * The callback function will be invoked some time after a full grace >>>> + * period elapses, in other words after all pre-existing RCU read-side >>>> + * critical sections have completed. >>>> + * >>>> + * Use this API instead of call_rcu() if you don't mind the callback being >>>> + * invoked after very long periods of time on systems without memory pressure >>>> + * and on systems which are lightly loaded or mostly idle. >>> >>> This comment is backwards, right? Shouldn't it say something like "Use >>> this API instead of call_rcu() if you don't mind burning extra power..."? >> >> Yes sorry my mistake :-(. It’s a stale comment from the rework and I’ll update it. > > Very good, thank you! Anything for you! > >>>> + * >>>> + * Other than the extra delay in callbacks being invoked, this function is >>>> + * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more >>>> + * details about memory ordering and other functionality. >>>> + */ >>>> +void call_rcu_flush(struct rcu_head *head, rcu_callback_t func) >>>> +{ >>>> + return __call_rcu_common(head, func, false); >>>> +} >>>> +EXPORT_SYMBOL_GPL(call_rcu_flush); >>>> +#endif >>>> + >>>> +/** >>>> + * call_rcu() - Queue an RCU callback for invocation after a grace period. >>>> + * By default the callbacks are 'lazy' and are kept hidden from the main >>>> + * ->cblist to prevent starting of grace periods too soon. >>>> + * If you desire grace periods to start very soon, use call_rcu_flush(). 
>>>> + * >>>> + * @head: structure to be used for queueing the RCU updates. >>>> + * @func: actual callback function to be invoked after the grace period >>>> + * >>>> + * The callback function will be invoked some time after a full grace >>>> + * period elapses, in other words after all pre-existing RCU read-side >>>> + * critical sections have completed. However, the callback function >>>> + * might well execute concurrently with RCU read-side critical sections >>>> + * that started after call_rcu() was invoked. >>>> + * >>>> + * RCU read-side critical sections are delimited by rcu_read_lock() >>>> + * and rcu_read_unlock(), and may be nested. In addition, but only in >>>> + * v5.0 and later, regions of code across which interrupts, preemption, >>>> + * or softirqs have been disabled also serve as RCU read-side critical >>>> + * sections. This includes hardware interrupt handlers, softirq handlers, >>>> + * and NMI handlers. >>>> + * >>>> + * Note that all CPUs must agree that the grace period extended beyond >>>> + * all pre-existing RCU read-side critical section. On systems with more >>>> + * than one CPU, this means that when "func()" is invoked, each CPU is >>>> + * guaranteed to have executed a full memory barrier since the end of its >>>> + * last RCU read-side critical section whose beginning preceded the call >>>> + * to call_rcu(). It also means that each CPU executing an RCU read-side >>>> + * critical section that continues beyond the start of "func()" must have >>>> + * executed a memory barrier after the call_rcu() but before the beginning >>>> + * of that RCU read-side critical section. Note that these guarantees >>>> + * include CPUs that are offline, idle, or executing in user mode, as >>>> + * well as CPUs that are executing in the kernel. >>>> + * >>>> + * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the >>>> + * resulting RCU callback function "func()", then both CPU A and CPU B are >>>> + * guaranteed to execute a full memory barrier during the time interval >>>> + * between the call to call_rcu() and the invocation of "func()" -- even >>>> + * if CPU A and CPU B are the same CPU (but again only if the system has >>>> + * more than one CPU). >>>> + * >>>> + * Implementation of these memory-ordering guarantees is described here: >>>> + * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst. >>>> + */ >>>> +void call_rcu(struct rcu_head *head, rcu_callback_t func) >>>> +{ >>>> + return __call_rcu_common(head, func, true); >>>> +} >>>> +EXPORT_SYMBOL_GPL(call_rcu); >>>> >>>> /* Maximum number of jiffies to wait before draining a batch. */ >>>> #define KFREE_DRAIN_JIFFIES (5 * HZ) >>>> @@ -3507,7 +3544,7 @@ void synchronize_rcu(void) >>>> if (rcu_gp_is_expedited()) >>>> synchronize_rcu_expedited(); >>>> else >>>> - wait_rcu_gp(call_rcu); >>>> + wait_rcu_gp(call_rcu_flush); >>>> return; >>>> } >>>> >>>> @@ -3902,7 +3939,11 @@ static void rcu_barrier_entrain(struct rcu_data *rdp) >>>> rdp->barrier_head.func = rcu_barrier_callback; >>>> debug_rcu_head_queue(&rdp->barrier_head); >>>> rcu_nocb_lock(rdp); >>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); >>>> + /* >>>> + * Flush the bypass list, but also wake up the GP thread as otherwise >>>> + * bypass/lazy CBs maynot be noticed, and can cause real long delays! 
>>>> + */ >>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_WAKE)); >>>> if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) { >>>> atomic_inc(&rcu_state.barrier_cpu_count); >>>> } else { >>>> @@ -4323,7 +4364,7 @@ void rcutree_migrate_callbacks(int cpu) >>>> my_rdp = this_cpu_ptr(&rcu_data); >>>> my_rnp = my_rdp->mynode; >>>> rcu_nocb_lock(my_rdp); /* irqs already disabled. */ >>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies)); >>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, FLUSH_BP_NONE)); >>>> raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ >>>> /* Leverage recent GPs and set GP for new callbacks. */ >>>> needwake = rcu_advance_cbs(my_rnp, rdp) || >>>> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h >>>> index d4a97e40ea9c..361c41d642c7 100644 >>>> --- a/kernel/rcu/tree.h >>>> +++ b/kernel/rcu/tree.h >>>> @@ -263,14 +263,16 @@ struct rcu_data { >>>> unsigned long last_fqs_resched; /* Time of last rcu_resched(). */ >>>> unsigned long last_sched_clock; /* Jiffies of last rcu_sched_clock_irq(). */ >>>> >>>> + long lazy_len; /* Length of buffered lazy callbacks. */ >>> >>> Do we ever actually care about the length as opposed to whether or not all >>> the bypass callbacks are lazy? If not, a "some_nonlazy" boolean would be >>> initialed to zero and ORed with the non-laziness of the added callback. >>> Or, if there was a test anyway, simply set to 1 in the presence of a >>> non-lazy callback. And as now, gets zeroed when the bypass is flushed. >> >> We had discussed this before, and my point was >> we could use it for tracing and future extension >> as well. If it’s ok with you, I prefer to keep it this >> way than having to come back add the length later. >> >> On a minor point as well, if you really want it this >> way, I am afraid of changing this now since I tested >> this way for last several iterations and it’s easy to >> add a regression. So I prefer to keep it this way and >> then I can add more patches later on top if that’s >> Ok with you. > > No, you are right, the debug information might be helpful. > > But hey, at least I am consistent! Haha ;-) > >>> This might shorten a few lines of code. >>> >>>> int cpu; >>>> }; >>>> >>>> /* Values for nocb_defer_wakeup field in struct rcu_data. */ >>>> #define RCU_NOCB_WAKE_NOT 0 >>>> #define RCU_NOCB_WAKE_BYPASS 1 >>>> -#define RCU_NOCB_WAKE 2 >>>> -#define RCU_NOCB_WAKE_FORCE 3 >>>> +#define RCU_NOCB_WAKE_LAZY 2 >>>> +#define RCU_NOCB_WAKE 3 >>>> +#define RCU_NOCB_WAKE_FORCE 4 >>>> >>>> #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500)) >>>> /* For jiffies_till_first_fqs and */ >>>> @@ -439,10 +441,17 @@ static void zero_cpu_stall_ticks(struct rcu_data *rdp); >>>> static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); >>>> static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); >>>> static void rcu_init_one_nocb(struct rcu_node *rnp); >>>> + >>>> +#define FLUSH_BP_NONE 0 >>>> +/* Is the CB being enqueued after the flush, a lazy CB? */ >>>> +#define FLUSH_BP_LAZY BIT(0) >>>> +/* Wake up nocb-GP thread after flush? 
*/ >>>> +#define FLUSH_BP_WAKE BIT(1) >>>> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> - unsigned long j); >>>> + unsigned long j, unsigned long flush_flags); >>>> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> - bool *was_alldone, unsigned long flags); >>>> + bool *was_alldone, unsigned long flags, >>>> + bool lazy); >>>> static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, >>>> unsigned long flags); >>>> static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level); >>>> diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h >>>> index 18e9b4cd78ef..5cac05600798 100644 >>>> --- a/kernel/rcu/tree_exp.h >>>> +++ b/kernel/rcu/tree_exp.h >>>> @@ -937,7 +937,7 @@ void synchronize_rcu_expedited(void) >>>> >>>> /* If expedited grace periods are prohibited, fall back to normal. */ >>>> if (rcu_gp_is_normal()) { >>>> - wait_rcu_gp(call_rcu); >>>> + wait_rcu_gp(call_rcu_flush); >>>> return; >>>> } >>>> >>>> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h >>>> index f77a6d7e1356..661c685aba3f 100644 >>>> --- a/kernel/rcu/tree_nocb.h >>>> +++ b/kernel/rcu/tree_nocb.h >>>> @@ -256,6 +256,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force) >>>> return __wake_nocb_gp(rdp_gp, rdp, force, flags); >>>> } >>>> >>>> +/* >>>> + * LAZY_FLUSH_JIFFIES decides the maximum amount of time that >>>> + * can elapse before lazy callbacks are flushed. Lazy callbacks >>>> + * could be flushed much earlier for a number of other reasons >>>> + * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are >>>> + * left unsubmitted to RCU after those many jiffies. >>>> + */ >>>> +#define LAZY_FLUSH_JIFFIES (10 * HZ) >>>> +static unsigned long jiffies_till_flush = LAZY_FLUSH_JIFFIES; >>>> + >>>> +#ifdef CONFIG_RCU_LAZY >>>> +// To be called only from test code. >>>> +void rcu_lazy_set_jiffies_till_flush(unsigned long jif) >>>> +{ >>>> + jiffies_till_flush = jif; >>>> +} >>>> +EXPORT_SYMBOL(rcu_lazy_set_jiffies_till_flush); >>>> + >>>> +unsigned long rcu_lazy_get_jiffies_till_flush(void) >>>> +{ >>>> + return jiffies_till_flush; >>>> +} >>>> +EXPORT_SYMBOL(rcu_lazy_get_jiffies_till_flush); >>>> +#endif >>>> + >>>> /* >>>> * Arrange to wake the GP kthread for this NOCB group at some future >>>> * time when it is safe to do so. >>>> @@ -269,10 +294,14 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, >>>> raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags); >>>> >>>> /* >>>> - * Bypass wakeup overrides previous deferments. In case >>>> - * of callback storm, no need to wake up too early. >>>> + * Bypass wakeup overrides previous deferments. In case of >>>> + * callback storm, no need to wake up too early. >>>> */ >>>> - if (waketype == RCU_NOCB_WAKE_BYPASS) { >>>> + if (waketype == RCU_NOCB_WAKE_LAZY >>>> + && READ_ONCE(rdp->nocb_defer_wakeup) == RCU_NOCB_WAKE_NOT) { >>> >>> Please leave the "&&" on the previous line and line up the "READ_ONCE(" >>> with the "waketype". That makes it easier to tell the condition from >>> the following code. >> >> Will do! 
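Just so I implement the style fix the way you want it, I am planning on making that branch look like the below in the next revision (whitespace-only change relative to this patch):

        if (waketype == RCU_NOCB_WAKE_LAZY &&
            READ_ONCE(rdp->nocb_defer_wakeup) == RCU_NOCB_WAKE_NOT) {
                mod_timer(&rdp_gp->nocb_timer, jiffies + jiffies_till_flush);
                WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
        } else if (waketype == RCU_NOCB_WAKE_BYPASS) {
                ...
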
>> >>> >>>> + mod_timer(&rdp_gp->nocb_timer, jiffies + jiffies_till_flush); >>>> + WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); >>>> + } else if (waketype == RCU_NOCB_WAKE_BYPASS) { >>>> mod_timer(&rdp_gp->nocb_timer, jiffies + 2); >>>> WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); >>>> } else { >>>> @@ -293,12 +322,16 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, >>>> * proves to be initially empty, just return false because the no-CB GP >>>> * kthread may need to be awakened in this case. >>>> * >>>> + * Return true if there was something to be flushed and it succeeded, otherwise >>>> + * false. >>>> + * >>>> * Note that this function always returns true if rhp is NULL. >>>> */ >>>> static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> - unsigned long j) >>>> + unsigned long j, unsigned long flush_flags) >>>> { >>>> struct rcu_cblist rcl; >>>> + bool lazy = flush_flags & FLUSH_BP_LAZY; >>>> >>>> WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp)); >>>> rcu_lockdep_assert_cblist_protected(rdp); >>>> @@ -310,7 +343,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ >>>> if (rhp) >>>> rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ >>>> - rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); >>>> + >>>> + /* >>>> + * If the new CB requested was a lazy one, queue it onto the main >>>> + * ->cblist so we can take advantage of a sooner grade period. >>> >>> "take advantage of a grace period that will happen regardless."? >> >> Sure will update. >> >>> >>>> + */ >>>> + if (lazy && rhp) { >>>> + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, NULL); >>>> + rcu_cblist_enqueue(&rcl, rhp); >>> >>> Would it makes sense to enqueue rhp onto ->nocb_bypass first, NULL out >>> rhp, then let the rcu_cblist_flush_enqueue() be common code? Or did this >>> function grow a later use of rhp that I missed? >> >> No that could be done, but it prefer to keep it this >> way because rhp is a function parameter and I >> prefer not to modify those since it could add a >> bug in future where rhp passed by user is now >> NULL for some reason, half way through the >> function. > > I agree that changing a function parameter is bad practice. > > So the question becomes whether introducing a local would outweigh > consolidating this code. Could you please at least give it a shot? Yes for sure I will try this. Was thinking the same. > >>>> + WRITE_ONCE(rdp->lazy_len, 0); >>>> + } else { >>>> + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); >>>> + WRITE_ONCE(rdp->lazy_len, 0); >>> >>> This WRITE_ONCE() can be dropped out of the "if" statement, correct? >> >> Yes will update. > > Thank you! > >>> If so, this could be an "if" statement with two statements in its "then" >>> clause, no "else" clause, and two statements following the "if" statement. >> >> I don’t think we can get rid of the else part but I’ll see what it looks like. > > In the function header, s/rhp/rhp_in/, then: > > struct rcu_head *rhp = rhp_in; > > And then: > > if (lazy && rhp) { > rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); > rhp = NULL; > } > rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); > WRITE_ONCE(rdp->lazy_len, 0); > > Or did I mess something up? Yes you are spot on. It seems shorter, I’ll do it this way! 
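And just to confirm I understood the s/rhp/rhp_in/ suggestion correctly, the flush function would then look roughly like the below (sketch only, I still need to build and test it; the early-exit checks at the top are unchanged and elided here):

static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp_in,
                                     unsigned long j, unsigned long flush_flags)
{
        struct rcu_cblist rcl;
        struct rcu_head *rhp = rhp_in;
        bool lazy = flush_flags & FLUSH_BP_LAZY;

        WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
        rcu_lockdep_assert_cblist_protected(rdp);
        /* ... early-exit checks unchanged ... */

        /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
        if (rhp)
                rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */

        /*
         * If the new CB requested was a lazy one, queue it onto the main
         * ->cblist so we can take advantage of the grace period that will
         * happen regardless. Enqueue it onto the bypass list first so that
         * the single flush below keeps it ordered behind the CBs already
         * in the bypass list.
         */
        if (lazy && rhp) {
                rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
                rhp = NULL;
        }
        rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
        WRITE_ONCE(rdp->lazy_len, 0);

        rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
        WRITE_ONCE(rdp->nocb_bypass_first, j);
        rcu_nocb_bypass_unlock(rdp);
        return true;
}

That also pulls the WRITE_ONCE(rdp->lazy_len, 0) out of the "if", as you suggested earlier.
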
> >>>> + } >>>> + >>>> rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); >>>> WRITE_ONCE(rdp->nocb_bypass_first, j); >>>> rcu_nocb_bypass_unlock(rdp); >>>> @@ -326,13 +372,33 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> * Note that this function always returns true if rhp is NULL. >>>> */ >>>> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> - unsigned long j) >>>> + unsigned long j, unsigned long flush_flags) >>>> { >>>> + bool ret; >>>> + bool was_alldone = false; >>>> + bool bypass_all_lazy = false; >>>> + long bypass_ncbs; >>> >>> Alphabetical order by variable name, please. (Yes, I know that this is >>> strange, but what can I say?) >> >> Sure. > > Thank you! My pleasure! > >>>> + >>>> if (!rcu_rdp_is_offloaded(rdp)) >>>> return true; >>>> rcu_lockdep_assert_cblist_protected(rdp); >>>> rcu_nocb_bypass_lock(rdp); >>>> - return rcu_nocb_do_flush_bypass(rdp, rhp, j); >>>> + >>>> + if (flush_flags & FLUSH_BP_WAKE) { >>>> + was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); >>>> + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); >>>> + bypass_all_lazy = bypass_ncbs && (bypass_ncbs == rdp->lazy_len); >>>> + } >>>> + >>>> + ret = rcu_nocb_do_flush_bypass(rdp, rhp, j, flush_flags); >>>> + >>>> + // Wake up the nocb GP thread if needed. GP thread could be sleeping >>>> + // while waiting for lazy timer to expire (otherwise rcu_barrier may >>>> + // end up waiting for the duration of the lazy timer). >>>> + if (flush_flags & FLUSH_BP_WAKE && was_alldone && bypass_all_lazy) >>>> + wake_nocb_gp(rdp, false); >>>> + >>>> + return ret; >>>> } >>>> >>>> /* >>>> @@ -345,7 +411,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) >>>> if (!rcu_rdp_is_offloaded(rdp) || >>>> !rcu_nocb_bypass_trylock(rdp)) >>>> return; >>>> - WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j)); >>>> + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE)); >>>> } >>>> >>>> /* >>>> @@ -367,12 +433,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) >>>> * there is only one CPU in operation. >>>> */ >>>> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> - bool *was_alldone, unsigned long flags) >>>> + bool *was_alldone, unsigned long flags, >>>> + bool lazy) >>>> { >>>> unsigned long c; >>>> unsigned long cur_gp_seq; >>>> unsigned long j = jiffies; >>>> long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); >>>> + bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len)); >>>> >>>> lockdep_assert_irqs_disabled(); >>>> >>>> @@ -417,25 +485,30 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> // If there hasn't yet been all that many ->cblist enqueues >>>> // this jiffy, tell the caller to enqueue onto ->cblist. But flush >>>> // ->nocb_bypass first. >>>> - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { >>>> + // Lazy CBs throttle this back and do immediate bypass queuing. >>>> + if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) { >>>> rcu_nocb_lock(rdp); >>>> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); >>>> if (*was_alldone) >>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, >>>> TPS("FirstQ")); >>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); >>>> + >>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE)); >>>> WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); >>>> return false; // Caller must enqueue the callback. 
>>>> } >>>> >>>> // If ->nocb_bypass has been used too long or is too full, >>>> // flush ->nocb_bypass to ->cblist. >>>> - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || >>>> + if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) || >>>> + (ncbs && bypass_is_lazy && >>>> + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush))) || >>>> ncbs >= qhimark) { >>>> rcu_nocb_lock(rdp); >>>> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); >>>> >>>> - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { >>>> + if (!rcu_nocb_flush_bypass(rdp, rhp, j, >>>> + lazy ? FLUSH_BP_LAZY : FLUSH_BP_NONE)) { >>>> if (*was_alldone) >>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, >>>> TPS("FirstQ")); >>>> @@ -460,16 +533,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> // We need to use the bypass. >>>> rcu_nocb_wait_contended(rdp); >>>> rcu_nocb_bypass_lock(rdp); >>>> + >>>> ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); >>>> rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ >>>> rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); >>>> + >>>> + if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy) >>> >>> Won't !IS_ENABLED(CONFIG_RCU_LAZY) mean that lazy cannot be set? >>> Why do we need to check both? Or are you going for dead code? If so, >>> shouldn't there be IS_ENABLED(CONFIG_RCU_LAZY) checks above as well? >>> >>> Me, I am not convinced that the dead code would buy you much. In fact, >>> the compiler might well be propagating the constants on its own. >>> >>> Ah! The reason the compiler cannot figure this out is because you put >>> the switch into rcu.h. If you instead always export the call_rcu_flush() >>> definition, and check IS_ENABLED(CONFIG_RCU_LAZY) at the beginning of >>> call_rcu(), the compiler should have the information that it needs to >>> do this for you. >> >> Ah ok, I will try to do it this way. > > Very good, thank you, for this and for the rest below. Glad we could do this review, thanks. - Joel > > Thanx, Paul > >>>> + WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1); >>>> + >>>> if (!ncbs) { >>>> WRITE_ONCE(rdp->nocb_bypass_first, j); >>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ")); >>>> } >>>> + >>>> rcu_nocb_bypass_unlock(rdp); >>>> smp_mb(); /* Order enqueue before wake. */ >>>> - if (ncbs) { >>>> + >>>> + // A wake up of the grace period kthread or timer adjustment needs to >>>> + // be done only if: >>>> + // 1. Bypass list was fully empty before (this is the first bypass list entry). >>>> + // Or, both the below conditions are met: >>>> + // 1. Bypass list had only lazy CBs before. >>>> + // 2. The new CB is non-lazy. >>>> + if (ncbs && (!bypass_is_lazy || lazy)) { >>>> local_irq_restore(flags); >>>> } else { >>>> // No-CBs GP kthread might be indefinitely asleep, if so, wake. >>>> @@ -499,7 +585,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, >>>> { >>>> unsigned long cur_gp_seq; >>>> unsigned long j; >>>> - long len; >>>> + long len, lazy_len, bypass_len; >>>> struct task_struct *t; >>> >>> Again, alphabetical please, strange though that might seem. >> >> Yes sure >> >>> >>>> // If we are being polled or there is no kthread, just leave. >>>> @@ -512,9 +598,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, >>>> } >>>> // Need to actually to a wakeup. 
>>>> len = rcu_segcblist_n_cbs(&rdp->cblist); >>>> + bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass); >>>> + lazy_len = READ_ONCE(rdp->lazy_len); >>>> if (was_alldone) { >>>> rdp->qlen_last_fqs_check = len; >>>> - if (!irqs_disabled_flags(flags)) { >>>> + // Only lazy CBs in bypass list >>>> + if (lazy_len && bypass_len == lazy_len) { >>>> + rcu_nocb_unlock_irqrestore(rdp, flags); >>>> + wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY, >>>> + TPS("WakeLazy")); >>>> + } else if (!irqs_disabled_flags(flags)) { >>>> /* ... if queue was empty ... */ >>>> rcu_nocb_unlock_irqrestore(rdp, flags); >>>> wake_nocb_gp(rdp, false); >>>> @@ -604,8 +697,8 @@ static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu) >>>> */ >>>> static void nocb_gp_wait(struct rcu_data *my_rdp) >>>> { >>>> - bool bypass = false; >>>> - long bypass_ncbs; >>>> + bool bypass = false, lazy = false; >>>> + long bypass_ncbs, lazy_ncbs; >>> >>> And ditto. >> >> Ok >> >>> >>>> int __maybe_unused cpu = my_rdp->cpu; >>>> unsigned long cur_gp_seq; >>>> unsigned long flags; >>>> @@ -640,24 +733,41 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) >>>> * won't be ignored for long. >>>> */ >>>> list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) { >>>> + bool flush_bypass = false; >>>> + >>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check")); >>>> rcu_nocb_lock_irqsave(rdp, flags); >>>> lockdep_assert_held(&rdp->nocb_lock); >>>> bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); >>>> - if (bypass_ncbs && >>>> + lazy_ncbs = READ_ONCE(rdp->lazy_len); >>>> + >>>> + if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) && >>>> + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush) || >>>> + bypass_ncbs > 2 * qhimark)) { >>>> + flush_bypass = true; >>>> + } else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) && >>>> (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) || >>>> bypass_ncbs > 2 * qhimark)) { >>>> - // Bypass full or old, so flush it. >>>> - (void)rcu_nocb_try_flush_bypass(rdp, j); >>>> - bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); >>>> + flush_bypass = true; >>>> } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) { >>>> rcu_nocb_unlock_irqrestore(rdp, flags); >>>> continue; /* No callbacks here, try next. */ >>>> } >>>> + >>>> + if (flush_bypass) { >>>> + // Bypass full or old, so flush it. >>>> + (void)rcu_nocb_try_flush_bypass(rdp, j); >>>> + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); >>>> + lazy_ncbs = READ_ONCE(rdp->lazy_len); >>>> + } >>>> + >>>> if (bypass_ncbs) { >>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, >>>> - TPS("Bypass")); >>>> - bypass = true; >>>> + bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass")); >>>> + if (bypass_ncbs == lazy_ncbs) >>>> + lazy = true; >>>> + else >>>> + bypass = true; >>>> } >>>> rnp = rdp->mynode; >>>> >>>> @@ -705,12 +815,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) >>>> my_rdp->nocb_gp_gp = needwait_gp; >>>> my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0; >>>> >>>> - if (bypass && !rcu_nocb_poll) { >>>> - // At least one child with non-empty ->nocb_bypass, so set >>>> - // timer in order to avoid stranding its callbacks. >>>> - wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, >>>> - TPS("WakeBypassIsDeferred")); >>>> + // At least one child with non-empty ->nocb_bypass, so set >>>> + // timer in order to avoid stranding its callbacks. >>>> + if (!rcu_nocb_poll) { >>>> + // If bypass list only has lazy CBs. Add a deferred >>>> + // lazy wake up. >>> >>> One sentence rather than two. 
>> >> Ok >> >>> >>>> + if (lazy && !bypass) { >>>> + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY, >>>> + TPS("WakeLazyIsDeferred")); >>>> + // Otherwise add a deferred bypass wake up. >>>> + } else if (bypass) { >>>> + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, >>>> + TPS("WakeBypassIsDeferred")); >>>> + } >>>> } >>>> + >>>> if (rcu_nocb_poll) { >>>> /* Polling, so trace if first poll in the series. */ >>>> if (gotcbs) >>>> @@ -1036,7 +1155,7 @@ static long rcu_nocb_rdp_deoffload(void *arg) >>>> * return false, which means that future calls to rcu_nocb_try_bypass() >>>> * will refuse to put anything into the bypass. >>>> */ >>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); >>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_NONE)); >>>> /* >>>> * Start with invoking rcu_core() early. This way if the current thread >>>> * happens to preempt an ongoing call to rcu_core() in the middle, >>>> @@ -1278,6 +1397,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) >>>> raw_spin_lock_init(&rdp->nocb_gp_lock); >>>> timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0); >>>> rcu_cblist_init(&rdp->nocb_bypass); >>>> + WRITE_ONCE(rdp->lazy_len, 0); >>>> mutex_init(&rdp->nocb_gp_kthread_mutex); >>>> } >>>> >>>> @@ -1559,13 +1679,13 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) >>>> } >>>> >>>> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> - unsigned long j) >>>> + unsigned long j, unsigned long flush_flags) >>>> { >>>> return true; >>>> } >>>> >>>> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, >>>> - bool *was_alldone, unsigned long flags) >>>> + bool *was_alldone, unsigned long flags, bool lazy) >>>> { >>>> return false; >>>> } >>>> -- >>>> 2.37.3.998.g577e59143f-goog >>>>
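
P.S. For the IS_ENABLED(CONFIG_RCU_LAZY) suggestion further up, the direction I have in mind is roughly the below (untested sketch): always provide call_rcu_flush() from tree.c, and have call_rcu() pass IS_ENABLED(CONFIG_RCU_LAZY) as the laziness argument so the compiler can constant-propagate it and drop the lazy paths when the config is off:

void call_rcu_flush(struct rcu_head *head, rcu_callback_t func)
{
        return __call_rcu_common(head, func, false);
}
EXPORT_SYMBOL_GPL(call_rcu_flush);

void call_rcu(struct rcu_head *head, rcu_callback_t func)
{
        return __call_rcu_common(head, func, IS_ENABLED(CONFIG_RCU_LAZY));
}
EXPORT_SYMBOL_GPL(call_rcu);

If I am reading your suggestion right, that would also let the static inline call_rcu_flush() wrapper in rcupdate.h go away. I will try this for the next revision.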