Re: [PATCH -v4 5/7] locking, arch: Update spin_unlock_wait()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, Jun 02, 2016 at 06:57:00PM +0100, Will Deacon wrote:
> > This 'replaces' commit:
> > 
> >   54cf809b9512 ("locking,qspinlock: Fix spin_is_locked() and spin_unlock_wait()")
> > 
> > and seems to still work with the test case from that thread while
> > getting rid of the extra barriers.

New version :-)

This one has moar comments; and also tries to address an issue with
xchg_tail(), which is its own consumer. Paul, Will said you'd love the
address dependency through union members :-)

(I should split this in at least 3 patches I suppose)

ARM64 and PPC should provide private versions of is_locked and
unlock_wait; because while this one deals with the unordered store as
per qspinlock construction, it still relies on cmpxchg_acquire()'s store
being visible.

---
 include/asm-generic/qspinlock.h |  40 ++++---------
 kernel/locking/qspinlock.c      | 126 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 137 insertions(+), 29 deletions(-)

diff --git a/include/asm-generic/qspinlock.h b/include/asm-generic/qspinlock.h
index 6bd05700d8c9..3020bdb7a43c 100644
--- a/include/asm-generic/qspinlock.h
+++ b/include/asm-generic/qspinlock.h
@@ -26,33 +26,18 @@
  * @lock: Pointer to queued spinlock structure
  * Return: 1 if it is locked, 0 otherwise
  */
+#ifndef queued_spin_is_locked
 static __always_inline int queued_spin_is_locked(struct qspinlock *lock)
 {
 	/*
-	 * queued_spin_lock_slowpath() can ACQUIRE the lock before
-	 * issuing the unordered store that sets _Q_LOCKED_VAL.
+	 * See queued_spin_unlock_wait().
 	 *
-	 * See both smp_cond_acquire() sites for more detail.
-	 *
-	 * This however means that in code like:
-	 *
-	 *   spin_lock(A)		spin_lock(B)
-	 *   spin_unlock_wait(B)	spin_is_locked(A)
-	 *   do_something()		do_something()
-	 *
-	 * Both CPUs can end up running do_something() because the store
-	 * setting _Q_LOCKED_VAL will pass through the loads in
-	 * spin_unlock_wait() and/or spin_is_locked().
-	 *
-	 * Avoid this by issuing a full memory barrier between the spin_lock()
-	 * and the loads in spin_unlock_wait() and spin_is_locked().
-	 *
-	 * Note that regular mutual exclusion doesn't care about this
-	 * delayed store.
+	 * Any !0 state indicates it is locked, even if _Q_LOCKED_VAL
+	 * isn't immediately observable.
 	 */
-	smp_mb();
-	return atomic_read(&lock->val) & _Q_LOCKED_MASK;
+	return !!atomic_read(&lock->val);
 }
+#endif
 
 /**
  * queued_spin_value_unlocked - is the spinlock structure unlocked?
@@ -78,6 +63,7 @@ static __always_inline int queued_spin_is_contended(struct qspinlock *lock)
 {
 	return atomic_read(&lock->val) & ~_Q_LOCKED_MASK;
 }
+
 /**
  * queued_spin_trylock - try to acquire the queued spinlock
  * @lock : Pointer to queued spinlock structure
@@ -123,19 +109,15 @@ static __always_inline void queued_spin_unlock(struct qspinlock *lock)
 #endif
 
 /**
- * queued_spin_unlock_wait - wait until current lock holder releases the lock
+ * queued_spin_unlock_wait - wait until the _current_ lock holder releases the lock
  * @lock : Pointer to queued spinlock structure
  *
  * There is a very slight possibility of live-lock if the lockers keep coming
  * and the waiter is just unfortunate enough to not see any unlock state.
  */
-static inline void queued_spin_unlock_wait(struct qspinlock *lock)
-{
-	/* See queued_spin_is_locked() */
-	smp_mb();
-	while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
-		cpu_relax();
-}
+#ifndef queued_spin_unlock_wait
+void queued_spin_unlock_wait(struct qspinlock *lock);
+#endif
 
 #ifndef virt_spin_lock
 static __always_inline bool virt_spin_lock(struct qspinlock *lock)
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index ce2f75e32ae1..e1c29d352e0e 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -395,6 +395,8 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 	 * pending stuff.
 	 *
 	 * p,*,* -> n,*,*
+	 *
+	 * RELEASE, such that the stores to @node must be complete.
 	 */
 	old = xchg_tail(lock, tail);
 	next = NULL;
@@ -405,6 +407,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 	 */
 	if (old & _Q_TAIL_MASK) {
 		prev = decode_tail(old);
+		/*
+		 * The above xchg_tail() is also load of @lock which generates,
+		 * through decode_tail(), a pointer.
+		 *
+		 * The address dependency matches the RELEASE of xchg_tail()
+		 * such that the access to @prev must happen after.
+		 */
+		smp_read_barrier_depends();
+
 		WRITE_ONCE(prev->next, node);
 
 		pv_wait_node(node, prev);
@@ -496,6 +507,121 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 EXPORT_SYMBOL(queued_spin_lock_slowpath);
 
 /*
+ * PROBLEM: some architectures have an interesting issue with atomic ACQUIRE
+ * operations in that the ACQUIRE applies to the LOAD _not_ the STORE (ARM64,
+ * PPC). Also qspinlock has a similar issue per construction, the setting of
+ * the locked byte can be unordered acquiring the lock proper.
+ *
+ * This gets to be 'interesting' in the following cases, where the /should/s
+ * end up false because of this issue.
+ *
+ *
+ * CASE 1:
+ *
+ * So the spin_is_locked() correctness issue comes from something like:
+ *
+ *   CPU0				CPU1
+ *
+ *   global_lock();			local_lock(i)
+ *     spin_lock(&G)			  spin_lock(&L[i])
+ *     for (i)				  if (!spin_is_locked(&G)) {
+ *       spin_unlock_wait(&L[i]);	    smp_acquire__after_ctrl_dep();
+ * 					    return;
+ * 					  }
+ * 					  // deal with fail
+ *
+ * Where it is important CPU1 sees G locked or CPU0 sees L[i] locked such
+ * that there is exclusion between the two critical sections.
+ *
+ * The load from spin_is_locked(&G) /should/ be constrained by the ACQUIRE from
+ * spin_lock(&L[i]), and similarly the load(s) from spin_unlock_wait(&L[i])
+ * /should/ be constrained by the ACQUIRE from spin_lock(&G).
+ *
+ * Similarly, later stuff is constrained by the ACQUIRE from CTRL+RMB.
+ *
+ *
+ * CASE 2:
+ *
+ * For spin_unlock_wait() there is a second correctness issue, namely:
+ *
+ *   CPU0				CPU1
+ *
+ *   flag = set;
+ *   smp_mb();				spin_lock(&l)
+ *   spin_unlock_wait(&l);		if (!flag)
+ *					  // add to lockless list
+ * 					spin_unlock(&l);
+ *   // iterate lockless list
+ *
+ * Which wants to ensure that CPU1 will stop adding bits to the list and CPU0
+ * will observe the last entry on the list (if spin_unlock_wait() had ACQUIRE
+ * semantics etc..)
+ *
+ * Where flag /should/ be ordered against the locked store of l.
+ */
+
+
+/*
+ * queued_spin_lock_slowpath() can (load-)ACQUIRE the lock before
+ * issuing an _unordered_ store to set _Q_LOCKED_VAL.
+ *
+ * This means that the store can be delayed, but no later than the
+ * store-release from the unlock. This means that simply observing
+ * _Q_LOCKED_VAL is not sufficient to determine if the lock is acquired.
+ *
+ * There are two paths that can issue the unordered store:
+ *
+ *  (1) clear_pending_set_locked():	*,1,0 -> *,0,1
+ *
+ *  (2) set_locked():			t,0,0 -> t,0,1 ; t != 0
+ *      atomic_cmpxchg_relaxed():	t,0,0 -> 0,0,1
+ *
+ * However, in both cases we have other !0 state we've set before to queue
+ * ourseves:
+ *
+ * For (1) we have the atomic_cmpxchg_acquire() that set _Q_PENDING_VAL, our
+ * load is constrained by that ACQUIRE to not pass before that, and thus must
+ * observe the store.
+ *
+ * For (2) we have a more intersting scenario. We enqueue ourselves using
+ * xchg_tail(), which ends up being a RELEASE. This in itself is not
+ * sufficient, however that is followed by an smp_cond_acquire() on the same
+ * word, giving a RELEASE->ACQUIRE ordering. This again constrains our load and
+ * guarantees we must observe that store.
+ *
+ * Therefore both cases have other !0 state that is observable before the
+ * unordered locked byte store comes through. This means we can use that to
+ * wait for the lock store, and then wait for an unlock.
+ */
+#ifndef queued_spin_unlock_wait
+void queued_spin_unlock_wait(struct qspinlock *lock)
+{
+	u32 val;
+
+	for (;;) {
+		val = atomic_read(&lock->val);
+
+		if (!val) /* not locked, we're done */
+			goto done;
+
+		if (val & _Q_LOCKED_MASK) /* locked, go wait for unlock */
+			break;
+
+		/* not locked, but pending, wait until we observe the lock */
+		cpu_relax();
+	}
+
+	/* any unlock is good */
+	while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
+		cpu_relax();
+
+done:
+	smp_rmb();
+}
+EXPORT_SYMBOL(queued_spin_unlock_wait);
+#endif
+
+/*
  * Generate the paravirt code for queued_spin_unlock_slowpath().
  */
 #if !defined(_GEN_PV_LOCK_SLOWPATH) && defined(CONFIG_PARAVIRT_SPINLOCKS)
--
To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Netfitler Users]     [LARTC]     [Bugtraq]     [Yosemite Forum]

  Powered by Linux