From: Hu Tao <hutao@xxxxxxxxxxxxxx> * src/util/threadpool.c, src/util/threadpool.h: Thread pool implementation * src/Makefile.am: Build thread pool --- src/Makefile.am | 1 + src/util/threadpool.c | 178 +++++++++++++++++++++++++++++++++++++++++++++++++ src/util/threadpool.h | 23 ++++++ 3 files changed, 202 insertions(+), 0 deletions(-) create mode 100644 src/util/threadpool.c create mode 100644 src/util/threadpool.h diff --git a/src/Makefile.am b/src/Makefile.am index a9a1986..d71c644 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -73,6 +73,7 @@ UTIL_SOURCES = \ util/threads.c util/threads.h \ util/threads-pthread.h \ util/threads-win32.h \ + util/threadpool.c util/threadpool.h \ util/uuid.c util/uuid.h \ util/util.c util/util.h \ util/xml.c util/xml.h \ diff --git a/src/util/threadpool.c b/src/util/threadpool.c new file mode 100644 index 0000000..cf998bf --- /dev/null +++ b/src/util/threadpool.c @@ -0,0 +1,178 @@ + +#include <config.h> + +#include "threadpool.h" +#include "memory.h" +#include "threads.h" +#include "virterror_internal.h" + +#define VIR_FROM_THIS VIR_FROM_NONE + +typedef struct _virThreadPoolJob virThreadPoolJob; +typedef virThreadPoolJob *virThreadPoolJobPtr; + +struct _virThreadPoolJob { + virThreadPoolJobPtr next; + + void *data; +}; + +struct _virThreadPool { + int quit; + + virThreadPoolJobFunc jobFunc; + void *jobOpaque; + virThreadPoolJobPtr jobList; + + virMutex mutex; + virCond cond; + virCond quit_cond; + + size_t maxWorkers; + size_t freeWorkers; + size_t nWorkers; + virThreadPtr workers; +}; + +static void virThreadPoolWorker(void *opaque) +{ + virThreadPoolPtr pool = opaque; + + virMutexLock(&pool->mutex); + + while (1) { + while (!pool->quit && + !pool->jobList) { + pool->freeWorkers++; + if (virCondWait(&pool->cond, &pool->mutex) < 0) { + pool->freeWorkers--; + break; + } + pool->freeWorkers--; + } + + if (pool->quit) + break; + + virThreadPoolJobPtr job = pool->jobList; + pool->jobList = pool->jobList->next; + job->next = NULL; + + virMutexUnlock(&pool->mutex); + (pool->jobFunc)(job->data, pool->jobOpaque); + VIR_FREE(job); + virMutexLock(&pool->mutex); + } + + pool->nWorkers--; + if (pool->nWorkers == 0) + virCondSignal(&pool->quit_cond); + virMutexUnlock(&pool->mutex); +} + +virThreadPoolPtr virThreadPoolNew(size_t minWorkers, + size_t maxWorkers, + virThreadPoolJobFunc func, + void *opaque) +{ + virThreadPoolPtr pool; + int i; + + if (VIR_ALLOC(pool) < 0) { + virReportOOMError(); + return NULL; + } + + pool->jobFunc = func; + pool->jobOpaque = opaque; + + if (virMutexInit(&pool->mutex) < 0) + goto error; + if (virCondInit(&pool->cond) < 0) + goto error; + if (virCondInit(&pool->quit_cond) < 0) + goto error; + + if (VIR_ALLOC_N(pool->workers, minWorkers) < 0) + goto error; + + pool->maxWorkers = maxWorkers; + for (i = 0; i < minWorkers; i++) { + if (virThreadCreate(&pool->workers[i], + true, + virThreadPoolWorker, + pool) < 0) + goto error; + pool->nWorkers++; + } + + return pool; + +error: + virThreadPoolFree(pool); + return NULL; +} + +void virThreadPoolFree(virThreadPoolPtr pool) +{ + virMutexLock(&pool->mutex); + pool->quit = 1; + if (pool->nWorkers > 0) { + virCondBroadcast(&pool->cond); + if (virCondWait(&pool->quit_cond, &pool->mutex) < 0) + {} + } + VIR_FREE(pool->workers); + virMutexUnlock(&pool->mutex); + VIR_FREE(pool); +} + +int virThreadPoolSendJob(virThreadPoolPtr pool, + void *jobData) +{ + virThreadPoolJobPtr job; + virThreadPoolJobPtr tmp; + + virMutexLock(&pool->mutex); + if (pool->quit) + goto error; + + if (pool->freeWorkers == 0 && + pool->nWorkers < pool->maxWorkers) { + if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) { + virReportOOMError(); + goto error; + } + + if (virThreadCreate(&pool->workers[pool->nWorkers], + true, + virThreadPoolWorker, + pool) < 0) + goto error; + pool->nWorkers++; + } + + if (VIR_ALLOC(job) < 0) { + virReportOOMError(); + goto error; + } + + job->data = jobData; + + tmp = pool->jobList; + while (tmp && tmp->next) + tmp = tmp->next; + if (tmp) + tmp->next = job; + else + pool->jobList = job; + + virCondSignal(&pool->cond); + virMutexUnlock(&pool->mutex); + + return 0; + +error: + virMutexUnlock(&pool->mutex); + return -1; +} diff --git a/src/util/threadpool.h b/src/util/threadpool.h new file mode 100644 index 0000000..093786f --- /dev/null +++ b/src/util/threadpool.h @@ -0,0 +1,23 @@ +#ifndef __THREADPOOL_H__ +#define __THREADPOOL_H__ + +#include <pthread.h> + +typedef struct _virThreadPool virThreadPool; +typedef virThreadPool *virThreadPoolPtr; + +typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque); + +virThreadPoolPtr virThreadPoolNew(size_t minWorkers, + size_t maxWorkers, + virThreadPoolJobFunc func, + void *opaque); + +void virThreadPoolShutdown(virThreadPoolPtr pool); + +void virThreadPoolFree(virThreadPoolPtr pool); + +int virThreadPoolSendJob(virThreadPoolPtr pool, + void *jobdata); + +#endif -- 1.7.2.3 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list