On 12/01/2010 10:26 AM, Daniel P. Berrange wrote: > From: Hu Tao <hutao@xxxxxxxxxxxxxx> Not sure whether Daniel or Hu will be the next person to post an improvement of this patch. Some of my comments are repeated from Hu's first attempt, although many of them have been addressed in updating to libvirt APIs. > > * src/util/threadpool.c, src/util/threadpool.h: Thread pool > implementation > * src/Makefile.am: Build thread pool > --- > src/Makefile.am | 1 + > src/util/threadpool.c | 178 +++++++++++++++++++++++++++++++++++++++++++++++++ > src/util/threadpool.h | 23 ++++++ > 3 files changed, 202 insertions(+), 0 deletions(-) > create mode 100644 src/util/threadpool.c > create mode 100644 src/util/threadpool.h > +++ b/src/util/threadpool.c > @@ -0,0 +1,178 @@ Still missing copyright blurb. > + > +struct _virThreadPool { > + int quit; s/int/bool/ (which means including <stdbool.h> earlier) > +virThreadPoolPtr virThreadPoolNew(size_t minWorkers, > + size_t maxWorkers, > + virThreadPoolJobFunc func, > + void *opaque) > +{ > + virThreadPoolPtr pool; > + int i; s/int/size_t/ > +void virThreadPoolFree(virThreadPoolPtr pool) > +{ if (!pool) return; > + virMutexLock(&pool->mutex); What happens if the reason we called this was because virMutexInit failed during virThreadPoolNew? Is it still safe to blindly call virMutexLock on a broken mutex? > + pool->quit = 1; s/1/true/ > + if (pool->nWorkers > 0) { > + virCondBroadcast(&pool->cond); > + if (virCondWait(&pool->quit_cond, &pool->mutex) < 0) > + {} Is it any better to use "ignore-value.h" and ignore_value(virCondWait), instead of the empty if () {}? > + } > + VIR_FREE(pool->workers); > + virMutexUnlock(&pool->mutex); missing cleanups for mutex, cond, quit_cond (virMutexDestroy, virCondDestroy). And if you are forcefully quitting while jobs are still unclaimed, is it possible that jobList might also need freeing, or is that guaranteed to be taken care of by the virCondWait? > + VIR_FREE(pool); > +} > + > +int virThreadPoolSendJob(virThreadPoolPtr pool, > + void *jobData) > +{ > + virThreadPoolJobPtr job; > + virThreadPoolJobPtr tmp; > + > + virMutexLock(&pool->mutex); > + if (pool->quit) > + goto error; > + > + if (pool->freeWorkers == 0 && > + pool->nWorkers < pool->maxWorkers) { > + if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) { > + virReportOOMError(); > + goto error; > + } > + > + if (virThreadCreate(&pool->workers[pool->nWorkers], > + true, > + virThreadPoolWorker, > + pool) < 0) > + goto error; > + pool->nWorkers++; Problem. VIR_EXPAND_N already incremented pool->nWorkers, but you are incrementing it again. You either need to use: VIR_RESIZE_N(pool->workers, pool->maxworkers, pool->nWorkers, 1) pool->nWorkers++ or keep the VIR_EXPAND_N, but set pool->workers[pool->nWorkers - 1] to the created thread. > + } > + > + if (VIR_ALLOC(job) < 0) { > + virReportOOMError(); > + goto error; > + } > + > + job->data = jobData; > + > + tmp = pool->jobList; > + while (tmp && tmp->next) > + tmp = tmp->next; > + if (tmp) > + tmp->next = job; > + else > + pool->jobList = job; Good - FIFO job ordering. However, this is O(n) list traversal; would it make sense to keep track of a list head and list tail pointer, so that we can have O(1) job insertion, or is that extra bookkeeping not worth it given that our maximum job queue size is not going to typically be that large? > + > + virCondSignal(&pool->cond); > + virMutexUnlock(&pool->mutex); > + > + return 0; > + > +error: > + virMutexUnlock(&pool->mutex); > + return -1; > +} > diff --git a/src/util/threadpool.h b/src/util/threadpool.h > new file mode 100644 > index 0000000..093786f > --- /dev/null > +++ b/src/util/threadpool.h > @@ -0,0 +1,23 @@ Missing copyright blurb. > +#ifndef __THREADPOOL_H__ > +#define __THREADPOOL_H__ I'd prefer __VIR_THREADPOOL_H__, to avoid potential collisions in the __ namespace. > + > +#include <pthread.h> Wrong header; this should be "threads.h" > + > +typedef struct _virThreadPool virThreadPool; > +typedef virThreadPool *virThreadPoolPtr; > + > +typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque); > + > +virThreadPoolPtr virThreadPoolNew(size_t minWorkers, > + size_t maxWorkers, > + virThreadPoolJobFunc func, > + void *opaque); ATTRIBUTE_NONNULL(3) ATTRIBUTE_RETURN_CHECK > + > +void virThreadPoolShutdown(virThreadPoolPtr pool); ATTRIBUTE_NONNULL(1) > + > +void virThreadPoolFree(virThreadPoolPtr pool); add this to the list of free-like functions in cfg.mk > + > +int virThreadPoolSendJob(virThreadPoolPtr pool, > + void *jobdata); ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_RETURN_CHECK -- Eric Blake eblake@xxxxxxxxxx +1-801-349-2682 Libvirt virtualization library http://libvirt.org
Attachment:
signature.asc
Description: OpenPGP digital signature
-- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list