On Wed, Dec 01, 2010 at 05:26:27PM +0000, Daniel P. Berrange wrote: > From: Hu Tao <hutao@xxxxxxxxxxxxxx> > > * src/util/threadpool.c, src/util/threadpool.h: Thread pool > implementation > * src/Makefile.am: Build thread pool > --- > src/Makefile.am | 1 + > src/util/threadpool.c | 178 +++++++++++++++++++++++++++++++++++++++++++++++++ > src/util/threadpool.h | 23 ++++++ > 3 files changed, 202 insertions(+), 0 deletions(-) > create mode 100644 src/util/threadpool.c > create mode 100644 src/util/threadpool.h > > diff --git a/src/Makefile.am b/src/Makefile.am > index a9a1986..d71c644 100644 > --- a/src/Makefile.am > +++ b/src/Makefile.am > @@ -73,6 +73,7 @@ UTIL_SOURCES = \ > util/threads.c util/threads.h \ > util/threads-pthread.h \ > util/threads-win32.h \ > + util/threadpool.c util/threadpool.h \ > util/uuid.c util/uuid.h \ > util/util.c util/util.h \ > util/xml.c util/xml.h \ > diff --git a/src/util/threadpool.c b/src/util/threadpool.c > new file mode 100644 > index 0000000..cf998bf > --- /dev/null > +++ b/src/util/threadpool.c > @@ -0,0 +1,178 @@ > + > +#include <config.h> > + > +#include "threadpool.h" > +#include "memory.h" > +#include "threads.h" > +#include "virterror_internal.h" > + > +#define VIR_FROM_THIS VIR_FROM_NONE > + > +typedef struct _virThreadPoolJob virThreadPoolJob; > +typedef virThreadPoolJob *virThreadPoolJobPtr; > + > +struct _virThreadPoolJob { > + virThreadPoolJobPtr next; > + > + void *data; > +}; > + > +struct _virThreadPool { > + int quit; > + > + virThreadPoolJobFunc jobFunc; > + void *jobOpaque; > + virThreadPoolJobPtr jobList; > + > + virMutex mutex; > + virCond cond; > + virCond quit_cond; > + > + size_t maxWorkers; > + size_t freeWorkers; > + size_t nWorkers; > + virThreadPtr workers; > +}; > + > +static void virThreadPoolWorker(void *opaque) > +{ > + virThreadPoolPtr pool = opaque; > + > + virMutexLock(&pool->mutex); > + > + while (1) { > + while (!pool->quit && > + !pool->jobList) { > + pool->freeWorkers++; > + if 
(virCondWait(&pool->cond, &pool->mutex) < 0) { > + pool->freeWorkers--; > + break; > + } > + pool->freeWorkers--; > + } > + > + if (pool->quit) > + break; > + > + virThreadPoolJobPtr job = pool->jobList; > + pool->jobList = pool->jobList->next; > + job->next = NULL; > + > + virMutexUnlock(&pool->mutex); > + (pool->jobFunc)(job->data, pool->jobOpaque); This could race: jobFunc is called after pool->mutex has been released, so if jobFunc does something with jobOpaque, several workers may touch it at the same time unless jobFunc is aware of this and provides its own lock to protect jobOpaque. > + VIR_FREE(job); > + virMutexLock(&pool->mutex); > + } > + > + pool->nWorkers--; > + if (pool->nWorkers == 0) > + virCondSignal(&pool->quit_cond); > + virMutexUnlock(&pool->mutex); > +} > + > +virThreadPoolPtr virThreadPoolNew(size_t minWorkers, > + size_t maxWorkers, > + virThreadPoolJobFunc func, > + void *opaque) > +{ > + virThreadPoolPtr pool; > + int i; > + > + if (VIR_ALLOC(pool) < 0) { > + virReportOOMError(); > + return NULL; > + } > + > + pool->jobFunc = func; > + pool->jobOpaque = opaque; > + > + if (virMutexInit(&pool->mutex) < 0) > + goto error; > + if (virCondInit(&pool->cond) < 0) > + goto error; > + if (virCondInit(&pool->quit_cond) < 0) > + goto error; > + > + if (VIR_ALLOC_N(pool->workers, minWorkers) < 0) > + goto error; > + > + pool->maxWorkers = maxWorkers; > + for (i = 0; i < minWorkers; i++) { > + if (virThreadCreate(&pool->workers[i], > + true, > + virThreadPoolWorker, > + pool) < 0) > + goto error; > + pool->nWorkers++; > + } More than maxWorkers threads will be created here if minWorkers is greater than maxWorkers. > + > + return pool; > + > +error: > + virThreadPoolFree(pool); > + return NULL; > +} > + > +void virThreadPoolFree(virThreadPoolPtr pool) > +{ > + virMutexLock(&pool->mutex); > + pool->quit = 1; > + if (pool->nWorkers > 0) { > + virCondBroadcast(&pool->cond); > + if (virCondWait(&pool->quit_cond, &pool->mutex) < 0) > + {} > + } > + VIR_FREE(pool->workers); > + virMutexUnlock(&pool->mutex); > + VIR_FREE(pool); > +} > + > +int virThreadPoolSendJob(virThreadPoolPtr
pool, > + void *jobData) > +{ > + virThreadPoolJobPtr job; > + virThreadPoolJobPtr tmp; > + > + virMutexLock(&pool->mutex); > + if (pool->quit) > + goto error; > + > + if (pool->freeWorkers == 0 && > + pool->nWorkers < pool->maxWorkers) { > + if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) { > + virReportOOMError(); > + goto error; > + } > + > + if (virThreadCreate(&pool->workers[pool->nWorkers], > + true, > + virThreadPoolWorker, > + pool) < 0) > + goto error; > + pool->nWorkers++; > + } > + > + if (VIR_ALLOC(job) < 0) { > + virReportOOMError(); > + goto error; > + } > + > + job->data = jobData; > + > + tmp = pool->jobList; > + while (tmp && tmp->next) > + tmp = tmp->next; > + if (tmp) > + tmp->next = job; > + else > + pool->jobList = job; > + > + virCondSignal(&pool->cond); > + virMutexUnlock(&pool->mutex); > + > + return 0; > + > +error: > + virMutexUnlock(&pool->mutex); > + return -1; > +} > diff --git a/src/util/threadpool.h b/src/util/threadpool.h > new file mode 100644 > index 0000000..093786f > --- /dev/null > +++ b/src/util/threadpool.h > @@ -0,0 +1,23 @@ > +#ifndef __THREADPOOL_H__ > +#define __THREADPOOL_H__ > + > +#include <pthread.h> > + > +typedef struct _virThreadPool virThreadPool; > +typedef virThreadPool *virThreadPoolPtr; > + > +typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque); > + > +virThreadPoolPtr virThreadPoolNew(size_t minWorkers, > + size_t maxWorkers, > + virThreadPoolJobFunc func, > + void *opaque); > + > +void virThreadPoolShutdown(virThreadPoolPtr pool); > + > +void virThreadPoolFree(virThreadPoolPtr pool); > + > +int virThreadPoolSendJob(virThreadPoolPtr pool, > + void *jobdata); > + > +#endif > -- > 1.7.2.3 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list