[PATCH v4 6/7] Use the threadpool API to manage qemud workers

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



---
 daemon/libvirtd.c |  168 ++++++++---------------------------------------------
 daemon/libvirtd.h |    4 +
 2 files changed, 29 insertions(+), 143 deletions(-)

diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index caf51bf..f4987a3 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
 #include "stream.h"
 #include "hooks.h"
 #include "virtaudit.h"
+#include "threadpool.h"
 #ifdef HAVE_AVAHI
 # include "mdns.h"
 #endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
 
 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     client->auth = sock->auth;
     client->addr = addr;
     client->addrstr = addrstr;
+    client->server = server;
     addrstr = NULL;
 
     for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
@@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
 
     server->clients[server->nclients++] = client;
 
-    if (server->nclients > server->nactiveworkers &&
-        server->nactiveworkers < server->nworkers) {
-        for (i = 0 ; i < server->nworkers ; i++) {
-            if (!server->workers[i].hasThread) {
-                if (qemudStartWorker(server, &server->workers[i]) < 0)
-                    return -1;
-                server->nactiveworkers++;
-                break;
-            }
-        }
-    }
-
-
     return 0;
 
 error:
@@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
     VIR_FREE(client->addrstr);
 }
 
-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
+static void qemudWorker(void *data)
 {
-    int i;
-    for (i = 0 ; i < server->nclients ; i++) {
-        virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->dx) {
-            /* Delibrately don't unlock client - caller wants the lock */
-            return server->clients[i];
-        }
-        virMutexUnlock(&server->clients[i]->lock);
-    }
-    return NULL;
-}
+    struct qemud_client *client = data;
+    struct qemud_client_message *msg;
 
-static void *qemudWorker(void *data)
-{
-    struct qemud_worker *worker = data;
-    struct qemud_server *server = worker->server;
+    virMutexLock(&client->lock);
 
-    while (1) {
-        struct qemud_client *client = NULL;
-        struct qemud_client_message *msg;
+    /* Remove our message from dispatch queue while we use it */
+    msg = qemudClientMessageQueueServe(&client->dx);
 
-        virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
-            if (worker->quitRequest ||
-                virCondWait(&server->job, &server->lock) < 0) {
-                virMutexUnlock(&server->lock);
-                return NULL;
-            }
-        }
-        if (worker->quitRequest) {
-            virMutexUnlock(&client->lock);
-            virMutexUnlock(&server->lock);
-            return NULL;
-        }
-        worker->processingCall = 1;
-        virMutexUnlock(&server->lock);
-
-        /* We own a locked client now... */
-        client->refs++;
-
-        /* Remove our message from dispatch queue while we use it */
-        msg = qemudClientMessageQueueServe(&client->dx);
-
-        /* This function drops the lock during dispatch,
-         * and re-acquires it before returning */
-        if (remoteDispatchClientRequest (server, client, msg) < 0) {
-            VIR_FREE(msg);
-            qemudDispatchClientFailure(client);
-            client->refs--;
-            virMutexUnlock(&client->lock);
-            continue;
-        }
-
-        client->refs--;
-        virMutexUnlock(&client->lock);
-
-        virMutexLock(&server->lock);
-        worker->processingCall = 0;
-        virMutexUnlock(&server->lock);
-    }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
-                            struct qemud_worker *worker) {
-    pthread_attr_t attr;
-    pthread_attr_init(&attr);
-    /* We want to join workers, so don't detach them */
-    /*pthread_attr_setdetachstate(&attr, 1);*/
-
-    if (worker->hasThread)
-        return -1;
-
-    worker->server = server;
-    worker->hasThread = 1;
-    worker->quitRequest = 0;
-    worker->processingCall = 0;
-
-    if (pthread_create(&worker->thread,
-                       &attr,
-                       qemudWorker,
-                       worker) != 0) {
-        worker->hasThread = 0;
-        worker->server = NULL;
-        return -1;
+    /* This function drops the lock during dispatch,
+     * and re-acquires it before returning */
+    if (remoteDispatchClientRequest (client->server, client, msg) < 0) {
+        VIR_FREE(msg);
+        qemudDispatchClientFailure(client);
     }
 
-    return 0;
+    client->refs--;
+    virMutexUnlock(&client->lock);
 }
 
-
 /*
  * Read data into buffer using wire decoding (plain or TLS)
  *
@@ -1857,8 +1772,11 @@ readmore:
         }
 
         /* Move completed message to the end of the dispatch queue */
-        if (msg)
+        if (msg) {
+            client->refs++;
             qemudClientMessageQueuePush(&client->dx, msg);
+            virWorkerPoolSendJob(server->workerPool, client);
+        }
         client->nrequests++;
 
         /* Possibly need to create another receive buffer */
@@ -1870,9 +1788,6 @@ readmore:
                 client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
 
             qemudUpdateClientEvent(client);
-
-            /* Tell one of the workers to get on with it... */
-            virCondSignal(&server->job);
         }
     }
 }
