On Thu, Dec 02, 2010 at 03:30:23PM +0800, Hu Tao wrote: > --- > daemon/libvirtd.c | 172 +++++++++-------------------------------------------- > daemon/libvirtd.h | 4 + > 2 files changed, 33 insertions(+), 143 deletions(-) > > diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c > index 791b3dc..dbd050a 100644 > --- a/daemon/libvirtd.c > +++ b/daemon/libvirtd.c > @@ -67,6 +67,7 @@ > #include "stream.h" > #include "hooks.h" > #include "virtaudit.h" > +#include "threadpool.h" > #ifdef HAVE_AVAHI > # include "mdns.h" > #endif > @@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo, > > static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque); > static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque); > -static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker); > > void > qemudClientMessageQueuePush(struct qemud_client_message **queue, > @@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket > client->auth = sock->auth; > client->addr = addr; > client->addrstr = addrstr; > + client->server = server; > addrstr = NULL; This shouldn't be needed, as 'server' shoudl be passed into the worker function via the 'void *opaque' parameter. > > for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) { > @@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket > > server->clients[server->nclients++] = client; > > - if (server->nclients > server->nactiveworkers && > - server->nactiveworkers < server->nworkers) { > - for (i = 0 ; i < server->nworkers ; i++) { > - if (!server->workers[i].hasThread) { > - if (qemudStartWorker(server, &server->workers[i]) < 0) > - return -1; > - server->nactiveworkers++; > - break; > - } > - } > - } > - > - > return 0; > > error: > @@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) { > VIR_FREE(client->addrstr); > } > > - > -/* Caller must hold server lock */ > -static struct qemud_client *qemudPendingJob(struct qemud_server *server) > +static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED) > { > - int i; > - for (i = 0 ; i < server->nclients ; i++) { > - virMutexLock(&server->clients[i]->lock); > - if (server->clients[i]->dx) { > - /* Delibrately don't unlock client - caller wants the lock */ > - return server->clients[i]; > - } > - virMutexUnlock(&server->clients[i]->lock); > - } > - return NULL; > -} > + struct qemud_client *client = data; > + struct qemud_client_message *msg; > > -static void *qemudWorker(void *data) > -{ > - struct qemud_worker *worker = data; > - struct qemud_server *server = worker->server; > + virMutexLock(&client->lock); It is neccessary to hold the lock on 'server' before obtaining a lock on 'client'. The server lock can be released again immediately if no longer needed. > > - while (1) { > - struct qemud_client *client = NULL; > - struct qemud_client_message *msg; > + /* Remove our message from dispatch queue while we use it */ > + msg = qemudClientMessageQueueServe(&client->dx); > > - virMutexLock(&server->lock); > - while ((client = qemudPendingJob(server)) == NULL) { > - if (worker->quitRequest || > - virCondWait(&server->job, &server->lock) < 0) { > - virMutexUnlock(&server->lock); > - return NULL; > - } > - } > - if (worker->quitRequest) { > - virMutexUnlock(&client->lock); > - virMutexUnlock(&server->lock); > - return NULL; > - } > - worker->processingCall = 1; > - virMutexUnlock(&server->lock); > - > - /* We own a locked client now... */ > - client->refs++; > - > - /* Remove our message from dispatch queue while we use it */ > - msg = qemudClientMessageQueueServe(&client->dx); > - > - /* This function drops the lock during dispatch, > - * and re-acquires it before returning */ > - if (remoteDispatchClientRequest (server, client, msg) < 0) { > - VIR_FREE(msg); > - qemudDispatchClientFailure(client); > - client->refs--; > - virMutexUnlock(&client->lock); > - continue; > - } > - > - client->refs--; > - virMutexUnlock(&client->lock); > - > - virMutexLock(&server->lock); > - worker->processingCall = 0; > - virMutexUnlock(&server->lock); > - } > -} > - > -static int qemudStartWorker(struct qemud_server *server, > - struct qemud_worker *worker) { > - pthread_attr_t attr; > - pthread_attr_init(&attr); > - /* We want to join workers, so don't detach them */ > - /*pthread_attr_setdetachstate(&attr, 1);*/ > - > - if (worker->hasThread) > - return -1; > - > - worker->server = server; > - worker->hasThread = 1; > - worker->quitRequest = 0; > - worker->processingCall = 0; > - > - if (pthread_create(&worker->thread, > - &attr, > - qemudWorker, > - worker) != 0) { > - worker->hasThread = 0; > - worker->server = NULL; > - return -1; > + /* This function drops the lock during dispatch, > + * and re-acquires it before returning */ > + if (remoteDispatchClientRequest (client->server, client, msg) < 0) { > + VIR_FREE(msg); > + qemudDispatchClientFailure(client); > } > > - return 0; > + client->refs--; > + virMutexUnlock(&client->lock); > } > > - > /* > * Read data into buffer using wire decoding (plain or TLS) > * > @@ -1857,8 +1772,11 @@ readmore: > } > > /* Move completed message to the end of the dispatch queue */ > - if (msg) > + if (msg) { > + client->refs++; > qemudClientMessageQueuePush(&client->dx, msg); > + ignore_value(virThreadPoolSendJob(server->workerPool, client)); > + } > client->nrequests++; > > /* Possibly need to create another receive buffer */ > @@ -1870,9 +1788,6 @@ readmore: > client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; > > qemudUpdateClientEvent(client); > - > - /* Tell one of the workers to get on with it... */ > - virCondSignal(&server->job); > } > } > } > @@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) { > return NULL; > } > > - for (i = 0 ; i < min_workers ; i++) { > - if (qemudStartWorker(server, &server->workers[i]) < 0) > - goto cleanup; > - server->nactiveworkers++; > + server->workerPool = virThreadPoolNew(min_workers, > + max_workers, > + qemudWorker, > + NULL); Should pass 'server' in here, instead of NULL. > + if (!server->workerPool) { > + VIR_ERROR0(_("Failed to create thread pool")); > + virMutexUnlock(&server->lock); > + return NULL; > } > > for (;!server->quitEventThread;) { A small change in that we no longer kill off idle worker threads, but the improved simplicity of libvirtd code makes this a worthwhile tradeoff. So looks good to me aside from the minor locking bug. Regards, Daniel -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list