On Thu, 23 Feb 2023 13:26:45 +0100 Peter Zijlstra <peterz@xxxxxxxxxxxxx> wrote:

> +static void rwsem_writer_wake(struct rw_semaphore *sem,
> +			       struct rwsem_waiter *waiter,
> +			       struct wake_q_head *wake_q)
> +{
> +	struct rwsem_waiter *first = rwsem_first_waiter(sem);
> +	long count, new;
> +
> +	lockdep_assert_held(&sem->wait_lock);
> +
> +	count = atomic_long_read(&sem->count);
> +	do {
> +		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> +
> +		if (has_handoff) {
> +			/*
> +			 * Honor handoff bit and yield only when the first
> +			 * waiter is the one that set it. Otherwise, we
> +			 * still try to acquire the rwsem.
> +			 */
> +			if (first->handoff_set && (waiter != first))
> +				return;

Given that HANDOFF disables all spinning and stealing in 2/6, what sense
does it make to still try to acquire the rwsem here? Perhaps the answer
is that this function is only called at wake time.

> +		}
> +
> +		new = count;
> +
> +		if (count & RWSEM_LOCK_MASK) {

If it is only called at wake time, the chance of a transient
RWSEM_READER_BIAS ruining the wakeup is not zero (see the sketch at the
end of this mail).

> +			/*
> +			 * A waiter (first or not) can set the handoff bit
> +			 * if it is an RT task or wait in the wait queue
> +			 * for too long.
> +			 */
> +			if (has_handoff || (!rt_task(waiter->task) &&
> +					    !time_after(jiffies, waiter->timeout)))
> +				return;
> +
> +			new |= RWSEM_FLAG_HANDOFF;
> +		} else {
> +			new |= RWSEM_WRITER_LOCKED;
> +			new &= ~RWSEM_FLAG_HANDOFF;
> +
> +			if (list_is_singular(&sem->wait_list))
> +				new &= ~RWSEM_FLAG_WAITERS;
> +		}
> +	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> +
> +	/*
> +	 * We have either acquired the lock with handoff bit cleared or set
> +	 * the handoff bit. Only the first waiter can have its handoff_set
> +	 * set here to enable optimistic spinning in slowpath loop.
> +	 */
> +	if (new & RWSEM_FLAG_HANDOFF) {
> +		first->handoff_set = true;
> +		lockevent_inc(rwsem_wlock_handoff);
> +		return;
> +	}
> +
> +	/*
> +	 * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
> +	 * success.
> +	 */
> +	list_del(&waiter->list);
> +	rwsem_set_owner(sem);
> +	rwsem_waiter_wake(waiter, wake_q);
> +}
> +
>  /*
>   * handle the lock release when processes blocked on it that can now run
>   * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
> @@ -424,23 +518,12 @@ static void rwsem_mark_wake(struct rw_se
>  	 */
>  	waiter = rwsem_first_waiter(sem);
> 
> -	if (waiter->type != RWSEM_WAITING_FOR_WRITE)
> -		goto wake_readers;
> -
> -	if (wake_type == RWSEM_WAKE_ANY) {
> -		/*
> -		 * Mark writer at the front of the queue for wakeup.
> -		 * Until the task is actually later awoken later by
> -		 * the caller, other writers are able to steal it.
> -		 * Readers, on the other hand, will block as they
> -		 * will notice the queued writer.
> -		 */
> -		wake_q_add(wake_q, waiter->task);
> -		lockevent_inc(rwsem_wake_writer);
> +	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> +		if (wake_type == RWSEM_WAKE_ANY)
> +			rwsem_writer_wake(sem, waiter, wake_q);
> +		return;
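
To make that window concrete, below is a minimal userspace sketch of the
interleaving I have in mind. The count layout mirrors
kernel/locking/rwsem.c (RWSEM_FLAG_READFAIL left out for brevity), but
the sequential steps in main() stand in for a real race between a
reader's fast path and the waker, so take it as an illustration rather
than a reproducer.

#include <stdatomic.h>
#include <stdio.h>

/* Count layout as in kernel/locking/rwsem.c (READFAIL bit omitted). */
#define RWSEM_WRITER_LOCKED	(1UL << 0)
#define RWSEM_FLAG_WAITERS	(1UL << 1)
#define RWSEM_FLAG_HANDOFF	(1UL << 2)
#define RWSEM_READER_SHIFT	8
#define RWSEM_READER_BIAS	(1UL << RWSEM_READER_SHIFT)
#define RWSEM_READER_MASK	(~(RWSEM_READER_BIAS - 1))
#define RWSEM_LOCK_MASK		(RWSEM_WRITER_LOCKED | RWSEM_READER_MASK)
#define RWSEM_READ_FAILED_MASK	(RWSEM_WRITER_LOCKED | RWSEM_FLAG_WAITERS | \
				 RWSEM_FLAG_HANDOFF)

int main(void)
{
	/* The lock is free; one writer is queued. */
	atomic_ulong count = RWSEM_FLAG_WAITERS;

	/* 1) A reader's fast path speculatively adds the bias. */
	unsigned long c = atomic_fetch_add(&count, RWSEM_READER_BIAS)
			  + RWSEM_READER_BIAS;

	/* 2) The waker samples sem->count in rwsem_writer_wake() and
	 * finds RWSEM_LOCK_MASK set although nobody owns the lock. */
	unsigned long seen = atomic_load(&count);
	if (seen & RWSEM_LOCK_MASK)
		printf("waker sees lock held: count=%#lx\n", seen);

	/* 3) The reader notices the WAITERS bit and backs the bias out
	 * again before queueing itself. */
	if (c & RWSEM_READ_FAILED_MASK)
		atomic_fetch_sub(&count, RWSEM_READER_BIAS);

	printf("final count=%#lx\n", atomic_load(&count));
	return 0;
}

In step 2 the writer wakeup either bails out or sets the handoff bit
based on a reader bias that is gone a moment later, which is the case I
am asking about.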