Re: [PATCH v5 06/18] rcu: Introduce call_rcu_lazy() API implementation

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




On 9/3/2022 10:03 AM, Paul E. McKenney wrote:
> On Thu, Sep 01, 2022 at 10:17:08PM +0000, Joel Fernandes (Google) wrote:
>> Implement timer-based RCU lazy callback batching. The batch is flushed
>> whenever a certain amount of time has passed, or the batch on a
>> particular CPU grows too big. Also memory pressure will flush it in a
>> future patch.
>>
>> To handle several corner cases automagically (such as rcu_barrier() and
>> hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
>> length has the lazy CB length included in it. A separate lazy CB length
>> counter is also introduced to keep track of the number of lazy CBs.
>>
>> Suggested-by: Paul McKenney <paulmck@xxxxxxxxxx>
>> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
> 
> I got this from TREE01 and TREE04:
> 
> kernel/rcu/tree_nocb.h:439:46: error: ‘struct rcu_data’ has no member named ‘lazy_len’
> 
> So I moved the lazy_len field out from under CONFIG_RCU_LAZY.

Ok, thank you!

- Joel

> 
> 							Thanx, Paul
> 
>> ---
>>  include/linux/rcupdate.h   |   6 ++
>>  kernel/rcu/Kconfig         |   8 ++
>>  kernel/rcu/rcu.h           |  11 +++
>>  kernel/rcu/rcu_segcblist.c |   2 +-
>>  kernel/rcu/tree.c          | 130 +++++++++++++++---------
>>  kernel/rcu/tree.h          |  13 ++-
>>  kernel/rcu/tree_nocb.h     | 198 ++++++++++++++++++++++++++++++-------
>>  7 files changed, 280 insertions(+), 88 deletions(-)
>>
>> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
>> index 08605ce7379d..82e8a07e0856 100644
>> --- a/include/linux/rcupdate.h
>> +++ b/include/linux/rcupdate.h
>> @@ -108,6 +108,12 @@ static inline int rcu_preempt_depth(void)
>>  
>>  #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
>>  
>> +#ifdef CONFIG_RCU_LAZY
>> +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
>> +#else
>> +#define call_rcu_lazy(head, func) call_rcu(head, func)
>> +#endif
>> +
>>  /* Internal to kernel */
>>  void rcu_init(void);
>>  extern int rcu_scheduler_active;
>> diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
>> index d471d22a5e21..3128d01427cb 100644
>> --- a/kernel/rcu/Kconfig
>> +++ b/kernel/rcu/Kconfig
>> @@ -311,4 +311,12 @@ config TASKS_TRACE_RCU_READ_MB
>>  	  Say N here if you hate read-side memory barriers.
>>  	  Take the default if you are unsure.
>>  
>> +config RCU_LAZY
>> +	bool "RCU callback lazy invocation functionality"
>> +	depends on RCU_NOCB_CPU
>> +	default n
>> +	help
>> +	  To save power, batch RCU callbacks and flush after delay, memory
>> +	  pressure or callback list growing too big.
>> +
>>  endmenu # "RCU Subsystem"
>> diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
>> index be5979da07f5..94675f14efe8 100644
>> --- a/kernel/rcu/rcu.h
>> +++ b/kernel/rcu/rcu.h
>> @@ -474,6 +474,14 @@ enum rcutorture_type {
>>  	INVALID_RCU_FLAVOR
>>  };
>>  
>> +#if defined(CONFIG_RCU_LAZY)
>> +unsigned long rcu_lazy_get_jiffies_till_flush(void);
>> +void rcu_lazy_set_jiffies_till_flush(unsigned long j);
>> +#else
>> +static inline unsigned long rcu_lazy_get_jiffies_till_flush(void) { return 0; }
>> +static inline void rcu_lazy_set_jiffies_till_flush(unsigned long j) { }
>> +#endif
>> +
>>  #if defined(CONFIG_TREE_RCU)
>>  void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
>>  			    unsigned long *gp_seq);
>> @@ -483,6 +491,8 @@ void do_trace_rcu_torture_read(const char *rcutorturename,
>>  			       unsigned long c_old,
>>  			       unsigned long c);
>>  void rcu_gp_set_torture_wait(int duration);
>> +void rcu_force_call_rcu_to_lazy(bool force);
>> +
>>  #else
>>  static inline void rcutorture_get_gp_data(enum rcutorture_type test_type,
>>  					  int *flags, unsigned long *gp_seq)
>> @@ -501,6 +511,7 @@ void do_trace_rcu_torture_read(const char *rcutorturename,
>>  	do { } while (0)
>>  #endif
>>  static inline void rcu_gp_set_torture_wait(int duration) { }
>> +static inline void rcu_force_call_rcu_to_lazy(bool force) { }
>>  #endif
>>  
>>  #if IS_ENABLED(CONFIG_RCU_TORTURE_TEST) || IS_MODULE(CONFIG_RCU_TORTURE_TEST)
>> diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
>> index c54ea2b6a36b..55b50e592986 100644
>> --- a/kernel/rcu/rcu_segcblist.c
>> +++ b/kernel/rcu/rcu_segcblist.c
>> @@ -38,7 +38,7 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp)
>>   * element of the second rcu_cblist structure, but ensuring that the second
>>   * rcu_cblist structure, if initially non-empty, always appears non-empty
>>   * throughout the process.  If rdp is NULL, the second rcu_cblist structure
>> - * is instead initialized to empty.
>> + * is instead initialized to empty. Also account for lazy_len for lazy CBs.
>>   */
>>  void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
>>  			      struct rcu_cblist *srclp,
>> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
>> index 9fe581be8696..aaced29a0a71 100644
>> --- a/kernel/rcu/tree.c
>> +++ b/kernel/rcu/tree.c
>> @@ -2728,47 +2728,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
>>  	raw_spin_unlock_rcu_node(rnp);
>>  }
>>  
>> -/**
>> - * call_rcu() - Queue an RCU callback for invocation after a grace period.
>> - * @head: structure to be used for queueing the RCU updates.
>> - * @func: actual callback function to be invoked after the grace period
>> - *
>> - * The callback function will be invoked some time after a full grace
>> - * period elapses, in other words after all pre-existing RCU read-side
>> - * critical sections have completed.  However, the callback function
>> - * might well execute concurrently with RCU read-side critical sections
>> - * that started after call_rcu() was invoked.
>> - *
>> - * RCU read-side critical sections are delimited by rcu_read_lock()
>> - * and rcu_read_unlock(), and may be nested.  In addition, but only in
>> - * v5.0 and later, regions of code across which interrupts, preemption,
>> - * or softirqs have been disabled also serve as RCU read-side critical
>> - * sections.  This includes hardware interrupt handlers, softirq handlers,
>> - * and NMI handlers.
>> - *
>> - * Note that all CPUs must agree that the grace period extended beyond
>> - * all pre-existing RCU read-side critical section.  On systems with more
>> - * than one CPU, this means that when "func()" is invoked, each CPU is
>> - * guaranteed to have executed a full memory barrier since the end of its
>> - * last RCU read-side critical section whose beginning preceded the call
>> - * to call_rcu().  It also means that each CPU executing an RCU read-side
>> - * critical section that continues beyond the start of "func()" must have
>> - * executed a memory barrier after the call_rcu() but before the beginning
>> - * of that RCU read-side critical section.  Note that these guarantees
>> - * include CPUs that are offline, idle, or executing in user mode, as
>> - * well as CPUs that are executing in the kernel.
>> - *
>> - * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
>> - * resulting RCU callback function "func()", then both CPU A and CPU B are
>> - * guaranteed to execute a full memory barrier during the time interval
>> - * between the call to call_rcu() and the invocation of "func()" -- even
>> - * if CPU A and CPU B are the same CPU (but again only if the system has
>> - * more than one CPU).
>> - *
>> - * Implementation of these memory-ordering guarantees is described here:
>> - * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
>> - */
>> -void call_rcu(struct rcu_head *head, rcu_callback_t func)
>> +static void
>> +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
>>  {
>>  	static atomic_t doublefrees;
>>  	unsigned long flags;
>> @@ -2818,7 +2779,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
>>  		trace_rcu_callback(rcu_state.name, head,
>>  				   rcu_segcblist_n_cbs(&rdp->cblist));
>>  
>> -	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
>> +	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
>>  		return; // Enqueued onto ->nocb_bypass, so just leave.
>>  	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
>>  	rcu_segcblist_enqueue(&rdp->cblist, head);
>> @@ -2833,8 +2794,86 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
>>  		local_irq_restore(flags);
>>  	}
>>  }
>> -EXPORT_SYMBOL_GPL(call_rcu);
>>  
>> +#ifdef CONFIG_RCU_LAZY
>> +/**
>> + * call_rcu_lazy() - Lazily queue RCU callback for invocation after grace period.
>> + * @head: structure to be used for queueing the RCU updates.
>> + * @func: actual callback function to be invoked after the grace period
>> + *
>> + * The callback function will be invoked some time after a full grace
>> + * period elapses, in other words after all pre-existing RCU read-side
>> + * critical sections have completed.
>> + *
>> + * Use this API instead of call_rcu() if you don't mind the callback being
>> + * invoked after very long periods of time on systems without memory pressure
>> + * and on systems which are lightly loaded or mostly idle.
>> + *
>> + * Other than the extra delay in callbacks being invoked, this function is
>> + * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more
>> + * details about memory ordering and other functionality.
>> + */
>> +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
>> +{
>> +	return __call_rcu_common(head, func, true);
>> +}
>> +EXPORT_SYMBOL_GPL(call_rcu_lazy);
>> +#endif
>> +
>> +static bool force_call_rcu_to_lazy;
>> +
>> +void rcu_force_call_rcu_to_lazy(bool force)
>> +{
>> +	if (IS_ENABLED(CONFIG_RCU_SCALE_TEST))
>> +		WRITE_ONCE(force_call_rcu_to_lazy, force);
>> +}
>> +EXPORT_SYMBOL_GPL(rcu_force_call_rcu_to_lazy);
>> +
>> +/**
>> + * call_rcu() - Queue an RCU callback for invocation after a grace period.
>> + * @head: structure to be used for queueing the RCU updates.
>> + * @func: actual callback function to be invoked after the grace period
>> + *
>> + * The callback function will be invoked some time after a full grace
>> + * period elapses, in other words after all pre-existing RCU read-side
>> + * critical sections have completed.  However, the callback function
>> + * might well execute concurrently with RCU read-side critical sections
>> + * that started after call_rcu() was invoked.
>> + *
>> + * RCU read-side critical sections are delimited by rcu_read_lock()
>> + * and rcu_read_unlock(), and may be nested.  In addition, but only in
>> + * v5.0 and later, regions of code across which interrupts, preemption,
>> + * or softirqs have been disabled also serve as RCU read-side critical
>> + * sections.  This includes hardware interrupt handlers, softirq handlers,
>> + * and NMI handlers.
>> + *
>> + * Note that all CPUs must agree that the grace period extended beyond
>> + * all pre-existing RCU read-side critical section.  On systems with more
>> + * than one CPU, this means that when "func()" is invoked, each CPU is
>> + * guaranteed to have executed a full memory barrier since the end of its
>> + * last RCU read-side critical section whose beginning preceded the call
>> + * to call_rcu().  It also means that each CPU executing an RCU read-side
>> + * critical section that continues beyond the start of "func()" must have
>> + * executed a memory barrier after the call_rcu() but before the beginning
>> + * of that RCU read-side critical section.  Note that these guarantees
>> + * include CPUs that are offline, idle, or executing in user mode, as
>> + * well as CPUs that are executing in the kernel.
>> + *
>> + * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
>> + * resulting RCU callback function "func()", then both CPU A and CPU B are
>> + * guaranteed to execute a full memory barrier during the time interval
>> + * between the call to call_rcu() and the invocation of "func()" -- even
>> + * if CPU A and CPU B are the same CPU (but again only if the system has
>> + * more than one CPU).
>> + *
>> + * Implementation of these memory-ordering guarantees is described here:
>> + * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
>> + */
>> +void call_rcu(struct rcu_head *head, rcu_callback_t func)
>> +{
>> +	return __call_rcu_common(head, func, force_call_rcu_to_lazy);
>> +}
>> +EXPORT_SYMBOL_GPL(call_rcu);
>>  
>>  /* Maximum number of jiffies to wait before draining a batch. */
>>  #define KFREE_DRAIN_JIFFIES (5 * HZ)
>> @@ -3904,7 +3943,8 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
>>  	rdp->barrier_head.func = rcu_barrier_callback;
>>  	debug_rcu_head_queue(&rdp->barrier_head);
>>  	rcu_nocb_lock(rdp);
>> -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
>> +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false,
>> +		     /* wake gp thread */ true));
>>  	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
>>  		atomic_inc(&rcu_state.barrier_cpu_count);
>>  	} else {
>> @@ -4325,7 +4365,7 @@ void rcutree_migrate_callbacks(int cpu)
>>  	my_rdp = this_cpu_ptr(&rcu_data);
>>  	my_rnp = my_rdp->mynode;
>>  	rcu_nocb_lock(my_rdp); /* irqs already disabled. */
>> -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
>> +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false, false));
>>  	raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
>>  	/* Leverage recent GPs and set GP for new callbacks. */
>>  	needwake = rcu_advance_cbs(my_rnp, rdp) ||
>> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
>> index d4a97e40ea9c..946d819b23fc 100644
>> --- a/kernel/rcu/tree.h
>> +++ b/kernel/rcu/tree.h
>> @@ -263,14 +263,18 @@ struct rcu_data {
>>  	unsigned long last_fqs_resched;	/* Time of last rcu_resched(). */
>>  	unsigned long last_sched_clock;	/* Jiffies of last rcu_sched_clock_irq(). */
>>  
>> +#ifdef CONFIG_RCU_LAZY
>> +	long lazy_len;			/* Length of buffered lazy callbacks. */
>> +#endif
>>  	int cpu;
>>  };
>>  
>>  /* Values for nocb_defer_wakeup field in struct rcu_data. */
>>  #define RCU_NOCB_WAKE_NOT	0
>>  #define RCU_NOCB_WAKE_BYPASS	1
>> -#define RCU_NOCB_WAKE		2
>> -#define RCU_NOCB_WAKE_FORCE	3
>> +#define RCU_NOCB_WAKE_LAZY	2
>> +#define RCU_NOCB_WAKE		3
>> +#define RCU_NOCB_WAKE_FORCE	4
>>  
>>  #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
>>  					/* For jiffies_till_first_fqs and */
>> @@ -440,9 +444,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
>>  static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
>>  static void rcu_init_one_nocb(struct rcu_node *rnp);
>>  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>> -				  unsigned long j);
>> +				  unsigned long j, bool lazy, bool wakegp);
>>  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>> -				bool *was_alldone, unsigned long flags);
>> +				bool *was_alldone, unsigned long flags,
>> +				bool lazy);
>>  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
>>  				 unsigned long flags);
>>  static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
>> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
>> index 31068dd31315..7e97a7b6e046 100644
>> --- a/kernel/rcu/tree_nocb.h
>> +++ b/kernel/rcu/tree_nocb.h
>> @@ -256,6 +256,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
>>  	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
>>  }
>>  
>> +/*
>> + * LAZY_FLUSH_JIFFIES decides the maximum amount of time that
>> + * can elapse before lazy callbacks are flushed. Lazy callbacks
>> + * could be flushed much earlier for a number of other reasons;
>> + * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are
>> + * left unsubmitted to RCU after that many jiffies.
>> + */
>> +#define LAZY_FLUSH_JIFFIES (10 * HZ)
>> +unsigned long jiffies_till_flush = LAZY_FLUSH_JIFFIES;
>> +
>> +#ifdef CONFIG_RCU_LAZY
>> +// To be called only from test code.
>> +void rcu_lazy_set_jiffies_till_flush(unsigned long jif)
>> +{
>> +	jiffies_till_flush = jif;
>> +}
>> +EXPORT_SYMBOL(rcu_lazy_set_jiffies_till_flush);
>> +
>> +unsigned long rcu_lazy_get_jiffies_till_flush(void)
>> +{
>> +	return jiffies_till_flush;
>> +}
>> +EXPORT_SYMBOL(rcu_lazy_get_jiffies_till_flush);
>> +#endif
>> +
>>  /*
>>   * Arrange to wake the GP kthread for this NOCB group at some future
>>   * time when it is safe to do so.
>> @@ -265,23 +290,39 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>>  {
>>  	unsigned long flags;
>>  	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
>> +	unsigned long mod_jif = 0;
>>  
>>  	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
>>  
>>  	/*
>> -	 * Bypass wakeup overrides previous deferments. In case
>> -	 * of callback storm, no need to wake up too early.
>> +	 * Bypass and lazy wakeups override previous deferments. In case of
>> +	 * callback storm, no need to wake up too early.
>>  	 */
>> -	if (waketype == RCU_NOCB_WAKE_BYPASS) {
>> -		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
>> -		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
>> -	} else {
>> +	switch (waketype) {
>> +	case RCU_NOCB_WAKE_LAZY:
>> +		if (rdp->nocb_defer_wakeup != RCU_NOCB_WAKE_LAZY)
>> +			mod_jif = jiffies_till_flush;
>> +		break;
>> +
>> +	case RCU_NOCB_WAKE_BYPASS:
>> +		mod_jif = 2;
>> +		break;
>> +
>> +	case RCU_NOCB_WAKE:
>> +	case RCU_NOCB_WAKE_FORCE:
>> +		// For these, make it wake up the soonest if we
>> +		// were in a bypass or lazy sleep before.
>>  		if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
>> -			mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
>> -		if (rdp_gp->nocb_defer_wakeup < waketype)
>> -			WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
>> +			mod_jif = 1;
>> +		break;
>>  	}
>>  
>> +	if (mod_jif)
>> +		mod_timer(&rdp_gp->nocb_timer, jiffies + mod_jif);
>> +
>> +	if (rdp_gp->nocb_defer_wakeup < waketype)
>> +		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
>> +
>>  	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
>>  
>>  	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
>> @@ -293,10 +334,13 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>>   * proves to be initially empty, just return false because the no-CB GP
>>   * kthread may need to be awakened in this case.
>>   *
>> + * Return true if there was something to be flushed and it succeeded, otherwise
>> + * false.
>> + *
>>   * Note that this function always returns true if rhp is NULL.
>>   */
>>  static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>> -				     unsigned long j)
>> +				     unsigned long j, bool lazy)
>>  {
>>  	struct rcu_cblist rcl;
>>  
>> @@ -310,7 +354,18 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>  	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
>>  	if (rhp)
>>  		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
>> -	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
>> +
>> +	/*
>> +	 * If the new CB requested was a lazy one, queue it onto the main
>> +	 * ->cblist so we can take advantage of a sooner grace period.
>> +	 */
>> +	if (lazy && rhp) {
>> +		rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, NULL);
>> +		rcu_cblist_enqueue(&rcl, rhp);
>> +	} else {
>> +		rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
>> +	}
>> +
>>  	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
>>  	WRITE_ONCE(rdp->nocb_bypass_first, j);
>>  	rcu_nocb_bypass_unlock(rdp);
>> @@ -326,13 +381,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>   * Note that this function always returns true if rhp is NULL.
>>   */
>>  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>> -				  unsigned long j)
>> +				  unsigned long j, bool lazy, bool wake_gp)
>>  {
>> +	bool ret;
>> +
>>  	if (!rcu_rdp_is_offloaded(rdp))
>>  		return true;
>>  	rcu_lockdep_assert_cblist_protected(rdp);
>>  	rcu_nocb_bypass_lock(rdp);
>> -	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
>> +	ret = rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
>> +
>> +	if (wake_gp)
>> +		wake_nocb_gp(rdp, true);
>> +
>> +	return ret;
>>  }
>>  
>>  /*
>> @@ -345,7 +407,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
>>  	if (!rcu_rdp_is_offloaded(rdp) ||
>>  	    !rcu_nocb_bypass_trylock(rdp))
>>  		return;
>> -	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
>> +	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
>>  }
>>  
>>  /*
>> @@ -367,12 +429,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
>>   * there is only one CPU in operation.
>>   */
>>  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>> -				bool *was_alldone, unsigned long flags)
>> +				bool *was_alldone, unsigned long flags,
>> +				bool lazy)
>>  {
>>  	unsigned long c;
>>  	unsigned long cur_gp_seq;
>>  	unsigned long j = jiffies;
>>  	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>> +	bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len));
>>  
>>  	lockdep_assert_irqs_disabled();
>>  
>> @@ -417,23 +481,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>  	// If there hasn't yet been all that many ->cblist enqueues
>>  	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
>>  	// ->nocb_bypass first.
>> -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
>> +	// Lazy CBs throttle this back and do immediate bypass queuing.
>> +	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) {
>>  		rcu_nocb_lock(rdp);
>>  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>>  		if (*was_alldone)
>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>  					    TPS("FirstQ"));
>> -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
>> +
>> +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false, false));
>> +
>>  		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
>>  		return false; // Caller must enqueue the callback.
>>  	}
>>  
>>  	// If ->nocb_bypass has been used too long or is too full,
>>  	// flush ->nocb_bypass to ->cblist.
>> -	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
>> +	if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) ||
>> +	    (ncbs &&  bypass_is_lazy &&
>> +		(time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush))) ||
>>  	    ncbs >= qhimark) {
>>  		rcu_nocb_lock(rdp);
>> -		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
>> +
>> +		if (!rcu_nocb_flush_bypass(rdp, rhp, j, lazy, false)) {
>>  			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>>  			if (*was_alldone)
>>  				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>> @@ -460,16 +530,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>  	// We need to use the bypass.
>>  	rcu_nocb_wait_contended(rdp);
>>  	rcu_nocb_bypass_lock(rdp);
>> +
>>  	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>  	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
>>  	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
>> +
>> +	if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy)
>> +		WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1);
>> +
>>  	if (!ncbs) {
>>  		WRITE_ONCE(rdp->nocb_bypass_first, j);
>>  		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
>>  	}
>> +
>>  	rcu_nocb_bypass_unlock(rdp);
>>  	smp_mb(); /* Order enqueue before wake. */
>> -	if (ncbs) {
>> +
>> +	// We had CBs in the bypass list before. There is nothing else to do if:
>> +	// (1) The bypass list had non-lazy CBs before. In this case, the bypass
>> +	// timer or GP-thread will handle the CBs, including any new lazy ones.
>> +	// (2) The new CB is lazy and the old bypass-CBs were all lazy. In this
>> +	// case the old lazy timer would have been set up. When that expires,
>> +	// the new lazy one will be handled.
>> +	if (ncbs && (!bypass_is_lazy || lazy)) {
>>  		local_irq_restore(flags);
>>  	} else {
>>  		// No-CBs GP kthread might be indefinitely asleep, if so, wake.
>> @@ -478,6 +561,10 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>  					    TPS("FirstBQwake"));
>>  			__call_rcu_nocb_wake(rdp, true, flags);
>> +		} else if (bypass_is_lazy && !lazy) {
>> +			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>> +					    TPS("FirstBQwakeLazy2Non"));
>> +			__call_rcu_nocb_wake(rdp, true, flags);
>>  		} else {
>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>  					    TPS("FirstBQnoWake"));
>> @@ -499,7 +586,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
>>  {
>>  	unsigned long cur_gp_seq;
>>  	unsigned long j;
>> -	long len;
>> +	long len, lazy_len, bypass_len;
>>  	struct task_struct *t;
>>  
>>  	// If we are being polled or there is no kthread, just leave.
>> @@ -512,9 +599,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
>>  	}
>>  	// Need to actually to a wakeup.
>>  	len = rcu_segcblist_n_cbs(&rdp->cblist);
>> +	bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>> +	lazy_len = READ_ONCE(rdp->lazy_len);
>>  	if (was_alldone) {
>>  		rdp->qlen_last_fqs_check = len;
>> -		if (!irqs_disabled_flags(flags)) {
>> +		// Only lazy CBs in bypass list
>> +		if (lazy_len && bypass_len == lazy_len) {
>> +			rcu_nocb_unlock_irqrestore(rdp, flags);
>> +			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
>> +					   TPS("WakeLazy"));
>> +		} else if (!irqs_disabled_flags(flags)) {
>>  			/* ... if queue was empty ... */
>>  			rcu_nocb_unlock_irqrestore(rdp, flags);
>>  			wake_nocb_gp(rdp, false);
>> @@ -604,8 +698,8 @@ static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
>>   */
>>  static void nocb_gp_wait(struct rcu_data *my_rdp)
>>  {
>> -	bool bypass = false;
>> -	long bypass_ncbs;
>> +	bool bypass = false, lazy = false;
>> +	long bypass_ncbs, lazy_ncbs;
>>  	int __maybe_unused cpu = my_rdp->cpu;
>>  	unsigned long cur_gp_seq;
>>  	unsigned long flags;
>> @@ -640,24 +734,41 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>>  	 * won't be ignored for long.
>>  	 */
>>  	list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) {
>> +		bool flush_bypass = false;
>> +
>>  		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
>>  		rcu_nocb_lock_irqsave(rdp, flags);
>>  		lockdep_assert_held(&rdp->nocb_lock);
>>  		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>> -		if (bypass_ncbs &&
>> +		lazy_ncbs = READ_ONCE(rdp->lazy_len);
>> +
>> +		if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) &&
>> +		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush) ||
>> +		     bypass_ncbs > 2 * qhimark)) {
>> +			flush_bypass = true;
>> +		} else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) &&
>>  		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
>>  		     bypass_ncbs > 2 * qhimark)) {
>> -			// Bypass full or old, so flush it.
>> -			(void)rcu_nocb_try_flush_bypass(rdp, j);
>> -			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>> +			flush_bypass = true;
>>  		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
>>  			rcu_nocb_unlock_irqrestore(rdp, flags);
>>  			continue; /* No callbacks here, try next. */
>>  		}
>> +
>> +		if (flush_bypass) {
>> +			// Bypass full or old, so flush it.
>> +			(void)rcu_nocb_try_flush_bypass(rdp, j);
>> +			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>> +			lazy_ncbs = READ_ONCE(rdp->lazy_len);
>> +		}
>> +
>>  		if (bypass_ncbs) {
>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>> -					    TPS("Bypass"));
>> -			bypass = true;
>> +				    bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
>> +			if (bypass_ncbs == lazy_ncbs)
>> +				lazy = true;
>> +			else
>> +				bypass = true;
>>  		}
>>  		rnp = rdp->mynode;
>>  
>> @@ -705,12 +816,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>>  	my_rdp->nocb_gp_gp = needwait_gp;
>>  	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
>>  
>> -	if (bypass && !rcu_nocb_poll) {
>> -		// At least one child with non-empty ->nocb_bypass, so set
>> -		// timer in order to avoid stranding its callbacks.
>> -		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
>> -				   TPS("WakeBypassIsDeferred"));
>> +	// At least one child with non-empty ->nocb_bypass, so set
>> +	// timer in order to avoid stranding its callbacks.
>> +	if (!rcu_nocb_poll) {
>> +		// If the bypass list only has lazy CBs, add a deferred
>> +		// lazy wake up.
>> +		if (lazy && !bypass) {
>> +			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
>> +					TPS("WakeLazyIsDeferred"));
>> +		// Otherwise add a deferred bypass wake up.
>> +		} else if (bypass) {
>> +			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
>> +					TPS("WakeBypassIsDeferred"));
>> +		}
>>  	}
>> +
>>  	if (rcu_nocb_poll) {
>>  		/* Polling, so trace if first poll in the series. */
>>  		if (gotcbs)
>> @@ -1036,7 +1156,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
>>  	 * return false, which means that future calls to rcu_nocb_try_bypass()
>>  	 * will refuse to put anything into the bypass.
>>  	 */
>> -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
>> +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false, false));
>>  	/*
>>  	 * Start with invoking rcu_core() early. This way if the current thread
>>  	 * happens to preempt an ongoing call to rcu_core() in the middle,
>> @@ -1290,6 +1410,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
>>  	raw_spin_lock_init(&rdp->nocb_gp_lock);
>>  	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
>>  	rcu_cblist_init(&rdp->nocb_bypass);
>> +	WRITE_ONCE(rdp->lazy_len, 0);
>>  	mutex_init(&rdp->nocb_gp_kthread_mutex);
>>  }
>>  
>> @@ -1571,13 +1692,14 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
>>  }
>>  
>>  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>> -				  unsigned long j)
>> +				  unsigned long j, bool lazy, bool wakegp)
>>  {
>>  	return true;
>>  }
>>  
>>  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>> -				bool *was_alldone, unsigned long flags)
>> +				bool *was_alldone, unsigned long flags,
>> +				bool lazy)
>>  {
>>  	return false;
>>  }
>> -- 
>> 2.37.2.789.g6183377224-goog
>>



[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux