From: Darrick J. Wong <djwong@xxxxxxxxxx> I discovered the following bad behavior in the workqueue code when I noticed that xfs_scrub was running single-threaded despite having 4 virtual CPUs allocated to the VM. I observed this sequence: Thread 1 WQ1 WQ2...N workqueue_create <start up> pthread_cond_wait <start up> pthread_cond_wait workqueue_add next_item == NULL pthread_cond_signal workqueue_add next_item != NULL <do not pthread_cond_signal> <receives wakeup> <run first item> workqueue_add next_item != NULL <do not pthread_cond_signal> <run second item> <run third item> pthread_cond_wait workqueue_terminate pthread_cond_broadcast <receives wakeup> <nothing to do, exits> <wakes up again> <nothing to do, exits> Notice how threads WQ2...N are completely idle while WQ1 ends up doing all the work! That wasn't the point of a worker pool! Observe that thread 1 manages to queue two work items before WQ1 pulls the first item off the queue. When thread 1 queues the third item, it sees that next_item is not NULL, so it doesn't wake a worker. If thread 1 queues all N work items that it has before WQ1 empties the queue, then none of the other threads get woken up. Fix this by maintaining a count of the number of active threads, and using that to wake either the sole idle thread, or all the threads if there are many that are idle. This dramatically improves startup behavior of the workqueue and eliminates the collapse case. Signed-off-by: Darrick J. Wong <djwong@xxxxxxxxxx> --- libfrog/workqueue.c | 34 ++++++++++++++++++++++++---------- libfrog/workqueue.h | 1 + 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c index 702a53e2f3c..db5b3f68bc5 100644 --- a/libfrog/workqueue.c +++ b/libfrog/workqueue.c @@ -26,8 +26,8 @@ workqueue_thread(void *arg) * Check for notification to exit after every chunk of work. */ rcu_register_thread(); + pthread_mutex_lock(&wq->lock); while (1) { - pthread_mutex_lock(&wq->lock); /* * Wait for work. 
@@ -36,10 +36,8 @@ workqueue_thread(void *arg) assert(wq->item_count == 0); pthread_cond_wait(&wq->wakeup, &wq->lock); } - if (wq->next_item == NULL && wq->terminate) { - pthread_mutex_unlock(&wq->lock); + if (wq->next_item == NULL && wq->terminate) break; - } /* * Dequeue work from the head of the list. If the queue was @@ -57,11 +55,16 @@ workqueue_thread(void *arg) /* more work, wake up another worker */ pthread_cond_signal(&wq->wakeup); } + wq->active_threads++; pthread_mutex_unlock(&wq->lock); (wi->function)(wi->queue, wi->index, wi->arg); free(wi); + + pthread_mutex_lock(&wq->lock); + wq->active_threads--; } + pthread_mutex_unlock(&wq->lock); rcu_unregister_thread(); return NULL; @@ -170,12 +173,6 @@ workqueue_add( restart: if (wq->next_item == NULL) { assert(wq->item_count == 0); - ret = -pthread_cond_signal(&wq->wakeup); - if (ret) { - pthread_mutex_unlock(&wq->lock); - free(wi); - return ret; - } wq->next_item = wi; } else { /* throttle on a full queue if configured */ @@ -192,6 +189,23 @@ workqueue_add( } wq->last_item = wi; wq->item_count++; + + if (wq->active_threads == wq->thread_count - 1) { + /* One thread is idle, wake it */ + ret = -pthread_cond_signal(&wq->wakeup); + if (ret) { + pthread_mutex_unlock(&wq->lock); + return ret; + } + } else if (wq->active_threads < wq->thread_count) { + /* Multiple threads are idle, wake everyone */ + ret = -pthread_cond_broadcast(&wq->wakeup); + if (ret) { + pthread_mutex_unlock(&wq->lock); + return ret; + } + } + pthread_mutex_unlock(&wq->lock); return 0; diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h index a9c108d0e66..edbe12fabab 100644 --- a/libfrog/workqueue.h +++ b/libfrog/workqueue.h @@ -29,6 +29,7 @@ struct workqueue { pthread_cond_t wakeup; unsigned int item_count; unsigned int thread_count; + unsigned int active_threads; bool terminate; bool terminated; int max_queued;