Attached is a new version of the patch which allocs/frees memory for storing the callback data in blocks. It also adds copious comments and fixes a very subtle bug in adding handles during dispatch. The printfs are changed into qemudDebug()s and the nvmfds field is killed here. On Mon, Jun 18, 2007 at 05:49:59AM -0400, Daniel Veillard wrote: > On Sun, Jun 17, 2007 at 10:52:55PM +0100, Daniel P. Berrange wrote: > > The following patch adds a qemud/event.c & qemud/event.h file providing a > > general purpose event loop built around poll. Users register file handles > > and associated callbacks, and / or timers. The qemud.c file is changed to > > make use of these APIs for dealing with server, client, and VM file handles > > and/or sockets. This decouples much of the QEMU VM I/O code from the main > > qemud.c daemon code. b/qemud/event.c | 460 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ b/qemud/event.h | 98 +++++++++++ qemud/Makefile.am | 3 qemud/internal.h | 1 qemud/qemud.c | 400 +++++++++++++++++++++++++--------------------- 5 files changed, 774 insertions(+), 188 deletions(-) Dan. -- |=- Red Hat, Engineering, Emerging Technologies, Boston. +1 978 392 2496 -=| |=- Perl modules: http://search.cpan.org/~danberr/ -=| |=- Projects: http://freshmeat.net/~danielpb/ -=| |=- GnuPG: 7D3B9505 F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 -=|
diff -r 56b36b784a1c qemud/Makefile.am --- a/qemud/Makefile.am Mon Jun 18 11:01:38 2007 -0400 +++ b/qemud/Makefile.am Mon Jun 18 11:02:20 2007 -0400 @@ -16,7 +16,8 @@ libvirt_qemud_SOURCES = \ buf.c buf.h \ protocol.h protocol.c \ remote_protocol.h remote_protocol.c \ - remote.c + remote.c \ + event.c event.h #-D_XOPEN_SOURCE=600 -D_XOPEN_SOURCE_EXTENDED=1 -D_POSIX_C_SOURCE=199506L libvirt_qemud_CFLAGS = \ -I$(top_srcdir)/include -I$(top_builddir)/include $(LIBXML_CFLAGS) \ diff -r 56b36b784a1c qemud/event.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qemud/event.c Mon Jun 18 14:14:13 2007 -0400 @@ -0,0 +1,460 @@ +/* + * event.c: event loop for monitoring file handles + * + * Copyright (C) 2007 Daniel P. Berrange + * Copyright (C) 2007 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Daniel P. 
Berrange <berrange@xxxxxxxxxx>
+ */
+
+
+#include <stdlib.h>
+#include <string.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <errno.h>
+
+#include "internal.h"
+#include "event.h"
+
+/* State for a single file handle being monitored */
+struct virEventHandle {
+    int fd;                     /* descriptor copied into the pollfd array */
+    int events;                 /* POLLnnn bitset requested for this fd */
+    virEventHandleCallback cb;  /* invoked with (fd, revents, opaque) */
+    void *opaque;               /* user data handed back to cb */
+    int deleted;                /* set on removal; slot is reaped after dispatch */
+};
+
+/* State for a single timer being generated */
+struct virEventTimeout {
+    int timer;                    /* id taken from nextTimer at registration */
+    int timeout;                  /* interval in milliseconds, added to 'now' on reschedule */
+    unsigned long long expiresAt; /* absolute expiry, milliseconds from gettimeofday() */
+    virEventTimeoutCallback cb;   /* invoked with (timer, opaque) */
+    void *opaque;                 /* user data handed back to cb */
+    int deleted;                  /* set on removal; slot is reaped after dispatch */
+};
+
+/* Allocate extra slots for virEventHandle/virEventTimeout
+   records in this multiple. The arrays are also shrunk by
+   this many slots at a time during cleanup. */
+#define EVENT_ALLOC_EXTENT 10
+
+/* State for the main event loop */
+struct virEventLoop {
+    int handlesCount;                 /* slots of 'handles' currently in use */
+    int handlesAlloc;                 /* slots of 'handles' currently allocated */
+    struct virEventHandle *handles;
+    int timeoutsCount;                /* slots of 'timeouts' currently in use */
+    int timeoutsAlloc;                /* slots of 'timeouts' currently allocated */
+    struct virEventTimeout *timeouts;
+};
+
+/* Only have one event loop. Static storage, so it starts out
+   zeroed: no slots allocated and NULL array pointers. */
+static struct virEventLoop eventLoop;
+
+/* Unique ID for the next timer to be registered */
+static int nextTimer = 0;
+
+
+/*
+ * Register a callback for monitoring file handle events.
+ * NB, it *must* be safe to call this from within a callback
+ * For this reason we only ever append to existing list.
+ *
+ * returns 0 upon success, -1 if the handle array could not be grown
+ */
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque) {
+    qemudDebug("Add handle %d %d %p %p\n", fd, events, cb, opaque);
+    /* Every allocated slot is in use: grow the array by one extent */
+    if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
+        struct virEventHandle *tmp;
+        qemudDebug("Used %d handle slots, adding %d more\n",
+                   eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.handles,
+                      sizeof(struct virEventHandle) *
+                      (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT));
+        if (!tmp) {
+            /* The old array is untouched, nothing was registered */
+            return -1;
+        }
+        eventLoop.handles = tmp;
+        eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT;
+    }
+
+    /* Fill in the first free slot at the end of the array */
+    eventLoop.handles[eventLoop.handlesCount].fd = fd;
+    eventLoop.handles[eventLoop.handlesCount].events = events;
+    eventLoop.handles[eventLoop.handlesCount].cb = cb;
+    eventLoop.handles[eventLoop.handlesCount].opaque = opaque;
+    eventLoop.handles[eventLoop.handlesCount].deleted = 0;
+
+    eventLoop.handlesCount++;
+
+    return 0;
+}
+
+/*
+ * Unregister a callback from a file handle
+ * NB, it *must* be safe to call this from within a callback
+ * For this reason we only ever set a flag in the existing list.
+ * Actual deletion will be done out-of-band
+ *
+ * returns 0 if a live entry for fd was flagged, -1 if none exists
+ */
+int virEventRemoveHandle(int fd) {
+    int i;
+    qemudDebug("Remove handle %d\n", fd);
+    for (i = 0 ; i < eventLoop.handlesCount ; i++) {
+        /* An fd can be removed and re-added before cleanup runs,
+         * so already-flagged entries for the same fd are skipped */
+        if (eventLoop.handles[i].deleted)
+            continue;
+
+        if (eventLoop.handles[i].fd == fd) {
+            qemudDebug("mark delete %d\n", i);
+            eventLoop.handles[i].deleted = 1;
+            return 0;
+        }
+    }
+    return -1;
+}
+
+
+/*
+ * Register a callback for a timer event
+ * NB, it *must* be safe to call this from within a callback
+ * For this reason we only ever append to existing list.
+ *
+ * @timeout: interval between events in milliseconds
+ * @cb: callback to invoke each time the timer expires
+ * @opaque: user data to pass to callback
+ *
+ * returns the id of the new timer (the value accepted by
+ * virEventRemoveTimeout), or -1 on failure
+ */
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque) {
+    struct timeval tv;
+    unsigned long long now;
+    int timer;
+
+    if (gettimeofday(&tv, NULL) < 0) {
+        return -1;
+    }
+    now = (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+
+    /* Every allocated slot is in use: grow the array by one extent */
+    if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
+        struct virEventTimeout *tmp;
+        qemudDebug("Used %d timeout slots, adding %d more\n",
+                   eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.timeouts,
+                      sizeof(struct virEventTimeout) *
+                      (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT));
+        if (!tmp) {
+            return -1;
+        }
+        eventLoop.timeouts = tmp;
+        eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
+    }
+
+    timer = nextTimer++;
+    eventLoop.timeouts[eventLoop.timeoutsCount].timer = timer;
+    eventLoop.timeouts[eventLoop.timeoutsCount].timeout = timeout;
+    eventLoop.timeouts[eventLoop.timeoutsCount].cb = cb;
+    eventLoop.timeouts[eventLoop.timeoutsCount].opaque = opaque;
+    eventLoop.timeouts[eventLoop.timeoutsCount].deleted = 0;
+    /* The first event is due one full interval from now, not
+     * immediately. A non-positive interval adds nothing. */
+    eventLoop.timeouts[eventLoop.timeoutsCount].expiresAt =
+        now + (timeout > 0 ? (unsigned long long)timeout : 0);
+
+    eventLoop.timeoutsCount++;
+
+    /* Hand back the id: without it the caller has nothing to
+     * pass to virEventRemoveTimeout */
+    return timer;
+}
+
+/*
+ * Unregister a callback for a timer
+ * NB, it *must* be safe to call this from within a callback
+ * For this reason we only ever set a flag in the existing list.
+ * Actual deletion will be done out-of-band
+ *
+ * returns 0 if a live timer with this id was flagged, -1 if none exists
+ */
+int virEventRemoveTimeout(int timer) {
+    int i;
+    for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
+        if (eventLoop.timeouts[i].deleted)
+            continue;
+
+        if (eventLoop.timeouts[i].timer == timer) {
+            eventLoop.timeouts[i].deleted = 1;
+            return 0;
+        }
+    }
+    return -1;
+}
+
+/* Iterates over all registered timeouts and determine which
+ * will be the first to expire.
+ * @timeout: filled with the number of milliseconds until the
+ *           soonest live timer expires (0 if one is already due),
+ *           or -1 if no timeout is pending
+ * returns: 0 on success, -1 on error
+ */
+static int virEventCalculateTimeout(int *timeout) {
+    unsigned long long then = 0;
+    int i;
+
+    /* Figure out if we need a timeout. Timers already marked as
+     * deleted are ignored so they cannot shorten the wait. */
+    for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
+        if (eventLoop.timeouts[i].deleted)
+            continue;
+
+        if (then == 0 ||
+            eventLoop.timeouts[i].expiresAt < then)
+            then = eventLoop.timeouts[i].expiresAt;
+    }
+
+    /* Calculate how long we should wait for a timeout if needed */
+    if (then > 0) {
+        struct timeval tv;
+        unsigned long long now;
+
+        if (gettimeofday(&tv, NULL) < 0) {
+            return -1;
+        }
+        now = (((unsigned long long)tv.tv_sec)*1000) +
+            (((unsigned long long)tv.tv_usec)/1000);
+
+        /* Compare before subtracting: both values are unsigned, so
+         * an overdue timer would otherwise wrap around. An overdue
+         * timer makes poll() return straight away. */
+        if (then > now)
+            *timeout = (int)(then - now);
+        else
+            *timeout = 0;
+    } else {
+        *timeout = -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Allocate a pollfd array containing data for all registered
+ * file handles. The caller must free the returned data struct
+ * returns: the pollfd array, or NULL on error
+ */
+static struct pollfd *virEventMakePollFDs(void) {
+    struct pollfd *fds;
+    int nfds = eventLoop.handlesCount;
+    int i;
+
+    /* Setup the poll file handle data structs. Always ask for at
+     * least one element: malloc(0) is allowed to return NULL, which
+     * the caller would mistake for an allocation failure. */
+    if (!(fds = malloc(sizeof(struct pollfd) *
+                       (nfds > 0 ? nfds : 1))))
+        return NULL;
+
+    /* fds[i] mirrors eventLoop.handles[i], so dispatch can use the
+     * same index for both arrays */
+    for (i = 0 ; i < nfds ; i++) {
+        fds[i].fd = eventLoop.handles[i].fd;
+        fds[i].events = eventLoop.handles[i].events;
+        fds[i].revents = 0;
+        qemudDebug("Wait for %d %d\n", eventLoop.handles[i].fd, eventLoop.handles[i].events);
+    }
+
+    return fds;
+}
+
+
+/*
+ * Iterate over all timers and determine if any have expired.
+ * Invoke the user supplied callback for each timer whose
+ * expiry time is met, and schedule the next timeout. Does
+ * not try to 'catch up' on time if the actual expiry time
+ * was later than the requested time.
+ *
+ * This method must cope with new timers being registered
+ * by a callback, and must skip any timers marked as deleted.
+ *
+ * Returns 0 upon success, -1 if an error occurred
+ */
+static int virEventDispatchTimeouts(void) {
+    struct timeval tv;
+    unsigned long long now;
+    int i;
+    /* Save this now - it may be changed during dispatch */
+    int ntimeouts = eventLoop.timeoutsCount;
+
+    if (gettimeofday(&tv, NULL) < 0) {
+        return -1;
+    }
+    /* Current time in milliseconds, same unit as expiresAt */
+    now = (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+
+    /* The array is always reached through eventLoop.timeouts rather
+     * than a cached pointer: a callback may register a timer, and
+     * the realloc that follows can move the array */
+    for (i = 0 ; i < ntimeouts ; i++) {
+        if (eventLoop.timeouts[i].deleted)
+            continue;
+
+        if (eventLoop.timeouts[i].expiresAt <= now) {
+            (eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer,
+                                       eventLoop.timeouts[i].opaque);
+            /* Reschedule relative to 'now', not to the old expiry,
+             * so missed intervals are not replayed */
+            eventLoop.timeouts[i].expiresAt =
+                now + eventLoop.timeouts[i].timeout;
+        }
+    }
+    return 0;
+}
+
+
+/* Iterate over all file handles and dispatch any which
+ * have pending events listed in the poll() data. Invoke
+ * the user supplied callback for each handle which has
+ * pending events
+ *
+ * This method must cope with new handles being registered
+ * by a callback, and must skip any handles marked as deleted.
+ *
+ * @fds: array built by virEventMakePollFDs and filled in by poll();
+ *       fds[i] describes eventLoop.handles[i]
+ *
+ * Returns 0 upon success, -1 if an error occurred
+ */
+static int virEventDispatchHandles(struct pollfd *fds) {
+    int i;
+    /* Save this now - it may be changed during dispatch */
+    int nhandles = eventLoop.handlesCount;
+
+    /* Only the first nhandles entries have a matching pollfd;
+     * handles appended by callbacks during this pass do not */
+    for (i = 0 ; i < nhandles ; i++) {
+        if (eventLoop.handles[i].deleted) {
+            qemudDebug("Skip deleted %d\n", eventLoop.handles[i].fd);
+            continue;
+        }
+
+        if (fds[i].revents) {
+            qemudDebug("Dispatch %d %d %p\n", fds[i].fd, fds[i].revents, eventLoop.handles[i].opaque);
+            (eventLoop.handles[i].cb)(fds[i].fd, fds[i].revents,
+                                      eventLoop.handles[i].opaque);
+        }
+    }
+
+    return 0;
+}
+
+
+/* Used post dispatch to actually remove any timers that
+ * were previously marked as deleted. This asynchronous
+ * cleanup is needed to make dispatch re-entrant safe.
+ *
+ * Returns 0 upon success, -1 if shrinking the array failed
+ */
+static int virEventCleanupTimeouts(void) {
+    int i;
+
+    /* Remove deleted entries, shuffling down remaining
+     * entries as needed to form contiguous series
+     */
+    for (i = 0 ; i < eventLoop.timeoutsCount ; ) {
+        /* Test the timer's own flag. Looking at handles[i] here
+         * would leave deleted timers in place and could index
+         * past the end of the handles array. */
+        if (!eventLoop.timeouts[i].deleted) {
+            i++;
+            continue;
+        }
+
+        /* Close the gap; 'i' is not advanced because the entry
+         * moved into this slot still has to be examined */
+        if ((i+1) < eventLoop.timeoutsCount) {
+            memmove(eventLoop.timeouts+i,
+                    eventLoop.timeouts+i+1,
+                    sizeof(struct virEventTimeout)*(eventLoop.timeoutsCount-(i+1)));
+        }
+        eventLoop.timeoutsCount--;
+    }
+
+    /* Release some memory if we've got a big chunk free */
+    if ((eventLoop.timeoutsAlloc - EVENT_ALLOC_EXTENT) > eventLoop.timeoutsCount) {
+        struct virEventTimeout *tmp;
+        qemudDebug("Releasing %d out of %d timeout slots used, releasing %d\n",
+                   eventLoop.timeoutsCount, eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.timeouts,
+                      sizeof(struct virEventTimeout) *
+                      (eventLoop.timeoutsAlloc - EVENT_ALLOC_EXTENT));
+        if (!tmp) {
+            return -1;
+        }
+        eventLoop.timeouts = tmp;
+        eventLoop.timeoutsAlloc -= EVENT_ALLOC_EXTENT;
+    }
+    return 0;
+}
+
+/* Used post dispatch to actually remove any handles that
+ * were previously marked as deleted. This asynchronous
+ * cleanup is needed to make dispatch re-entrant safe.
+ *
+ * Returns 0 upon success, -1 if shrinking the array failed
+ */
+static int virEventCleanupHandles(void) {
+    int i;
+
+    /* Remove deleted entries, shuffling down remaining
+     * entries as needed to form contiguous series
+     */
+    for (i = 0 ; i < eventLoop.handlesCount ; ) {
+        if (!eventLoop.handles[i].deleted) {
+            i++;
+            continue;
+        }
+
+        /* Close the gap; 'i' is not advanced because the entry
+         * moved into this slot still has to be examined */
+        if ((i+1) < eventLoop.handlesCount) {
+            memmove(eventLoop.handles+i,
+                    eventLoop.handles+i+1,
+                    sizeof(struct virEventHandle)*(eventLoop.handlesCount-(i+1)));
+        }
+        eventLoop.handlesCount--;
+    }
+
+    /* Release some memory if we've got a big chunk free */
+    if ((eventLoop.handlesAlloc - EVENT_ALLOC_EXTENT) > eventLoop.handlesCount) {
+        struct virEventHandle *tmp;
+        qemudDebug("Releasing %d out of %d handles slots used, releasing %d\n",
+                   eventLoop.handlesCount, eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.handles,
+                      sizeof(struct virEventHandle) *
+                      (eventLoop.handlesAlloc - EVENT_ALLOC_EXTENT));
+        if (!tmp) {
+            return -1;
+        }
+        eventLoop.handles = tmp;
+        eventLoop.handlesAlloc -= EVENT_ALLOC_EXTENT;
+    }
+    return 0;
+}
+
+/*
+ * Run a single iteration of the event loop, blocking until
+ * at least one file handle has an event, or a timer expires
+ *
+ * Returns 0 upon success, -1 if building the poll set, polling,
+ * dispatch or cleanup failed
+ */
+int virEventRunOnce(void) {
+    struct pollfd *fds;
+    int ret, timeout;
+
+    /* Snapshot of the registered handles; freed on every exit path */
+    if (!(fds = virEventMakePollFDs()))
+        return -1;
+
+    if (virEventCalculateTimeout(&timeout) < 0) {
+        free(fds);
+        return -1;
+    }
+
+ retry:
+    qemudDebug("Poll on %d handles %p timeout %d\n", eventLoop.handlesCount, fds, timeout);
+    ret = poll(fds, eventLoop.handlesCount, timeout);
+    qemudDebug("Poll got %d event\n", ret);
+    if (ret < 0) {
+        /* Interrupted by a signal: poll again with the same timeout */
+        if (errno == EINTR) {
+            goto retry;
+        }
+        free(fds);
+        return -1;
+    }
+    /* Timers are checked on every wakeup, not only when poll timed out */
+    if (virEventDispatchTimeouts() < 0) {
+        free(fds);
+        return -1;
+    }
+
+    /* ret is the number of descriptors with events; skip the scan if none */
+    if (ret > 0 &&
+        virEventDispatchHandles(fds) < 0) {
+        free(fds);
+        return -1;
+    }
+    free(fds);
+
+    /* Entries flagged as deleted during dispatch are reaped only now,
+     * once no dispatch loop is walking the arrays */
+    if (virEventCleanupTimeouts() < 0)
+        return -1;
+
+    if (virEventCleanupHandles() < 0)
+        return -1;
+
+    return 0;
+}
+/*
+ * Local variables:
+ *  indent-tabs-mode: nil
+ *
c-indent-level: 4 + * c-basic-offset: 4 + * tab-width: 4 + * End: + */ diff -r 56b36b784a1c qemud/event.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qemud/event.h Mon Jun 18 12:20:17 2007 -0400 @@ -0,0 +1,98 @@ +/* + * event.h: event loop for monitoring file handles + * + * Copyright (C) 2007 Daniel P. Berrange + * Copyright (C) 2007 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Daniel P. 
Berrange <berrange@xxxxxxxxxx>
+ */
+
+#ifndef __VIRTD_EVENT_H__
+#define __VIRTD_EVENT_H__
+
+
+/**
+ * virEventHandleCallback: callback for receiving file handle events
+ *
+ * @fd: file handle on which the event occurred
+ * @events: bitset of events from POLLnnn constants
+ * @opaque: user data registered with handle
+ */
+typedef void (*virEventHandleCallback)(int fd, int events, void *opaque);
+
+/**
+ * virEventAddHandle: register a callback for monitoring file handle events
+ *
+ * @fd: file handle to monitor for events
+ * @events: bitset of events to watch from POLLnnn constants
+ * @cb: callback to invoke when an event occurs
+ * @opaque: user data to pass to callback
+ *
+ * returns -1 if the file handle cannot be registered, 0 upon success
+ */
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque);
+
+/**
+ * virEventRemoveHandle: unregister a callback from a file handle
+ *
+ * @fd: file handle to stop monitoring for events
+ *
+ * returns -1 if the file handle was not registered, 0 upon success
+ */
+int virEventRemoveHandle(int fd);
+
+/**
+ * virEventTimeoutCallback: callback for receiving timer events
+ *
+ * @timer: timer id emitting the event
+ * @opaque: user data registered with timer
+ */
+typedef void (*virEventTimeoutCallback)(int timer, void *opaque);
+
+/**
+ * virEventAddTimeout: register a callback for a timer event
+ *
+ * @timeout: timeout between events in milliseconds
+ * @cb: callback to invoke when an event occurs
+ * @opaque: user data to pass to callback
+ *
+ * returns -1 if the timer cannot be registered, a positive
+ * integer timer id upon success
+ */
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque);
+
+/**
+ * virEventRemoveTimeout: unregister a callback for a timer
+ *
+ * @timer: the timer id to remove
+ *
+ * returns -1 if the timer was not registered, 0 upon success
+ */
+int virEventRemoveTimeout(int timer);
+
+
+/**
+ * virEventRunOnce: run a single
iteration of the event loop. + * + * Blocks the caller until at least one file handle has an + * event or the first timer expires. + * + * returns -1 if the event monitoring failed + */ +int virEventRunOnce(void); + +#endif /* __VIRTD_EVENT_H__ */ diff -r 56b36b784a1c qemud/internal.h --- a/qemud/internal.h Mon Jun 18 11:01:38 2007 -0400 +++ b/qemud/internal.h Mon Jun 18 14:20:02 2007 -0400 @@ -338,7 +338,6 @@ struct qemud_server { int nclients; struct qemud_client *clients; int sigread; - int nvmfds; int nactivevms; int ninactivevms; struct qemud_vm *vms; diff -r 56b36b784a1c qemud/qemud.c --- a/qemud/qemud.c Mon Jun 18 11:01:38 2007 -0400 +++ b/qemud/qemud.c Mon Jun 18 14:19:58 2007 -0400 @@ -61,6 +61,7 @@ #include "driver.h" #include "conf.h" #include "iptables.h" +#include "event.h" static int godaemon = 0; /* -d: Be a daemon */ static int verbose = 0; /* -v: Verbose mode */ @@ -109,6 +110,13 @@ static void sig_handler(int sig) { } errno = origerrno; } + +static void qemudDispatchVMEvent(int fd, int events, void *opaque); +static void qemudDispatchClientEvent(int fd, int events, void *opaque); +static void qemudDispatchServerEvent(int fd, int events, void *opaque); +static int qemudRegisterClientEvent(struct qemud_server *server, + struct qemud_client *client, + int remove); static int remoteInitializeGnuTLS (void) @@ -184,8 +192,10 @@ remoteInitializeGnuTLS (void) return 0; } -static int qemudDispatchSignal(struct qemud_server *server) -{ +static void qemudDispatchSignalEvent(int fd ATTRIBUTE_UNUSED, + int events ATTRIBUTE_UNUSED, + void *opaque) { + struct qemud_server *server = (struct qemud_server *)opaque; unsigned char sigc; struct qemud_vm *vm; struct qemud_network *network; @@ -194,7 +204,7 @@ static int qemudDispatchSignal(struct qe if (read(server->sigread, &sigc, 1) != 1) { qemudLog(QEMUD_ERR, "Failed to read from signal pipe: %s", strerror(errno)); - return -1; + return; } ret = 0; @@ -266,7 +276,8 @@ static int qemudDispatchSignal(struct qe break; 
} - return ret; + if (ret != 0) + server->shutdown = 1; } static int qemudSetCloseExec(int fd) { @@ -474,19 +485,16 @@ static int qemudListenUnix(struct qemud_ } sock->readonly = readonly; - sock->next = server->sockets; - server->sockets = sock; - server->nsockets++; if ((sock->fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { qemudLog(QEMUD_ERR, "Failed to create socket: %s", strerror(errno)); - return -1; + goto cleanup; } if (qemudSetCloseExec(sock->fd) < 0 || qemudSetNonBlock(sock->fd) < 0) - return -1; + goto cleanup; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; @@ -502,17 +510,35 @@ static int qemudListenUnix(struct qemud_ if (bind(sock->fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { qemudLog(QEMUD_ERR, "Failed to bind socket to '%s': %s", path, strerror(errno)); - return -1; + goto cleanup; } umask(oldmask); if (listen(sock->fd, 30) < 0) { qemudLog(QEMUD_ERR, "Failed to listen for connections on '%s': %s", path, strerror(errno)); - return -1; - } + goto cleanup; + } + + if (virEventAddHandle(sock->fd, + POLLIN| POLLERR | POLLHUP, + qemudDispatchServerEvent, + server) < 0) { + qemudLog(QEMUD_ERR, "Failed to add server event callback"); + goto cleanup; + } + + sock->next = server->sockets; + server->sockets = sock; + server->nsockets++; return 0; + + cleanup: + if (sock->fd) + close(sock->fd); + free(sock); + return -1; } // See: http://people.redhat.com/drepper/userapi-ipv6.html @@ -606,6 +632,15 @@ remoteListenTCP (struct qemud_server *se "remoteListenTCP: listen: %s", strerror (errno)); return -1; } + + if (virEventAddHandle(sock->fd, + POLLIN| POLLERR | POLLHUP, + qemudDispatchServerEvent, + server) < 0) { + qemudLog(QEMUD_ERR, "Failed to add server event callback"); + return -1; + } + } return 0; @@ -1026,11 +1061,15 @@ static int qemudDispatchServer(struct qe if (!client->tls) { client->mode = QEMUD_MODE_RX_HEADER; client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN; + + if (qemudRegisterClientEvent (server, client, 0) < 0) + goto cleanup; } 
else { int ret; client->session = remoteInitializeTLSSession (); - if (client->session == NULL) goto tls_failed; + if (client->session == NULL) + goto cleanup; gnutls_transport_set_ptr (client->session, (gnutls_transport_ptr_t) (long) fd); @@ -1040,16 +1079,22 @@ static int qemudDispatchServer(struct qe if (ret == 0) { /* Unlikely, but ... Next step is to check the certificate. */ if (remoteCheckAccess (client) == -1) - goto tls_failed; + goto cleanup; + + if (qemudRegisterClientEvent(server, client, 0) < 0) + goto cleanup; } else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) { /* Most likely. */ client->mode = QEMUD_MODE_TLS_HANDSHAKE; client->bufferLength = -1; client->direction = gnutls_record_get_direction (client->session); + + if (qemudRegisterClientEvent (server, client, 0) < 0) + goto cleanup; } else { qemudLog (QEMUD_ERR, "TLS handshake failed: %s", gnutls_strerror (ret)); - goto tls_failed; + goto cleanup; } } @@ -1059,7 +1104,7 @@ static int qemudDispatchServer(struct qe return 0; - tls_failed: + cleanup: if (client->session) gnutls_deinit (client->session); close (fd); free (client); @@ -1453,7 +1498,15 @@ int qemudStartVMDaemon(struct qemud_serv server->ninactivevms--; server->nactivevms++; - server->nvmfds += 2; + + virEventAddHandle(vm->stdout, + POLLIN | POLLERR | POLLHUP, + qemudDispatchVMEvent, + server); + virEventAddHandle(vm->stderr, + POLLIN | POLLERR | POLLHUP, + qemudDispatchVMEvent, + server); ret = 0; @@ -1497,6 +1550,8 @@ static void qemudDispatchClientFailure(s tmp = tmp->next; } + virEventRemoveHandle(client->fd); + if (client->tls && client->session) gnutls_deinit (client->session); close(client->fd); free(client); @@ -1590,6 +1645,8 @@ static int qemudClientRead(struct qemud_ } else { ret = gnutls_record_recv (client->session, data, len); client->direction = gnutls_record_get_direction (client->session); + if (qemudRegisterClientEvent (server, client, 1) < 0) + qemudDispatchClientFailure (server, client); if (ret <= 0) { 
if (ret == 0 || (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED)) { @@ -1655,6 +1712,11 @@ static void qemudDispatchClientRead(stru xdr_destroy (&x); + if (qemudRegisterClientEvent(server, client, 1) < 0) { + qemudDispatchClientFailure(server, client); + return; + } + /* Fall through */ } @@ -1679,6 +1741,8 @@ static void qemudDispatchClientRead(stru if (remote && h.prog == REMOTE_PROGRAM) { remoteDispatchClientRequest (server, client); + if (qemudRegisterClientEvent(server, client, 1) < 0) + qemudDispatchClientFailure(server, client); } else if (!remote && h.prog == QEMUD_PROGRAM) { qemud_packet_client p; @@ -1689,6 +1753,9 @@ static void qemudDispatchClientRead(stru } qemudDispatchClientRequest(server, client, &p); + + if (qemudRegisterClientEvent(server, client, 1) < 0) + qemudDispatchClientFailure(server, client); } else { /* An internal error. */ qemudDebug ("Not REMOTE_PROGRAM or QEMUD_PROGRAM"); @@ -1709,12 +1776,17 @@ static void qemudDispatchClientRead(stru /* Finished. Next step is to check the certificate. 
*/ if (remoteCheckAccess (client) == -1) qemudDispatchClientFailure (server, client); + if (qemudRegisterClientEvent (server, client, 1) < 0) + qemudDispatchClientFailure (server, client); } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) { qemudLog (QEMUD_ERR, "TLS handshake failed: %s", gnutls_strerror (ret)); qemudDispatchClientFailure (server, client); - } else + } else { client->direction = gnutls_record_get_direction (client->session); + if (qemudRegisterClientEvent (server ,client, 1) < 0) + qemudDispatchClientFailure (server, client); + } break; } @@ -1745,6 +1817,8 @@ static int qemudClientWrite(struct qemud } else { ret = gnutls_record_send (client->session, data, len); client->direction = gnutls_record_get_direction (client->session); + if (qemudRegisterClientEvent (server, client, 1) < 0) + qemudDispatchClientFailure (server, client); if (ret < 0) { if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) { qemudLog (QEMUD_ERR, "gnutls_record_send: %s", @@ -1772,6 +1846,9 @@ static void qemudDispatchClientWrite(str client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN; client->bufferOffset = 0; if (client->tls) client->direction = QEMUD_TLS_DIRECTION_READ; + + if (qemudRegisterClientEvent (server, client, 1) < 0) + qemudDispatchClientFailure (server, client); } /* Still writing */ break; @@ -1786,12 +1863,18 @@ static void qemudDispatchClientWrite(str /* Finished. Next step is to check the certificate. 
*/ if (remoteCheckAccess (client) == -1) qemudDispatchClientFailure (server, client); + + if (qemudRegisterClientEvent (server, client, 1)) + qemudDispatchClientFailure (server, client); } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) { qemudLog (QEMUD_ERR, "TLS handshake failed: %s", gnutls_strerror (ret)); qemudDispatchClientFailure (server, client); - } else + } else { client->direction = gnutls_record_get_direction (client->session); + if (qemudRegisterClientEvent (server, client, 1)) + qemudDispatchClientFailure (server, client); + } break; } @@ -1842,6 +1925,10 @@ int qemudShutdownVMDaemon(struct qemud_s qemudVMData(server, vm, vm->stdout); qemudVMData(server, vm, vm->stderr); + + virEventRemoveHandle(vm->stdout); + virEventRemoveHandle(vm->stderr); + if (close(vm->logfile) < 0) qemudLog(QEMUD_WARN, "Unable to close logfile %d: %s", errno, strerror(errno)); close(vm->stdout); @@ -1852,7 +1939,6 @@ int qemudShutdownVMDaemon(struct qemud_s vm->stdout = -1; vm->stderr = -1; vm->monitor = -1; - server->nvmfds -= 2; if (waitpid(vm->pid, NULL, WNOHANG) != vm->pid) { kill(vm->pid, SIGKILL); @@ -2340,94 +2426,104 @@ int qemudShutdownNetworkDaemon(struct qe } -static int qemudDispatchPoll(struct qemud_server *server, struct pollfd *fds) { +static void qemudDispatchVMEvent(int fd, int events, void *opaque) { + struct qemud_server *server = (struct qemud_server *)opaque; + struct qemud_vm *vm = server->vms; + + while (vm) { + if (qemudIsActiveVM(vm) && + (vm->stdout == fd || + vm->stderr == fd)) + break; + + vm = vm->next; + } + + if (!vm) + return; + + if (events == POLLIN && + qemudDispatchVMLog(server, vm, fd) == 0) + return; + + qemudDispatchVMFailure(server, vm, fd); +} + +static void qemudDispatchClientEvent(int fd, int events, void *opaque) { + struct qemud_server *server = (struct qemud_server *)opaque; + struct qemud_client *client = server->clients; + + while (client) { + if (client->fd == fd) + break; + + client = client->next; + } + + if 
(!client) + return; + + if (events == POLLOUT) + qemudDispatchClientWrite(server, client); + else if (events == POLLIN) + qemudDispatchClientRead(server, client); + else + qemudDispatchClientFailure(server, client); +} + +static int qemudRegisterClientEvent(struct qemud_server *server, + struct qemud_client *client, + int removeFirst) { + if (removeFirst) + if (virEventRemoveHandle(client->fd) < 0) + return -1; + + if (client->tls) { + if (virEventAddHandle(client->fd, + (client->direction ? + POLLOUT : POLLIN) | POLLERR | POLLHUP, + qemudDispatchClientEvent, + server) < 0) + return -1; + } else { + if (virEventAddHandle(client->fd, + (client->mode == QEMUD_MODE_TX_PACKET ? + POLLOUT : POLLIN) | POLLERR | POLLHUP, + qemudDispatchClientEvent, + server) < 0) + return -1; + } + + return 0; +} + +static void qemudDispatchServerEvent(int fd, int events, void *opaque) { + struct qemud_server *server = (struct qemud_server *)opaque; struct qemud_socket *sock = server->sockets; - struct qemud_client *client = server->clients; - struct qemud_vm *vm; - struct qemud_network *network; - int ret = 0; - int fd = 0; - - if (fds[fd++].revents && qemudDispatchSignal(server) < 0) - return -1; - - if (server->shutdown) - return 0; - - vm = server->vms; + + while (sock) { + if (sock->fd == fd) + break; + + sock = sock->next; + } + + if (!sock) + return; + + if (events) + qemudDispatchServer(server, sock); +} + + +static void qemudCleanupInactive(struct qemud_server *server) { + struct qemud_vm *vm = server->vms; + struct qemud_network *network = server->networks; + + /* Cleanup any VMs which shutdown & dont have an associated + config file */ while (vm) { struct qemud_vm *next = vm->next; - int failed = 0, - stdoutfd = vm->stdout, - stderrfd = vm->stderr; - - if (!qemudIsActiveVM(vm)) { - vm = next; - continue; - } - - if (stdoutfd != -1) { - if (fds[fd].revents) { - if (fds[fd].revents == POLLIN) { - if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0) - failed = 1; - } else { - if 
(qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0) - failed = 1; - } - } - fd++; - } - if (stderrfd != -1) { - if (!failed) { - if (fds[fd].revents) { - if (fds[fd].revents == POLLIN) { - if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0) - failed = 1; - } else { - if (qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0) - failed = 1; - } - } - } - fd++; - } - vm = next; - if (failed) - ret = -1; /* FIXME: the daemon shouldn't exit on failure here */ - } - while (client) { - struct qemud_client *next = client->next; - - assert (client->magic == QEMUD_CLIENT_MAGIC); - - if (fds[fd].revents) { - qemudDebug("Poll data normal"); - if (fds[fd].revents == POLLOUT) - qemudDispatchClientWrite(server, client); - else if (fds[fd].revents == POLLIN) - qemudDispatchClientRead(server, client); - else - qemudDispatchClientFailure(server, client); - } - fd++; - client = next; - } - while (sock) { - struct qemud_socket *next = sock->next; - /* FIXME: the daemon shouldn't exit on error here */ - if (fds[fd].revents) - if (qemudDispatchServer(server, sock) < 0) - return -1; - fd++; - sock = next; - } - - /* Cleanup any VMs which shutdown & dont have an associated - config file */ - vm = server->vms; - while (vm) { - struct qemud_vm *next = vm->next; if (!qemudIsActiveVM(vm) && !vm->configFile[0]) qemudRemoveInactiveVM(server, vm); @@ -2436,7 +2532,6 @@ static int qemudDispatchPoll(struct qemu } /* Cleanup any networks too */ - network = server->networks; while (network) { struct qemud_network *next = network->next; @@ -2446,91 +2541,16 @@ static int qemudDispatchPoll(struct qemu network = next; } - return ret; -} - -static void qemudPreparePoll(struct qemud_server *server, struct pollfd *fds) { - int fd = 0; - struct qemud_socket *sock; - struct qemud_client *client; - struct qemud_vm *vm; - - fds[fd].fd = server->sigread; - fds[fd].events = POLLIN; - fd++; - - for (vm = server->vms ; vm ; vm = vm->next) { - if (!qemudIsActiveVM(vm)) - continue; - if (vm->stdout != -1) { - 
fds[fd].fd = vm->stdout; - fds[fd].events = POLLIN | POLLERR | POLLHUP; - fd++; - } - if (vm->stderr != -1) { - fds[fd].fd = vm->stderr; - fds[fd].events = POLLIN | POLLERR | POLLHUP; - fd++; - } - } - for (client = server->clients ; client ; client = client->next) { - fds[fd].fd = client->fd; - if (!client->tls) { - /* Refuse to read more from client if tx is pending to - rate limit */ - if (client->mode == QEMUD_MODE_TX_PACKET) - fds[fd].events = POLLOUT | POLLERR | POLLHUP; - else - fds[fd].events = POLLIN | POLLERR | POLLHUP; - } else { - qemudDebug ("direction = %s", - client->direction ? "WRITE" : "READ"); - fds[fd].events = client->direction ? POLLOUT : POLLIN; - fds[fd].events |= POLLERR | POLLHUP; - } - fd++; - } - for (sock = server->sockets ; sock ; sock = sock->next) { - fds[fd].fd = sock->fd; - fds[fd].events = POLLIN; - fd++; - } + return; } static int qemudOneLoop(struct qemud_server *server) { - int nfds = server->nsockets + server->nclients + server->nvmfds + 1; /* server->sigread */ - struct pollfd fds[nfds]; - int thistimeout = -1; - int ret; sig_atomic_t errors; - /* If we have no clients or vms, then timeout after - 30 seconds, letting daemon exit */ - if (timeout > 0 && - !server->nclients && - !server->nactivevms) - thistimeout = timeout; - - qemudPreparePoll(server, fds); - - retry: - - if ((ret = poll(fds, nfds, thistimeout * 1000)) < 0) { - if (errno == EINTR) { - goto retry; - } - qemudLog(QEMUD_ERR, "Error polling on file descriptors: %s", - strerror(errno)); - return -1; - } - - /* Must have timed out */ - if (ret == 0) { - qemudLog(QEMUD_INFO, "Timed out while polling on file descriptors"); - return -1; - } + if (virEventRunOnce() < 0) + return -1; /* Check for any signal handling errors and log them. 
*/ errors = sig_errors; @@ -2542,8 +2562,7 @@ static int qemudOneLoop(struct qemud_ser return -1; } - if (qemudDispatchPoll(server, fds) < 0) - return -1; + qemudCleanupInactive(server); return 0; } @@ -2941,6 +2960,15 @@ int main(int argc, char **argv) { goto error2; } + if (virEventAddHandle(sigpipe[0], + POLLIN, + qemudDispatchSignalEvent, + server) < 0) { + qemudLog(QEMUD_ERR, "Failed to register callback for signal pipe"); + ret = 3; + goto error2; + } + qemudRunLoop(server); qemudCleanup(server);