[RFC PATCH 2/4] workqueue: introduce support for attaching to cgroups

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Introduce cgroup-aware unbound worker pools. Whenever a new worker thread is
created, create_worker() marks it to attach to the cgroups of the task that
called alloc_workqueue(); the worker performs the attach the first time it
runs. A new worker pool is created if there is no match in the global list of
cgroup-aware worker pools.

Signed-off-by: Bandan Das <bsd@xxxxxxxxxx>
---
 include/linux/workqueue.h   |   2 +
 kernel/workqueue.c          | 212 +++++++++++++++++++++++++++++++++++++++++---
 kernel/workqueue_internal.h |   4 +
 3 files changed, 204 insertions(+), 14 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index ca73c50..7afb72d 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -131,6 +131,7 @@ struct workqueue_attrs {
 	int			nice;		/* nice level */
 	cpumask_var_t		cpumask;	/* allowed CPUs */
 	bool			no_numa;	/* disable NUMA affinity */
+	bool                    cg_enabled;     /* cgroups aware */
 };
 
 static inline struct delayed_work *to_delayed_work(struct work_struct *work)
@@ -308,6 +309,7 @@ enum {
 	 * http://thread.gmane.org/gmane.linux.kernel/1480396
 	 */
 	WQ_POWER_EFFICIENT	= 1 << 7,
+	WQ_CGROUPS              = 1 << 8,
 
 	__WQ_DRAINING		= 1 << 16, /* internal: workqueue is draining */
 	__WQ_ORDERED		= 1 << 17, /* internal: workqueue is ordered */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7ff5dc7..f052d85 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -48,6 +48,7 @@
 #include <linux/nodemask.h>
 #include <linux/moduleparam.h>
 #include <linux/uaccess.h>
+#include <linux/cgroup.h>
 
 #include "workqueue_internal.h"
 
@@ -139,8 +140,18 @@ enum {
  * MD: wq_mayday_lock protected.
  */
 
+/*
+ * list of tasks that "own" the cgroups that
+ * this pool is attached to
+ */
+struct cgroup_owners {
+	struct task_struct *owner;
+	struct list_head    link;
+};
+
 /* struct worker is defined in workqueue_internal.h */
 
+
 struct worker_pool {
 	spinlock_t		lock;		/* the pool lock */
 	int			cpu;		/* I: the associated cpu */
@@ -169,6 +180,8 @@ struct worker_pool {
 	struct worker		*manager;	/* L: purely informational */
 	struct mutex		attach_mutex;	/* attach/detach exclusion */
 	struct list_head	workers;	/* A: attached workers */
+	struct list_head        cg_owners;      /* tasks using this pool*/
+	struct list_head        unbound_node;   /* all cgroup aware pools */
 	struct completion	*detach_completion; /* all workers detached */
 
 	struct ida		worker_ida;	/* worker IDs for task name */
@@ -219,6 +232,7 @@ struct pool_workqueue {
 	 */
 	struct work_struct	unbound_release_work;
 	struct rcu_head		rcu;
+	struct task_struct      *owner; /*for cgroups */
 } __aligned(1 << WORK_STRUCT_FLAG_BITS);
 
 /*
@@ -299,6 +313,7 @@ static DEFINE_MUTEX(wq_pool_mutex);	/* protects pools and workqueues list */
 static DEFINE_SPINLOCK(wq_mayday_lock);	/* protects wq->maydays list */
 
 static LIST_HEAD(workqueues);		/* PR: list of all workqueues */
+static LIST_HEAD(unbound_cgpool);       /* list of cgroup aware worker pools */
 static bool workqueue_freezing;		/* PL: have wqs started freezing? */
 
 /* PL: allowable cpus for unbound wqs and work items */
@@ -425,6 +440,12 @@ static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
 		if (({ assert_rcu_or_wq_mutex(wq); false; })) { }	\
 		else
 
+#define for_each_unbound_cgpool(pool)					 \
+	list_for_each_entry_rcu((pool), &(unbound_cgpool), unbound_node)
+
+#define for_each_task_cgpool(cgtask, pool)				 \
+	list_for_each_entry_rcu((cgtask), &(pool)->cg_owners, link)
+
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
 
 static struct debug_obj_descr work_debug_descr;
@@ -700,6 +721,7 @@ static struct pool_workqueue *get_work_pwq(struct work_struct *work)
  *
  * Return: The worker_pool @work was last associated with.  %NULL if none.
  */
+
 static struct worker_pool *get_work_pool(struct work_struct *work)
 {
 	unsigned long data = atomic_long_read(&work->data);
@@ -757,6 +779,7 @@ static bool work_is_canceling(struct work_struct *work)
  * they're being called with pool->lock held.
  */
 
+
 static bool __need_more_worker(struct worker_pool *pool)
 {
 	return !atomic_read(&pool->nr_running);
@@ -1072,6 +1095,7 @@ static void get_pwq(struct pool_workqueue *pwq)
 static void put_pwq(struct pool_workqueue *pwq)
 {
 	lockdep_assert_held(&pwq->pool->lock);
+
 	if (likely(--pwq->refcnt))
 		return;
 	if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
@@ -1387,6 +1411,9 @@ retry:
 	/* pwq which will be used unless @work is executing elsewhere */
 	if (!(wq->flags & WQ_UNBOUND))
 		pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+	else if (wq->flags & WQ_CGROUPS)
+		/* use the default pwq */
+		pwq = unbound_pwq_by_node(wq, NUMA_NO_NODE);
 	else
 		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
 
@@ -1674,6 +1701,8 @@ static struct worker *alloc_worker(int node)
 		/* on creation a worker is in !idle && prep state */
 		worker->flags = WORKER_PREP;
 	}
+	worker->attach_pending = false;
+	worker->attach_to = NULL;
 	return worker;
 }
 
@@ -1695,7 +1724,8 @@ static void worker_attach_to_pool(struct worker *worker,
 	 * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
 	 * online CPUs.  It'll be re-applied when any of the CPUs come up.
 	 */
-	set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
+	if (!pool->attrs->cg_enabled)
+		set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
 
 	/*
 	 * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
@@ -1760,6 +1790,7 @@ static struct worker *create_worker(struct worker_pool *pool)
 	if (id < 0)
 		goto fail;
 
+	/* Note: if user specified cgroups, node is NUMA_NO_NODE */
 	worker = alloc_worker(pool->node);
 	if (!worker)
 		goto fail;
@@ -1779,7 +1810,11 @@ static struct worker *create_worker(struct worker_pool *pool)
 		goto fail;
 
 	set_user_nice(worker->task, pool->attrs->nice);
-	kthread_bind_mask(worker->task, pool->attrs->cpumask);
+	if (pool->attrs->cg_enabled) {
+		worker->attach_pending = true;
+		worker->attach_to = current;
+	} else
+		kthread_bind_mask(worker->task, pool->attrs->cpumask);
 
 	/* successful, attach the worker to the pool */
 	worker_attach_to_pool(worker, pool);
@@ -2172,6 +2207,7 @@ static int worker_thread(void *__worker)
 {
 	struct worker *worker = __worker;
 	struct worker_pool *pool = worker->pool;
+	int cgattach;
 
 	/* tell the scheduler that this is a workqueue worker */
 	worker->task->flags |= PF_WQ_WORKER;
@@ -2191,6 +2227,14 @@ woke_up:
 		return 0;
 	}
 
+	/* this is supposed to run only the first time to attach to cgroups */
+	if (worker->attach_pending) {
+		cgattach = cgroup_attach_task_all(worker->attach_to, current);
+		if (cgattach)
+			pr_warn("workqueue: worker cgroup attach failed but we will still run!");
+		worker->attach_pending = false;
+	}
+
 	worker_leave_idle(worker);
 recheck:
 	/* no more worker necessary? */
@@ -3181,6 +3225,7 @@ static int init_worker_pool(struct worker_pool *pool)
 	pool->watchdog_ts = jiffies;
 	INIT_LIST_HEAD(&pool->worklist);
 	INIT_LIST_HEAD(&pool->idle_list);
+	INIT_LIST_HEAD(&pool->cg_owners);
 	hash_init(pool->busy_hash);
 
 	init_timer_deferrable(&pool->idle_timer);
@@ -3251,13 +3296,22 @@ static void put_unbound_pool(struct worker_pool *pool)
 
 	/* sanity checks */
 	if (WARN_ON(!(pool->cpu < 0)) ||
-	    WARN_ON(!list_empty(&pool->worklist)))
+	    WARN_ON(!list_empty(&pool->worklist)) ||
+	    WARN_ON(!list_empty(&pool->cg_owners)))
 		return;
 
 	/* release id and unhash */
 	if (pool->id >= 0)
 		idr_remove(&worker_pool_idr, pool->id);
-	hash_del(&pool->hash_node);
+
+	/*
+	 * this pool is going down, so remove from the list of
+	 * cgroup aware pools
+	 */
+	if (pool->attrs->cg_enabled)
+		list_del(&pool->unbound_node);
+	else
+		hash_del(&pool->hash_node);
 
 	/*
 	 * Become the manager and destroy all workers.  Grabbing
@@ -3290,6 +3344,65 @@ static void put_unbound_pool(struct worker_pool *pool)
 	call_rcu_sched(&pool->rcu, rcu_free_pool);
 }
 
+static void remove_task_cgpool(struct worker_pool *pool,
+			       struct task_struct *tsk)
+{
+	struct cgroup_owners *iter;
+
+	if (pool->attrs->cg_enabled) {
+		for_each_task_cgpool(iter, pool) {
+			if (iter->owner == tsk) {
+				list_del(&iter->link);
+				break;
+			}
+		}
+	}
+}
+
+static bool attach_task_cgpool(struct worker_pool *pool,
+			       struct task_struct *tsk)
+{
+	bool result = true;
+	struct cgroup_owners *entry = kzalloc(sizeof(*entry), GFP_KERNEL);
+
+	if (!entry) {
+		result = false;
+		goto done;
+	}
+
+	entry->owner = tsk;
+	list_add_tail(&entry->link, &pool->cg_owners);
+
+done:
+	return result;
+}
+
+static struct worker_pool *find_cg_matching_pool(struct task_struct *tsk)
+{
+	struct worker_pool *pool = NULL, *iter;
+	bool found = false;
+
+	for_each_unbound_cgpool(iter) {
+		struct cgroup_owners *cgtask;
+
+		for_each_task_cgpool(cgtask, iter) {
+			if (cgtask->owner == tsk ||
+			    cgroup_match_groups(cgtask->owner, tsk)) {
+				found = true;
+				break;
+			}
+		}
+
+		if (found) {
+			pool = iter;
+			pool->refcnt++;
+			break;
+		}
+	}
+
+	return pool;
+}
+
 /**
  * get_unbound_pool - get a worker_pool with the specified attributes
  * @attrs: the attributes of the worker_pool to get
@@ -3310,9 +3423,19 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 	struct worker_pool *pool;
 	int node;
 	int target_node = NUMA_NO_NODE;
+	bool cgroups_enabled = attrs->cg_enabled;
 
 	lockdep_assert_held(&wq_pool_mutex);
 
+	if (cgroups_enabled) {
+		/* "current" is the owner */
+		pool = find_cg_matching_pool(current);
+		if (!pool)
+			goto create;
+		else
+			return pool;
+	}
+
 	/* do we already have a matching pool? */
 	hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
 		if (wqattrs_equal(pool->attrs, attrs)) {
@@ -3332,6 +3455,8 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 		}
 	}
 
+
+create:
 	/* nope, create a new one */
 	pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
 	if (!pool || init_worker_pool(pool) < 0)
@@ -3347,6 +3472,9 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 	 */
 	pool->attrs->no_numa = false;
 
+	if (cgroups_enabled)
+		pool->attrs->cg_enabled = true;
+
 	if (worker_pool_assign_id(pool) < 0)
 		goto fail;
 
@@ -3355,7 +3483,10 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 		goto fail;
 
 	/* install */
-	hash_add(unbound_pool_hash, &pool->hash_node, hash);
+	if (cgroups_enabled)
+		list_add_tail(&pool->unbound_node, &unbound_cgpool);
+	else
+		hash_add(unbound_pool_hash, &pool->hash_node, hash);
 
 	return pool;
 fail:
@@ -3390,6 +3521,8 @@ static void pwq_unbound_release_workfn(struct work_struct *work)
 	is_last = list_empty(&wq->pwqs);
 	mutex_unlock(&wq->mutex);
 
+	remove_task_cgpool(pool, pwq->owner);
+
 	mutex_lock(&wq_pool_mutex);
 	put_unbound_pool(pool);
 	mutex_unlock(&wq_pool_mutex);
@@ -3462,6 +3595,11 @@ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
 	pwq->wq = wq;
 	pwq->flush_color = -1;
 	pwq->refcnt = 1;
+	if (pool->attrs->cg_enabled) {
+		/* Add the current task to pool cg_owners */
+		WARN_ON(!attach_task_cgpool(pool, current));
+		pwq->owner = current;
+	}
 	INIT_LIST_HEAD(&pwq->delayed_works);
 	INIT_LIST_HEAD(&pwq->pwqs_node);
 	INIT_LIST_HEAD(&pwq->mayday_node);
@@ -3502,7 +3640,11 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
 	if (!pool)
 		return NULL;
 
-	pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+	if (wq->unbound_attrs->cg_enabled)
+		pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
+	else
+		pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+
 	if (!pwq) {
 		put_unbound_pool(pool);
 		return NULL;
@@ -3590,8 +3732,10 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
 	if (ctx) {
 		int node;
 
-		for_each_node(node)
-			put_pwq_unlocked(ctx->pwq_tbl[node]);
+		if (ctx->attrs->cg_enabled) {
+			for_each_node(node)
+				put_pwq_unlocked(ctx->pwq_tbl[node]);
+		}
 		put_pwq_unlocked(ctx->dfl_pwq);
 
 		free_workqueue_attrs(ctx->attrs);
@@ -3607,11 +3751,14 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 {
 	struct apply_wqattrs_ctx *ctx;
 	struct workqueue_attrs *new_attrs, *tmp_attrs;
-	int node;
+	int node, numa_nodes = nr_node_ids;
+	bool cgroups_enabled = wq->unbound_attrs->cg_enabled;
 
 	lockdep_assert_held(&wq_pool_mutex);
 
-	ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+	if (cgroups_enabled)
+		numa_nodes = 0;
+	ctx = kzalloc(sizeof(*ctx) + numa_nodes * sizeof(ctx->pwq_tbl[0]),
 		      GFP_KERNEL);
 
 	new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
@@ -3623,6 +3770,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 	 * Calculate the attrs of the default pwq.
 	 * If the user configured cpumask doesn't overlap with the
 	 * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
+	 * This does not copy attrs->cg_enabled
 	 */
 	copy_workqueue_attrs(new_attrs, attrs);
 	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
@@ -3640,11 +3788,16 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 	 * If something goes wrong during CPU up/down, we'll fall back to
 	 * the default pwq covering whole @attrs->cpumask.  Always create
 	 * it even if we don't use it immediately.
+	 * For cgroup-aware wqs, there will be only one pwq.
 	 */
+	new_attrs->cg_enabled = cgroups_enabled;
 	ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
 	if (!ctx->dfl_pwq)
 		goto out_free;
 
+	if (cgroups_enabled)
+		goto done;
+
 	for_each_node(node) {
 		if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
 			ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
@@ -3656,8 +3809,10 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 		}
 	}
 
+done:
 	/* save the user configured attrs and sanitize it. */
 	copy_workqueue_attrs(new_attrs, attrs);
+	/* at this point, note that cg_enabled is untouched */
 	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
 	ctx->attrs = new_attrs;
 
@@ -3676,16 +3831,23 @@ out_free:
 static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
 {
 	int node;
+	bool cgroups_enabled = ctx->wq->unbound_attrs->cg_enabled;
 
 	/* all pwqs have been created successfully, let's install'em */
 	mutex_lock(&ctx->wq->mutex);
 
 	copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
-	/* save the previous pwq and install the new one */
+	/*
+	 * save the previous pwq and install the new one
+	 * if WQ_CGROUPS is set, then we don't allocate space for pwq_tbl at all
+	 * so in that case, only dfl_pwq is valid
+	 */
+	if (!cgroups_enabled) {
 	for_each_node(node)
 		ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
 							  ctx->pwq_tbl[node]);
+	}
 
 	/* @dfl_pwq might not have been used, ensure it's linked */
 	link_pwq(ctx->dfl_pwq);
@@ -3882,6 +4044,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 static int wq_clamp_max_active(int max_active, unsigned int flags,
 			       const char *name)
 {
+	/* TODO: should cgroup-aware wqs get their own max_active limit? */
 	int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
 
 	if (max_active < 1 || max_active > lim)
@@ -3901,14 +4064,30 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
 	va_list args;
 	struct workqueue_struct *wq;
 	struct pool_workqueue *pwq;
+	bool cgroups_enabled = false;
+
+#ifdef CONFIG_CGROUPS
+	/* cgroup awareness applies only to unbound, non-ordered workqueues */
+	if ((flags & WQ_CGROUPS) && (flags & WQ_UNBOUND) &&
+	    !(flags & __WQ_ORDERED))
+		cgroups_enabled = true;
+#endif
 
 	/* see the comment above the definition of WQ_POWER_EFFICIENT */
-	if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
+	if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) {
 		flags |= WQ_UNBOUND;
+		if (cgroups_enabled) {
+			pr_warn("workqueue: disabling cgroups because WQ_POWER_EFFICIENT specified");
+			cgroups_enabled = false;
+		}
+	}
 
 	/* allocate wq and format name */
-	if (flags & WQ_UNBOUND)
-		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+	if (flags & WQ_UNBOUND) {
+		if (!cgroups_enabled)
+			tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+		/* else: no per-node pwq table; only the default pwq is used */
+	}
 
 	wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
 	if (!wq)
@@ -3918,6 +4097,8 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
 		wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
 		if (!wq->unbound_attrs)
 			goto err_free_wq;
+		if (cgroups_enabled)
+			wq->unbound_attrs->cg_enabled = true;
 	}
 
 	va_start(args, lock_name);
@@ -4980,6 +5161,9 @@ static ssize_t wq_pool_ids_show(struct device *dev,
 	const char *delim = "";
 	int node, written = 0;
 
+	if (wq->unbound_attrs->cg_enabled)
+		return 0;
+
 	rcu_read_lock_sched();
 	for_each_node(node) {
 		written += scnprintf(buf + written, PAGE_SIZE - written,
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index 4521587..49228cab 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -52,6 +52,10 @@ struct worker {
 
 	/* used only by rescuers to point to the target workqueue */
 	struct workqueue_struct	*rescue_wq;	/* I: the workqueue to rescue */
+
+	/* for cgroups */
+	bool attach_pending;
+	struct task_struct *attach_to;
 };
 
 /**
-- 
2.5.0

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux