[PATCH RT 10/11] workqueue: Revert workqueue: Fix cpuhotplug trainwreck

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Steven Rostedt <srostedt@xxxxxxxxxx>

Revert

    Author: Peter Zijlstra <a.p.zijlstra@xxxxxxxxx>
    Date:   Fri Sep 30 11:57:58 2011 +0200
    workqueue: Fix cpuhotplug trainwreck

THREAD_BOUND no longer affects cpu down, and this code introduced
a lot of races with taking down a CPU.

Signed-off-by: Steven Rostedt <rostedt@xxxxxxxxxxx>
Signed-off-by: Thomas Gleixner <tglx@xxxxxxxxxxxxx>
---
 include/linux/cpu.h       |    6 +-
 include/linux/workqueue.h |    5 +-
 kernel/workqueue.c        |  561 +++++++++++++++++++++++++++++++++------------
 3 files changed, 415 insertions(+), 157 deletions(-)

diff --git a/include/linux/cpu.h b/include/linux/cpu.h
index a6bda1b..00d2f6f8 100644
--- a/include/linux/cpu.h
+++ b/include/linux/cpu.h
@@ -75,10 +75,8 @@ enum {
 	/* migration should happen before other stuff but after perf */
 	CPU_PRI_PERF		= 20,
 	CPU_PRI_MIGRATION	= 10,
-
-	CPU_PRI_WORKQUEUE_ACTIVE	= 5,  /* prepare workqueues for others */
-	CPU_PRI_NORMAL			= 0,
-	CPU_PRI_WORKQUEUE_INACTIVE	= -5, /* flush workqueues after others */
+	/* prepare workqueues for other notifiers */
+	CPU_PRI_WORKQUEUE	= 5,
 };
 
 #define CPU_ONLINE		0x0002 /* CPU (unsigned)v is up */
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 9849be1..af15545 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -254,10 +254,9 @@ enum {
 	WQ_MEM_RECLAIM		= 1 << 3, /* may be used for memory reclaim */
 	WQ_HIGHPRI		= 1 << 4, /* high priority */
 	WQ_CPU_INTENSIVE	= 1 << 5, /* cpu instensive workqueue */
-	WQ_NON_AFFINE		= 1 << 6, /* free to move works around cpus */
 
-	WQ_DRAINING		= 1 << 7, /* internal: workqueue is draining */
-	WQ_RESCUER		= 1 << 8, /* internal: workqueue has rescuer */
+	WQ_DRAINING		= 1 << 6, /* internal: workqueue is draining */
+	WQ_RESCUER		= 1 << 7, /* internal: workqueue has rescuer */
 
 	WQ_MAX_ACTIVE		= 512,	  /* I like 512, better ideas? */
 	WQ_MAX_UNBOUND_PER_CPU	= 4,	  /* 4 * #cpus for unbound wq */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 1e7f9c7..33d1095 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -41,7 +41,6 @@
 #include <linux/debug_locks.h>
 #include <linux/lockdep.h>
 #include <linux/idr.h>
-#include <linux/delay.h>
 
 #include "workqueue_sched.h"
 
@@ -58,10 +57,20 @@ enum {
 	WORKER_DIE		= 1 << 1,	/* die die die */
 	WORKER_IDLE		= 1 << 2,	/* is idle */
 	WORKER_PREP		= 1 << 3,	/* preparing to run works */
-	WORKER_CPU_INTENSIVE	= 1 << 4,	/* cpu intensive */
-	WORKER_UNBOUND		= 1 << 5,	/* worker is unbound */
+	WORKER_ROGUE		= 1 << 4,	/* not bound to any cpu */
+	WORKER_REBIND		= 1 << 5,	/* mom is home, come back */
+	WORKER_CPU_INTENSIVE	= 1 << 6,	/* cpu intensive */
+	WORKER_UNBOUND		= 1 << 7,	/* worker is unbound */
 
-	WORKER_NOT_RUNNING	= WORKER_PREP | WORKER_CPU_INTENSIVE | WORKER_UNBOUND,
+	WORKER_NOT_RUNNING	= WORKER_PREP | WORKER_ROGUE | WORKER_REBIND |
+				  WORKER_CPU_INTENSIVE | WORKER_UNBOUND,
+
+	/* gcwq->trustee_state */
+	TRUSTEE_START		= 0,		/* start */
+	TRUSTEE_IN_CHARGE	= 1,		/* trustee in charge of gcwq */
+	TRUSTEE_BUTCHER		= 2,		/* butcher workers */
+	TRUSTEE_RELEASE		= 3,		/* release workers */
+	TRUSTEE_DONE		= 4,		/* trustee is done */
 
 	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
 	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
@@ -75,6 +84,7 @@ enum {
 						   (min two ticks) */
 	MAYDAY_INTERVAL		= HZ / 10,	/* and then every 100ms */
 	CREATE_COOLDOWN		= HZ,		/* time to breath after fail */
+	TRUSTEE_COOLDOWN	= HZ / 10,	/* for trustee draining */
 
 	/*
 	 * Rescue workers are used only on emergencies and shared by
@@ -126,6 +136,7 @@ struct worker {
 	unsigned long		last_active;	/* L: last active timestamp */
 	unsigned int		flags;		/* X: flags */
 	int			id;		/* I: worker id */
+	struct work_struct	rebind_work;	/* L: rebind worker to cpu */
 	int			sleeping;	/* None */
 };
 
@@ -153,8 +164,10 @@ struct global_cwq {
 
 	struct ida		worker_ida;	/* L: for worker IDs */
 
+	struct task_struct	*trustee;	/* L: for gcwq shutdown */
+	unsigned int		trustee_state;	/* L: trustee state */
+	wait_queue_head_t	trustee_wait;	/* trustee wait */
 	struct worker		*first_idle;	/* L: first idle worker */
-	wait_queue_head_t	idle_wait;
 } ____cacheline_aligned_in_smp;
 
 /*
@@ -956,38 +969,13 @@ static bool is_chained_work(struct workqueue_struct *wq)
 	return false;
 }
 
-static void ___queue_work(struct workqueue_struct *wq, struct global_cwq *gcwq,
-			  struct work_struct *work)
-{
-	struct cpu_workqueue_struct *cwq;
-	struct list_head *worklist;
-	unsigned int work_flags;
-
-	/* gcwq determined, get cwq and queue */
-	cwq = get_cwq(gcwq->cpu, wq);
-	trace_workqueue_queue_work(gcwq->cpu, cwq, work);
-
-	BUG_ON(!list_empty(&work->entry));
-
-	cwq->nr_in_flight[cwq->work_color]++;
-	work_flags = work_color_to_flags(cwq->work_color);
-
-	if (likely(cwq->nr_active < cwq->max_active)) {
-		trace_workqueue_activate_work(work);
-		cwq->nr_active++;
-		worklist = gcwq_determine_ins_pos(gcwq, cwq);
-	} else {
-		work_flags |= WORK_STRUCT_DELAYED;
-		worklist = &cwq->delayed_works;
-	}
-
-	insert_work(cwq, work, worklist, work_flags);
-}
-
 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
 			 struct work_struct *work)
 {
 	struct global_cwq *gcwq;
+	struct cpu_workqueue_struct *cwq;
+	struct list_head *worklist;
+	unsigned int work_flags;
 	unsigned long flags;
 
 	debug_work_activate(work);
@@ -1033,32 +1021,27 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
 		spin_lock_irqsave(&gcwq->lock, flags);
 	}
 
