On Wed, Aug 24, 2022 at 10:00:23AM -0700, Keith Busch wrote: > +static bool __sbq_wake_up(struct sbitmap_queue *sbq, int *nr) > { > struct sbq_wait_state *ws; > - unsigned int wake_batch; > - int wait_cnt; > + int wake_batch, wait_cnt, sub; > > ws = sbq_wake_ptr(sbq); > - if (!ws) > + if (!ws || !(*nr)) > return false; > > - wait_cnt = atomic_dec_return(&ws->wait_cnt); > + wake_batch = READ_ONCE(sbq->wake_batch); > + sub = min3(wake_batch, *nr, atomic_read(&ws->wait_cnt)); > + wait_cnt = atomic_sub_return(sub, &ws->wait_cnt); > + *nr -= sub; > + > /* > * For concurrent callers of this, callers should call this function > * again to wakeup a new batch on a different 'ws'. I'll need to send a new version. The code expects 'wait_cnt' to be 0 in order to wake up waiters, but if two batched completions have different amounts of cleared bits, one thread may see > 0, the other may see < 0, and no one will make progress. I think we may need to do atomic_dec_return() in a loop so that a completer is guaranteed to eventually see '0'.