On 11/30/2010 12:14 AM, Hu Tao wrote: > --- > src/Makefile.am | 1 + > src/util/threadpool.c | 140 +++++++++++++++++++++++++++++++++++++++++++++++++ > src/util/threadpool.h | 35 ++++++++++++ > 3 files changed, 176 insertions(+), 0 deletions(-) > create mode 100644 src/util/threadpool.c > create mode 100644 src/util/threadpool.h > > diff --git a/src/Makefile.am b/src/Makefile.am > index a9a1986..5febd76 100644 > --- a/src/Makefile.am > +++ b/src/Makefile.am > @@ -76,6 +76,7 @@ UTIL_SOURCES = \ > util/uuid.c util/uuid.h \ > util/util.c util/util.h \ > util/xml.c util/xml.h \ > + util/threadpool.c util/threadpool.h \ > util/virtaudit.c util/virtaudit.h \ > util/virterror.c util/virterror_internal.h > > diff --git a/src/util/threadpool.c b/src/util/threadpool.c > new file mode 100644 > index 0000000..4bf0f8d > --- /dev/null > +++ b/src/util/threadpool.c > @@ -0,0 +1,140 @@ Copyright header? > +#include <config.h> > +#include <stdio.h> > +#include <stdlib.h> > +#include <string.h> > + > +#include "threadpool.h" > + > +static void *workerHandleJob(void *data) > +{ > + struct virData *localData = NULL; > + struct virWorkerPool *pool = data; > + > + pthread_mutex_lock(&pool->mutex); We should be using virMutexLock here, so as to also be portable to mingw. > + > + while (1) { > + while (!pool->quit && !pool->dataList) { > + pool->nFreeWorker++; > + pthread_cond_signal(&pool->worker_cond); Likewise, virCondSignal here. > + pthread_cond_wait(&pool->cond, &pool->mutex); and virCondWait. > + pool->nFreeWorker--; > + > + if (pool->nWorker > pool->nMaxWorker) > + goto out; > + } > + > + while ((localData = pool->dataList) != NULL) { > + pool->dataList = pool->dataList->next; > + localData->next = NULL; > + > + pthread_mutex_unlock(&pool->mutex); > + > + (pool->func)(localData->data); > + free(localData); VIR_FREE(). 
> + > + pthread_mutex_lock(&pool->mutex); > + } > + > + if (pool->quit) > + break; > + } > + > +out: > + pool->nWorker--; > + if (pool->nWorker == 0) > + pthread_cond_signal(&pool->quit_cond); > + pthread_mutex_unlock(&pool->mutex); > + > + return NULL; > +} > + > +struct virWorkerPool *virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func) > +{ > + struct virWorkerPool *pool; > + pthread_t pid; > + int i; > + > + if (nWorker < 0) > + return NULL; > + > + if (nWorker > maxWorker) > + return NULL; daemon/libvirtd.c already has a notion of worker threads; I'm wondering how much overlap there is between your implementation and that one. A better proof that this would be a useful API addition would be to have the next patch in the series convert libvirtd.c over to using this API. > + > + pool = malloc(sizeof(*pool)); Run 'make syntax-check' - it would have complained about this. Use VIR_ALLOC or VIR_ALLOC_N instead of malloc. > + if (!pool) > + return NULL; > + > + memset(pool, 0, sizeof(*pool)); > + pool->func = func; > + pthread_mutex_init(&pool->mutex, NULL); virMutexInit() > + pthread_cond_init(&pool->cond, NULL); > + pthread_cond_init(&pool->worker_cond, NULL); > + pthread_cond_init(&pool->quit_cond, NULL); virCondInit() > + > + for (i = 0; i < nWorker; i++) { > + pthread_create(&pid, NULL, workerHandleJob, pool); virThreadCreate() > + } > + > + pool->nFreeWorker = 0; > + pool->nWorker = nWorker; > + pool->nMaxWorker = maxWorker; > + > + return pool; > +} > + > +void virWorkerPoolFree(struct virWorkerPool *pool) > +{ > + pthread_mutex_lock(&pool->mutex); > + pool->quit = 1; Use <stdbool.h> and bool if a value will only ever be 0 or 1. 
> + if (pool->nWorker > 0) { > + pthread_cond_broadcast(&pool->cond); > + pthread_cond_wait(&pool->quit_cond, &pool->mutex); > + } > + pthread_mutex_unlock(&pool->mutex); > + free(pool); VIR_FREE() > +} > + > +int virWorkerPoolSendJob(struct virWorkerPool *pool, void *data) > +{ > + pthread_t pid; > + struct virData *localData; > + > + localData = malloc(sizeof(*localData)); VIR_ALLOC() > + if (!localData) > + return -1; > + > + localData->data = data; > + > + pthread_mutex_lock(&pool->mutex); > + if (pool->quit) { > + pthread_mutex_unlock(&pool->mutex); > + free(localData); > + return -1; > + } > + > + localData->next = pool->dataList; > + pool->dataList = localData; > + > + if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) { > + pthread_create(&pid, NULL, workerHandleJob, pool); > + pool->nWorker++; > + } > + > + pthread_cond_signal(&pool->cond); > + > + pthread_mutex_unlock(&pool->mutex); > + > + return 0; > +} > + > +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker) > +{ > + if (maxWorker < 0) > + return -1; > + > + pthread_mutex_lock(&pool->mutex); > + pool->nMaxWorker = maxWorker; > + pthread_mutex_unlock(&pool->mutex); Does this do the right thing if maxWorker < pool->nMaxWorker, or does it silently lose existing workers? > + > + return 0; > +} > diff --git a/src/util/threadpool.h b/src/util/threadpool.h > new file mode 100644 > index 0000000..5ff3a6b > --- /dev/null > +++ b/src/util/threadpool.h > @@ -0,0 +1,35 @@ Copyright header? > +#ifndef __THREADPOOL_H__ > +#define __THREADPOOL_H__ Use of the __ namespace risks collision with the system; I'd feel better if this were __VIR_THREADPOOL_H__. > + > +#include <pthread.h> "threads.h", not <pthread.h>, so we can support mingw > + > +typedef void (*virWorkerFunc)(void *); pthread_create() takes a function that can return void*. Should worker functions be allowed to return a value? 
> + > +struct virData { > + struct virData *next; > + > + void *data; > +}; We've typically used typedefs to avoid having to type 'struct virData' everywhere else. > + > +struct virWorkerPool { > + int nWorker; > + int nMaxWorker; > + int nFreeWorker; s/int/size_t/ when dealing with non-negative counts. > + > + int quit; s/int/bool/ > + > + virWorkerFunc func; > + struct virData *dataList; > + > + pthread_mutex_t mutex; > + pthread_cond_t cond; > + pthread_cond_t worker_cond; > + pthread_cond_t quit_cond; virMutex, virCond > +}; > + > +struct virWorkerPool *virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func); needs ATTRIBUTE_RETURN_CHECK. > +void virWorkerPoolFree(struct virWorkerPool *pool); > +int virWorkerPoolSendJob(struct virWorkerPool *wp, void *data); ATTRIBUTE_NONNULL(1) > +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker); ATTRIBUTE_NONNULL(1) > + > +#endif -- Eric Blake eblake@xxxxxxxxxx +1-801-349-2682 Libvirt virtualization library http://libvirt.org
Attachment:
signature.asc
Description: OpenPGP digital signature
-- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list