Re: [PATCH 5/6] workqueue: introduce NR_WORKER_POOLS and for_each_worker_pool()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From 8a0597bf9939d50039d4a6f446db51cf920daaad Mon Sep 17 00:00:00 2001
From: Tejun Heo <tj@xxxxxxxxxx>
Date: Fri, 13 Jul 2012 20:50:50 -0700

Introduce NR_WORKER_POOLS and for_each_worker_pool() and convert code
paths which need to manipulate all pools in a gcwq to use them.
NR_WORKER_POOLS is currently one and for_each_worker_pool() iterates
over only @gcwq->pool.

Note that nr_running is a per-pool property, so it is converted to an
array with NR_WORKER_POOLS elements and renamed to pool_nr_running.

The changes in this patch are mechanical and don't cause any
functional difference.  This is to prepare for multiple pools per
gcwq.

v2: nr_running indexing bug in get_pool_nr_running() fixed.

Signed-off-by: Tejun Heo <tj@xxxxxxxxxx>
Cc: Tony Luck <tony.luck@xxxxxxxxx>
Cc: Fengguang Wu <fengguang.wu@xxxxxxxxx>
---
git branch updated accordingly.  Thanks!

 kernel/workqueue.c |  225 ++++++++++++++++++++++++++++++++++++----------------
 1 files changed, 155 insertions(+), 70 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7a98bae..82eee34 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -74,6 +74,8 @@ enum {
 	TRUSTEE_RELEASE		= 3,		/* release workers */
 	TRUSTEE_DONE		= 4,		/* trustee is done */
 
+	NR_WORKER_POOLS		= 1,		/* # worker pools per gcwq */
+
 	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
 	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
 	BUSY_WORKER_HASH_MASK	= BUSY_WORKER_HASH_SIZE - 1,
@@ -274,6 +276,9 @@ EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
 #define CREATE_TRACE_POINTS
 #include <trace/events/workqueue.h>
 
+#define for_each_worker_pool(pool, gcwq)				\
+	for ((pool) = &(gcwq)->pool; (pool); (pool) = NULL)
+
 #define for_each_busy_worker(worker, i, pos, gcwq)			\
 	for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)			\
 		hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
@@ -454,7 +459,7 @@ static bool workqueue_freezing;		/* W: have wqs started freezing? */
  * try_to_wake_up().  Put it in a separate cacheline.
  */
 static DEFINE_PER_CPU(struct global_cwq, global_cwq);
-static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
+static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, pool_nr_running[NR_WORKER_POOLS]);
 
 /*
  * Global cpu workqueue and nr_running counter for unbound gcwq.  The
@@ -462,7 +467,9 @@ static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
  * workers have WORKER_UNBOUND set.
  */
 static struct global_cwq unbound_global_cwq;
-static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0);	/* always 0 */
+static atomic_t unbound_pool_nr_running[NR_WORKER_POOLS] = {
+	[0 ... NR_WORKER_POOLS - 1]	= ATOMIC_INIT(0),	/* always 0 */
+};
 
 static int worker_thread(void *__worker);
 
@@ -477,11 +484,14 @@ static struct global_cwq *get_gcwq(unsigned int cpu)
 static atomic_t *get_pool_nr_running(struct worker_pool *pool)
 {
 	int cpu = pool->gcwq->cpu;
+	atomic_t (*nr_running)[NR_WORKER_POOLS];
 
 	if (cpu != WORK_CPU_UNBOUND)
-		return &per_cpu(gcwq_nr_running, cpu);
+		nr_running = &per_cpu(pool_nr_running, cpu);
 	else
-		return &unbound_gcwq_nr_running;
+		nr_running = &unbound_pool_nr_running;
+
+	return &(*nr_running)[0];
 }
 
 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
@@ -3345,9 +3355,30 @@ EXPORT_SYMBOL_GPL(work_busy);
 	__ret1 < 0 ? -1 : 0;						\
 })
 