-	___queue_work(wq, gcwq, work);
+	/* gcwq determined, get cwq and queue */
+	cwq = get_cwq(gcwq->cpu, wq);
+	trace_workqueue_queue_work(cpu, cwq, work);
 
-	spin_unlock_irqrestore(&gcwq->lock, flags);
-}
+	BUG_ON(!list_empty(&work->entry));
 
-/**
- * queue_work_on - queue work on specific cpu
- * @cpu: CPU number to execute work on
- * @wq: workqueue to use
- * @work: work to queue
- *
- * Returns 0 if @work was already on a queue, non-zero otherwise.
- *
- * We queue the work to a specific CPU, the caller must ensure it
- * can't go away.
- */
-static int
-__queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
-{
-	int ret = 0;
+	cwq->nr_in_flight[cwq->work_color]++;
+	work_flags = work_color_to_flags(cwq->work_color);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
-		__queue_work(cpu, wq, work);
-		ret = 1;
+	if (likely(cwq->nr_active < cwq->max_active)) {
+		trace_workqueue_activate_work(work);
+		cwq->nr_active++;
+		worklist = gcwq_determine_ins_pos(gcwq, cwq);
+	} else {
+		work_flags |= WORK_STRUCT_DELAYED;
+		worklist = &cwq->delayed_works;
 	}
-	return ret;
+
+	insert_work(cwq, work, worklist, work_flags);
+
+	spin_unlock_irqrestore(&gcwq->lock, flags);
 }
 
 /**
@@ -1075,19 +1058,34 @@ int queue_work(struct workqueue_struct *wq, struct work_struct *work)
 {
 	int ret;
 
-	ret = __queue_work_on(get_cpu_light(), wq, work);
+	ret = queue_work_on(get_cpu_light(), wq, work);
 	put_cpu_light();
 
 	return ret;
 }
 EXPORT_SYMBOL_GPL(queue_work);
 
+/**
+ * queue_work_on - queue work on specific cpu
+ * @cpu: CPU number to execute work on
+ * @wq: workqueue to use
+ * @work: work to queue
+ *
+ * Returns 0 if @work was already on a queue, non-zero otherwise.
+ *
+ * We queue the work to a specific CPU, the caller must ensure it
+ * can't go away.
+ */
 int
 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
 {
-	WARN_ON(wq->flags & WQ_NON_AFFINE);
+	int ret = 0;
 
-	return __queue_work_on(cpu, wq, work);
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+		__queue_work(cpu, wq, work);
+		ret = 1;
+	}
+	return ret;
 }
 EXPORT_SYMBOL_GPL(queue_work_on);
 
