[PATCH 1/6] libfrog: fix overly sleep workqueues

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Darrick J. Wong <djwong@xxxxxxxxxx>

I discovered the following bad behavior in the workqueue code when I
noticed that xfs_scrub was running single-threaded despite having 4
virtual CPUs allocated to the VM.  I observed this sequence:

Thread 1	WQ1		WQ2...N
workqueue_create
		<start up>
		pthread_cond_wait
				<start up>
				pthread_cond_wait
workqueue_add
next_item == NULL
pthread_cond_signal

workqueue_add
next_item != NULL
<do not pthread_cond_signal>

		<receives wakeup>
		<run first item>

workqueue_add
next_item != NULL
<do not pthread_cond_signal>

		<run second item>
		<run third item>
		pthread_cond_wait

workqueue_terminate
pthread_cond_broadcast
				<receives wakeup>
				<nothing to do, exits>
		<wakes up again>
		<nothing to do, exits>

Notice how threads WQ2...N are completely idle while WQ1 ends up doing
all the work!  That wasn't the point of a worker pool!  Observe that
thread 1 manages to queue two work items before WQ1 pulls the first item
off the queue.  When thread 1 queues the third item, it sees that
next_item is not NULL, so it doesn't wake a worker.  If thread 1 queues
all the N work that it has before WQ1 empties the queue, then none of
the other thread get woken up.

Fix this by maintaining a count of the number of active threads, and
using that to wake either the sole idle thread, or all the threads if
there are many that are idle.  This dramatically improves startup
behavior of the workqueue and eliminates the collapse case.

Signed-off-by: Darrick J. Wong <djwong@xxxxxxxxxx>
---
 libfrog/workqueue.c |   34 ++++++++++++++++++++++++----------
 libfrog/workqueue.h |    1 +
 2 files changed, 25 insertions(+), 10 deletions(-)


diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
index 702a53e2f3c..db5b3f68bc5 100644
--- a/libfrog/workqueue.c
+++ b/libfrog/workqueue.c
@@ -26,8 +26,8 @@ workqueue_thread(void *arg)
 	 * Check for notification to exit after every chunk of work.
 	 */
 	rcu_register_thread();
+	pthread_mutex_lock(&wq->lock);
 	while (1) {
-		pthread_mutex_lock(&wq->lock);
 
 		/*
 		 * Wait for work.
@@ -36,10 +36,8 @@ workqueue_thread(void *arg)
 			assert(wq->item_count == 0);
 			pthread_cond_wait(&wq->wakeup, &wq->lock);
 		}
-		if (wq->next_item == NULL && wq->terminate) {
-			pthread_mutex_unlock(&wq->lock);
+		if (wq->next_item == NULL && wq->terminate)
 			break;
-		}
 
 		/*
 		 *  Dequeue work from the head of the list. If the queue was
@@ -57,11 +55,16 @@ workqueue_thread(void *arg)
 			/* more work, wake up another worker */
 			pthread_cond_signal(&wq->wakeup);
 		}
+		wq->active_threads++;
 		pthread_mutex_unlock(&wq->lock);
 
 		(wi->function)(wi->queue, wi->index, wi->arg);
 		free(wi);
+
+		pthread_mutex_lock(&wq->lock);
+		wq->active_threads--;
 	}
+	pthread_mutex_unlock(&wq->lock);
 	rcu_unregister_thread();
 
 	return NULL;
@@ -170,12 +173,6 @@ workqueue_add(
 restart:
 	if (wq->next_item == NULL) {
 		assert(wq->item_count == 0);
-		ret = -pthread_cond_signal(&wq->wakeup);
-		if (ret) {
-			pthread_mutex_unlock(&wq->lock);
-			free(wi);
-			return ret;
-		}
 		wq->next_item = wi;
 	} else {
 		/* throttle on a full queue if configured */
@@ -192,6 +189,23 @@ workqueue_add(
 	}
 	wq->last_item = wi;
 	wq->item_count++;
+
+	if (wq->active_threads == wq->thread_count - 1) {
+		/* One thread is idle, wake it */
+		ret = -pthread_cond_signal(&wq->wakeup);
+		if (ret) {
+			pthread_mutex_unlock(&wq->lock);
+			return ret;
+		}
+	} else if (wq->active_threads < wq->thread_count) {
+		/* Multiple threads are idle, wake everyone */
+		ret = -pthread_cond_broadcast(&wq->wakeup);
+		if (ret) {
+			pthread_mutex_unlock(&wq->lock);
+			return ret;
+		}
+	}
+
 	pthread_mutex_unlock(&wq->lock);
 
 	return 0;
diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h
index a9c108d0e66..edbe12fabab 100644
--- a/libfrog/workqueue.h
+++ b/libfrog/workqueue.h
@@ -29,6 +29,7 @@ struct workqueue {
 	pthread_cond_t		wakeup;
 	unsigned int		item_count;
 	unsigned int		thread_count;
+	unsigned int		active_threads;
 	bool			terminate;
 	bool			terminated;
 	int			max_queued;




[Index of Archives]     [XFS Filesystem Development (older mail)]     [Linux Filesystem Development]     [Linux Audio Users]     [Yosemite Trails]     [Linux Kernel]     [Linux RAID]     [Linux SCSI]


  Powered by Linux