[PATCH v6 1/4] threadpool impl

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



* src/util/threadpool.c, src/util/threadpool.h: Thread pool
  implementation
* src/Makefile.am: Build thread pool
* src/libvirt_private.syms: Export public functions
---
 cfg.mk                   |    1 +
 src/Makefile.am          |    1 +
 src/libvirt_private.syms |    6 +
 src/util/threadpool.c    |  231 ++++++++++++++++++++++++++++++++++++++++++++++
 src/util/threadpool.h    |   48 ++++++++++
 5 files changed, 287 insertions(+), 0 deletions(-)
 create mode 100644 src/util/threadpool.c
 create mode 100644 src/util/threadpool.h

diff --git a/cfg.mk b/cfg.mk
index 29de9d9..bda8c57 100644
--- a/cfg.mk
+++ b/cfg.mk
@@ -128,6 +128,7 @@ useless_free_options =				\
   --name=virStoragePoolObjFree			\
   --name=virStoragePoolSourceFree		\
   --name=virStorageVolDefFree			\
+  --name=virThreadPoolFree			\
   --name=xmlFree				\
   --name=xmlXPathFreeContext			\
   --name=xmlXPathFreeObject
diff --git a/src/Makefile.am b/src/Makefile.am
index 0923d60..196d8af 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -74,6 +74,7 @@ UTIL_SOURCES =							\
 		util/threads.c util/threads.h			\
 		util/threads-pthread.h				\
 		util/threads-win32.h				\
+		util/threadpool.c util/threadpool.h		\
 		util/uuid.c util/uuid.h				\
 		util/util.c util/util.h				\
 		util/xml.c util/xml.h				\
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index e2def6c..a0a598b 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -898,3 +898,9 @@ virXPathStringLimit;
 virXPathULong;
 virXPathULongHex;
 virXPathULongLong;
