Re: [PATCH 6/6] loop: don't add worker into idle list

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> We can retrieve any workers via xarray, so not add it into idle list.
> Meantime reduce .lo_work_lock coverage, especially we don't need that
> in IO path except for adding/deleting worker into xarray.
> 
> Also simplify code a bit.
> 
> Cc: Michal Koutný <mkoutny@xxxxxxxx>
> Cc: Dan Schatzberg <schatzberg.dan@xxxxxxxxx>
> Signed-off-by: Ming Lei <ming.lei@xxxxxxxxxx>
> ---
>  drivers/block/loop.c | 138 ++++++++++++++++++++++++++-----------------
>  1 file changed, 84 insertions(+), 54 deletions(-)
> 
> diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> index 6e9725521330..146eaa03629b 100644
> --- a/drivers/block/loop.c
> +++ b/drivers/block/loop.c
> @@ -920,10 +920,11 @@ static void loop_config_discard(struct loop_device *lo)
>  struct loop_worker {
>  	struct work_struct work;
>  	struct list_head cmd_list;
> -	struct list_head idle_list;
>  	struct loop_device *lo;
>  	struct cgroup_subsys_state *blkcg_css;
>  	unsigned long last_ran_at;
> +	spinlock_t lock;
> +	refcount_t refcnt;
>  };
>  
>  static void loop_workfn(struct work_struct *work);
> @@ -941,13 +942,56 @@ static inline int queue_on_root_worker(struct cgroup_subsys_state *css)
>  }
>  #endif
>  
> +static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
> +		struct cgroup_subsys_state *blkcg_css)
> +{
> +	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
> +	struct loop_worker *worker = kzalloc(sizeof(*worker), gfp);
> +	struct loop_worker *worker_old;
> +
> +	if (!worker)
> +		return NULL;
> +
> +	worker->blkcg_css = blkcg_css;
> +	INIT_WORK(&worker->work, loop_workfn);
> +	INIT_LIST_HEAD(&worker->cmd_list);
> +	worker->lo = lo;
> +	spin_lock_init(&worker->lock);
> +	refcount_set(&worker->refcnt, 2);	/* INIT + INC */

Can you elaborate on the reference counting a bit more here? I notice
you have a reference per loop_cmd, but there are a couple extra
refcounts that aren't obvious to me. Having a comment describing it
might be useful.

> +
> +	spin_lock(&lo->lo_work_lock);
> +	/* maybe someone is storing a new worker */
> +	worker_old = xa_load(&lo->workers, blkcg_css->id);
> +	if (!worker_old || !refcount_inc_not_zero(&worker_old->refcnt)) {

I gather you increment the refcount here under lo_work_lock to ensure
the worker isn't destroyed before queueing the cmd on it.

> +		if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
> +			kfree(worker);
> +			worker = NULL;
> +		} else {
> +			css_get(worker->blkcg_css);
> +		}
> +	} else {
> +		kfree(worker);
> +		worker = worker_old;
> +	}
> +	spin_unlock(&lo->lo_work_lock);
> +
> +	return worker;
> +}
> +
> +static void loop_release_worker(struct loop_worker *worker)
> +{
> +	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> +	css_put(worker->blkcg_css);
> +	kfree(worker);
> +}
> +
>  static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
>  {
>  	struct loop_worker *worker = NULL;
>  	struct work_struct *work;
>  	struct list_head *cmd_list;
>  	struct cgroup_subsys_state *blkcg_css = NULL;
> -	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
> +	spinlock_t	*lock;
>  #ifdef CONFIG_BLK_CGROUP
>  	struct request *rq = blk_mq_rq_from_pdu(cmd);
>  
> @@ -955,54 +999,38 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
>  		blkcg_css = &bio_blkcg(rq->bio)->css;
>  #endif
>  
> -	spin_lock(&lo->lo_work_lock);
> -
> -	if (queue_on_root_worker(blkcg_css))
> -		goto queue_work;
> -
> -	/* css->id is unique in each cgroup subsystem */
> -	worker = xa_load(&lo->workers, blkcg_css->id);
> -	if (worker)
> -		goto queue_work;
> -
> -	worker = kzalloc(sizeof(*worker), gfp);
> -	/*
> -	 * In the event we cannot allocate a worker, just queue on the
> -	 * rootcg worker and issue the I/O as the rootcg
> -	 */
> -	if (!worker)
> -		goto queue_work;
> +	if (!queue_on_root_worker(blkcg_css)) {
> +		int ret = 0;
>  
> -	worker->blkcg_css = blkcg_css;
> -	css_get(worker->blkcg_css);
> -	INIT_WORK(&worker->work, loop_workfn);
> -	INIT_LIST_HEAD(&worker->cmd_list);
> -	INIT_LIST_HEAD(&worker->idle_list);
> -	worker->lo = lo;
> +		rcu_read_lock();
> +		/* css->id is unique in each cgroup subsystem */
> +		worker = xa_load(&lo->workers, blkcg_css->id);
> +		if (worker)
> +			ret = refcount_inc_not_zero(&worker->refcnt);
> +		rcu_read_unlock();

I don't follow the refcount decrement here. Also, what's the purpose
of the rcu critical section here?

>  
> -	if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
> -		kfree(worker);
> -		worker = NULL;
> +		if (!worker || !ret)
> +			worker = loop_alloc_or_get_worker(lo, blkcg_css);
> +		/*
> +		 * In the event we cannot allocate a worker, just queue on the
> +		 * rootcg worker and issue the I/O as the rootcg
> +		 */
>  	}
>  
> -queue_work:
>  	if (worker) {
> -		/*
> -		 * We need to remove from the idle list here while
> -		 * holding the lock so that the idle timer doesn't
> -		 * free the worker
> -		 */
> -		if (!list_empty(&worker->idle_list))
> -			list_del_init(&worker->idle_list);
>  		work = &worker->work;
>  		cmd_list = &worker->cmd_list;
> +		lock = &worker->lock;
>  	} else {
>  		work = &lo->rootcg_work;
>  		cmd_list = &lo->rootcg_cmd_list;
> +		lock = &lo->lo_work_lock;

Is lo_work_lock special here or is it just because the root "worker"
lacks a lock itself? I wonder if a separate spinlock is more clear.

>  	}
> +
> +	spin_lock(lock);
>  	list_add_tail(&cmd->list_entry, cmd_list);
> +	spin_unlock(lock);
>  	queue_work(lo->workqueue, work);
> -	spin_unlock(&lo->lo_work_lock);
>  }
>  
>  static void loop_update_rotational(struct loop_device *lo)
> @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
>  
>  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
>  {
> -	struct loop_worker *pos, *worker;
> +	struct loop_worker *worker;
> +	unsigned long id;
>  
>  	spin_lock(&lo->lo_work_lock);
> -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> -				idle_list) {
> +	xa_for_each(&lo->workers, id, worker) {
>  		if (!force && time_is_after_jiffies(worker->last_ran_at +
>  						LOOP_IDLE_WORKER_TIMEOUT))
>  			break;
> -		list_del(&worker->idle_list);
> -		xa_erase(&lo->workers, worker->blkcg_css->id);
> -		css_put(worker->blkcg_css);
> -		kfree(worker);
> +		if (refcount_dec_and_test(&worker->refcnt))
> +			loop_release_worker(worker);

This one is puzzling to me. Can't you hit this refcount decrement
superfluously each time the loop timer fires?

>  	}
> -	if (!list_empty(&lo->idle_worker_list))
> +	if (!xa_empty(&lo->workers))
>  		loop_set_timer(lo);
>  	spin_unlock(&lo->lo_work_lock);
>  }
> @@ -2148,27 +2174,29 @@ static void loop_handle_cmd(struct loop_cmd *cmd)
>  }
>  
>  static void loop_process_work(struct loop_worker *worker,
> -			struct list_head *cmd_list, struct loop_device *lo)
> +			struct list_head *cmd_list, spinlock_t *lock)
>  {
>  	int orig_flags = current->flags;
>  	struct loop_cmd *cmd;
>  	LIST_HEAD(list);
> +	int cnt = 0;
>  
>  	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
>  
> -	spin_lock(&lo->lo_work_lock);
> +	spin_lock(lock);
>   again:
>  	list_splice_init(cmd_list, &list);
> -	spin_unlock(&lo->lo_work_lock);
> +	spin_unlock(lock);
>  
>  	while (!list_empty(&list)) {
>  		cmd = list_first_entry(&list, struct loop_cmd, list_entry);
>  		list_del_init(&cmd->list_entry);
>  
>  		loop_handle_cmd(cmd);
> +		cnt++;
>  	}
>  
> -	spin_lock(&lo->lo_work_lock);
> +	spin_lock(lock);
>  	if (!list_empty(cmd_list))
>  		goto again;
>  
> @@ -2179,11 +2207,13 @@ static void loop_process_work(struct loop_worker *worker,
>  	 */
>  	if (worker && !work_pending(&worker->work)) {
>  		worker->last_ran_at = jiffies;
> -		list_add_tail(&worker->idle_list, &lo->idle_worker_list);
> -		loop_set_timer(lo);
> +		loop_set_timer(worker->lo);
>  	}
> -	spin_unlock(&lo->lo_work_lock);
> +	spin_unlock(lock);
>  	current->flags = orig_flags;
> +
> +	if (worker && refcount_sub_and_test(cnt, &worker->refcnt))
> +		loop_release_worker(worker);
>  }
>  
>  static void loop_workfn(struct work_struct *work)
> @@ -2202,7 +2232,7 @@ static void loop_workfn(struct work_struct *work)
>  		old_memcg = set_active_memcg(
>  				mem_cgroup_from_css(memcg_css));
>  
> -	loop_process_work(worker, &worker->cmd_list, worker->lo);
> +	loop_process_work(worker, &worker->cmd_list, &worker->lock);
>  
>  	kthread_associate_blkcg(NULL);
>  	if (memcg_css) {
> @@ -2215,7 +2245,7 @@ static void loop_rootcg_workfn(struct work_struct *work)
>  {
>  	struct loop_device *lo =
>  		container_of(work, struct loop_device, rootcg_work);
> -	loop_process_work(NULL, &lo->rootcg_cmd_list, lo);
> +	loop_process_work(NULL, &lo->rootcg_cmd_list, &lo->lo_work_lock);
>  }
>  
>  static const struct blk_mq_ops loop_mq_ops = {
> -- 
> 2.31.1
> 

The rest of this patch series looks great. Feel free to add my
Acked-by to the others.



[Index of Archives]     [Linux RAID]     [Linux SCSI]     [Linux ATA RAID]     [IDE]     [Linux Wireless]     [Linux Kernel]     [ATH6KL]     [Linux Bluetooth]     [Linux Netdev]     [Kernel Newbies]     [Security]     [Git]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Device Mapper]

  Powered by Linux