[PATCH] rcu/nocb: Simplify (de-)offloading state machine

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Now that the (de-)offloading process can only apply to offline CPUs,
there is no more concurrency between rcu_core and nocb kthreads. Also
the mutation now happens on empty queues.

Therefore the state machine can be reduced to a single bit called
SEGCBLIST_OFFLOADED. Simplify the transition as follows:

* Upon offloading: queue the rdp to be added to the rcuog list and
  wait for the rcuog kthread to set the SEGCBLIST_OFFLOADED bit. Then
  unpark the rcuo kthread.

* Upon de-offloading: park the rcuo kthread. Then queue the rdp to be
  removed from the rcuog list and wait for the rcuog kthread to clear
  the SEGCBLIST_OFFLOADED bit.

Signed-off-by: Frederic Weisbecker <frederic@xxxxxxxxxx>
Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxx>
Reviewed-by: Paul E. McKenney <paulmck@xxxxxxxxxx>
Signed-off-by: Neeraj Upadhyay <neeraj.upadhyay@xxxxxxxxxx>
---
 include/linux/rcu_segcblist.h |   4 +-
 kernel/rcu/rcu_segcblist.c    |  11 ---
 kernel/rcu/rcu_segcblist.h    |   2 +-
 kernel/rcu/tree_nocb.h        | 138 ++++++++++++++++------------------
 4 files changed, 68 insertions(+), 87 deletions(-)

diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
index 1ef1bb54853d..2fdc2208f1ca 100644
--- a/include/linux/rcu_segcblist.h
+++ b/include/linux/rcu_segcblist.h
@@ -185,9 +185,7 @@ struct rcu_cblist {
  *  ----------------------------------------------------------------------------
  */
 #define SEGCBLIST_ENABLED	BIT(0)
-#define SEGCBLIST_LOCKING	BIT(1)
-#define SEGCBLIST_KTHREAD_GP	BIT(2)
-#define SEGCBLIST_OFFLOADED	BIT(3)
+#define SEGCBLIST_OFFLOADED	BIT(1)
 
 struct rcu_segcblist {
 	struct rcu_head *head;
diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index 1693ea22ef1b..298a2c573f02 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -260,17 +260,6 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
 	rcu_segcblist_clear_flags(rsclp, SEGCBLIST_ENABLED);
 }
 
-/*
- * Mark the specified rcu_segcblist structure as offloaded (or not)
- */
-void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload)
-{
-	if (offload)
-		rcu_segcblist_set_flags(rsclp, SEGCBLIST_LOCKING | SEGCBLIST_OFFLOADED);
-	else
-		rcu_segcblist_clear_flags(rsclp, SEGCBLIST_OFFLOADED);
-}
-
 /*
  * Does the specified rcu_segcblist structure contain callbacks that
  * are ready to be invoked?
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 7a0962dfee86..259904075636 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -89,7 +89,7 @@ static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
 static inline bool rcu_segcblist_is_offloaded(struct rcu_segcblist *rsclp)
 {
 	if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
-	    rcu_segcblist_test_flags(rsclp, SEGCBLIST_LOCKING))
+	    rcu_segcblist_test_flags(rsclp, SEGCBLIST_OFFLOADED))
 		return true;
 
 	return false;
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index 6dac4f266c41..d7dc8ffa6ba8 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -604,37 +604,33 @@ static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
 	}
 }
 
-static int nocb_gp_toggle_rdp(struct rcu_data *rdp)
+static void nocb_gp_toggle_rdp(struct rcu_data *rdp_gp, struct rcu_data *rdp)
 {
 	struct rcu_segcblist *cblist = &rdp->cblist;
 	unsigned long flags;
-	int ret;
 
-	rcu_nocb_lock_irqsave(rdp, flags);
-	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
-	    !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+	/*
+	 * Locking orders future de-offloaded callbacks enqueue against previous
+	 * handling of this rdp. Ie: Make sure rcuog is done with this rdp before
+	 * deoffloaded callbacks can be enqueued.
+	 */
+	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
 		/*
 		 * Offloading. Set our flag and notify the offload worker.
 		 * We will handle this rdp until it ever gets de-offloaded.
 		 */
-		rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
-		ret = 1;
-	} else if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
-		   rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+		list_add_tail(&rdp->nocb_entry_rdp, &rdp_gp->nocb_head_rdp);
+		rcu_segcblist_set_flags(cblist, SEGCBLIST_OFFLOADED);
+	} else {
 		/*
 		 * De-offloading. Clear our flag and notify the de-offload worker.
 		 * We will ignore this rdp until it ever gets re-offloaded.
 		 */
-		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
-		ret = 0;
-	} else {
-		WARN_ON_ONCE(1);
-		ret = -1;
+		list_del(&rdp->nocb_entry_rdp);
+		rcu_segcblist_clear_flags(cblist, SEGCBLIST_OFFLOADED);
 	}
-
-	rcu_nocb_unlock_irqrestore(rdp, flags);
-
-	return ret;
+	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
 }
 
 static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
@@ -841,14 +837,7 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 	}
 
 	if (rdp_toggling) {
-		int ret;
-
-		ret = nocb_gp_toggle_rdp(rdp_toggling);
-		if (ret == 1)
-			list_add_tail(&rdp_toggling->nocb_entry_rdp, &my_rdp->nocb_head_rdp);
-		else if (ret == 0)
-			list_del(&rdp_toggling->nocb_entry_rdp);
-
+		nocb_gp_toggle_rdp(my_rdp, rdp_toggling);
 		swake_up_one(&rdp_toggling->nocb_state_wq);
 	}
 
@@ -1018,16 +1007,11 @@ void rcu_nocb_flush_deferred_wakeup(void)
 }
 EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
 
