Re: [PATCH 05/11] qspinlock: Optimize for smaller NR_CPUS

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Sun, Jun 15, 2014 at 02:47:02PM +0200, Peter Zijlstra wrote:
> From: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
> 
> When we allow for a max NR_CPUS < 2^14 we can optimize the pending
> wait-acquire and the xchg_tail() operations.
> 
> By growing the pending bit to a byte, we reduce the tail to 16bit.
> This means we can use xchg16 for the tail part and do away with all
> the repeated compxchg() operations.
> 
> This in turn allows us to unconditionally acquire; the locked state
> as observed by the wait loops cannot change. And because both locked
> and pending are now a full byte we can use simple stores for the
> state transition, obviating one atomic operation entirely.

I have to ask - how much more performance do you get from this?

Is this extra atomic operation hurting that much?
> 
> All this is horribly broken on Alpha pre EV56 (and any other arch that
> cannot do single-copy atomic byte stores).
> 
> Signed-off-by: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
> ---
>  include/asm-generic/qspinlock_types.h |   13 ++++
>  kernel/locking/qspinlock.c            |  103 ++++++++++++++++++++++++++++++----
>  2 files changed, 106 insertions(+), 10 deletions(-)
> 
> --- a/include/asm-generic/qspinlock_types.h
> +++ b/include/asm-generic/qspinlock_types.h
> @@ -38,6 +38,14 @@ typedef struct qspinlock {
>  /*
>   * Bitfields in the atomic value:
>   *
> + * When NR_CPUS < 16K
> + *  0- 7: locked byte
> + *     8: pending
> + *  9-15: not used
> + * 16-17: tail index
> + * 18-31: tail cpu (+1)
> + *
> + * When NR_CPUS >= 16K
>   *  0- 7: locked byte
>   *     8: pending
>   *  9-10: tail index
> @@ -50,7 +58,11 @@ typedef struct qspinlock {
>  #define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
>  
>  #define _Q_PENDING_OFFSET	(_Q_LOCKED_OFFSET + _Q_LOCKED_BITS)
> +#if CONFIG_NR_CPUS < (1U << 14)
> +#define _Q_PENDING_BITS		8
> +#else
>  #define _Q_PENDING_BITS		1
> +#endif
>  #define _Q_PENDING_MASK		_Q_SET_MASK(PENDING)
>  
>  #define _Q_TAIL_IDX_OFFSET	(_Q_PENDING_OFFSET + _Q_PENDING_BITS)
> @@ -61,6 +73,7 @@ typedef struct qspinlock {
>  #define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
>  #define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
>  
> +#define _Q_TAIL_OFFSET		_Q_TAIL_IDX_OFFSET
>  #define _Q_TAIL_MASK		(_Q_TAIL_IDX_MASK | _Q_TAIL_CPU_MASK)
>  
>  #define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
> --- a/kernel/locking/qspinlock.c
> +++ b/kernel/locking/qspinlock.c
> @@ -22,6 +22,7 @@
>  #include <linux/percpu.h>
>  #include <linux/hardirq.h>
>  #include <linux/mutex.h>
> +#include <asm/byteorder.h>
>  #include <asm/qspinlock.h>
>  
>  /*
> @@ -48,6 +49,9 @@
>   * We can further change the first spinner to spin on a bit in the lock word
>   * instead of its node; whereby avoiding the need to carry a node from lock to
>   * unlock, and preserving API.
> + *
> + * N.B. The current implementation only supports architectures that allow
> + *      atomic operations on smaller 8-bit and 16-bit data types.
>   */
>  
>  #include "mcs_spinlock.h"
> @@ -85,6 +89,87 @@ static inline struct mcs_spinlock *decod
>  
>  #define _Q_LOCKED_PENDING_MASK	(_Q_LOCKED_MASK | _Q_PENDING_MASK)
>  
> +/*
> + * By using the whole 2nd least significant byte for the pending bit, we
> + * can allow better optimization of the lock acquisition for the pending
> + * bit holder.
> + */
> +#if _Q_PENDING_BITS == 8
> +
> +struct __qspinlock {
> +	union {
> +		atomic_t val;
> +		struct {
> +#ifdef __LITTLE_ENDIAN
> +			u16	locked_pending;
> +			u16	tail;
> +#else
> +			u16	tail;
> +			u16	locked_pending;
> +#endif
> +		};
> +	};
> +};
> +
> +/**
> + * clear_pending_set_locked - take ownership and clear the pending bit.
> + * @lock: Pointer to queue spinlock structure
> + * @val : Current value of the queue spinlock 32-bit word
> + *
> + * *,1,0 -> *,0,1
> + *
> + * Lock stealing is not allowed if this function is used.
> + */
> +static __always_inline void
> +clear_pending_set_locked(struct qspinlock *lock, u32 val)
> +{
> +	struct __qspinlock *l = (void *)lock;
> +
> +	ACCESS_ONCE(l->locked_pending) = _Q_LOCKED_VAL;
> +}
> +
> +/*
> + * xchg_tail - Put in the new queue tail code word & retrieve previous one

Missing full stop.
> + * @lock : Pointer to queue spinlock structure
> + * @tail : The new queue tail code word
> + * Return: The previous queue tail code word
> + *
> + * xchg(lock, tail)
> + *
> + * p,*,* -> n,*,* ; prev = xchg(lock, node)
> + */
> +static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
> +{
> +	struct __qspinlock *l = (void *)lock;
> +
> +	return (u32)xchg(&l->tail, tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
> +}
> +
> +#else /* _Q_PENDING_BITS == 8 */
> +
> +/**
> + * clear_pending_set_locked - take ownership and clear the pending bit.
> + * @lock: Pointer to queue spinlock structure
> + * @val : Current value of the queue spinlock 32-bit word
> + *
> + * *,1,0 -> *,0,1
> + */
> +static __always_inline void
> +clear_pending_set_locked(struct qspinlock *lock, u32 val)
> +{
> +	u32 new, old;
> +
> +	for (;;) {
> +		new = (val & ~_Q_PENDING_MASK) | _Q_LOCKED_VAL;
> +
> +		old = atomic_cmpxchg(&lock->val, val, new);
> +		if (old == val)
> +			break;
> +
> +		val = old;
> +	}
> +}
> +
>  /**
>   * xchg_tail - Put in the new queue tail code word & retrieve previous one
>   * @lock : Pointer to queue spinlock structure
> @@ -109,6 +194,7 @@ static __always_inline u32 xchg_tail(str
>  	}
>  	return old;
>  }
> +#endif /* _Q_PENDING_BITS == 8 */
>  
>  /**
>   * queue_spin_lock_slowpath - acquire the queue spinlock
> @@ -173,8 +259,13 @@ void queue_spin_lock_slowpath(struct qsp
>  	 * we're pending, wait for the owner to go away.
>  	 *
>  	 * *,1,1 -> *,1,0
> +	 *
> +	 * this wait loop must be a load-acquire such that we match the
> +	 * store-release that clears the locked bit and create lock
> +	 * sequentiality; this because not all clear_pending_set_locked()
> +	 * implementations imply full barriers.
>  	 */
> -	while ((val = atomic_read(&lock->val)) & _Q_LOCKED_MASK)
> +	while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK)

lock->val.counter? Ugh, all to deal with the 'int' -> 'u32' (or 'u64')

Could you introduce a macro in atomic.h called 'atomic_read_raw' which
would do the this? Like this:


diff --git a/include/linux/atomic.h b/include/linux/atomic.h
index fef3a80..5a83750 100644
--- a/include/linux/atomic.h
+++ b/include/linux/atomic.h
@@ -160,6 +160,8 @@ static inline void atomic_or(int i, atomic_t *v)
 }
 #endif /* #ifndef CONFIG_ARCH_HAS_ATOMIC_OR */
 
