[PATCH v4 2/7] Add a threadpool implementation

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



---
 src/Makefile.am       |    1 +
 src/util/threadpool.c |  172 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/util/threadpool.h |   62 ++++++++++++++++++
 3 files changed, 235 insertions(+), 0 deletions(-)
 create mode 100644 src/util/threadpool.c
 create mode 100644 src/util/threadpool.h

diff --git a/src/Makefile.am b/src/Makefile.am
index a9a1986..5febd76 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -76,6 +76,7 @@ UTIL_SOURCES =							\
 		util/uuid.c util/uuid.h				\
 		util/util.c util/util.h				\
 		util/xml.c util/xml.h				\
+		util/threadpool.c util/threadpool.h		\
 		util/virtaudit.c util/virtaudit.h               \
 		util/virterror.c util/virterror_internal.h
 
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..79f9fbb
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,172 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "threadpool.h"
+#include "memory.h"
+
+static void workerHandleJob(void *data)
+{
+    virDataPtr localData = NULL;
+    virWorkerPoolPtr pool = data;
+
+    virMutexLock(&pool->mutex);
+
+    while (1) {
+        while (!pool->quit && !pool->dataList) {
+            pool->nFreeWorker++;
+            virCondSignal(&pool->worker_cond);
+            if (virCondWait(&pool->cond, &pool->mutex) < 0) {
+                pool->nFreeWorker--;
+                goto out;
+            }
+            pool->nFreeWorker--;
+
+            if (pool->nWorker > pool->nMaxWorker)
+                goto out;
+        }
+
+        while ((localData = pool->dataList) != NULL) {
+            pool->dataList = pool->dataList->next;
+            localData->next = NULL;
+
+            virMutexUnlock(&pool->mutex);
+
+            (pool->func)(localData->data);
+            VIR_FREE(localData);
+
+            virMutexLock(&pool->mutex);
+        }
+
+        if (pool->quit)
+            break;
+    }
+
+out:
+    pool->nWorker--;
+    if (pool->nWorker == 0)
+        virCondSignal(&pool->quit_cond);
+    virMutexUnlock(&pool->mutex);
+}
+
+virWorkerPoolPtr virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func)
+{
+    virWorkerPoolPtr pool;
+    virThread thread;
+    int i;
+
+    if (nWorker < 0)
+        return NULL;
+
+    if (nWorker > maxWorker)
+        return NULL;
+
+    if (VIR_ALLOC(pool))
+        return NULL;
+
+    memset(pool, 0, sizeof(*pool));
+    pool->func = func;
+    if (virMutexInit(&pool->mutex) < 0)
+        goto error;
+    if (virCondInit(&pool->cond) < 0)
+        goto error;
+    if (virCondInit(&pool->worker_cond) < 0)
+        goto error;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto error;
+
+    for (i = 0; i < nWorker; i++) {
+        if (virThreadCreate(&thread, true, workerHandleJob, pool) < 0) {
+            pool->nWorker = i;
+            goto error;
+        }
+    }
+
+    pool->nFreeWorker = 0;
+    pool->nWorker = nWorker;
+    pool->nMaxWorker = maxWorker;
+
+    return pool;
+error:
+    virWorkerPoolFree(pool);
+    return NULL;
+}
+
+void virWorkerPoolFree(virWorkerPoolPtr pool)
+{
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorker > 0) {
+        virCondBroadcast(&pool->cond);
+        virCondWait(&pool->quit_cond, &pool->mutex);
+    }
+    virMutexUnlock(&pool->mutex);
+    VIR_FREE(pool);
+}
+
+int virWorkerPoolSendJob(virWorkerPoolPtr pool, void *data)
+{
+    virThread thread;
+    virDataPtr localData;
+
+    if (VIR_ALLOC(localData))
+        return -1;
+
+    localData->data = data;
+
+    virMutexLock(&pool->mutex);
+    if (pool->quit) {
+        virMutexUnlock(&pool->mutex);
+        VIR_FREE(localData);
+        return -1;
+    }
+
+    localData->next = pool->dataList;
+    pool->dataList = localData;
+
+    if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) {
+        if (virThreadCreate(&thread, true, workerHandleJob, pool) == 0)
+            pool->nWorker++;
+    }
+
+    virCondSignal(&pool->cond);
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+}
+
+int virWorkerPoolSetMaxWorker(virWorkerPoolPtr pool, int maxWorker)
+{
+    if (maxWorker < 0)
+        return -1;
+
+    virMutexLock(&pool->mutex);
+    pool->nMaxWorker = maxWorker;
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..f8039e6
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,62 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
+
+typedef void (*virWorkerFunc)(void *);
+
+struct _virData {
+    struct _virData *next;
+
+    void *data;
+};
+typedef struct _virData virData;
+typedef virData *virDataPtr;
+
+struct _virWorkerPool {
+    size_t nWorker;
+    size_t nMaxWorker;
+    size_t nFreeWorker;
+
+    bool quit;
+
+    virWorkerFunc func;
+    virDataPtr dataList;
+
+    virMutex mutex;
+    virCond cond;
+    virCond worker_cond;
+    virCond quit_cond;
+};
+typedef struct _virWorkerPool virWorkerPool;
+typedef virWorkerPool *virWorkerPoolPtr;
+
+virWorkerPoolPtr virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func) ATTRIBUTE_RETURN_CHECK;
+void virWorkerPoolFree(virWorkerPoolPtr pool);
+int virWorkerPoolSendJob(virWorkerPoolPtr pool, void *data) ATTRIBUTE_NONNULL(1);
+int virWorkerPoolSetMaxWorker(virWorkerPoolPtr pool, int maxWorker) ATTRIBUTE_NONNULL(1);
+
+#endif
-- 
1.7.3


-- 
Thanks,
Hu Tao

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]