--- src/Makefile.am | 1 + src/util/threadpool.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++ src/util/threadpool.h | 62 ++++++++++++++++++ 3 files changed, 235 insertions(+), 0 deletions(-) create mode 100644 src/util/threadpool.c create mode 100644 src/util/threadpool.h diff --git a/src/Makefile.am b/src/Makefile.am index a9a1986..5febd76 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -76,6 +76,7 @@ UTIL_SOURCES = \ util/uuid.c util/uuid.h \ util/util.c util/util.h \ util/xml.c util/xml.h \ + util/threadpool.c util/threadpool.h \ util/virtaudit.c util/virtaudit.h \ util/virterror.c util/virterror_internal.h diff --git a/src/util/threadpool.c b/src/util/threadpool.c new file mode 100644 index 0000000..79f9fbb --- /dev/null +++ b/src/util/threadpool.c @@ -0,0 +1,172 @@ +/* + * threadpool.c: a generic thread pool implementation + * + * Copyright (C) 2010 Hu Tao + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "threadpool.h"
+#include "memory.h"
+
+/*
+ * Tear down one condition variable.  virCondDestroy() reports failure
+ * through its return value, but nothing useful can be done about it during
+ * cleanup, so the result is evaluated and then dropped.
+ */
+static void workerPoolCondDestroy(virCond *cond)
+{
+    if (virCondDestroy(cond) < 0) {
+        /* best effort: no recovery is possible here */
+    }
+}
+
+/*
+ * Thread body of one pool worker; @data is the owning virWorkerPoolPtr.
+ *
+ * The worker sleeps on pool->cond until a job is queued or the pool is
+ * asked to quit, then drains pool->dataList, calling pool->func on each
+ * job with the mutex released and freeing the list node afterwards.
+ *
+ * It terminates when pool->quit is set (after draining the list), when
+ * waiting on the condition fails, or when it is about to go idle while the
+ * pool holds more workers than nMaxWorker.  The last worker to leave
+ * signals pool->quit_cond so virWorkerPoolFree() can proceed.
+ */
+static void workerHandleJob(void *data)
+{
+    virDataPtr localData = NULL;
+    virWorkerPoolPtr pool = data;
+
+    virMutexLock(&pool->mutex);
+
+    while (1) {
+        while (!pool->quit && !pool->dataList) {
+            /* Retire surplus workers only while idle, i.e. before
+             * sleeping.  Checking after the wake-up instead would let a
+             * worker that was signalled for a new job exit without
+             * running it, leaving the job stranded in the list. */
+            if (pool->nWorker > pool->nMaxWorker)
+                goto out;
+
+            pool->nFreeWorker++;
+            virCondSignal(&pool->worker_cond);
+            if (virCondWait(&pool->cond, &pool->mutex) < 0) {
+                pool->nFreeWorker--;
+                goto out;
+            }
+            pool->nFreeWorker--;
+        }
+
+        while ((localData = pool->dataList) != NULL) {
+            pool->dataList = pool->dataList->next;
+            localData->next = NULL;
+
+            /* Run the callback unlocked so other workers and
+             * virWorkerPoolSendJob() are not blocked by a slow job. */
+            virMutexUnlock(&pool->mutex);
+
+            (pool->func)(localData->data);
+            VIR_FREE(localData);
+
+            virMutexLock(&pool->mutex);
+        }
+
+        if (pool->quit)
+            break;
+    }
+
+out:
+    pool->nWorker--;
+    if (pool->nWorker == 0)
+        virCondSignal(&pool->quit_cond);
+    virMutexUnlock(&pool->mutex);
+}
+
+/*
+ * Create a worker pool.
+ *
+ * @nWorker:   number of worker threads to start immediately
+ * @maxWorker: upper bound on the number of worker threads
+ * @func:      callback run by a worker for every queued job
+ *
+ * Returns the new pool, or NULL if @func is NULL, @nWorker is negative or
+ * larger than @maxWorker, or if allocation, mutex/condition initialisation
+ * or thread creation fails.
+ */
+virWorkerPoolPtr virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func)
+{
+    virWorkerPoolPtr pool;
+    virThread thread;
+    int i;
+
+    if (!func)
+        return NULL;
+
+    if (nWorker < 0)
+        return NULL;
+
+    if (nWorker > maxWorker)
+        return NULL;
+
+    if (VIR_ALLOC(pool))
+        return NULL;
+
+    memset(pool, 0, sizeof(*pool));
+    pool->func = func;
+    /* The limit must be in place before any worker can look at it. */
+    pool->nMaxWorker = maxWorker;
+
+    /* Unwind in stages: only objects that were successfully initialised
+     * may be destroyed, and virWorkerPoolFree() must not be used before
+     * the mutex exists because it starts by locking it. */
+    if (virMutexInit(&pool->mutex) < 0)
+        goto free_pool;
+    if (virCondInit(&pool->cond) < 0)
+        goto destroy_mutex;
+    if (virCondInit(&pool->worker_cond) < 0)
+        goto destroy_cond;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto destroy_worker_cond;
+
+    /* Hold the mutex while starting workers so that none of them can
+     * observe or modify nWorker before it accounts for every thread. */
+    virMutexLock(&pool->mutex);
+    for (i = 0; i < nWorker; i++) {
+        /* Non-joinable: the handle is discarded and nothing ever joins
+         * the workers, so a joinable thread would never be reclaimed. */
+        if (virThreadCreate(&thread, false, workerHandleJob, pool) < 0) {
+            virMutexUnlock(&pool->mutex);
+            /* Fully initialised by now: let the regular destructor stop
+             * the workers that did start and release everything. */
+            virWorkerPoolFree(pool);
+            return NULL;
+        }
+        pool->nWorker++;
+    }
+    virMutexUnlock(&pool->mutex);
+
+    return pool;
+
+destroy_worker_cond:
+    workerPoolCondDestroy(&pool->worker_cond);
+destroy_cond:
+    workerPoolCondDestroy(&pool->cond);
+destroy_mutex:
+    virMutexDestroy(&pool->mutex);
+free_pool:
+    VIR_FREE(pool);
+    return NULL;
+}
+
+/*
+ * Shut a pool down and release it.  A NULL @pool is ignored.
+ *
+ * Sets pool->quit, wakes every worker and waits until the last one has
+ * left; workers drain the pending list before exiting.  Jobs that are
+ * still queued afterwards (possible only when no worker existed) have
+ * their list nodes freed without the callback being run; the caller's
+ * data pointers are never freed by the pool.
+ */
+void virWorkerPoolFree(virWorkerPoolPtr pool)
+{
+    virDataPtr job;
+
+    if (!pool)
+        return;
+
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorker > 0)
+        virCondBroadcast(&pool->cond);
+
+    /* Re-check the predicate after every wake-up: a condition wait may
+     * return spuriously, and freeing the pool while workers are still
+     * running would be a use-after-free. */
+    while (pool->nWorker > 0) {
+        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
+            break; /* cannot wait any longer; proceed as before */
+    }
+
+    while ((job = pool->dataList) != NULL) {
+        pool->dataList = job->next;
+        VIR_FREE(job);
+    }
+    virMutexUnlock(&pool->mutex);
+
+    workerPoolCondDestroy(&pool->quit_cond);
+    workerPoolCondDestroy(&pool->worker_cond);
+    workerPoolCondDestroy(&pool->cond);
+    virMutexDestroy(&pool->mutex);
+    VIR_FREE(pool);
+}
+
+/*
+ * Queue @data for processing by the pool's callback.
+ *
+ * A new worker is started first when no worker is idle and the pool is
+ * below nMaxWorker.  Jobs are pushed on the head of the list and workers
+ * pop from the head, so pending jobs are served newest-first.
+ *
+ * Returns 0 on success.  Returns -1, leaving @data untouched and owned by
+ * the caller, if allocation fails, the pool is shutting down, or there is
+ * no worker and none could be started (the job could never run).
+ */
+int virWorkerPoolSendJob(virWorkerPoolPtr pool, void *data)
+{
+    virThread thread;
+    virDataPtr localData;
+
+    if (VIR_ALLOC(localData))
+        return -1;
+
+    localData->data = data;
+
+    virMutexLock(&pool->mutex);
+    if (pool->quit)
+        goto error;
+
+    /* Grow on demand.  A failed creation is tolerated as long as some
+     * existing worker can still pick the job up. */
+    if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker &&
+        virThreadCreate(&thread, false, workerHandleJob, pool) == 0)
+        pool->nWorker++;
+
+    if (pool->nWorker == 0)
+        goto error;
+
+    localData->next = pool->dataList;
+    pool->dataList = localData;
+
+    virCondSignal(&pool->cond);
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+
+error:
+    virMutexUnlock(&pool->mutex);
+    VIR_FREE(localData);
+    return -1;
+}
+
+/*
+ * Change the upper bound on the number of workers.
+ *
+ * Returns -1 if @maxWorker is negative, 0 otherwise.  When the pool now
+ * holds more workers than allowed, idle workers are woken so that the
+ * surplus ones can exit; busy workers exit once they run out of jobs.
+ */
+int virWorkerPoolSetMaxWorker(virWorkerPoolPtr pool, int maxWorker)
+{
+    if (maxWorker < 0)
+        return -1;
+
+    virMutexLock(&pool->mutex);
+    pool->nMaxWorker = maxWorker;
+    if (pool->nWorker > pool->nMaxWorker)
+        virCondBroadcast(&pool->cond);
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..f8039e6
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,62 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
+/*
+ * Job callback.  A worker thread calls it with the data pointer that was
+ * handed to virWorkerPoolSendJob().
+ */
+typedef void (*virWorkerFunc)(void *);
+/*
+ * One queued job.  Nodes form a singly linked list headed by
+ * virWorkerPool.dataList; a node is allocated by virWorkerPoolSendJob()
+ * and freed by the worker once the callback has returned.
+ */
+struct _virData {
+    struct _virData *next;      /* following node, NULL at the end of the list */
+
+    void *data;                 /* caller's payload, passed to the callback as-is */
+};
+typedef struct _virData virData;
+typedef virData *virDataPtr;
+/*
+ * State of a worker pool, shared between the API functions and the worker
+ * threads; threadpool.c takes `mutex` around its accesses to these fields.
+ */
+struct _virWorkerPool {
+    size_t nWorker;             /* worker threads currently accounted to the pool */
+    size_t nMaxWorker;          /* upper bound on nWorker; see virWorkerPoolSetMaxWorker() */
+    size_t nFreeWorker;         /* workers currently blocked waiting for a job */
+
+    bool quit;                  /* set by virWorkerPoolFree(); workers exit once the list is drained */
+
+    virWorkerFunc func;         /* callback run for every job */
+    virDataPtr dataList;        /* head of the pending jobs; pushed and popped at the head */
+
+    virMutex mutex;             /* lock for the pool state */
+    virCond cond;               /* signalled when a job is queued, broadcast on shutdown */
+    virCond worker_cond;        /* signalled by a worker before it waits; threadpool.c never waits on it */
+    virCond quit_cond;          /* signalled by the last exiting worker; virWorkerPoolFree() waits on it */
+};
+typedef struct _virWorkerPool virWorkerPool;
+typedef virWorkerPool *virWorkerPoolPtr;
+/*
+ * virWorkerPoolNew: create a pool and start nWorker threads.  Returns NULL
+ *     if nWorker is negative or greater than nMaxWorker, or if allocation,
+ *     initialisation or thread creation fails.
+ * virWorkerPoolFree: mark the pool as quitting, wake the workers, wait for
+ *     them to exit and free the pool.
+ * virWorkerPoolSendJob: queue data for the callback.  Returns 0 on success
+ *     and -1 on failure, e.g. when allocation fails or the pool is
+ *     shutting down.
+ * virWorkerPoolSetMaxWorker: store a new upper bound on the number of
+ *     workers.  Returns -1 for a negative maxWorker, 0 otherwise.
+ */
+virWorkerPoolPtr virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func) ATTRIBUTE_RETURN_CHECK;
+void virWorkerPoolFree(virWorkerPoolPtr pool);
+int virWorkerPoolSendJob(virWorkerPoolPtr pool, void *data) ATTRIBUTE_NONNULL(1);
+int virWorkerPoolSetMaxWorker(virWorkerPoolPtr pool, int maxWorker) ATTRIBUTE_NONNULL(1);
+
+#endif
-- 
1.7.3

-- 
Thanks,
Hu Tao

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list