[tip:core/rcu] srcu: Introduce CLASSIC_SRCU Kconfig option

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Commit-ID:  dad81a2026841b5e2651aab58a7398c13cc05847
Gitweb:     http://git.kernel.org/tip/dad81a2026841b5e2651aab58a7398c13cc05847
Author:     Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
AuthorDate: Sat, 25 Mar 2017 17:23:44 -0700
Committer:  Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
CommitDate: Tue, 18 Apr 2017 11:38:23 -0700

srcu: Introduce CLASSIC_SRCU Kconfig option

The TREE_SRCU rewrite is large and a bit on the non-simple side, so
this commit helps reduce risk by allowing the old v4.11 SRCU algorithm
to be selected using a new CLASSIC_SRCU Kconfig option that depends
on RCU_EXPERT.  The default is to use the new TREE_SRCU and TINY_SRCU
algorithms, in order to help get these the testing that they need.
However, if your users do not require the update-side scalability that
is to be provided by TREE_SRCU, select RCU_EXPERT and then CLASSIC_SRCU
to revert to the old classic SRCU algorithm.

Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
---
 include/linux/srcu.h                        |   2 +
 include/linux/{srcutree.h => srcuclassic.h} |  36 +--
 init/Kconfig                                |  21 +-
 kernel/rcu/Makefile                         |   3 +-
 kernel/rcu/rcutorture.c                     |   2 +-
 kernel/rcu/srcu.c                           | 347 ++++++++++++++++------------
 kernel/rcu/{srcu.c => srcutree.c}           |  48 ++--
 7 files changed, 265 insertions(+), 194 deletions(-)

diff --git a/include/linux/srcu.h b/include/linux/srcu.h
index 907f09b..167ad88 100644
--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -60,6 +60,8 @@ int init_srcu_struct(struct srcu_struct *sp);
 #include <linux/srcutiny.h>
 #elif defined(CONFIG_TREE_SRCU)
 #include <linux/srcutree.h>
+#elif defined(CONFIG_CLASSIC_SRCU)
+#include <linux/srcuclassic.h>
 #else
 #error "Unknown SRCU implementation specified to kernel configuration"
 #endif
diff --git a/include/linux/srcutree.h b/include/linux/srcuclassic.h
similarity index 77%
copy from include/linux/srcutree.h
copy to include/linux/srcuclassic.h
index f2b3bd6..41cf999 100644
--- a/include/linux/srcutree.h
+++ b/include/linux/srcuclassic.h
@@ -1,6 +1,6 @@
 /*
  * Sleepable Read-Copy Update mechanism for mutual exclusion,
- *	tree variant.
+ *	classic v4.11 variant.
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -21,32 +21,38 @@
  * Author: Paul McKenney <paulmck@xxxxxxxxxx>
  */
 
-#ifndef _LINUX_SRCU_TREE_H
-#define _LINUX_SRCU_TREE_H
+#ifndef _LINUX_SRCU_CLASSIC_H
+#define _LINUX_SRCU_CLASSIC_H
 
 struct srcu_array {
 	unsigned long lock_count[2];
 	unsigned long unlock_count[2];
 };
 
+struct rcu_batch {
+	struct rcu_head *head, **tail;
+};
+
+#define RCU_BATCH_INIT(name) { NULL, &(name.head) }
+
 struct srcu_struct {
 	unsigned long completed;
-	unsigned long srcu_gp_seq;
-	atomic_t srcu_exp_cnt;
 	struct srcu_array __percpu *per_cpu_ref;
-	spinlock_t queue_lock; /* protect ->srcu_cblist */
-	struct rcu_segcblist srcu_cblist;
+	spinlock_t queue_lock; /* protect ->batch_queue, ->running */
+	bool running;
+	/* callbacks just queued */
+	struct rcu_batch batch_queue;
+	/* callbacks try to do the first check_zero */
+	struct rcu_batch batch_check0;
+	/* callbacks done with the first check_zero and the flip */
+	struct rcu_batch batch_check1;
+	struct rcu_batch batch_done;
 	struct delayed_work work;
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map dep_map;
 #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
 };
 