+#define atomic_read_raw(v)	(v.counter)
+
 #include <asm-generic/atomic-long.h>
 #ifdef CONFIG_GENERIC_ATOMIC64
 #include <asm-generic/atomic64.h>
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index fc7fd8c..2833fe1 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -265,7 +265,7 @@ void queue_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 	 * sequentiality; this because not all clear_pending_set_locked()
 	 * implementations imply full barriers.
 	 */
-	while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK)
+	while ((val = smp_load_acquire(atomic_read_raw(&lock->val))) & _Q_LOCKED_MASK)
 		arch_mutex_cpu_relax();
 
 	/*

?
>  		cpu_relax();
>  
>  	/*
> @@ -182,15 +273,7 @@ void queue_spin_lock_slowpath(struct qsp
>  	 *
>  	 * *,1,0 -> *,0,1
>  	 */
> -	for (;;) {
> -		new = (val & ~_Q_PENDING_MASK) | _Q_LOCKED_VAL;
> -
> -		old = atomic_cmpxchg(&lock->val, val, new);
> -		if (old == val)
> -			break;
> -
> -		val = old;
> -	}
> +	clear_pending_set_locked(lock, val);
>  	return;
>  
>  	/*
> 
> 
--
To unsubscribe from this list: send the line "unsubscribe linux-arch" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [Linux Kernel]     [Kernel Newbies]     [x86 Platform Driver]     [Netdev]     [Linux Wireless]     [Netfilter]     [Bugtraq]     [Linux Filesystems]     [Yosemite Discussion]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Samba]     [Device Mapper]

  Powered by Linux