On Thu, May 24, 2018 at 11:04:40AM +0800, Ming Lei wrote:
> When the allocation process is scheduled back and the mapped hw queue is
> changed, fake one extra wake up on previous queue for compensating wake up
> miss, so other allocations on the previous queue won't be starved.
>
> This patch fixes one request allocation hang issue, which can be
> triggered easily in case of very low nr_request.

Thanks, Ming, this looks better. One comment below.

> Cc: <stable@xxxxxxxxxxxxxxx>
> Cc: Omar Sandoval <osandov@xxxxxx>
> Signed-off-by: Ming Lei <ming.lei@xxxxxxxxxx>
> ---
> V3:
> 	- fix comments as suggested by Jens
> 	- remove the wrapper as suggested by Omar
> V2:
> 	- fix build failure
>
>  block/blk-mq-tag.c      | 12 ++++++++++++
>  include/linux/sbitmap.h |  7 +++++++
>  lib/sbitmap.c           | 22 ++++++++++++----------
>  3 files changed, 31 insertions(+), 10 deletions(-)
>
> diff --git a/block/blk-mq-tag.c b/block/blk-mq-tag.c
> index 336dde07b230..a4e58fc28a06 100644
> --- a/block/blk-mq-tag.c
> +++ b/block/blk-mq-tag.c
> @@ -134,6 +134,8 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
> 	ws = bt_wait_ptr(bt, data->hctx);
> 	drop_ctx = data->ctx == NULL;
> 	do {
> +		struct sbitmap_queue *bt_prev;
> +
> 		/*
> 		 * We're out of tags on this hardware queue, kick any
> 		 * pending IO submits before going to sleep waiting for
> @@ -159,6 +161,7 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
> 		if (data->ctx)
> 			blk_mq_put_ctx(data->ctx);
>
> +		bt_prev = bt;
> 		io_schedule();
>
> 		data->ctx = blk_mq_get_ctx(data->q);
> @@ -170,6 +173,15 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
> 			bt = &tags->bitmap_tags;
>
> 		finish_wait(&ws->wait, &wait);
> +
> +		/*
> +		 * If destination hw queue is changed, fake wake up on
> +		 * previous queue for compensating the wake up miss, so
> +		 * other allocations on previous queue won't be starved.
> +		 */
> +		if (bt != bt_prev)
> +			sbitmap_queue_wake_up(bt_prev);
> +
> 		ws = bt_wait_ptr(bt, data->hctx);
> 	} while (1);
>
> diff --git a/include/linux/sbitmap.h b/include/linux/sbitmap.h
> index 841585f6e5f2..bba9d80191b7 100644
> --- a/include/linux/sbitmap.h
> +++ b/include/linux/sbitmap.h
> @@ -484,6 +484,13 @@ static inline struct sbq_wait_state *sbq_wait_ptr(struct sbitmap_queue *sbq,
>  void sbitmap_queue_wake_all(struct sbitmap_queue *sbq);
>
>  /**
> + * sbitmap_queue_wake_up() - Wake up some of waiters in one waitqueue
> + * on a &struct sbitmap_queue.
> + * @sbq: Bitmap queue to wake up.
> + */
> +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq);
> +
> +/**
>  * sbitmap_queue_show() - Dump &struct sbitmap_queue information to a &struct
>  * seq_file.
>  * @sbq: Bitmap queue to show.
> diff --git a/lib/sbitmap.c b/lib/sbitmap.c
> index e6a9c06ec70c..14e027a33ffa 100644
> --- a/lib/sbitmap.c
> +++ b/lib/sbitmap.c
> @@ -335,8 +335,9 @@ void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
> 	if (sbq->wake_batch != wake_batch) {
> 		WRITE_ONCE(sbq->wake_batch, wake_batch);
> 		/*
> -		 * Pairs with the memory barrier in sbq_wake_up() to ensure that
> -		 * the batch size is updated before the wait counts.
> +		 * Pairs with the memory barrier in sbitmap_queue_wake_up()
> +		 * to ensure that the batch size is updated before the wait
> +		 * counts.
> 		 */
> 		smp_mb__before_atomic();
> 		for (i = 0; i < SBQ_WAIT_QUEUES; i++)
> @@ -425,7 +426,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
> 	return NULL;
> }
>
> -static void sbq_wake_up(struct sbitmap_queue *sbq)
> +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
> {
> 	struct sbq_wait_state *ws;
> 	unsigned int wake_batch;

Just after this is:

	/*
	 * Pairs with the memory barrier in set_current_state() to ensure the
	 * proper ordering of clear_bit()/waitqueue_active() in the waker and
	 * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
	 * waiter. See the comment on waitqueue_active(). This is __after_atomic
	 * because we just did clear_bit_unlock() in the caller.
	 */
	smp_mb__after_atomic();

But there's no atomic operation before this in blk_mq_get_tag(). I don't
think this memory barrier is necessary for the blk_mq_get_tag() case, so
let's move it to sbitmap_queue_clear():

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 14e027a33ffa..537328a98c06 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -432,15 +432,6 @@ void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
 	unsigned int wake_batch;
 	int wait_cnt;
 
-	/*
-	 * Pairs with the memory barrier in set_current_state() to ensure the
-	 * proper ordering of clear_bit()/waitqueue_active() in the waker and
-	 * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
-	 * waiter. See the comment on waitqueue_active(). This is __after_atomic
-	 * because we just did clear_bit_unlock() in the caller.
-	 */
-	smp_mb__after_atomic();
-
 	ws = sbq_wake_ptr(sbq);
 	if (!ws)
 		return;
@@ -472,6 +463,13 @@ void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
 			 unsigned int cpu)
 {
 	sbitmap_clear_bit_unlock(&sbq->sb, nr);
+	/*
+	 * Pairs with the memory barrier in set_current_state() to ensure the
+	 * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker
+	 * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
+	 * waiter. See the comment on waitqueue_active().
+	 */
+	smp_mb__after_atomic();
 	sbitmap_queue_wake_up(sbq);
 	if (likely(!sbq->round_robin && nr < sbq->sb.depth))
 		*per_cpu_ptr(sbq->alloc_hint, cpu) = nr;

That comment also mentioned clear_bit() but we do clear_bit_unlock() now, so we
can update that at the same time.
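
For what it's worth, the pairing after the move is the usual
waitqueue_active() ordering. Very roughly, paraphrasing the two code paths
rather than quoting them, so take the exact calls with a grain of salt:

	/* waker: sbitmap_queue_clear(), with the barrier moved here */
	sbitmap_clear_bit_unlock(&sbq->sb, nr);	/* free the tag */
	smp_mb__after_atomic();			/* order the clear before checking for waiters */
	sbitmap_queue_wake_up(sbq);		/* sbq_wake_ptr() -> waitqueue_active() */

	/* waiter: blk_mq_get_tag() */
	prepare_to_wait_exclusive(&ws->wait, &wait, TASK_UNINTERRUPTIBLE);
						/* set_current_state() supplies the pairing smp_mb() */
	tag = __blk_mq_get_tag(data, bt);	/* re-check for a free tag before sleeping */
	if (tag != -1)
		break;
	io_schedule();

Either the waiter's re-check sees the freed bit, or the waker sees the
waiter already queued, so nobody sleeps on a tag that was freed in between.
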
> @@ -454,23 +455,24 @@ static void sbq_wake_up(struct sbitmap_queue *sbq)
> 		 */
> 		smp_mb__before_atomic();
> 		/*
> -		 * If there are concurrent callers to sbq_wake_up(), the last
> -		 * one to decrement the wait count below zero will bump it back
> -		 * up. If there is a concurrent resize, the count reset will
> -		 * either cause the cmpxchg to fail or overwrite after the
> -		 * cmpxchg.
> +		 * If there are concurrent callers to sbitmap_queue_wake_up(),
> +		 * the last one to decrement the wait count below zero will
> +		 * bump it back up. If there is a concurrent resize, the count
> +		 * reset will either cause the cmpxchg to fail or overwrite
> +		 * after the cmpxchg.
> 		 */
> 		atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
> 		sbq_index_atomic_inc(&sbq->wake_index);
> 		wake_up_nr(&ws->wait, wake_batch);
> 	}
> }
> +EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
>
>  void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
> 			 unsigned int cpu)
> {
> 	sbitmap_clear_bit_unlock(&sbq->sb, nr);
> -	sbq_wake_up(sbq);
> +	sbitmap_queue_wake_up(sbq);
> 	if (likely(!sbq->round_robin && nr < sbq->sb.depth))
> 		*per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
> }
> @@ -482,7 +484,7 @@ void sbitmap_queue_wake_all(struct sbitmap_queue *sbq)
>
> 	/*
> 	 * Pairs with the memory barrier in set_current_state() like in
> -	 * sbq_wake_up().
> +	 * sbitmap_queue_wake_up().
> 	 */
> 	smp_mb();
> 	wake_index = atomic_read(&sbq->wake_index);
> --
> 2.9.5
>