[PATCH v3 1/5] Add a threadpool implementation

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



---
 src/Makefile.am       |    1 +
 src/util/threadpool.c |  140 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/util/threadpool.h |   35 ++++++++++++
 3 files changed, 176 insertions(+), 0 deletions(-)
 create mode 100644 src/util/threadpool.c
 create mode 100644 src/util/threadpool.h

diff --git a/src/Makefile.am b/src/Makefile.am
index a9a1986..5febd76 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -76,6 +76,7 @@ UTIL_SOURCES =							\
 		util/uuid.c util/uuid.h				\
 		util/util.c util/util.h				\
 		util/xml.c util/xml.h				\
+		util/threadpool.c util/threadpool.h		\
 		util/virtaudit.c util/virtaudit.h               \
 		util/virterror.c util/virterror_internal.h
 
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..4bf0f8d
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,140 @@
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "threadpool.h"
+
+/*
+ * Worker thread entry point: run queued jobs until the pool is told to
+ * quit, or until this worker is retired because the pool has more workers
+ * than its limit allows.
+ *
+ * @data: the struct virWorkerPool this worker belongs to.
+ *
+ * Pool state is only touched with pool->mutex held; the mutex is dropped
+ * while the job callback runs.  Always returns NULL.
+ */
+static void *workerHandleJob(void *data)
+{
+    struct virData *localData = NULL;
+    struct virWorkerPool *pool = data;
+
+    /* Both places that create workers discard the thread id, so nobody can
+     * ever join this thread.  Detach it so its resources are reclaimed as
+     * soon as it returns. */
+    pthread_detach(pthread_self());
+
+    pthread_mutex_lock(&pool->mutex);
+
+    while (1) {
+        while (!pool->quit && !pool->dataList) {
+            /* Retire only while idle with an empty queue.  Doing the check
+             * here, before sleeping, instead of right after waking up
+             * guarantees that a wake-up sent for a freshly queued job is
+             * never consumed by a worker that then exits without running
+             * the job. */
+            if (pool->nWorker > pool->nMaxWorker)
+                goto out;
+
+            pool->nFreeWorker++;
+            /* Announce an idle worker (no waiter on worker_cond is visible
+             * in this file). */
+            pthread_cond_signal(&pool->worker_cond);
+            pthread_cond_wait(&pool->cond, &pool->mutex);
+            pool->nFreeWorker--;
+        }
+
+        /* Drain the queue from its head, one job at a time. */
+        while ((localData = pool->dataList) != NULL) {
+            pool->dataList = pool->dataList->next;
+            localData->next = NULL;
+
+            /* Run the callback unlocked so other threads can queue or pick
+             * up jobs in the meantime. */
+            pthread_mutex_unlock(&pool->mutex);
+
+            (pool->func)(localData->data);
+            free(localData);
+
+            pthread_mutex_lock(&pool->mutex);
+        }
+
+        /* Tested after draining, so jobs already queued still run when the
+         * pool is shutting down. */
+        if (pool->quit)
+            break;
+    }
+
+out:
+    pool->nWorker--;
+    /* The last worker to leave wakes the thread blocked in
+     * virWorkerPoolFree(). */
+    if (pool->nWorker == 0)
+        pthread_cond_signal(&pool->quit_cond);
+    pthread_mutex_unlock(&pool->mutex);
+
+    return NULL;
+}
+
+/*
+ * Create a worker pool and start its initial worker threads.
+ *
+ * @nWorker:   number of threads to start right away (must be >= 0)
+ * @maxWorker: upper limit on the number of workers (must be >= nWorker)
+ * @func:      callback a worker runs for every job sent to the pool
+ *
+ * Returns the new pool, or NULL if an argument is invalid, memory cannot
+ * be allocated, or a mutex, condition variable or worker thread cannot be
+ * created.  On failure nothing is left running or allocated.
+ */
+struct virWorkerPool *virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func)
+{
+    struct virWorkerPool *pool;
+    pthread_t tid;
+    int i;
+
+    if (nWorker < 0 || nWorker > maxWorker || !func)
+        return NULL;
+
+    /* calloc leaves the counters, the quit flag and the job list zeroed. */
+    pool = calloc(1, sizeof(*pool));
+    if (!pool)
+        return NULL;
+
+    /* Everything a worker reads must be in place before any worker runs. */
+    pool->func = func;
+    pool->nMaxWorker = maxWorker;
+
+    if (pthread_mutex_init(&pool->mutex, NULL) != 0)
+        goto err_free;
+    if (pthread_cond_init(&pool->cond, NULL) != 0)
+        goto err_mutex;
+    if (pthread_cond_init(&pool->worker_cond, NULL) != 0)
+        goto err_cond;
+    if (pthread_cond_init(&pool->quit_cond, NULL) != 0)
+        goto err_worker_cond;
+
+    /* Spawn with the mutex held: new workers block on it as their first
+     * action, so none of them sees a half-updated nWorker, and only
+     * threads that really started are counted. */
+    pthread_mutex_lock(&pool->mutex);
+    for (i = 0; i < nWorker; i++) {
+        if (pthread_create(&tid, NULL, workerHandleJob, pool) != 0)
+            break;
+        pool->nWorker++;
+    }
+
+    if (i < nWorker) {
+        /* Could not start every worker: tell the ones that did start to
+         * quit and wait until the last of them has left.  They are still
+         * blocked on the mutex, so no wake-up on pool->cond is needed. */
+        pool->quit = 1;
+        while (pool->nWorker > 0)
+            pthread_cond_wait(&pool->quit_cond, &pool->mutex);
+        pthread_mutex_unlock(&pool->mutex);
+        goto err_quit_cond;
+    }
+    pthread_mutex_unlock(&pool->mutex);
+
+    return pool;
+
+err_quit_cond:
+    pthread_cond_destroy(&pool->quit_cond);
+err_worker_cond:
+    pthread_cond_destroy(&pool->worker_cond);
+err_cond:
+    pthread_cond_destroy(&pool->cond);
+err_mutex:
+    pthread_mutex_destroy(&pool->mutex);
+err_free:
+    free(pool);
+    return NULL;
+}
+
+/*
+ * Shut a pool down and release it.
+ *
+ * Sets the quit flag, wakes every idle worker and blocks until the last
+ * worker has exited, then frees the pool.  Job nodes still queued at that
+ * point are released as well; their payload pointers are left untouched,
+ * since the pool never frees payloads anywhere else.
+ *
+ * @pool: pool to destroy; NULL is accepted and ignored.
+ */
+void virWorkerPoolFree(struct virWorkerPool *pool)
+{
+    struct virData *job;
+
+    if (!pool)
+        return;
+
+    pthread_mutex_lock(&pool->mutex);
+    pool->quit = 1;
+    pthread_cond_broadcast(&pool->cond);
+    /* Condition waits may return spuriously, so re-test the predicate;
+     * returning early would free the pool under still-running workers. */
+    while (pool->nWorker > 0)
+        pthread_cond_wait(&pool->quit_cond, &pool->mutex);
+
+    /* Jobs can be left over when no worker was around to run them. */
+    while ((job = pool->dataList) != NULL) {
+        pool->dataList = job->next;
+        free(job);
+    }
+    pthread_mutex_unlock(&pool->mutex);
+
+    /* All workers are gone, so nothing uses these objects any more. */
+    pthread_cond_destroy(&pool->quit_cond);
+    pthread_cond_destroy(&pool->worker_cond);
+    pthread_cond_destroy(&pool->cond);
+    pthread_mutex_destroy(&pool->mutex);
+    free(pool);
+}
+
+/*
+ * Queue a job for the pool's workers.
+ *
+ * @pool: pool that should run the job
+ * @data: opaque pointer handed to the pool's callback; the pool neither
+ *        copies nor frees it
+ *
+ * If no worker is idle and the pool is below its worker limit, one more
+ * worker thread is started.  Jobs are pushed onto the head of the pending
+ * list, which is also where workers take them from.
+ * NOTE(review): this makes the queue newest-first; confirm that ordering
+ * is intended.
+ *
+ * Returns 0 once the job is queued.  Returns -1 if pool is NULL, memory
+ * cannot be allocated, the pool is shutting down, or the pool has no
+ * worker at all and one cannot be started.
+ */
+int virWorkerPoolSendJob(struct virWorkerPool *pool, void *data)
+{
+    pthread_t tid;
+    struct virData *job;
+
+    if (!pool)
+        return -1;
+
+    job = malloc(sizeof(*job));
+    if (!job)
+        return -1;
+
+    job->data = data;
+
+    pthread_mutex_lock(&pool->mutex);
+    if (pool->quit)
+        goto error;
+
+    if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) {
+        /* Count the worker only if it really started; a phantom worker
+         * would make virWorkerPoolFree() wait for a thread that never
+         * existed.  The new thread blocks on the mutex until we unlock. */
+        if (pthread_create(&tid, NULL, workerHandleJob, pool) == 0) {
+            pool->nWorker++;
+        } else if (pool->nWorker == 0) {
+            /* No worker exists that could ever pick the job up. */
+            goto error;
+        }
+        /* Otherwise a busy worker will get to the job once it is done. */
+    }
+
+    job->next = pool->dataList;
+    pool->dataList = job;
+
+    pthread_cond_signal(&pool->cond);
+
+    pthread_mutex_unlock(&pool->mutex);
+
+    return 0;
+
+error:
+    pthread_mutex_unlock(&pool->mutex);
+    free(job);
+    return -1;
+}
+
+/*
+ * Change the upper limit on the number of worker threads.
+ *
+ * @pool:      pool to adjust
+ * @maxWorker: new limit (must be >= 0)
+ *
+ * Workers test the limit themselves and exit when the pool is above it,
+ * so when the limit drops below the current worker count the idle workers
+ * are woken to let the surplus ones retire promptly.  Workers busy with a
+ * job are not interrupted.
+ *
+ * Returns 0 on success, -1 if pool is NULL or maxWorker is negative.
+ */
+int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker)
+{
+    if (!pool || maxWorker < 0)
+        return -1;
+
+    pthread_mutex_lock(&pool->mutex);
+    pool->nMaxWorker = maxWorker;
+    /* Idle workers sleep on pool->cond and would not notice the lower
+     * limit until some unrelated wake-up arrived. */
+    if (pool->nWorker > pool->nMaxWorker)
+        pthread_cond_broadcast(&pool->cond);
+    pthread_mutex_unlock(&pool->mutex);
+
+    return 0;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..5ff3a6b
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,35 @@
+#ifndef __THREADPOOL_H__
+#define __THREADPOOL_H__
+
+#include <pthread.h>
+
+/* Job callback: a worker thread calls it with the data pointer that was
+ * passed to virWorkerPoolSendJob(). */
+typedef void (*virWorkerFunc)(void *);
+
+/* One pending job: a node of the pool's singly linked job list. */
+struct virData {
+    struct virData *next;   /* next pending job, NULL at the end of the list */
+
+    void *data;             /* caller's payload, handed to the job callback */
+};
+
+/*
+ * State shared between a pool's worker threads and the threads using the
+ * pool.  The workers and the send/free/set-max functions access it with
+ * mutex held.
+ */
+struct virWorkerPool {
+    int nWorker;        /* worker threads currently accounted for */
+    int nMaxWorker;     /* limit on nWorker; workers exit when above it */
+    int nFreeWorker;    /* workers currently blocked waiting for a job */
+
+    int quit;           /* set to 1 by virWorkerPoolFree() to stop workers */
+
+    virWorkerFunc func;         /* callback run for every job */
+    struct virData *dataList;   /* pending jobs; pushed and popped at the head */
+
+    pthread_mutex_t mutex;          /* guards the fields above */
+    pthread_cond_t cond;            /* signalled when a job is queued or on quit */
+    pthread_cond_t worker_cond;     /* signalled by a worker about to go idle */
+    pthread_cond_t quit_cond;       /* signalled by the last worker to exit */
+};
+
+/* Create a pool that starts nWorker threads and may grow to nMaxWorker;
+ * func is run for every job.  Returns NULL on failure, e.g. when nWorker
+ * is negative or larger than nMaxWorker, or when allocation fails. */
+struct virWorkerPool *virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func);
+/* Tell the workers to quit, wait for them to exit, then free the pool. */
+void virWorkerPoolFree(struct virWorkerPool *pool);
+/* Queue data as a job for the pool's callback.  Returns 0 on success and
+ * -1 on failure, e.g. on allocation failure or when the pool is quitting. */
+int virWorkerPoolSendJob(struct virWorkerPool *wp, void *data);
+/* Set the limit on the number of workers.  Returns 0 on success, -1 if
+ * maxWorker is negative. */
+int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker);
+
+#endif
-- 
1.7.3


-- 
Thanks,
Hu Tao

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]