* src/util/threadpool.c, src/util/threadpool.h: Thread pool implementation
* src/Makefile.am: Build thread pool
* src/libvirt_private.syms: Export public functions
---
 cfg.mk                   |    1 +
 src/Makefile.am          |    1 +
 src/libvirt_private.syms |    6 +
 src/util/threadpool.c    |  292 ++++++++++++++++++++++++++++++++++++++++++++++
 src/util/threadpool.h    |   49 ++++++++
 5 files changed, 349 insertions(+), 0 deletions(-)
 create mode 100644 src/util/threadpool.c
 create mode 100644 src/util/threadpool.h

diff --git a/cfg.mk b/cfg.mk
index 5576ecb..e4ee763 100644
--- a/cfg.mk
+++ b/cfg.mk
@@ -127,6 +127,7 @@ useless_free_options = \
   --name=virStoragePoolObjFree \
   --name=virStoragePoolSourceFree \
   --name=virStorageVolDefFree \
+  --name=virThreadPoolFree \
   --name=xmlFree \
   --name=xmlXPathFreeContext \
   --name=xmlXPathFreeObject
diff --git a/src/Makefile.am b/src/Makefile.am
index a9a1986..d71c644 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -73,6 +73,7 @@ UTIL_SOURCES = \
         util/threads.c util/threads.h \
         util/threads-pthread.h \
         util/threads-win32.h \
+        util/threadpool.c util/threadpool.h \
         util/uuid.c util/uuid.h \
         util/util.c util/util.h \
         util/xml.c util/xml.h \
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index f251c94..70c68cb 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -861,3 +861,9 @@ virXPathStringLimit;
 virXPathULong;
 virXPathULongHex;
 virXPathULongLong;
