On Tue 06-09-22 15:27:51, Keith Busch wrote: > On Wed, Aug 03, 2022 at 08:15:04PM +0800, Yu Kuai wrote: > > wait_cnt = atomic_dec_return(&ws->wait_cnt); > > - if (wait_cnt <= 0) { > > - int ret; > > + /* > > + * For concurrent callers of this, callers should call this function > > + * again to wakeup a new batch on a different 'ws'. > > + */ > > + if (wait_cnt < 0 || !waitqueue_active(&ws->wait)) > > + return true; > > If wait_cnt is '0', but the waitqueue_active happens to be false due to racing > with add_wait_queue(), this returns true so the caller will retry. Well, note that sbq_wake_ptr() called to obtain 'ws' did waitqueue_active() check. So !waitqueue_active() should really happen only if waiter was woken up by someone else or so. Not that it would matter much but I wanted to point it out. > The next atomic_dec will set the current waitstate wait_cnt < 0, which > also forces an early return true. When does the wake up happen, or > wait_cnt and wait_index get updated in that case? I guess your concern could be rephrased as: Who's going to ever set ws->wait_cnt to value > 0 if we ever exit with wait_cnt == 0 due to !waitqueue_active() condition? And that is a good question and I think that's a bug in this patch. I think we need something like: ... /* * For concurrent callers of this, callers should call this function * again to wakeup a new batch on a different 'ws'. */ if (wait_cnt < 0) return true; /* * If we decremented queue without waiters, retry to avoid lost * wakeups. */ if (wait_cnt > 0) return !waitqueue_active(&ws->wait); /* * When wait_cnt == 0, we have to be particularly careful as we are * responsible to reset wait_cnt regardless whether we've actually * woken up anybody. But in case we didn't wakeup anybody, we still * need to retry. */ ret = !waitqueue_active(&ws->wait); wake_batch = READ_ONCE(sbq->wake_batch); /* * Wake up first in case that concurrent callers decrease wait_cnt * while waitqueue is empty. */ wake_up_nr(&ws->wait, wake_batch); ... return ret; Does this fix your concern Keith? Honza > > > > - wake_batch = READ_ONCE(sbq->wake_batch); > > + if (wait_cnt > 0) > > + return false; > > > > - /* > > - * Pairs with the memory barrier in sbitmap_queue_resize() to > > - * ensure that we see the batch size update before the wait > > - * count is reset. > > - */ > > - smp_mb__before_atomic(); > > + wake_batch = READ_ONCE(sbq->wake_batch); > > > > - /* > > - * For concurrent callers of this, the one that failed the > > - * atomic_cmpxhcg() race should call this function again > > - * to wakeup a new batch on a different 'ws'. > > - */ > > - ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch); > > - if (ret == wait_cnt) { > > - sbq_index_atomic_inc(&sbq->wake_index); > > - wake_up_nr(&ws->wait, wake_batch); > > - return false; > > - } > > + /* > > + * Wake up first in case that concurrent callers decrease wait_cnt > > + * while waitqueue is empty. > > + */ > > + wake_up_nr(&ws->wait, wake_batch); > > > > - return true; > > - } > > + /* > > + * Pairs with the memory barrier in sbitmap_queue_resize() to > > + * ensure that we see the batch size update before the wait > > + * count is reset. > > + * > > + * Also pairs with the implicit barrier between decrementing wait_cnt > > + * and checking for waitqueue_active() to make sure waitqueue_active() > > + * sees result of the wakeup if atomic_dec_return() has seen the result > > + * of atomic_set(). > > + */ > > + smp_mb__before_atomic(); > > + > > + /* > > + * Increase wake_index before updating wait_cnt, otherwise concurrent > > + * callers can see valid wait_cnt in old waitqueue, which can cause > > + * invalid wakeup on the old waitqueue. > > + */ > > + sbq_index_atomic_inc(&sbq->wake_index); > > + atomic_set(&ws->wait_cnt, wake_batch); > > > > return false; > > } > > -- > > 2.31.1 > > -- Jan Kara <jack@xxxxxxxx> SUSE Labs, CR