-/* Values for -> state variable. */
-#define SRCU_STATE_IDLE		0
-#define SRCU_STATE_SCAN1	1
-#define SRCU_STATE_SCAN2	2
-
 void process_srcu(struct work_struct *work);
 
 #define __SRCU_STRUCT_INIT(name)					\
@@ -54,7 +60,11 @@ void process_srcu(struct work_struct *work);
 		.completed = -300,					\
 		.per_cpu_ref = &name##_srcu_array,			\
 		.queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock),	\
-		.srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\
+		.running = false,					\
+		.batch_queue = RCU_BATCH_INIT(name.batch_queue),	\
+		.batch_check0 = RCU_BATCH_INIT(name.batch_check0),	\
+		.batch_check1 = RCU_BATCH_INIT(name.batch_check1),	\
+		.batch_done = RCU_BATCH_INIT(name.batch_done),		\
 		.work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\
 		__SRCU_DEP_MAP_INIT(name)				\
 	}
diff --git a/init/Kconfig b/init/Kconfig
index d269f2c..558cc36 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -526,15 +526,32 @@ config SRCU
 	  permits arbitrary sleeping or blocking within RCU read-side critical
 	  sections.
 
+config CLASSIC_SRCU
+	bool "Use v4.11 classic SRCU implementation"
+	default n
+	depends on RCU_EXPERT && SRCU
+	help
+	  This option selects the traditional well-tested classic SRCU
+	  implementation from v4.11, as might be desired for enterprise
+	  Linux distributions.  Without this option, the shiny new
+	  Tiny SRCU and Tree SRCU implementations are used instead.
+	  At some point, it is hoped that Tiny SRCU and Tree SRCU
+	  will accumulate enough test time and confidence to allow
+	  Classic SRCU to be dropped entirely.
+
+	  Say Y if you need a rock-solid SRCU.
+
+	  Say N if you would like to help test Tree SRCU.
+
 config TINY_SRCU
 	bool
-	default y if TINY_RCU
+	default y if TINY_RCU && !CLASSIC_SRCU
 	help
 	  This option selects the single-CPU non-preemptible version of SRCU.
 
 config TREE_SRCU
 	bool
-	default y if !TINY_RCU
+	default y if !TINY_RCU && !CLASSIC_SRCU
 	help
 	  This option selects the full-fledged version of SRCU.
 
diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
index b853214..158e659 100644
--- a/kernel/rcu/Makefile
+++ b/kernel/rcu/Makefile
@@ -3,7 +3,8 @@
 KCOV_INSTRUMENT := n
 
 obj-y += update.o sync.o
-obj-$(CONFIG_TREE_SRCU) += srcu.o
+obj-$(CONFIG_CLASSIC_SRCU) += srcu.o
+obj-$(CONFIG_TREE_SRCU) += srcutree.o
 obj-$(CONFIG_TINY_SRCU) += srcutiny.o
 obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
 obj-$(CONFIG_RCU_PERF_TEST) += rcuperf.o
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 9cbb8a7..6f344b6 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -562,7 +562,7 @@ static void srcu_torture_stats(void)
 	int __maybe_unused cpu;
 	int idx;
 
-#ifdef CONFIG_TREE_SRCU
+#if defined(CONFIG_TREE_SRCU) || defined(CONFIG_CLASSIC_SRCU)
 	idx = srcu_ctlp->completed & 0x1;
 	pr_alert("%s%s Tree SRCU per-CPU(idx=%d):",
 		 torture_type, TORTURE_FLAG, idx);
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c
index 3cfcc59..584d8a9 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcu.c
@@ -36,16 +36,75 @@
 #include <linux/delay.h>
 #include <linux/srcu.h>
 
-#include <linux/rcu_node_tree.h>
 #include "rcu.h"
 
