This converts the QEMU agent APIs to use the per-VM event loop, which involves switching from virEvent APIs to GMainContext / GSource APIs. A GSocket is used as a convenient way to create a GSource for a socket, but is not yet used for actual I/O. Signed-off-by: Daniel P. Berrangé <berrange@xxxxxxxxxx> --- src/qemu/qemu_agent.c | 146 +++++++++++++++++++---------------- src/qemu/qemu_agent.h | 1 + src/qemu/qemu_process.c | 1 + tests/qemumonitortestutils.c | 1 + 4 files changed, 84 insertions(+), 65 deletions(-) diff --git a/src/qemu/qemu_agent.c b/src/qemu/qemu_agent.c index 72ea159a9c..1655e26212 100644 --- a/src/qemu/qemu_agent.c +++ b/src/qemu/qemu_agent.c @@ -25,6 +25,7 @@ #include <unistd.h> #include <fcntl.h> #include <sys/time.h> +#include <gio/gio.h> #include "qemu_agent.h" #include "qemu_domain.h" @@ -101,7 +102,10 @@ struct _qemuAgent { virCond notify; int fd; - int watch; + + GMainContext *context; + GSocket *socket; + GSource *watch; bool running; @@ -172,6 +176,7 @@ static void qemuAgentDispose(void *obj) (agent->cb->destroy)(agent, agent->vm); virCondDestroy(&agent->notify); VIR_FREE(agent->buffer); + g_main_context_unref(agent->context); virResetError(&agent->lastError); } @@ -188,13 +193,6 @@ qemuAgentOpenUnix(const char *socketpath) return -1; } - if (virSetNonBlock(agentfd) < 0) { - virReportSystemError(errno, "%s", - _("Unable to put monitor " - "into non-blocking mode")); - goto error; - } - if (virSetCloseExec(agentfd) < 0) { virReportSystemError(errno, "%s", _("Unable to set agent " @@ -498,28 +496,62 @@ qemuAgentIORead(qemuAgentPtr agent) } -static void qemuAgentUpdateWatch(qemuAgentPtr agent) -{ - int events = - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR; +static gboolean +qemuAgentIO(GSocket *socket, + GIOCondition cond, + gpointer opaque); - if (!agent->watch) - return; + +static void +qemuAgentRegister(qemuAgentPtr agent) +{ + GIOCondition cond = 0; if (agent->lastError.code == VIR_ERR_OK) { - events |= VIR_EVENT_HANDLE_READABLE; + 
cond |= G_IO_IN; if (agent->msg && agent->msg->txOffset < agent->msg->txLength) - events |= VIR_EVENT_HANDLE_WRITABLE; + cond |= G_IO_OUT; } - virEventUpdateHandle(agent->watch, events); + agent->watch = g_socket_create_source(agent->socket, + cond, + NULL); + + virObjectRef(agent); + g_source_set_callback(agent->watch, + (GSourceFunc)qemuAgentIO, + agent, + NULL); + + g_source_attach(agent->watch, + agent->context); } static void -qemuAgentIO(int watch, int fd, int events, void *opaque) +qemuAgentUnregister(qemuAgentPtr agent) +{ + if (agent->watch) { + g_source_destroy(agent->watch); + g_source_unref(agent->watch); + agent->watch = NULL; + } +} + + +static void qemuAgentUpdateWatch(qemuAgentPtr agent) +{ + qemuAgentUnregister(agent); + if (agent->socket) + qemuAgentRegister(agent); +} + + +static gboolean +qemuAgentIO(GSocket *socket G_GNUC_UNUSED, + GIOCondition cond, + gpointer opaque) { qemuAgentPtr agent = opaque; bool error = false; @@ -529,45 +561,36 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) /* lock access to the agent and protect fd */ virObjectLock(agent); #if DEBUG_IO - VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd, events); + VIR_DEBUG("Agent %p I/O on socket %p cond %d", agent, agent->socket, cond); #endif - if (agent->fd == -1 || agent->watch == 0) { + if (agent->fd == -1 || !agent->watch) { virObjectUnlock(agent); virObjectUnref(agent); - return; + return G_SOURCE_REMOVE; } - if (agent->fd != fd || agent->watch != watch) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) - eof = true; - virReportError(VIR_ERR_INTERNAL_ERROR, - _("event from unexpected fd %d!=%d / watch %d!=%d"), - agent->fd, fd, agent->watch, watch); - error = true; - } else if (agent->lastError.code != VIR_ERR_OK) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) + if (agent->lastError.code != VIR_ERR_OK) { + if (cond & (G_IO_HUP | G_IO_ERR)) eof = true; error = true; } else { - if (events & 
VIR_EVENT_HANDLE_WRITABLE) { + if (cond & G_IO_OUT) { if (qemuAgentIOWrite(agent) < 0) error = true; - events &= ~VIR_EVENT_HANDLE_WRITABLE; } if (!error && - events & VIR_EVENT_HANDLE_READABLE) { + cond & G_IO_IN) { int got = qemuAgentIORead(agent); - events &= ~VIR_EVENT_HANDLE_READABLE; if (got < 0) { error = true; } else if (got == 0) { eof = true; } else { - /* Ignore hangup/error events if we read some data, to + /* Ignore hangup/error cond if we read some data, to * give time for that data to be consumed */ - events = 0; + cond = 0; if (qemuAgentIOProcess(agent) < 0) error = true; @@ -575,25 +598,17 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) } if (!error && - events & VIR_EVENT_HANDLE_HANGUP) { + cond & G_IO_HUP) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("End of file from agent socket")); eof = true; - events &= ~VIR_EVENT_HANDLE_HANGUP; } if (!error && !eof && - events & VIR_EVENT_HANDLE_ERROR) { + cond & G_IO_ERR) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Invalid file descriptor while waiting for agent")); eof = true; - events &= ~VIR_EVENT_HANDLE_ERROR; - } - if (!error && events) { - virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unhandled event %d for agent fd %d"), - events, agent->fd); - error = true; } } @@ -649,15 +664,19 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) virObjectUnlock(agent); virObjectUnref(agent); } + + return G_SOURCE_REMOVE; } qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm, const virDomainChrSourceDef *config, + GMainContext *context, qemuAgentCallbacksPtr cb) { qemuAgentPtr agent; + g_autoptr(GError) gerr = NULL; if (!cb || !cb->eofNotify) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -693,22 +712,20 @@ qemuAgentOpen(virDomainObjPtr vm, if (agent->fd == -1) goto cleanup; - virObjectRef(agent); - if ((agent->watch = virEventAddHandle(agent->fd, - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR | - VIR_EVENT_HANDLE_READABLE, - qemuAgentIO, - agent, - virObjectFreeCallback)) < 0) { - 
virObjectUnref(agent); - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register agent events")); + agent->context = g_main_context_ref(context); + + agent->socket = g_socket_new_from_fd(agent->fd, &gerr); + if (!agent->socket) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to create socket object: %s"), + gerr->message); goto cleanup; } + qemuAgentRegister(agent); + agent->running = true; - VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd, agent->watch); + VIR_DEBUG("New agent %p fd=%d", agent, agent->fd); return agent; @@ -763,12 +780,11 @@ void qemuAgentClose(qemuAgentPtr agent) virObjectLock(agent); - if (agent->fd >= 0) { - if (agent->watch) { - virEventRemoveHandle(agent->watch); - agent->watch = 0; - } - VIR_FORCE_CLOSE(agent->fd); + if (agent->socket) { + qemuAgentUnregister(agent); + g_object_unref(agent->socket); + agent->socket = NULL; + agent->fd = -1; } qemuAgentNotifyCloseLocked(agent); diff --git a/src/qemu/qemu_agent.h b/src/qemu/qemu_agent.h index 5656fe60ff..d4d8615323 100644 --- a/src/qemu/qemu_agent.h +++ b/src/qemu/qemu_agent.h @@ -41,6 +41,7 @@ struct _qemuAgentCallbacks { qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm, const virDomainChrSourceDef *config, + GMainContext *context, qemuAgentCallbacksPtr cb); void qemuAgentClose(qemuAgentPtr mon); diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index ea046bcb14..a235064320 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -237,6 +237,7 @@ qemuConnectAgent(virQEMUDriverPtr driver, virDomainObjPtr vm) agent = qemuAgentOpen(vm, config->source, + virEventThreadGetContext(priv->eventThread), &agentCallbacks); virObjectLock(vm); diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c index df93aae758..328bfb8525 100644 --- a/tests/qemumonitortestutils.c +++ b/tests/qemumonitortestutils.c @@ -1406,6 +1406,7 @@ qemuMonitorTestNewAgent(virDomainXMLOptionPtr xmlopt) if (!(test->agent = qemuAgentOpen(test->vm, &src, + 
virEventThreadGetContext(test->eventThread), &qemuMonitorTestAgentCallbacks))) goto error; -- 2.24.1