@@ -1133,8 +1131,6 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 	struct timer_list *timer = &dwork->timer;
 	struct work_struct *work = &dwork->work;
 
-	WARN_ON((wq->flags & WQ_NON_AFFINE) && cpu != -1);
-
 	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
 		unsigned int lcpu;
 
@@ -1200,13 +1196,12 @@ static void worker_enter_idle(struct worker *worker)
 	/* idle_list is LIFO */
 	list_add(&worker->entry, &gcwq->idle_list);
 
-	if (gcwq->nr_idle == gcwq->nr_workers)
-		wake_up_all(&gcwq->idle_wait);
-
-	if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer)) {
-		mod_timer(&gcwq->idle_timer,
-				jiffies + IDLE_WORKER_TIMEOUT);
-	}
+	if (likely(!(worker->flags & WORKER_ROGUE))) {
+		if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
+			mod_timer(&gcwq->idle_timer,
+				  jiffies + IDLE_WORKER_TIMEOUT);
+	} else
+		wake_up_all(&gcwq->trustee_wait);
 
 	/* sanity check nr_running */
 	WARN_ON_ONCE(gcwq->nr_workers == gcwq->nr_idle &&
@@ -1298,6 +1293,23 @@ __acquires(&gcwq->lock)
 	}
 }
 
+/*
+ * Function for worker->rebind_work used to rebind rogue busy workers
+ * to the associated cpu which is coming back online.  This is
+ * scheduled by cpu up but can race with other cpu hotplug operations
+ * and may be executed twice without intervening cpu down.
+ */
+static void worker_rebind_fn(struct work_struct *work)
+{
+	struct worker *worker = container_of(work, struct worker, rebind_work);
+	struct global_cwq *gcwq = worker->gcwq;
+
+	if (worker_maybe_bind_and_lock(worker))
+		worker_clr_flags(worker, WORKER_REBIND);
+
+	spin_unlock_irq(&gcwq->lock);
+}
+
 static struct worker *alloc_worker(void)
 {
 	struct worker *worker;
@@ -1306,6 +1318,7 @@ static struct worker *alloc_worker(void)
 	if (worker) {
 		INIT_LIST_HEAD(&worker->entry);
 		INIT_LIST_HEAD(&worker->scheduled);
+		INIT_WORK(&worker->rebind_work, worker_rebind_fn);
 		/* on creation a worker is in !idle && prep state */
 		worker->flags = WORKER_PREP;
 	}
@@ -1645,6 +1658,13 @@ static bool manage_workers(struct worker *worker)
 
 	gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
 
+	/*
+	 * The trustee might be waiting to take over the manager
+	 * position, tell it we're done.
+	 */
+	if (unlikely(gcwq->trustee))
+		wake_up_all(&gcwq->trustee_wait);
+
 	return ret;
 }
 
@@ -3185,76 +3205,171 @@ EXPORT_SYMBOL_GPL(work_busy);
  * gcwqs serve mix of short, long and very long running works making
  * blocked draining impractical.
  *