@@ -2311,10 +2226,10 @@ static void *qemudRunLoop(void *opaque) {
         return NULL;
     }
 
-    for (i = 0 ; i < min_workers ; i++) {
-        if (qemudStartWorker(server, &server->workers[i]) < 0)
-            goto cleanup;
-        server->nactiveworkers++;
+    server->workerPool = virWorkerPoolNew(min_workers, max_workers, qemudWorker);
+    if (!server->workerPool) {
+        virMutexUnlock(&server->lock);
+        return NULL;
     }
 
     for (;!server->quitEventThread;) {
@@ -2367,43 +2282,10 @@ static void *qemudRunLoop(void *opaque) {
                 goto reprocess;
             }
         }
-
-        /* If number of active workers exceeds both the min_workers
-         * threshold and the number of clients, then kill some
-         * off */
-        for (i = 0 ; (i < server->nworkers &&
-                      server->nactiveworkers > server->nclients &&
-                      server->nactiveworkers > min_workers) ; i++) {
-
-            if (server->workers[i].hasThread &&
-                !server->workers[i].processingCall) {
-                server->workers[i].quitRequest = 1;
-
-                virCondBroadcast(&server->job);
-                virMutexUnlock(&server->lock);
-                pthread_join(server->workers[i].thread, NULL);
-                virMutexLock(&server->lock);
-                server->workers[i].hasThread = 0;
-                server->nactiveworkers--;
-            }
-        }
-    }
-
-cleanup:
-    for (i = 0 ; i < server->nworkers ; i++) {
-        if (!server->workers[i].hasThread)
-            continue;
-
-        server->workers[i].quitRequest = 1;
-        virCondBroadcast(&server->job);
-
-        virMutexUnlock(&server->lock);
-        pthread_join(server->workers[i].thread, NULL);
-        virMutexLock(&server->lock);
-        server->workers[i].hasThread = 0;
     }
-    VIR_FREE(server->workers);
 
+    virWorkerPoolFree(server->workerPool);
+    server->workerPool = NULL;
     virMutexUnlock(&server->lock);
     return NULL;
 }
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index af20e56..9fa5edb 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -49,6 +49,7 @@
 # include "logging.h"
 # include "threads.h"
 # include "network.h"
+# include "threadpool.h"
 
 # if WITH_DTRACE
 #  ifndef LIBVIRTD_PROBES_H
@@ -192,6 +193,8 @@ struct qemud_client {
 
     int magic;
 
+    struct qemud_server *server;
+
     int fd;
     int watch;
     unsigned int readonly :1;
@@ -283,6 +286,7 @@ struct qemud_server {
 
     int privileged;
 
+    virWorkerPoolPtr workerPool;
     size_t nworkers;
     size_t nactiveworkers;
     struct qemud_worker *workers;
-- 
1.7.3


-- 
Thanks,
Hu Tao

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]