Re: [PATCH] libceph: multiple workspaces for CRUSH computations

On Wed, 2020-08-19 at 11:36 +0200, Ilya Dryomov wrote:
> Replace a global map->crush_workspace (protected by a global mutex)
> with a list of workspaces, up to the number of CPUs + 1.
> 
> This is based on a patch from Robin Geuze <robing@xxxxxxxxxxxx>.
> Robin and his team have observed a 10-20% increase in IOPS on all
> queue depths and lower CPU usage as well on a high-end all-NVMe
> 100GbE cluster.
> 
> Signed-off-by: Ilya Dryomov <idryomov@xxxxxxxxx>
> ---
>  include/linux/ceph/osdmap.h |  14 ++-
>  include/linux/crush/crush.h |   3 +
>  net/ceph/osdmap.c           | 166 ++++++++++++++++++++++++++++++++----
>  3 files changed, 166 insertions(+), 17 deletions(-)
> 
> diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
> index 3f4498fef6ad..cad9acfbc320 100644
> --- a/include/linux/ceph/osdmap.h
> +++ b/include/linux/ceph/osdmap.h
> @@ -137,6 +137,17 @@ int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
>  		     const char *fmt, ...);
>  void ceph_oid_destroy(struct ceph_object_id *oid);
>  
> +struct workspace_manager {
> +	struct list_head idle_ws;
> +	spinlock_t ws_lock;
> +	/* Number of free workspaces */
> +	int free_ws;
> +	/* Total number of allocated workspaces */
> +	atomic_t total_ws;
> +	/* Waiters for a free workspace */
> +	wait_queue_head_t ws_wait;
> +};
> +
>  struct ceph_pg_mapping {
>  	struct rb_node node;
>  	struct ceph_pg pgid;
> @@ -184,8 +195,7 @@ struct ceph_osdmap {
>  	 * the list of osds that store+replicate them. */
>  	struct crush_map *crush;
>  
> -	struct mutex crush_workspace_mutex;
> -	void *crush_workspace;
> +	struct workspace_manager crush_wsm;
>  };
>  
>  static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
> diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
> index 2f811baf78d2..30dba392b730 100644
> --- a/include/linux/crush/crush.h
> +++ b/include/linux/crush/crush.h
> @@ -346,6 +346,9 @@ struct crush_work_bucket {
>  
>  struct crush_work {
>  	struct crush_work_bucket **work; /* Per-bucket working store */
> +#ifdef __KERNEL__
> +	struct list_head item;
> +#endif
>  };
>  
>  #ifdef __KERNEL__
> diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
> index 96c25f5e064a..fa08c15be0c0 100644
> --- a/net/ceph/osdmap.c
> +++ b/net/ceph/osdmap.c
> @@ -964,6 +964,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
>  	return -EINVAL;
>  }
>  
> +/*
> + * CRUSH workspaces
> + *
> + * workspace_manager framework borrowed from fs/btrfs/compression.c.
> + * Two simplifications: there is only one type of workspace and there
> + * is always at least one workspace.
> + */
> +static struct crush_work *alloc_workspace(const struct crush_map *c)
> +{
> +	struct crush_work *work;
> +	size_t work_size;
> +
> +	WARN_ON(!c->working_size);
> +	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
> +	dout("%s work_size %zu bytes\n", __func__, work_size);
> +
> +	work = ceph_kvmalloc(work_size, GFP_NOIO);
> +	if (!work)
> +		return NULL;
> +

In general, how big are these allocations? They're all uniform, so you
could make a dedicated slab cache for this. Granted, you'll only have a
max of one per CPU, but some boxes have a lot of CPUs these days.
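
Just to sketch what that might look like (hypothetical names, not part
of the patch): the workspace size comes from the crush map, so the
cache would have to be created or recreated when a new map is set
(e.g. from osdmap_set_crush()), and it only helps if the workspace for
the largest expected map stays within slab object size limits:

	/* hypothetical per-osdmap cache for crush_work allocations */
	static struct kmem_cache *crush_work_cache;

	static int create_crush_work_cache(const struct crush_map *c)
	{
		size_t work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);

		crush_work_cache = kmem_cache_create("ceph_crush_work",
						     work_size, 0, 0, NULL);
		return crush_work_cache ? 0 : -ENOMEM;
	}

	static struct crush_work *alloc_workspace_cached(const struct crush_map *c)
	{
		struct crush_work *work;

		work = kmem_cache_alloc(crush_work_cache, GFP_NOIO);
		if (!work)
			return NULL;

		INIT_LIST_HEAD(&work->item);
		crush_init_workspace(c, work);
		return work;
	}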

> +	INIT_LIST_HEAD(&work->item);
> +	crush_init_workspace(c, work);
> +	return work;
> +}
> +
> +static void free_workspace(struct crush_work *work)
> +{
> +	WARN_ON(!list_empty(&work->item));
> +	kvfree(work);
> +}
> +
> +static void init_workspace_manager(struct workspace_manager *wsm)
> +{
> +	INIT_LIST_HEAD(&wsm->idle_ws);
> +	spin_lock_init(&wsm->ws_lock);
> +	atomic_set(&wsm->total_ws, 0);
> +	wsm->free_ws = 0;
> +	init_waitqueue_head(&wsm->ws_wait);
> +}
> +
> +static void add_initial_workspace(struct workspace_manager *wsm,
> +				  struct crush_work *work)
> +{
> +	WARN_ON(!list_empty(&wsm->idle_ws));
> +
> +	list_add(&work->item, &wsm->idle_ws);
> +	atomic_set(&wsm->total_ws, 1);
> +	wsm->free_ws = 1;
> +}
> +
> +static void cleanup_workspace_manager(struct workspace_manager *wsm)
> +{
> +	struct crush_work *work;
> +
> +	while (!list_empty(&wsm->idle_ws)) {
> +		work = list_first_entry(&wsm->idle_ws, struct crush_work,
> +					item);
> +		list_del_init(&work->item);
> +		free_workspace(work);
> +	}
> +	atomic_set(&wsm->total_ws, 0);
> +	wsm->free_ws = 0;
> +}
> +
> +/*
> + * Finds an available workspace or allocates a new one.  If it's not
> + * possible to allocate a new one, waits until there is one.
> + */
> +static struct crush_work *get_workspace(struct workspace_manager *wsm,
> +					const struct crush_map *c)
> +{
> +	struct crush_work *work;
> +	int cpus = num_online_cpus();
> +
> +again:
> +	spin_lock(&wsm->ws_lock);
> +	if (!list_empty(&wsm->idle_ws)) {
> +		work = list_first_entry(&wsm->idle_ws, struct crush_work,
> +					item);
> +		list_del_init(&work->item);
> +		wsm->free_ws--;
> +		spin_unlock(&wsm->ws_lock);
> +		return work;
> +
> +	}
> +	if (atomic_read(&wsm->total_ws) > cpus) {
> +		DEFINE_WAIT(wait);
> +
> +		spin_unlock(&wsm->ws_lock);
> +		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
> +		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
> +			schedule();
> +		finish_wait(&wsm->ws_wait, &wait);
> +		goto again;
> +	}
> +	atomic_inc(&wsm->total_ws);
> +	spin_unlock(&wsm->ws_lock);
> +
> +	work = alloc_workspace(c);
> +	if (!work) {
> +		atomic_dec(&wsm->total_ws);
> +		wake_up(&wsm->ws_wait);
> +
> +		/*
> +		 * Do not return the error but go back to waiting.  We
> +		 * have the initial workspace and the CRUSH computation
> +		 * time is bounded so we will get it eventually.
> +		 */
> +		WARN_ON(atomic_read(&wsm->total_ws) < 1);
> +		goto again;
> +	}
> +	return work;
> +}
> +
> +/*
> + * Puts a workspace back on the list or frees it if we have enough
> + * idle ones sitting around.
> + */
> +static void put_workspace(struct workspace_manager *wsm,
> +			  struct crush_work *work)
> +{
> +	spin_lock(&wsm->ws_lock);
> +	if (wsm->free_ws <= num_online_cpus()) {
> +		list_add(&work->item, &wsm->idle_ws);
> +		wsm->free_ws++;
> +		spin_unlock(&wsm->ws_lock);
> +		goto wake;
> +	}
> +	spin_unlock(&wsm->ws_lock);
> +
> +	free_workspace(work);
> +	atomic_dec(&wsm->total_ws);
> +wake:
> +	if (wq_has_sleeper(&wsm->ws_wait))
> +		wake_up(&wsm->ws_wait);

Is this racy? Could you end up missing a wakeup because something began
waiting between the check and the wake_up()? The check isn't done under
any sort of lock, so you could be preempted between the two.

It might be better to just call wake_up() unconditionally. In principle,
that should be a no-op if there is nothing waiting.
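
For concreteness, the tail of put_workspace() with the check dropped
would just be (untested sketch of the code quoted above):

	free_workspace(work);
	atomic_dec(&wsm->total_ws);
wake:
	/* unconditional: wake_up() is cheap when nobody is waiting */
	wake_up(&wsm->ws_wait);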

> +}
> +
>  /*
>   * osd map
>   */
> @@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
>  	map->primary_temp = RB_ROOT;
>  	map->pg_upmap = RB_ROOT;
>  	map->pg_upmap_items = RB_ROOT;
> -	mutex_init(&map->crush_workspace_mutex);
> +
> +	init_workspace_manager(&map->crush_wsm);
>  
>  	return map;
>  }
> @@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
>  void ceph_osdmap_destroy(struct ceph_osdmap *map)
>  {
>  	dout("osdmap_destroy %p\n", map);
> +
>  	if (map->crush)
>  		crush_destroy(map->crush);
> +	cleanup_workspace_manager(&map->crush_wsm);
> +
>  	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
>  		struct ceph_pg_mapping *pg =
>  			rb_entry(rb_first(&map->pg_temp),
> @@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
>  	kvfree(map->osd_weight);
>  	kvfree(map->osd_addr);
>  	kvfree(map->osd_primary_affinity);
> -	kvfree(map->crush_workspace);
>  	kfree(map);
>  }
>  
> @@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
>  
>  static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
>  {
> -	void *workspace;
> -	size_t work_size;
> +	struct crush_work *work;
>  
>  	if (IS_ERR(crush))
>  		return PTR_ERR(crush);
>  
> -	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
> -	dout("%s work_size %zu bytes\n", __func__, work_size);
> -	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
> -	if (!workspace) {
> +	work = alloc_workspace(crush);
> +	if (!work) {
>  		crush_destroy(crush);
>  		return -ENOMEM;
>  	}
> -	crush_init_workspace(crush, workspace);
>  
>  	if (map->crush)
>  		crush_destroy(map->crush);
> -	kvfree(map->crush_workspace);
> +	cleanup_workspace_manager(&map->crush_wsm);
>  	map->crush = crush;
> -	map->crush_workspace = workspace;
> +	add_initial_workspace(&map->crush_wsm, work);
>  	return 0;
>  }
>  
> @@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
>  		    s64 choose_args_index)
>  {
>  	struct crush_choose_arg_map *arg_map;
> +	struct crush_work *work;
>  	int r;
>  
>  	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
> @@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
>  		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
>  						CEPH_DEFAULT_CHOOSE_ARGS);
>  
> -	mutex_lock(&map->crush_workspace_mutex);
> +	work = get_workspace(&map->crush_wsm, map->crush);
>  	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
> -			  weight, weight_max, map->crush_workspace,
> +			  weight, weight_max, work,
>  			  arg_map ? arg_map->args : NULL);
> -	mutex_unlock(&map->crush_workspace_mutex);
> -
> +	put_workspace(&map->crush_wsm, work);
>  	return r;
>  }
>  

-- 
Jeff Layton <jlayton@xxxxxxxxxx>



