The libvirtd.conf file has three parameters: max_clients, min_workers and max_workers. When the daemon starts up it spawns min_workers threads. It accepts connections from up to max_clients. I never implemented the logic to auto-spawn more threads up to max_workers though. This patch addresses that. Upon accept()ing a client connection, if the number of clients is greater than the number of active worker threads, we spawn another worker, unless we've hit the max_workers limit. If the number of clients is greater than max_workers, this means some clients may have to wait for other clients' requests to finish before being processed. No great problem. This also fixes a shutdown problem. We were marking the threads as detached, but still calling pthread_join() on them. This gives an error on Linux, but just hangs on Solaris while it tries to join a thread that has no intention of exiting. So during shutdown we set a 'quit' flag on the worker, and then broadcast a signal to wake it up from its condition variable sleep. Upon wakeup it notices the quit flag and exits, allowing us to join & clean up. 
qemud.c | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- qemud.h | 13 +++++++ 2 files changed, 106 insertions(+), 16 deletions(-) Daniel diff --git a/qemud/qemud.c b/qemud/qemud.c --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -167,7 +167,7 @@ static void sig_handler(int sig, siginfo static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque); static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque); - +static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker); void qemudClientMessageQueuePush(struct qemud_client_message **queue, @@ -1248,6 +1248,20 @@ static int qemudDispatchServer(struct qe server->clients[server->nclients++] = client; + if (server->nclients > server->nactiveworkers && + server->nactiveworkers < server->nworkers) { + int i; + for (i = 0 ; i < server->nworkers ; i++) { + if (!server->workers[i].active) { + if (qemudStartWorker(server, &server->workers[i]) < 0) + return -1; + server->nactiveworkers++; + break; + } + } + } + + return 0; cleanup: @@ -1303,19 +1317,28 @@ static struct qemud_client *qemudPending static void *qemudWorker(void *data) { - struct qemud_server *server = data; + struct qemud_worker *worker = data; + struct qemud_server *server = worker->server; while (1) { struct qemud_client *client = NULL; struct qemud_client_message *reply; virMutexLock(&server->lock); - while ((client = qemudPendingJob(server)) == NULL) { + while (((client = qemudPendingJob(server)) == NULL) && + !worker->quit) { if (virCondWait(&server->job, &server->lock) < 0) { virMutexUnlock(&server->lock); return NULL; } } + if (worker->quit) { + if (client) + virMutexUnlock(&client->lock); + virMutexUnlock(&server->lock); + return NULL; + } + worker->processing = 1; virMutexUnlock(&server->lock); /* We own a locked client now... 
*/ @@ -1342,9 +1365,40 @@ static void *qemudWorker(void *data) client->refs--; virMutexUnlock(&client->lock); + + virMutexLock(&server->lock); + worker->processing = 0; + virMutexUnlock(&server->lock); } } +static int qemudStartWorker(struct qemud_server *server, + struct qemud_worker *worker) { + pthread_attr_t attr; + pthread_attr_init(&attr); + /* We want to join workers, so don't detach them */ + /*pthread_attr_setdetachstate(&attr, 1);*/ + + if (worker->active) + return -1; + + worker->server = server; + worker->active = 1; + worker->quit = 0; + worker->processing = 0; + + if (pthread_create(&worker->thread, + &attr, + qemudWorker, + worker) != 0) { + worker->active = 0; + worker->server = NULL; + return -1; + } + + return 0; +} + /* * Read data into buffer using wire decoding (plain or TLS) @@ -1888,21 +1942,19 @@ static int qemudRunLoop(struct qemud_ser virMutexLock(&server->lock); - server->nworkers = min_workers; + if (min_workers > max_workers) + max_workers = min_workers; + + server->nworkers = max_workers; if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) { VIR_ERROR0(_("Failed to allocate workers")); return -1; } - for (i = 0 ; i < server->nworkers ; i++) { - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, 1); - - pthread_create(&server->workers[i], - &attr, - qemudWorker, - server); + for (i = 0 ; i < min_workers ; i++) { + if (qemudStartWorker(server, &server->workers[i]) < 0) + goto cleanup; + server->nactiveworkers++; } for (;;) { @@ -1948,6 +2000,26 @@ static int qemudRunLoop(struct qemud_ser } } + /* If number of active workers exceeds both the min_workers + * threshold and the number of clients, then kill some + * off */ + for (i = 0 ; (i < server->nworkers && + server->nactiveworkers > server->nclients && + server->nactiveworkers > min_workers) ; i++) { + + if (server->workers[i].active && + !server->workers[i].processing) { + server->workers[i].quit = 1; + + virCondBroadcast(&server->job); + 
virMutexUnlock(&server->lock); + pthread_join(server->workers[i].thread, NULL); + virMutexLock(&server->lock); + server->workers[i].active = 0; + server->nactiveworkers--; + } + } + /* Unregister any timeout that's active, since we * just had an event processed */ @@ -1963,11 +2035,18 @@ static int qemudRunLoop(struct qemud_ser } } +cleanup: for (i = 0 ; i < server->nworkers ; i++) { - pthread_t thread = server->workers[i]; + if (!server->workers[i].active) + continue; + + server->workers[i].quit = 1; + virCondBroadcast(&server->job); + virMutexUnlock(&server->lock); - pthread_join(thread, NULL); + pthread_join(server->workers[i].thread, NULL); virMutexLock(&server->lock); + server->workers[i].active = 0; } VIR_FREE(server->workers); diff --git a/qemud/qemud.h b/qemud/qemud.h --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -159,13 +159,24 @@ struct qemud_socket { struct qemud_socket *next; }; +struct qemud_worker { + pthread_t thread; + int active :1; + int processing :1; + int quit : 1; + + /* back-pointer to our server */ + struct qemud_server *server; +}; + /* Main server state */ struct qemud_server { virMutex lock; virCond job; int nworkers; - pthread_t *workers; + int nactiveworkers; + struct qemud_worker *workers; int nsockets; struct qemud_socket *sockets; int nclients; -- |: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :| |: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :| -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list