[RFC][PATCH 19/26] srcu: Implement call_srcu()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Implement call_srcu() by using a state machine driven by
call_rcu_sched() and timer callbacks.

The state machine is a direct derivation of the existing
synchronize_srcu() code and replaces synchronize_sched() calls with a
call_rcu_sched() callback and the schedule_timeout() calls with simple
timer callbacks.

It then re-implements synchronize_srcu() on top of call_srcu(): it queues
a callback that signals an on-stack completion and waits for it.

It completely wrecks synchronize_srcu_expedited(), which is only used
by KVM: it now simply calls synchronize_srcu().

Signed-off-by: Peter Zijlstra <a.p.zijlstra@xxxxxxxxx>
---
 include/linux/srcu.h |   23 +++
 kernel/srcu.c        |  304 +++++++++++++++++++++++++++++----------------------
 2 files changed, 196 insertions(+), 131 deletions(-)

--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -27,17 +27,35 @@
 #ifndef _LINUX_SRCU_H
 #define _LINUX_SRCU_H
 
-#include <linux/mutex.h>
+#include <linux/spinlock.h>
 #include <linux/rcupdate.h>
+#include <linux/timer.h>
 
 struct srcu_struct_array {
 	int c[2];
 };
 
+enum srcu_state {
+	srcu_idle,
+	srcu_sync_1,
+	srcu_sync_2,
+	srcu_sync_2b,
+	srcu_wait,
+	srcu_wait_b,
+	srcu_sync_3,
+	srcu_sync_3b,
+};
+
 struct srcu_struct {
 	int completed;
 	struct srcu_struct_array __percpu *per_cpu_ref;
-	struct mutex mutex;
+	raw_spinlock_t lock;
+	enum srcu_state state;
+	union {
+		struct rcu_head head;
+		struct timer_list timer;
+	};
+	struct rcu_head *pending[2];
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map dep_map;
 #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
@@ -73,6 +91,7 @@ void __srcu_read_unlock(struct srcu_stru
 void synchronize_srcu(struct srcu_struct *sp);
 void synchronize_srcu_expedited(struct srcu_struct *sp);
 long srcu_batches_completed(struct srcu_struct *sp);
+void call_srcu(struct srcu_struct *sp, struct rcu_head *head, void (*func)(struct rcu_head *));
 
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 
--- a/kernel/srcu.c
+++ b/kernel/srcu.c
@@ -16,6 +16,7 @@
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  *
  * Copyright (C) IBM Corporation, 2006
+ * Copyright (C) 2012 Red Hat, Inc., Peter Zijlstra <pzijlstr@xxxxxxxxxx>
  *
  * Author: Paul McKenney <paulmck@xxxxxxxxxx>
  *
@@ -33,11 +34,14 @@
 #include <linux/smp.h>
 #include <linux/delay.h>
 #include <linux/srcu.h>
+#include <linux/completion.h>
 
 static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
 	sp->completed = 0;
-	mutex_init(&sp->mutex);
+	raw_spin_lock_init(&sp->lock);
+	sp->state = srcu_idle;
+	sp->pending[0] = sp->pending[1] = NULL;
 	sp->per_cpu_ref = alloc_percpu(struct srcu_struct_array);
 	return sp->per_cpu_ref ? 0 : -ENOMEM;
 }
@@ -155,119 +159,190 @@ void __srcu_read_unlock(struct srcu_stru
 }
 EXPORT_SYMBOL_GPL(__srcu_read_unlock);
 
-/*
- * We use an adaptive strategy for synchronize_srcu() and especially for
- * synchronize_srcu_expedited().  We spin for a fixed time period
- * (defined below) to allow SRCU readers to exit their read-side critical
- * sections.  If there are still some readers after 10 microseconds,
- * we repeatedly block for 1-millisecond time periods.  This approach
- * has done well in testing, so there is no need for a config parameter.
+
+/**
+ * synchronize_srcu_expedited - like synchronize_srcu, but less patient
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Note that it is illegal to call synchronize_srcu_expedited()
+ * from the corresponding SRCU read-side critical section; doing so
+ * will result in deadlock.  However, it is perfectly legal to call
+ * synchronize_srcu_expedited() on one srcu_struct from some other
+ * srcu_struct's read-side critical section.
  */
-#define SYNCHRONIZE_SRCU_READER_DELAY 10
+void synchronize_srcu_expedited(struct srcu_struct *sp)
+{
+	/* XXX kill me */
+	synchronize_srcu(sp);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
-/*
- * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
+/**
+ * srcu_batches_completed - return batches completed.
+ * @sp: srcu_struct on which to report batch completion.
+ *
+ * Report the number of batches, correlated with, but not necessarily
+ * precisely the same as, the number of grace periods that have elapsed.
  */
-static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void))
+long srcu_batches_completed(struct srcu_struct *sp)
 {
-	int idx;
+	return sp->completed;
+}
+EXPORT_SYMBOL_GPL(srcu_batches_completed);
+
+static void do_srcu_state(struct srcu_struct *sp);
 