+/*
+ * Initialize an rcu_batch structure to empty.
+ */
+static inline void rcu_batch_init(struct rcu_batch *b)
+{
+	b->head = NULL;
+	b->tail = &b->head;
+}
+
+/*
+ * Enqueue a callback onto the tail of the specified rcu_batch structure.
+ */
+static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
+{
+	*b->tail = head;
+	b->tail = &head->next;
+}
+
+/*
+ * Is the specified rcu_batch structure empty?
+ */
+static inline bool rcu_batch_empty(struct rcu_batch *b)
+{
+	return b->tail == &b->head;
+}
+
+/*
+ * Remove the callback at the head of the specified rcu_batch structure
+ * and return a pointer to it, or return NULL if the structure is empty.
+ */
+static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
+{
+	struct rcu_head *head;
+
+	if (rcu_batch_empty(b))
+		return NULL;
+
+	head = b->head;
+	b->head = head->next;
+	if (b->tail == &head->next)
+		rcu_batch_init(b);
+
+	return head;
+}
+
+/*
+ * Move all callbacks from the rcu_batch structure specified by "from" to
+ * the structure specified by "to".
+ */
+static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
+{
+	if (!rcu_batch_empty(from)) {
+		*to->tail = from->head;
+		to->tail = from->tail;
+		rcu_batch_init(from);
+	}
+}
+
 static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
 	sp->completed = 0;
-	sp->srcu_gp_seq = 0;
-	atomic_set(&sp->srcu_exp_cnt, 0);
 	spin_lock_init(&sp->queue_lock);
-	rcu_segcblist_init(&sp->srcu_cblist);
+	sp->running = false;
+	rcu_batch_init(&sp->batch_queue);
+	rcu_batch_init(&sp->batch_check0);
+	rcu_batch_init(&sp->batch_check1);
+	rcu_batch_init(&sp->batch_done);
 	INIT_DELAYED_WORK(&sp->work, process_srcu);
 	sp->per_cpu_ref = alloc_percpu(struct srcu_array);
 	return sp->per_cpu_ref ? 0 : -ENOMEM;
@@ -180,8 +239,6 @@ static bool srcu_readers_active(struct srcu_struct *sp)
 	return sum;
 }
 
-#define SRCU_INTERVAL		1
-
 /**
  * cleanup_srcu_struct - deconstruct a sleep-RCU structure
  * @sp: structure to clean up.
@@ -197,16 +254,8 @@ static bool srcu_readers_active(struct srcu_struct *sp)
  */
 void cleanup_srcu_struct(struct srcu_struct *sp)
 {
-	WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
 	if (WARN_ON(srcu_readers_active(sp)))
 		return; /* Leakage unless caller handles error. */
-	if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
-		return; /* Leakage unless caller handles error. */
-	flush_delayed_work(&sp->work);
-	if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) {
-		pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
-		return; /* Caller forgot to stop doing call_srcu()? */
-	}
 	free_percpu(sp->per_cpu_ref);
 	sp->per_cpu_ref = NULL;
 }
@@ -245,36 +294,26 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
  * We use an adaptive strategy for synchronize_srcu() and especially for
  * synchronize_srcu_expedited().  We spin for a fixed time period
  * (defined below) to allow SRCU readers to exit their read-side critical
- * sections.  If there are still some readers after a few microseconds,
- * we repeatedly block for 1-millisecond time periods.
+ * sections.  If there are still some readers after 10 microseconds,
+ * we repeatedly block for 1-millisecond time periods.  This approach
+ * has done well in testing, so there is no need for a config parameter.
  */
 #define SRCU_RETRY_CHECK_DELAY		5
