Add support for: starting/stopping full device streaming, streaming a single sector, and getting the status of streaming. These operations are done by using the 'stream' and 'info stream' qemu monitor commands. * src/qemu/qemu_driver.c src/qemu/qemu_monitor_text.[ch]: implement disk streaming by using the stream and info stream text monitor commands * src/qemu/qemu_monitor_json.[ch]: implement commands using the qmp monitor Signed-off-by: Adam Litke <agl@xxxxxxxxxx> --- src/qemu/qemu_driver.c | 77 ++++++++++++++++++++- src/qemu/qemu_monitor.c | 41 +++++++++++ src/qemu/qemu_monitor.h | 6 ++ src/qemu/qemu_monitor_json.c | 104 ++++++++++++++++++++++++++++ src/qemu/qemu_monitor_json.h | 7 ++ src/qemu/qemu_monitor_text.c | 156 ++++++++++++++++++++++++++++++++++++++++++ src/qemu/qemu_monitor_text.h | 8 ++ 7 files changed, 397 insertions(+), 2 deletions(-) diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index dbde9e7..d7c049a 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -13143,6 +13143,79 @@ cleanup: return ret; } +static unsigned long long +qemudDomainStreamDisk (virDomainPtr dom, const char *path, + unsigned long long offset, unsigned int flags) +{ + struct qemud_driver *driver = dom->conn->privateData; + virDomainObjPtr vm; + unsigned long long ret = -1; + + qemuDriverLock(driver); + vm = virDomainFindByUUID(&driver->domains, dom->uuid); + qemuDriverUnlock(driver); + + if (!vm) { + char uuidstr[VIR_UUID_STRING_BUFLEN]; + virUUIDFormat(dom->uuid, uuidstr); + qemuReportError(VIR_ERR_NO_DOMAIN, + _("no domain with matching uuid '%s'"), uuidstr); + goto cleanup; + } + + if (virDomainObjIsActive(vm)) { + qemuDomainObjPrivatePtr priv = vm->privateData; + qemuDomainObjEnterMonitor(vm); + ret = qemuMonitorStreamDisk(priv->mon, path, offset, flags); + qemuDomainObjExitMonitor(vm); + } else { + qemuReportError(VIR_ERR_OPERATION_INVALID, + "%s", _("domain is not running")); + } + +cleanup: + if (vm) + virDomainObjUnlock(vm); + return ret; +} + +static int +qemudDomainStreamDiskInfo (virDomainPtr dom, virStreamDiskStatePtr states, + unsigned int nr_states, + unsigned int flags ATTRIBUTE_UNUSED) +{ + struct qemud_driver *driver = dom->conn->privateData; + virDomainObjPtr vm; + unsigned int ret = -1; + + qemuDriverLock(driver); + vm = virDomainFindByUUID(&driver->domains, dom->uuid); + qemuDriverUnlock(driver); + + if (!vm) { + char uuidstr[VIR_UUID_STRING_BUFLEN]; + virUUIDFormat(dom->uuid, uuidstr); + qemuReportError(VIR_ERR_NO_DOMAIN, + _("no domain with matching uuid '%s'"), uuidstr); + goto cleanup; + } + + if (virDomainObjIsActive(vm)) { + qemuDomainObjPrivatePtr priv = vm->privateData; + qemuDomainObjEnterMonitor(vm); + ret = qemuMonitorStreamDiskInfo(priv->mon, states, nr_states); + qemuDomainObjExitMonitor(vm); + } else { + qemuReportError(VIR_ERR_OPERATION_INVALID, + "%s", _("domain is not running")); + } + +cleanup: + if (vm) + virDomainObjUnlock(vm); + return ret; +} + static int qemuDomainMonitorCommand(virDomainPtr domain, const char *cmd, char **result, unsigned int flags) { @@ -13298,8 +13371,8 @@ static virDriver qemuDriver = { qemuDomainMonitorCommand, /* qemuDomainMonitorCommand */ qemuDomainSetMemoryParameters, /* domainSetMemoryParameters */ qemuDomainGetMemoryParameters, /* domainGetMemoryParameters */ - NULL, /* domainStreamDisk */ - NULL, /* domainStreamDiskInfo */ + qemudDomainStreamDisk, /* domainStreamDisk */ + qemudDomainStreamDiskInfo, /* domainStreamDiskInfo */ }; diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c index 2366fdb..9169e23 100644 --- a/src/qemu/qemu_monitor.c +++ b/src/qemu/qemu_monitor.c @@ -1917,6 +1917,47 @@ int qemuMonitorDeleteSnapshot(qemuMonitorPtr mon, const char *name) return ret; } +unsigned long long +qemuMonitorStreamDisk(qemuMonitorPtr mon, const char *path, + unsigned long long offset, unsigned int flags) +{ + unsigned long long ret; + + DEBUG("mon=%p, path=%p, offset=%llu, flags=%u", mon, path, offset, flags); + + if (!mon) { + qemuReportError(VIR_ERR_INVALID_ARG, "%s", + _("monitor must not be NULL")); + return -1; + } + + if (mon->json) + ret = qemuMonitorJSONStreamDisk(mon, path, offset, flags); + else + ret = qemuMonitorTextStreamDisk(mon, path, offset, flags); + return ret; +} + +int qemuMonitorStreamDiskInfo(qemuMonitorPtr mon, virStreamDiskStatePtr states, + unsigned int nr_states) +{ + int ret; + + DEBUG("mon=%p, states=%p, nr_states=%u", mon, states, nr_states); + + if (!mon) { + qemuReportError(VIR_ERR_INVALID_ARG, "%s", + _("monitor must not be NULL")); + return -1; + } + + if (mon->json) + ret = qemuMonitorJSONStreamDiskInfo(mon, states, nr_states); + else + ret = qemuMonitorTextStreamDiskInfo(mon, states, nr_states); + return ret; +} + int qemuMonitorArbitraryCommand(qemuMonitorPtr mon, const char *cmd, char **reply) { int ret; diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h index 7d09145..719de76 100644 --- a/src/qemu/qemu_monitor.h +++ b/src/qemu/qemu_monitor.h @@ -389,6 +389,12 @@ int qemuMonitorCreateSnapshot(qemuMonitorPtr mon, const char *name); int qemuMonitorLoadSnapshot(qemuMonitorPtr mon, const char *name); int qemuMonitorDeleteSnapshot(qemuMonitorPtr mon, const char *name); +unsigned long long +qemuMonitorStreamDisk(qemuMonitorPtr mon, const char *path, + unsigned long long offset, unsigned int flags); +int qemuMonitorStreamDiskInfo(qemuMonitorPtr mon, virStreamDiskStatePtr states, + unsigned int nr_states); + int qemuMonitorArbitraryCommand(qemuMonitorPtr mon, const char *cmd, char **reply); /** diff --git a/src/qemu/qemu_monitor_json.c b/src/qemu/qemu_monitor_json.c index d2c6f0a..15e0c5b 100644 --- a/src/qemu/qemu_monitor_json.c +++ b/src/qemu/qemu_monitor_json.c @@ -2342,6 +2342,110 @@ int qemuMonitorJSONDeleteSnapshot(qemuMonitorPtr mon, const char *name) return ret; } +static int qemuMonitorJSONExtractStreamState(virJSONValuePtr reply, + virStreamDiskStatePtr state) +{ + virJSONValuePtr data; + int ret = -1; + const char *path; + unsigned long long offset, size; + + if (!(data = virJSONValueObjectGet(reply, "return"))) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("stream reply was missing return data")); + goto cleanup; + } + + if ((path = virJSONValueObjectGetString(data, "device"))) { + if (!virJSONValueObjectGetNumberUlong(data, "offset", &offset)) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("stream reply was missing offset")); + goto cleanup; + } + if (!virJSONValueObjectGetNumberUlong(data, "size", &size)) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("stream reply was missing size")); + goto cleanup; + } + + memcpy(state->path, path, strlen(path)); + state->offset = offset; + state->size = size; + ret = 1; + } else { + /* No currently active streams */ + ret = 0; + } + +cleanup: + return ret; +} + +unsigned long long +qemuMonitorJSONStreamDisk(qemuMonitorPtr mon, const char *path, + unsigned long long offset, unsigned int flags) +{ + virJSONValuePtr cmd = NULL; + virJSONValuePtr reply = NULL; + struct _virStreamDiskState state; + int rc; + unsigned long long ret = -1; + + if (flags == VIR_STREAM_DISK_START) + cmd = qemuMonitorJSONMakeCommand("stream", "b:all", "true", + "s:device", path, NULL); + else if (flags == VIR_STREAM_DISK_STOP) + cmd = qemuMonitorJSONMakeCommand("stream", "b:stop", "true", + "s:device", path, NULL); + else if (flags == VIR_STREAM_DISK_ONE) + cmd = qemuMonitorJSONMakeCommand("stream", "s:device", path, + "i:offset", offset, NULL); + else + qemuReportError(VIR_ERR_INTERNAL_ERROR, "Invalid argument for flags: " + "%u", flags); + + if (!cmd) + return -1; + + if (qemuMonitorJSONCommand(mon, cmd, &reply) < 0) + goto cleanup; + rc = qemuMonitorJSONExtractStreamState(reply, &state); + if (rc == 0 && (flags == VIR_STREAM_DISK_START || + flags == VIR_STREAM_DISK_STOP)) + ret = 0; + if (rc == 1 && flags == VIR_STREAM_DISK_ONE) + ret = state.offset; + +cleanup: + virJSONValueFree(cmd); + virJSONValueFree(reply); + return ret; +} + +int qemuMonitorJSONStreamDiskInfo(qemuMonitorPtr mon, + virStreamDiskStatePtr states, + unsigned int nr_states) +{ + virJSONValuePtr cmd = NULL; + virJSONValuePtr reply = NULL; + int ret = -1; + + /* Qemu only supports one stream at a time */ + nr_states = 1; + + cmd = qemuMonitorJSONMakeCommand("query-stream", NULL); + if (!cmd) + return -1; + + if (qemuMonitorJSONCommand(mon, cmd, &reply) < 0) + goto cleanup; + ret = qemuMonitorJSONExtractStreamState(reply, states); +cleanup: + virJSONValueFree(cmd); + virJSONValueFree(reply); + return ret; +} + int qemuMonitorJSONArbitraryCommand(qemuMonitorPtr mon, const char *cmd_str, char **reply_str) diff --git a/src/qemu/qemu_monitor_json.h b/src/qemu/qemu_monitor_json.h index 94806c1..f4db2b4 100644 --- a/src/qemu/qemu_monitor_json.h +++ b/src/qemu/qemu_monitor_json.h @@ -196,6 +196,13 @@ int qemuMonitorJSONCreateSnapshot(qemuMonitorPtr mon, const char *name); int qemuMonitorJSONLoadSnapshot(qemuMonitorPtr mon, const char *name); int qemuMonitorJSONDeleteSnapshot(qemuMonitorPtr mon, const char *name); +unsigned long long +qemuMonitorJSONStreamDisk(qemuMonitorPtr mon, const char *path, + unsigned long long offset, unsigned int flags); +int qemuMonitorJSONStreamDiskInfo(qemuMonitorPtr mon, + virStreamDiskStatePtr states, + unsigned int nr_states); + int qemuMonitorJSONArbitraryCommand(qemuMonitorPtr mon, const char *cmd_str, char **reply_str); diff --git a/src/qemu/qemu_monitor_text.c b/src/qemu/qemu_monitor_text.c index d7e128c..115b220 100644 --- a/src/qemu/qemu_monitor_text.c +++ b/src/qemu/qemu_monitor_text.c @@ -2569,6 +2569,162 @@ cleanup: return ret; } +static int qemuMonitorParseStreamInfo(char *text, + virStreamDiskStatePtr state) +{ + char *p; + unsigned long long data; + unsigned int device_len; + + memset(state->path, 0, VIR_STREAM_PATH_BUFLEN); + state->offset = 0; + state->size = 0; + + if (strstr(text, "Device '") && strstr(text, "' not found")) { + qemuReportError(VIR_ERR_OPERATION_INVALID, "%s", _("Device not found")); + return -1; + } + + if (strstr(text, "expects a sector size less than device length")) { + qemuReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Offset parameter is greater than the device size")); + return -1; + } + + if (strstr(text, "Device '") && strstr(text, "' is in use")) { + qemuReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("Another streaming operation is in progress")); + return -1; + } + + if (strstr(text, "No active stream") || STREQ(text, "")) + return 0; + + if ((text = STRSKIP(text, "Streaming device ")) == NULL) + return -1; + + /* Parse the device path */ + p = strstr(text, ": Completed "); + if (!p) + return -1; + + device_len = (unsigned int)(p - text); + if (device_len >= VIR_STREAM_PATH_BUFLEN) { + qemuReportError(VIR_ERR_OPERATION_FAILED, "%s", + "Device name is too long"); + return -1; + } + + if (sprintf((char *)&state->path, "%.*s", device_len, text) < 0) { + qemuReportError(VIR_ERR_OPERATION_FAILED, "%s", + "Unable to store device name"); + return -1; + } + text = p + 12; /* Skip over ": Completed " */ + + /* Parse the current sector offset */ + if (virStrToLong_ull (text, &p, 10, &data)) + return -1; + state->offset = (size_t) data; + text = p; + + /* Parse the total number of sectors */ + if (!STRPREFIX(text, " of ")) + return -1; + text += 4; + if (virStrToLong_ull (text, &p, 10, &data)) + return -1; + state->size = (size_t) data; + text = p; + + /* Verify the ending */ + if (!STRPREFIX(text, " sectors")) + return -1; + + return 1; +} + +unsigned long long +qemuMonitorTextStreamDisk(qemuMonitorPtr mon, const char *path, + unsigned long long offset, unsigned int flags) +{ + char *cmd; + char *reply = NULL; + int rc; + unsigned long long ret = -1; + virStreamDiskState state; + + if (flags == VIR_STREAM_DISK_START) + rc = virAsprintf(&cmd, "stream -a %s", path); + else if (flags == VIR_STREAM_DISK_STOP) + rc = virAsprintf(&cmd, "stream -s %s", path); + else if (flags == VIR_STREAM_DISK_ONE) + rc = virAsprintf(&cmd, "stream %s %llu", path, offset); + else { + qemuReportError(VIR_ERR_OPERATION_INVALID, "%s%u", + _("invalid value for flags: "), flags); + return -1; + } + + if (rc < 0) { + virReportOOMError(); + return -1; + } + + if (qemuMonitorCommand(mon, cmd, &reply)) { + qemuReportError(VIR_ERR_OPERATION_FAILED, + _("failed to perform stream command '%s'"), + cmd); + goto cleanup; + } + + rc = qemuMonitorParseStreamInfo(reply, &state); + if (rc == 0 && (flags == VIR_STREAM_DISK_START || + flags == VIR_STREAM_DISK_STOP)) + ret = 0; /* A successful full disk start or stop produces no output */ + if (rc == 1 && flags == VIR_STREAM_DISK_ONE) + ret = state.offset; + +cleanup: + VIR_FREE(cmd); + VIR_FREE(reply); + return ret; +} + +int qemuMonitorTextStreamDiskInfo(qemuMonitorPtr mon, + virStreamDiskStatePtr states, + unsigned int nr_states) +{ + char *cmd; + char *reply = NULL; + int ret = -1; + + /* Qemu only supports one stream at a time */ + nr_states = 1; + + if (virAsprintf(&cmd, "info stream") < 0) { + virReportOOMError(); + return -1; + } + + if (qemuMonitorCommand(mon, cmd, &reply)) { + qemuReportError(VIR_ERR_OPERATION_FAILED, + _("failed to perform stream command '%s'"), + cmd); + goto cleanup; + } + + ret = qemuMonitorParseStreamInfo(reply, states); + if (ret == -1) + qemuReportError(VIR_ERR_OPERATION_FAILED, + _("Failed to parse monitor output: '%s'"), reply); + +cleanup: + VIR_FREE(cmd); + VIR_FREE(reply); + return ret; +} + int qemuMonitorTextArbitraryCommand(qemuMonitorPtr mon, const char *cmd, char **reply) { diff --git a/src/qemu/qemu_monitor_text.h b/src/qemu/qemu_monitor_text.h index c017509..4d4aaa3 100644 --- a/src/qemu/qemu_monitor_text.h +++ b/src/qemu/qemu_monitor_text.h @@ -194,6 +194,14 @@ int qemuMonitorTextCreateSnapshot(qemuMonitorPtr mon, const char *name); int qemuMonitorTextLoadSnapshot(qemuMonitorPtr mon, const char *name); int qemuMonitorTextDeleteSnapshot(qemuMonitorPtr mon, const char *name); +unsigned long long +qemuMonitorTextStreamDisk(qemuMonitorPtr mon, const char *path, + unsigned long long offset, unsigned int flags); +int qemuMonitorTextStreamDiskInfo(qemuMonitorPtr mon, + virStreamDiskStatePtr states, + unsigned int nr_states); + + int qemuMonitorTextArbitraryCommand(qemuMonitorPtr mon, const char *cmd, char **reply); -- 1.7.3.2.164.g6f10c -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list