-static int rdp_offload_toggle(struct rcu_data *rdp,
-			       bool offload, unsigned long flags)
-	__releases(rdp->nocb_lock)
+static int rcu_nocb_queue_toggle_rdp(struct rcu_data *rdp)
 {
-	struct rcu_segcblist *cblist = &rdp->cblist;
 	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
 	bool wake_gp = false;
-
-	rcu_segcblist_offload(cblist, offload);
-	rcu_nocb_unlock_irqrestore(rdp, flags);
+	unsigned long flags;
 
 	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
 	// Queue this rdp for add/del to/from the list to iterate on rcuog
@@ -1041,9 +1025,25 @@ static int rdp_offload_toggle(struct rcu_data *rdp,
 	return wake_gp;
 }
 
+static bool rcu_nocb_rdp_deoffload_wait_cond(struct rcu_data *rdp)
+{
+	unsigned long flags;
+	bool ret;
+
+	/*
+	 * Locking makes sure rcuog is done handling this rdp before deoffloaded
+	 * enqueue can happen. Also it keeps the SEGCBLIST_OFFLOADED flag stable
+	 * while the ->nocb_lock is held.
+	 */
+	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+	ret = !rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
+	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+
+	return ret;
+}
+
 static int rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
 {
-	struct rcu_segcblist *cblist = &rdp->cblist;
 	unsigned long flags;
 	int wake_gp;
 	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
@@ -1056,51 +1056,42 @@ static int rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
 	/* Flush all callbacks from segcblist and bypass */
 	rcu_barrier();
 
+	/*
+	 * Make sure the rcuoc kthread isn't in the middle of a nocb locked
+	 * sequence while offloading is deactivated, along with nocb locking.
+	 */
+	if (rdp->nocb_cb_kthread)
+		kthread_park(rdp->nocb_cb_kthread);
+
 	rcu_nocb_lock_irqsave(rdp, flags);
 	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
 	WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist));
+	rcu_nocb_unlock_irqrestore(rdp, flags);
 
-	wake_gp = rdp_offload_toggle(rdp, false, flags);
+	wake_gp = rcu_nocb_queue_toggle_rdp(rdp);
 
 	mutex_lock(&rdp_gp->nocb_gp_kthread_mutex);
+
 	if (rdp_gp->nocb_gp_kthread) {
 		if (wake_gp)
 			wake_up_process(rdp_gp->nocb_gp_kthread);
 
 		swait_event_exclusive(rdp->nocb_state_wq,
-				      !rcu_segcblist_test_flags(cblist,
-								SEGCBLIST_KTHREAD_GP));
-		if (rdp->nocb_cb_kthread)
-			kthread_park(rdp->nocb_cb_kthread);
+				      rcu_nocb_rdp_deoffload_wait_cond(rdp));
 	} else {
 		/*
 		 * No kthread to clear the flags for us or remove the rdp from the nocb list
 		 * to iterate. Do it here instead. Locking doesn't look stricly necessary
 		 * but we stick to paranoia in this rare path.
 		 */
-		rcu_nocb_lock_irqsave(rdp, flags);
-		rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
-		rcu_nocb_unlock_irqrestore(rdp, flags);
+		raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+		rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
+		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
 
 		list_del(&rdp->nocb_entry_rdp);
 	}
-	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
 
-	/*
-	 * Lock one last time to acquire latest callback updates from kthreads
-	 * so we can later handle callbacks locally without locking.
-	 */
-	rcu_nocb_lock_irqsave(rdp, flags);
-	/*
-	 * Theoretically we could clear SEGCBLIST_LOCKING after the nocb
-	 * lock is released but how about being paranoid for once?
-	 */
-	rcu_segcblist_clear_flags(cblist, SEGCBLIST_LOCKING);
-	/*
-	 * Without SEGCBLIST_LOCKING, we can't use
-	 * rcu_nocb_unlock_irqrestore() anymore.
-	 */
-	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
 
 	return 0;
 }
@@ -1129,10 +1120,20 @@ int rcu_nocb_cpu_deoffload(int cpu)
 }
 EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
 
-static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
+static bool rcu_nocb_rdp_offload_wait_cond(struct rcu_data *rdp)
 {
-	struct rcu_segcblist *cblist = &rdp->cblist;
 	unsigned long flags;
+	bool ret;
+
+	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+	ret = rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
+	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+
+	return ret;
+}
+
+static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
+{
 	int wake_gp;
 	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
 
@@ -1152,20 +1153,14 @@ static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
 	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
 	WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist));
 
-	/*
-	 * Can't use rcu_nocb_lock_irqsave() before SEGCBLIST_LOCKING
-	 * is set.
-	 */
-	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
-
-	wake_gp = rdp_offload_toggle(rdp, true, flags);
+	wake_gp = rcu_nocb_queue_toggle_rdp(rdp);
 	if (wake_gp)
 		wake_up_process(rdp_gp->nocb_gp_kthread);
 
-	kthread_unpark(rdp->nocb_cb_kthread);
-
 	swait_event_exclusive(rdp->nocb_state_wq,
-			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+			      rcu_nocb_rdp_offload_wait_cond(rdp));
+
+	kthread_unpark(rdp->nocb_cb_kthread);
 
 	return 0;
 }
@@ -1340,8 +1335,7 @@ void __init rcu_init_nohz(void)
 		rdp = per_cpu_ptr(&rcu_data, cpu);
 		if (rcu_segcblist_empty(&rdp->cblist))
 			rcu_segcblist_init(&rdp->cblist);
-		rcu_segcblist_offload(&rdp->cblist, true);
-		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
+		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
 	}
 	rcu_organize_nocb_kthreads();
 }
-- 
2.46.0





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux