On Tue, Jul 28, 2020 at 06:10:42PM +0800, Ming Lei wrote: > On Tue, Jul 28, 2020 at 02:43:06AM -0700, Sagi Grimberg wrote: > > > > > > > > > I like the tagset based interface. But the idea of doing a per-hctx > > > > > > > allocation and wait doesn't seem very scalable. > > > > > > > > > > > > > > Paul, do you have any good idea for an interface that waits on > > > > > > > multiple srcu heads? As far as I can tell we could just have a single > > > > > > > global completion and counter, and each call_srcu would just just > > > > > > > decrement it and then the final one would do the > > > > > > > wakeup. It would just > > > > > > > be great to figure out a way to keep the struct rcu_synchronize and > > > > > > > counter on stack to avoid an allocation. > > > > > > > > > > > > > > But if we can't do with an on-stack object I'd much rather just embedd > > > > > > > the rcu_head in the hw_ctx. > > > > > > > > > > > > I think we can do that, please see the following patch which > > > > > > is against Sagi's V5: > > > > > > > > > > I don't think you can send a single rcu_head to multiple > > > > > call_srcu calls. > > > > > > > > OK, then one variant is to put the rcu_head into blk_mq_hw_ctx, and put > > > > rcu_synchronize into blk_mq_tag_set. > > > > > > I can cook up a spin, > > > > Nope.. spoke too soon, the rcu_head needs to be in a context that has > > access to the counter (which is what you called blk_mq_srcu_sync). > > you want to add also a pointer to hctx? that is almost as big as > > rcu_synchronize... > > We can just put rcu_head into hctx, and put the count & completion into > tag_set, and the tagset can be retrieved via hctx, something like the > following patch: A few questions and comments below, hopefully helpful ones. Thanx, Paul > diff --git a/block/blk-mq.c b/block/blk-mq.c > index c3856377b961..129665da4dbd 100644 > --- a/block/blk-mq.c > +++ b/block/blk-mq.c > @@ -27,6 +27,7 @@ > #include <linux/crash_dump.h> > #include <linux/prefetch.h> > #include <linux/blk-crypto.h> > +#include <linux/rcupdate_wait.h> > > #include <trace/events/block.h> > > @@ -209,6 +210,34 @@ void blk_mq_quiesce_queue_nowait(struct request_queue *q) > } > EXPORT_SYMBOL_GPL(blk_mq_quiesce_queue_nowait); > > +static void blk_mq_wakeme_after_rcu(struct rcu_head *head) > +{ > + > + struct blk_mq_srcu_struct *srcu = container_of(head, > + struct blk_mq_srcu_struct, head); > + struct blk_mq_hw_ctx *hctx = (void *)srcu - > + sizeof(struct blk_mq_hw_ctx); > + struct blk_mq_tag_set *set = hctx->queue->tag_set; > + > + if (atomic_dec_and_test(&set->quiesce_count)) > + complete(&set->quiesce_completion); > +} > + > +static void blk_mq_quiesce_blocking_queue_async(struct request_queue *q) > +{ > + struct blk_mq_hw_ctx *hctx; > + unsigned int i; > + > + blk_mq_quiesce_queue_nowait(q); > + > + queue_for_each_hw_ctx(q, hctx, i) { This loop, combined with the second list_for_each_entry() in blk_mq_quiesce_tagset(), needs to have the name number of iterations as the first list_for_each_entry() in blk_mq_quiesce_tagset(). This might be the case, but it is not obvious to me. Then again, I freely admit that I don't know this code. > + WARN_ON_ONCE(!(hctx->flags & BLK_MQ_F_BLOCKING)); > + init_rcu_head(&hctx->srcu->head); > + call_srcu(&hctx->srcu->srcu, &hctx->srcu->head, > + blk_mq_wakeme_after_rcu); So the SRCU readers are specific to a given blk_mq_hw_ctx? This looks OK if so. > + } > +} > + > /** > * blk_mq_quiesce_queue() - wait until all ongoing dispatches have finished > * @q: request queue. > @@ -228,7 +257,7 @@ void blk_mq_quiesce_queue(struct request_queue *q) > > queue_for_each_hw_ctx(q, hctx, i) { > if (hctx->flags & BLK_MQ_F_BLOCKING) > - synchronize_srcu(hctx->srcu); > + synchronize_srcu(&hctx->srcu->srcu); This waits only for an SRCU grace period (that is, for any relevant hctx_unlock() calls), not for the invocation of any callbacks from any prior call_srcu(). Is this what you need here? The ->srcu->srcu looks like it might become confusing, but I don't have any specific suggstions. > else > rcu = true; > } > @@ -700,23 +729,23 @@ void blk_mq_complete_request(struct request *rq) > EXPORT_SYMBOL(blk_mq_complete_request); > > static void hctx_unlock(struct blk_mq_hw_ctx *hctx, int srcu_idx) > - __releases(hctx->srcu) > + __releases(&hctx->srcu->srcu) > { > if (!(hctx->flags & BLK_MQ_F_BLOCKING)) > rcu_read_unlock(); > else > - srcu_read_unlock(hctx->srcu, srcu_idx); > + srcu_read_unlock(&hctx->srcu->srcu, srcu_idx); Oh, either RCU or SRCU. Got it! > } > > static void hctx_lock(struct blk_mq_hw_ctx *hctx, int *srcu_idx) > - __acquires(hctx->srcu) > + __acquires(&hctx->srcu->srcu) > { > if (!(hctx->flags & BLK_MQ_F_BLOCKING)) { > /* shut up gcc false positive */ > *srcu_idx = 0; > rcu_read_lock(); > } else > - *srcu_idx = srcu_read_lock(hctx->srcu); > + *srcu_idx = srcu_read_lock(&hctx->srcu->srcu); > } > > /** > @@ -2599,7 +2628,7 @@ static int blk_mq_hw_ctx_size(struct blk_mq_tag_set *tag_set) > sizeof(struct blk_mq_hw_ctx)); > > if (tag_set->flags & BLK_MQ_F_BLOCKING) > - hw_ctx_size += sizeof(struct srcu_struct); > + hw_ctx_size += sizeof(struct blk_mq_srcu_struct); > > return hw_ctx_size; > } > @@ -2684,7 +2713,7 @@ blk_mq_alloc_hctx(struct request_queue *q, struct blk_mq_tag_set *set, > goto free_bitmap; > > if (hctx->flags & BLK_MQ_F_BLOCKING) > - init_srcu_struct(hctx->srcu); > + init_srcu_struct(&hctx->srcu->srcu); > blk_mq_hctx_kobj_init(hctx); > > return hctx; > @@ -2880,6 +2909,43 @@ static void queue_set_hctx_shared(struct request_queue *q, bool shared) > } > } > > +void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set) > +{ > + struct request_queue *q; > + > + mutex_lock(&set->tag_list_lock); > + if (set->flags & BLK_MQ_F_BLOCKING) { > + int count = 0; > + > + list_for_each_entry(q, &set->tag_list, tag_set_list) > + count++; > + > + atomic_set(&set->quiesce_count, count); > + init_completion(&set->quiesce_completion); > + > + list_for_each_entry(q, &set->tag_list, tag_set_list) > + blk_mq_quiesce_blocking_queue_async(q); So Christoph would like the mutex_unlock() up here? > + wait_for_completion(&set->quiesce_completion); > + } else { > + list_for_each_entry(q, &set->tag_list, tag_set_list) > + blk_mq_quiesce_queue_nowait(q); And up here? > + synchronize_rcu(); > + } > + mutex_unlock(&set->tag_list_lock); > +} > +EXPORT_SYMBOL_GPL(blk_mq_quiesce_tagset); > + > +void blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set) > +{ > + struct request_queue *q; > + > + mutex_lock(&set->tag_list_lock); > + list_for_each_entry(q, &set->tag_list, tag_set_list) > + blk_mq_unquiesce_queue(q); > + mutex_unlock(&set->tag_list_lock); > +} > +EXPORT_SYMBOL_GPL(blk_mq_unquiesce_tagset); > + > static void blk_mq_update_tag_set_depth(struct blk_mq_tag_set *set, > bool shared) > { > diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h > index 23230c1d031e..9ef7fdb809a7 100644 > --- a/include/linux/blk-mq.h > +++ b/include/linux/blk-mq.h > @@ -9,6 +9,11 @@ > struct blk_mq_tags; > struct blk_flush_queue; > > +struct blk_mq_srcu_struct { > + struct srcu_struct srcu; > + struct rcu_head head; > +}; > + > /** > * struct blk_mq_hw_ctx - State for a hardware queue facing the hardware > * block device > @@ -175,7 +180,7 @@ struct blk_mq_hw_ctx { > * blocking (BLK_MQ_F_BLOCKING). Must be the last member - see also > * blk_mq_hw_ctx_size(). > */ > - struct srcu_struct srcu[]; > + struct blk_mq_srcu_struct srcu[]; > }; > > /** > @@ -254,6 +259,9 @@ struct blk_mq_tag_set { > > struct mutex tag_list_lock; > struct list_head tag_list; > + > + struct completion quiesce_completion; > + atomic_t quiesce_count; > }; > > /** > @@ -532,6 +540,8 @@ int blk_mq_map_queues(struct blk_mq_queue_map *qmap); > void blk_mq_update_nr_hw_queues(struct blk_mq_tag_set *set, int nr_hw_queues); > > void blk_mq_quiesce_queue_nowait(struct request_queue *q); > +void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set); > +void blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set); > > unsigned int blk_mq_rq_cpu(struct request *rq); > > > -- > Ming >