[dbus PATCH 18/18] main: introduce threads to process the dbus messages

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This implements a very simple thread pool to process dbus messages in
separate threads.  We don't need to maintain a queue for messages because
dbus does that for us.

The default thread count is currently 4, and it is also
configurable via the --threads parameter of the libvirt-dbus daemon.

Signed-off-by: Pavel Hrdina <phrdina@xxxxxxxxxx>
---
 src/Makefile.am |   1 +
 src/connect.c   |  14 +++++++
 src/connect.h   |   3 ++
 src/main.c      | 118 +++++++++++++++++++++++++++++++++++++++++++++++++-------
 src/util.c      |  28 ++++++++++++++
 src/util.h      |   9 +++++
 6 files changed, 159 insertions(+), 14 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index 9e23f1b..73bbfd9 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -32,6 +32,7 @@ libvirt_dbus_LDFLAGS = \
 	$(DBUS_LDFLAGS) \
 	$(RELRO_LDFLAGS) \
 	$(PID_LDFLAGS) \
+	-lpthread \
 	$(NULL)
 
 libvirt_dbus_LDADD = \
diff --git a/src/connect.c b/src/connect.c
index 2fe305f..41aba5f 100644
--- a/src/connect.c
+++ b/src/connect.c
@@ -56,10 +56,21 @@ virtDBusConnectClose(virtDBusConnect *connect,
     connect->connection = NULL;
 }
 
