On Sun, Jun 15, 2014 at 02:47:02PM +0200, Peter Zijlstra wrote: > From: Peter Zijlstra <peterz@xxxxxxxxxxxxx> > > When we allow for a max NR_CPUS < 2^14 we can optimize the pending > wait-acquire and the xchg_tail() operations. > > By growing the pending bit to a byte, we reduce the tail to 16bit. > This means we can use xchg16 for the tail part and do away with all > the repeated compxchg() operations. > > This in turn allows us to unconditionally acquire; the locked state > as observed by the wait loops cannot change. And because both locked > and pending are now a full byte we can use simple stores for the > state transition, obviating one atomic operation entirely. I have to ask - how much more performance do you get from this? Is this extra atomic operation hurting that much? > > All this is horribly broken on Alpha pre EV56 (and any other arch that > cannot do single-copy atomic byte stores). > > Signed-off-by: Peter Zijlstra <peterz@xxxxxxxxxxxxx> > --- > include/asm-generic/qspinlock_types.h | 13 ++++ > kernel/locking/qspinlock.c | 103 ++++++++++++++++++++++++++++++---- > 2 files changed, 106 insertions(+), 10 deletions(-) > > --- a/include/asm-generic/qspinlock_types.h > +++ b/include/asm-generic/qspinlock_types.h > @@ -38,6 +38,14 @@ typedef struct qspinlock { > /* > * Bitfields in the atomic value: > * > + * When NR_CPUS < 16K > + * 0- 7: locked byte > + * 8: pending > + * 9-15: not used > + * 16-17: tail index > + * 18-31: tail cpu (+1) > + * > + * When NR_CPUS >= 16K > * 0- 7: locked byte > * 8: pending > * 9-10: tail index > @@ -50,7 +58,11 @@ typedef struct qspinlock { > #define _Q_LOCKED_MASK _Q_SET_MASK(LOCKED) > > #define _Q_PENDING_OFFSET (_Q_LOCKED_OFFSET + _Q_LOCKED_BITS) > +#if CONFIG_NR_CPUS < (1U << 14) > +#define _Q_PENDING_BITS 8 > +#else > #define _Q_PENDING_BITS 1 > +#endif > #define _Q_PENDING_MASK _Q_SET_MASK(PENDING) > > #define _Q_TAIL_IDX_OFFSET (_Q_PENDING_OFFSET + _Q_PENDING_BITS) > @@ -61,6 +73,7 @@ typedef struct qspinlock { > #define _Q_TAIL_CPU_BITS (32 - _Q_TAIL_CPU_OFFSET) > #define _Q_TAIL_CPU_MASK _Q_SET_MASK(TAIL_CPU) > > +#define _Q_TAIL_OFFSET _Q_TAIL_IDX_OFFSET > #define _Q_TAIL_MASK (_Q_TAIL_IDX_MASK | _Q_TAIL_CPU_MASK) > > #define _Q_LOCKED_VAL (1U << _Q_LOCKED_OFFSET) > --- a/kernel/locking/qspinlock.c > +++ b/kernel/locking/qspinlock.c > @@ -22,6 +22,7 @@ > #include <linux/percpu.h> > #include <linux/hardirq.h> > #include <linux/mutex.h> > +#include <asm/byteorder.h> > #include <asm/qspinlock.h> > > /* > @@ -48,6 +49,9 @@ > * We can further change the first spinner to spin on a bit in the lock word > * instead of its node; whereby avoiding the need to carry a node from lock to > * unlock, and preserving API. > + * > + * N.B. The current implementation only supports architectures that allow > + * atomic operations on smaller 8-bit and 16-bit data types. > */ > > #include "mcs_spinlock.h" > @@ -85,6 +89,87 @@ static inline struct mcs_spinlock *decod > > #define _Q_LOCKED_PENDING_MASK (_Q_LOCKED_MASK | _Q_PENDING_MASK) > > +/* > + * By using the whole 2nd least significant byte for the pending bit, we > + * can allow better optimization of the lock acquisition for the pending > + * bit holder. > + */ > +#if _Q_PENDING_BITS == 8 > + > +struct __qspinlock { > + union { > + atomic_t val; > + struct { > +#ifdef __LITTLE_ENDIAN > + u16 locked_pending; > + u16 tail; > +#else > + u16 tail; > + u16 locked_pending; > +#endif > + }; > + }; > +}; > + > +/** > + * clear_pending_set_locked - take ownership and clear the pending bit. > + * @lock: Pointer to queue spinlock structure > + * @val : Current value of the queue spinlock 32-bit word > + * > + * *,1,0 -> *,0,1 > + * > + * Lock stealing is not allowed if this function is used. > + */ > +static __always_inline void > +clear_pending_set_locked(struct qspinlock *lock, u32 val) > +{ > + struct __qspinlock *l = (void *)lock; > + > + ACCESS_ONCE(l->locked_pending) = _Q_LOCKED_VAL; > +} > + > +/* > + * xchg_tail - Put in the new queue tail code word & retrieve previous one Missing full stop. > + * @lock : Pointer to queue spinlock structure > + * @tail : The new queue tail code word > + * Return: The previous queue tail code word > + * > + * xchg(lock, tail) > + * > + * p,*,* -> n,*,* ; prev = xchg(lock, node) > + */ > +static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail) > +{ > + struct __qspinlock *l = (void *)lock; > + > + return (u32)xchg(&l->tail, tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET; > +} > + > +#else /* _Q_PENDING_BITS == 8 */ > + > +/** > + * clear_pending_set_locked - take ownership and clear the pending bit. > + * @lock: Pointer to queue spinlock structure > + * @val : Current value of the queue spinlock 32-bit word > + * > + * *,1,0 -> *,0,1 > + */ > +static __always_inline void > +clear_pending_set_locked(struct qspinlock *lock, u32 val) > +{ > + u32 new, old; > + > + for (;;) { > + new = (val & ~_Q_PENDING_MASK) | _Q_LOCKED_VAL; > + > + old = atomic_cmpxchg(&lock->val, val, new); > + if (old == val) > + break; > + > + val = old; > + } > +} > + > /** > * xchg_tail - Put in the new queue tail code word & retrieve previous one > * @lock : Pointer to queue spinlock structure > @@ -109,6 +194,7 @@ static __always_inline u32 xchg_tail(str > } > return old; > } > +#endif /* _Q_PENDING_BITS == 8 */ > > /** > * queue_spin_lock_slowpath - acquire the queue spinlock > @@ -173,8 +259,13 @@ void queue_spin_lock_slowpath(struct qsp > * we're pending, wait for the owner to go away. > * > * *,1,1 -> *,1,0 > + * > + * this wait loop must be a load-acquire such that we match the > + * store-release that clears the locked bit and create lock > + * sequentiality; this because not all clear_pending_set_locked() > + * implementations imply full barriers. > */ > - while ((val = atomic_read(&lock->val)) & _Q_LOCKED_MASK) > + while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK) lock->val.counter? Ugh, all to deal with the 'int' -> 'u32' (or 'u64') Could you introduce a macro in atomic.h called 'atomic_read_raw' which would do the this? Like this: diff --git a/include/linux/atomic.h b/include/linux/atomic.h index fef3a80..5a83750 100644 --- a/include/linux/atomic.h +++ b/include/linux/atomic.h @@ -160,6 +160,8 @@ static inline void atomic_or(int i, atomic_t *v) } #endif /* #ifndef CONFIG_ARCH_HAS_ATOMIC_OR */ +#define atomic_read_raw(v) (v.counter) + #include <asm-generic/atomic-long.h> #ifdef CONFIG_GENERIC_ATOMIC64 #include <asm-generic/atomic64.h> diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c index fc7fd8c..2833fe1 100644 --- a/kernel/locking/qspinlock.c +++ b/kernel/locking/qspinlock.c @@ -265,7 +265,7 @@ void queue_spin_lock_slowpath(struct qspinlock *lock, u32 val) * sequentiality; this because not all clear_pending_set_locked() * implementations imply full barriers. */ - while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK) + while ((val = smp_load_acquire(atomic_read_raw(&lock->val))) & _Q_LOCKED_MASK) arch_mutex_cpu_relax(); /* ? > cpu_relax(); > > /* > @@ -182,15 +273,7 @@ void queue_spin_lock_slowpath(struct qsp > * > * *,1,0 -> *,0,1 > */ > - for (;;) { > - new = (val & ~_Q_PENDING_MASK) | _Q_LOCKED_VAL; > - > - old = atomic_cmpxchg(&lock->val, val, new); > - if (old == val) > - break; > - > - val = old; > - } > + clear_pending_set_locked(lock, val); > return; > > /* > > -- To unsubscribe from this list: send the line "unsubscribe linux-arch" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html