--- daemon/libvirtd.c | 187 ++++++++--------------------------------------------- daemon/libvirtd.h | 16 +---- 2 files changed, 30 insertions(+), 173 deletions(-) diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index 791b3dc..229c0cc 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -67,6 +67,7 @@ #include "stream.h" #include "hooks.h" #include "virtaudit.h" +#include "threadpool.h" #ifdef HAVE_AVAHI # include "mdns.h" #endif @@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo, static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque); static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque); -static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker); void qemudClientMessageQueuePush(struct qemud_client_message **queue, @@ -842,18 +842,10 @@ static struct qemud_server *qemudInitialize(void) { VIR_FREE(server); return NULL; } - if (virCondInit(&server->job) < 0) { - VIR_ERROR0(_("cannot initialize condition variable")); - virMutexDestroy(&server->lock); - VIR_FREE(server); - return NULL; - } if (virEventInit() < 0) { VIR_ERROR0(_("Failed to initialize event system")); virMutexDestroy(&server->lock); - if (virCondDestroy(&server->job) < 0) - {} VIR_FREE(server); return NULL; } @@ -1458,19 +1450,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket server->clients[server->nclients++] = client; - if (server->nclients > server->nactiveworkers && - server->nactiveworkers < server->nworkers) { - for (i = 0 ; i < server->nworkers ; i++) { - if (!server->workers[i].hasThread) { - if (qemudStartWorker(server, &server->workers[i]) < 0) - return -1; - server->nactiveworkers++; - break; - } - } - } - - return 0; error: @@ -1534,100 +1513,28 @@ void qemudDispatchClientFailure(struct qemud_client *client) { VIR_FREE(client->addrstr); } - -/* Caller must hold server lock */ -static struct qemud_client *qemudPendingJob(struct qemud_server *server) -{ - int i; - for (i = 0 ; i < server->nclients ; i++) { - virMutexLock(&server->clients[i]->lock); - if (server->clients[i]->dx) { - /* Delibrately don't unlock client - caller wants the lock */ - return server->clients[i]; - } - virMutexUnlock(&server->clients[i]->lock); - } - return NULL; -} - -static void *qemudWorker(void *data) +static void qemudWorker(void *data, void *opaque) { - struct qemud_worker *worker = data; - struct qemud_server *server = worker->server; - - while (1) { - struct qemud_client *client = NULL; - struct qemud_client_message *msg; - - virMutexLock(&server->lock); - while ((client = qemudPendingJob(server)) == NULL) { - if (worker->quitRequest || - virCondWait(&server->job, &server->lock) < 0) { - virMutexUnlock(&server->lock); - return NULL; - } - } - if (worker->quitRequest) { - virMutexUnlock(&client->lock); - virMutexUnlock(&server->lock); - return NULL; - } - worker->processingCall = 1; - virMutexUnlock(&server->lock); - - /* We own a locked client now... */ - client->refs++; - - /* Remove our message from dispatch queue while we use it */ - msg = qemudClientMessageQueueServe(&client->dx); - - /* This function drops the lock during dispatch, - * and re-acquires it before returning */ - if (remoteDispatchClientRequest (server, client, msg) < 0) { - VIR_FREE(msg); - qemudDispatchClientFailure(client); - client->refs--; - virMutexUnlock(&client->lock); - continue; - } - - client->refs--; - virMutexUnlock(&client->lock); - - virMutexLock(&server->lock); - worker->processingCall = 0; - virMutexUnlock(&server->lock); - } -} - -static int qemudStartWorker(struct qemud_server *server, - struct qemud_worker *worker) { - pthread_attr_t attr; - pthread_attr_init(&attr); - /* We want to join workers, so don't detach them */ - /*pthread_attr_setdetachstate(&attr, 1);*/ + struct qemud_server *server = opaque; + struct qemud_client *client = data; + struct qemud_client_message *msg; - if (worker->hasThread) - return -1; + virMutexLock(&client->lock); - worker->server = server; - worker->hasThread = 1; - worker->quitRequest = 0; - worker->processingCall = 0; + /* Remove our message from dispatch queue while we use it */ + msg = qemudClientMessageQueueServe(&client->dx); - if (pthread_create(&worker->thread, - &attr, - qemudWorker, - worker) != 0) { - worker->hasThread = 0; - worker->server = NULL; - return -1; + /* This function drops the lock during dispatch, + * and re-acquires it before returning */ + if (remoteDispatchClientRequest (server, client, msg) < 0) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); } - return 0; + client->refs--; + virMutexUnlock(&client->lock); } - /* * Read data into buffer using wire decoding (plain or TLS) * @@ -1857,8 +1764,11 @@ readmore: } /* Move completed message to the end of the dispatch queue */ - if (msg) + if (msg) { + client->refs++; qemudClientMessageQueuePush(&client->dx, msg); + ignore_value(virThreadPoolSendJob(server->workerPool, client)); + } client->nrequests++; /* Possibly need to create another receive buffer */ @@ -1870,9 +1780,6 @@ readmore: client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; qemudUpdateClientEvent(client); - - /* Tell one of the workers to get on with it... */ - virCondSignal(&server->job); } } } @@ -2305,18 +2212,16 @@ static void *qemudRunLoop(void *opaque) { if (min_workers > max_workers) max_workers = min_workers; - server->nworkers = max_workers; - if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) { - VIR_ERROR0(_("Failed to allocate workers")); + server->workerPool = virThreadPoolNew(min_workers, + max_workers, + qemudWorker, + server); + if (!server->workerPool) { + VIR_ERROR0(_("Failed to create thread pool")); + virMutexUnlock(&server->lock); return NULL; } - for (i = 0 ; i < min_workers ; i++) { - if (qemudStartWorker(server, &server->workers[i]) < 0) - goto cleanup; - server->nactiveworkers++; - } - for (;!server->quitEventThread;) { /* A shutdown timeout is specified, so check * if any drivers have active state, if not @@ -2367,47 +2272,14 @@ static void *qemudRunLoop(void *opaque) { goto reprocess; } } - - /* If number of active workers exceeds both the min_workers - * threshold and the number of clients, then kill some - * off */ - for (i = 0 ; (i < server->nworkers && - server->nactiveworkers > server->nclients && - server->nactiveworkers > min_workers) ; i++) { - - if (server->workers[i].hasThread && - !server->workers[i].processingCall) { - server->workers[i].quitRequest = 1; - - virCondBroadcast(&server->job); - virMutexUnlock(&server->lock); - pthread_join(server->workers[i].thread, NULL); - virMutexLock(&server->lock); - server->workers[i].hasThread = 0; - server->nactiveworkers--; - } - } } - -cleanup: - for (i = 0 ; i < server->nworkers ; i++) { - if (!server->workers[i].hasThread) - continue; - - server->workers[i].quitRequest = 1; - virCondBroadcast(&server->job); - - virMutexUnlock(&server->lock); - pthread_join(server->workers[i].thread, NULL); - virMutexLock(&server->lock); - server->workers[i].hasThread = 0; - } - VIR_FREE(server->workers); for (i = 0; i < server->nclients; i++) qemudFreeClient(server->clients[i]); server->nclients = 0; VIR_SHRINK_N(server->clients, server->nclients_max, server->nclients_max); + virThreadPoolFree(server->workerPool); + server->workerPool = NULL; virMutexUnlock(&server->lock); return NULL; } @@ -2475,9 +2347,6 @@ static void qemudCleanup(struct qemud_server *server) { virStateCleanup(); - if (virCondDestroy(&server->job) < 0) { - ; - } virMutexDestroy(&server->lock); VIR_FREE(server); diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h index af20e56..e4ee63b 100644 --- a/daemon/libvirtd.h +++ b/daemon/libvirtd.h @@ -49,6 +49,7 @@ # include "logging.h" # include "threads.h" # include "network.h" +# include "threadpool.h" # if WITH_DTRACE # ifndef LIBVIRTD_PROBES_H @@ -266,26 +267,13 @@ struct qemud_socket { struct qemud_socket *next; }; -struct qemud_worker { - pthread_t thread; - unsigned int hasThread :1; - unsigned int processingCall :1; - unsigned int quitRequest :1; - - /* back-pointer to our server */ - struct qemud_server *server; -}; - /* Main server state */ struct qemud_server { virMutex lock; - virCond job; int privileged; - size_t nworkers; - size_t nactiveworkers; - struct qemud_worker *workers; + virThreadPoolPtr workerPool; size_t nsockets; struct qemud_socket *sockets; size_t nclients; -- 1.7.3 -- Thanks, Hu Tao -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list