+#define SYNCHRONIZE_SRCU_TRYCOUNT	2
+#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT	12
 
 /*
- * Start an SRCU grace period.
- */
-static void srcu_gp_start(struct srcu_struct *sp)
-{
-	int state;
-
-	rcu_segcblist_accelerate(&sp->srcu_cblist,
-				 rcu_seq_snap(&sp->srcu_gp_seq));
-	rcu_seq_start(&sp->srcu_gp_seq);
-	state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
-	WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
-}
-
-/*
- * Wait until all readers counted by array index idx complete, but
- * loop an additional time if there is an expedited grace period pending.
- * The caller must ensure that ->completed is not changed while checking.
+ * @@@ Wait until all pre-existing readers complete.  Such readers
+ * will have used the index specified by "idx".
+ * The caller must ensure that ->completed is not changed while checking
+ * and that idx = (->completed & 1) ^ 1.
  */
 static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
 {
 	for (;;) {
 		if (srcu_readers_active_idx_check(sp, idx))
 			return true;
-		if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+		if (--trycount <= 0)
 			return false;
 		udelay(SRCU_RETRY_CHECK_DELAY);
 	}
@@ -300,19 +339,6 @@ static void srcu_flip(struct srcu_struct *sp)
 }
 
 /*
- * End an SRCU grace period.
- */
-static void srcu_gp_end(struct srcu_struct *sp)
-{
-	rcu_seq_end(&sp->srcu_gp_seq);
-
-	spin_lock_irq(&sp->queue_lock);
-	rcu_segcblist_advance(&sp->srcu_cblist,
-			      rcu_seq_current(&sp->srcu_gp_seq));
-	spin_unlock_irq(&sp->queue_lock);
-}
-
-/*
  * Enqueue an SRCU callback on the specified srcu_struct structure,
  * initiating grace-period processing if it is not already running.
  *
@@ -348,24 +374,26 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
 	head->func = func;
 	spin_lock_irqsave(&sp->queue_lock, flags);
 	smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
-	rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
-		srcu_gp_start(sp);
+	rcu_batch_queue(&sp->batch_queue, head);
+	if (!sp->running) {
+		sp->running = true;
 		queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
 	}
 	spin_unlock_irqrestore(&sp->queue_lock, flags);
 }
 EXPORT_SYMBOL_GPL(call_srcu);
 
-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount);
+static void srcu_reschedule(struct srcu_struct *sp);
 
 /*
  * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
  */
-static void __synchronize_srcu(struct srcu_struct *sp)
+static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
 {
 	struct rcu_synchronize rcu;
 	struct rcu_head *head = &rcu.head;
+	bool done = false;
 
 	RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
 			 lock_is_held(&rcu_bh_lock_map) ||
@@ -373,8 +401,6 @@ static void __synchronize_srcu(struct srcu_struct *sp)
 			 lock_is_held(&rcu_sched_lock_map),
 			 "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section");
 
-	if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
-		return;
 	might_sleep();
 	init_completion(&rcu.completion);
 
@@ -382,47 +408,31 @@ static void __synchronize_srcu(struct srcu_struct *sp)
 	head->func = wakeme_after_rcu;
 	spin_lock_irq(&sp->queue_lock);
 	smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+	if (!sp->running) {
 		/* steal the processing owner */
-		rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
-		srcu_gp_start(sp);
+		sp->running = true;
+		rcu_batch_queue(&sp->batch_check0, head);
 		spin_unlock_irq(&sp->queue_lock);
+
+		srcu_advance_batches(sp, trycount);
+		if (!rcu_batch_empty(&sp->batch_done)) {
+			BUG_ON(sp->batch_done.head != head);
+			rcu_batch_dequeue(&sp->batch_done);
+			done = true;
+		}
 		/* give the processing owner to work_struct */
-		srcu_reschedule(sp, 0);
+		srcu_reschedule(sp);
 	} else {
-		rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+		rcu_batch_queue(&sp->batch_queue, head);
 		spin_unlock_irq(&sp->queue_lock);
 	}
 
