Re: [PATCH 11/11 v2] rcu/nocb: Simplify (de-)offloading state machine

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, Jul 04, 2024 at 12:56:40AM +0200, Frederic Weisbecker wrote:
> Now that the (de-)offloading process can only apply to offline CPUs,
> there is no more concurrency between rcu_core and nocb kthreads. Also
> the mutation now happens on empty queues.
> 
> Therefore the state machine can be reduced to a single bit called
> SEGCBLIST_OFFLOADED. Simplify the transition as follows:
> 
> * Upon offloading: queue the rdp to be added to the rcuog list and
>   wait for the rcuog kthread to set the SEGCBLIST_OFFLOADED bit. Unpark
>   rcuo kthread.
> 
> * Upon de-offloading: Park rcuo kthread. Queue the rdp to be removed
>   from the rcuog list and wait for the rcuog kthread to clear the
>   SEGCBLIST_OFFLOADED bit.
> 
> Signed-off-by: Frederic Weisbecker <frederic@xxxxxxxxxx>
> Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxx>

I have reverted the earlier patch and queued this one, thank you both!
If testing goes well, I will put the stack onto the nocb branch in
preparation for pushing it into the upcoming merge window.

							Thanx, Paul

> ---
>  include/linux/rcu_segcblist.h |   4 +-
>  kernel/rcu/rcu_segcblist.c    |  11 ---
>  kernel/rcu/rcu_segcblist.h    |   2 +-
>  kernel/rcu/tree_nocb.h        | 131 ++++++++++++++++------------------
>  4 files changed, 62 insertions(+), 86 deletions(-)
> 
> diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
> index 1ef1bb54853d..2fdc2208f1ca 100644
> --- a/include/linux/rcu_segcblist.h
> +++ b/include/linux/rcu_segcblist.h
> @@ -185,9 +185,7 @@ struct rcu_cblist {
>   *  ----------------------------------------------------------------------------
>   */
>  #define SEGCBLIST_ENABLED	BIT(0)
> -#define SEGCBLIST_LOCKING	BIT(1)
> -#define SEGCBLIST_KTHREAD_GP	BIT(2)
> -#define SEGCBLIST_OFFLOADED	BIT(3)
> +#define SEGCBLIST_OFFLOADED	BIT(1)
>  
>  struct rcu_segcblist {
>  	struct rcu_head *head;
> diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
> index 1693ea22ef1b..298a2c573f02 100644
> --- a/kernel/rcu/rcu_segcblist.c
> +++ b/kernel/rcu/rcu_segcblist.c
> @@ -260,17 +260,6 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
>  	rcu_segcblist_clear_flags(rsclp, SEGCBLIST_ENABLED);
>  }
>  
> -/*
> - * Mark the specified rcu_segcblist structure as offloaded (or not)
> - */
> -void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload)
> -{
> -	if (offload)
> -		rcu_segcblist_set_flags(rsclp, SEGCBLIST_LOCKING | SEGCBLIST_OFFLOADED);
> -	else
> -		rcu_segcblist_clear_flags(rsclp, SEGCBLIST_OFFLOADED);
> -}
> -
>  /*
>   * Does the specified rcu_segcblist structure contain callbacks that
>   * are ready to be invoked?
> diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
> index 7a0962dfee86..259904075636 100644
> --- a/kernel/rcu/rcu_segcblist.h
> +++ b/kernel/rcu/rcu_segcblist.h
> @@ -89,7 +89,7 @@ static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
>  static inline bool rcu_segcblist_is_offloaded(struct rcu_segcblist *rsclp)
>  {
>  	if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
> -	    rcu_segcblist_test_flags(rsclp, SEGCBLIST_LOCKING))
> +	    rcu_segcblist_test_flags(rsclp, SEGCBLIST_OFFLOADED))
>  		return true;
>  
>  	return false;
> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> index 24daf606de0c..c8fc2f37c9ed 100644
> --- a/kernel/rcu/tree_nocb.h
> +++ b/kernel/rcu/tree_nocb.h
> @@ -604,37 +604,33 @@ static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
>  	}
>  }
>  
> -static int nocb_gp_toggle_rdp(struct rcu_data *rdp)
> +static void nocb_gp_toggle_rdp(struct rcu_data *rdp_gp, struct rcu_data *rdp)
>  {
>  	struct rcu_segcblist *cblist = &rdp->cblist;
>  	unsigned long flags;
> -	int ret;
>  
> -	rcu_nocb_lock_irqsave(rdp, flags);
> -	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
> -	    !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
> +	/*
> +	 * Locking orders future de-offloaded callbacks enqueue against previous
> +	 * handling of this rdp. Ie: Make sure rcuog is done with this rdp before
> +	 * deoffloaded callbacks can be enqueued.
> +	 */
> +	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
> +	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
>  		/*
>  		 * Offloading. Set our flag and notify the offload worker.
>  		 * We will handle this rdp until it ever gets de-offloaded.
>  		 */
> -		rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
> -		ret = 1;
> -	} else if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
> -		   rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
> +		list_add_tail(&rdp->nocb_entry_rdp, &rdp_gp->nocb_head_rdp);
> +		rcu_segcblist_set_flags(cblist, SEGCBLIST_OFFLOADED);
> +	} else {
>  		/*
>  		 * De-offloading. Clear our flag and notify the de-offload worker.
>  		 * We will ignore this rdp until it ever gets re-offloaded.
>  		 */
> -		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
> -		ret = 0;
> -	} else {
> -		WARN_ON_ONCE(1);
> -		ret = -1;
> +		list_del(&rdp->nocb_entry_rdp);
> +		rcu_segcblist_clear_flags(cblist, SEGCBLIST_OFFLOADED);
>  	}
> -
> -	rcu_nocb_unlock_irqrestore(rdp, flags);
> -
> -	return ret;
> +	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
>  }
>  
>  static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
> @@ -841,14 +837,7 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>  	}
>  
>  	if (rdp_toggling) {
> -		int ret;
> -
> -		ret = nocb_gp_toggle_rdp(rdp_toggling);
> -		if (ret == 1)
> -			list_add_tail(&rdp_toggling->nocb_entry_rdp, &my_rdp->nocb_head_rdp);
> -		else if (ret == 0)
> -			list_del(&rdp_toggling->nocb_entry_rdp);
> -
> +		nocb_gp_toggle_rdp(my_rdp, rdp_toggling);
>  		swake_up_one(&rdp_toggling->nocb_state_wq);
>  	}
>  
> @@ -1018,16 +1007,11 @@ void rcu_nocb_flush_deferred_wakeup(void)
>  }
>  EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
>  
> -static int rdp_offload_toggle(struct rcu_data *rdp,
> -			       bool offload, unsigned long flags)
> -	__releases(rdp->nocb_lock)
> +static int rcu_nocb_queue_toggle_rdp(struct rcu_data *rdp)
>  {
> -	struct rcu_segcblist *cblist = &rdp->cblist;
>  	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
>  	bool wake_gp = false;
> -
> -	rcu_segcblist_offload(cblist, offload);
> -	rcu_nocb_unlock_irqrestore(rdp, flags);
> +	unsigned long flags;
>  
>  	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
>  	// Queue this rdp for add/del to/from the list to iterate on rcuog
> @@ -1041,9 +1025,25 @@ static int rdp_offload_toggle(struct rcu_data *rdp,
>  	return wake_gp;
>  }
>  
> +static bool rcu_nocb_rdp_deoffload_wait_cond(struct rcu_data *rdp)
> +{
> +	unsigned long flags;
> +	bool ret;
> +
> +	/*
> +	 * Locking makes sure rcuog is done handling this rdp before deoffloaded
> +	 * enqueue can happen. Also it keeps the SEGCBLIST_OFFLOADED flag stable
> +	 * while the ->nocb_lock is held.
> +	 */
> +	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
> +	ret = !rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
> +	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
> +
> +	return ret;
> +}
> +
>  static int rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
>  {
> -	struct rcu_segcblist *cblist = &rdp->cblist;
>  	unsigned long flags;
>  	int wake_gp;
>  	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
> @@ -1059,48 +1059,34 @@ static int rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
>  	rcu_nocb_lock_irqsave(rdp, flags);
>  	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
>  	WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist));
> +	rcu_nocb_unlock_irqrestore(rdp, flags);
>  
> -	wake_gp = rdp_offload_toggle(rdp, false, flags);
> +	wake_gp = rcu_nocb_queue_toggle_rdp(rdp);
>  
>  	mutex_lock(&rdp_gp->nocb_gp_kthread_mutex);
>  	if (rdp_gp->nocb_gp_kthread) {
>  		if (wake_gp)
>  			wake_up_process(rdp_gp->nocb_gp_kthread);
>  
> -		swait_event_exclusive(rdp->nocb_state_wq,
> -				      !rcu_segcblist_test_flags(cblist,
> -								SEGCBLIST_KTHREAD_GP));
>  		if (rdp->nocb_cb_kthread)
>  			kthread_park(rdp->nocb_cb_kthread);
> +
> +		swait_event_exclusive(rdp->nocb_state_wq,
> +				      rcu_nocb_rdp_deoffload_wait_cond(rdp));
>  	} else {
>  		/*
>  		 * No kthread to clear the flags for us or remove the rdp from the nocb list
>  		 * to iterate. Do it here instead. Locking doesn't look stricly necessary
>  		 * but we stick to paranoia in this rare path.
>  		 */
> -		rcu_nocb_lock_irqsave(rdp, flags);
> -		rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
> -		rcu_nocb_unlock_irqrestore(rdp, flags);
> +		raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
> +		rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
> +		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
>  
>  		list_del(&rdp->nocb_entry_rdp);
>  	}
> -	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
>  
> -	/*
> -	 * Lock one last time to acquire latest callback updates from kthreads
> -	 * so we can later handle callbacks locally without locking.
> -	 */
> -	rcu_nocb_lock_irqsave(rdp, flags);
> -	/*
> -	 * Theoretically we could clear SEGCBLIST_LOCKING after the nocb
> -	 * lock is released but how about being paranoid for once?
> -	 */
> -	rcu_segcblist_clear_flags(cblist, SEGCBLIST_LOCKING);
> -	/*
> -	 * Without SEGCBLIST_LOCKING, we can't use
> -	 * rcu_nocb_unlock_irqrestore() anymore.
> -	 */
> -	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
> +	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
>  
>  	return 0;
>  }
> @@ -1129,10 +1115,20 @@ int rcu_nocb_cpu_deoffload(int cpu)
>  }
>  EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
>  
> -static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
> +static bool rcu_nocb_rdp_offload_wait_cond(struct rcu_data *rdp)
>  {
> -	struct rcu_segcblist *cblist = &rdp->cblist;
>  	unsigned long flags;
> +	bool ret;
> +
> +	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
> +	ret = rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
> +	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
> +
> +	return ret;
> +}
> +
> +static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
> +{
>  	int wake_gp;
>  	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
>  
> @@ -1152,20 +1148,14 @@ static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
>  	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
>  	WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist));
>  
> -	/*
> -	 * Can't use rcu_nocb_lock_irqsave() before SEGCBLIST_LOCKING
> -	 * is set.
> -	 */
> -	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
> -
> -	wake_gp = rdp_offload_toggle(rdp, true, flags);
> +	wake_gp = rcu_nocb_queue_toggle_rdp(rdp);
>  	if (wake_gp)
>  		wake_up_process(rdp_gp->nocb_gp_kthread);
>  
> -	kthread_unpark(rdp->nocb_cb_kthread);
> -
>  	swait_event_exclusive(rdp->nocb_state_wq,
> -			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
> +			      rcu_nocb_rdp_offload_wait_cond(rdp));
> +
> +	kthread_unpark(rdp->nocb_cb_kthread);
>  
>  	return 0;
>  }
> @@ -1340,8 +1330,7 @@ void __init rcu_init_nohz(void)
>  		rdp = per_cpu_ptr(&rcu_data, cpu);
>  		if (rcu_segcblist_empty(&rdp->cblist))
>  			rcu_segcblist_init(&rdp->cblist);
> -		rcu_segcblist_offload(&rdp->cblist, true);
> -		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
> +		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
>  	}
>  	rcu_organize_nocb_kthreads();
>  }
> -- 
> 2.34.1
> 




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux