When this idea came to me, I wonder which is better to maintain memcg's thread pool or add support in workqueue for generic use. In genral, I feel enhancing genric one is better...so, wrote this one. == This patch adds a new workqueue class as WQ_IDLEPRI. The worker thread for this workqueue will have SCHED_IDLE scheduling policy and don't use (too much) CPU if there are other active threads. IOW, unless the system is idle, work will not progress. Considering to schedule an asynchronous work which can be a help for reduce latency of applications, it's good to use idle time of the system. The CPU time which was used by application's context will be moved to fill idle time of the system. Applications can hide its latency by shifting cpu time for a work to be done in idle time. This will be used by memory cgroup to hide memory reclaim latency. I may miss something...any comments are welcomed. NOTE 1: SCHED_IDLE is just a lowest priority of SCHED_OTHER. NOTE 2: It may be better to add cond_resched() in worker thread somewhere.. but I couldn't find where is the best. Signed-off-by: KAMEZAWA Hiroyuki <kamezawa.hiroyu@xxxxxxxxxxxxxx> --- Documentation/workqueue.txt | 10 ++++ include/linux/workqueue.h | 8 ++- kernel/workqueue.c | 101 +++++++++++++++++++++++++++++++++----------- mm/memcontrol.c | 3 - 4 files changed, 93 insertions(+), 29 deletions(-) Index: memcg_async/include/linux/workqueue.h =================================================================== --- memcg_async.orig/include/linux/workqueue.h +++ memcg_async/include/linux/workqueue.h @@ -56,7 +56,8 @@ enum { /* special cpu IDs */ WORK_CPU_UNBOUND = NR_CPUS, - WORK_CPU_NONE = NR_CPUS + 1, + WORK_CPU_IDLEPRI = NR_CPUS + 1, + WORK_CPU_NONE = NR_CPUS + 2, WORK_CPU_LAST = WORK_CPU_NONE, /* @@ -254,9 +255,10 @@ enum { WQ_MEM_RECLAIM = 1 << 3, /* may be used for memory reclaim */ WQ_HIGHPRI = 1 << 4, /* high priority */ WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */ + WQ_IDLEPRI = 1 << 6, /* the lowest priority in scheduler*/ - WQ_DYING = 1 << 6, /* internal: workqueue is dying */ - WQ_RESCUER = 1 << 7, /* internal: workqueue has rescuer */ + WQ_DYING = 1 << 7, /* internal: workqueue is dying */ + WQ_RESCUER = 1 << 8, /* internal: workqueue has rescuer */ WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */ WQ_MAX_UNBOUND_PER_CPU = 4, /* 4 * #cpus for unbound wq */ Index: memcg_async/kernel/workqueue.c =================================================================== --- memcg_async.orig/kernel/workqueue.c +++ memcg_async/kernel/workqueue.c @@ -61,9 +61,11 @@ enum { WORKER_REBIND = 1 << 5, /* mom is home, come back */ WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */ WORKER_UNBOUND = 1 << 7, /* worker is unbound */ + WORKER_IDLEPRI = 1 << 8, WORKER_NOT_RUNNING = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND | - WORKER_CPU_INTENSIVE | WORKER_UNBOUND, + WORKER_CPU_INTENSIVE | WORKER_UNBOUND | + WORKER_IDLEPRI, /* gcwq->trustee_state */ TRUSTEE_START = 0, /* start */ @@ -276,14 +278,25 @@ static inline int __next_gcwq_cpu(int cp } if (sw & 2) return WORK_CPU_UNBOUND; - } + if (sw & 4) + return WORK_CPU_IDLEPRI; + } else if (cpu == WORK_CPU_UNBOUND && (sw & 4)) + return WORK_CPU_IDLEPRI; return WORK_CPU_NONE; } static inline int __next_wq_cpu(int cpu, const struct cpumask *mask, struct workqueue_struct *wq) { - return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2); + int sw = 1; + + if (wq->flags & WQ_UNBOUND) { + if (!(wq->flags & WQ_IDLEPRI)) + sw = 2; + else + sw = 4; + } + return __next_gcwq_cpu(cpu, mask, sw); } /* @@ -294,20 +307,21 @@ static inline int __next_wq_cpu(int cpu, * specific CPU. The following iterators are similar to * for_each_*_cpu() iterators but also considers the unbound gcwq. * - * for_each_gcwq_cpu() : possible CPUs + WORK_CPU_UNBOUND - * for_each_online_gcwq_cpu() : online CPUs + WORK_CPU_UNBOUND - * for_each_cwq_cpu() : possible CPUs for bound workqueues, - * WORK_CPU_UNBOUND for unbound workqueues + * for_each_gcwq_cpu() : possible CPUs + WORK_CPU_UNBOUND + IDLEPRI + * for_each_online_gcwq_cpu() : online CPUs + WORK_CPU_UNBOUND + IDLEPRI + * for_each_cwq_cpu() : possible CPUs for bound workqueues, + * WORK_CPU_UNBOUND for unbound workqueues + * IDLEPRI for idle workqueues. */ #define for_each_gcwq_cpu(cpu) \ - for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3); \ + for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 7); \ (cpu) < WORK_CPU_NONE; \ - (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3)) + (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 7)) #define for_each_online_gcwq_cpu(cpu) \ - for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3); \ + for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 7); \ (cpu) < WORK_CPU_NONE; \ - (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3)) + (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 7)) #define for_each_cwq_cpu(cpu, wq) \ for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq)); \ @@ -451,22 +465,34 @@ static DEFINE_PER_CPU_SHARED_ALIGNED(ato static struct global_cwq unbound_global_cwq; static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0); /* always 0 */ +/* + * Global cpu workqueue and nr_running for idle gcwq. The idle gcwq is + * always online has GCWQ_DISASSOCIATED set. and all its worker have + * WORKER_UNBOUND and WORKER_IDLEPRI set. + */ +static struct global_cwq unbound_idle_global_cwq; +static atomic_t unbound_idle_gcwq_nr_running = ATOMIC_INIT(0); /* always 0 */ + static int worker_thread(void *__worker); static struct global_cwq *get_gcwq(unsigned int cpu) { - if (cpu != WORK_CPU_UNBOUND) + if (cpu < WORK_CPU_UNBOUND) return &per_cpu(global_cwq, cpu); - else + else if (cpu == WORK_CPU_UNBOUND) return &unbound_global_cwq; + else + return &unbound_idle_global_cwq; } static atomic_t *get_gcwq_nr_running(unsigned int cpu) { - if (cpu != WORK_CPU_UNBOUND) + if (cpu < WORK_CPU_UNBOUND) return &per_cpu(gcwq_nr_running, cpu); - else + else if (cpu == WORK_CPU_UNBOUND) return &unbound_gcwq_nr_running; + else + return &unbound_idle_gcwq_nr_running; } static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, @@ -480,7 +506,8 @@ static struct cpu_workqueue_struct *get_ return wq->cpu_wq.single; #endif } - } else if (likely(cpu == WORK_CPU_UNBOUND)) + } else if (likely(cpu == WORK_CPU_UNBOUND || + cpu == WORK_CPU_IDLEPRI)) return wq->cpu_wq.single; return NULL; } @@ -563,7 +590,9 @@ static struct global_cwq *get_work_gcwq( if (cpu == WORK_CPU_NONE) return NULL; - BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND); + BUG_ON(cpu >= nr_cpu_ids + && cpu != WORK_CPU_UNBOUND + && cpu != WORK_CPU_IDLEPRI); return get_gcwq(cpu); } @@ -599,6 +628,10 @@ static bool keep_working(struct global_c { atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu); + if (unlikely((gcwq->cpu == WORK_CPU_IDLEPRI)) && + need_resched()) + return false; + return !list_empty(&gcwq->worklist) && (atomic_read(nr_running) <= 1 || gcwq->flags & GCWQ_HIGHPRI_PENDING); @@ -1025,9 +1058,12 @@ static void __queue_work(unsigned int cp } } else spin_lock_irqsave(&gcwq->lock, flags); - } else { + } else if (!(wq->flags & WQ_IDLEPRI)) { gcwq = get_gcwq(WORK_CPU_UNBOUND); spin_lock_irqsave(&gcwq->lock, flags); + } else { + gcwq = get_gcwq(WORK_CPU_IDLEPRI); + spin_lock_irqsave(&gcwq->lock, flags); } /* gcwq determined, get cwq and queue */ @@ -1160,8 +1196,10 @@ int queue_delayed_work_on(int cpu, struc lcpu = gcwq->cpu; else lcpu = raw_smp_processor_id(); - } else + } else if (!(wq->flags & WQ_IDLEPRI)) lcpu = WORK_CPU_UNBOUND; + else + lcpu = WORK_CPU_IDLEPRI; set_work_cwq(work, get_cwq(lcpu, wq), 0); @@ -1352,6 +1390,7 @@ static struct worker *alloc_worker(void) static struct worker *create_worker(struct global_cwq *gcwq, bool bind) { bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND; + bool on_idle_cpu = gcwq->cpu == WORK_CPU_IDLEPRI; struct worker *worker = NULL; int id = -1; @@ -1371,14 +1410,17 @@ static struct worker *create_worker(stru worker->gcwq = gcwq; worker->id = id; - if (!on_unbound_cpu) + if (!on_unbound_cpu && !on_idle_cpu) worker->task = kthread_create_on_node(worker_thread, worker, cpu_to_node(gcwq->cpu), "kworker/%u:%d", gcwq->cpu, id); - else + else if (!on_idle_cpu) worker->task = kthread_create(worker_thread, worker, "kworker/u:%d", id); + else + worker->task = kthread_create(worker_thread, worker, + "kworker/i:%d", id); if (IS_ERR(worker->task)) goto fail; @@ -1387,12 +1429,14 @@ static struct worker *create_worker(stru * online later on. Make sure every worker has * PF_THREAD_BOUND set. */ - if (bind && !on_unbound_cpu) + if (bind && !on_unbound_cpu && !on_idle_cpu) kthread_bind(worker->task, gcwq->cpu); else { worker->task->flags |= PF_THREAD_BOUND; if (on_unbound_cpu) worker->flags |= WORKER_UNBOUND; + if (on_idle_cpu) + worker->flags |= WORKER_IDLEPRI; } return worker; @@ -1496,7 +1540,7 @@ static bool send_mayday(struct work_stru /* mayday mayday mayday */ cpu = cwq->gcwq->cpu; /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */ - if (cpu == WORK_CPU_UNBOUND) + if ((cpu == WORK_CPU_UNBOUND) || (cpu == WORK_CPU_IDLEPRI)) cpu = 0; if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask)) wake_up_process(wq->rescuer->task); @@ -1935,6 +1979,11 @@ static int worker_thread(void *__worker) /* tell the scheduler that this is a workqueue worker */ worker->task->flags |= PF_WQ_WORKER; + /* if worker is for IDLEPRI, set scheduler */ + if (worker->flags & WORKER_IDLEPRI) { + struct sched_param param; + sched_setscheduler(current, SCHED_IDLE, ¶m); + } woke_up: spin_lock_irq(&gcwq->lock); @@ -2912,8 +2961,9 @@ struct workqueue_struct *__alloc_workque /* * Workqueues which may be used during memory reclaim should * have a rescuer to guarantee forward progress. + * But IDLE workqueue will not have any rescuer. */ - if (flags & WQ_MEM_RECLAIM) + if ((flags & WQ_MEM_RECLAIM) && !(flags & WQ_IDLEPRI)) flags |= WQ_RESCUER; /* @@ -3775,7 +3825,8 @@ static int __init init_workqueues(void) struct global_cwq *gcwq = get_gcwq(cpu); struct worker *worker; - if (cpu != WORK_CPU_UNBOUND) + if ((cpu != WORK_CPU_UNBOUND) && + (cpu != WORK_CPU_IDLEPRI)) gcwq->flags &= ~GCWQ_DISASSOCIATED; worker = create_worker(gcwq, true); BUG_ON(!worker); Index: memcg_async/Documentation/workqueue.txt =================================================================== --- memcg_async.orig/Documentation/workqueue.txt +++ memcg_async/Documentation/workqueue.txt @@ -247,6 +247,16 @@ resources, scheduled and executed. highpri CPU-intensive wq start execution as soon as resources are available and don't affect execution of other work items. + WQ_UNBOUND | WQ_IDLEPRI + An special case of unbound wq, the worker thread for this workqueue + will run in the lowest priority of SCHED_IDLE. Most of characteristics + are same to UNBOUND workqueue but the thread's priority is SCHED_IDLE. + This is useful when you want to run a work for hiding application's + latency by making use of idle time of the system. Because scheduling + priority of this class workqueue is minimum, you must assume that + the work will not run for a long time when the system is cpu hogging. + Then, unlike UNBOUND WQ, this will not have rescuer threads. + @max_active: @max_active determines the maximum number of execution contexts per Index: memcg_async/mm/memcontrol.c =================================================================== --- memcg_async.orig/mm/memcontrol.c +++ memcg_async/mm/memcontrol.c @@ -3872,7 +3872,8 @@ struct workqueue_struct *memcg_async_shr static int memcg_async_shrinker_init(void) { memcg_async_shrinker = alloc_workqueue("memcg_async", - WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_FREEZABLE, 0); + WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_IDLEPRI | WQ_FREEZABLE, + 0); return 0; } module_init(memcg_async_shrinker_init); -- To unsubscribe, send a message with 'unsubscribe linux-mm' in the body to majordomo@xxxxxxxxxx For more info on Linux MM, see: http://www.linux-mm.org/ . Fight unfair telecom internet charges in Canada: sign http://stopthemeter.ca/ Don't email: <a href=mailto:"dont@xxxxxxxxx"> email@xxxxxxxxx </a>