On Wed, Mar 18, 2015 at 04:50:37PM -0400, Waiman Long wrote: > >+ this_cpu_write(__pv_lock_wait, lock); > > We may run into the same problem of needing to have 4 queue nodes per CPU. > If an interrupt happens just after the write and before the actual wait and > it goes through the same sequence, it will overwrite the __pv_lock_wait[] > entry. So we may have a lost wakeup. That is why the pvticket lock code did > that just before the actual wait with interrupts disabled. We probably > couldn't disable interrupts here. So we may need to move the actual write to > the KVM and Xen code if we keep the current logic. Hmm indeed. So I didn't actually mean to keep this code, but yes I missed that. > >+ /* > >+ * At this point the memory pointed at by lock can be freed/reused, > >+ * however we can still use the pointer value to search in our cpu > >+ * array. > >+ * > >+ * XXX: get rid of this loop > >+ */ > >+ for_each_possible_cpu(cpu) { > >+ if (per_cpu(__pv_lock_wait, cpu) == lock) > >+ pv_kick(cpu); > >+ } > >+} > > I do want to get rid of this loop too. On average, we have to scan about > half the number of CPUs available. So it isn't that different > performance-wise compared with my original idea of following the list from > tail to head. And how about your idea of propagating the current head down > the linked list? Yeah, so I was half done with that patch when I fell asleep on the flight home. Didn't get around to 'fixing' it. It applies and builds but doesn't actually work. _However_ I'm not sure I actually like it much. The problem I have with it is these wait loops; they can generate the same problem we're trying to avoid. So I was now thinking of hashing the lock pointer; let me go and quickly put something together. 
--- kernel/locking/qspinlock.c | 8 +- kernel/locking/qspinlock_paravirt.h | 124 ++++++++++++++++++++++++++++-------- 2 files changed, 101 insertions(+), 31 deletions(-) --- a/kernel/locking/qspinlock.c +++ b/kernel/locking/qspinlock.c @@ -246,10 +246,10 @@ static __always_inline void set_locked(s */ static __always_inline void __pv_init_node(struct mcs_spinlock *node) { } -static __always_inline void __pv_wait_node(struct mcs_spinlock *node) { } +static __always_inline void __pv_wait_node(u32 old, struct mcs_spinlock *node) { } static __always_inline void __pv_kick_node(struct mcs_spinlock *node) { } -static __always_inline void __pv_wait_head(struct qspinlock *lock) { } +static __always_inline void __pv_wait_head(struct qspinlock *lock, struct mcs_spinlock *node) { } #define pv_enabled() false @@ -399,7 +399,7 @@ void queue_spin_lock_slowpath(struct qsp prev = decode_tail(old); WRITE_ONCE(prev->next, node); - pv_wait_node(node); + pv_wait_node(old, node); arch_mcs_spin_lock_contended(&node->locked); } @@ -414,7 +414,7 @@ void queue_spin_lock_slowpath(struct qsp * sequentiality; this is because the set_locked() function below * does not imply a full barrier. */ - pv_wait_head(lock); + pv_wait_head(lock, node); while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK) cpu_relax(); --- a/kernel/locking/qspinlock_paravirt.h +++ b/kernel/locking/qspinlock_paravirt.h @@ -29,8 +29,14 @@ struct pv_node { int cpu; u8 state; + struct pv_node *head; }; +static inline struct pv_node *pv_decode_tail(u32 tail) +{ + return (struct pv_node *)decode_tail(tail); +} + /* * Initialize the PV part of the mcs_spinlock node. 
*/ @@ -42,17 +48,49 @@ static void pv_init_node(struct mcs_spin pn->cpu = smp_processor_id(); pn->state = vcpu_running; + pn->head = NULL; +} + +static void pv_propagate_head(struct pv_node *pn, struct pv_node *ppn) +{ + /* + * When we race against the first waiter or ourselves we have to wait + * until the previous link updates its head pointer before we can + * propagate it. + */ + while (!READ_ONCE(ppn->head)) { + /* + * queue_spin_lock_slowpath could have been waiting for the + * node->next store above before setting node->locked. + */ + if (ppn->mcs.locked) + return; + + cpu_relax(); + } + /* + * If we interleave such that the store from pv_set_head() happens + * between our load and store we could have over-written the new head + * value. + * + * Therefore use cmpxchg to only propagate the old head value if our + * head value is unmodified. + */ + (void)cmpxchg(&pn->head, NULL, READ_ONCE(ppn->head)); } /* * Wait for node->locked to become true, halt the vcpu after a short spin. * pv_kick_node() is used to wake the vcpu again. 
*/ -static void pv_wait_node(struct mcs_spinlock *node) +static void pv_wait_node(u32 old, struct mcs_spinlock *node) { + struct pv_node *ppn = pv_decode_tail(old); struct pv_node *pn = (struct pv_node *)node; int loop; + pv_propagate_head(pn, ppn); + for (;;) { for (loop = SPIN_THRESHOLD; loop; loop--) { if (READ_ONCE(node->locked)) @@ -107,13 +145,33 @@ static void pv_kick_node(struct mcs_spin pv_kick(pn->cpu); } -static DEFINE_PER_CPU(struct qspinlock *, __pv_lock_wait); +static void pv_set_head(struct qspinlock *lock, struct pv_node *head) +{ + struct pv_node *tail, *new_tail; + + new_tail = pv_decode_tail(atomic_read(&lock->val)); + do { + tail = new_tail; + while (!READ_ONCE(tail->head)) + cpu_relax(); + + (void)xchg(&tail->head, head); + /* + * pv_set_head() pv_wait_node() + * + * [S] tail->head, head [S] lock->tail + * MB MB + * [L] lock->tail [L] tail->head + */ + new_tail = pv_decode_tail(atomic_read(&lock->val)); + } while (tail != new_tail); +} /* * Wait for l->locked to become clear; halt the vcpu after a short spin. * __pv_queue_spin_unlock() will wake us. */ -static void pv_wait_head(struct qspinlock *lock) +static void pv_wait_head(struct qspinlock *lock, struct mcs_spinlock *node) { struct __qspinlock *l = (void *)lock; int loop; @@ -121,28 +179,24 @@ static void pv_wait_head(struct qspinloc for (;;) { for (loop = SPIN_THRESHOLD; loop; loop--) { if (!READ_ONCE(l->locked)) - goto done; + return; cpu_relax(); } - this_cpu_write(__pv_lock_wait, lock); + pv_set_head(lock, (struct pv_node *)node); /* - * __pv_lock_wait must be set before setting _Q_SLOW_VAL - * - * [S] __pv_lock_wait = lock [RmW] l = l->locked = 0 - * MB MB - * [S] l->locked = _Q_SLOW_VAL [L] __pv_lock_wait + * The head must be set before we set _Q_SLOW_VAL such that + * when pv_queue_spin_unlock() observes _Q_SLOW_VAL it must + * also observe the head pointer. * - * Matches the xchg() in pv_queue_spin_unlock(). 
+ * We rely on the implied MB from the below cmpxchg() */ if (!cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_SLOW_VAL)) - goto done; + return; pv_wait(&l->locked, _Q_SLOW_VAL); } -done: - this_cpu_write(__pv_lock_wait, NULL); /* * Lock is unlocked now; the caller will acquire it without waiting. @@ -157,21 +211,37 @@ static void pv_wait_head(struct qspinloc */ void __pv_queue_spin_unlock(struct qspinlock *lock) { - struct __qspinlock *l = (void *)lock; - int cpu; + u32 old, val = atomic_read(&lock->val); + struct pv_node *pn = NULL; - if (xchg(&l->locked, 0) != _Q_SLOW_VAL) - return; + for (;;) { + if (val & _Q_SLOW_VAL) { + /* + * If we observe _Q_SLOW_VAL we must also observe + * pn->head; see pv_wait_head(); + */ + smp_rmb(); + pn = pv_decode_tail(val); + while (!READ_ONCE(pn->head)) + cpu_relax(); + pn = READ_ONCE(pn->head); + } + /* + * The cmpxchg() ensures the above loads are complete before + * we release the lock. + */ + old = atomic_cmpxchg(&lock->val, val, val & _Q_TAIL_MASK); + if (old == val) + break; + + val = old; + } /* - * At this point the memory pointed at by lock can be freed/reused, - * however we can still use the pointer value to search in our cpu - * array. - * - * XXX: get rid of this loop + * We've freed the lock so the lock storage can be gone; however since + * the pv_node structure is static storage we can safely use that + * still. */ - for_each_possible_cpu(cpu) { - if (per_cpu(__pv_lock_wait, cpu) == lock) - pv_kick(cpu); - } + if (pn) + pv_kick(pn->cpu); } -- To unsubscribe from this list: send the line "unsubscribe linux-arch" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html