+static bool gcwq_is_managing_workers(struct global_cwq *gcwq)
+{
+	struct worker_pool *pool;
+
+	for_each_worker_pool(pool, gcwq)
+		if (pool->flags & POOL_MANAGING_WORKERS)
+			return true;
+	return false;
+}
+
+static bool gcwq_has_idle_workers(struct global_cwq *gcwq)
+{
+	struct worker_pool *pool;
+
+	for_each_worker_pool(pool, gcwq)
+		if (!list_empty(&pool->idle_list))
+			return true;
+	return false;
+}
+
 static int __cpuinit trustee_thread(void *__gcwq)
 {
 	struct global_cwq *gcwq = __gcwq;
+	struct worker_pool *pool;
 	struct worker *worker;
 	struct work_struct *work;
 	struct hlist_node *pos;
@@ -3363,13 +3394,15 @@ static int __cpuinit trustee_thread(void *__gcwq)
 	 * cancelled.
 	 */
 	BUG_ON(gcwq->cpu != smp_processor_id());
-	rc = trustee_wait_event(!(gcwq->pool.flags & POOL_MANAGING_WORKERS));
+	rc = trustee_wait_event(!gcwq_is_managing_workers(gcwq));
 	BUG_ON(rc < 0);
 
-	gcwq->pool.flags |= POOL_MANAGING_WORKERS;
+	for_each_worker_pool(pool, gcwq) {
+		pool->flags |= POOL_MANAGING_WORKERS;
 
-	list_for_each_entry(worker, &gcwq->pool.idle_list, entry)
-		worker->flags |= WORKER_ROGUE;
+		list_for_each_entry(worker, &pool->idle_list, entry)
+			worker->flags |= WORKER_ROGUE;
+	}
 
 	for_each_busy_worker(worker, i, pos, gcwq)
 		worker->flags |= WORKER_ROGUE;
@@ -3390,10 +3423,12 @@ static int __cpuinit trustee_thread(void *__gcwq)
 	 * keep_working() are always true as long as the worklist is
 	 * not empty.
 	 */
-	atomic_set(get_pool_nr_running(&gcwq->pool), 0);
+	for_each_worker_pool(pool, gcwq)
+		atomic_set(get_pool_nr_running(pool), 0);
 
 	spin_unlock_irq(&gcwq->lock);
-	del_timer_sync(&gcwq->pool.idle_timer);
+	for_each_worker_pool(pool, gcwq)
+		del_timer_sync(&pool->idle_timer);
 	spin_lock_irq(&gcwq->lock);
 
 	/*
@@ -3415,29 +3450,38 @@ static int __cpuinit trustee_thread(void *__gcwq)
 	 * may be frozen works in freezable cwqs.  Don't declare
 	 * completion while frozen.
 	 */
-	while (gcwq->pool.nr_workers != gcwq->pool.nr_idle ||
-	       gcwq->flags & GCWQ_FREEZING ||
-	       gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
-		int nr_works = 0;
+	while (true) {
+		bool busy = false;
 
-		list_for_each_entry(work, &gcwq->pool.worklist, entry) {
-			send_mayday(work);
-			nr_works++;
-		}
+		for_each_worker_pool(pool, gcwq)
+			busy |= pool->nr_workers != pool->nr_idle;
 
-		list_for_each_entry(worker, &gcwq->pool.idle_list, entry) {
-			if (!nr_works--)
-				break;
-			wake_up_process(worker->task);
-		}
+		if (!busy && !(gcwq->flags & GCWQ_FREEZING) &&
+		    gcwq->trustee_state != TRUSTEE_IN_CHARGE)
+			break;
 
-		if (need_to_create_worker(&gcwq->pool)) {
-			spin_unlock_irq(&gcwq->lock);
-			worker = create_worker(&gcwq->pool, false);
-			spin_lock_irq(&gcwq->lock);
-			if (worker) {
-				worker->flags |= WORKER_ROGUE;
-				start_worker(worker);
+		for_each_worker_pool(pool, gcwq) {
+			int nr_works = 0;
+
+			list_for_each_entry(work, &pool->worklist, entry) {
+				send_mayday(work);
+				nr_works++;
+			}
+
+			list_for_each_entry(worker, &pool->idle_list, entry) {
+				if (!nr_works--)
+					break;
+				wake_up_process(worker->task);
+			}
+
+			if (need_to_create_worker(pool)) {
+				spin_unlock_irq(&gcwq->lock);
+				worker = create_worker(pool, false);
+				spin_lock_irq(&gcwq->lock);
+				if (worker) {
+					worker->flags |= WORKER_ROGUE;
+					start_worker(worker);
+				}
 			}
 		}
 
@@ -3452,11 +3496,18 @@ static int __cpuinit trustee_thread(void *__gcwq)
 	 * all workers till we're canceled.
 	 */
 	do {
-		rc = trustee_wait_event(!list_empty(&gcwq->pool.idle_list));
-		while (!list_empty(&gcwq->pool.idle_list))
-			destroy_worker(list_first_entry(&gcwq->pool.idle_list,
-							struct worker, entry));
-	} while (gcwq->pool.nr_workers && rc >= 0);
+		rc = trustee_wait_event(gcwq_has_idle_workers(gcwq));
+
+		i = 0;
+		for_each_worker_pool(pool, gcwq) {
+			while (!list_empty(&pool->idle_list)) {
+				worker = list_first_entry(&pool->idle_list,
+							  struct worker, entry);
+				destroy_worker(worker);
+			}
+			i |= pool->nr_workers;
+		}
+	} while (i && rc >= 0);
 
 	/*
 	 * At this point, either draining has completed and no worker
@@ -3465,7 +3516,8 @@ static int __cpuinit trustee_thread(void *__gcwq)
 	 * Tell the remaining busy ones to rebind once it finishes the
 	 * currently scheduled works by scheduling the rebind_work.
 	 */
-	WARN_ON(!list_empty(&gcwq->pool.idle_list));
+	for_each_worker_pool(pool, gcwq)
+		WARN_ON(!list_empty(&pool->idle_list));
 
 	for_each_busy_worker(worker, i, pos, gcwq) {
 		struct work_struct *rebind_work = &worker->rebind_work;
@@ -3490,7 +3542,8 @@ static int __cpuinit trustee_thread(void *__gcwq)
 	}
 
 	/* relinquish manager role */
-	gcwq->pool.flags &= ~POOL_MANAGING_WORKERS;
+	for_each_worker_pool(pool, gcwq)
+		pool->flags &= ~POOL_MANAGING_WORKERS;
 
 	/* notify completion */
 	gcwq->trustee = NULL;
@@ -3532,8 +3585,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 	unsigned int cpu = (unsigned long)hcpu;
 	struct global_cwq *gcwq = get_gcwq(cpu);
 	struct task_struct *new_trustee = NULL;
-	struct worker *uninitialized_var(new_worker);
+	struct worker *new_workers[NR_WORKER_POOLS] = { };
+	struct worker_pool *pool;
 	unsigned long flags;
+	int i;
 
 	action &= ~CPU_TASKS_FROZEN;
 
@@ -3546,12 +3601,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 		kthread_bind(new_trustee, cpu);
 		/* fall through */
 	case CPU_UP_PREPARE:
-		BUG_ON(gcwq->pool.first_idle);
-		new_worker = create_worker(&gcwq->pool, false);
-		if (!new_worker) {
-			if (new_trustee)
-				kthread_stop(new_trustee);
-			return NOTIFY_BAD;
+		i = 0;
+		for_each_worker_pool(pool, gcwq) {
+			BUG_ON(pool->first_idle);
+			new_workers[i] = create_worker(pool, false);
+			if (!new_workers[i++])
+				goto err_destroy;
 		}
 	}
 
@@ -3568,8 +3623,11 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 		wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
 		/* fall through */
 	case CPU_UP_PREPARE:
-		BUG_ON(gcwq->pool.first_idle);
-		gcwq->pool.first_idle = new_worker;
+		i = 0;
+		for_each_worker_pool(pool, gcwq) {
+			BUG_ON(pool->first_idle);
+			pool->first_idle = new_workers[i++];
+		}
 		break;
 
 	case CPU_DYING:
@@ -3586,8 +3644,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 		gcwq->trustee_state = TRUSTEE_BUTCHER;
 		/* fall through */
 	case CPU_UP_CANCELED:
-		destroy_worker(gcwq->pool.first_idle);
-		gcwq->pool.first_idle = NULL;
+		for_each_worker_pool(pool, gcwq) {
+			destroy_worker(pool->first_idle);
+			pool->first_idle = NULL;
+		}
 		break;
 
 	case CPU_DOWN_FAILED:
@@ -3604,18 +3664,32 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 		 * Put the first_idle in and request a real manager to
 		 * take a look.
 		 */
-		spin_unlock_irq(&gcwq->lock);
-		kthread_bind(gcwq->pool.first_idle->task, cpu);
-		spin_lock_irq(&gcwq->lock);
-		gcwq->pool.flags |= POOL_MANAGE_WORKERS;
-		start_worker(gcwq->pool.first_idle);
-		gcwq->pool.first_idle = NULL;
+		for_each_worker_pool(pool, gcwq) {
+			spin_unlock_irq(&gcwq->lock);
+			kthread_bind(pool->first_idle->task, cpu);
+			spin_lock_irq(&gcwq->lock);
+			pool->flags |= POOL_MANAGE_WORKERS;
+			start_worker(pool->first_idle);
+			pool->first_idle = NULL;
+		}
 		break;
 	}
 
 	spin_unlock_irqrestore(&gcwq->lock, flags);
 
 	return notifier_from_errno(0);
+
+err_destroy:
+	if (new_trustee)
+		kthread_stop(new_trustee);
+
+	spin_lock_irqsave(&gcwq->lock, flags);
+	for (i = 0; i < NR_WORKER_POOLS; i++)
+		if (new_workers[i])
+			destroy_worker(new_workers[i]);
+	spin_unlock_irqrestore(&gcwq->lock, flags);
+
+	return NOTIFY_BAD;
 }
 
 #ifdef CONFIG_SMP
@@ -3774,6 +3848,7 @@ void thaw_workqueues(void)
 
 	for_each_gcwq_cpu(cpu) {
 		struct global_cwq *gcwq = get_gcwq(cpu);
+		struct worker_pool *pool;
 		struct workqueue_struct *wq;
 
 		spin_lock_irq(&gcwq->lock);
@@ -3795,7 +3870,8 @@ void thaw_workqueues(void)
 				cwq_activate_first_delayed(cwq);
 		}
 
-		wake_up_worker(&gcwq->pool);
+		for_each_worker_pool(pool, gcwq)
+			wake_up_worker(pool);
 
 		spin_unlock_irq(&gcwq->lock);
 	}
@@ -3816,25 +3892,29 @@ static int __init init_workqueues(void)
 	/* initialize gcwqs */
 	for_each_gcwq_cpu(cpu) {
 		struct global_cwq *gcwq = get_gcwq(cpu);
+		struct worker_pool *pool;
 
 		spin_lock_init(&gcwq->lock);
-		gcwq->pool.gcwq = gcwq;
-		INIT_LIST_HEAD(&gcwq->pool.worklist);
 		gcwq->cpu = cpu;
 		gcwq->flags |= GCWQ_DISASSOCIATED;
 
-		INIT_LIST_HEAD(&gcwq->pool.idle_list);
 		for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
 			INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
 
-		init_timer_deferrable(&gcwq->pool.idle_timer);
-		gcwq->pool.idle_timer.function = idle_worker_timeout;
-		gcwq->pool.idle_timer.data = (unsigned long)&gcwq->pool;
+		for_each_worker_pool(pool, gcwq) {
+			pool->gcwq = gcwq;
+			INIT_LIST_HEAD(&pool->worklist);
+			INIT_LIST_HEAD(&pool->idle_list);
+
+			init_timer_deferrable(&pool->idle_timer);
+			pool->idle_timer.function = idle_worker_timeout;
+			pool->idle_timer.data = (unsigned long)pool;
 
-		setup_timer(&gcwq->pool.mayday_timer, gcwq_mayday_timeout,
-			    (unsigned long)&gcwq->pool);
+			setup_timer(&pool->mayday_timer, gcwq_mayday_timeout,
+				    (unsigned long)pool);
 
-		ida_init(&gcwq->pool.worker_ida);
+			ida_init(&pool->worker_ida);
+		}
 
 		gcwq->trustee_state = TRUSTEE_DONE;
 		init_waitqueue_head(&gcwq->trustee_wait);
@@ -3843,15 +3923,20 @@ static int __init init_workqueues(void)
 	/* create the initial worker */
 	for_each_online_gcwq_cpu(cpu) {
 		struct global_cwq *gcwq = get_gcwq(cpu);
-		struct worker *worker;
+		struct worker_pool *pool;
 
 		if (cpu != WORK_CPU_UNBOUND)
 			gcwq->flags &= ~GCWQ_DISASSOCIATED;
-		worker = create_worker(&gcwq->pool, true);
-		BUG_ON(!worker);
-		spin_lock_irq(&gcwq->lock);
-		start_worker(worker);
-		spin_unlock_irq(&gcwq->lock);
+
+		for_each_worker_pool(pool, gcwq) {
+			struct worker *worker;
+
+			worker = create_worker(pool, true);
+			BUG_ON(!worker);
+			spin_lock_irq(&gcwq->lock);
+			start_worker(worker);
+			spin_unlock_irq(&gcwq->lock);
+		}
 	}
 
 	system_wq = alloc_workqueue("events", 0, 0);
-- 
1.7.7.3

_______________________________________________
xfs mailing list
xfs@xxxxxxxxxxx
http://oss.sgi.com/mailman/listinfo/xfs


[Index of Archives]     [Linux XFS Devel]     [Linux Filesystem Development]     [Filesystem Testing]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux