On Tue, Sep 12, 2023 at 12:39:41PM -0700, Darrick J. Wong wrote:
> From: Darrick J. Wong <djwong@xxxxxxxxxx>
>
> I discovered the following bad behavior in the workqueue code when I
> noticed that xfs_scrub was running single-threaded despite having 4
> virtual CPUs allocated to the VM. I observed this sequence:

For some reason I thought I had ack'ed this patch in the past...
Maybe it's déjà vu, or the matrix is broken... in any case:

Reviewed-by: Carlos Maiolino <cmaiolino@xxxxxxxxxx>

Carlos

>
> Thread 1                WQ1                     WQ2...N
> workqueue_create
>                         <start up>
>                         pthread_cond_wait
>                                                 <start up>
>                                                 pthread_cond_wait
> workqueue_add
> next_item == NULL
> pthread_cond_signal
>
> workqueue_add
> next_item != NULL
> <do not pthread_cond_signal>
>
>                         <receives wakeup>
>                         <run first item>
>
> workqueue_add
> next_item != NULL
> <do not pthread_cond_signal>
>
>                         <run second item>
>                         <run third item>
>                         pthread_cond_wait
>
> workqueue_terminate
> pthread_cond_broadcast
>                                                 <receives wakeup>
>                                                 <nothing to do, exits>
>                         <wakes up again>
>                         <nothing to do, exits>
>
> Notice how threads WQ2...N are completely idle while WQ1 ends up doing
> all the work! That wasn't the point of a worker pool! Observe that
> thread 1 manages to queue two work items before WQ1 pulls the first item
> off the queue. When thread 1 queues the third item, it sees that
> next_item is not NULL, so it doesn't wake a worker. If thread 1 queues
> all N of its work items before WQ1 empties the queue, then none of
> the other threads get woken up.
>
> Fix this by maintaining a count of the number of active threads, and
> using that to wake either the sole idle thread, or all the threads if
> there are many that are idle. This dramatically improves startup
> behavior of the workqueue and eliminates the collapse case.
>
> Signed-off-by: Darrick J. Wong <djwong@xxxxxxxxxx>
> ---
>  libfrog/workqueue.c | 34 ++++++++++++++++++++++++----------
>  libfrog/workqueue.h |  1 +
>  2 files changed, 25 insertions(+), 10 deletions(-)
>
>
> diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> index 702a53e2f3c..db5b3f68bc5 100644
> --- a/libfrog/workqueue.c
> +++ b/libfrog/workqueue.c
> @@ -26,8 +26,8 @@ workqueue_thread(void *arg)
>  	 * Check for notification to exit after every chunk of work.
>  	 */
>  	rcu_register_thread();
> +	pthread_mutex_lock(&wq->lock);
>  	while (1) {
> -		pthread_mutex_lock(&wq->lock);
>
>  		/*
>  		 * Wait for work.
> @@ -36,10 +36,8 @@ workqueue_thread(void *arg)
>  			assert(wq->item_count == 0);
>  			pthread_cond_wait(&wq->wakeup, &wq->lock);
>  		}
> -		if (wq->next_item == NULL && wq->terminate) {
> -			pthread_mutex_unlock(&wq->lock);
> +		if (wq->next_item == NULL && wq->terminate)
>  			break;
> -		}
>
>  		/*
>  		 * Dequeue work from the head of the list. If the queue was
> @@ -57,11 +55,16 @@ workqueue_thread(void *arg)
>  			/* more work, wake up another worker */
>  			pthread_cond_signal(&wq->wakeup);
>  		}
> +		wq->active_threads++;
>  		pthread_mutex_unlock(&wq->lock);
>
>  		(wi->function)(wi->queue, wi->index, wi->arg);
>  		free(wi);
> +
> +		pthread_mutex_lock(&wq->lock);
> +		wq->active_threads--;
>  	}
> +	pthread_mutex_unlock(&wq->lock);
>  	rcu_unregister_thread();
>
>  	return NULL;
> @@ -170,12 +173,6 @@ workqueue_add(
>  restart:
>  	if (wq->next_item == NULL) {
>  		assert(wq->item_count == 0);
> -		ret = -pthread_cond_signal(&wq->wakeup);
> -		if (ret) {
> -			pthread_mutex_unlock(&wq->lock);
> -			free(wi);
> -			return ret;
> -		}
>  		wq->next_item = wi;
>  	} else {
>  		/* throttle on a full queue if configured */
> @@ -192,6 +189,23 @@ workqueue_add(
>  	}
>  	wq->last_item = wi;
>  	wq->item_count++;
> +
> +	if (wq->active_threads == wq->thread_count - 1) {
> +		/* One thread is idle, wake it */
> +		ret = -pthread_cond_signal(&wq->wakeup);
> +		if (ret) {
> +			pthread_mutex_unlock(&wq->lock);
> +			return ret;
> +		}
> +	} else if (wq->active_threads < wq->thread_count) {
> +		/* Multiple threads are idle, wake everyone */
> +		ret = -pthread_cond_broadcast(&wq->wakeup);
> +		if (ret) {
> +			pthread_mutex_unlock(&wq->lock);
> +			return ret;
> +		}
> +	}
> +
>  	pthread_mutex_unlock(&wq->lock);
>
>  	return 0;
> diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h
> index a9c108d0e66..edbe12fabab 100644
> --- a/libfrog/workqueue.h
> +++ b/libfrog/workqueue.h
> @@ -29,6 +29,7 @@ struct workqueue {
>  	pthread_cond_t		wakeup;
>  	unsigned int		item_count;
>  	unsigned int		thread_count;
> +	unsigned int		active_threads;
>  	bool			terminate;
>  	bool			terminated;
>  	int			max_queued;
>
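For readers of the diff, the wakeup decision that the patch adds to workqueue_add() can be pulled out into a standalone C function. This is an illustrative sketch, not part of the patch. wake_policy() and enum wake_action are hypothetical names; only the two conditions are taken from the patched code.

```c
#include <assert.h>

/* Hypothetical result type: what workqueue_add() does after queueing. */
enum wake_action {
	WAKE_NONE,	/* every thread is busy running an item */
	WAKE_ONE,	/* exactly one idle thread: pthread_cond_signal() */
	WAKE_ALL,	/* several idle threads: pthread_cond_broadcast() */
};

/*
 * Mirrors the branch added by the patch:
 *   active_threads == thread_count - 1  -> signal the sole idle thread
 *   active_threads <  thread_count      -> broadcast to the idle threads
 */
static enum wake_action
wake_policy(unsigned int active_threads, unsigned int thread_count)
{
	/* thread_count - 1 would wrap around for an empty pool */
	assert(thread_count > 0);

	if (active_threads == thread_count - 1)
		return WAKE_ONE;
	if (active_threads < thread_count)
		return WAKE_ALL;
	return WAKE_NONE;
}
```

In the scenario from the commit message, nothing is running yet when thread 1 queues its items, so wake_policy(0, 4) yields WAKE_ALL and every idle worker is woken rather than only WQ1. A thread that has been woken but has not yet dequeued an item still counts as idle, so back-to-back adds can produce extra broadcasts. These are harmless because workqueue_thread() rechecks the queue in its wait loop after every wakeup.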
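To see the same accounting in motion, here is a minimal, self-contained pthread worker pool modelled on the patched code. It is a sketch under simplifying assumptions, not libfrog. struct pool, pool_start(), pool_add(), pool_finish() and MAX_THREADS are hypothetical. Queued work is reduced to a counter, and there is no max_queued throttling. Like the patched workqueue_thread(), each worker holds the lock for the whole loop, increments active_threads before dropping the lock to run an item, and decrements it after retaking the lock.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_THREADS	8	/* hypothetical fixed limit for this sketch */

/*
 * Hypothetical pool. Work items are modelled as a plain counter of pending
 * items; all fields except threads[] are protected by lock.
 */
struct pool {
	pthread_mutex_t	lock;
	pthread_cond_t	wakeup;
	unsigned long	pending;	/* queued, not yet picked up */
	unsigned long	done;		/* items completed */
	unsigned int	thread_count;
	unsigned int	active_threads;	/* workers currently running an item */
	bool		terminate;
	pthread_t	threads[MAX_THREADS];
};

static void *
worker(void *arg)
{
	struct pool	*p = arg;

	/* As in the patched workqueue_thread(), hold the lock across the loop. */
	pthread_mutex_lock(&p->lock);
	while (1) {
		while (p->pending == 0 && !p->terminate)
			pthread_cond_wait(&p->wakeup, &p->lock);
		if (p->pending == 0 && p->terminate)
			break;

		p->pending--;
		p->active_threads++;	/* mark ourselves busy before unlocking */
		pthread_mutex_unlock(&p->lock);

		/* ... the real work item would run here, unlocked ... */

		pthread_mutex_lock(&p->lock);
		p->active_threads--;
		p->done++;
	}
	pthread_mutex_unlock(&p->lock);
	return NULL;
}

static int
pool_start(struct pool *p, unsigned int nthreads)
{
	assert(nthreads > 0 && nthreads <= MAX_THREADS);
	*p = (struct pool){ .thread_count = nthreads };
	pthread_mutex_init(&p->lock, NULL);
	pthread_cond_init(&p->wakeup, NULL);
	for (unsigned int i = 0; i < nthreads; i++) {
		if (pthread_create(&p->threads[i], NULL, worker, p) != 0) {
			p->thread_count = i;	/* join only what started */
			return -1;
		}
	}
	return 0;
}

static void
pool_add(struct pool *p)
{
	pthread_mutex_lock(&p->lock);
	p->pending++;

	/* The patch's rule: signal a sole idle worker, else wake them all. */
	if (p->active_threads == p->thread_count - 1)
		pthread_cond_signal(&p->wakeup);
	else if (p->active_threads < p->thread_count)
		pthread_cond_broadcast(&p->wakeup);
	pthread_mutex_unlock(&p->lock);
}

/* Let the workers drain the queue, then join them; returns items run. */
static unsigned long
pool_finish(struct pool *p)
{
	pthread_mutex_lock(&p->lock);
	p->terminate = true;
	pthread_cond_broadcast(&p->wakeup);
	pthread_mutex_unlock(&p->lock);

	for (unsigned int i = 0; i < p->thread_count; i++)
		pthread_join(p->threads[i], NULL);
	pthread_cond_destroy(&p->wakeup);
	pthread_mutex_destroy(&p->lock);
	return p->done;
}
```

For example, starting four workers, calling pool_add() 1000 times and then calling pool_finish() returns 1000. Workers only exit once the counter has drained. Which worker runs each item depends on scheduling, but with this wakeup rule the idle workers are woken as items arrive instead of leaving one thread to do everything.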