On Tue, Jul 28, 2020 at 09:18:59AM +0200, Christoph Hellwig wrote: > I like the tagset based interface. But the idea of doing a per-hctx > allocation and wait doesn't seem very scalable. > > Paul, do you have any good idea for an interface that waits on > multiple srcu heads? As far as I can tell we could just have a single > global completion and counter, and each call_srcu would just > decrement it and then the final one would do the wakeup. It would just > be great to figure out a way to keep the struct rcu_synchronize and > counter on stack to avoid an allocation. > > But if we can't do with an on-stack object I'd much rather just embed > the rcu_head in the hw_ctx. I think we can do that, please see the following patch which is against Sagi's V5: diff --git a/block/blk-mq.c b/block/blk-mq.c index c3856377b961..fc46e77460f1 100644 --- a/block/blk-mq.c +++ b/block/blk-mq.c @@ -27,6 +27,7 @@ #include <linux/crash_dump.h> #include <linux/prefetch.h> #include <linux/blk-crypto.h> +#include <linux/rcupdate_wait.h> #include <trace/events/block.h> @@ -209,6 +210,50 @@ void blk_mq_quiesce_queue_nowait(struct request_queue *q) } EXPORT_SYMBOL_GPL(blk_mq_quiesce_queue_nowait); +struct blk_mq_srcu_sync { + struct rcu_synchronize srcu_sync; + atomic_t count; +}; + +static void blk_mq_srcu_sync_init(struct blk_mq_srcu_sync *sync, int count) +{ + init_completion(&sync->srcu_sync.completion); + init_rcu_head(&sync->srcu_sync.head); + + atomic_set(&sync->count, count); +} + +static void blk_mq_srcu_sync_wait(struct blk_mq_srcu_sync *sync) +{ + wait_for_completion(&sync->srcu_sync.completion); + destroy_rcu_head_on_stack(&sync->srcu_sync.head); +} + +static void blk_mq_wakeme_after_rcu(struct rcu_head *head) +{ + struct blk_mq_srcu_sync *sync; + + sync = container_of(head, struct blk_mq_srcu_sync, srcu_sync.head); + + if (atomic_dec_and_test(&sync->count)) + complete(&sync->srcu_sync.completion); +} + +static void blk_mq_quiesce_blocking_queue_async(struct request_queue 
*q, + struct blk_mq_srcu_sync *sync) +{ + struct blk_mq_hw_ctx *hctx; + unsigned int i; + + blk_mq_quiesce_queue_nowait(q); + + queue_for_each_hw_ctx(q, hctx, i) { + WARN_ON_ONCE(!(hctx->flags & BLK_MQ_F_BLOCKING)); + call_srcu(hctx->srcu, &sync->srcu_sync.head, + blk_mq_wakeme_after_rcu); + } +} + /** * blk_mq_quiesce_queue() - wait until all ongoing dispatches have finished * @q: request queue. @@ -2880,6 +2925,45 @@ static void queue_set_hctx_shared(struct request_queue *q, bool shared) } } +void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set) +{ + struct request_queue *q; + + mutex_lock(&set->tag_list_lock); + if (set->flags & BLK_MQ_F_BLOCKING) { + struct blk_mq_srcu_sync sync; + int count = 0; + + list_for_each_entry(q, &set->tag_list, tag_set_list) + count++; + + blk_mq_srcu_sync_init(&sync, count); + + list_for_each_entry(q, &set->tag_list, tag_set_list) + blk_mq_quiesce_blocking_queue_async(q, &sync); + + blk_mq_srcu_sync_wait(&sync); + + } else { + list_for_each_entry(q, &set->tag_list, tag_set_list) + blk_mq_quiesce_queue_nowait(q); + synchronize_rcu(); + } + mutex_unlock(&set->tag_list_lock); +} +EXPORT_SYMBOL_GPL(blk_mq_quiesce_tagset); + +void blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set) +{ + struct request_queue *q; + + mutex_lock(&set->tag_list_lock); + list_for_each_entry(q, &set->tag_list, tag_set_list) + blk_mq_unquiesce_queue(q); + mutex_unlock(&set->tag_list_lock); +} +EXPORT_SYMBOL_GPL(blk_mq_unquiesce_tagset); + static void blk_mq_update_tag_set_depth(struct blk_mq_tag_set *set, bool shared) { diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h index 23230c1d031e..d5e0974a1dcc 100644 --- a/include/linux/blk-mq.h +++ b/include/linux/blk-mq.h @@ -532,6 +532,8 @@ int blk_mq_map_queues(struct blk_mq_queue_map *qmap); void blk_mq_update_nr_hw_queues(struct blk_mq_tag_set *set, int nr_hw_queues); void blk_mq_quiesce_queue_nowait(struct request_queue *q); +void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set); +void 
blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set); unsigned int blk_mq_rq_cpu(struct request *rq); -- Ming