-	wait_for_completion(&rcu.completion);
-	smp_mb(); /* Caller's later accesses after GP. */
-}
-
-/**
- * synchronize_srcu_expedited - Brute-force SRCU grace period
- * @sp: srcu_struct with which to synchronize.
- *
- * Wait for an SRCU grace period to elapse, but be more aggressive about
- * spinning rather than blocking when waiting.
- *
- * Note that synchronize_srcu_expedited() has the same deadlock and
- * memory-ordering properties as does synchronize_srcu().
- */
-void synchronize_srcu_expedited(struct srcu_struct *sp)
-{
-	bool do_norm = rcu_gp_is_normal();
-
-	if (!do_norm) {
-		atomic_inc(&sp->srcu_exp_cnt);
-		smp_mb__after_atomic(); /* increment before GP. */
-	}
-	__synchronize_srcu(sp);
-	if (!do_norm) {
-		smp_mb__before_atomic(); /* GP before decrement. */
-		atomic_dec(&sp->srcu_exp_cnt);
+	if (!done) {
+		wait_for_completion(&rcu.completion);
+		smp_mb(); /* Caller's later accesses after GP. */
 	}
+
 }
-EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
 /**
  * synchronize_srcu - wait for prior SRCU read-side critical-section completion
@@ -465,14 +475,29 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
  */
 void synchronize_srcu(struct srcu_struct *sp)
 {
-	if (rcu_gp_is_expedited())
-		synchronize_srcu_expedited(sp);
-	else
-		__synchronize_srcu(sp);
+	__synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal())
+			   ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT
+			   : SYNCHRONIZE_SRCU_TRYCOUNT);
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
 /**
+ * synchronize_srcu_expedited - Brute-force SRCU grace period
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for an SRCU grace period to elapse, but be more aggressive about
+ * spinning rather than blocking when waiting.
+ *
+ * Note that synchronize_srcu_expedited() has the same deadlock and
+ * memory-ordering properties as does synchronize_srcu().
+ */
+void synchronize_srcu_expedited(struct srcu_struct *sp)
+{
+	__synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
+
+/**
  * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
  * @sp: srcu_struct on which to wait for in-flight callbacks.
  */
@@ -495,13 +520,29 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp)
 }
 EXPORT_SYMBOL_GPL(srcu_batches_completed);
 
+#define SRCU_CALLBACK_BATCH	10
+#define SRCU_INTERVAL		1
+
+/*
+ * Move any new SRCU callbacks to the first stage of the SRCU grace
+ * period pipeline.
+ */
+static void srcu_collect_new(struct srcu_struct *sp)
+{
+	if (!rcu_batch_empty(&sp->batch_queue)) {
+		spin_lock_irq(&sp->queue_lock);
+		rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
+		spin_unlock_irq(&sp->queue_lock);
+	}
+}
+
 /*
  * Core SRCU state machine.  Advance callbacks from ->batch_check0 to
  * ->batch_check1 and then to ->batch_done as readers drain.
  */
-static void srcu_advance_batches(struct srcu_struct *sp)
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
 {
-	int idx;
+	int idx = 1 ^ (sp->completed & 1);
 
 	/*
 	 * Because readers might be delayed for an extended period after
@@ -509,44 +550,50 @@ static void srcu_advance_batches(struct srcu_struct *sp)
 	 * might well be readers using both idx=0 and idx=1.  We therefore
 	 * need to wait for readers to clear from both index values before
 	 * invoking a callback.
-	 *
-	 * The load-acquire ensures that we see the accesses performed
-	 * by the prior grace period.
 	 */
-	idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
-	if (idx == SRCU_STATE_IDLE) {
-		spin_lock_irq(&sp->queue_lock);
-		if (rcu_segcblist_empty(&sp->srcu_cblist)) {
-			spin_unlock_irq(&sp->queue_lock);
-			return;
-		}
-		idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
-		if (idx == SRCU_STATE_IDLE)
-			srcu_gp_start(sp);
-		spin_unlock_irq(&sp->queue_lock);
-		if (idx != SRCU_STATE_IDLE)
-			return; /* Someone else started the grace period. */
-	}
 
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
-		idx = 1 ^ (sp->completed & 1);
-		if (!try_check_zero(sp, idx, 1))
-			return; /* readers present, retry later. */
-		srcu_flip(sp);
-		rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
-	}
+	if (rcu_batch_empty(&sp->batch_check0) &&
+	    rcu_batch_empty(&sp->batch_check1))
+		return; /* no callbacks need to be advanced */
 
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
+	if (!try_check_zero(sp, idx, trycount))
+		return; /* failed to advance, will try after SRCU_INTERVAL */
 
