On Tue 15-11-22 17:45:53, Gabriel Krisman Bertazi wrote: > Jan reported the new algorithm as merged might be problematic if the > queue being awaken becomes empty between the waitqueue_active inside > sbq_wake_ptr check and the wake up. If that happens, wake_up_nr will > not wake up any waiter and we loose too many wake ups. In order to > guarantee progress, we need to wake up at least one waiter here, if > there are any. This now requires trying to wake up from every queue. > > Instead of walking through all the queues with sbq_wake_ptr, this call > moves the wake up inside that function. In a previous version of the > patch, I found that updating wake_index several times when walking > through queues had a measurable overhead. This ensures we only update > it once, at the end. > > Fixes: 4f8126bb2308 ("sbitmap: Use single per-bitmap counting to wake up queued tags") > Reported-by: Jan Kara <jack@xxxxxxx> > Signed-off-by: Gabriel Krisman Bertazi <krisman@xxxxxxx> Elegant and looks good to me! Feel free to add: Reviewed-by: Jan Kara <jack@xxxxxxx> Honza > --- > lib/sbitmap.c | 28 ++++++++++++---------------- > 1 file changed, 12 insertions(+), 16 deletions(-) > > diff --git a/lib/sbitmap.c b/lib/sbitmap.c > index bea7984f7987..586deb333237 100644 > --- a/lib/sbitmap.c > +++ b/lib/sbitmap.c > @@ -560,12 +560,12 @@ void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq, > } > EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth); > > -static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq) > +static void __sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr) > { > int i, wake_index; > > if (!atomic_read(&sbq->ws_active)) > - return NULL; > + return; > > wake_index = atomic_read(&sbq->wake_index); > for (i = 0; i < SBQ_WAIT_QUEUES; i++) { > @@ -579,20 +579,22 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq) > */ > wake_index = sbq_index_inc(wake_index); > > - if (waitqueue_active(&ws->wait)) { > - if (wake_index != atomic_read(&sbq->wake_index)) > - atomic_set(&sbq->wake_index, wake_index); > - return ws; > - } > + /* > + * It is sufficient to wake up at least one waiter to > + * guarantee forward progress. > + */ > + if (waitqueue_active(&ws->wait) && > + wake_up_nr(&ws->wait, nr)) > + break; > } > > - return NULL; > + if (wake_index != atomic_read(&sbq->wake_index)) > + atomic_set(&sbq->wake_index, wake_index); > } > > void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr) > { > unsigned int wake_batch = READ_ONCE(sbq->wake_batch); > - struct sbq_wait_state *ws = NULL; > unsigned int wakeups; > > if (!atomic_read(&sbq->ws_active)) > @@ -604,16 +606,10 @@ void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr) > do { > if (atomic_read(&sbq->completion_cnt) - wakeups < wake_batch) > return; > - > - if (!ws) { > - ws = sbq_wake_ptr(sbq); > - if (!ws) > - return; > - } > } while (!atomic_try_cmpxchg(&sbq->wakeup_cnt, > &wakeups, wakeups + wake_batch)); > > - wake_up_nr(&ws->wait, wake_batch); > + __sbitmap_queue_wake_up(sbq, wake_batch); > } > EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up); > > -- > 2.35.3 > -- Jan Kara <jack@xxxxxxxx> SUSE Labs, CR