--- src/Makefile.am | 1 + src/util/threadpool.c | 140 +++++++++++++++++++++++++++++++++++++++++++++++++ src/util/threadpool.h | 35 ++++++++++++ 3 files changed, 176 insertions(+), 0 deletions(-) create mode 100644 src/util/threadpool.c create mode 100644 src/util/threadpool.h diff --git a/src/Makefile.am b/src/Makefile.am index a9a1986..5febd76 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -76,6 +76,7 @@ UTIL_SOURCES = \ util/uuid.c util/uuid.h \ util/util.c util/util.h \ util/xml.c util/xml.h \ + util/threadpool.c util/threadpool.h \ util/virtaudit.c util/virtaudit.h \ util/virterror.c util/virterror_internal.h diff --git a/src/util/threadpool.c b/src/util/threadpool.c new file mode 100644 index 0000000..4bf0f8d --- /dev/null +++ b/src/util/threadpool.c @@ -0,0 +1,140 @@ +#include <config.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "threadpool.h" + +static void *workerHandleJob(void *data) +{ + struct virData *localData = NULL; + struct virWorkerPool *pool = data; + + pthread_mutex_lock(&pool->mutex); + + while (1) { + while (!pool->quit && !pool->dataList) { + pool->nFreeWorker++; + pthread_cond_signal(&pool->worker_cond); + pthread_cond_wait(&pool->cond, &pool->mutex); + pool->nFreeWorker--; + + if (pool->nWorker > pool->nMaxWorker) + goto out; + } + + while ((localData = pool->dataList) != NULL) { + pool->dataList = pool->dataList->next; + localData->next = NULL; + + pthread_mutex_unlock(&pool->mutex); + + (pool->func)(localData->data); + free(localData); + + pthread_mutex_lock(&pool->mutex); + } + + if (pool->quit) + break; + } + +out: + pool->nWorker--; + if (pool->nWorker == 0) + pthread_cond_signal(&pool->quit_cond); + pthread_mutex_unlock(&pool->mutex); + + return NULL; +} + +struct virWorkerPool *virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func) +{ + struct virWorkerPool *pool; + pthread_t pid; + int i; + + if (nWorker < 0) + return NULL; + + if (nWorker > maxWorker) + return NULL; + + pool = malloc(sizeof(*pool)); + if (!pool) + return NULL; + + memset(pool, 0, sizeof(*pool)); + pool->func = func; + pthread_mutex_init(&pool->mutex, NULL); + pthread_cond_init(&pool->cond, NULL); + pthread_cond_init(&pool->worker_cond, NULL); + pthread_cond_init(&pool->quit_cond, NULL); + + for (i = 0; i < nWorker; i++) { + pthread_create(&pid, NULL, workerHandleJob, pool); + } + + pool->nFreeWorker = 0; + pool->nWorker = nWorker; + pool->nMaxWorker = maxWorker; + + return pool; +} + +void virWorkerPoolFree(struct virWorkerPool *pool) +{ + pthread_mutex_lock(&pool->mutex); + pool->quit = 1; + if (pool->nWorker > 0) { + pthread_cond_broadcast(&pool->cond); + pthread_cond_wait(&pool->quit_cond, &pool->mutex); + } + pthread_mutex_unlock(&pool->mutex); + free(pool); +} + +int virWorkerPoolSendJob(struct virWorkerPool *pool, void *data) +{ + pthread_t pid; + struct virData *localData; + + localData = malloc(sizeof(*localData)); + if (!localData) + return -1; + + localData->data = data; + + pthread_mutex_lock(&pool->mutex); + if (pool->quit) { + pthread_mutex_unlock(&pool->mutex); + free(localData); + return -1; + } + + localData->next = pool->dataList; + pool->dataList = localData; + + if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) { + pthread_create(&pid, NULL, workerHandleJob, pool); + pool->nWorker++; + } + + pthread_cond_signal(&pool->cond); + + pthread_mutex_unlock(&pool->mutex); + + return 0; +} + +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker) +{ + if (maxWorker < 0) + return -1; + + pthread_mutex_lock(&pool->mutex); + pool->nMaxWorker = maxWorker; + pthread_mutex_unlock(&pool->mutex); + + return 0; +} diff --git a/src/util/threadpool.h b/src/util/threadpool.h new file mode 100644 index 0000000..5ff3a6b --- /dev/null +++ b/src/util/threadpool.h @@ -0,0 +1,35 @@ +#ifndef __THREADPOOL_H__ +#define __THREADPOOL_H__ + +#include <pthread.h> + +typedef void (*virWorkerFunc)(void *); + +struct virData { + struct virData *next; + + void *data; +}; + +struct virWorkerPool { + int nWorker; + int nMaxWorker; + int nFreeWorker; + + int quit; + + virWorkerFunc func; + struct virData *dataList; + + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_cond_t worker_cond; + pthread_cond_t quit_cond; +}; + +struct virWorkerPool *virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func); +void virWorkerPoolFree(struct virWorkerPool *pool); +int virWorkerPoolSendJob(struct virWorkerPool *wp, void *data); +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker); + +#endif -- 1.7.3 -- Thanks, Hu Tao -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list