+ * This is solved by allowing a gcwq to be detached from CPU, running
+ * it with unbound (rogue) workers and allowing it to be reattached
+ * later if the cpu comes back online.  A separate thread is created
+ * to govern a gcwq in such state and is called the trustee of the
+ * gcwq.
+ *
+ * Trustee states and their descriptions.
+ *
+ * START	Command state used on startup.  On CPU_DOWN_PREPARE, a
+ *		new trustee is started with this state.
+ *
+ * IN_CHARGE	Once started, trustee will enter this state after
+ *		assuming the manager role and making all existing
+ *		workers rogue.  DOWN_PREPARE waits for trustee to
+ *		enter this state.  After reaching IN_CHARGE, trustee
+ *		tries to execute the pending worklist until it's empty
+ *		and the state is set to BUTCHER, or the state is set
+ *		to RELEASE.
+ *
+ * BUTCHER	Command state which is set by the cpu callback after
+ *		the cpu has gone down.  Once this state is set trustee
+ *		knows that there will be no new works on the worklist
+ *		and once the worklist is empty it can proceed to
+ *		killing idle workers.
+ *
+ * RELEASE	Command state which is set by the cpu callback if the
+ *		cpu down has been canceled or it has come online
+ *		again.  After recognizing this state, trustee stops
+ *		trying to drain or butcher and clears ROGUE, rebinds
+ *		all remaining workers back to the cpu and releases
+ *		manager role.
+ *
+ * DONE		Trustee will enter this state after BUTCHER or RELEASE
+ *		is complete.
+ *
+ *          trustee                 CPU                draining
+ *         took over                down               complete
+ * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
+ *                        |                     |                  ^
+ *                        | CPU is back online  v   return workers |
+ *                         ----------------> RELEASE --------------
  */
 
-static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
-						unsigned long action,
-						void *hcpu)
-{
-	unsigned int cpu = (unsigned long)hcpu;
-	struct global_cwq *gcwq = get_gcwq(cpu);
-	struct worker *uninitialized_var(new_worker);
-	unsigned long flags;
+/**
+ * trustee_wait_event_timeout - timed event wait for trustee
+ * @cond: condition to wait for
+ * @timeout: timeout in jiffies
+ *
+ * wait_event_timeout() for trustee to use.  Handles locking and
+ * checks for RELEASE request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by trustee.
+ *
+ * RETURNS:
+ * Positive indicating left time if @cond is satisfied, 0 if timed
+ * out, -1 if canceled.
+ */
+#define trustee_wait_event_timeout(cond, timeout) ({			\
+	long __ret = (timeout);						\
+	while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) &&	\
+	       __ret) {							\
+		spin_unlock_irq(&gcwq->lock);				\
+		__wait_event_timeout(gcwq->trustee_wait, (cond) ||	\
+			(gcwq->trustee_state == TRUSTEE_RELEASE),	\
+			__ret);						\
+		spin_lock_irq(&gcwq->lock);				\
+	}								\
+	gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);		\
+})
 
-	action &= ~CPU_TASKS_FROZEN;
+/**
+ * trustee_wait_event - event wait for trustee
+ * @cond: condition to wait for
+ *
+ * wait_event() for trustee to use.  Automatically handles locking and
+ * checks for RELEASE request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by trustee.
+ *
+ * RETURNS:
+ * 0 if @cond is satisfied, -1 if canceled.
+ */
+#define trustee_wait_event(cond) ({					\
+	long __ret1;							\
+	__ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
+	__ret1 < 0 ? -1 : 0;						\
+})
 
-	switch (action) {
-	case CPU_UP_PREPARE:
-		BUG_ON(gcwq->first_idle);
-		new_worker = create_worker(gcwq, false);
-		if (!new_worker)
-			return NOTIFY_BAD;
-	case CPU_UP_CANCELED:
-	case CPU_ONLINE:
-		break;
-	default:
-		return notifier_from_errno(0);
-	}
+static int __cpuinit trustee_thread(void *__gcwq)
+{
+	struct global_cwq *gcwq = __gcwq;
+	struct worker *worker;
+	struct work_struct *work;
+	struct hlist_node *pos;
+	long rc;
+	int i;
 
-	/* some are called w/ irq disabled, don't disturb irq status */
-	spin_lock_irqsave(&gcwq->lock, flags);
+	BUG_ON(gcwq->cpu != smp_processor_id());
 
-	switch (action) {
-	case CPU_UP_PREPARE:
-		BUG_ON(gcwq->first_idle);
-		gcwq->first_idle = new_worker;
-		break;
+	spin_lock_irq(&gcwq->lock);
+	/*
+	 * Claim the manager position and make all workers rogue.
+	 * Trustee must be bound to the target cpu and can't be
+	 * cancelled.
+	 */
+	BUG_ON(gcwq->cpu != smp_processor_id());
+	rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
+	BUG_ON(rc < 0);
 
-	case CPU_UP_CANCELED:
-		destroy_worker(gcwq->first_idle);
-		gcwq->first_idle = NULL;
-		break;
+	gcwq->flags |= GCWQ_MANAGING_WORKERS;
 
-	case CPU_ONLINE:
-		spin_unlock_irq(&gcwq->lock);
-		kthread_bind(gcwq->first_idle->task, cpu);
-		spin_lock_irq(&gcwq->lock);
-		gcwq->flags |= GCWQ_MANAGE_WORKERS;
-		start_worker(gcwq->first_idle);
-		gcwq->first_idle = NULL;
-		break;
-	}
+	list_for_each_entry(worker, &gcwq->idle_list, entry)
+		worker->flags |= WORKER_ROGUE;
 
-	spin_unlock_irqrestore(&gcwq->lock, flags);
+	for_each_busy_worker(worker, i, pos, gcwq)
+		worker->flags |= WORKER_ROGUE;
 
-	return notifier_from_errno(0);
-}
+	/*
+	 * Call schedule() so that we cross rq->lock and thus can
+	 * guarantee sched callbacks see the rogue flag.  This is
+	 * necessary as scheduler callbacks may be invoked from other
+	 * cpus.
+	 */
+	spin_unlock_irq(&gcwq->lock);
+	schedule();
+	spin_lock_irq(&gcwq->lock);
 
-static void flush_gcwq(struct global_cwq *gcwq)
-{
-	struct work_struct *work, *nw;
-	struct worker *worker, *n;
-	LIST_HEAD(non_affine_works);
+	/*
+	 * Sched callbacks are disabled now.  Zap nr_running.  After
+	 * this, nr_running stays zero and need_more_worker() and
+	 * keep_working() are always true as long as the worklist is
+	 * not empty.
+	 */
+	atomic_set(get_gcwq_nr_running(gcwq->cpu), 0);
 
+	spin_unlock_irq(&gcwq->lock);
+	del_timer_sync(&gcwq->idle_timer);
 	spin_lock_irq(&gcwq->lock);
-	list_for_each_entry_safe(work, nw, &gcwq->worklist, entry) {
-		struct workqueue_struct *wq = get_work_cwq(work)->wq;
 
-		if (wq->flags & WQ_NON_AFFINE)
-			list_move(&work->entry, &non_affine_works);
-	}
+	/*
+	 * We're now in charge.  Notify and proceed to drain.  We need
+	 * to keep the gcwq running during the whole CPU down
+	 * procedure as other cpu hotunplug callbacks may need to
+	 * flush currently running tasks.
+	 */
+	gcwq->trustee_state = TRUSTEE_IN_CHARGE;
+	wake_up_all(&gcwq->trustee_wait);
 
-	while (!list_empty(&gcwq->worklist)) {
+	/*
+	 * The original cpu is in the process of dying and may go away
+	 * anytime now.  When that happens, we and all workers would
+	 * be migrated to other cpus.  Try draining any left work.  We
+	 * want to get it over with ASAP - spam rescuers, wake up as
+	 * many idlers as necessary and create new ones till the
+	 * worklist is empty.  Note that if the gcwq is frozen, there
+	 * may be frozen works in freezable cwqs.  Don't declare
+	 * completion while frozen.
+	 */
+	while (gcwq->nr_workers != gcwq->nr_idle ||
+	       gcwq->flags & GCWQ_FREEZING ||
+	       gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
 		int nr_works = 0;
 
 		list_for_each_entry(work, &gcwq->worklist, entry) {
@@ -3268,55 +3383,200 @@ static void flush_gcwq(struct global_cwq *gcwq)
 			wake_up_process(worker->task);
 		}
 
-		spin_unlock_irq(&gcwq->lock);
-
 		if (need_to_create_worker(gcwq)) {
-			worker = create_worker(gcwq, true);
-			if (worker)
+			spin_unlock_irq(&gcwq->lock);
+			worker = create_worker(gcwq, false);
+			spin_lock_irq(&gcwq->lock);
+			if (worker) {
+				worker->flags |= WORKER_ROGUE;
 				start_worker(worker);
+			}
 		}
 
-		wait_event_timeout(gcwq->idle_wait,
-				gcwq->nr_idle == gcwq->nr_workers, HZ/10);
-
-		spin_lock_irq(&gcwq->lock);
+		/* give a breather */
+		if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
+			break;
 	}
 
-	WARN_ON(gcwq->nr_workers != gcwq->nr_idle);
+	/*
+	 * Either all works have been scheduled and cpu is down, or
+	 * cpu down has already been canceled.  Wait for and butcher
+	 * all workers till we're canceled.
+	 */
+	do {
+		rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
+		while (!list_empty(&gcwq->idle_list))
+			destroy_worker(list_first_entry(&gcwq->idle_list,
+							struct worker, entry));
+	} while (gcwq->nr_workers && rc >= 0);
 
-	list_for_each_entry_safe(worker, n, &gcwq->idle_list, entry)
-		destroy_worker(worker);
+	/*
+	 * At this point, either draining has completed and no worker
+	 * is left, or cpu down has been canceled or the cpu is being
+	 * brought back up.  There shouldn't be any idle one left.
+	 * Tell the remaining busy ones to rebind once they finish the
+	 * currently scheduled works by scheduling the rebind_work.
+	 */
+	WARN_ON(!list_empty(&gcwq->idle_list));
 
-	WARN_ON(gcwq->nr_workers || gcwq->nr_idle);
+	for_each_busy_worker(worker, i, pos, gcwq) {
+		struct work_struct *rebind_work = &worker->rebind_work;
 
-	spin_unlock_irq(&gcwq->lock);
+		/*
+		 * Rebind_work may race with future cpu hotplug
+		 * operations.  Use a separate flag to mark that
+		 * rebinding is scheduled.
+		 */
+		worker->flags |= WORKER_REBIND;
+		worker->flags &= ~WORKER_ROGUE;
 
-	gcwq = get_gcwq(get_cpu_light());
-	spin_lock_irq(&gcwq->lock);
-	list_for_each_entry_safe(work, nw, &non_affine_works, entry) {
-		list_del_init(&work->entry);
-		___queue_work(get_work_cwq(work)->wq, gcwq, work);
+		/* queue rebind_work, wq doesn't matter, use the default one */
+		if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
+				     work_data_bits(rebind_work)))
+			continue;
+
+		debug_work_activate(rebind_work);
+		insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
+			    worker->scheduled.next,
+			    work_color_to_flags(WORK_NO_COLOR));
 	}
+
+	/* relinquish manager role */
+	gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
+
+	/* notify completion */
+	gcwq->trustee = NULL;
+	gcwq->trustee_state = TRUSTEE_DONE;
+	wake_up_all(&gcwq->trustee_wait);
 	spin_unlock_irq(&gcwq->lock);
-	put_cpu_light();
+	return 0;
 }
 