-		/*
-		 * SRCU read-side critical sections are normally short,
-		 * so check at least twice in quick succession after a flip.
-		 */
-		idx = 1 ^ (sp->completed & 1);
-		if (!try_check_zero(sp, idx, 2))
-			return; /* readers present, retry after later. */
-		srcu_gp_end(sp);
-	}
+	/*
+	 * The callbacks in ->batch_check1 are already done with their
+	 * first zero check and flip, back when they were enqueued on
+	 * ->batch_check0 in a previous invocation of srcu_advance_batches().
+	 * (Presumably try_check_zero() returned false during that
+	 * invocation, leaving the callbacks stranded on ->batch_check1.)
+	 * They are therefore ready to invoke, so move them to ->batch_done.
+	 */
+	rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+
+	if (rcu_batch_empty(&sp->batch_check0))
+		return; /* no callbacks need to be advanced */
+	srcu_flip(sp);
+
+	/*
+	 * The callbacks in ->batch_check0 just finished their
+	 * first check zero and flip, so move them to ->batch_check1
+	 * for future checking on the other idx.
+	 */
+	rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
+
+	/*
+	 * SRCU read-side critical sections are normally short, so check
+	 * at least twice in quick succession after a flip.
+	 */
+	trycount = trycount < 2 ? 2 : trycount;
+	if (!try_check_zero(sp, idx^1, trycount))
+		return; /* failed to advance, will try after SRCU_INTERVAL */
+
+	/*
+	 * The callbacks in ->batch_check1 have now waited for all
+	 * pre-existing readers using both idx values.  They are therefore
+	 * ready to invoke, so move them to ->batch_done.
+	 */
+	rcu_batch_move(&sp->batch_done, &sp->batch_check1);
 }
 
 /*
@@ -557,48 +604,45 @@ static void srcu_advance_batches(struct srcu_struct *sp)
  */
 static void srcu_invoke_callbacks(struct srcu_struct *sp)
 {
-	struct rcu_cblist ready_cbs;
-	struct rcu_head *rhp;
+	int i;
+	struct rcu_head *head;
 
-	spin_lock_irq(&sp->queue_lock);
-	if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
-		spin_unlock_irq(&sp->queue_lock);
-		return;
-	}
-	rcu_cblist_init(&ready_cbs);
-	rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs);
-	spin_unlock_irq(&sp->queue_lock);
-	rhp = rcu_cblist_dequeue(&ready_cbs);
-	for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
+	for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
+		head = rcu_batch_dequeue(&sp->batch_done);
+		if (!head)
+			break;
 		local_bh_disable();
-		rhp->func(rhp);
+		head->func(head);
 		local_bh_enable();
 	}
-	spin_lock_irq(&sp->queue_lock);
-	rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs);
-	spin_unlock_irq(&sp->queue_lock);
 }
 
 /*
  * Finished one round of SRCU grace period.  Start another if there are
  * more SRCU callbacks queued, otherwise put SRCU into not-running state.
  */
