The current libvirt qemu monitor cannot handle many send command requests, and it does not need to, as miscellaneous job conditions protect it from this case. As there is an intention to replace the nested jobs concept with mere serialization at the monitor level, let's add serialization to the monitor as a preparation step. Let's also add job tracking code so that, in case of request jams, we can get useful info on blockers, just as the domain job acquiring code does. This patch introduces a distinct mutex condition for serializing queries. This way we can still wake up only senders on a qemu response, and wake up waiters when a sender finishes. Typically we wake up only one waiter; in case of error conditions, all waiters will wake up and fail. --- src/qemu/qemu_monitor.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c index 3f86887..f402300 100644 --- a/src/qemu/qemu_monitor.c +++ b/src/qemu/qemu_monitor.c @@ -43,6 +43,7 @@ #include "virprobe.h" #include "virstring.h" #include "virtime.h" +#include "virthreadjob.h" #ifdef WITH_DTRACE_PROBES # include "libvirt_qemu_probes.h" @@ -99,6 +100,11 @@ struct _qemuMonitor { qemuMonitorReportDomainLogError logFunc; void *logOpaque; virFreeCallback logDestroy; + + unsigned long long owner; + const char *ownerAPI; + unsigned long long started; + virCond queueCond; }; /** @@ -320,6 +326,7 @@ qemuMonitorDispose(void *obj) virResetError(&mon->lastError); virCondDestroy(&mon->notify); + virCondDestroy(&mon->queueCond); VIR_FREE(mon->buffer); virJSONValueFree(mon->options); VIR_FREE(mon->balloonpath); @@ -813,6 +820,11 @@ qemuMonitorOpenInternal(virDomainObjPtr vm, _("cannot initialize monitor condition")); goto cleanup; } + if (virCondInit(&mon->queueCond) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("cannot initialize monitor queue condition")); + goto cleanup; + } mon->fd = fd; mon->hasSendFD = hasSendFD; mon->vm = virObjectRef(vm); @@ -986,12 +998,52 @@ 
qemuMonitorNextCommandID(qemuMonitorPtr mon) return id; } +#define QEMU_COMMAND_WAIT_TIME (1000ull * 30) int qemuMonitorSend(qemuMonitorPtr mon, qemuMonitorMessagePtr msg) { int ret = -1; + unsigned long long now; + unsigned long long then; + + if (virTimeMillisNow(&now) < 0) + return -1; + then = now + QEMU_COMMAND_WAIT_TIME; + + while (mon->msg && mon->lastError.code == VIR_ERR_OK) { + VIR_DEBUG("Waiting for monitor lock: mon=%p, msg=%s, fd=%d", + mon, msg->txBuffer, msg->txFD); + + if (virCondWaitUntil(&mon->queueCond, &mon->parent.lock, then) < 0) { + if (errno == ETIMEDOUT) { + if (mon->ownerAPI) { + virReportError(VIR_ERR_OPERATION_TIMEOUT, + _("cannot acquire monitor lock (held by %s)"), + mon->ownerAPI); + } else { + virReportError(VIR_ERR_OPERATION_TIMEOUT, "%s", + _("cannot acquire monitor lock")); + } + } else { + virReportSystemError(errno, "%s", + _("cannot acquire monitor lock")); + } + + now = 0; + ignore_value(virTimeMillisNow(&now)); + VIR_WARN("Cannot send message: mon=%p, msg=%s, fd=%d" + " blocker: msg=%s, fd=%d" + " (thread: %llu, API: %s," " duration: %llu)", + mon, msg->txBuffer, msg->txFD, + mon->msg->txBuffer, mon->msg->txFD, + mon->owner, NULLSTR(mon->ownerAPI), + now && mon->started ? 
(now - mon->started) / 1000 : 0); + + return -1; + } + } /* Check whether qemu quit unexpectedly */ if (mon->lastError.code != VIR_ERR_OK) { @@ -1001,6 +1053,11 @@ qemuMonitorSend(qemuMonitorPtr mon, return -1; } + now = 0; + ignore_value(virTimeMillisNow(&now)); + mon->ownerAPI = virThreadJobGet(); + mon->owner = virThreadSelfID(); + mon->started = now; mon->msg = msg; qemuMonitorUpdateWatch(mon); @@ -1027,6 +1084,15 @@ qemuMonitorSend(qemuMonitorPtr mon, cleanup: mon->msg = NULL; + mon->ownerAPI = NULL; + mon->owner = 0; + mon->started = 0; + + if (mon->lastError.code == VIR_ERR_OK) + virCondSignal(&mon->queueCond); + else + virCondBroadcast(&mon->queueCond); + qemuMonitorUpdateWatch(mon); return ret; -- 1.8.3.1 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list