Introduce cgroup-aware unbound worker pools. Whenever a new worker
thread is created, create_worker() attaches it to the cgroups of the
task that called alloc_workqueue(). A new worker pool is created if
there is no match in the global list of cgroup-aware worker pools.

Signed-off-by: Bandan Das <bsd@xxxxxxxxxx>
---
 include/linux/workqueue.h   |   2 +
 kernel/workqueue.c          | 212 +++++++++++++++++++++++++++++++++++++++++---
 kernel/workqueue_internal.h |   4 +
 3 files changed, 204 insertions(+), 14 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index ca73c50..7afb72d 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -131,6 +131,7 @@ struct workqueue_attrs {
 	int		nice;		/* nice level */
 	cpumask_var_t	cpumask;	/* allowed CPUs */
 	bool		no_numa;	/* disable NUMA affinity */
+	bool		cg_enabled;	/* cgroups aware */
 };
 
 static inline struct delayed_work *to_delayed_work(struct work_struct *work)
@@ -308,6 +309,7 @@ enum {
 	 * http://thread.gmane.org/gmane.linux.kernel/1480396
 	 */
 	WQ_POWER_EFFICIENT	= 1 << 7,
+	WQ_CGROUPS		= 1 << 8,
 
 	__WQ_DRAINING		= 1 << 16, /* internal: workqueue is draining */
 	__WQ_ORDERED		= 1 << 17, /* internal: workqueue is ordered */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7ff5dc7..f052d85 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -48,6 +48,7 @@
 #include <linux/nodemask.h>
 #include <linux/moduleparam.h>
 #include <linux/uaccess.h>
+#include <linux/cgroup.h>
 
 #include "workqueue_internal.h"
@@ -139,8 +140,18 @@
  * MD: wq_mayday_lock protected.
  */
 
+/*
+ * list of tasks that "own" the cgroups that
+ * this pool is attached to
+ */
+struct cgroup_owners {
+	struct task_struct *owner;
+	struct list_head link;
+};
+
 /* struct worker is defined in workqueue_internal.h */
+
 struct worker_pool {
 	spinlock_t		lock;		/* the pool lock */
 	int			cpu;		/* I: the associated cpu */
@@ -169,6 +180,8 @@ struct worker_pool {
 	struct worker		*manager;	/* L: purely informational */
 	struct mutex		attach_mutex;	/* attach/detach exclusion */
 	struct list_head	workers;	/* A: attached workers */
+	struct list_head	cg_owners;	/* tasks using this pool */
+	struct list_head	unbound_node;	/* all cgroup aware pools */
 	struct completion	*detach_completion; /* all workers detached */
 
 	struct ida		worker_ida;	/* worker IDs for task name */
@@ -219,6 +232,7 @@ struct pool_workqueue {
 	 */
 	struct work_struct	unbound_release_work;
 	struct rcu_head		rcu;
+	struct task_struct	*owner;		/* for cgroups */
 } __aligned(1 << WORK_STRUCT_FLAG_BITS);
 
 /*
@@ -299,6 +313,7 @@ static DEFINE_MUTEX(wq_pool_mutex);	/* protects pools and workqueues list */
 static DEFINE_SPINLOCK(wq_mayday_lock);	/* protects wq->maydays list */
 
 static LIST_HEAD(workqueues);		/* PR: list of all workqueues */
+static LIST_HEAD(unbound_cgpool);	/* list of cgroup aware worker pools */
 static bool workqueue_freezing;		/* PL: have wqs started freezing? */
 
 /* PL: allowable cpus for unbound wqs and work items */
@@ -425,6 +440,12 @@ static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
 		if (({ assert_rcu_or_wq_mutex(wq); false; })) { }	\
 		else
 
+#define for_each_unbound_cgpool(pool)				\
+	list_for_each_entry_rcu((pool), &(unbound_cgpool), unbound_node)
+
+#define for_each_task_cgpool(cgtask, pool)			\
+	list_for_each_entry_rcu((cgtask), &(pool)->cg_owners, link)
+
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
 
 static struct debug_obj_descr work_debug_descr;
@@ -700,6 +721,7 @@ static struct pool_workqueue *get_work_pwq(struct work_struct *work)
  *
  * Return: The worker_pool @work was last associated with.
 * %NULL if none.
 */
+
 static struct worker_pool *get_work_pool(struct work_struct *work)
 {
 	unsigned long data = atomic_long_read(&work->data);
@@ -757,6 +779,7 @@ static bool work_is_canceling(struct work_struct *work)
 * they're being called with pool->lock held.
 */
 
+
 static bool __need_more_worker(struct worker_pool *pool)
 {
 	return !atomic_read(&pool->nr_running);
@@ -1072,6 +1095,7 @@ static void get_pwq(struct pool_workqueue *pwq)
 static void put_pwq(struct pool_workqueue *pwq)
 {
 	lockdep_assert_held(&pwq->pool->lock);
+
 	if (likely(--pwq->refcnt))
 		return;
 	if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
@@ -1387,6 +1411,9 @@ retry:
 	/* pwq which will be used unless @work is executing elsewhere */
 	if (!(wq->flags & WQ_UNBOUND))
 		pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+	else if (wq->flags & WQ_CGROUPS)
+		/* use the default pwq */
+		pwq = unbound_pwq_by_node(wq, NUMA_NO_NODE);
 	else
 		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
 
@@ -1674,6 +1701,8 @@ static struct worker *alloc_worker(int node)
 		/* on creation a worker is in !idle && prep state */
 		worker->flags = WORKER_PREP;
 	}
+	worker->attach_pending = false;
+	worker->attach_to = NULL;
 	return worker;
 }
 
@@ -1695,7 +1724,8 @@ static void worker_attach_to_pool(struct worker *worker,
 	 * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
 	 * online CPUs. It'll be re-applied when any of the CPUs come up.
 	 */
-	set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
+	if (!pool->attrs->cg_enabled)
+		set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
 
 	/*
 	 * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
@@ -1760,6 +1790,7 @@ static struct worker *create_worker(struct worker_pool *pool)
 	if (id < 0)
 		goto fail;
 
+	/* Note: if user specified cgroups, node is NUMA_NO_NODE */
 	worker = alloc_worker(pool->node);
 	if (!worker)
 		goto fail;
@@ -1779,7 +1810,11 @@ static struct worker *create_worker(struct worker_pool *pool)
 		goto fail;
 
 	set_user_nice(worker->task, pool->attrs->nice);
-	kthread_bind_mask(worker->task, pool->attrs->cpumask);
+	if (pool->attrs->cg_enabled) {
+		worker->attach_pending = true;
+		worker->attach_to = current;
+	} else
+		kthread_bind_mask(worker->task, pool->attrs->cpumask);
 
 	/* successful, attach the worker to the pool */
 	worker_attach_to_pool(worker, pool);
@@ -2172,6 +2207,7 @@ static int worker_thread(void *__worker)
 {
 	struct worker *worker = __worker;
 	struct worker_pool *pool = worker->pool;
+	int cgattach;
 
 	/* tell the scheduler that this is a workqueue worker */
 	worker->task->flags |= PF_WQ_WORKER;
@@ -2191,6 +2227,14 @@ woke_up:
 		return 0;
 	}
 
+	/* this is supposed to run only the first time to attach to cgroups */
+	if (worker->attach_pending) {
+		cgattach = cgroup_attach_task_all(worker->attach_to, current);
+		if (cgattach)
+			pr_warn("workqueue: worker cgroup attach failed but we will still run!\n");
+		worker->attach_pending = false;
+	}
+
 	worker_leave_idle(worker);
 recheck:
 	/* no more worker necessary? */
@@ -3181,6 +3225,7 @@ static int init_worker_pool(struct worker_pool *pool)
 	pool->watchdog_ts = jiffies;
 	INIT_LIST_HEAD(&pool->worklist);
 	INIT_LIST_HEAD(&pool->idle_list);
+	INIT_LIST_HEAD(&pool->cg_owners);
 	hash_init(pool->busy_hash);
 
 	init_timer_deferrable(&pool->idle_timer);
@@ -3251,13 +3296,22 @@ static void put_unbound_pool(struct worker_pool *pool)
 	/* sanity checks */
 	if (WARN_ON(!(pool->cpu < 0)) ||
-	    WARN_ON(!list_empty(&pool->worklist)))
+	    WARN_ON(!list_empty(&pool->worklist)) ||
+	    WARN_ON(!list_empty(&pool->cg_owners)))
 		return;
 
 	/* release id and unhash */
 	if (pool->id >= 0)
 		idr_remove(&worker_pool_idr, pool->id);
-	hash_del(&pool->hash_node);
+
+	/*
+	 * this pool is going down, so remove from the list of
+	 * cgroup aware pools
+	 */
+	if (pool->attrs->cg_enabled)
+		list_del(&pool->unbound_node);
+	else
+		hash_del(&pool->hash_node);
 
 	/*
 	 * Become the manager and destroy all workers.  Grabbing
@@ -3290,6 +3344,65 @@ static void put_unbound_pool(struct worker_pool *pool)
 	call_rcu_sched(&pool->rcu, rcu_free_pool);
 }
 
+static void remove_task_cgpool(struct worker_pool *pool,
+			       struct task_struct *tsk)
+{
+	struct cgroup_owners *iter;
+
+	if (pool->attrs->cg_enabled) {
+		for_each_task_cgpool(iter, pool) {
+			if (iter->owner == tsk) {
+				list_del(&iter->link);
+				break;
+			}
+		}
+	}
+}
+
+static bool attach_task_cgpool(struct worker_pool *pool,
+			       struct task_struct *tsk)
+{
+	bool result = true;
+	struct cgroup_owners *entry = kzalloc(sizeof(*entry), GFP_KERNEL);
+
+	if (!entry) {
+		result = false;
+		goto done;
+	}
+
+	entry->owner = tsk;
+	list_add_tail(&entry->link, &pool->cg_owners);
+
+done:
+	return result;
+}
+
+static struct worker_pool *find_cg_matching_pool(struct task_struct *tsk)
+{
+	struct worker_pool *pool = NULL, *iter;
+	bool found = false;
+
+	for_each_unbound_cgpool(iter) {
+		struct cgroup_owners *cgtask;
+
+		for_each_task_cgpool(cgtask, iter) {
+			if (cgtask->owner == tsk ||
+			    cgroup_match_groups(cgtask->owner, tsk)) {
+				found = true;
+				break;
+			}
+		}
+
+		if (found) {
+			pool = iter;
+			pool->refcnt++;
+			break;
+		}
+	}
+
+	return pool;
+}
+
 /**
  * get_unbound_pool - get a worker_pool with the specified attributes
  * @attrs: the attributes of the worker_pool to get
@@ -3310,9 +3423,19 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 	struct worker_pool *pool;
 	int node;
 	int target_node = NUMA_NO_NODE;
+	bool cgroups_enabled = attrs->cg_enabled;
 
 	lockdep_assert_held(&wq_pool_mutex);
 
+	if (cgroups_enabled) {
+		/* "current" is the owner */
+		pool = find_cg_matching_pool(current);
+		if (!pool)
+			goto create;
+		else
+			return pool;
+	}
+
 	/* do we already have a matching pool? */
 	hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
 		if (wqattrs_equal(pool->attrs, attrs)) {
@@ -3332,6 +3455,8 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 		}
 	}
 
+
+create:
 	/* nope, create a new one */
 	pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
 	if (!pool || init_worker_pool(pool) < 0)
@@ -3347,6 +3472,9 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 	 */
 	pool->attrs->no_numa = false;
 
+	if (cgroups_enabled)
+		pool->attrs->cg_enabled = true;
+
 	if (worker_pool_assign_id(pool) < 0)
 		goto fail;
 
@@ -3355,7 +3483,10 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 		goto fail;
 
 	/* install */
-	hash_add(unbound_pool_hash, &pool->hash_node, hash);
+	if (cgroups_enabled)
+		list_add_tail(&pool->unbound_node, &unbound_cgpool);
+	else
+		hash_add(unbound_pool_hash, &pool->hash_node, hash);
 
 	return pool;
 fail:
@@ -3390,6 +3521,8 @@ static void pwq_unbound_release_workfn(struct work_struct *work)
 	is_last = list_empty(&wq->pwqs);
 	mutex_unlock(&wq->mutex);
 
+	remove_task_cgpool(pool, pwq->owner);
+
 	mutex_lock(&wq_pool_mutex);
 	put_unbound_pool(pool);
 	mutex_unlock(&wq_pool_mutex);
@@ -3462,6 +3595,11 @@ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
 	pwq->wq = wq;
 	pwq->flush_color = -1;
 	pwq->refcnt = 1;
+	if (pool->attrs->cg_enabled) {
+		/* Add the current task to pool cg_owners */
+		WARN_ON(!attach_task_cgpool(pool, current));
+		pwq->owner = current;
+	}
 	INIT_LIST_HEAD(&pwq->delayed_works);
 	INIT_LIST_HEAD(&pwq->pwqs_node);
 	INIT_LIST_HEAD(&pwq->mayday_node);
@@ -3502,7 +3640,11 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
 	if (!pool)
 		return NULL;
 
-	pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+	if (wq->unbound_attrs->cg_enabled)
+		pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
+	else
+		pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+
 	if (!pwq) {
 		put_unbound_pool(pool);
 		return NULL;
 	}
@@ -3590,8 +3732,10 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
 	if (ctx) {
 		int node;
 
-		for_each_node(node)
-			put_pwq_unlocked(ctx->pwq_tbl[node]);
+		if (ctx->attrs->cg_enabled) {
+			for_each_node(node)
+				put_pwq_unlocked(ctx->pwq_tbl[node]);
+		}
 		put_pwq_unlocked(ctx->dfl_pwq);
 		free_workqueue_attrs(ctx->attrs);
 
@@ -3607,11 +3751,14 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 {
 	struct apply_wqattrs_ctx *ctx;
 	struct workqueue_attrs *new_attrs, *tmp_attrs;
-	int node;
+	int node, numa_nodes = nr_node_ids;
+	bool cgroups_enabled = wq->unbound_attrs->cg_enabled;
 
 	lockdep_assert_held(&wq_pool_mutex);
 
-	ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+	if (cgroups_enabled)
+		numa_nodes = 0;
+	ctx = kzalloc(sizeof(*ctx) + numa_nodes * sizeof(ctx->pwq_tbl[0]),
 		      GFP_KERNEL);
 
 	new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
@@ -3623,6 +3770,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 	 * Calculate the attrs of the default pwq.
 	 * If the user configured cpumask doesn't overlap with the
 	 * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
+	 * This does not copy attrs->cg_enabled
 	 */
 	copy_workqueue_attrs(new_attrs, attrs);
 	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
@@ -3640,11 +3788,16 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 	 * If something goes wrong during CPU up/down, we'll fall back to
 	 * the default pwq covering whole @attrs->cpumask.  Always create
 	 * it even if we don't use it immediately.
+	 * For cgroups aware wqs, there will be only one pwq.
 	 */
+	new_attrs->cg_enabled = cgroups_enabled;
 	ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
 	if (!ctx->dfl_pwq)
 		goto out_free;
 
+	if (cgroups_enabled)
+		goto done;
+
 	for_each_node(node) {
 		if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
 			ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
@@ -3656,8 +3809,10 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 		}
 	}
 
+done:
 	/* save the user configured attrs and sanitize it. */
 	copy_workqueue_attrs(new_attrs, attrs);
+	/* at this point, note that cg_enabled is untouched */
 	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
 	ctx->attrs = new_attrs;
 
@@ -3676,16 +3831,23 @@ out_free:
 static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
 {
 	int node;
+	bool cgroups_enabled = ctx->wq->unbound_attrs->cg_enabled;
 
 	/* all pwqs have been created successfully, let's install'em */
 	mutex_lock(&ctx->wq->mutex);
 
 	copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
-	/* save the previous pwq and install the new one */
+	/*
+	 * save the previous pwq and install the new one
+	 * if WQ_CGROUPS is set, then we don't allocate space for pwq_tbl at all
+	 * so in that case, only dfl_pwq is valid
+	 */
+	if (!cgroups_enabled) {
 	for_each_node(node)
 		ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
 							  ctx->pwq_tbl[node]);
+	}
 
 	/* @dfl_pwq might not have been used, ensure it's linked */
 	link_pwq(ctx->dfl_pwq);
@@ -3882,6 +4044,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 static int wq_clamp_max_active(int max_active, unsigned int flags,
 			       const char *name)
 {
+	/* Determine max for cgroups ? */
 	int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
 
 	if (max_active < 1 || max_active > lim)
@@ -3901,14 +4064,30 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
 	va_list args;
 	struct workqueue_struct *wq;
 	struct pool_workqueue *pwq;
+	bool cgroups_enabled = false;
+
+#ifdef CONFIG_CGROUPS
+	/* Only unbound workqueues but not ordered */
+	if ((flags & WQ_CGROUPS) && (flags & WQ_UNBOUND) &&
+	    !(flags & __WQ_ORDERED))
+		cgroups_enabled = true;
+#endif
 
 	/* see the comment above the definition of WQ_POWER_EFFICIENT */
-	if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
+	if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) {
 		flags |= WQ_UNBOUND;
+		if (cgroups_enabled) {
+			pr_warn("workqueue: disabling cgroups because WQ_POWER_EFFICIENT specified\n");
+			cgroups_enabled = false;
+		}
+	}
 
 	/* allocate wq and format name */
-	if (flags & WQ_UNBOUND)
-		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+	if (flags & WQ_UNBOUND) {
+		if (!cgroups_enabled)
+			tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+		/* else let cgroups take care of us */
+	}
 
 	wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
 	if (!wq)
@@ -3918,6 +4097,8 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
 		wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
 		if (!wq->unbound_attrs)
 			goto err_free_wq;
+		if (cgroups_enabled)
+			wq->unbound_attrs->cg_enabled = true;
 	}
 
 	va_start(args, lock_name);
@@ -4980,6 +5161,9 @@ static ssize_t wq_pool_ids_show(struct device *dev,
 	const char *delim = "";
 	int node, written = 0;
 
+	if (wq->unbound_attrs->cg_enabled)
+		return 0;
+
 	rcu_read_lock_sched();
 	for_each_node(node) {
 		written += scnprintf(buf + written, PAGE_SIZE - written,
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index 4521587..49228cab 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -52,6 +52,10 @@ struct worker {
 
 	/* used only by rescuers to point to the target workqueue */
 	struct workqueue_struct	*rescue_wq;	/* I: the workqueue to rescue */
+
+	/* for cgroups */
+	bool			attach_pending;
+	struct task_struct	*attach_to;
 };
 
 /**
-- 
2.5.0

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html
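For reference, below is a minimal usage sketch (not part of the patch) of how
a caller could request a cgroup-aware workqueue once this change is applied.
The module, queue name, and work function are hypothetical and only illustrate
the intended use; WQ_UNBOUND, the new WQ_CGROUPS flag, and the standard
workqueue API calls are the real interface. With the patch, the workers that
service this queue are attached to the cgroups of the task that called
alloc_workqueue(), so controller limits on that task also cover its deferred
work.

#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_cgwq;	/* hypothetical example wq */
static struct work_struct my_work;

static void my_work_fn(struct work_struct *work)
{
	/* runs in a worker attached to the allocating task's cgroups */
	pr_info("cgroup-aware work item executed\n");
}

static int __init my_cgwq_init(void)
{
	/* WQ_CGROUPS only takes effect for unbound, non-ordered workqueues */
	my_cgwq = alloc_workqueue("my_cgwq", WQ_UNBOUND | WQ_CGROUPS, 0);
	if (!my_cgwq)
		return -ENOMEM;

	INIT_WORK(&my_work, my_work_fn);
	queue_work(my_cgwq, &my_work);
	return 0;
}

static void __exit my_cgwq_exit(void)
{
	flush_workqueue(my_cgwq);
	destroy_workqueue(my_cgwq);
}

module_init(my_cgwq_init);
module_exit(my_cgwq_exit);
MODULE_LICENSE("GPL");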