By applying well-known spin-on-lock-owner techniques, we can avoid the
blocking overhead while a task is trying to take the rtmutex. The idea
is that as long as the owner is running, there is a fair chance it will
release the lock soon, so a task trying to acquire the rtmutex is better
off spinning instead of blocking immediately after the fastpath. This is
similar to what we use for other locks, borrowed from -rt. The main
difference (due to the obvious real-time constraints) is that top-waiter
spinning must account for any new higher-priority waiter, and therefore
cannot steal the lock and avoid any pi-dance. As such, there will be at
most one spinning waiter on a contended lock.

Conditions to stop spinning and block are simple:

(1) Upon need_resched()
(2) Current lock owner blocks
(3) We are no longer the top-waiter

The unlock side remains unchanged, as wake_up_process() can safely deal
with calls where the task is not actually blocked (TASK_NORMAL). The
biggest concern would perhaps be if we relied on any implicit barriers
(in that the wake_up_process() call would no longer imply them, since
nothing was awoken), but this is not the case. As such, the only cost is
some unnecessary overhead dealing with the wake_q, but this allows us
not to miss any wakeups between the spinning step and the unlocking
side.

Measuring the number of priority inversions performed by the pi_stress
program, there is some improvement in throughput during a 30-second
window.
On a 32-core box, with increasing thread-group count:

pistress
                           4.4.3                  4.4.3
                         vanilla     rtmutex-topspinner
Hmean  1    2321586.73 (  0.00%)   2339847.23 (  0.79%)
Hmean  4    8209026.49 (  0.00%)   8597809.55 (  4.74%)
Hmean  7   12655322.45 (  0.00%)  13194896.45 (  4.26%)
Hmean  12   4210477.03 (  0.00%)   4348643.08 (  3.28%)
Hmean  21   2996823.05 (  0.00%)   3104513.47 (  3.59%)
Hmean  30   2463107.53 (  0.00%)   2584275.71 (  4.91%)
Hmean  48   2656668.46 (  0.00%)   2719324.53 (  2.36%)
Hmean  64   2397253.65 (  0.00%)   2471628.92 (  3.10%)
Stddev 1     653473.88 (  0.00%)    527076.59 (-19.34%)
Stddev 4     664995.50 (  0.00%)    359487.15 (-45.94%)
Stddev 7     248476.88 (  0.00%)    278307.31 ( 12.01%)
Stddev 12     74537.42 (  0.00%)     54305.86 (-27.14%)
Stddev 21     72143.80 (  0.00%)     40371.42 (-44.04%)
Stddev 30     31981.43 (  0.00%)     42306.07 ( 32.28%)
Stddev 48     21317.95 (  0.00%)     42608.50 ( 99.87%)
Stddev 64     23433.99 (  0.00%)     21502.56 ( -8.24%)

Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
---
Changes from v1:
  Stop spinning and block if we are no longer the top-waiter for the
  lock. This also prevents having more than one spinner at a time, as
  the old waiter would otherwise still think it's next in line for the
  lock.

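For readers who want to poke at the three stop conditions outside the
kernel, below is a minimal userspace sketch using C11 atomics. It is
not the patch's code: the toy_* types and toy_spin_on_owner() are
hypothetical stand-ins, need_resched() is modelled as a plain flag, and
the sketch assumes task objects outlive every spinner (the patch relies
on rcu_read_lock() for that guarantee instead).

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical userspace stand-ins; none of these are kernel types. */
struct toy_task {
	atomic_bool on_cpu;			/* models owner->on_cpu */
};

struct toy_waiter {
	int prio;				/* illustrative only */
};

struct toy_lock {
	_Atomic(struct toy_task *) owner;	/* models rt_mutex_owner(lock) */
	_Atomic(struct toy_waiter *) top_waiter; /* models rt_mutex_top_waiter(lock) */
};

/*
 * Returns true when the caller should retry taking the lock (no owner,
 * or the owner changed while spinning), false when it should block.
 * Assumption: *owner stays valid for the duration of the call.
 */
static bool toy_spin_on_owner(struct toy_lock *lock, struct toy_task *owner,
			      struct toy_waiter *self, atomic_bool *resched)
{
	/* The last owner may have just released the lock: retry at once. */
	if (owner == NULL)
		return true;

	while (atomic_load_explicit(&lock->owner, memory_order_acquire) == owner) {
		if (atomic_load(resched))		/* (1) need_resched() */
			return false;
		if (!atomic_load(&owner->on_cpu))	/* (2) owner blocked */
			return false;
		if (atomic_load(&lock->top_waiter) != self) /* (3) lost top spot */
			return false;
		/* A real spinner would issue a CPU relax/pause hint here. */
	}

	return true;
}
```

Note that, as in the patch, the loop has no spin budget: it only ends
when the owner changes or one of the three conditions fires, so a
caller experimenting with this needs another thread to update the
shared state.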
 kernel/Kconfig.locks            |  4 ++
 kernel/locking/rtmutex.c        | 92 ++++++++++++++++++++++++++++++++++-------
 kernel/locking/rtmutex_common.h |  4 +-
 3 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/kernel/Kconfig.locks b/kernel/Kconfig.locks
index ebdb0043203a..e20790cdc446 100644
--- a/kernel/Kconfig.locks
+++ b/kernel/Kconfig.locks
@@ -227,6 +227,10 @@ config MUTEX_SPIN_ON_OWNER
 	def_bool y
 	depends on SMP && !DEBUG_MUTEXES && ARCH_SUPPORTS_ATOMIC_RMW
 
+config RT_MUTEX_SPIN_ON_OWNER
+	def_bool y
+	depends on SMP && RT_MUTEXES && !DEBUG_RT_MUTEXES && ARCH_SUPPORTS_ATOMIC_RMW
+
 config RWSEM_SPIN_ON_OWNER
 	def_bool y
 	depends on SMP && RWSEM_XCHGADD_ALGORITHM && ARCH_SUPPORTS_ATOMIC_RMW
diff --git a/kernel/locking/rtmutex.c b/kernel/locking/rtmutex.c
index 1ec0f48962b3..a789528172c2 100644
--- a/kernel/locking/rtmutex.c
+++ b/kernel/locking/rtmutex.c
@@ -54,13 +54,13 @@ rt_mutex_set_owner(struct rt_mutex *lock, struct task_struct *owner)
 	if (rt_mutex_has_waiters(lock))
 		val |= RT_MUTEX_HAS_WAITERS;
 
-	lock->owner = (struct task_struct *)val;
+	WRITE_ONCE(lock->owner, (struct task_struct *)val);
 }
 
 static inline void clear_rt_mutex_waiters(struct rt_mutex *lock)
 {
-	lock->owner = (struct task_struct *)
-			((unsigned long)lock->owner & ~RT_MUTEX_HAS_WAITERS);
+	WRITE_ONCE(lock->owner, (struct task_struct *)
+			((unsigned long)lock->owner & ~RT_MUTEX_HAS_WAITERS));
 }
 
 static void fixup_rt_mutex_waiters(struct rt_mutex *lock)
@@ -198,7 +198,7 @@ rt_mutex_enqueue(struct rt_mutex *lock, struct rt_mutex_waiter *waiter)
 	}
 
 	if (leftmost)
-		lock->waiters_leftmost = &waiter->tree_entry;
+		WRITE_ONCE(lock->waiters_leftmost, &waiter->tree_entry);
 
 	rb_link_node(&waiter->tree_entry, parent, link);
 	rb_insert_color(&waiter->tree_entry, &lock->waiters);
@@ -210,8 +210,8 @@ rt_mutex_dequeue(struct rt_mutex *lock, struct rt_mutex_waiter *waiter)
 	if (RB_EMPTY_NODE(&waiter->tree_entry))
 		return;
 
-	if (lock->waiters_leftmost == &waiter->tree_entry)
-		lock->waiters_leftmost = rb_next(&waiter->tree_entry);
+	if (READ_ONCE(lock->waiters_leftmost) == &waiter->tree_entry)
+		WRITE_ONCE(lock->waiters_leftmost, rb_next(&waiter->tree_entry));
 
 	rb_erase(&waiter->tree_entry, &lock->waiters);
 	RB_CLEAR_NODE(&waiter->tree_entry);
@@ -989,14 +989,17 @@ static void mark_wakeup_next_waiter(struct wake_q_head *wake_q,
 	rt_mutex_dequeue_pi(current, waiter);
 
 	/*
-	 * As we are waking up the top waiter, and the waiter stays
-	 * queued on the lock until it gets the lock, this lock
-	 * obviously has waiters. Just set the bit here and this has
-	 * the added benefit of forcing all new tasks into the
-	 * slow path making sure no task of lower priority than
-	 * the top waiter can steal this lock.
+	 * As we are potentially waking up the top waiter, and the waiter
+	 * stays queued on the lock until it gets the lock, this lock
+	 * obviously has waiters. Just set the bit here and this has the
+	 * added benefit of forcing all new tasks into the slow path
+	 * making sure no task of lower priority than the top waiter can
+	 * steal this lock.
+	 *
+	 * If the top waiter, otoh, is spinning on ->owner, this will also
+	 * serve to exit out of the loop and try to acquire the lock.
 	 */
-	lock->owner = (void *) RT_MUTEX_HAS_WAITERS;
+	WRITE_ONCE(lock->owner, (void *) RT_MUTEX_HAS_WAITERS);
 
 	raw_spin_unlock(&current->pi_lock);
 
@@ -1089,6 +1092,51 @@ void rt_mutex_adjust_pi(struct task_struct *task)
 				   next_lock, NULL, task);
 }
 
+#ifdef CONFIG_RT_MUTEX_SPIN_ON_OWNER
+static bool rt_mutex_spin_on_owner(struct rt_mutex *lock,
+				   struct task_struct *owner,
+				   struct rt_mutex_waiter *waiter)
+{
+	bool ret = true;
+
+	/*
+	 * The last owner could have just released the lock,
+	 * immediately try taking it again.
+	 */
+	if (!owner)
+		goto done;
+
+	rcu_read_lock();
+	while (rt_mutex_owner(lock) == owner) {
+		/*
+		 * Ensure we emit the owner->on_cpu, dereference _after_
+		 * checking lock->owner still matches owner. If that fails,
+		 * owner might point to freed memory. If it still matches,
+		 * the rcu_read_lock() ensures the memory stays valid.
+		 */
+		barrier();
+		if (!owner->on_cpu || need_resched() ||
+		    rt_mutex_top_waiter(lock) != waiter) {
+			ret = false;
+			break;
+		}
+
+		cpu_relax_lowlatency();
+	}
+	rcu_read_unlock();
+done:
+	return ret;
+}
+
+#else
+static bool rt_mutex_spin_on_owner(struct rt_mutex *lock,
+				   struct task_struct *owner,
+				   struct rt_mutex_waiter *waiter)
+{
+	return false;
+}
+#endif
+
 /**
  * __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
  * @lock:		 the rt_mutex to take
@@ -1107,6 +1155,8 @@ __rt_mutex_slowlock(struct rt_mutex *lock, int state,
 	int ret = 0;
 
 	for (;;) {
+		struct rt_mutex_waiter *top_waiter = NULL;
+
 		/* Try to acquire the lock: */
 		if (try_to_take_rt_mutex(lock, current, waiter))
 			break;
@@ -1125,11 +1175,21 @@ __rt_mutex_slowlock(struct rt_mutex *lock, int state,
 				break;
 		}
 
+		top_waiter = rt_mutex_top_waiter(lock);
+
 		raw_spin_unlock_irq(&lock->wait_lock);
 
 		debug_rt_mutex_print_deadlock(waiter);
 
-		schedule();
+		/*
+		 * At this point the PI-dance is done, and, as the top waiter,
+		 * we are next in line for the lock. Try to spin on the current
+		 * owner for a while, in the hope that the lock will be released
+		 * soon. Otherwise fallback and block.
+		 */
+		if (top_waiter != waiter ||
+		    !rt_mutex_spin_on_owner(lock, rt_mutex_owner(lock), waiter))
+			schedule();
 
 		raw_spin_lock_irq(&lock->wait_lock);
 		set_current_state(state);
@@ -1555,7 +1615,7 @@ EXPORT_SYMBOL_GPL(__rt_mutex_init);
  * rt_mutex_init_proxy_locked - initialize and lock a rt_mutex on behalf of a
  *				proxy owner
  *
- * @lock: 	the rt_mutex to be locked
+ * @lock:	the rt_mutex to be locked
  * @proxy_owner:the task to set as owner
  *
  * No locking. Caller has to do serializing itself
@@ -1573,7 +1633,7 @@ void rt_mutex_init_proxy_locked(struct rt_mutex *lock,
 /**
  * rt_mutex_proxy_unlock - release a lock on behalf of owner
  *
- * @lock: 	the rt_mutex to be locked
+ * @lock:	the rt_mutex to be locked
  *
  * No locking. Caller has to do serializing itself
  * Special API call for PI-futex support
diff --git a/kernel/locking/rtmutex_common.h b/kernel/locking/rtmutex_common.h
index 4f5f83c7d2d3..dc80fce3b407 100644
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -48,7 +48,7 @@ rt_mutex_top_waiter(struct rt_mutex *lock)
 {
 	struct rt_mutex_waiter *w;
 
-	w = rb_entry(lock->waiters_leftmost, struct rt_mutex_waiter,
+	w = rb_entry(READ_ONCE(lock->waiters_leftmost), struct rt_mutex_waiter,
 		     tree_entry);
 	BUG_ON(w->lock != lock);
 
@@ -76,7 +76,7 @@ task_top_pi_waiter(struct task_struct *p)
 static inline struct task_struct *rt_mutex_owner(struct rt_mutex *lock)
 {
 	return (struct task_struct *)
-		((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
+		((unsigned long)READ_ONCE(lock->owner) & ~RT_MUTEX_OWNER_MASKALL);
 }
 
 /*
-- 
2.6.6