On Tue, Jul 28, 2020 at 02:43:06AM -0700, Sagi Grimberg wrote: > > > > > > > I like the tagset based interface. But the idea of doing a per-hctx > > > > > > allocation and wait doesn't seem very scalable. > > > > > > > > > > > > Paul, do you have any good idea for an interface that waits on > > > > > > multiple srcu heads? As far as I can tell we could just have a single > > > > > > global completion and counter, and each call_srcu would just just > > > > > > decrement it and then the final one would do the > > > > > > wakeup. It would just > > > > > > be great to figure out a way to keep the struct rcu_synchronize and > > > > > > counter on stack to avoid an allocation. > > > > > > > > > > > > But if we can't do with an on-stack object I'd much rather just embedd > > > > > > the rcu_head in the hw_ctx. > > > > > > > > > > I think we can do that, please see the following patch which > > > > > is against Sagi's V5: > > > > > > > > I don't think you can send a single rcu_head to multiple > > > > call_srcu calls. > > > > > > OK, then one variant is to put the rcu_head into blk_mq_hw_ctx, and put > > > rcu_synchronize into blk_mq_tag_set. > > > > I can cook up a spin, > > Nope.. spoke too soon, the rcu_head needs to be in a context that has > access to the counter (which is what you called blk_mq_srcu_sync). > you want to add also a pointer to hctx? that is almost as big as > rcu_synchronize... We can just put rcu_head into hctx, and put the count & completion into tag_set, and the tagset can be retrieved via hctx, something like the following patch: diff --git a/block/blk-mq.c b/block/blk-mq.c index c3856377b961..129665da4dbd 100644 --- a/block/blk-mq.c +++ b/block/blk-mq.c @@ -27,6 +27,7 @@ #include <linux/crash_dump.h> #include <linux/prefetch.h> #include <linux/blk-crypto.h> +#include <linux/rcupdate_wait.h> #include <trace/events/block.h> @@ -209,6 +210,34 @@ void blk_mq_quiesce_queue_nowait(struct request_queue *q) } EXPORT_SYMBOL_GPL(blk_mq_quiesce_queue_nowait); +static void blk_mq_wakeme_after_rcu(struct rcu_head *head) +{ + + struct blk_mq_srcu_struct *srcu = container_of(head, + struct blk_mq_srcu_struct, head); + struct blk_mq_hw_ctx *hctx = (void *)srcu - + sizeof(struct blk_mq_hw_ctx); + struct blk_mq_tag_set *set = hctx->queue->tag_set; + + if (atomic_dec_and_test(&set->quiesce_count)) + complete(&set->quiesce_completion); +} + +static void blk_mq_quiesce_blocking_queue_async(struct request_queue *q) +{ + struct blk_mq_hw_ctx *hctx; + unsigned int i; + + blk_mq_quiesce_queue_nowait(q); + + queue_for_each_hw_ctx(q, hctx, i) { + WARN_ON_ONCE(!(hctx->flags & BLK_MQ_F_BLOCKING)); + init_rcu_head(&hctx->srcu->head); + call_srcu(&hctx->srcu->srcu, &hctx->srcu->head, + blk_mq_wakeme_after_rcu); + } +} + /** * blk_mq_quiesce_queue() - wait until all ongoing dispatches have finished * @q: request queue. @@ -228,7 +257,7 @@ void blk_mq_quiesce_queue(struct request_queue *q) queue_for_each_hw_ctx(q, hctx, i) { if (hctx->flags & BLK_MQ_F_BLOCKING) - synchronize_srcu(hctx->srcu); + synchronize_srcu(&hctx->srcu->srcu); else rcu = true; } @@ -700,23 +729,23 @@ void blk_mq_complete_request(struct request *rq) EXPORT_SYMBOL(blk_mq_complete_request); static void hctx_unlock(struct blk_mq_hw_ctx *hctx, int srcu_idx) - __releases(hctx->srcu) + __releases(&hctx->srcu->srcu) { if (!(hctx->flags & BLK_MQ_F_BLOCKING)) rcu_read_unlock(); else - srcu_read_unlock(hctx->srcu, srcu_idx); + srcu_read_unlock(&hctx->srcu->srcu, srcu_idx); } static void hctx_lock(struct blk_mq_hw_ctx *hctx, int *srcu_idx) - __acquires(hctx->srcu) + __acquires(&hctx->srcu->srcu) { if (!(hctx->flags & BLK_MQ_F_BLOCKING)) { /* shut up gcc false positive */ *srcu_idx = 0; rcu_read_lock(); } else - *srcu_idx = srcu_read_lock(hctx->srcu); + *srcu_idx = srcu_read_lock(&hctx->srcu->srcu); } /** @@ -2599,7 +2628,7 @@ static int blk_mq_hw_ctx_size(struct blk_mq_tag_set *tag_set) sizeof(struct blk_mq_hw_ctx)); if (tag_set->flags & BLK_MQ_F_BLOCKING) - hw_ctx_size += sizeof(struct srcu_struct); + hw_ctx_size += sizeof(struct blk_mq_srcu_struct); return hw_ctx_size; } @@ -2684,7 +2713,7 @@ blk_mq_alloc_hctx(struct request_queue *q, struct blk_mq_tag_set *set, goto free_bitmap; if (hctx->flags & BLK_MQ_F_BLOCKING) - init_srcu_struct(hctx->srcu); + init_srcu_struct(&hctx->srcu->srcu); blk_mq_hctx_kobj_init(hctx); return hctx; @@ -2880,6 +2909,43 @@ static void queue_set_hctx_shared(struct request_queue *q, bool shared) } } +void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set) +{ + struct request_queue *q; + + mutex_lock(&set->tag_list_lock); + if (set->flags & BLK_MQ_F_BLOCKING) { + int count = 0; + + list_for_each_entry(q, &set->tag_list, tag_set_list) + count++; + + atomic_set(&set->quiesce_count, count); + init_completion(&set->quiesce_completion); + + list_for_each_entry(q, &set->tag_list, tag_set_list) + blk_mq_quiesce_blocking_queue_async(q); + wait_for_completion(&set->quiesce_completion); + } else { + list_for_each_entry(q, &set->tag_list, tag_set_list) + blk_mq_quiesce_queue_nowait(q); + synchronize_rcu(); + } + mutex_unlock(&set->tag_list_lock); +} +EXPORT_SYMBOL_GPL(blk_mq_quiesce_tagset); + +void blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set) +{ + struct request_queue *q; + + mutex_lock(&set->tag_list_lock); + list_for_each_entry(q, &set->tag_list, tag_set_list) + blk_mq_unquiesce_queue(q); + mutex_unlock(&set->tag_list_lock); +} +EXPORT_SYMBOL_GPL(blk_mq_unquiesce_tagset); + static void blk_mq_update_tag_set_depth(struct blk_mq_tag_set *set, bool shared) { diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h index 23230c1d031e..9ef7fdb809a7 100644 --- a/include/linux/blk-mq.h +++ b/include/linux/blk-mq.h @@ -9,6 +9,11 @@ struct blk_mq_tags; struct blk_flush_queue; +struct blk_mq_srcu_struct { + struct srcu_struct srcu; + struct rcu_head head; +}; + /** * struct blk_mq_hw_ctx - State for a hardware queue facing the hardware * block device @@ -175,7 +180,7 @@ struct blk_mq_hw_ctx { * blocking (BLK_MQ_F_BLOCKING). Must be the last member - see also * blk_mq_hw_ctx_size(). */ - struct srcu_struct srcu[]; + struct blk_mq_srcu_struct srcu[]; }; /** @@ -254,6 +259,9 @@ struct blk_mq_tag_set { struct mutex tag_list_lock; struct list_head tag_list; + + struct completion quiesce_completion; + atomic_t quiesce_count; }; /** @@ -532,6 +540,8 @@ int blk_mq_map_queues(struct blk_mq_queue_map *qmap); void blk_mq_update_nr_hw_queues(struct blk_mq_tag_set *set, int nr_hw_queues); void blk_mq_quiesce_queue_nowait(struct request_queue *q); +void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set); +void blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set); unsigned int blk_mq_rq_cpu(struct request *rq); -- Ming