+static void /* Unlocks the mutex whose pointer is stored at *lock; named as the _cleanup_() handler in virtDBusConnectOpen(). */
+virtDBusConnectUnlock(pthread_mutex_t **lock) /* lock: address of a pthread_mutex_t pointer variable */
+{
+    if (lock) /* guards only the outer pointer; *lock is passed on without a NULL test */
+        pthread_mutex_unlock(*lock); /* the unlock result is discarded */
+}
+
 int
 virtDBusConnectOpen(virtDBusConnect *connect,
                     virtDBusMessage *msg)
 {
+    _cleanup_(virtDBusConnectUnlock) pthread_mutex_t *lock = &connect->lock;
+
+    pthread_mutex_lock(lock);
+
     if (connect->connection) {
         if (virConnectIsAlive(connect->connection))
             return 0;
@@ -212,6 +223,9 @@ virtDBusConnectNew(virtDBusConnect **connectp,
     connect->uri = uri;
     connect->connectPath = connectPath;
 
+    if (virtDBusUtilMutexInit(&connect->lock) != 0)
+        return -1;
+
     if (virtDBusObjectListRegister(objectList,
                                    connect->connectPath,
                                    &introspectXML,
diff --git a/src/connect.h b/src/connect.h
index e685b41..c6026c9 100644
--- a/src/connect.h
+++ b/src/connect.h
@@ -4,10 +4,13 @@
 
 #include <dbus/dbus.h>
 #include <libvirt/libvirt.h>
+#include <pthread.h>
 
 #define VIRT_DBUS_CONNECT_INTERFACE "org.libvirt.Connect"
 
 struct _virtDBusConnect {
+    pthread_mutex_t lock;
+
     DBusConnection *bus;
     const char *uri;
     const char *connectPath;
diff --git a/src/main.c b/src/main.c
index bef5dcc..1808620 100644
--- a/src/main.c
+++ b/src/main.c
@@ -8,6 +8,7 @@
 #include <errno.h>
 #include <getopt.h>
 #include <poll.h>
+#include <pthread.h>
 #include <signal.h>
 #include <stdbool.h>
 #include <stdio.h>
@@ -16,21 +17,98 @@
 #include <sys/signalfd.h>
 #include <unistd.h>
 
+#define VIRT_DBUS_THREADS 4
+
 static int loop_status = 0;
+static pthread_mutex_t loopStatusLock = PTHREAD_MUTEX_INITIALIZER;
+
+static int /* Returns a snapshot of loop_status taken while holding loopStatusLock. */
+virtDBusLoopStatusGet(void)
+{
+    int ret;
+    pthread_mutex_lock(&loopStatusLock);
+    ret = loop_status; /* copied to a local so it can be returned after the unlock */
+    pthread_mutex_unlock(&loopStatusLock);
+    return ret;
+}
+
+static void /* Stores val into loop_status while holding loopStatusLock. */
+virtDBusLoopStatusSet(int val) /* val: new status; overwrites the previous value unconditionally */
+{
+    pthread_mutex_lock(&loopStatusLock);
+    loop_status = val;
+    pthread_mutex_unlock(&loopStatusLock);
+}
+
+struct _virtDBusDispatchData { /* Argument bundle handed to each dispatch thread through pthread_create(). */
+    DBusConnection *bus; /* forwarded as the first argument of virtDBusProcessEvents() */
+    virtDBusObjectList *objectList; /* forwarded as the second argument of virtDBusProcessEvents() */
+};
+
+static pthread_cond_t threadLoopCond;
+static pthread_mutex_t threadLoopLock;
 
 static int
 virtDBusProcessEvents(DBusConnection *bus,
                       virtDBusObjectList *objectList)
 {
     for (;;) {
-            int r;
+        int r;
 
-            r = virtDBusDispatchMessage(bus, objectList);
-            if (r < 0)
-                    return r;
+        r = virtDBusDispatchMessage(bus, objectList);
+        if (r < 0)
+            return r; /* a negative dispatch result is handed straight back to the caller */
 
-            if (r == 0)
-                    break;
+        if (r == 0)
+            break; /* zero ends the loop; a positive result dispatches again */
+    }
+
+    return 0; /* reached only through the break above */
+}
+
+static void * /* Worker thread body: waits on threadLoopCond, then calls virtDBusProcessEvents(); always returns NULL. */
+virtDBusDispatchThread(void *opaque) /* opaque: the struct _virtDBusDispatchData pointer given to pthread_create() */
+{
+    struct _virtDBusDispatchData *data = opaque;
+
+    while(true) {
+        if (pthread_cond_wait(&threadLoopCond, &threadLoopLock) != 0) { /* NOTE(review): threadLoopLock is neither locked before this wait nor unlocked after it in this function; POSIX requires the caller to hold the mutex - confirm */
+            virtDBusLoopStatusSet(errno); /* NOTE(review): pthread_cond_wait() reports failure through its return value, not errno; the value is also stored un-negated, unlike -ENOMEM below - confirm */
+            return NULL;
+        }
+        if (virtDBusLoopStatusGet() != 0) /* any non-zero status ends this worker */
+            return NULL;
+
+        if (virtDBusProcessEvents(data->bus, data->objectList) < 0) {
+            virtDBusLoopStatusSet(-ENOMEM); /* every negative result is recorded as -ENOMEM, whatever its actual value */
+            return NULL;
+        }
+    }
+
+    return NULL; /* not reached: the loop above exits only through return statements */
+}
+
+static void /* Broadcasts on threadLoopCond to wake the threads waiting on it. */
+virtDBusDispatch(void)
+{
+    pthread_cond_broadcast(&threadLoopCond); /* result is ignored; NOTE(review): a thread that has not yet reached pthread_cond_wait() will not observe this wake-up - confirm ordering with thread start-up */
+}
+
+static int
+virtDBusStartThreads(struct _virtDBusDispatchData *data,
+                     int threads)
+{
+    if (pthread_cond_init(&threadLoopCond, NULL) != 0)
+        return -1;
+
+    if (virtDBusUtilMutexInit(&threadLoopLock) != 0)
+        return -1;
+
+    for (int i = 0; i < threads; i++) {
+        pthread_t thread;
+
+        if (pthread_create(&thread, NULL, virtDBusDispatchThread, data) != 0)
+            return -1;
     }
 
     return 0;
@@ -49,7 +127,7 @@ virtDBusHandleSignal(int watch VIRT_ATTR_UNUSED,
                      int events VIRT_ATTR_UNUSED,
                      void *opaque VIRT_ATTR_UNUSED)
 {
-    loop_status = -ECANCELED;
+    virtDBusLoopStatusSet(-ECANCELED);
 }
 
 struct virtDBusDriver {
@@ -91,6 +169,7 @@ main(int argc, char *argv[])
         { "help",    no_argument,       NULL, 'h' },
         { "system",  no_argument,       NULL, ARG_SYSTEM },
         { "session", no_argument,       NULL, ARG_SESSION },
+        { "threads", required_argument, NULL, 't' },
         {}
     };
 
@@ -106,6 +185,7 @@ main(int argc, char *argv[])
     sigset_t mask;
     int c;
     int r;
+    int threads = VIRT_DBUS_THREADS;
 
     if (geteuid() == 0) {
         busType = DBUS_BUS_SYSTEM;
@@ -113,7 +193,7 @@ main(int argc, char *argv[])
         busType = DBUS_BUS_SESSION;
     }
 
-    while ((c = getopt_long(argc, argv, "hc:", options, NULL)) >= 0) {
+    while ((c = getopt_long(argc, argv, "ht:", options, NULL)) >= 0) {
         switch (c) {
             case 'h':
                 printf("Usage: %s [OPTIONS]\n", program_invocation_short_name);
@@ -123,8 +203,16 @@ main(int argc, char *argv[])
                 printf("  -h, --help        Display this help text and exit\n");
                 printf("  --session         Connect to the session bus\n");
                 printf("  --system          Connect to the system bus\n");
+                printf("  -t, --threads     Configure count of worker threads\n");
                 return 0;
 
+            case 't':
+                if (virtDBusUtilStrToInt(optarg, 10, &threads) < 0) {
+                    fprintf(stderr, "Failed to parse --threads.\n");
+                    return EXIT_FAILURE;
+                }
+                break;
+
             case ARG_SYSTEM:
                 busType = DBUS_BUS_SYSTEM;
                 break;
@@ -179,11 +267,13 @@ main(int argc, char *argv[])
         return EXIT_FAILURE;
     }
 
-    r = virtDBusProcessEvents(bus, &objectList);
-    if (r < 0)
-        return EXIT_FAILURE;
+    struct _virtDBusDispatchData data = { bus, &objectList };
+
+    virtDBusStartThreads(&data, threads);
+
+    virtDBusDispatch();
 
-    while (loop_status >= 0) {
+    while ((r = virtDBusLoopStatusGet()) >= 0) {
         virEventRunDefaultImpl();
 
         r = virtDBusProcessEvents(bus, &objectList);
@@ -191,8 +281,8 @@ main(int argc, char *argv[])
             return EXIT_FAILURE;
     }
 
-    if (loop_status < 0 && loop_status != -ECANCELED) {
-        fprintf(stderr, "Error: %s\n", strerror(-loop_status));
+    if (r < 0 && r != -ECANCELED) {
+        fprintf(stderr, "Error: %s\n", strerror(-r));
         return EXIT_FAILURE;
     }
 
diff --git a/src/util.c b/src/util.c
index 6f8b7be..fe9e023 100644
--- a/src/util.c
+++ b/src/util.c
@@ -2,6 +2,7 @@
 
 #include "util.h"
 
+#include <errno.h>
 #include <fcntl.h>
 #include <libvirt/virterror.h>
 #include <stdio.h>
@@ -90,6 +91,33 @@ virtDBusUtilSetLastVirtError(virtDBusMessage *msg)
                                    virError->message);
 }
 
+int /* Initializes *mutex with type PTHREAD_MUTEX_NORMAL; returns the result of pthread_mutex_init(). */
+virtDBusUtilMutexInit(pthread_mutex_t *mutex) /* mutex: storage to initialize */
+{
+    _cleanup_(pthread_mutexattr_destroy) pthread_mutexattr_t attr; /* presumably _cleanup_ runs pthread_mutexattr_destroy(&attr) at scope exit - confirm against the macro definition */
+
+    pthread_mutexattr_init(&attr); /* NOTE(review): return value is not checked */
+    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); /* return value is likewise not checked */
+    return pthread_mutex_init(mutex, &attr);
+}
+
+int /* Parses string as an int in the given base; returns 0 and stores the value in *result, or -1 on failure. */
+virtDBusUtilStrToInt(char const *string,
+                     int base,
+                     int *result)
+{
+    long int val;
+    char *ptr;
+
+    errno = 0; /* cleared so a non-zero errno after strtol() is attributable to that call */
+    val = strtol(string, &ptr, base);
+    if (errno != 0 || *ptr != 0 || ptr == string || (int) val != val) /* reject: strtol error, trailing characters, nothing parsed, or value does not fit in int */
+        return -1; /* *result is left untouched on failure */
+
+    *result = val; /* no sign restriction: zero and negative values are accepted */
+    return 0;
+}
+
 char *
 virtDBusUtilReadFile(const char *filename)
 {
diff --git a/src/util.h b/src/util.h
index 16c54df..afb118c 100644
--- a/src/util.h
+++ b/src/util.h
@@ -3,6 +3,7 @@
 #include "dbus.h"
 
 #include <libvirt/libvirt.h>
+#include <pthread.h>
 
 #define VIRT_DBUS_ERROR_INTERFACE "org.libvirt.Error"
 
@@ -26,6 +27,14 @@ virtDBusUtilMessageAppendTypedParameters(virtDBusMessage *msg,
 int
 virtDBusUtilSetLastVirtError(virtDBusMessage *msg);
 
+int
+virtDBusUtilMutexInit(pthread_mutex_t *mutex);
+
+int
+virtDBusUtilStrToInt(char const *string,
+                     int base,
+                     int *result);
+
 char *
 virtDBusUtilReadFile(const char *filename);
 
-- 
2.14.3

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]

  Powered by Linux