-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
+static void srcu_reschedule(struct srcu_struct *sp)
 {
 	bool pending = true;
-	int state;
 
-	if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+	if (rcu_batch_empty(&sp->batch_done) &&
+	    rcu_batch_empty(&sp->batch_check1) &&
+	    rcu_batch_empty(&sp->batch_check0) &&
+	    rcu_batch_empty(&sp->batch_queue)) {
 		spin_lock_irq(&sp->queue_lock);
-		state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
-		if (rcu_segcblist_empty(&sp->srcu_cblist) &&
-		    state == SRCU_STATE_IDLE)
+		if (rcu_batch_empty(&sp->batch_done) &&
+		    rcu_batch_empty(&sp->batch_check1) &&
+		    rcu_batch_empty(&sp->batch_check0) &&
+		    rcu_batch_empty(&sp->batch_queue)) {
+			sp->running = false;
 			pending = false;
+		}
 		spin_unlock_irq(&sp->queue_lock);
 	}
 
 	if (pending)
-		queue_delayed_work(system_power_efficient_wq, &sp->work, delay);
+		queue_delayed_work(system_power_efficient_wq,
+				   &sp->work, SRCU_INTERVAL);
 }
 
 /*
@@ -610,8 +654,9 @@ void process_srcu(struct work_struct *work)
 
 	sp = container_of(work, struct srcu_struct, work.work);
 
-	srcu_advance_batches(sp);
+	srcu_collect_new(sp);
+	srcu_advance_batches(sp, 1);
 	srcu_invoke_callbacks(sp);
-	srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+	srcu_reschedule(sp);
 }
 EXPORT_SYMBOL_GPL(process_srcu);
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcutree.c
similarity index 92%
copy from kernel/rcu/srcu.c
copy to kernel/rcu/srcutree.c
index 3cfcc59..da676b0 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcutree.c
@@ -126,31 +126,33 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx)
 	unlocks = srcu_readers_unlock_idx(sp, idx);
 
 	/*
-	 * Make sure that a lock is always counted if the corresponding unlock
-	 * is counted. Needs to be a smp_mb() as the read side may contain a
-	 * read from a variable that is written to before the synchronize_srcu()
-	 * in the write side. In this case smp_mb()s A and B act like the store
-	 * buffering pattern.
+	 * Make sure that a lock is always counted if the corresponding
+	 * unlock is counted. Needs to be a smp_mb() as the read side may
+	 * contain a read from a variable that is written to before the
+	 * synchronize_srcu() in the write side. In this case smp_mb()s
+	 * A and B act like the store buffering pattern.
 	 *
-	 * This smp_mb() also pairs with smp_mb() C to prevent accesses after the
-	 * synchronize_srcu() from being executed before the grace period ends.
+	 * This smp_mb() also pairs with smp_mb() C to prevent accesses
+	 * after the synchronize_srcu() from being executed before the
+	 * grace period ends.
 	 */
 	smp_mb(); /* A */
 
 	/*
 	 * If the locks are the same as the unlocks, then there must have
-	 * been no readers on this index at some time in between. This does not
-	 * mean that there are no more readers, as one could have read the
-	 * current index but not have incremented the lock counter yet.
+	 * been no readers on this index at some time in between. This does
+	 * not mean that there are no more readers, as one could have read
+	 * the current index but not have incremented the lock counter yet.
 	 *
-	 * Possible bug: There is no guarantee that there haven't been ULONG_MAX
-	 * increments of ->lock_count[] since the unlocks were counted, meaning
-	 * that this could return true even if there are still active readers.
-	 * Since there are no memory barriers around srcu_flip(), the CPU is not
-	 * required to increment ->completed before running
-	 * srcu_readers_unlock_idx(), which means that there could be an
-	 * arbitrarily large number of critical sections that execute after
-	 * srcu_readers_unlock_idx() but use the old value of ->completed.
+	 * Possible bug: There is no guarantee that there haven't been
+	 * ULONG_MAX increments of ->lock_count[] since the unlocks were
+	 * counted, meaning that this could return true even if there are
+	 * still active readers.  Since there are no memory barriers around
+	 * srcu_flip(), the CPU is not required to increment ->completed
+	 * before running srcu_readers_unlock_idx(), which means that there
+	 * could be an arbitrarily large number of critical sections that
+	 * execute after srcu_readers_unlock_idx() but use the old value
+	 * of ->completed.
 	 */
 	return srcu_readers_lock_idx(sp, idx) == unlocks;
 }
@@ -186,14 +188,8 @@ static bool srcu_readers_active(struct srcu_struct *sp)
  * cleanup_srcu_struct - deconstruct a sleep-RCU structure
  * @sp: structure to clean up.
  *
- * Must invoke this only after you are finished using a given srcu_struct
- * that was initialized via init_srcu_struct().  This code does some
- * probabalistic checking, spotting late uses of srcu_read_lock(),
- * synchronize_srcu(), synchronize_srcu_expedited(), and call_srcu().
- * If any such late uses are detected, the per-CPU memory associated with
- * the srcu_struct is simply leaked and WARN_ON() is invoked.  If the
- * caller frees the srcu_struct itself, a use-after-free crash will likely
- * ensue, but at least there will be a warning printed.
+ * Must invoke this after you are finished using a given srcu_struct that
+ * was initialized via init_srcu_struct(), else you leak memory.
  */
 void cleanup_srcu_struct(struct srcu_struct *sp)
 {
--
To unsubscribe from this list: send the line "unsubscribe linux-tip-commits" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Stable Commits]     [Linux Stable Kernel]     [Linux Kernel]     [Linux USB Devel]     [Linux Video &Media]     [Linux Audio Users]     [Yosemite News]     [Linux SCSI]

  Powered by Linux