On Fri, Mar 19, 2021 at 12:33:49PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@xxxxxxxxxx>
>
> Existing users of workqueues have bound maximum queue depths in
> their external algorithms (e.g. prefetch counts). For parallelising
> work that doesn't have an external bound, allow workqueues to
> throttle incoming requests at a maximum bound. Bounded workqueues
> also need to distribute work over all worker threads themselves as
> there is no external bounding or worker function throttling
> provided.
>
> Existing callers are not throttled and retain direct control of
> worker threads, only users of the new create interface will be
> throttled and concurrency managed.
>
> Signed-off-by: Dave Chinner <dchinner@xxxxxxxxxx>
> ---
>  libfrog/workqueue.c | 42 +++++++++++++++++++++++++++++++++++++++---
>  libfrog/workqueue.h |  4 ++++
>  2 files changed, 43 insertions(+), 3 deletions(-)
>
> diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> index fe3de4289379..97f3bf76d9b9 100644
> --- a/libfrog/workqueue.c
> +++ b/libfrog/workqueue.c
> @@ -40,13 +40,21 @@ workqueue_thread(void *arg)
>  		}
>
>  		/*
> -		 * Dequeue work from the head of the list.
> +		 * Dequeue work from the head of the list. If the queue was
> +		 * full then send a wakeup if we're configured to do so.
>  		 */
>  		assert(wq->item_count > 0);
> +		if (wq->max_queued && wq->item_count == wq->max_queued)
> +			pthread_cond_broadcast(&wq->queue_full);

FWIW I'll probably end up changing this to:

	if (wq->max_queued)
		pthread_cond_signal(&wq->queue_full);

To support multiple producers whenever I get around to posting the giant
scrub rewrite that is .... 272nd in line.

In the meantime:
Reviewed-by: Darrick J. Wong <djwong@xxxxxxxxxx>

--D

>  		wi = wq->next_item;
>  		wq->next_item = wi->next;
>  		wq->item_count--;
>
> +		if (wq->max_queued && wq->next_item) {
> +			/* more work, wake up another worker */
> +			pthread_cond_signal(&wq->wakeup);
> +		}
>  		pthread_mutex_unlock(&wq->lock);
>
>  		(wi->function)(wi->queue, wi->index, wi->arg);
> @@ -58,10 +66,11 @@ workqueue_thread(void *arg)
>
>  /* Allocate a work queue and threads. Returns zero or negative error code. */
>  int
> -workqueue_create(
> +workqueue_create_bound(
>  	struct workqueue	*wq,
>  	void			*wq_ctx,
> -	unsigned int		nr_workers)
> +	unsigned int		nr_workers,
> +	unsigned int		max_queue)
>  {
>  	unsigned int		i;
>  	int			err = 0;
> @@ -70,12 +79,16 @@ workqueue_create(
>  	err = -pthread_cond_init(&wq->wakeup, NULL);
>  	if (err)
>  		return err;
> +	err = -pthread_cond_init(&wq->queue_full, NULL);
> +	if (err)
> +		goto out_wake;
>  	err = -pthread_mutex_init(&wq->lock, NULL);
>  	if (err)
>  		goto out_cond;
>
>  	wq->wq_ctx = wq_ctx;
>  	wq->thread_count = nr_workers;
> +	wq->max_queued = max_queue;
>  	wq->threads = malloc(nr_workers * sizeof(pthread_t));
>  	if (!wq->threads) {
>  		err = -errno;
> @@ -102,10 +115,21 @@ workqueue_create(
>  out_mutex:
>  	pthread_mutex_destroy(&wq->lock);
>  out_cond:
> +	pthread_cond_destroy(&wq->queue_full);
> +out_wake:
>  	pthread_cond_destroy(&wq->wakeup);
>  	return err;
>  }
>
> +int
> +workqueue_create(
> +	struct workqueue	*wq,
> +	void			*wq_ctx,
> +	unsigned int		nr_workers)
> +{
> +	return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
> +}
> +
>  /*
>   * Create a work item consisting of a function and some arguments and schedule
>   * the work item to be run via the thread pool. Returns zero or a negative
> @@ -140,6 +164,7 @@ workqueue_add(
>
>  	/* Now queue the new work structure to the work queue. */
>  	pthread_mutex_lock(&wq->lock);
> +restart:
>  	if (wq->next_item == NULL) {
>  		assert(wq->item_count == 0);
>  		ret = -pthread_cond_signal(&wq->wakeup);
> @@ -150,6 +175,16 @@ workqueue_add(
>  		}
>  		wq->next_item = wi;
>  	} else {
> +		/* throttle on a full queue if configured */
> +		if (wq->max_queued && wq->item_count == wq->max_queued) {
> +			pthread_cond_wait(&wq->queue_full, &wq->lock);
> +			/*
> +			 * Queue might be empty or even still full by the time
> +			 * we get the lock back, so restart the lookup so we do
> +			 * the right thing with the current state of the queue.
> +			 */
> +			goto restart;
> +		}
>  		wq->last_item->next = wi;
>  	}
>  	wq->last_item = wi;
> @@ -201,5 +236,6 @@ workqueue_destroy(
>  	free(wq->threads);
>  	pthread_mutex_destroy(&wq->lock);
>  	pthread_cond_destroy(&wq->wakeup);
> +	pthread_cond_destroy(&wq->queue_full);
>  	memset(wq, 0, sizeof(*wq));
>  }
> diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h
> index a56d1cf14081..a9c108d0e66a 100644
> --- a/libfrog/workqueue.h
> +++ b/libfrog/workqueue.h
> @@ -31,10 +31,14 @@ struct workqueue {
>  	unsigned int		thread_count;
>  	bool			terminate;
>  	bool			terminated;
> +	int			max_queued;
> +	pthread_cond_t		queue_full;
>  };
>
>  int workqueue_create(struct workqueue *wq, void *wq_ctx,
>  		unsigned int nr_workers);
> +int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
> +		unsigned int nr_workers, unsigned int max_queue);
>  int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
>  		uint32_t index, void *arg);
>  int workqueue_terminate(struct workqueue *wq);
> --
> 2.30.1
>