libceph: multiple workspaces for CRUSH computations

Message ID 20200819093614.22774-1-idryomov@gmail.com (mailing list archive)
State New, archived

Commit Message

Ilya Dryomov Aug. 19, 2020, 9:36 a.m. UTC
Replace a global map->crush_workspace (protected by a global mutex)
with a list of workspaces, up to the number of CPUs + 1.

This is based on a patch from Robin Geuze <robing@nl.team.blue>.
Robin and his team have observed a 10-20% increase in IOPS at all
queue depths, as well as lower CPU usage, on a high-end all-NVMe
100GbE cluster.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 include/linux/ceph/osdmap.h |  14 ++-
 include/linux/crush/crush.h |   3 +
 net/ceph/osdmap.c           | 166 ++++++++++++++++++++++++++++++++----
 3 files changed, 166 insertions(+), 17 deletions(-)

Comments

Jeff Layton Aug. 19, 2020, 12:25 p.m. UTC | #1
On Wed, 2020-08-19 at 11:36 +0200, Ilya Dryomov wrote:
> Replace a global map->crush_workspace (protected by a global mutex)
> with a list of workspaces, up to the number of CPUs + 1.
> 
> This is based on a patch from Robin Geuze <robing@nl.team.blue>.
> Robin and his team have observed a 10-20% increase in IOPS at all
> queue depths, as well as lower CPU usage, on a high-end all-NVMe
> 100GbE cluster.
> 
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>  include/linux/ceph/osdmap.h |  14 ++-
>  include/linux/crush/crush.h |   3 +
>  net/ceph/osdmap.c           | 166 ++++++++++++++++++++++++++++++++----
>  3 files changed, 166 insertions(+), 17 deletions(-)
> 
> diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
> index 3f4498fef6ad..cad9acfbc320 100644
> --- a/include/linux/ceph/osdmap.h
> +++ b/include/linux/ceph/osdmap.h
> @@ -137,6 +137,17 @@ int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
>  		     const char *fmt, ...);
>  void ceph_oid_destroy(struct ceph_object_id *oid);
>  
> +struct workspace_manager {
> +	struct list_head idle_ws;
> +	spinlock_t ws_lock;
> +	/* Number of free workspaces */
> +	int free_ws;
> +	/* Total number of allocated workspaces */
> +	atomic_t total_ws;
> +	/* Waiters for a free workspace */
> +	wait_queue_head_t ws_wait;
> +};
> +
>  struct ceph_pg_mapping {
>  	struct rb_node node;
>  	struct ceph_pg pgid;
> @@ -184,8 +195,7 @@ struct ceph_osdmap {
>  	 * the list of osds that store+replicate them. */
>  	struct crush_map *crush;
>  
> -	struct mutex crush_workspace_mutex;
> -	void *crush_workspace;
> +	struct workspace_manager crush_wsm;
>  };
>  
>  static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
> diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
> index 2f811baf78d2..30dba392b730 100644
> --- a/include/linux/crush/crush.h
> +++ b/include/linux/crush/crush.h
> @@ -346,6 +346,9 @@ struct crush_work_bucket {
>  
>  struct crush_work {
>  	struct crush_work_bucket **work; /* Per-bucket working store */
> +#ifdef __KERNEL__
> +	struct list_head item;
> +#endif
>  };
>  
>  #ifdef __KERNEL__
> diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
> index 96c25f5e064a..fa08c15be0c0 100644
> --- a/net/ceph/osdmap.c
> +++ b/net/ceph/osdmap.c
> @@ -964,6 +964,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
>  	return -EINVAL;
>  }
>  
> +/*
> + * CRUSH workspaces
> + *
> + * workspace_manager framework borrowed from fs/btrfs/compression.c.
> + * Two simplifications: there is only one type of workspace and there
> + * is always at least one workspace.
> + */
> +static struct crush_work *alloc_workspace(const struct crush_map *c)
> +{
> +	struct crush_work *work;
> +	size_t work_size;
> +
> +	WARN_ON(!c->working_size);
> +	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
> +	dout("%s work_size %zu bytes\n", __func__, work_size);
> +
> +	work = ceph_kvmalloc(work_size, GFP_NOIO);
> +	if (!work)
> +		return NULL;
> +

In general, how big are these allocations? They're all uniform, so you
could make a dedicated slab cache for this. Granted, you'll only have
a max of one per CPU, but some boxes have a lot of CPUs these days.
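
For illustration, a dedicated cache along those lines might look like
the sketch below (hypothetical names, not part of the patch, and it
assumes every workspace has the same fixed size -- which, as the reply
below explains, is not the case here):

/* Hypothetical slab-cache variant of the workspace allocator.  Each
 * object would still need crush_init_workspace() run on it before
 * use, and work_size would have to be the same for every CRUSH map. */
static struct kmem_cache *crush_work_cache;

static int crush_work_cache_init(size_t work_size)
{
	crush_work_cache = kmem_cache_create("crush_work", work_size,
					     0, 0, NULL);
	return crush_work_cache ? 0 : -ENOMEM;
}

static struct crush_work *alloc_workspace_from_cache(void)
{
	return kmem_cache_alloc(crush_work_cache, GFP_NOIO);
}

static void free_workspace_to_cache(struct crush_work *work)
{
	kmem_cache_free(crush_work_cache, work);
}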

> +	INIT_LIST_HEAD(&work->item);
> +	crush_init_workspace(c, work);
> +	return work;
> +}
> +
> +static void free_workspace(struct crush_work *work)
> +{
> +	WARN_ON(!list_empty(&work->item));
> +	kvfree(work);
> +}
> +
> +static void init_workspace_manager(struct workspace_manager *wsm)
> +{
> +	INIT_LIST_HEAD(&wsm->idle_ws);
> +	spin_lock_init(&wsm->ws_lock);
> +	atomic_set(&wsm->total_ws, 0);
> +	wsm->free_ws = 0;
> +	init_waitqueue_head(&wsm->ws_wait);
> +}
> +
> +static void add_initial_workspace(struct workspace_manager *wsm,
> +				  struct crush_work *work)
> +{
> +	WARN_ON(!list_empty(&wsm->idle_ws));
> +
> +	list_add(&work->item, &wsm->idle_ws);
> +	atomic_set(&wsm->total_ws, 1);
> +	wsm->free_ws = 1;
> +}
> +
> +static void cleanup_workspace_manager(struct workspace_manager *wsm)
> +{
> +	struct crush_work *work;
> +
> +	while (!list_empty(&wsm->idle_ws)) {
> +		work = list_first_entry(&wsm->idle_ws, struct crush_work,
> +					item);
> +		list_del_init(&work->item);
> +		free_workspace(work);
> +	}
> +	atomic_set(&wsm->total_ws, 0);
> +	wsm->free_ws = 0;
> +}
> +
> +/*
> + * Finds an available workspace or allocates a new one.  If it's not
> + * possible to allocate a new one, waits until there is one.
> + */
> +static struct crush_work *get_workspace(struct workspace_manager *wsm,
> +					const struct crush_map *c)
> +{
> +	struct crush_work *work;
> +	int cpus = num_online_cpus();
> +
> +again:
> +	spin_lock(&wsm->ws_lock);
> +	if (!list_empty(&wsm->idle_ws)) {
> +		work = list_first_entry(&wsm->idle_ws, struct crush_work,
> +					item);
> +		list_del_init(&work->item);
> +		wsm->free_ws--;
> +		spin_unlock(&wsm->ws_lock);
> +		return work;
> +
> +	}
> +	if (atomic_read(&wsm->total_ws) > cpus) {
> +		DEFINE_WAIT(wait);
> +
> +		spin_unlock(&wsm->ws_lock);
> +		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
> +		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
> +			schedule();
> +		finish_wait(&wsm->ws_wait, &wait);
> +		goto again;
> +	}
> +	atomic_inc(&wsm->total_ws);
> +	spin_unlock(&wsm->ws_lock);
> +
> +	work = alloc_workspace(c);
> +	if (!work) {
> +		atomic_dec(&wsm->total_ws);
> +		wake_up(&wsm->ws_wait);
> +
> +		/*
> +		 * Do not return the error but go back to waiting.  We
> +		 * have the initial workspace and the CRUSH computation
> +		 * time is bounded so we will get it eventually.
> +		 */
> +		WARN_ON(atomic_read(&wsm->total_ws) < 1);
> +		goto again;
> +	}
> +	return work;
> +}
> +
> +/*
> + * Puts a workspace back on the list or frees it if we have enough
> + * idle ones sitting around.
> + */
> +static void put_workspace(struct workspace_manager *wsm,
> +			  struct crush_work *work)
> +{
> +	spin_lock(&wsm->ws_lock);
> +	if (wsm->free_ws <= num_online_cpus()) {
> +		list_add(&work->item, &wsm->idle_ws);
> +		wsm->free_ws++;
> +		spin_unlock(&wsm->ws_lock);
> +		goto wake;
> +	}
> +	spin_unlock(&wsm->ws_lock);
> +
> +	free_workspace(work);
> +	atomic_dec(&wsm->total_ws);
> +wake:
> +	if (wq_has_sleeper(&wsm->ws_wait))
> +		wake_up(&wsm->ws_wait);

Is this racy? Could you end up missing a wakeup because something began
waiting between the check and wake_up? This is not being checked under
any sort of lock, so you could end up preempted between the two.

It might be better to just unconditionally call wake_up. In principle,
that should be a no-op if there is nothing waiting.
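
Something like this, i.e. just dropping the wq_has_sleeper() check
(untested sketch of the suggested variant, same locking otherwise):

static void put_workspace(struct workspace_manager *wsm,
			  struct crush_work *work)
{
	spin_lock(&wsm->ws_lock);
	if (wsm->free_ws <= num_online_cpus()) {
		list_add(&work->item, &wsm->idle_ws);
		wsm->free_ws++;
		spin_unlock(&wsm->ws_lock);
	} else {
		spin_unlock(&wsm->ws_lock);
		free_workspace(work);
		atomic_dec(&wsm->total_ws);
	}

	/* Unconditional -- a no-op if the waitqueue is empty. */
	wake_up(&wsm->ws_wait);
}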

> +}
> +
>  /*
>   * osd map
>   */
> @@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
>  	map->primary_temp = RB_ROOT;
>  	map->pg_upmap = RB_ROOT;
>  	map->pg_upmap_items = RB_ROOT;
> -	mutex_init(&map->crush_workspace_mutex);
> +
> +	init_workspace_manager(&map->crush_wsm);
>  
>  	return map;
>  }
> @@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
>  void ceph_osdmap_destroy(struct ceph_osdmap *map)
>  {
>  	dout("osdmap_destroy %p\n", map);
> +
>  	if (map->crush)
>  		crush_destroy(map->crush);
> +	cleanup_workspace_manager(&map->crush_wsm);
> +
>  	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
>  		struct ceph_pg_mapping *pg =
>  			rb_entry(rb_first(&map->pg_temp),
> @@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
>  	kvfree(map->osd_weight);
>  	kvfree(map->osd_addr);
>  	kvfree(map->osd_primary_affinity);
> -	kvfree(map->crush_workspace);
>  	kfree(map);
>  }
>  
> @@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
>  
>  static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
>  {
> -	void *workspace;
> -	size_t work_size;
> +	struct crush_work *work;
>  
>  	if (IS_ERR(crush))
>  		return PTR_ERR(crush);
>  
> -	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
> -	dout("%s work_size %zu bytes\n", __func__, work_size);
> -	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
> -	if (!workspace) {
> +	work = alloc_workspace(crush);
> +	if (!work) {
>  		crush_destroy(crush);
>  		return -ENOMEM;
>  	}
> -	crush_init_workspace(crush, workspace);
>  
>  	if (map->crush)
>  		crush_destroy(map->crush);
> -	kvfree(map->crush_workspace);
> +	cleanup_workspace_manager(&map->crush_wsm);
>  	map->crush = crush;
> -	map->crush_workspace = workspace;
> +	add_initial_workspace(&map->crush_wsm, work);
>  	return 0;
>  }
>  
> @@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
>  		    s64 choose_args_index)
>  {
>  	struct crush_choose_arg_map *arg_map;
> +	struct crush_work *work;
>  	int r;
>  
>  	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
> @@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
>  		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
>  						CEPH_DEFAULT_CHOOSE_ARGS);
>  
> -	mutex_lock(&map->crush_workspace_mutex);
> +	work = get_workspace(&map->crush_wsm, map->crush);
>  	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
> -			  weight, weight_max, map->crush_workspace,
> +			  weight, weight_max, work,
>  			  arg_map ? arg_map->args : NULL);
> -	mutex_unlock(&map->crush_workspace_mutex);
> -
> +	put_workspace(&map->crush_wsm, work);
>  	return r;
>  }
>
Ilya Dryomov Aug. 19, 2020, 1:07 p.m. UTC | #2
On Wed, Aug 19, 2020 at 2:25 PM Jeff Layton <jlayton@kernel.org> wrote:
>
> On Wed, 2020-08-19 at 11:36 +0200, Ilya Dryomov wrote:
> > Replace a global map->crush_workspace (protected by a global mutex)
> > with a list of workspaces, up to the number of CPUs + 1.
> >
> > This is based on a patch from Robin Geuze <robing@nl.team.blue>.
> > Robin and his team have observed a 10-20% increase in IOPS at all
> > queue depths, as well as lower CPU usage, on a high-end all-NVMe
> > 100GbE cluster.
> >
> > Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> > ---
> >  include/linux/ceph/osdmap.h |  14 ++-
> >  include/linux/crush/crush.h |   3 +
> >  net/ceph/osdmap.c           | 166 ++++++++++++++++++++++++++++++++----
> >  3 files changed, 166 insertions(+), 17 deletions(-)
> >
> > diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
> > index 3f4498fef6ad..cad9acfbc320 100644
> > --- a/include/linux/ceph/osdmap.h
> > +++ b/include/linux/ceph/osdmap.h
> > @@ -137,6 +137,17 @@ int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
> >                    const char *fmt, ...);
> >  void ceph_oid_destroy(struct ceph_object_id *oid);
> >
> > +struct workspace_manager {
> > +     struct list_head idle_ws;
> > +     spinlock_t ws_lock;
> > +     /* Number of free workspaces */
> > +     int free_ws;
> > +     /* Total number of allocated workspaces */
> > +     atomic_t total_ws;
> > +     /* Waiters for a free workspace */
> > +     wait_queue_head_t ws_wait;
> > +};
> > +
> >  struct ceph_pg_mapping {
> >       struct rb_node node;
> >       struct ceph_pg pgid;
> > @@ -184,8 +195,7 @@ struct ceph_osdmap {
> >        * the list of osds that store+replicate them. */
> >       struct crush_map *crush;
> >
> > -     struct mutex crush_workspace_mutex;
> > -     void *crush_workspace;
> > +     struct workspace_manager crush_wsm;
> >  };
> >
> >  static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
> > diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
> > index 2f811baf78d2..30dba392b730 100644
> > --- a/include/linux/crush/crush.h
> > +++ b/include/linux/crush/crush.h
> > @@ -346,6 +346,9 @@ struct crush_work_bucket {
> >
> >  struct crush_work {
> >       struct crush_work_bucket **work; /* Per-bucket working store */
> > +#ifdef __KERNEL__
> > +     struct list_head item;
> > +#endif
> >  };
> >
> >  #ifdef __KERNEL__
> > diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
> > index 96c25f5e064a..fa08c15be0c0 100644
> > --- a/net/ceph/osdmap.c
> > +++ b/net/ceph/osdmap.c
> > @@ -964,6 +964,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
> >       return -EINVAL;
> >  }
> >
> > +/*
> > + * CRUSH workspaces
> > + *
> > + * workspace_manager framework borrowed from fs/btrfs/compression.c.
> > + * Two simplifications: there is only one type of workspace and there
> > + * is always at least one workspace.
> > + */
> > +static struct crush_work *alloc_workspace(const struct crush_map *c)
> > +{
> > +     struct crush_work *work;
> > +     size_t work_size;
> > +
> > +     WARN_ON(!c->working_size);
> > +     work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
> > +     dout("%s work_size %zu bytes\n", __func__, work_size);
> > +
> > +     work = ceph_kvmalloc(work_size, GFP_NOIO);
> > +     if (!work)
> > +             return NULL;
> > +
>
> In general, how big are these allocations? They're all uniform, so you
> could make a dedicated slab cache for this. Granted, you'll only have
> a max of one per CPU, but some boxes have a lot of CPUs these days.

Hi Jeff,

They are going to vary in size from CRUSH map to CRUSH map and need
a vmalloc fallback, so a dedicated cache wouldn't work.
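
(For context, ceph_kvmalloc() is essentially kvmalloc() constrained to
GFP_NOIO: try a physically contiguous kmalloc() first and fall back to
vmalloc() if that fails.  A simplified sketch of what a call like
ceph_kvmalloc(size, GFP_NOIO) boils down to -- not the exact libceph
implementation:

	unsigned int noio_flag = memalloc_noio_save();
	void *p;

	/* kvmalloc(): kmalloc() first, vmalloc() as fallback. */
	p = kvmalloc(size, GFP_KERNEL);
	memalloc_noio_restore(noio_flag);

A slab cache, by contrast, serves fixed-size, physically contiguous
objects only.)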

>
> > +     INIT_LIST_HEAD(&work->item);
> > +     crush_init_workspace(c, work);
> > +     return work;
> > +}
> > +
> > +static void free_workspace(struct crush_work *work)
> > +{
> > +     WARN_ON(!list_empty(&work->item));
> > +     kvfree(work);
> > +}
> > +
> > +static void init_workspace_manager(struct workspace_manager *wsm)
> > +{
> > +     INIT_LIST_HEAD(&wsm->idle_ws);
> > +     spin_lock_init(&wsm->ws_lock);
> > +     atomic_set(&wsm->total_ws, 0);
> > +     wsm->free_ws = 0;
> > +     init_waitqueue_head(&wsm->ws_wait);
> > +}
> > +
> > +static void add_initial_workspace(struct workspace_manager *wsm,
> > +                               struct crush_work *work)
> > +{
> > +     WARN_ON(!list_empty(&wsm->idle_ws));
> > +
> > +     list_add(&work->item, &wsm->idle_ws);
> > +     atomic_set(&wsm->total_ws, 1);
> > +     wsm->free_ws = 1;
> > +}
> > +
> > +static void cleanup_workspace_manager(struct workspace_manager *wsm)
> > +{
> > +     struct crush_work *work;
> > +
> > +     while (!list_empty(&wsm->idle_ws)) {
> > +             work = list_first_entry(&wsm->idle_ws, struct crush_work,
> > +                                     item);
> > +             list_del_init(&work->item);
> > +             free_workspace(work);
> > +     }
> > +     atomic_set(&wsm->total_ws, 0);
> > +     wsm->free_ws = 0;
> > +}
> > +
> > +/*
> > + * Finds an available workspace or allocates a new one.  If it's not
> > + * possible to allocate a new one, waits until there is one.
> > + */
> > +static struct crush_work *get_workspace(struct workspace_manager *wsm,
> > +                                     const struct crush_map *c)
> > +{
> > +     struct crush_work *work;
> > +     int cpus = num_online_cpus();
> > +
> > +again:
> > +     spin_lock(&wsm->ws_lock);
> > +     if (!list_empty(&wsm->idle_ws)) {
> > +             work = list_first_entry(&wsm->idle_ws, struct crush_work,
> > +                                     item);
> > +             list_del_init(&work->item);
> > +             wsm->free_ws--;
> > +             spin_unlock(&wsm->ws_lock);
> > +             return work;
> > +
> > +     }
> > +     if (atomic_read(&wsm->total_ws) > cpus) {
> > +             DEFINE_WAIT(wait);
> > +
> > +             spin_unlock(&wsm->ws_lock);
> > +             prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
> > +             if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
> > +                     schedule();
> > +             finish_wait(&wsm->ws_wait, &wait);
> > +             goto again;
> > +     }
> > +     atomic_inc(&wsm->total_ws);
> > +     spin_unlock(&wsm->ws_lock);
> > +
> > +     work = alloc_workspace(c);
> > +     if (!work) {
> > +             atomic_dec(&wsm->total_ws);
> > +             wake_up(&wsm->ws_wait);
> > +
> > +             /*
> > +              * Do not return the error but go back to waiting.  We
> > +              * have the initial workspace and the CRUSH computation
> > +              * time is bounded so we will get it eventually.
> > +              */
> > +             WARN_ON(atomic_read(&wsm->total_ws) < 1);
> > +             goto again;
> > +     }
> > +     return work;
> > +}
> > +
> > +/*
> > + * Puts a workspace back on the list or frees it if we have enough
> > + * idle ones sitting around.
> > + */
> > +static void put_workspace(struct workspace_manager *wsm,
> > +                       struct crush_work *work)
> > +{
> > +     spin_lock(&wsm->ws_lock);
> > +     if (wsm->free_ws <= num_online_cpus()) {
> > +             list_add(&work->item, &wsm->idle_ws);
> > +             wsm->free_ws++;
> > +             spin_unlock(&wsm->ws_lock);
> > +             goto wake;
> > +     }
> > +     spin_unlock(&wsm->ws_lock);
> > +
> > +     free_workspace(work);
> > +     atomic_dec(&wsm->total_ws);
> > +wake:
> > +     if (wq_has_sleeper(&wsm->ws_wait))
> > +             wake_up(&wsm->ws_wait);
>
> Is this racy? Could you end up missing a wakeup because something began
> waiting between the check and wake_up? This is not being checked under
> any sort of lock, so you could end up preempted between the two.
>
> It might be better to just unconditionally call wake_up. In principle,
> that should be a no-op if there is nothing waiting.

Yeah, it raised my eyebrows as well (and the !wsm->free_ws test
without a lock in get_workspace() too).  This is a straight-up
cut-and-paste from Btrfs though, so I decided to leave it as is.
This is pretty old code and it has survived there for over ten
years, although I'm not sure how many people actually enable
compression in the field.

If we hit issues, I'd just reimplement the whole thing from scratch
rather than attempting to tweak a cut-and-paste that is not entirely
straightforward.
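
FWIW, the thing that arguably makes the pattern tolerable is that
wq_has_sleeper() is not a bare waitqueue_active() -- it issues a full
barrier first, intended to pair with the barrier in prepare_to_wait()
on the waiting side.  From include/linux/wait.h (comment mine):

static inline bool wq_has_sleeper(struct wait_queue_head *wq_head)
{
	/*
	 * Order the waker's prior stores (here, the list_add() and
	 * free_ws++ done under ws_lock) against the waiter-list
	 * check below.
	 */
	smp_mb();
	return waitqueue_active(wq_head);
}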

Thanks,

                Ilya
Jeff Layton Aug. 19, 2020, 1:48 p.m. UTC | #3
On Wed, 2020-08-19 at 15:07 +0200, Ilya Dryomov wrote:
> On Wed, Aug 19, 2020 at 2:25 PM Jeff Layton <jlayton@kernel.org> wrote:
> > On Wed, 2020-08-19 at 11:36 +0200, Ilya Dryomov wrote:
> > > Replace a global map->crush_workspace (protected by a global mutex)
> > > with a list of workspaces, up to the number of CPUs + 1.
> > > 
> > > This is based on a patch from Robin Geuze <robing@nl.team.blue>.
> > > Robin and his team have observed a 10-20% increase in IOPS at all
> > > queue depths, as well as lower CPU usage, on a high-end all-NVMe
> > > 100GbE cluster.
> > > 
> > > Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> > > ---
> > >  include/linux/ceph/osdmap.h |  14 ++-
> > >  include/linux/crush/crush.h |   3 +
> > >  net/ceph/osdmap.c           | 166 ++++++++++++++++++++++++++++++++----
> > >  3 files changed, 166 insertions(+), 17 deletions(-)
> > > 
> > > diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
> > > index 3f4498fef6ad..cad9acfbc320 100644
> > > --- a/include/linux/ceph/osdmap.h
> > > +++ b/include/linux/ceph/osdmap.h
> > > @@ -137,6 +137,17 @@ int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
> > >                    const char *fmt, ...);
> > >  void ceph_oid_destroy(struct ceph_object_id *oid);
> > > 
> > > +struct workspace_manager {
> > > +     struct list_head idle_ws;
> > > +     spinlock_t ws_lock;
> > > +     /* Number of free workspaces */
> > > +     int free_ws;
> > > +     /* Total number of allocated workspaces */
> > > +     atomic_t total_ws;
> > > +     /* Waiters for a free workspace */
> > > +     wait_queue_head_t ws_wait;
> > > +};
> > > +
> > >  struct ceph_pg_mapping {
> > >       struct rb_node node;
> > >       struct ceph_pg pgid;
> > > @@ -184,8 +195,7 @@ struct ceph_osdmap {
> > >        * the list of osds that store+replicate them. */
> > >       struct crush_map *crush;
> > > 
> > > -     struct mutex crush_workspace_mutex;
> > > -     void *crush_workspace;
> > > +     struct workspace_manager crush_wsm;
> > >  };
> > > 
> > >  static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
> > > diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
> > > index 2f811baf78d2..30dba392b730 100644
> > > --- a/include/linux/crush/crush.h
> > > +++ b/include/linux/crush/crush.h
> > > @@ -346,6 +346,9 @@ struct crush_work_bucket {
> > > 
> > >  struct crush_work {
> > >       struct crush_work_bucket **work; /* Per-bucket working store */
> > > +#ifdef __KERNEL__
> > > +     struct list_head item;
> > > +#endif
> > >  };
> > > 
> > >  #ifdef __KERNEL__
> > > diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
> > > index 96c25f5e064a..fa08c15be0c0 100644
> > > --- a/net/ceph/osdmap.c
> > > +++ b/net/ceph/osdmap.c
> > > @@ -964,6 +964,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
> > >       return -EINVAL;
> > >  }
> > > 
> > > +/*
> > > + * CRUSH workspaces
> > > + *
> > > + * workspace_manager framework borrowed from fs/btrfs/compression.c.
> > > + * Two simplifications: there is only one type of workspace and there
> > > + * is always at least one workspace.
> > > + */
> > > +static struct crush_work *alloc_workspace(const struct crush_map *c)
> > > +{
> > > +     struct crush_work *work;
> > > +     size_t work_size;
> > > +
> > > +     WARN_ON(!c->working_size);
> > > +     work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
> > > +     dout("%s work_size %zu bytes\n", __func__, work_size);
> > > +
> > > +     work = ceph_kvmalloc(work_size, GFP_NOIO);
> > > +     if (!work)
> > > +             return NULL;
> > > +
> > 
> > In general, how big are these allocations? They're all uniform, so you
> > could make a dedicated slab cache for this. Granted, you'll only have
> > a max of one per CPU, but some boxes have a lot of CPUs these days.
> 
> Hi Jeff,
> 
> They are going to vary in size from CRUSH map to CRUSH map and need
> a vmalloc fallback, so a dedicated cache wouldn't work.
> 

Got it. Ok.

> > > +     INIT_LIST_HEAD(&work->item);
> > > +     crush_init_workspace(c, work);
> > > +     return work;
> > > +}
> > > +
> > > +static void free_workspace(struct crush_work *work)
> > > +{
> > > +     WARN_ON(!list_empty(&work->item));
> > > +     kvfree(work);
> > > +}
> > > +
> > > +static void init_workspace_manager(struct workspace_manager *wsm)
> > > +{
> > > +     INIT_LIST_HEAD(&wsm->idle_ws);
> > > +     spin_lock_init(&wsm->ws_lock);
> > > +     atomic_set(&wsm->total_ws, 0);
> > > +     wsm->free_ws = 0;
> > > +     init_waitqueue_head(&wsm->ws_wait);
> > > +}
> > > +
> > > +static void add_initial_workspace(struct workspace_manager *wsm,
> > > +                               struct crush_work *work)
> > > +{
> > > +     WARN_ON(!list_empty(&wsm->idle_ws));
> > > +
> > > +     list_add(&work->item, &wsm->idle_ws);
> > > +     atomic_set(&wsm->total_ws, 1);
> > > +     wsm->free_ws = 1;
> > > +}
> > > +
> > > +static void cleanup_workspace_manager(struct workspace_manager *wsm)
> > > +{
> > > +     struct crush_work *work;
> > > +
> > > +     while (!list_empty(&wsm->idle_ws)) {
> > > +             work = list_first_entry(&wsm->idle_ws, struct crush_work,
> > > +                                     item);
> > > +             list_del_init(&work->item);
> > > +             free_workspace(work);
> > > +     }
> > > +     atomic_set(&wsm->total_ws, 0);
> > > +     wsm->free_ws = 0;
> > > +}
> > > +
> > > +/*
> > > + * Finds an available workspace or allocates a new one.  If it's not
> > > + * possible to allocate a new one, waits until there is one.
> > > + */
> > > +static struct crush_work *get_workspace(struct workspace_manager *wsm,
> > > +                                     const struct crush_map *c)
> > > +{
> > > +     struct crush_work *work;
> > > +     int cpus = num_online_cpus();
> > > +
> > > +again:
> > > +     spin_lock(&wsm->ws_lock);
> > > +     if (!list_empty(&wsm->idle_ws)) {
> > > +             work = list_first_entry(&wsm->idle_ws, struct crush_work,
> > > +                                     item);
> > > +             list_del_init(&work->item);
> > > +             wsm->free_ws--;
> > > +             spin_unlock(&wsm->ws_lock);
> > > +             return work;
> > > +
> > > +     }
> > > +     if (atomic_read(&wsm->total_ws) > cpus) {
> > > +             DEFINE_WAIT(wait);
> > > +
> > > +             spin_unlock(&wsm->ws_lock);
> > > +             prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
> > > +             if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
> > > +                     schedule();
> > > +             finish_wait(&wsm->ws_wait, &wait);
> > > +             goto again;
> > > +     }
> > > +     atomic_inc(&wsm->total_ws);
> > > +     spin_unlock(&wsm->ws_lock);
> > > +
> > > +     work = alloc_workspace(c);
> > > +     if (!work) {
> > > +             atomic_dec(&wsm->total_ws);
> > > +             wake_up(&wsm->ws_wait);
> > > +
> > > +             /*
> > > +              * Do not return the error but go back to waiting.  We
> > > +              * have the initial workspace and the CRUSH computation
> > > +              * time is bounded so we will get it eventually.
> > > +              */
> > > +             WARN_ON(atomic_read(&wsm->total_ws) < 1);
> > > +             goto again;
> > > +     }
> > > +     return work;
> > > +}
> > > +
> > > +/*
> > > + * Puts a workspace back on the list or frees it if we have enough
> > > + * idle ones sitting around.
> > > + */
> > > +static void put_workspace(struct workspace_manager *wsm,
> > > +                       struct crush_work *work)
> > > +{
> > > +     spin_lock(&wsm->ws_lock);
> > > +     if (wsm->free_ws <= num_online_cpus()) {
> > > +             list_add(&work->item, &wsm->idle_ws);
> > > +             wsm->free_ws++;
> > > +             spin_unlock(&wsm->ws_lock);
> > > +             goto wake;
> > > +     }
> > > +     spin_unlock(&wsm->ws_lock);
> > > +
> > > +     free_workspace(work);
> > > +     atomic_dec(&wsm->total_ws);
> > > +wake:
> > > +     if (wq_has_sleeper(&wsm->ws_wait))
> > > +             wake_up(&wsm->ws_wait);
> > 
> > Is this racy? Could you end up missing a wakeup because something began
> > waiting between the check and wake_up? This is not being checked under
> > any sort of lock, so you could end up preempted between the two.
> > 
> > It might be better to just unconditionally call wake_up. In principle,
> > that should be a no-op if there is nothing waiting.
> 
> Yeah, it raised my eyebrows as well (and the !wsm->free_ws test
> without a lock in get_workspace() too).  This is a straight-up
> cut-and-paste from Btrfs though, so I decided to leave it as is.
> This is pretty old code and it has survived there for over ten
> years, although I'm not sure how many people actually enable
> compression in the field.
> 
> If we hit issues, I'd just reimplement the whole thing from scratch
> rather than attempting to tweak a cut-and-paste that is not entirely
> straightforward.

Yeah, the free_ws handling looks quite suspicious. At least total_ws is
atomic...

In practice, I often find that problems like this only become evident
when you start running on more exotic arches with different caching
behavior. Hopefully it won't be an issue, but if it ever is it'll
probably be quite hard to reproduce without some sort of fault
injection.
Ilya Dryomov Aug. 19, 2020, 2:14 p.m. UTC | #4
On Wed, Aug 19, 2020 at 3:48 PM Jeff Layton <jlayton@kernel.org> wrote:
>
> On Wed, 2020-08-19 at 15:07 +0200, Ilya Dryomov wrote:
> > On Wed, Aug 19, 2020 at 2:25 PM Jeff Layton <jlayton@kernel.org> wrote:
> > > On Wed, 2020-08-19 at 11:36 +0200, Ilya Dryomov wrote:
> > > > Replace a global map->crush_workspace (protected by a global mutex)
> > > > with a list of workspaces, up to the number of CPUs + 1.
> > > >
> > > > This is based on a patch from Robin Geuze <robing@nl.team.blue>.
> > > > Robin and his team have observed a 10-20% increase in IOPS at all
> > > > queue depths, as well as lower CPU usage, on a high-end all-NVMe
> > > > 100GbE cluster.
> > > >
> > > > Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> > > > ---
> > > >  include/linux/ceph/osdmap.h |  14 ++-
> > > >  include/linux/crush/crush.h |   3 +
> > > >  net/ceph/osdmap.c           | 166 ++++++++++++++++++++++++++++++++----
> > > >  3 files changed, 166 insertions(+), 17 deletions(-)
> > > >
> > > > diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
> > > > index 3f4498fef6ad..cad9acfbc320 100644
> > > > --- a/include/linux/ceph/osdmap.h
> > > > +++ b/include/linux/ceph/osdmap.h
> > > > @@ -137,6 +137,17 @@ int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
> > > >                    const char *fmt, ...);
> > > >  void ceph_oid_destroy(struct ceph_object_id *oid);
> > > >
> > > > +struct workspace_manager {
> > > > +     struct list_head idle_ws;
> > > > +     spinlock_t ws_lock;
> > > > +     /* Number of free workspaces */
> > > > +     int free_ws;
> > > > +     /* Total number of allocated workspaces */
> > > > +     atomic_t total_ws;
> > > > +     /* Waiters for a free workspace */
> > > > +     wait_queue_head_t ws_wait;
> > > > +};
> > > > +
> > > >  struct ceph_pg_mapping {
> > > >       struct rb_node node;
> > > >       struct ceph_pg pgid;
> > > > @@ -184,8 +195,7 @@ struct ceph_osdmap {
> > > >        * the list of osds that store+replicate them. */
> > > >       struct crush_map *crush;
> > > >
> > > > -     struct mutex crush_workspace_mutex;
> > > > -     void *crush_workspace;
> > > > +     struct workspace_manager crush_wsm;
> > > >  };
> > > >
> > > >  static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
> > > > diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
> > > > index 2f811baf78d2..30dba392b730 100644
> > > > --- a/include/linux/crush/crush.h
> > > > +++ b/include/linux/crush/crush.h
> > > > @@ -346,6 +346,9 @@ struct crush_work_bucket {
> > > >
> > > >  struct crush_work {
> > > >       struct crush_work_bucket **work; /* Per-bucket working store */
> > > > +#ifdef __KERNEL__
> > > > +     struct list_head item;
> > > > +#endif
> > > >  };
> > > >
> > > >  #ifdef __KERNEL__
> > > > diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
> > > > index 96c25f5e064a..fa08c15be0c0 100644
> > > > --- a/net/ceph/osdmap.c
> > > > +++ b/net/ceph/osdmap.c
> > > > @@ -964,6 +964,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
> > > >       return -EINVAL;
> > > >  }
> > > >
> > > > +/*
> > > > + * CRUSH workspaces
> > > > + *
> > > > + * workspace_manager framework borrowed from fs/btrfs/compression.c.
> > > > + * Two simplifications: there is only one type of workspace and there
> > > > + * is always at least one workspace.
> > > > + */
> > > > +static struct crush_work *alloc_workspace(const struct crush_map *c)
> > > > +{
> > > > +     struct crush_work *work;
> > > > +     size_t work_size;
> > > > +
> > > > +     WARN_ON(!c->working_size);
> > > > +     work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
> > > > +     dout("%s work_size %zu bytes\n", __func__, work_size);
> > > > +
> > > > +     work = ceph_kvmalloc(work_size, GFP_NOIO);
> > > > +     if (!work)
> > > > +             return NULL;
> > > > +
> > >
> > > In general, how big are these allocations? They're all uniform, so you
> > > could make a dedicated slab cache for this. Granted, you'll only have
> > > a max of one per CPU, but some boxes have a lot of CPUs these days.
> >
> > Hi Jeff,
> >
> > They are going to vary in size from CRUSH map to CRUSH map and need
> > a vmalloc fallback, so a dedicated cache wouldn't work.
> >
>
> Got it. Ok.
>
> > > > +     INIT_LIST_HEAD(&work->item);
> > > > +     crush_init_workspace(c, work);
> > > > +     return work;
> > > > +}
> > > > +
> > > > +static void free_workspace(struct crush_work *work)
> > > > +{
> > > > +     WARN_ON(!list_empty(&work->item));
> > > > +     kvfree(work);
> > > > +}
> > > > +
> > > > +static void init_workspace_manager(struct workspace_manager *wsm)
> > > > +{
> > > > +     INIT_LIST_HEAD(&wsm->idle_ws);
> > > > +     spin_lock_init(&wsm->ws_lock);
> > > > +     atomic_set(&wsm->total_ws, 0);
> > > > +     wsm->free_ws = 0;
> > > > +     init_waitqueue_head(&wsm->ws_wait);
> > > > +}
> > > > +
> > > > +static void add_initial_workspace(struct workspace_manager *wsm,
> > > > +                               struct crush_work *work)
> > > > +{
> > > > +     WARN_ON(!list_empty(&wsm->idle_ws));
> > > > +
> > > > +     list_add(&work->item, &wsm->idle_ws);
> > > > +     atomic_set(&wsm->total_ws, 1);
> > > > +     wsm->free_ws = 1;
> > > > +}
> > > > +
> > > > +static void cleanup_workspace_manager(struct workspace_manager *wsm)
> > > > +{
> > > > +     struct crush_work *work;
> > > > +
> > > > +     while (!list_empty(&wsm->idle_ws)) {
> > > > +             work = list_first_entry(&wsm->idle_ws, struct crush_work,
> > > > +                                     item);
> > > > +             list_del_init(&work->item);
> > > > +             free_workspace(work);
> > > > +     }
> > > > +     atomic_set(&wsm->total_ws, 0);
> > > > +     wsm->free_ws = 0;
> > > > +}
> > > > +
> > > > +/*
> > > > + * Finds an available workspace or allocates a new one.  If it's not
> > > > + * possible to allocate a new one, waits until there is one.
> > > > + */
> > > > +static struct crush_work *get_workspace(struct workspace_manager *wsm,
> > > > +                                     const struct crush_map *c)
> > > > +{
> > > > +     struct crush_work *work;
> > > > +     int cpus = num_online_cpus();
> > > > +
> > > > +again:
> > > > +     spin_lock(&wsm->ws_lock);
> > > > +     if (!list_empty(&wsm->idle_ws)) {
> > > > +             work = list_first_entry(&wsm->idle_ws, struct crush_work,
> > > > +                                     item);
> > > > +             list_del_init(&work->item);
> > > > +             wsm->free_ws--;
> > > > +             spin_unlock(&wsm->ws_lock);
> > > > +             return work;
> > > > +
> > > > +     }
> > > > +     if (atomic_read(&wsm->total_ws) > cpus) {
> > > > +             DEFINE_WAIT(wait);
> > > > +
> > > > +             spin_unlock(&wsm->ws_lock);
> > > > +             prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
> > > > +             if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
> > > > +                     schedule();
> > > > +             finish_wait(&wsm->ws_wait, &wait);
> > > > +             goto again;
> > > > +     }
> > > > +     atomic_inc(&wsm->total_ws);
> > > > +     spin_unlock(&wsm->ws_lock);
> > > > +
> > > > +     work = alloc_workspace(c);
> > > > +     if (!work) {
> > > > +             atomic_dec(&wsm->total_ws);
> > > > +             wake_up(&wsm->ws_wait);
> > > > +
> > > > +             /*
> > > > +              * Do not return the error but go back to waiting.  We
> > > > +              * have the initial workspace and the CRUSH computation
> > > > +              * time is bounded so we will get it eventually.
> > > > +              */
> > > > +             WARN_ON(atomic_read(&wsm->total_ws) < 1);
> > > > +             goto again;
> > > > +     }
> > > > +     return work;
> > > > +}
> > > > +
> > > > +/*
> > > > + * Puts a workspace back on the list or frees it if we have enough
> > > > + * idle ones sitting around.
> > > > + */
> > > > +static void put_workspace(struct workspace_manager *wsm,
> > > > +                       struct crush_work *work)
> > > > +{
> > > > +     spin_lock(&wsm->ws_lock);
> > > > +     if (wsm->free_ws <= num_online_cpus()) {
> > > > +             list_add(&work->item, &wsm->idle_ws);
> > > > +             wsm->free_ws++;
> > > > +             spin_unlock(&wsm->ws_lock);
> > > > +             goto wake;
> > > > +     }
> > > > +     spin_unlock(&wsm->ws_lock);
> > > > +
> > > > +     free_workspace(work);
> > > > +     atomic_dec(&wsm->total_ws);
> > > > +wake:
> > > > +     if (wq_has_sleeper(&wsm->ws_wait))
> > > > +             wake_up(&wsm->ws_wait);
> > >
> > > Is this racy? Could you end up missing a wakeup because something began
> > > waiting between the check and wake_up? This is not being checked under
> > > any sort of lock, so you could end up preempted between the two.
> > >
> > > It might be better to just unconditionally call wake_up. In principle,
> > > that should be a no-op if there is nothing waiting.
> >
> > Yeah, it raised my eyebrows as well (and the !wsm->free_ws test
> > without a lock in get_workspace() too).  This is a straight-up
> > cut-and-paste from Btrfs though, so I decided to leave it as is.
> > This is pretty old code and it has survived there for over ten
> > years, although I'm not sure how many people actually enable
> > compression in the field.
> >
> > If we hit issues, I'd just reimplement the whole thing from scratch
> > rather than attempting to tweak a cut-and-paste that is not entirely
> > straightforward.
>
> Yeah, the free_ws handling looks quite suspicious. At least total_ws is
> atomic...

Yeah, and if not for this suspicious test, free_ws wouldn't be
needed at all.

>
> In practice, I often find that problems like this only become evident
> when you start running on more exotic arches with different caching
> behavior. Hopefully it won't be an issue, but if it ever is it'll
> probably be quite hard to reproduce without some sort of fault
> injection.

I figured that if we ever encounter a hang in get_workspace(),
it would be obvious and wouldn't need reproducing.  Perhaps we
shouldn't wait for that to happen and redo this now, dunno...

Thanks,

                Ilya

Patch

diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 3f4498fef6ad..cad9acfbc320 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -137,6 +137,17 @@  int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
 		     const char *fmt, ...);
 void ceph_oid_destroy(struct ceph_object_id *oid);
 
+struct workspace_manager {
+	struct list_head idle_ws;
+	spinlock_t ws_lock;
+	/* Number of free workspaces */
+	int free_ws;
+	/* Total number of allocated workspaces */
+	atomic_t total_ws;
+	/* Waiters for a free workspace */
+	wait_queue_head_t ws_wait;
+};
+
 struct ceph_pg_mapping {
 	struct rb_node node;
 	struct ceph_pg pgid;
@@ -184,8 +195,7 @@  struct ceph_osdmap {
 	 * the list of osds that store+replicate them. */
 	struct crush_map *crush;
 
-	struct mutex crush_workspace_mutex;
-	void *crush_workspace;
+	struct workspace_manager crush_wsm;
 };
 
 static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index 2f811baf78d2..30dba392b730 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -346,6 +346,9 @@  struct crush_work_bucket {
 
 struct crush_work {
 	struct crush_work_bucket **work; /* Per-bucket working store */
+#ifdef __KERNEL__
+	struct list_head item;
+#endif
 };
 
 #ifdef __KERNEL__
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 96c25f5e064a..fa08c15be0c0 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -964,6 +964,143 @@  static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 	return -EINVAL;
 }
 
+/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+	struct crush_work *work;
+	size_t work_size;
+
+	WARN_ON(!c->working_size);
+	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+
+	work = ceph_kvmalloc(work_size, GFP_NOIO);
+	if (!work)
+		return NULL;
+
+	INIT_LIST_HEAD(&work->item);
+	crush_init_workspace(c, work);
+	return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+	WARN_ON(!list_empty(&work->item));
+	kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+	INIT_LIST_HEAD(&wsm->idle_ws);
+	spin_lock_init(&wsm->ws_lock);
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+	init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+				  struct crush_work *work)
+{
+	WARN_ON(!list_empty(&wsm->idle_ws));
+
+	list_add(&work->item, &wsm->idle_ws);
+	atomic_set(&wsm->total_ws, 1);
+	wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+	struct crush_work *work;
+
+	while (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		free_workspace(work);
+	}
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+					const struct crush_map *c)
+{
+	struct crush_work *work;
+	int cpus = num_online_cpus();
+
+again:
+	spin_lock(&wsm->ws_lock);
+	if (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		wsm->free_ws--;
+		spin_unlock(&wsm->ws_lock);
+		return work;
+
+	}
+	if (atomic_read(&wsm->total_ws) > cpus) {
+		DEFINE_WAIT(wait);
+
+		spin_unlock(&wsm->ws_lock);
+		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+			schedule();
+		finish_wait(&wsm->ws_wait, &wait);
+		goto again;
+	}
+	atomic_inc(&wsm->total_ws);
+	spin_unlock(&wsm->ws_lock);
+
+	work = alloc_workspace(c);
+	if (!work) {
+		atomic_dec(&wsm->total_ws);
+		wake_up(&wsm->ws_wait);
+
+		/*
+		 * Do not return the error but go back to waiting.  We
+		 * have the initial workspace and the CRUSH computation
+		 * time is bounded so we will get it eventually.
+		 */
+		WARN_ON(atomic_read(&wsm->total_ws) < 1);
+		goto again;
+	}
+	return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+			  struct crush_work *work)
+{
+	spin_lock(&wsm->ws_lock);
+	if (wsm->free_ws <= num_online_cpus()) {
+		list_add(&work->item, &wsm->idle_ws);
+		wsm->free_ws++;
+		spin_unlock(&wsm->ws_lock);
+		goto wake;
+	}
+	spin_unlock(&wsm->ws_lock);
+
+	free_workspace(work);
+	atomic_dec(&wsm->total_ws);
+wake:
+	if (wq_has_sleeper(&wsm->ws_wait))
+		wake_up(&wsm->ws_wait);
+}
+
 /*
  * osd map
  */
@@ -981,7 +1118,8 @@  struct ceph_osdmap *ceph_osdmap_alloc(void)
 	map->primary_temp = RB_ROOT;
 	map->pg_upmap = RB_ROOT;
 	map->pg_upmap_items = RB_ROOT;
-	mutex_init(&map->crush_workspace_mutex);
+
+	init_workspace_manager(&map->crush_wsm);
 
 	return map;
 }
@@ -989,8 +1127,11 @@  struct ceph_osdmap *ceph_osdmap_alloc(void)
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
 	dout("osdmap_destroy %p\n", map);
+
 	if (map->crush)
 		crush_destroy(map->crush);
+	cleanup_workspace_manager(&map->crush_wsm);
+
 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 		struct ceph_pg_mapping *pg =
 			rb_entry(rb_first(&map->pg_temp),
@@ -1029,7 +1170,6 @@  void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kvfree(map->osd_weight);
 	kvfree(map->osd_addr);
 	kvfree(map->osd_primary_affinity);
-	kvfree(map->crush_workspace);
 	kfree(map);
 }
 
@@ -1104,26 +1244,22 @@  static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-	void *workspace;
-	size_t work_size;
+	struct crush_work *work;
 
 	if (IS_ERR(crush))
 		return PTR_ERR(crush);
 
-	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-	dout("%s work_size %zu bytes\n", __func__, work_size);
-	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
-	if (!workspace) {
+	work = alloc_workspace(crush);
+	if (!work) {
 		crush_destroy(crush);
 		return -ENOMEM;
 	}
-	crush_init_workspace(crush, workspace);
 
 	if (map->crush)
 		crush_destroy(map->crush);
-	kvfree(map->crush_workspace);
+	cleanup_workspace_manager(&map->crush_wsm);
 	map->crush = crush;
-	map->crush_workspace = workspace;
+	add_initial_workspace(&map->crush_wsm, work);
 	return 0;
 }
 
@@ -2322,6 +2458,7 @@  static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		    s64 choose_args_index)
 {
 	struct crush_choose_arg_map *arg_map;
+	struct crush_work *work;
 	int r;
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2332,12 +2469,11 @@  static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
 						CEPH_DEFAULT_CHOOSE_ARGS);
 
-	mutex_lock(&map->crush_workspace_mutex);
+	work = get_workspace(&map->crush_wsm, map->crush);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_workspace,
+			  weight, weight_max, work,
 			  arg_map ? arg_map->args : NULL);
-	mutex_unlock(&map->crush_workspace_mutex);
-
+	put_workspace(&map->crush_wsm, work);
 	return r;
 }