[RFC PATCH 2/5] futex: add optimistic spinning to FUTEX_SPIN_LOCK

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch adds code to do optimistic spinning for the FUTEX_SPIN_LOCK
primitive on the futex value when the lock owner is running. It is
the same optimistic spinning technique that is done in the mutex and
rw semaphore code to improve their performance especially on large
systems with large number of CPUs. When the lock owner is not running,
the spinning tasks will go to sleep.

There is 2 major advantages of doing optimistic spinning here:
 1) It eliminates the context switching latency and overhead (at
    least a few us) associated with sleeping and wakeup.
 2) It eliminates most of the need to call futex(2) with
    FUTEX_SPIN_UNLOCK as spinning is done without the need to set
    the FUTEX_WAITERS bit.

Active spinning, however, does consume time in the current quantum of
time slice, make it a bit more likely to be preempted while running
in the critcal section due to time slice expiration. The heavy spinlock
contention of a wait-wake futex has the same effect. So it is not
specific
to this new primitive.

With the addition of optimistic spinning, it can significantly speed
up the amount of mutex operations that can be done in a certain unit
of time. With a userspace mutex microbenchmark running 10 million
mutex operations with 256 threads on a 4-socket 40-core server, the
spinning futex can achieve a rate of about 4.9 Mops/s with a critical
section load of 10 pause instructions. Whereas the wait-wake futex can
only achieve a rate of 3.7 Mops/s. When increasing the load to 100,
the corresponding rates become 2.8 Mops/s and 0.8 Mops/s respectively.

Signed-off-by: Waiman Long <Waiman.Long@xxxxxx>
---
 kernel/futex.c |  190 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 172 insertions(+), 18 deletions(-)

diff --git a/kernel/futex.c b/kernel/futex.c
index ec9b6ee..ddc24d1 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -71,6 +71,7 @@
 #include <asm/futex.h>
 
 #include "locking/rtmutex_common.h"
