Re: Behaviour of smp_mb__{before,after}_spin* and acquire/release

On Tue, Jan 20, 2015 at 09:35:45PM +0000, Paul E. McKenney wrote:
> On Tue, Jan 20, 2015 at 10:34:43AM +0100, Peter Zijlstra wrote:
> > On Tue, Jan 13, 2015 at 04:33:54PM +0000, Will Deacon wrote:
> > > I started dusting off a series I've been working on to implement a relaxed
> > > atomic API in Linux (i.e. things like atomic_read(v, ACQUIRE)) but I'm
> > > having trouble making sense of the ordering semantics we have in mainline
> > > today:
> > 
> > >   2. Does smp_mb__after_unlock_lock order smp_store_release against
> > >      smp_load_acquire? Again, Documentation/memory-barriers.txt puts
> > >      these operations into the RELEASE and ACQUIRE classes respectively,
> > >      but since smp_mb__after_unlock_lock is a NOP everywhere other than
> > >      PowerPC, I don't think this is enforced by the current code. 
> > 
> > Yeah, wasn't Paul going to talk to Ben about that? PPC is the only arch
> > that has the weak ACQUIRE/RELEASE for its spinlocks.
> 
> I thought that you guys were going to propose something and we would see
> what the reaction was.  ;-)

Well, ripping it out is quite easy (patch below) :)

I didn't fully grok the comment in the MCS code, since mcs_lock uses an
xchg, which does have full barrier semantics and therefore mcs_unlock +
mcs_lock also has full barrier semantics afaict.
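
To make the xchg point concrete, here is a minimal userspace sketch in C11
atomics. It is not the code in kernel/locking/mcs_spinlock.h: every name in it
(mcs_lock_sketch, mcs_sketch_acquire and so on) is hypothetical, and a
sequentially consistent atomic_exchange() is only assumed to be a reasonable
stand-in for the kernel's fully ordered xchg().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical userspace stand-ins for the kernel's MCS lock types. */
struct mcs_node {
	_Atomic(struct mcs_node *) next;
	atomic_bool locked;
};

struct mcs_lock_sketch {
	_Atomic(struct mcs_node *) tail;
};

static void mcs_sketch_acquire(struct mcs_lock_sketch *lock,
			       struct mcs_node *node)
{
	atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
	atomic_store_explicit(&node->locked, false, memory_order_relaxed);

	/*
	 * Assumption: a seq_cst exchange models the kernel's xchg(), which
	 * is the fully ordered operation referred to in the mail above.
	 */
	struct mcs_node *prev = atomic_exchange(&lock->tail, node);
	if (prev == NULL)
		return;		/* Uncontended: we own the lock. */

	/* Queue behind the previous owner and spin on our own node. */
	atomic_store_explicit(&prev->next, node, memory_order_release);
	while (!atomic_load_explicit(&node->locked, memory_order_acquire))
		;
}

static void mcs_sketch_release(struct mcs_lock_sketch *lock,
			       struct mcs_node *node)
{
	struct mcs_node *next =
		atomic_load_explicit(&node->next, memory_order_acquire);

	if (next == NULL) {
		struct mcs_node *expected = node;

		/* No successor visible: try to reset the tail to empty. */
		if (atomic_compare_exchange_strong(&lock->tail, &expected, NULL))
			return;
		/* A successor is enqueueing; wait for it to link itself. */
		while ((next = atomic_load_explicit(&node->next,
						    memory_order_acquire)) == NULL)
			;
	}
	/* Hand the lock over with a store-release. */
	atomic_store_explicit(&next->locked, true, memory_order_release);
}

/* Single-threaded smoke test: returns 1 if the lock ends up free again. */
static int mcs_sketch_demo(void)
{
	struct mcs_lock_sketch lock;
	struct mcs_node node;

	atomic_init(&lock.tail, NULL);
	mcs_sketch_acquire(&lock, &node);
	assert(atomic_load(&lock.tail) == &node);
	mcs_sketch_release(&lock, &node);
	return atomic_load(&lock.tail) == NULL;
}
```

In this sketch the fully ordered operation sits on the lock side, in the
exchange on the tail pointer, while the hand-off in the unlock path is only a
store-release.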

The remaining problem is that this puts PowerPC back into its original state
(before the barrier) where UNLOCK + LOCK doesn't imply a full barrier, so
that would need to be fixed.
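
As a rough illustration of the sequence in question, the userspace C11 sketch
below performs a store, UNLOCK M, LOCK N and a second store, with an explicit
full fence after the LOCK. All names are hypothetical, the locks are simple
test-and-set flags rather than kernel spinlocks, and the seq_cst fence is only
assumed to play the role that smp_mb() plays in the kernel. A single-threaded
run cannot demonstrate reordering, so this shows where the barrier sits, not
that it is needed.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical locks M and N, modelled as test-and-set flags. */
static atomic_flag sketch_lock_m = ATOMIC_FLAG_INIT;
static atomic_flag sketch_lock_n = ATOMIC_FLAG_INIT;
static atomic_int sketch_a, sketch_b;

/* ACQUIRE: later accesses cannot move before the lock operation. */
static void sketch_lock(atomic_flag *l)
{
	while (atomic_flag_test_and_set_explicit(l, memory_order_acquire))
		;
}

/* RELEASE: earlier accesses cannot move after the unlock operation. */
static void sketch_unlock(atomic_flag *l)
{
	atomic_flag_clear_explicit(l, memory_order_release);
}

static void sketch_unlock_lock_full_barrier(void)
{
	sketch_lock(&sketch_lock_m);
	atomic_store_explicit(&sketch_a, 1, memory_order_relaxed); /* *A = a */
	sketch_unlock(&sketch_lock_m);				    /* UNLOCK M */

	sketch_lock(&sketch_lock_n);				    /* LOCK N */
	/*
	 * Assumption: this fence stands in for the full barrier that the
	 * UNLOCK + LOCK sequence is expected to provide. Without it,
	 * release and acquire alone do not order the two stores for an
	 * observer that holds neither lock.
	 */
	atomic_thread_fence(memory_order_seq_cst);
	atomic_store_explicit(&sketch_b, 1, memory_order_relaxed); /* *B = b */
	sketch_unlock(&sketch_lock_n);
}
```

Whether the equivalent barrier would live in the PowerPC unlock or lock path
is exactly the open question; the sketch puts it after the LOCK only because
that is where smp_mb__after_unlock_lock() used to sit.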

Anyway, kicking this out there for comments.

Will

--->8

From 23d1db44f20a12924d2addeb73231f10d90e9f05 Mon Sep 17 00:00:00 2001
From: Will Deacon <will.deacon@xxxxxxx>
Date: Fri, 23 Jan 2015 13:55:34 +0000
Subject: [PATCH] memory-barriers: remove smp_mb__after_unlock_lock()

smp_mb__after_unlock_lock is used to promote an UNLOCK + LOCK sequence
into a full memory barrier.

However:

  - This ordering guarantee is already provided without the barrier on
    all architectures apart from PowerPC

  - The barrier only applies to UNLOCK + LOCK, not general
    RELEASE + ACQUIRE operations

  - The barrier is not well used outside of RCU and, because it was
    retrofitted into the kernel, it's not clear whether other areas of
    the kernel are incorrectly relying on UNLOCK + LOCK implying a full
    barrier

This patch removes the barrier and instead requires architectures to
provide full barrier semantics for an UNLOCK + LOCK sequence.

TODO: add a barrier to PowerPC unlock?

Signed-off-by: Will Deacon <will.deacon@xxxxxxx>
---
 Documentation/memory-barriers.txt   | 77 ++-----------------------------------
 arch/powerpc/include/asm/spinlock.h |  2 -
 include/linux/spinlock.h            | 10 -----
 kernel/locking/mcs_spinlock.h       |  9 -----
 kernel/rcu/tree.c                   | 19 +--------
 kernel/rcu/tree_plugin.h            | 13 -------
 6 files changed, 4 insertions(+), 126 deletions(-)

diff --git a/Documentation/memory-barriers.txt b/Documentation/memory-barriers.txt
index 9c0e3c45a807..fed47737e4c6 100644
--- a/Documentation/memory-barriers.txt
+++ b/Documentation/memory-barriers.txt
@@ -1777,74 +1777,9 @@ RELEASE are to the same lock variable, but only from the perspective of
 another CPU not holding that lock.  In short, a ACQUIRE followed by an
 RELEASE may -not- be assumed to be a full memory barrier.
 
-Similarly, the reverse case of a RELEASE followed by an ACQUIRE does not
-imply a full memory barrier.  If it is necessary for a RELEASE-ACQUIRE
-pair to produce a full barrier, the ACQUIRE can be followed by an
-smp_mb__after_unlock_lock() invocation.  This will produce a full barrier
-if either (a) the RELEASE and the ACQUIRE are executed by the same
-CPU or task, or (b) the RELEASE and ACQUIRE act on the same variable.
-The smp_mb__after_unlock_lock() primitive is free on many architectures.
-Without smp_mb__after_unlock_lock(), the CPU's execution of the critical
-sections corresponding to the RELEASE and the ACQUIRE can cross, so that:
-
-	*A = a;
-	RELEASE M
-	ACQUIRE N
-	*B = b;
-
-could occur as:
-
-	ACQUIRE N, STORE *B, STORE *A, RELEASE M
-
-It might appear that this reordering could introduce a deadlock.
-However, this cannot happen because if such a deadlock threatened,
-the RELEASE would simply complete, thereby avoiding the deadlock.
-
-	Why does this work?
-
-	One key point is that we are only talking about the CPU doing
-	the reordering, not the compiler.  If the compiler (or, for
-	that matter, the developer) switched the operations, deadlock
-	-could- occur.
-
-	But suppose the CPU reordered the operations.  In this case,
-	the unlock precedes the lock in the assembly code.  The CPU
-	simply elected to try executing the later lock operation first.
-	If there is a deadlock, this lock operation will simply spin (or
-	try to sleep, but more on that later).	The CPU will eventually
-	execute the unlock operation (which preceded the lock operation
-	in the assembly code), which will unravel the potential deadlock,
-	allowing the lock operation to succeed.
-
-	But what if the lock is a sleeplock?  In that case, the code will
-	try to enter the scheduler, where it will eventually encounter
-	a memory barrier, which will force the earlier unlock operation
-	to complete, again unraveling the deadlock.  There might be
-	a sleep-unlock race, but the locking primitive needs to resolve
-	such races properly in any case.
-
-With smp_mb__after_unlock_lock(), the two critical sections cannot overlap.
-For example, with the following code, the store to *A will always be
-seen by other CPUs before the store to *B:
-
-	*A = a;
-	RELEASE M
-	ACQUIRE N
-	smp_mb__after_unlock_lock();
-	*B = b;
-
-The operations will always occur in one of the following orders:
-
-	STORE *A, RELEASE, ACQUIRE, smp_mb__after_unlock_lock(), STORE *B
-	STORE *A, ACQUIRE, RELEASE, smp_mb__after_unlock_lock(), STORE *B
-	ACQUIRE, STORE *A, RELEASE, smp_mb__after_unlock_lock(), STORE *B
-
-If the RELEASE and ACQUIRE were instead both operating on the same lock
-variable, only the first of these alternatives can occur.  In addition,
-the more strongly ordered systems may rule out some of the above orders.
-But in any case, as noted earlier, the smp_mb__after_unlock_lock()
-ensures that the store to *A will always be seen as happening before
-the store to *B.
+However, the reverse case of a RELEASE followed by an ACQUIRE _does_
+imply a full memory barrier when these accesses are performed as a pair
+of UNLOCK and LOCK operations, irrespective of the lock variable.
 
 Locks and semaphores may not provide any guarantee of ordering on UP compiled
 systems, and so cannot be counted on in such a situation to actually achieve
@@ -2087,7 +2022,6 @@ However, if the following occurs:
 	RELEASE M	     [1]
 	ACCESS_ONCE(*D) = d;		ACCESS_ONCE(*E) = e;
 					ACQUIRE M		     [2]
-					smp_mb__after_unlock_lock();
 					ACCESS_ONCE(*F) = f;
 					ACCESS_ONCE(*G) = g;
 					RELEASE M	     [2]
@@ -2105,11 +2039,6 @@ But assuming CPU 1 gets the lock first, CPU 3 won't see any of:
 	*F, *G or *H preceding ACQUIRE M [2]
 	*A, *B, *C, *E, *F or *G following RELEASE M [2]
 
-Note that the smp_mb__after_unlock_lock() is critically important
-here: Without it CPU 3 might see some of the above orderings.
-Without smp_mb__after_unlock_lock(), the accesses are not guaranteed
-to be seen in order unless CPU 3 holds lock M.
-
 
 ACQUIRES VS I/O ACCESSES
 ------------------------
diff --git a/arch/powerpc/include/asm/spinlock.h b/arch/powerpc/include/asm/spinlock.h
index 4dbe072eecbe..523673d7583c 100644
--- a/arch/powerpc/include/asm/spinlock.h
+++ b/arch/powerpc/include/asm/spinlock.h
@@ -28,8 +28,6 @@
 #include <asm/synch.h>
 #include <asm/ppc-opcode.h>
 
-#define smp_mb__after_unlock_lock()	smp_mb()  /* Full ordering for lock. */
-
 #ifdef CONFIG_PPC64
 /* use 0x800000yy when locked, where yy == CPU number */
 #ifdef __BIG_ENDIAN__
diff --git a/include/linux/spinlock.h b/include/linux/spinlock.h
index 262ba4ef9a8e..9e8ba39640ce 100644
--- a/include/linux/spinlock.h
+++ b/include/linux/spinlock.h
@@ -130,16 +130,6 @@ do {								\
 #define smp_mb__before_spinlock()	smp_wmb()
 #endif
 
-/*
- * Place this after a lock-acquisition primitive to guarantee that
- * an UNLOCK+LOCK pair act as a full barrier.  This guarantee applies
- * if the UNLOCK and LOCK are executed by the same CPU or if the
- * UNLOCK and LOCK operate on the same lock variable.
- */
-#ifndef smp_mb__after_unlock_lock
-#define smp_mb__after_unlock_lock()	do { } while (0)
-#endif
-
 /**
  * raw_spin_unlock_wait - wait until the spinlock gets unlocked
  * @lock: the spinlock in question.
diff --git a/kernel/locking/mcs_spinlock.h b/kernel/locking/mcs_spinlock.h
index 4d60986fcbee..6d60c9f7dc87 100644
--- a/kernel/locking/mcs_spinlock.h
+++ b/kernel/locking/mcs_spinlock.h
@@ -42,15 +42,6 @@ do {									\
 #endif
 
 /*
- * Note: the smp_load_acquire/smp_store_release pair is not
- * sufficient to form a full memory barrier across
- * cpus for many architectures (except x86) for mcs_unlock and mcs_lock.
- * For applications that need a full barrier across multiple cpus
- * with mcs_unlock and mcs_lock pair, smp_mb__after_unlock_lock() should be
- * used after mcs_lock.
- */
-
-/*
  * In order to acquire the lock, the caller should declare a local node and
  * pass a reference of the node to this function in addition to the lock.
  * If the lock has already been acquired, then this will proceed to spin
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 7680fc275036..86595ead34ec 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1322,10 +1322,8 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
 	 * hold it, acquire the root rcu_node structure's lock in order to
 	 * start one (if needed).
 	 */
-	if (rnp != rnp_root) {
+	if (rnp != rnp_root)
 		raw_spin_lock(&rnp_root->lock);
-		smp_mb__after_unlock_lock();
-	}
 
 	/*
 	 * Get a new grace-period number.  If there really is no grace
@@ -1574,7 +1572,6 @@ static void note_gp_changes(struct rcu_state *rsp, struct rcu_data *rdp)
 		local_irq_restore(flags);
 		return;
 	}
-	smp_mb__after_unlock_lock();
 	needwake = __note_gp_changes(rsp, rnp, rdp);
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
 	if (needwake)
@@ -1591,7 +1588,6 @@ static int rcu_gp_init(struct rcu_state *rsp)
 
 	rcu_bind_gp_kthread();
 	raw_spin_lock_irq(&rnp->lock);
-	smp_mb__after_unlock_lock();
 	if (!ACCESS_ONCE(rsp->gp_flags)) {
 		/* Spurious wakeup, tell caller to go back to sleep.  */
 		raw_spin_unlock_irq(&rnp->lock);
@@ -1617,7 +1613,6 @@ static int rcu_gp_init(struct rcu_state *rsp)
 
 	/* Exclude any concurrent CPU-hotplug operations. */
 	mutex_lock(&rsp->onoff_mutex);
-	smp_mb__after_unlock_lock(); /* ->gpnum increment before GP! */
 
 	/*
 	 * Set the quiescent-state-needed bits in all the rcu_node
@@ -1634,7 +1629,6 @@ static int rcu_gp_init(struct rcu_state *rsp)
 	 */
 	rcu_for_each_node_breadth_first(rsp, rnp) {
 		raw_spin_lock_irq(&rnp->lock);
-		smp_mb__after_unlock_lock();
 		rdp = this_cpu_ptr(rsp->rda);
 		rcu_preempt_check_blocked_tasks(rnp);
 		rnp->qsmask = rnp->qsmaskinit;
@@ -1684,7 +1678,6 @@ static int rcu_gp_fqs(struct rcu_state *rsp, int fqs_state_in)
 	/* Clear flag to prevent immediate re-entry. */
 	if (ACCESS_ONCE(rsp->gp_flags) & RCU_GP_FLAG_FQS) {
 		raw_spin_lock_irq(&rnp->lock);
-		smp_mb__after_unlock_lock();
 		ACCESS_ONCE(rsp->gp_flags) =
 			ACCESS_ONCE(rsp->gp_flags) & ~RCU_GP_FLAG_FQS;
 		raw_spin_unlock_irq(&rnp->lock);
@@ -1704,7 +1697,6 @@ static void rcu_gp_cleanup(struct rcu_state *rsp)
 	struct rcu_node *rnp = rcu_get_root(rsp);
 
 	raw_spin_lock_irq(&rnp->lock);
-	smp_mb__after_unlock_lock();
 	gp_duration = jiffies - rsp->gp_start;
 	if (gp_duration > rsp->gp_max)
 		rsp->gp_max = gp_duration;
@@ -1730,7 +1722,6 @@ static void rcu_gp_cleanup(struct rcu_state *rsp)
 	 */
 	rcu_for_each_node_breadth_first(rsp, rnp) {
 		raw_spin_lock_irq(&rnp->lock);
-		smp_mb__after_unlock_lock();
 		ACCESS_ONCE(rnp->completed) = rsp->gpnum;
 		rdp = this_cpu_ptr(rsp->rda);
 		if (rnp == rdp->mynode)
@@ -1742,7 +1733,6 @@ static void rcu_gp_cleanup(struct rcu_state *rsp)
 	}
 	rnp = rcu_get_root(rsp);
 	raw_spin_lock_irq(&rnp->lock);
-	smp_mb__after_unlock_lock(); /* Order GP before ->completed update. */
 	rcu_nocb_gp_set(rnp, nocb);
 
 	/* Declare grace period done. */
@@ -1978,7 +1968,6 @@ rcu_report_qs_rnp(unsigned long mask, struct rcu_state *rsp,
 		rnp_c = rnp;
 		rnp = rnp->parent;
 		raw_spin_lock_irqsave(&rnp->lock, flags);
-		smp_mb__after_unlock_lock();
 		WARN_ON_ONCE(rnp_c->qsmask);
 	}
 
@@ -2009,7 +1998,6 @@ rcu_report_qs_rdp(int cpu, struct rcu_state *rsp, struct rcu_data *rdp)
 
 	rnp = rdp->mynode;
 	raw_spin_lock_irqsave(&rnp->lock, flags);
-	smp_mb__after_unlock_lock();
 	if (rdp->passed_quiesce == 0 || rdp->gpnum != rnp->gpnum ||
 	    rnp->completed == rnp->gpnum) {
 
@@ -2224,7 +2212,6 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 	mask = rdp->grpmask;	/* rnp->grplo is constant. */
 	do {
 		raw_spin_lock(&rnp->lock);	/* irqs already disabled. */
-		smp_mb__after_unlock_lock();
 		rnp->qsmaskinit &= ~mask;
 		if (rnp->qsmaskinit != 0) {
 			if (rnp != rdp->mynode)
@@ -2437,7 +2424,6 @@ static void force_qs_rnp(struct rcu_state *rsp,
 		cond_resched_rcu_qs();
 		mask = 0;
 		raw_spin_lock_irqsave(&rnp->lock, flags);
-		smp_mb__after_unlock_lock();
 		if (!rcu_gp_in_progress(rsp)) {
 			raw_spin_unlock_irqrestore(&rnp->lock, flags);
 			return;
@@ -2467,7 +2453,6 @@ static void force_qs_rnp(struct rcu_state *rsp,
 	rnp = rcu_get_root(rsp);
 	if (rnp->qsmask == 0) {
 		raw_spin_lock_irqsave(&rnp->lock, flags);
-		smp_mb__after_unlock_lock();
 		rcu_initiate_boost(rnp, flags); /* releases rnp->lock. */
 	}
 }
@@ -2500,7 +2485,6 @@ static void force_quiescent_state(struct rcu_state *rsp)
 
 	/* Reached the root of the rcu_node tree, acquire lock. */
 	raw_spin_lock_irqsave(&rnp_old->lock, flags);
-	smp_mb__after_unlock_lock();
 	raw_spin_unlock(&rnp_old->fqslock);
 	if (ACCESS_ONCE(rsp->gp_flags) & RCU_GP_FLAG_FQS) {
 		rsp->n_force_qs_lh++;
@@ -2625,7 +2609,6 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 			struct rcu_node *rnp_root = rcu_get_root(rsp);
 
 			raw_spin_lock(&rnp_root->lock);
-			smp_mb__after_unlock_lock();
 			needwake = rcu_start_gp(rsp);
 			raw_spin_unlock(&rnp_root->lock);
 			if (needwake)
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 3ec85cb5d544..a89523605545 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -180,7 +180,6 @@ static void rcu_preempt_note_context_switch(void)
 		rdp = this_cpu_ptr(rcu_preempt_state.rda);
 		rnp = rdp->mynode;
 		raw_spin_lock_irqsave(&rnp->lock, flags);
-		smp_mb__after_unlock_lock();
 		t->rcu_read_unlock_special.b.blocked = true;
 		t->rcu_blocked_node = rnp;
 
@@ -287,7 +286,6 @@ static void rcu_report_unblock_qs_rnp(struct rcu_node *rnp, unsigned long flags)
 	mask = rnp->grpmask;
 	raw_spin_unlock(&rnp->lock);	/* irqs remain disabled. */
 	raw_spin_lock(&rnp_p->lock);	/* irqs already disabled. */
-	smp_mb__after_unlock_lock();
 	rcu_report_qs_rnp(mask, &rcu_preempt_state, rnp_p, flags);
 }
 
@@ -362,7 +360,6 @@ void rcu_read_unlock_special(struct task_struct *t)
 		for (;;) {
 			rnp = t->rcu_blocked_node;
 			raw_spin_lock(&rnp->lock);  /* irqs already disabled. */
-			smp_mb__after_unlock_lock();
 			if (rnp == t->rcu_blocked_node)
 				break;
 			raw_spin_unlock(&rnp->lock); /* irqs remain disabled. */
@@ -576,7 +573,6 @@ static int rcu_preempt_offline_tasks(struct rcu_state *rsp,
 	while (!list_empty(lp)) {
 		t = list_entry(lp->next, typeof(*t), rcu_node_entry);
 		raw_spin_lock(&rnp_root->lock); /* irqs already disabled */
-		smp_mb__after_unlock_lock();
 		list_del(&t->rcu_node_entry);
 		t->rcu_blocked_node = rnp_root;
 		list_add(&t->rcu_node_entry, lp_root);
@@ -601,7 +597,6 @@ static int rcu_preempt_offline_tasks(struct rcu_state *rsp,
 	 * in this case.
 	 */
 	raw_spin_lock(&rnp_root->lock); /* irqs already disabled */
-	smp_mb__after_unlock_lock();
 	if (rnp_root->boost_tasks != NULL &&
 	    rnp_root->boost_tasks != rnp_root->gp_tasks &&
 	    rnp_root->boost_tasks != rnp_root->exp_tasks)
@@ -732,7 +727,6 @@ static void rcu_report_exp_rnp(struct rcu_state *rsp, struct rcu_node *rnp,
 	unsigned long mask;
 
 	raw_spin_lock_irqsave(&rnp->lock, flags);
-	smp_mb__after_unlock_lock();
 	for (;;) {
 		if (!sync_rcu_preempt_exp_done(rnp)) {
 			raw_spin_unlock_irqrestore(&rnp->lock, flags);
@@ -750,7 +744,6 @@ static void rcu_report_exp_rnp(struct rcu_state *rsp, struct rcu_node *rnp,
 		raw_spin_unlock(&rnp->lock); /* irqs remain disabled */
 		rnp = rnp->parent;
 		raw_spin_lock(&rnp->lock); /* irqs already disabled */
-		smp_mb__after_unlock_lock();
 		rnp->expmask &= ~mask;
 	}
 }
@@ -770,7 +763,6 @@ sync_rcu_preempt_exp_init(struct rcu_state *rsp, struct rcu_node *rnp)
 	int must_wait = 0;
 
 	raw_spin_lock_irqsave(&rnp->lock, flags);
-	smp_mb__after_unlock_lock();
 	if (list_empty(&rnp->blkd_tasks)) {
 		raw_spin_unlock_irqrestore(&rnp->lock, flags);
 	} else {
@@ -850,7 +842,6 @@ void synchronize_rcu_expedited(void)
 	/* Initialize ->expmask for all non-leaf rcu_node structures. */
 	rcu_for_each_nonleaf_node_breadth_first(rsp, rnp) {
 		raw_spin_lock_irqsave(&rnp->lock, flags);
-		smp_mb__after_unlock_lock();
 		rnp->expmask = rnp->qsmaskinit;
 		raw_spin_unlock_irqrestore(&rnp->lock, flags);
 	}
@@ -1131,7 +1122,6 @@ static int rcu_boost(struct rcu_node *rnp)
 		return 0;  /* Nothing left to boost. */
 
 	raw_spin_lock_irqsave(&rnp->lock, flags);
-	smp_mb__after_unlock_lock();
 
 	/*
 	 * Recheck under the lock: all tasks in need of boosting
@@ -1323,7 +1313,6 @@ static int rcu_spawn_one_boost_kthread(struct rcu_state *rsp,
 	if (IS_ERR(t))
 		return PTR_ERR(t);
 	raw_spin_lock_irqsave(&rnp->lock, flags);
-	smp_mb__after_unlock_lock();
 	rnp->boost_kthread_task = t;
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
 	sp.sched_priority = kthread_prio;
@@ -1717,7 +1706,6 @@ static void rcu_prepare_for_idle(void)
 			continue;
 		rnp = rdp->mynode;
 		raw_spin_lock(&rnp->lock); /* irqs already disabled. */
-		smp_mb__after_unlock_lock();
 		needwake = rcu_accelerate_cbs(rsp, rnp, rdp);
 		raw_spin_unlock(&rnp->lock); /* irqs remain disabled. */
 		if (needwake)
@@ -2226,7 +2214,6 @@ static void rcu_nocb_wait_gp(struct rcu_data *rdp)
 	struct rcu_node *rnp = rdp->mynode;
 
 	raw_spin_lock_irqsave(&rnp->lock, flags);
-	smp_mb__after_unlock_lock();
 	needwake = rcu_start_future_gp(rnp, rdp, &c);
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
 	if (needwake)
-- 
2.1.4