+
+
+# threadpool.h
+virThreadPoolNew;
+virThreadPoolFree;
+virThreadPoolSendJob;
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..a5f24c2
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,292 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Authors:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ *     Daniel P. Berrange <berrange@xxxxxxxxxx>
+ */
+
+#include <config.h>
+
+#include "threadpool.h"
+#include "memory.h"
+#include "threads.h"
+#include "virterror_internal.h"
+#include "ignore-value.h"
+
+#define VIR_FROM_THIS VIR_FROM_NONE
+
+typedef struct _virThreadPoolJob virThreadPoolJob;
+typedef virThreadPoolJob *virThreadPoolJobPtr;
+
+/* One queued unit of work; 'data' is handed to the pool's job function. */
+struct _virThreadPoolJob {
+    virThreadPoolJobPtr next;
+
+    void *data;
+};
+
+typedef struct _virThreadPoolJobList virThreadPoolJobList;
+typedef virThreadPoolJobList *virThreadPoolJobListPtr;
+
+/* Singly linked FIFO of jobs.  'tail' points at the 'next' slot of the
+ * last job, or at 'head' itself while the list is empty. */
+struct _virThreadPoolJobList {
+    virThreadPoolJobPtr head;
+    virThreadPoolJobPtr *tail;
+};
+
+
+struct _virThreadPool {
+    bool quit;              /* set by virThreadPoolFree to stop workers */
+
+    virThreadPoolJobFunc jobFunc;
+    void *jobOpaque;
+    virThreadPoolJobList jobList;
+
+    virMutex mutex;         /* protects quit, jobList and the counters */
+    virCond cond;           /* signalled on new job, broadcast on quit */
+    virCond quit_cond;      /* signalled when the last worker exits */
+
+    size_t maxWorkers;
+    size_t freeWorkers;     /* workers currently waiting on 'cond' */
+    size_t nWorkers;        /* workers that have not exited yet */
+    virThreadPtr workers;
+};
+
+/*
+ * Worker thread body: repeatedly take the oldest queued job and run the
+ * pool's job function on it with the pool mutex released.  The thread
+ * exits when the pool is told to quit or when waiting on the condition
+ * fails; the last worker to leave signals quit_cond.
+ */
+static void virThreadPoolWorker(void *opaque)
+{
+    virThreadPoolPtr pool = opaque;
+
+    virMutexLock(&pool->mutex);
+
+    while (1) {
+        while (!pool->quit &&
+               !pool->jobList.head) {
+            pool->freeWorkers++;
+            if (virCondWait(&pool->cond, &pool->mutex) < 0) {
+                pool->freeWorkers--;
+                goto out;
+            }
+            pool->freeWorkers--;
+        }
+
+        if (pool->quit)
+            break;
+
+        virThreadPoolJobPtr job = pool->jobList.head;
+        pool->jobList.head = pool->jobList.head->next;
+        job->next = NULL;
+        /* If that was the last job, point tail back at head. */
+        if (pool->jobList.tail == &job->next)
+            pool->jobList.tail = &pool->jobList.head;
+
+        virMutexUnlock(&pool->mutex);
+        (pool->jobFunc)(job->data, pool->jobOpaque);
+        VIR_FREE(job);
+        virMutexLock(&pool->mutex);
+    }
+
+out:
+    pool->nWorkers--;
+    if (pool->nWorkers == 0)
+        virCondSignal(&pool->quit_cond);
+    virMutexUnlock(&pool->mutex);
+}
+
+/*
+ * virThreadPoolNew:
+ * @minWorkers: number of worker threads started immediately
+ * @maxWorkers: upper limit on worker threads; virThreadPoolSendJob starts
+ *              extra workers on demand up to this limit
+ * @func: function run by a worker for every job
+ * @opaque: passed unchanged as the second argument of every @func call
+ *
+ * If @minWorkers is larger than @maxWorkers it is clamped to @maxWorkers.
+ *
+ * Returns the new pool, or NULL on failure.
+ */
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    pool->jobList.head = NULL;
+    pool->jobList.tail = &pool->jobList.head;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+
+    if (virMutexInit(&pool->mutex) < 0)
+        goto error_pool;
+    if (virCondInit(&pool->cond) < 0)
+        goto error_mutex;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto error_cond;
+
+    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0) {
+        virReportOOMError();
+        goto error_quit_cond;
+    }
+
+    pool->maxWorkers = maxWorkers;
+    for (i = 0; i < minWorkers; i++) {
+        /* Workers are never joined (shutdown is synchronised through
+         * quit_cond), so create them detached. */
+        if (virThreadCreate(&pool->workers[i],
+                            false,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            virThreadPoolFree(pool);
+            return NULL;
+        }
+        pool->nWorkers++;
+    }
+
+    return pool;
+
+    /* Unwind in reverse order of initialisation so that only primitives
+     * which were successfully set up are destroyed. */
+error_quit_cond:
+    ignore_value(virCondDestroy(&pool->quit_cond));
+error_cond:
+    ignore_value(virCondDestroy(&pool->cond));
+error_mutex:
+    virMutexDestroy(&pool->mutex);
+error_pool:
+    VIR_FREE(pool);
+    return NULL;
+}
+
+/*
+ * virThreadPoolFree:
+ * @pool: pool to destroy; NULL is a no-op
+ *
+ * Tells every worker to quit, waits until the last one has exited and
+ * releases the pool.  Jobs that are still queued are dropped without
+ * being run; only the queue nodes are freed, not the job data.
+ */
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+    virThreadPoolJobPtr job;
+
+    if (!pool)
+        return;
+
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorkers > 0)
+        virCondBroadcast(&pool->cond);
+
+    /* Re-check the predicate after every wakeup, since a condition wait
+     * may return spuriously and the pool must not be torn down while a
+     * worker can still touch it.  Stop waiting only if the wait fails. */
+    while (pool->nWorkers > 0) {
+        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
+            break;
+    }
+
+    while ((job = pool->jobList.head)) {
+        pool->jobList.head = pool->jobList.head->next;
+        VIR_FREE(job);
+    }
+
+    VIR_FREE(pool->workers);
+    virMutexUnlock(&pool->mutex);
+    virMutexDestroy(&pool->mutex);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    VIR_FREE(pool);
+}
+
+/*
+ * virThreadPoolSendJob:
+ * @pool: pool that should run the job
+ * @jobData: passed as the first argument of the pool's job function
+ *
+ * Appends a job to the queue and wakes one worker.  If no worker is idle
+ * and the pool is below its worker limit, one more worker is started
+ * first.
+ *
+ * Returns 0 on success, -1 if the pool is shutting down, memory is
+ * exhausted or the extra worker cannot be started.
+ */
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobData)
+{
+    virThreadPoolJobPtr job;
+
+    virMutexLock(&pool->mutex);
+    if (pool->quit)
+        goto error;
+
+    if (pool->freeWorkers == 0 &&
+        pool->nWorkers < pool->maxWorkers) {
+        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+            virReportOOMError();
+            goto error;
+        }
+
+        /* nWorkers already counts the new slot; undo that on failure. */
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            false,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            pool->nWorkers--;
+            goto error;
+        }
+    }
+
+    if (VIR_ALLOC(job) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    job->data = jobData;
+    job->next = NULL;
+    *pool->jobList.tail = job;
+    pool->jobList.tail = &(*pool->jobList.tail)->next;
+
+    virCondSignal(&pool->cond);
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+
+error:
+    virMutexUnlock(&pool->mutex);
+    return -1;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..9ff27ec
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,49 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Authors:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ *     Daniel P. Berrange <berrange@xxxxxxxxxx>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
+
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque) ATTRIBUTE_NONNULL(3)
+                                                ATTRIBUTE_RETURN_CHECK;
+
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobdata) ATTRIBUTE_NONNULL(1)
+                                        ATTRIBUTE_NONNULL(2)
+                                        ATTRIBUTE_RETURN_CHECK;
+
+#endif
--
1.7.3

--
Thanks,
Hu Tao

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list