+#include "locking/mcs_spinlock.h"
 
 /*
  * READ this before attempting to hack on futexes!
@@ -2995,30 +2996,51 @@ void exit_robust_list(struct task_struct *curr)
 #define FUTEX_TID(u)		(pid_t)((u) & FUTEX_TID_MASK)
 #define FUTEX_HAS_WAITERS(u)	((u) & FUTEX_WAITERS)
 
+/*
+ * Bit usage of the locker count:
+ * bit  0-23: number of lockers (spinners + waiters)
+ * bit 24-30: number of spinners
+ */
+#define FUTEX_SPINCNT_MAX	64	/* Maximum # of spinners */
+#define FUTEX_SPINCNT_SHIFT	24
+#define FUTEX_SPINCNT_BIAS	(1U << FUTEX_SPINCNT_SHIFT)
+#define FUTEX_LOCKCNT_MASK	(FUTEX_SPINCNT_BIAS - 1)
+#define FUTEX_LOCKCNT(qh)	(atomic_read(&(qh)->lcnt) & FUTEX_LOCKCNT_MASK)
+#define FUTEX_SPINCNT(qh)	(atomic_read(&(qh)->lcnt)>>FUTEX_SPINCNT_SHIFT)
+
 /**
  * struct futex_q_head - head of the optspin futex queue, one per unique key
  * @hnode:	list entry from the hash bucket
  * @waitq:	a list of waiting tasks
  * @key:	the key the futex is hashed on
+ * @osq:	pointer to optimisitic spinning queue
+ * @owner:	task_struct pointer of the futex owner
+ * @otid:	TID of the futex owner
  * @wlock:	spinlock for managing wait queue
- * @lcnt:	locker count
+ * @lcnt:	locker count (spinners + waiters)
  *
  * Locking sequence
  * ----------------
  * 1) Lock hash bucket spinlock, locate the futex queue head
  * 2) Inc lcnt (lock) or read lcnt (unlock), release hash bucket spinlock
- * 3) For waiter:
+ * 3) For spinner:
+ *    - enqueue into the spinner queue and wait for its turn.
+ * 4) For waiter:
  *    - lock futex queue head spinlock
  *    - enqueue into the wait queue
  *    - release the lock & sleep
- * 4) For unlocker:
+ * 5) For unlocker:
  *    - locate the queue head just like a locker does
- *    - Take the queue head lock and wake up the first waiter there.
+ *    - clear the owner field if it is the current owner
+ *    - if the locker count is not 0 & osq is empty, take the queue head lock
+ *      and wake up the first waiter.
  */
 struct futex_q_head {
 	struct list_head	      hnode;
 	struct list_head	      waitq;
 	union futex_key		      key;
+	struct optimistic_spin_queue *osq;
+	struct task_struct	     *owner;
 	pid_t			      otid;
 	spinlock_t		      wlock;
 	atomic_t		      lcnt;
@@ -3034,6 +3056,13 @@ struct futex_q_node {
 	struct task_struct     *task;
 };
 
+/*
+ * The maximum number of tasks that can be a futex spin queue
+ *
+ * It is set to the lesser of half of the total number of CPUs and
+ * FUTEX_SPINCNT_MAX to avoid locking up all the CPUs in spinning.
+ */
+static int __read_mostly futex_spincnt_max;
 
 /*
  * find_qhead - Find a queue head structure with the matching key
@@ -3061,7 +3090,7 @@ find_qhead(struct futex_hash_bucket *hb, union futex_key *key)
  * contention with no hash bucket collision.
  */
 static inline struct futex_q_head *
-qhead_alloc_init(struct futex_hash_bucket *hb, union futex_key *key)
+qhead_alloc_init(struct futex_hash_bucket *hb, union futex_key *key, u32 uval)
 {
 	struct futex_q_head *qh = NULL;
 	static const struct futex_q_head qh0 = { { 0 } };
@@ -3073,10 +3102,16 @@ qhead_alloc_init(struct futex_hash_bucket *hb, union futex_key *key)
 
 	/*
 	 * Initialize the queue head structure
+	 * The lock owner field may be NULL if the task has released the lock
+	 * and exit.
 	 */
 	if (qh) {
-		*qh = qh0;
-		qh->key = *key;
+		*qh	  = qh0;
+		qh->key   = *key;
+		qh->otid  = FUTEX_TID(uval);
+		qh->owner = futex_find_get_task(qh->otid);
+		if (unlikely(!qh->owner))
+			qh->otid = 0;
 		atomic_set(&qh->lcnt, 1);
 		INIT_LIST_HEAD(&qh->waitq);
 		spin_lock_init(&qh->wlock);
@@ -3120,9 +3155,11 @@ qhead_free(struct futex_q_head *qh, struct futex_hash_bucket *hb)
 	/*
 	 * Free the queue head structure
 	 */
-	BUG_ON(!list_empty(&qh->waitq));
+	BUG_ON(!list_empty(&qh->waitq) || qh->osq);
 	list_del(&qh->hnode);
 	spin_unlock(&hb->lock);
+	if (qh->owner)
+		put_task_struct(qh->owner);
 
 	if (!hb->qhcache && (cmpxchg(&hb->qhcache, NULL, qh) == NULL))
 		return;
@@ -3134,14 +3171,19 @@ qhead_free(struct futex_q_head *qh, struct futex_hash_bucket *hb)
  * Return: 1 if successful or an error happen
  *	   0 otherwise
  *
+ * Optimistic spinning is done without holding lock, but with page fault
+ * explicitly disabled. So different functions need to be used to access
+ * the userspace futex value.
+ *
  * Side effect: The uval and ret will be updated.
  */
 static inline int futex_spin_trylock(u32 __user *uaddr, u32 *puval,
-				       int *pret, u32 vpid)
+				     int *pret, u32 vpid, bool spinning)
 {
-	u32	  old;
+	u32 old;
 
-	*pret = get_futex_value_locked(puval, uaddr);
+	*pret = spinning ? __copy_from_user_inatomic(puval, uaddr, sizeof(u32))
+			 : get_futex_value_locked(puval, uaddr);
 	if (*pret)
 		return 1;
 
@@ -3150,18 +3192,102 @@ static inline int futex_spin_trylock(u32 __user *uaddr, u32 *puval,
 
 	old = *puval;
 
-	*pret = cmpxchg_futex_value_locked(puval, uaddr, old, vpid | old);
+	*pret = spinning
+	      ? futex_atomic_cmpxchg_inatomic(puval, uaddr, old, vpid)
+	      : cmpxchg_futex_value_locked(puval, uaddr, old, vpid | old);
+
 	if (*pret)
 		return 1;
 	if (*puval == old) {
 		/* Adjust uval to reflect current value */
-		*puval = vpid | old;
+		*puval = spinning ? vpid : (vpid | old);
 		return 1;
 	}
 	return 0;
 }
 
 /*
+ * futex_optspin - optimistic spinning loop
+ * Return: 1 if lock successfully acquired
+ *	   0 if need to fall back to waiting
+ *
+ * Page fault and preemption are disabled in the optimistic spinning
+ * loop. Preemption should have been disabled before calling this function.
+ *
+ * The number of spinners may temporarily exceed the threshold due to
+ * racing (the spin count check and add aren't atomic), but that shouldn't
+ * be a big problem.
+ */
+static inline int futex_optspin(struct futex_q_head *qh,
+				struct futex_q_node *qn,
+				u32 __user	    *uaddr,
+				u32		     vpid)
+{
+	u32 uval;
+	int ret, gotlock = false;
+	struct task_struct *owner;
+
+	/*
+	 * Increment the spinner count
+	 */
+	atomic_add(FUTEX_SPINCNT_BIAS, &qh->lcnt);
+	if (!osq_lock(&qh->osq)) {
+		atomic_sub(FUTEX_SPINCNT_BIAS, &qh->lcnt);
+		return gotlock;
+	}
+	pagefault_disable();
+	for (;; cpu_relax()) {
+		if (futex_spin_trylock(uaddr, &uval, &ret, vpid, true)) {
+			/*
+			 * Fall back to waiting if an error happen
+			 */
+			if (ret)
+				break;
+			qh->otid  = vpid;
+			owner     = xchg(&qh->owner, qn->task);
+			get_task_struct(qn->task);
+			if (owner)
+				put_task_struct(owner);
+			gotlock = true;
+			break;
+		} else if (unlikely(FUTEX_HAS_WAITERS(uval))) {
+			/*
+			 * Try to turn off the waiter bit as it now has a
+			 * spinner. It doesn't matter if it fails as it will
+			 * try again in the next iteration.
+			 */
+			(void)futex_atomic_cmpxchg_inatomic
+			      (&uval, uaddr, uval, uval & ~FUTEX_WAITERS);
+		}
+
+		if (unlikely(FUTEX_TID(uval) != qh->otid)) {
+			/*
+			 * Owner has changed
+			 */
+			qh->otid = FUTEX_TID(uval);
+			owner = xchg(&qh->owner, futex_find_get_task(qh->otid));
+			if (owner)
+				put_task_struct(owner);
+		}
+		owner = ACCESS_ONCE(qh->owner);
+		if ((owner && !ACCESS_ONCE(owner->on_cpu)) || need_resched())
+			break;
+	}
+	pagefault_enable();
+	osq_unlock(&qh->osq);
+	atomic_sub(FUTEX_SPINCNT_BIAS, &qh->lcnt);
+
+	/*
+	 * If we fell out of the spin path because of need_resched(),
+	 * reschedule now.
+	 */
+	if (!gotlock && need_resched())
+		schedule_preempt_disabled();
+
+	return gotlock;
+}
+
+/*
  * futex_spin_lock
  */
 static noinline int futex_spin_lock(u32 __user *uaddr, unsigned int flags)
@@ -3170,6 +3296,7 @@ static noinline int futex_spin_lock(u32 __user *uaddr, unsigned int flags)
 	struct futex_q_head	 *qh = NULL;
 	struct futex_q_node	  qnode;
 	union futex_key		  key;
+	struct task_struct	 *owner;
 	bool			  gotlock;
 	int			  ret, cnt;
 	u32			  uval, vpid, old;
@@ -3193,7 +3320,7 @@ static noinline int futex_spin_lock(u32 __user *uaddr, unsigned int flags)
 	 * Check the futex value under the hash bucket lock as it might
 	 * be changed.
 	 */
-	if (futex_spin_trylock(uaddr, &uval, &ret, vpid))
+	if (futex_spin_trylock(uaddr, &uval, &ret, vpid, false))
 		goto hbunlock_out;
 
 	if (!qh) {
@@ -3201,7 +3328,7 @@ static noinline int futex_spin_lock(u32 __user *uaddr, unsigned int flags)
 		 * First waiter:
 		 * Allocate a queue head structure & initialize it
 		 */
-		qh = qhead_alloc_init(hb, &key);
+		qh = qhead_alloc_init(hb, &key, uval);
 		if (unlikely(!qh)) {
 			ret = -ENOMEM;
 			goto hbunlock_out;
@@ -3212,9 +3339,18 @@ static noinline int futex_spin_lock(u32 __user *uaddr, unsigned int flags)
 	spin_unlock(&hb->lock);
 
 	/*
-	 * Put the task into the wait queue and sleep
+	 * Perform optimisitic spinning if the owner is running.
 	 */
 	preempt_disable();
+	owner = ACCESS_ONCE(qh->owner);
+	if ((FUTEX_SPINCNT(qh) < futex_spincnt_max) &&
+	    (!owner || owner->on_cpu))
+		if (futex_optspin(qh, &qnode, uaddr, vpid))
+			goto penable_out;
+
+	/*
+	 * Put the task into the wait queue and sleep
+	 */
 	get_task_struct(qnode.task);
 	spin_lock(&qh->wlock);
 	list_add_tail(&qnode.wnode, &qh->waitq);
@@ -3238,6 +3374,11 @@ static noinline int futex_spin_lock(u32 __user *uaddr, unsigned int flags)
 					goto dequeue;
 				} else if (uval == old) {
 					gotlock = true;
+					qh->otid = vpid;
+					owner = xchg(&qh->owner, qnode.task);
+					get_task_struct(qnode.task);
+					if (owner)
+						put_task_struct(owner);
 					goto dequeue;
 				}
 				continue;
@@ -3286,15 +3427,17 @@ dequeue:
 		}
 	}
 	spin_unlock(&qh->wlock);
+
+penable_out:
 	preempt_enable();
 
 	cnt = atomic_dec_return(&qh->lcnt);
 	if (cnt == 0)
 		qhead_free(qh, hb);
 	/*
-	 * Need to set the waiters bit there are still waiters
+	 * Need to set the waiters bit there no spinner running
 	 */
-	else if (!ret)
+	else if (!ret && ((cnt >> FUTEX_SPINCNT_SHIFT) == 0))
 		ret = put_user(vpid | FUTEX_WAITERS, uaddr);
 out:
 	put_futex_key(&key);
@@ -3356,6 +3499,13 @@ static noinline int futex_spin_unlock(u32 __user *uaddr, unsigned int flags)
 	}
 
 	/*
+	 * Clear the owner field
+	 */
+	if ((qh->owner == current) &&
+	    (cmpxchg(&qh->owner, current, NULL) == current))
+		put_task_struct(current);
+
+	/*
 	 * The hash bucket lock is being hold while retrieving the task
 	 * structure from the queue head to make sure that the qh structure
 	 * won't go away under the hood.
@@ -3520,6 +3670,10 @@ static int __init futex_init(void)
 
 	futex_detect_cmpxchg();
 
+	futex_spincnt_max = num_possible_cpus()/2;
+	if (futex_spincnt_max > FUTEX_SPINCNT_MAX)
+		futex_spincnt_max = FUTEX_SPINCNT_MAX;
+
 	for (i = 0; i < futex_hashsize; i++) {
 		atomic_set(&futex_queues[i].waiters, 0);
 		plist_head_init(&futex_queues[i].chain);
-- 
1.7.1

--
To unsubscribe from this list: send the line "unsubscribe linux-api" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux