This converts the QEMU monitor APIs to use the per-VM event loop, which involves switching from virEvent APIs to GMainContext / GSource APIs. A GSocket is used as a convenient way to create a GSource for a socket, but is not yet used for actual I/O. Signed-off-by: Daniel P. Berrangé <berrange@xxxxxxxxxx> --- src/qemu/qemu_monitor.c | 145 ++++++++++++++++------------------- src/qemu/qemu_monitor.h | 3 +- src/qemu/qemu_process.c | 6 +- tests/qemumonitortestutils.c | 1 + 4 files changed, 71 insertions(+), 84 deletions(-) diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c index bf53962872..d969853963 100644 --- a/src/qemu/qemu_monitor.c +++ b/src/qemu/qemu_monitor.c @@ -24,6 +24,7 @@ #include <poll.h> #include <unistd.h> #include <fcntl.h> +#include <gio/gio.h> #include "qemu_monitor.h" #include "qemu_monitor_text.h" @@ -71,12 +72,9 @@ struct _qemuMonitor { int fd; - /* Represents the watch number to be used for updating and - * unregistering the monitor @fd for events in the event loop: - * > 0: valid watch number - * = 0: not registered - * < 0: an error occurred during the registration of @fd */ - int watch; + GMainContext *context; + GSocket *socket; + GSource *watch; virDomainObjPtr vm; @@ -226,6 +224,7 @@ qemuMonitorDispose(void *obj) (mon->cb->destroy)(mon, mon->vm, mon->callbackOpaque); virObjectUnref(mon->vm); + g_main_context_unref(mon->context); virResetError(&mon->lastError); virCondDestroy(&mon->notify); VIR_FREE(mon->buffer); @@ -509,27 +508,16 @@ qemuMonitorIORead(qemuMonitorPtr mon) static void qemuMonitorUpdateWatch(qemuMonitorPtr mon) { - int events = - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR; - - if (!mon->watch) - return; - - if (mon->lastError.code == VIR_ERR_OK) { - events |= VIR_EVENT_HANDLE_READABLE; - - if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) && - !mon->waitGreeting) - events |= VIR_EVENT_HANDLE_WRITABLE; - } - - virEventUpdateHandle(mon->watch, events); + qemuMonitorUnregister(mon); + if (mon->socket) + qemuMonitorRegister(mon); } -static void -qemuMonitorIO(int watch, int fd, int events, void *opaque) +static gboolean +qemuMonitorIO(GSocket *socket G_GNUC_UNUSED, + GIOCondition cond, + gpointer opaque) { qemuMonitorPtr mon = opaque; bool error = false; @@ -541,39 +529,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque) /* lock access to the monitor and protect fd */ virObjectLock(mon); #if DEBUG_IO - VIR_DEBUG("Monitor %p I/O on watch %d fd %d events %d", mon, watch, fd, events); + VIR_DEBUG("Monitor %p I/O on socket %p cond %d", mon, socket, cond); #endif - if (mon->fd == -1 || mon->watch == 0) { + if (mon->fd == -1 || !mon->watch) { virObjectUnlock(mon); virObjectUnref(mon); - return; + return G_SOURCE_REMOVE; } - if (mon->fd != fd || mon->watch != watch) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) - eof = true; - virReportError(VIR_ERR_INTERNAL_ERROR, - _("event from unexpected fd %d!=%d / watch %d!=%d"), - mon->fd, fd, mon->watch, watch); - error = true; - } else if (mon->lastError.code != VIR_ERR_OK) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) + if (mon->lastError.code != VIR_ERR_OK) { + if (cond & (G_IO_HUP | G_IO_ERR)) eof = true; error = true; } else { - if (events & VIR_EVENT_HANDLE_WRITABLE) { + if (cond & G_IO_OUT) { if (qemuMonitorIOWrite(mon) < 0) { error = true; if (errno == ECONNRESET) hangup = true; } - events &= ~VIR_EVENT_HANDLE_WRITABLE; } - if (!error && - events & VIR_EVENT_HANDLE_READABLE) { + if (!error && cond & G_IO_IN) { int got = qemuMonitorIORead(mon); - events &= ~VIR_EVENT_HANDLE_READABLE; if (got < 0) { error = true; if (errno == ECONNRESET) @@ -581,37 +559,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque) } else if (got == 0) { eof = true; } else { - /* Ignore hangup/error events if we read some data, to + /* Ignore hangup/error cond if we read some data, to * give time for that data to be consumed */ - events = 0; + cond = 0; if (qemuMonitorIOProcess(mon) < 0) error = true; } } - if (events & VIR_EVENT_HANDLE_HANGUP) { + if (cond & G_IO_HUP) { hangup = true; if (!error) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("End of file from qemu monitor")); eof = true; - events &= ~VIR_EVENT_HANDLE_HANGUP; } } if (!error && !eof && - events & VIR_EVENT_HANDLE_ERROR) { + cond & G_IO_ERR) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Invalid file descriptor while waiting for monitor")); eof = true; - events &= ~VIR_EVENT_HANDLE_ERROR; - } - if (!error && events) { - virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unhandled event %d for monitor fd %d"), - events, mon->fd); - error = true; } } @@ -679,16 +649,20 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque) virObjectUnlock(mon); virObjectUnref(mon); } + + return G_SOURCE_REMOVE; } static qemuMonitorPtr qemuMonitorOpenInternal(virDomainObjPtr vm, int fd, + GMainContext *context, qemuMonitorCallbacksPtr cb, void *opaque) { qemuMonitorPtr mon; + g_autoptr(GError) gerr = NULL; if (!cb->eofNotify) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -713,6 +687,7 @@ qemuMonitorOpenInternal(virDomainObjPtr vm, goto cleanup; } mon->fd = fd; + mon->context = g_main_context_ref(context); mon->vm = virObjectRef(vm); mon->waitGreeting = true; mon->cb = cb; @@ -723,20 +698,17 @@ qemuMonitorOpenInternal(virDomainObjPtr vm, "%s", _("Unable to set monitor close-on-exec flag")); goto cleanup; } - if (virSetNonBlock(mon->fd) < 0) { + + mon->socket = g_socket_new_from_fd(fd, &gerr); + if (!mon->socket) { virReportError(VIR_ERR_INTERNAL_ERROR, - "%s", _("Unable to put monitor into non-blocking mode")); + _("Unable to create socket object: %s"), + gerr->message); goto cleanup; } - virObjectLock(mon); - if (!qemuMonitorRegister(mon)) { - virObjectUnlock(mon); - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register monitor events")); - goto cleanup; - } + qemuMonitorRegister(mon); PROBE(QEMU_MONITOR_NEW, "mon=%p refs=%d fd=%d", @@ -782,6 +754,7 @@ qemuMonitorOpen(virDomainObjPtr vm, virDomainChrSourceDefPtr config, bool retry, unsigned long long timeout, + GMainContext *context, qemuMonitorCallbacksPtr cb, void *opaque) { @@ -815,7 +788,7 @@ qemuMonitorOpen(virDomainObjPtr vm, goto cleanup; } - ret = qemuMonitorOpenInternal(vm, fd, cb, opaque); + ret = qemuMonitorOpenInternal(vm, fd, context, cb, opaque); cleanup: if (!ret) VIR_FORCE_CLOSE(fd); @@ -830,25 +803,32 @@ qemuMonitorOpen(virDomainObjPtr vm, * * Registers the monitor in the event loop. The caller has to hold the * lock for @mon. - * - * Returns true in case of success, false otherwise */ -bool +void qemuMonitorRegister(qemuMonitorPtr mon) { - virObjectRef(mon); - if ((mon->watch = virEventAddHandle(mon->fd, - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR | - VIR_EVENT_HANDLE_READABLE, - qemuMonitorIO, - mon, - virObjectFreeCallback)) < 0) { - virObjectUnref(mon); - return false; + GIOCondition cond = 0; + + if (mon->lastError.code == VIR_ERR_OK) { + cond |= G_IO_IN; + + if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) && + !mon->waitGreeting) + cond |= G_IO_OUT; } - return true; + mon->watch = g_socket_create_source(mon->socket, + cond, + NULL); + + virObjectRef(mon); + g_source_set_callback(mon->watch, + (GSourceFunc)qemuMonitorIO, + mon, + NULL); + + g_source_attach(mon->watch, + mon->context); } @@ -856,8 +836,9 @@ void qemuMonitorUnregister(qemuMonitorPtr mon) { if (mon->watch) { - virEventRemoveHandle(mon->watch); - mon->watch = 0; + g_source_destroy(mon->watch); + g_source_unref(mon->watch); + mon->watch = NULL; } } @@ -873,9 +854,11 @@ qemuMonitorClose(qemuMonitorPtr mon) qemuMonitorSetDomainLogLocked(mon, NULL, NULL, NULL); - if (mon->fd >= 0) { + if (mon->socket) { qemuMonitorUnregister(mon); - VIR_FORCE_CLOSE(mon->fd); + g_object_unref(mon->socket); + mon->socket = NULL; + mon->fd = -1; } /* In case another thread is waiting for its monitor command to be diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h index c84cd425df..dd2aaa4691 100644 --- a/src/qemu/qemu_monitor.h +++ b/src/qemu/qemu_monitor.h @@ -391,11 +391,12 @@ qemuMonitorPtr qemuMonitorOpen(virDomainObjPtr vm, virDomainChrSourceDefPtr config, bool retry, unsigned long long timeout, + GMainContext *context, qemuMonitorCallbacksPtr cb, void *opaque) ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_NONNULL(5); -bool qemuMonitorRegister(qemuMonitorPtr mon) +void qemuMonitorRegister(qemuMonitorPtr mon) ATTRIBUTE_NONNULL(1); void qemuMonitorUnregister(qemuMonitorPtr mon) ATTRIBUTE_NONNULL(1); diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index 7475813e9f..bc57474bdc 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -1976,6 +1976,7 @@ qemuConnectMonitor(virQEMUDriverPtr driver, virDomainObjPtr vm, int asyncJob, priv->monConfig, retry, timeout, + virEventThreadGetContext(priv->eventThread), &monitorCallbacks, driver); @@ -8602,8 +8603,9 @@ qemuProcessQMPConnectMonitor(qemuProcessQMPPtr proc) proc->vm->pid = proc->pid; - if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, - 0, &callbacks, NULL))) + if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, 0, + virEventThreadGetContext(proc->eventThread), + &callbacks, NULL))) goto cleanup; virObjectLock(proc->mon); diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c index a1641050ea..3efdea9cce 100644 --- a/tests/qemumonitortestutils.c +++ b/tests/qemumonitortestutils.c @@ -1171,6 +1171,7 @@ qemuMonitorTestNew(virDomainXMLOptionPtr xmlopt, &src, true, 0, + virEventThreadGetContext(test->eventThread), &qemuMonitorTestCallbacks, driver))) goto error; -- 2.24.1