-static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb,
+/**
+ * wait_trustee_state - wait for trustee to enter the specified state
+ * @gcwq: gcwq the trustee of interest belongs to
+ * @state: target state to wait for
+ *
+ * Wait for the trustee to reach @state.  DONE is already matched.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by cpu_callback.
+ */
+static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
+__releases(&gcwq->lock)
+__acquires(&gcwq->lock)
+{
+	if (!(gcwq->trustee_state == state ||
+	      gcwq->trustee_state == TRUSTEE_DONE)) {
+		spin_unlock_irq(&gcwq->lock);
+		__wait_event(gcwq->trustee_wait,
+			     gcwq->trustee_state == state ||
+			     gcwq->trustee_state == TRUSTEE_DONE);
+		spin_lock_irq(&gcwq->lock);
+	}
+}
+
+static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 						unsigned long action,
 						void *hcpu)
 {
 	unsigned int cpu = (unsigned long)hcpu;
 	struct global_cwq *gcwq = get_gcwq(cpu);
+	struct task_struct *new_trustee = NULL;
+	struct worker *uninitialized_var(new_worker);
+	unsigned long flags;
 
 	action &= ~CPU_TASKS_FROZEN;
 
-        switch (action) {
-        case CPU_DOWN_PREPARE:
-                flush_gcwq(gcwq);
-                break;
-        }
+	switch (action) {
+	case CPU_DOWN_PREPARE:
+		new_trustee = kthread_create(trustee_thread, gcwq,
+					     "workqueue_trustee/%d\n", cpu);
+		if (IS_ERR(new_trustee))
+			return notifier_from_errno(PTR_ERR(new_trustee));
+		kthread_bind(new_trustee, cpu);
+		/* fall through */
+	case CPU_UP_PREPARE:
+		BUG_ON(gcwq->first_idle);
+		new_worker = create_worker(gcwq, false);
+		if (!new_worker) {
+			if (new_trustee)
+				kthread_stop(new_trustee);
+			return NOTIFY_BAD;
+		}
+		break;
+	case CPU_POST_DEAD:
+	case CPU_UP_CANCELED:
+	case CPU_DOWN_FAILED:
+	case CPU_ONLINE:
+		break;
+	case CPU_DYING:
+		/*
+		 * We access this lockless. We are on the dying CPU
+		 * and called from stomp machine.
+		 *
+		 * Before this, the trustee and all workers except for
+		 * the ones which are still executing works from
+		 * before the last CPU down must be on the cpu.  After
+		 * this, they'll all be diasporas.
+		 */
+		gcwq->flags |= GCWQ_DISASSOCIATED;
+	default:
+		goto out;
+	}
+
+	/* some are called w/ irq disabled, don't disturb irq status */
+	spin_lock_irqsave(&gcwq->lock, flags);
+
+	switch (action) {
+	case CPU_DOWN_PREPARE:
+		/* initialize trustee and tell it to acquire the gcwq */
+		BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
+		gcwq->trustee = new_trustee;
+		gcwq->trustee_state = TRUSTEE_START;
+		wake_up_process(gcwq->trustee);
+		wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+		/* fall through */
+	case CPU_UP_PREPARE:
+		BUG_ON(gcwq->first_idle);
+		gcwq->first_idle = new_worker;
+		break;
+
+	case CPU_POST_DEAD:
+		gcwq->trustee_state = TRUSTEE_BUTCHER;
+		/* fall through */
+	case CPU_UP_CANCELED:
+		destroy_worker(gcwq->first_idle);
+		gcwq->first_idle = NULL;
+		break;
 
+	case CPU_DOWN_FAILED:
+	case CPU_ONLINE:
+		gcwq->flags &= ~GCWQ_DISASSOCIATED;
+		if (gcwq->trustee_state != TRUSTEE_DONE) {
+			gcwq->trustee_state = TRUSTEE_RELEASE;
+			wake_up_process(gcwq->trustee);
+			wait_trustee_state(gcwq, TRUSTEE_DONE);
+		}
+
+		/*
+		 * Trustee is done and there might be no worker left.
+		 * Put the first_idle in and request a real manager to
+		 * take a look.
+		 */
+		spin_unlock_irq(&gcwq->lock);
+		kthread_bind(gcwq->first_idle->task, cpu);
+		spin_lock_irq(&gcwq->lock);
+		gcwq->flags |= GCWQ_MANAGE_WORKERS;
+		start_worker(gcwq->first_idle);
+		gcwq->first_idle = NULL;
+		break;
+	}
+
+	spin_unlock_irqrestore(&gcwq->lock, flags);
 
+out:
 	return notifier_from_errno(0);
 }
 
@@ -3513,8 +3773,7 @@ static int __init init_workqueues(void)
 	unsigned int cpu;
 	int i;
 
-	cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_ACTIVE);
- 	hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_INACTIVE);
+	cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
 
 	/* initialize gcwqs */
 	for_each_gcwq_cpu(cpu) {
@@ -3537,7 +3796,9 @@ static int __init init_workqueues(void)
 			    (unsigned long)gcwq);
 
 		ida_init(&gcwq->worker_ida);
-		init_waitqueue_head(&gcwq->idle_wait);
+
+		gcwq->trustee_state = TRUSTEE_DONE;
+		init_waitqueue_head(&gcwq->trustee_wait);
 	}
 
 	/* create the initial worker */
-- 
1.7.10.4


--
To unsubscribe from this list: send the line "unsubscribe linux-rt-users" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [RT Stable]     [Kernel Newbies]     [IDE]     [Security]     [Git]     [Netfilter]     [Bugtraq]     [Yosemite]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux ATA RAID]     [Samba]     [Video 4 Linux]     [Device Mapper]

  Powered by Linux