+
+
+# threadpool.h
+virThreadPoolNew;
+virThreadPoolFree;
+virThreadPoolSendJob;
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..8217591
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,231 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Authors:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ *     Daniel P. Berrange <berrange@xxxxxxxxxx>
+ */
+
+#include <config.h>
+
+#include "threadpool.h"
+#include "memory.h"
+#include "threads.h"
+#include "virterror_internal.h"
+#include "ignore-value.h"
+
+#define VIR_FROM_THIS VIR_FROM_NONE
+
+/* One queued unit of work: a node in the pool's singly linked job list. */
+typedef struct _virThreadPoolJob virThreadPoolJob;
+typedef virThreadPoolJob *virThreadPoolJobPtr;
+
+struct _virThreadPoolJob {
+    /* Following job in the queue; NULL for the most recently queued job. */
+    virThreadPoolJobPtr next;
+
+    /* Caller-supplied payload, passed unchanged to the pool's job function. */
+    void *data;
+};
+
+/*
+ * FIFO queue of pending jobs.  New jobs are stored through 'tail' and
+ * workers remove jobs from 'head'.
+ */
+typedef struct _virThreadPoolJobList virThreadPoolJobList;
+typedef virThreadPoolJobList *virThreadPoolJobListPtr;
+
+struct _virThreadPoolJobList {
+    /* Oldest pending job, or NULL when the queue is empty. */
+    virThreadPoolJobPtr head;
+    /* Slot the next job is written into: &head while the queue is empty,
+     * otherwise the 'next' field of the last queued job. */
+    virThreadPoolJobPtr *tail;
+};
+
+
+/*
+ * State shared between the pool's owner and its worker threads.  The
+ * worker function, virThreadPoolSendJob() and virThreadPoolFree() only
+ * modify the quit flag, the job queue and the worker counters while
+ * holding 'mutex'.
+ */
+struct _virThreadPool {
+    /* Set by virThreadPoolFree(): workers exit and no new jobs are accepted. */
+    bool quit;
+
+    /* Callback run for every job, and the opaque value passed as its
+     * second argument. */
+    virThreadPoolJobFunc jobFunc;
+    void *jobOpaque;
+    /* Jobs that have been queued but not yet picked up by a worker. */
+    virThreadPoolJobList jobList;
+
+    virMutex mutex;
+    /* Signalled once per queued job and broadcast when the pool quits. */
+    virCond cond;
+    /* Signalled by the last worker to exit. */
+    virCond quit_cond;
+
+    /* Limit checked before virThreadPoolSendJob() starts another worker. */
+    size_t maxWorkers;
+    /* Number of workers currently blocked waiting on 'cond'. */
+    size_t freeWorkers;
+    /* Number of workers that have been started and have not yet exited. */
+    size_t nWorkers;
+    /* Array holding the thread handles of the workers. */
+    virThreadPtr workers;
+};
+
+/*
+ * Body of every worker thread; @opaque is the pool the worker belongs to.
+ *
+ * The worker sleeps on pool->cond until a job is queued or the pool is
+ * asked to quit.  Jobs are taken from the head of the queue and run with
+ * the pool mutex released.  When the worker leaves (quit requested, or
+ * the condition wait failed) it removes itself from nWorkers and, if it
+ * was the last one, signals quit_cond.
+ */
+static void virThreadPoolWorker(void *opaque)
+{
+    virThreadPoolPtr pool = opaque;
+    virThreadPoolJobPtr job;
+    int rc;
+
+    virMutexLock(&pool->mutex);
+
+    for (;;) {
+        /* Stay idle until there is work or shutdown has been requested. */
+        while (!(pool->quit || pool->jobList.head)) {
+            pool->freeWorkers++;
+            rc = virCondWait(&pool->cond, &pool->mutex);
+            pool->freeWorkers--;
+            if (rc < 0)
+                goto out;
+        }
+
+        /* Quit wins over pending jobs: they stay queued. */
+        if (pool->quit)
+            goto out;
+
+        /* Unlink the oldest job; reset the tail slot if it was the only one. */
+        job = pool->jobList.head;
+        pool->jobList.head = job->next;
+        if (pool->jobList.tail == &job->next)
+            pool->jobList.tail = &pool->jobList.head;
+        job->next = NULL;
+
+        /* Run the callback without the lock held. */
+        virMutexUnlock(&pool->mutex);
+        (pool->jobFunc)(job->data, pool->jobOpaque);
+        VIR_FREE(job);
+        virMutexLock(&pool->mutex);
+    }
+
+out:
+    if (--pool->nWorkers == 0)
+        virCondSignal(&pool->quit_cond);
+    virMutexUnlock(&pool->mutex);
+}
+
+/**
+ * virThreadPoolNew:
+ * @minWorkers: number of worker threads to start immediately
+ * @maxWorkers: upper limit on the number of worker threads
+ * @func: callback invoked for every job sent to the pool
+ * @opaque: value passed as the second argument of @func
+ *
+ * Creates a thread pool and starts @minWorkers workers.  A @minWorkers
+ * larger than @maxWorkers is clamped to @maxWorkers.  Further workers
+ * are started on demand by virThreadPoolSendJob().
+ *
+ * Returns the new pool, or NULL on failure.
+ */
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    pool->jobList.head = NULL;
+    pool->jobList.tail = &pool->jobList.head;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+    pool->maxWorkers = maxWorkers;
+
+    /*
+     * virThreadPoolFree() locks the mutex and destroys both condition
+     * variables, so it must only be called once all three have been
+     * initialised.  Failures before that point are unwound by hand.
+     */
+    if (virMutexInit(&pool->mutex) < 0)
+        goto free_pool;
+    if (virCondInit(&pool->cond) < 0)
+        goto destroy_mutex;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto destroy_cond;
+
+    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    for (i = 0; i < minWorkers; i++) {
+        if (virThreadCreate(&pool->workers[i],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            goto error;
+        }
+        pool->nWorkers++;
+    }
+
+    return pool;
+
+error:
+    /* The pool is fully initialised here; this also stops and waits for
+     * any workers that were already started. */
+    virThreadPoolFree(pool);
+    return NULL;
+
+destroy_cond:
+    ignore_value(virCondDestroy(&pool->cond));
+destroy_mutex:
+    virMutexDestroy(&pool->mutex);
+free_pool:
+    VIR_FREE(pool);
+    return NULL;
+}
+
+/**
+ * virThreadPoolFree:
+ * @pool: pool to destroy; NULL is accepted and ignored
+ *
+ * Asks every worker to quit, waits until they have all exited, discards
+ * jobs that were queued but never started, and releases the pool.  Only
+ * the queue nodes of discarded jobs are freed; their data pointers are
+ * not touched.
+ *
+ * NOTE(review): workers are started with the second argument of
+ * virThreadCreate() set to true.  If that requests a joinable thread,
+ * nothing here joins them -- confirm against threads.h.
+ */
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+    virThreadPoolJobPtr job;
+
+    if (!pool)
+        return;
+
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorkers > 0)
+        virCondBroadcast(&pool->cond);
+
+    /*
+     * Re-check the worker count after every wakeup: a condition wait may
+     * return without quit_cond having been signalled, and tearing the
+     * pool down while a worker is still running would leave that worker
+     * using freed memory and a destroyed mutex.  A failing wait ends the
+     * loop so this function cannot spin forever.
+     */
+    while (pool->nWorkers > 0) {
+        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
+            break;
+    }
+
+    while ((job = pool->jobList.head)) {
+        pool->jobList.head = pool->jobList.head->next;
+        VIR_FREE(job);
+    }
+
+    VIR_FREE(pool->workers);
+    virMutexUnlock(&pool->mutex);
+    virMutexDestroy(&pool->mutex);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    VIR_FREE(pool);
+}
+
+/**
+ * virThreadPoolSendJob:
+ * @pool: pool that should run the job
+ * @jobData: payload passed as the first argument of the pool's job function
+ *
+ * Appends a job to the pool's queue and wakes one worker.  If no worker
+ * is idle and the pool has fewer than maxWorkers workers, one more
+ * worker is started first.
+ *
+ * Returns 0 once the job is queued, or -1 if the pool is quitting, a new
+ * worker could not be started, or memory could not be allocated.
+ */
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobData)
+{
+    virThreadPoolJobPtr job;
+    int ret = -1;
+
+    virMutexLock(&pool->mutex);
+
+    if (pool->quit)
+        goto cleanup;
+
+    /* Grow the pool when nobody is idle and the limit allows it. */
+    if (!pool->freeWorkers && pool->maxWorkers > pool->nWorkers) {
+        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+            virReportOOMError();
+            goto cleanup;
+        }
+
+        /* nWorkers now counts the new slot; undo that if the thread
+         * cannot be started. */
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            true, virThreadPoolWorker, pool) < 0) {
+            pool->nWorkers--;
+            goto cleanup;
+        }
+    }
+
+    if (VIR_ALLOC(job) < 0) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    job->next = NULL;
+    job->data = jobData;
+
+    /* Link the job in at the tail and move the tail slot onto it. */
+    *pool->jobList.tail = job;
+    pool->jobList.tail = &job->next;
+
+    virCondSignal(&pool->cond);
+    ret = 0;
+
+cleanup:
+    virMutexUnlock(&pool->mutex);
+    return ret;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..adab68f
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,48 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Authors:
+ *     Hu Tao <hutao@xxxxxxxxxxxxxx>
+ *     Daniel P. Berrange <berrange@xxxxxxxxxx>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "internal.h"
+
+/* Opaque thread pool handle; the structure is defined in threadpool.c. */
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+/*
+ * Callback run by a worker thread for each job.  @jobdata is the value
+ * given to virThreadPoolSendJob(), @opaque the value given to
+ * virThreadPoolNew().
+ */
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+/*
+ * Create a pool that starts @minWorkers worker threads (clamped to
+ * @maxWorkers) and runs @func for every job.  Returns NULL on failure.
+ */
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque) ATTRIBUTE_NONNULL(3);
+
+/*
+ * Stop the workers, drop jobs that were queued but not started, and
+ * release @pool.  A NULL @pool is ignored.
+ */
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+/*
+ * Queue @jobdata for execution by a worker of @pool.  Returns 0 on
+ * success and -1 on failure, including when the pool is being freed.
+ */
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobdata) ATTRIBUTE_NONNULL(1)
+                                        ATTRIBUTE_NONNULL(2)
+                                        ATTRIBUTE_RETURN_CHECK;
+
+#endif
-- 
1.7.3


-- 
Thanks,
Hu Tao

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]