This patch enables readers to optimistically spin on a rwsem when it is owned by a writer instead of going to sleep directly. The rwsem_can_spin_on_owner() function is extracted out of rwsem_optimistic_spin() and is called directly by __rwsem_down_read_failed_common() and __rwsem_down_write_failed_common().

This patch may actually reduce performance under certain circumstances for reader-mostly workloads, as readers may no longer be grouped together in the wait queue. Instead of one large reader group, there may be a number of smaller reader groups interspersed among writers. However, this change is needed by some of the subsequent patches.

With a locking microbenchmark running on a 5.0-based kernel, the total locking rates (in kops/s) of the benchmark on a 4-socket 56-core x86-64 system with equal numbers of readers and writers before and after the patch were as follows:

  # of Threads   Pre-patch   Post-patch
  ------------   ---------   ----------
        2          1,926        2,120
        4          1,391        1,320
        8            716          694
       16            618          606
       32            501          487
       64             61           57

Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
 kernel/locking/lock_events_list.h |  1 +
 kernel/locking/rwsem-xadd.c       | 80 ++++++++++++++++++++++++++++++++++-----
 kernel/locking/rwsem-xadd.h       |  3 ++
 3 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/kernel/locking/lock_events_list.h b/kernel/locking/lock_events_list.h
index 4cde507..54b6650 100644
--- a/kernel/locking/lock_events_list.h
+++ b/kernel/locking/lock_events_list.h
@@ -57,6 +57,7 @@ LOCK_EVENT(rwsem_sleep_writer)	/* # of writer sleeps		*/
 LOCK_EVENT(rwsem_wake_reader)	/* # of reader wakeups		*/
 LOCK_EVENT(rwsem_wake_writer)	/* # of writer wakeups		*/
+LOCK_EVENT(rwsem_opt_rlock)	/* # of read locks opt-spin acquired	*/
 LOCK_EVENT(rwsem_opt_wlock)	/* # of write locks opt-spin acquired	*/
 LOCK_EVENT(rwsem_opt_fail)	/* # of failed opt-spinnings		*/
 LOCK_EVENT(rwsem_rlock)		/* # of read locks acquired	*/
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index 0a29aac..015edd6 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -240,6 +240,30 @@ static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem,
 #ifdef CONFIG_RWSEM_SPIN_ON_OWNER
 /*
+ * Try to acquire read lock before the reader is put on wait queue.
+ * Lock acquisition isn't allowed if the rwsem is locked or a writer handoff
+ * is ongoing.
+ */
+static inline bool rwsem_try_read_lock_unqueued(struct rw_semaphore *sem)
+{
+	long count = atomic_long_read(&sem->count);
+
+	if (RWSEM_COUNT_WLOCKED_OR_HANDOFF(count))
+		return false;
+
+	count = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
+	if (!RWSEM_COUNT_WLOCKED_OR_HANDOFF(count)) {
+		rwsem_set_reader_owned(sem);
+		lockevent_inc(rwsem_opt_rlock);
+		return true;
+	}
+
+	/* Back out the change */
+	atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
+	return false;
+}
+
+/*
  * Try to acquire write lock before the writer has been put on wait queue.
  */
 static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem,
@@ -291,8 +315,10 @@ static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem)
 
 	BUILD_BUG_ON(!rwsem_has_anonymous_owner(RWSEM_OWNER_UNKNOWN));
 
-	if (need_resched())
+	if (need_resched()) {
+		lockevent_inc(rwsem_opt_fail);
 		return false;
+	}
 
 	rcu_read_lock();
 	owner = rwsem_get_owner(sem);
@@ -301,6 +327,7 @@ static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem)
 			owner_on_cpu(owner, sem);
 	}
 	rcu_read_unlock();
+	lockevent_cond_inc(rwsem_opt_fail, !ret);
 	return ret;
 }
 
@@ -371,9 +398,6 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
 	preempt_disable();
 
 	/* sem->wait_lock should not be held when doing optimistic spinning */
-	if (!rwsem_can_spin_on_owner(sem))
-		goto done;
-
 	if (!osq_lock(&sem->osq))
 		goto done;
 
@@ -388,10 +412,11 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
 		/*
 		 * Try to acquire the lock
 		 */
-		if (rwsem_try_write_lock_unqueued(sem, wlock)) {
-			taken = true;
+		taken = wlock ? rwsem_try_write_lock_unqueued(sem, wlock)
+			      : rwsem_try_read_lock_unqueued(sem);
+
+		if (taken)
 			break;
-		}
 
 		/*
 		 * When there's no owner, we might have preempted between the
@@ -418,7 +443,13 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
 	return taken;
 }
 #else
-static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
+static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem)
+{
+	return false;
+}
+
+static inline bool rwsem_optimistic_spin(struct rw_semaphore *sem,
+					 const long wlock)
 {
 	return false;
 }
@@ -444,6 +475,33 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
 	struct rwsem_waiter waiter;
 	DEFINE_WAKE_Q(wake_q);
 
+	if (!rwsem_can_spin_on_owner(sem))
+		goto queue;
+
+	/*
+	 * Undo read bias from down_read() and do optimistic spinning.
+	 */
+	atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
+	adjustment = 0;
+	if (rwsem_optimistic_spin(sem, 0)) {
+		unsigned long flags;
+
+		/*
+		 * Opportunistically wake up other readers in the wait queue.
+		 * It has another chance of wakeup at unlock time.
+		 */
+		if ((atomic_long_read(&sem->count) & RWSEM_FLAG_WAITERS) &&
+		    raw_spin_trylock_irqsave(&sem->wait_lock, flags)) {
+			if (!list_empty(&sem->wait_list))
+				__rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED,
+						  &wake_q);
+			raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
+			wake_up_q(&wake_q);
+		}
+		return sem;
+	}
+
+queue:
 	waiter.task = current;
 	waiter.type = RWSEM_WAITING_FOR_READ;
 	waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
@@ -456,7 +514,8 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
 		 * immediately as its RWSEM_READER_BIAS has already been
 		 * set in the count.
 		 */
-		if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
+		if (adjustment &&
+		    !(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
 			raw_spin_unlock_irq(&sem->wait_lock);
 			rwsem_set_reader_owned(sem);
 			lockevent_inc(rwsem_rlock_fast);
@@ -543,7 +602,8 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem, const long wlock)
 	const long wlock = RWSEM_WRITER_LOCKED;
 
 	/* do optimistic spinning and steal lock if possible */
-	if (rwsem_optimistic_spin(sem, wlock))
+	if (rwsem_can_spin_on_owner(sem) &&
+	    rwsem_optimistic_spin(sem, wlock))
 		return sem;
 
 	/*
diff --git a/kernel/locking/rwsem-xadd.h b/kernel/locking/rwsem-xadd.h
index 1de6f1e..eb4ef36 100644
--- a/kernel/locking/rwsem-xadd.h
+++ b/kernel/locking/rwsem-xadd.h
@@ -109,9 +109,12 @@
 						RWSEM_FLAG_HANDOFF)
 
 #define RWSEM_COUNT_LOCKED(c)	((c) & RWSEM_LOCK_MASK)
+#define RWSEM_COUNT_WLOCKED(c)	((c) & RWSEM_WRITER_MASK)
 #define RWSEM_COUNT_HANDOFF(c)	((c) & RWSEM_FLAG_HANDOFF)
 #define RWSEM_COUNT_LOCKED_OR_HANDOFF(c)	\
 	((c) & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))
+#define RWSEM_COUNT_WLOCKED_OR_HANDOFF(c)	\
+	((c) & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))
 
 /*
  * Task structure pointer compression (64-bit only):
-- 
1.8.3.1
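
For reviewers, below is a small standalone userspace sketch of the unqueued reader trylock flow added by this patch. It is illustrative only and not part of the patch: the count layout, constant values and names (model_rwsem, try_read_lock_unqueued, WRITER_LOCKED, HANDOFF, READER_BIAS) are simplified assumptions and do not match the kernel's actual rwsem definitions.

/*
 * Illustrative userspace model of the unqueued reader trylock above.
 * Constants and names are simplified assumptions, not the kernel's
 * actual rwsem definitions.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define WRITER_LOCKED	(1UL << 0)	/* assumed writer-locked bit */
#define HANDOFF		(1UL << 1)	/* assumed handoff-pending bit */
#define READER_BIAS	(1UL << 8)	/* assumed per-reader count bias */
#define WLOCKED_OR_HANDOFF(c)	((c) & (WRITER_LOCKED | HANDOFF))

struct model_rwsem {
	atomic_ulong count;
};

/*
 * Bail out if a writer holds the lock or a handoff is pending; otherwise
 * add the reader bias and re-check, backing the bias out again if a writer
 * or handoff showed up in the meantime.
 */
static bool try_read_lock_unqueued(struct model_rwsem *sem)
{
	unsigned long count = atomic_load(&sem->count);

	if (WLOCKED_OR_HANDOFF(count))
		return false;

	count = atomic_fetch_add(&sem->count, READER_BIAS);
	if (!WLOCKED_OR_HANDOFF(count))
		return true;				/* read lock acquired */

	atomic_fetch_sub(&sem->count, READER_BIAS);	/* back out the bias */
	return false;
}

int main(void)
{
	struct model_rwsem sem;

	atomic_init(&sem.count, 0);
	printf("unlocked:      %d\n", try_read_lock_unqueued(&sem));	/* 1 */

	/* drop the read lock again, then simulate a writer holding the lock */
	atomic_fetch_sub(&sem.count, READER_BIAS);
	atomic_fetch_or(&sem.count, WRITER_LOCKED);
	printf("writer-locked: %d\n", try_read_lock_unqueued(&sem));	/* 0 */
	return 0;
}

The point the sketch models is the back-out path: the reader bias is added speculatively and removed again if the writer-locked or handoff bit turned out to be set, which is why the spinner can attempt the read lock without being on the wait queue.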