-	idx = sp->completed;
-	mutex_lock(&sp->mutex);
+static void do_srcu_state_timer(unsigned long __data)
+{
+	struct srcu_struct *sp = (void *)__data;
+	do_srcu_state(sp);
+}
 
-	/*
-	 * Check to see if someone else did the work for us while we were
-	 * waiting to acquire the lock.  We need -two- advances of
-	 * the counter, not just one.  If there was but one, we might have
-	 * shown up -after- our helper's first synchronize_sched(), thus
-	 * having failed to prevent CPU-reordering races with concurrent
-	 * srcu_read_unlock()s on other CPUs (see comment below).  So we
-	 * either (1) wait for two or (2) supply the second ourselves.
-	 */
+static void do_srcu_state_rcu(struct rcu_head *head)
+{
+	struct srcu_struct *sp = container_of(head, struct srcu_struct, head);
+	do_srcu_state(sp);
+}
 
-	if ((sp->completed - idx) >= 2) {
-		mutex_unlock(&sp->mutex);
-		return;
+static void do_srcu_state(struct srcu_struct *sp)
+{
+	struct rcu_head *head, *next;
+	unsigned long flags;
+	int idx;
+
+	raw_spin_lock_irqsave(&sp->lock, flags);
+	switch (sp->state) {
+	case srcu_idle:
+		BUG();
+
+	case srcu_sync_1:
+		/*
+		 * The preceding synchronize_sched() ensures that any CPU that
+		 * sees the new value of sp->completed will also see any
+		 * preceding changes to data structures made by this CPU.  This
+		 * prevents some other CPU from reordering the accesses in its
+		 * SRCU read-side critical section to precede the corresponding
+		 * srcu_read_lock() -- ensuring that such references will in
+		 * fact be protected.
+		 *
+		 * So it is now safe to do the flip.
+		 */
+		idx = sp->completed & 0x1;
+		sp->completed++;
+
+		sp->state = srcu_sync_2 + idx;
+		call_rcu_sched(&sp->head, do_srcu_state_rcu);
+		break;
+
+	case srcu_sync_2:
+	case srcu_sync_2b:
+		idx = sp->state - srcu_sync_2;
+
+		init_timer(&sp->timer);
+		sp->timer.data = (unsigned long)sp;
+		sp->timer.function = do_srcu_state_timer;
+		sp->state = srcu_wait + idx;
+
+		/*
+		 * At this point, because of the preceding synchronize_sched(),
+		 * all srcu_read_lock() calls using the old counters have
+		 * completed. Their corresponding critical sections might well
+		 * be still executing, but the srcu_read_lock() primitives
+		 * themselves will have finished executing.
+		 */
+test_pending:
+		if (!srcu_readers_active_idx(sp, idx)) {
+			sp->state = srcu_sync_3 + idx;
+			call_rcu_sched(&sp->head, do_srcu_state_rcu);
+			break;
+		}
+
+		mod_timer(&sp->timer, jiffies + 1);
+		break;
+
+	case srcu_wait:
+	case srcu_wait_b:
+		idx = sp->state - srcu_wait;
+		goto test_pending;
+
+	case srcu_sync_3:
+	case srcu_sync_3b:
+		idx = sp->state - srcu_sync_3;
+		/*
+		 * The preceding synchronize_sched() forces all
+		 * srcu_read_unlock() primitives that were executing
+		 * concurrently with the preceding for_each_possible_cpu() loop
+		 * to have completed by this point. More importantly, it also
+		 * forces the corresponding SRCU read-side critical sections to
+		 * have also completed, and the corresponding references to
+		 * SRCU-protected data items to be dropped.
+		 */
+		head = sp->pending[idx];
+		sp->pending[idx] = NULL;
+		raw_spin_unlock(&sp->lock);
+		while (head) {
+			next = head->next;
+			head->func(head);
+			head = next;
+		}
+		raw_spin_lock(&sp->lock);
+
+		/*
+		 * If there's a new batch waiting...
+		 */
+		if (sp->pending[idx ^ 1]) {
+			sp->state = srcu_sync_1;
+			call_rcu_sched(&sp->head, do_srcu_state_rcu);
+			break;
+		}
+
+		/*
+		 * We done!!
+		 */
+		sp->state = srcu_idle;
+		break;
 	}
+	raw_spin_unlock_irqrestore(&sp->lock, flags);
+}
 
-	sync_func();  /* Force memory barrier on all CPUs. */
+void call_srcu(struct srcu_struct *sp,
+	       struct rcu_head *head, void (*func)(struct rcu_head *))
+{
+	unsigned long flags;
+	int idx;
 
-	/*
-	 * The preceding synchronize_sched() ensures that any CPU that
-	 * sees the new value of sp->completed will also see any preceding
-	 * changes to data structures made by this CPU.  This prevents
-	 * some other CPU from reordering the accesses in its SRCU
-	 * read-side critical section to precede the corresponding
-	 * srcu_read_lock() -- ensuring that such references will in
-	 * fact be protected.
-	 *
-	 * So it is now safe to do the flip.
-	 */
+	head->func = func;
 
-	idx = sp->completed & 0x1;
-	sp->completed++;
+	raw_spin_lock_irqsave(&sp->lock, flags);
+	idx = sp->completed & 1;
+	barrier(); /* look at sp->completed once */
+	head->next = sp->pending[idx];
+	sp->pending[idx] = head;
+
+	if (sp->state == srcu_idle) {
+		sp->state = srcu_sync_1;
+		call_rcu_sched(&sp->head, do_srcu_state_rcu);
+	}
+	raw_spin_unlock_irqrestore(&sp->lock, flags);
+}
+EXPORT_SYMBOL_GPL(call_srcu);
 
-	sync_func();  /* Force memory barrier on all CPUs. */
+struct srcu_waiter {
+	struct completion wait;
+	struct rcu_head head;
+};
 
-	/*
-	 * At this point, because of the preceding synchronize_sched(),
-	 * all srcu_read_lock() calls using the old counters have completed.
-	 * Their corresponding critical sections might well be still
-	 * executing, but the srcu_read_lock() primitives themselves
-	 * will have finished executing.  We initially give readers
-	 * an arbitrarily chosen 10 microseconds to get out of their
-	 * SRCU read-side critical sections, then loop waiting 1/HZ
-	 * seconds per iteration.  The 10-microsecond value has done
-	 * very well in testing.
-	 */
-
-	if (srcu_readers_active_idx(sp, idx))
-		udelay(SYNCHRONIZE_SRCU_READER_DELAY);
-	while (srcu_readers_active_idx(sp, idx))
-		schedule_timeout_interruptible(1);
-
-	sync_func();  /* Force memory barrier on all CPUs. */
-
-	/*
-	 * The preceding synchronize_sched() forces all srcu_read_unlock()
-	 * primitives that were executing concurrently with the preceding
-	 * for_each_possible_cpu() loop to have completed by this point.
-	 * More importantly, it also forces the corresponding SRCU read-side
-	 * critical sections to have also completed, and the corresponding
-	 * references to SRCU-protected data items to be dropped.
-	 *
-	 * Note:
-	 *
-	 *	Despite what you might think at first glance, the
-	 *	preceding synchronize_sched() -must- be within the
-	 *	critical section ended by the following mutex_unlock().
-	 *	Otherwise, a task taking the early exit can race
-	 *	with a srcu_read_unlock(), which might have executed
-	 *	just before the preceding srcu_readers_active() check,
-	 *	and whose CPU might have reordered the srcu_read_unlock()
-	 *	with the preceding critical section.  In this case, there
-	 *	is nothing preventing the synchronize_sched() task that is
-	 *	taking the early exit from freeing a data structure that
-	 *	is still being referenced (out of order) by the task
-	 *	doing the srcu_read_unlock().
-	 *
-	 *	Alternatively, the comparison with "2" on the early exit
-	 *	could be changed to "3", but this increases synchronize_srcu()
-	 *	latency for bulk loads.  So the current code is preferred.
-	 */
+static void synchronize_srcu_complete(struct rcu_head *head)
+{
+	struct srcu_waiter *waiter = container_of(head, struct srcu_waiter, head);
 
-	mutex_unlock(&sp->mutex);
+	complete(&waiter->wait);
 }
 
 /**
  * synchronize_srcu - wait for prior SRCU read-side critical-section completion
  * @sp: srcu_struct with which to synchronize.
  *
- * Flip the completed counter, and wait for the old count to drain to zero.
- * As with classic RCU, the updater must use some separate means of
- * synchronizing concurrent updates.  Can block; must be called from
- * process context.
- *
  * Note that it is illegal to call synchronize_srcu() from the corresponding
  * SRCU read-side critical section; doing so will result in deadlock.
  * However, it is perfectly legal to call synchronize_srcu() on one
@@ -275,41 +350,12 @@ static void __synchronize_srcu(struct sr
  */
 void synchronize_srcu(struct srcu_struct *sp)
 {
-	__synchronize_srcu(sp, synchronize_sched);
-}
-EXPORT_SYMBOL_GPL(synchronize_srcu);
+	struct srcu_waiter waiter = {
+		.wait = COMPLETION_INITIALIZER_ONSTACK(waiter.wait),
+	};
 
-/**
- * synchronize_srcu_expedited - like synchronize_srcu, but less patient
- * @sp: srcu_struct with which to synchronize.
- *
- * Flip the completed counter, and wait for the old count to drain to zero.
- * As with classic RCU, the updater must use some separate means of
- * synchronizing concurrent updates.  Can block; must be called from
- * process context.
- *
- * Note that it is illegal to call synchronize_srcu_expedited()
- * from the corresponding SRCU read-side critical section; doing so
- * will result in deadlock.  However, it is perfectly legal to call
- * synchronize_srcu_expedited() on one srcu_struct from some other
- * srcu_struct's read-side critical section.
- */
-void synchronize_srcu_expedited(struct srcu_struct *sp)
-{
-	__synchronize_srcu(sp, synchronize_sched_expedited);
-}
-EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
-
-/**
- * srcu_batches_completed - return batches completed.
- * @sp: srcu_struct on which to report batch completion.
- *
- * Report the number of batches, correlated with, but not necessarily
- * precisely the same as, the number of grace periods that have elapsed.
- */
+	call_srcu(sp, &waiter.head, synchronize_srcu_complete);
 
-long srcu_batches_completed(struct srcu_struct *sp)
-{
-	return sp->completed;
+	wait_for_completion(&waiter.wait);
 }
-EXPORT_SYMBOL_GPL(srcu_batches_completed);
+EXPORT_SYMBOL_GPL(synchronize_srcu);


--
To unsubscribe, send a message with 'unsubscribe linux-mm' in
the body to majordomo@xxxxxxxxx.  For more info on Linux MM,
see: http://www.linux-mm.org/ .
Fight unfair telecom internet charges in Canada: sign http://stopthemeter.ca/
Don't email: <a href=mailto:"dont@xxxxxxxxx";> email@xxxxxxxxx </a>


[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]