--- daemon/libvirtd.c | 168 ++++++++--------------------------------------------- daemon/libvirtd.h | 4 + 2 files changed, 29 insertions(+), 143 deletions(-) diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index caf51bf..f4987a3 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -67,6 +67,7 @@ #include "stream.h" #include "hooks.h" #include "virtaudit.h" +#include "threadpool.h" #ifdef HAVE_AVAHI # include "mdns.h" #endif @@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo, static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque); static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque); -static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker); void qemudClientMessageQueuePush(struct qemud_client_message **queue, @@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket client->auth = sock->auth; client->addr = addr; client->addrstr = addrstr; + client->server = server; addrstr = NULL; for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) { @@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket server->clients[server->nclients++] = client; - if (server->nclients > server->nactiveworkers && - server->nactiveworkers < server->nworkers) { - for (i = 0 ; i < server->nworkers ; i++) { - if (!server->workers[i].hasThread) { - if (qemudStartWorker(server, &server->workers[i]) < 0) - return -1; - server->nactiveworkers++; - break; - } - } - } - - return 0; error: @@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) { VIR_FREE(client->addrstr); } - -/* Caller must hold server lock */ -static struct qemud_client *qemudPendingJob(struct qemud_server *server) +static void qemudWorker(void *data) { - int i; - for (i = 0 ; i < server->nclients ; i++) { - virMutexLock(&server->clients[i]->lock); - if (server->clients[i]->dx) { - /* Delibrately don't unlock client - caller wants the lock */ - return server->clients[i]; - } - virMutexUnlock(&server->clients[i]->lock); - } - return NULL; -} + struct qemud_client *client = data; + struct qemud_client_message *msg; -static void *qemudWorker(void *data) -{ - struct qemud_worker *worker = data; - struct qemud_server *server = worker->server; + virMutexLock(&client->lock); - while (1) { - struct qemud_client *client = NULL; - struct qemud_client_message *msg; + /* Remove our message from dispatch queue while we use it */ + msg = qemudClientMessageQueueServe(&client->dx); - virMutexLock(&server->lock); - while ((client = qemudPendingJob(server)) == NULL) { - if (worker->quitRequest || - virCondWait(&server->job, &server->lock) < 0) { - virMutexUnlock(&server->lock); - return NULL; - } - } - if (worker->quitRequest) { - virMutexUnlock(&client->lock); - virMutexUnlock(&server->lock); - return NULL; - } - worker->processingCall = 1; - virMutexUnlock(&server->lock); - - /* We own a locked client now... */ - client->refs++; - - /* Remove our message from dispatch queue while we use it */ - msg = qemudClientMessageQueueServe(&client->dx); - - /* This function drops the lock during dispatch, - * and re-acquires it before returning */ - if (remoteDispatchClientRequest (server, client, msg) < 0) { - VIR_FREE(msg); - qemudDispatchClientFailure(client); - client->refs--; - virMutexUnlock(&client->lock); - continue; - } - - client->refs--; - virMutexUnlock(&client->lock); - - virMutexLock(&server->lock); - worker->processingCall = 0; - virMutexUnlock(&server->lock); - } -} - -static int qemudStartWorker(struct qemud_server *server, - struct qemud_worker *worker) { - pthread_attr_t attr; - pthread_attr_init(&attr); - /* We want to join workers, so don't detach them */ - /*pthread_attr_setdetachstate(&attr, 1);*/ - - if (worker->hasThread) - return -1; - - worker->server = server; - worker->hasThread = 1; - worker->quitRequest = 0; - worker->processingCall = 0; - - if (pthread_create(&worker->thread, - &attr, - qemudWorker, - worker) != 0) { - worker->hasThread = 0; - worker->server = NULL; - return -1; + /* This function drops the lock during dispatch, + * and re-acquires it before returning */ + if (remoteDispatchClientRequest (client->server, client, msg) < 0) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); } - return 0; + client->refs--; + virMutexUnlock(&client->lock); } - /* * Read data into buffer using wire decoding (plain or TLS) * @@ -1857,8 +1772,11 @@ readmore: } /* Move completed message to the end of the dispatch queue */ - if (msg) + if (msg) { + client->refs++; qemudClientMessageQueuePush(&client->dx, msg); + virWorkerPoolSendJob(server->workerPool, client); + } client->nrequests++; /* Possibly need to create another receive buffer */ @@ -1870,9 +1788,6 @@ readmore: client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; qemudUpdateClientEvent(client); - - /* Tell one of the workers to get on with it... */ - virCondSignal(&server->job); } } } @@ -2311,10 +2226,10 @@ static void *qemudRunLoop(void *opaque) { return NULL; } - for (i = 0 ; i < min_workers ; i++) { - if (qemudStartWorker(server, &server->workers[i]) < 0) - goto cleanup; - server->nactiveworkers++; + server->workerPool = virWorkerPoolNew(min_workers, max_workers, qemudWorker); + if (!server->workerPool) { + virMutexUnlock(&server->lock); + return NULL; } for (;!server->quitEventThread;) { @@ -2367,43 +2282,10 @@ static void *qemudRunLoop(void *opaque) { goto reprocess; } } - - /* If number of active workers exceeds both the min_workers - * threshold and the number of clients, then kill some - * off */ - for (i = 0 ; (i < server->nworkers && - server->nactiveworkers > server->nclients && - server->nactiveworkers > min_workers) ; i++) { - - if (server->workers[i].hasThread && - !server->workers[i].processingCall) { - server->workers[i].quitRequest = 1; - - virCondBroadcast(&server->job); - virMutexUnlock(&server->lock); - pthread_join(server->workers[i].thread, NULL); - virMutexLock(&server->lock); - server->workers[i].hasThread = 0; - server->nactiveworkers--; - } - } - } - -cleanup: - for (i = 0 ; i < server->nworkers ; i++) { - if (!server->workers[i].hasThread) - continue; - - server->workers[i].quitRequest = 1; - virCondBroadcast(&server->job); - - virMutexUnlock(&server->lock); - pthread_join(server->workers[i].thread, NULL); - virMutexLock(&server->lock); - server->workers[i].hasThread = 0; } - VIR_FREE(server->workers); + virWorkerPoolFree(server->workerPool); + server->workerPool = NULL; virMutexUnlock(&server->lock); return NULL; } diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h index af20e56..9fa5edb 100644 --- a/daemon/libvirtd.h +++ b/daemon/libvirtd.h @@ -49,6 +49,7 @@ # include "logging.h" # include "threads.h" # include "network.h" +# include "threadpool.h" # if WITH_DTRACE # ifndef LIBVIRTD_PROBES_H @@ -192,6 +193,8 @@ struct qemud_client { int magic; + struct qemud_server *server; + int fd; int watch; unsigned int readonly :1; @@ -283,6 +286,7 @@ struct qemud_server { int privileged; + virWorkerPoolPtr workerPool; size_t nworkers; size_t nactiveworkers; struct qemud_worker *workers; -- 1.7.3 -- Thanks, Hu Tao -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list