The virDomainBlockPull* family of commands are enabled by the following HMP/QMP commands: 'block_stream', 'block_job_cancel', 'info block-jobs' / 'query-block-jobs', and 'block_job_set_speed'. * src/qemu/qemu_driver.c src/qemu/qemu_monitor_text.[ch]: implement disk streaming by using the proper qemu monitor commands. * src/qemu/qemu_monitor_json.[ch]: implement commands using the qmp monitor Signed-off-by: Adam Litke <agl@xxxxxxxxxx> --- src/qemu/qemu_driver.c | 113 +++++++++++++++++++++++++++++ src/qemu/qemu_monitor.c | 18 +++++ src/qemu/qemu_monitor.h | 13 ++++ src/qemu/qemu_monitor_json.c | 147 ++++++++++++++++++++++++++++++++++++++ src/qemu/qemu_monitor_json.h | 5 ++ src/qemu/qemu_monitor_text.c | 162 ++++++++++++++++++++++++++++++++++++++++++ src/qemu/qemu_monitor_text.h | 6 ++ 7 files changed, 464 insertions(+), 0 deletions(-) diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index 8870e33..0f556a9 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -8493,6 +8493,115 @@ cleanup: return ret; } +static const char * +qemuDiskPathToAlias(virDomainObjPtr vm, const char *path) { + int i; + char *ret = NULL; + + for (i = 0 ; i < vm->def->ndisks ; i++) { + virDomainDiskDefPtr disk = vm->def->disks[i]; + + if (disk->type != VIR_DOMAIN_DISK_TYPE_BLOCK && + disk->type != VIR_DOMAIN_DISK_TYPE_FILE) + continue; + + if (disk->src != NULL && STREQ(disk->src, path)) { + if (virAsprintf(&ret, "drive-%s", disk->info.alias) < 0) { + virReportOOMError(); + return NULL; + } + break; + } + } + + if (!ret) { + qemuReportError(VIR_ERR_INVALID_ARG, + "%s", _("No device found for specified path")); + } + return ret; +} + +static int +qemuDomainBlockJobImpl(virDomainPtr dom, const char *path, + unsigned long bandwidth, virDomainBlockJobInfoPtr info, + int mode) +{ + struct qemud_driver *driver = dom->conn->privateData; + virDomainObjPtr vm = NULL; + qemuDomainObjPrivatePtr priv; + char uuidstr[VIR_UUID_STRING_BUFLEN]; + const char *device = NULL; + int ret = -1; + + qemuDriverLock(driver); + virUUIDFormat(dom->uuid, uuidstr); + vm = virDomainFindByUUID(&driver->domains, dom->uuid); + if (!vm) { + qemuReportError(VIR_ERR_NO_DOMAIN, + _("no domain with matching uuid '%s'"), uuidstr); + goto cleanup; + } + + if (!virDomainObjIsActive(vm)) { + qemuReportError(VIR_ERR_OPERATION_INVALID, + "%s", _("domain is not running")); + goto cleanup; + } + + device = qemuDiskPathToAlias(vm, path); + if (!device) { + goto cleanup; + } + + if (qemuDomainObjBeginJobWithDriver(driver, vm, QEMU_JOB_MODIFY) < 0) + goto cleanup; + ignore_value(qemuDomainObjEnterMonitorWithDriver(driver, vm)); + priv = vm->privateData; + ret = qemuMonitorBlockJob(priv->mon, device, bandwidth, info, mode); + qemuDomainObjExitMonitorWithDriver(driver, vm); + if (qemuDomainObjEndJob(driver, vm) == 0) { + vm = NULL; + goto cleanup; + } + +cleanup: + VIR_FREE(device); + if (vm) + virDomainObjUnlock(vm); + qemuDriverUnlock(driver); + return ret; +} + +static int +qemuDomainBlockJobAbort(virDomainPtr dom, const char *path, unsigned int flags) +{ + virCheckFlags(0, -1); + return qemuDomainBlockJobImpl(dom, path, 0, NULL, BLOCK_JOB_ABORT); +} + +static int +qemuDomainGetBlockJobInfo(virDomainPtr dom, const char *path, + virDomainBlockJobInfoPtr info, unsigned int flags) +{ + virCheckFlags(0, -1); + return qemuDomainBlockJobImpl(dom, path, 0, info, BLOCK_JOB_INFO); +} + +static int +qemuDomainBlockJobSetSpeed(virDomainPtr dom, const char *path, + unsigned long bandwidth, unsigned int flags) +{ + virCheckFlags(0, -1); + return qemuDomainBlockJobImpl(dom, path, bandwidth, NULL, BLOCK_JOB_SPEED); +} + +static int +qemuDomainBlockPull(virDomainPtr dom, const char *path, unsigned long bandwidth, + unsigned int flags) +{ + virCheckFlags(0, -1); + return qemuDomainBlockJobImpl(dom, path, bandwidth, NULL, BLOCK_JOB_PULL); +} static virDriver qemuDriver = { .no = VIR_DRV_QEMU, @@ -8619,6 +8728,10 @@ static virDriver qemuDriver = { .domainMigratePerform3 = qemuDomainMigratePerform3, /* 0.9.2 */ .domainMigrateFinish3 = qemuDomainMigrateFinish3, /* 0.9.2 */ .domainMigrateConfirm3 = qemuDomainMigrateConfirm3, /* 0.9.2 */ + .domainBlockJobAbort = qemuDomainBlockJobAbort, /* 0.9.4 */ + .domainGetBlockJobInfo = qemuDomainGetBlockJobInfo, /* 0.9.4 */ + .domainBlockJobSetSpeed = qemuDomainBlockJobSetSpeed, /* 0.9.4 */ + .domainBlockPull = qemuDomainBlockPull, /* 0.9.4 */ }; diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c index 3a30a15..5c048eb 100644 --- a/src/qemu/qemu_monitor.c +++ b/src/qemu/qemu_monitor.c @@ -2427,3 +2427,21 @@ int qemuMonitorScreendump(qemuMonitorPtr mon, ret = qemuMonitorTextScreendump(mon, file); return ret; } + +int qemuMonitorBlockJob(qemuMonitorPtr mon, + const char *device, + unsigned long bandwidth, + virDomainBlockJobInfoPtr info, + int mode) +{ + int ret; + + VIR_DEBUG("mon=%p, device=%p, bandwidth=%lu, info=%p, mode=%o", + mon, device, bandwidth, info, mode); + + if (mon->json) + ret = qemuMonitorJSONBlockJob(mon, device, bandwidth, info, mode); + else + ret = qemuMonitorTextBlockJob(mon, device, bandwidth, info, mode); + return ret; +} diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h index f246d21..c5d27ef 100644 --- a/src/qemu/qemu_monitor.h +++ b/src/qemu/qemu_monitor.h @@ -447,6 +447,19 @@ int qemuMonitorInjectNMI(qemuMonitorPtr mon); int qemuMonitorScreendump(qemuMonitorPtr mon, const char *file); +typedef enum { + BLOCK_JOB_ABORT = 0, + BLOCK_JOB_INFO = 1, + BLOCK_JOB_SPEED = 2, + BLOCK_JOB_PULL = 3, +} BLOCK_JOB_CMD; + +int qemuMonitorBlockJob(qemuMonitorPtr mon, + const char *device, + unsigned long bandwidth, + virDomainBlockJobInfoPtr info, + int mode); + /** * When running two dd process and using <> redirection, we need a * shell that will not truncate files. These two strings serve that diff --git a/src/qemu/qemu_monitor_json.c b/src/qemu/qemu_monitor_json.c index 4db2b78..e7163bb 100644 --- a/src/qemu/qemu_monitor_json.c +++ b/src/qemu/qemu_monitor_json.c @@ -2717,3 +2717,150 @@ int qemuMonitorJSONScreendump(qemuMonitorPtr mon, virJSONValueFree(reply); return ret; } + +static int qemuMonitorJSONGetBlockJobInfoOne(virJSONValuePtr entry, + const char *device, + virDomainBlockJobInfoPtr info) +{ + const char *this_dev; + const char *type; + unsigned long long speed_bytes; + + if ((this_dev = virJSONValueObjectGetString(entry, "device")) == NULL) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("entry was missing 'device'")); + return -1; + } + if (!STREQ(this_dev, device)) + return -1; + + type = virJSONValueObjectGetString(entry, "type"); + if (!type) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("entry was missing 'type'")); + return -1; + } + if (STREQ(type, "stream")) + info->type = VIR_DOMAIN_BLOCK_JOB_TYPE_PULL; + else + info->type = VIR_DOMAIN_BLOCK_JOB_TYPE_UNKNOWN; + + if (virJSONValueObjectGetNumberUlong(entry, "speed", &speed_bytes) < 0) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("entry was missing 'speed'")); + return -1; + } + info->bandwidth = speed_bytes / 1024ULL / 1024ULL; + + if (virJSONValueObjectGetNumberUlong(entry, "offset", &info->cur) < 0) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("entry was missing 'offset'")); + return -1; + } + + if (virJSONValueObjectGetNumberUlong(entry, "len", &info->end) < 0) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("entry was missing 'len'")); + return -1; + } + return 0; +} + +/** qemuMonitorJSONGetBlockJobInfo: + * Parse Block Job information. + * The reply is a JSON array of objects, one per active job. + */ +static int qemuMonitorJSONGetBlockJobInfo(virJSONValuePtr reply, + const char *device, + virDomainBlockJobInfoPtr info) +{ + virJSONValuePtr data; + int nr_results, i; + + if (!info) + return -1; + + if ((data = virJSONValueObjectGet(reply, "return")) == NULL) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("reply was missing return data")); + return -1; + } + + if (data->type != VIR_JSON_TYPE_ARRAY) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("urecognized format of block job information")); + return -1; + } + + if ((nr_results = virJSONValueArraySize(data)) < 0) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unable to determine array size")); + return -1; + } + + for (i = 0; i < nr_results; i++) { + virJSONValuePtr entry = virJSONValueArrayGet(data, i); + if (qemuMonitorJSONGetBlockJobInfoOne(entry, device, info) == 0) + return 1; + } + + return 0; +} + + +int qemuMonitorJSONBlockJob(qemuMonitorPtr mon, + const char *device, + unsigned long bandwidth, + virDomainBlockJobInfoPtr info, + int mode) +{ + int ret = -1; + virJSONValuePtr cmd = NULL; + virJSONValuePtr reply = NULL; + + if (mode == BLOCK_JOB_ABORT) + cmd = qemuMonitorJSONMakeCommand("block_job_cancel", + "s:device", device, NULL); + else if (mode == BLOCK_JOB_INFO) + cmd = qemuMonitorJSONMakeCommand("query-block-jobs", NULL); + else if (mode == BLOCK_JOB_SPEED) + cmd = qemuMonitorJSONMakeCommand("block_job_set_speed", + "s:device", device, + "U:value", bandwidth * 1024ULL * 1024ULL, + NULL); + else if (mode == BLOCK_JOB_PULL) + cmd = qemuMonitorJSONMakeCommand("block_stream", + "s:device", device, NULL); + + if (!cmd) + return -1; + + ret = qemuMonitorJSONCommand(mon, cmd, &reply); + + if (ret == 0 && virJSONValueObjectHasKey(reply, "error")) { + if (qemuMonitorJSONHasError(reply, "DeviceNotActive")) + qemuReportError(VIR_ERR_OPERATION_INVALID, + _("No active operation on device: %s"), device); + else if (qemuMonitorJSONHasError(reply, "DeviceInUse")) + qemuReportError(VIR_ERR_OPERATION_FAILED, + _("Device %s in use"), device); + else if (qemuMonitorJSONHasError(reply, "NotSupported")) + qemuReportError(VIR_ERR_OPERATION_INVALID, + _("Operation is not supported for device: %s"), device); + else + qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Unexpected error")); + ret = -1; + } + + if (ret == 0 && mode == BLOCK_JOB_INFO) + ret = qemuMonitorJSONGetBlockJobInfo(reply, device, info); + + if (ret == 0 && mode == BLOCK_JOB_PULL && bandwidth != 0) + ret = qemuMonitorJSONBlockJob(mon, device, bandwidth, NULL, + BLOCK_JOB_SPEED); + + virJSONValueFree(cmd); + virJSONValueFree(reply); + return ret; +} diff --git a/src/qemu/qemu_monitor_json.h b/src/qemu/qemu_monitor_json.h index 380e26a..1804390 100644 --- a/src/qemu/qemu_monitor_json.h +++ b/src/qemu/qemu_monitor_json.h @@ -220,5 +220,10 @@ int qemuMonitorJSONInjectNMI(qemuMonitorPtr mon); int qemuMonitorJSONScreendump(qemuMonitorPtr mon, const char *file); +int qemuMonitorJSONBlockJob(qemuMonitorPtr mon, + const char *device, + unsigned long bandwidth, + virDomainBlockJobInfoPtr info, + int mode); #endif /* QEMU_MONITOR_JSON_H */ diff --git a/src/qemu/qemu_monitor_text.c b/src/qemu/qemu_monitor_text.c index 0965a08..c7632e2 100644 --- a/src/qemu/qemu_monitor_text.c +++ b/src/qemu/qemu_monitor_text.c @@ -2785,3 +2785,165 @@ cleanup: VIR_FREE(cmd); return ret; } + +static int qemuMonitorTextParseBlockJobOne(const char *text, + const char *device, + virDomainBlockJobInfoPtr info, + const char **next) +{ + virDomainBlockJobInfo tmp; + char *p; + unsigned long long speed_bytes; + int mismatch = 0; + + if (next == NULL) + return -1; + *next = NULL; + + /* + * Each active stream will appear on its own line in the following format: + * Streaming device <device>: Completed <cur> of <end> bytes + */ + if ((text = STRSKIP(text, "Streaming device ")) == NULL) + return -EINVAL; + + if (!STREQLEN(text, device, strlen(device))) + mismatch = 1; + + if ((text = strstr(text, ": Completed ")) == NULL) + return -EINVAL; + text += 11; + + if (virStrToLong_ull (text, &p, 10, &tmp.cur)) + return -EINVAL; + text = p; + + if (!STRPREFIX(text, " of ")) + return -EINVAL; + text += 4; + + if (virStrToLong_ull (text, &p, 10, &tmp.end)) + return -EINVAL; + text = p; + + if (!STRPREFIX(text, " bytes, speed limit ")) + return -EINVAL; + text += 20; + + if (virStrToLong_ull (text, &p, 10, &speed_bytes)) + return -EINVAL; + text = p; + + if (!STRPREFIX(text, " bytes/s")) + return -EINVAL; + + if (mismatch) { + *next = STRSKIP(text, "\n"); + return -EAGAIN; + } + + if (info) { + info->cur = tmp.cur; + info->end = tmp.end; + info->bandwidth = speed_bytes / 1024ULL / 1024ULL; + info->type = VIR_DOMAIN_BLOCK_JOB_TYPE_PULL; + } + return 1; +} + +static int qemuMonitorTextParseBlockJob(const char *text, + const char *device, + virDomainBlockJobInfoPtr info) +{ + const char *next = NULL; + int ret = 0; + + /* Check error: Device not found */ + if (strstr(text, "Device '") && strstr(text, "' not found")) { + qemuReportError(VIR_ERR_OPERATION_INVALID, "%s", _("Device not found")); + return -1; + } + + /* Check error: Job already active on this device */ + if (strstr(text, "Device '") && strstr(text, "' is in use")) { + qemuReportError(VIR_ERR_OPERATION_FAILED, _("Device %s in use"), + device); + return -1; + } + + /* Check error: Stop non-existent job */ + if (strstr(text, "has not been activated")) { + qemuReportError(VIR_ERR_OPERATION_INVALID,\ + _("No active operation on device: %s"), device); + return -1; + } + + /* This is not an error condition, there are just no results to report. */ + if (strstr(text, "No active jobs")) { + return 0; + } + + /* Check for unsupported operation */ + if (strstr(text, "Operation is not supported")) { + qemuReportError(VIR_ERR_OPERATION_INVALID, + _("Operation is not supported for device: %s"), device); + return -1; + } + + /* No output indicates success for Pull, JobAbort, and JobSetSpeed */ + if (STREQ(text, "")) + return 0; + + /* Now try to parse BlockJobInfo */ + do { + ret = qemuMonitorTextParseBlockJobOne(text, device, info, &next); + text = next; + } while (text && ret == -EAGAIN); + + if (ret < 0) + return -1; + return ret; +} + +int qemuMonitorTextBlockJob(qemuMonitorPtr mon, + const char *device, + unsigned long bandwidth, + virDomainBlockJobInfoPtr info, + int mode) +{ + char *cmd = NULL; + char *reply = NULL; + int ret; + + if (mode == BLOCK_JOB_ABORT) + ret = virAsprintf(&cmd, "block_job_cancel %s", device); + else if (mode == BLOCK_JOB_INFO) + ret = virAsprintf(&cmd, "info block-jobs"); + else if (mode == BLOCK_JOB_SPEED) + ret = virAsprintf(&cmd, "block_job_set_speed %s %llu", device, + bandwidth * 1024ULL * 1024ULL); + else if (mode == BLOCK_JOB_PULL) + ret = virAsprintf(&cmd, "block_stream %s", device); + else + return -1; + + if (ret < 0) { + virReportOOMError(); + return -1; + } + + ret = 0; + if (qemuMonitorHMPCommand(mon, cmd, &reply) < 0) { + qemuReportError(VIR_ERR_INTERNAL_ERROR, + "%s", _("cannot run monitor command")); + ret = -1; + goto cleanup; + } + + ret = qemuMonitorTextParseBlockJob(reply, device, info); + +cleanup: + VIR_FREE(cmd); + VIR_FREE(reply); + return ret; +} diff --git a/src/qemu/qemu_monitor_text.h b/src/qemu/qemu_monitor_text.h index e53f693..9a1c7c0 100644 --- a/src/qemu/qemu_monitor_text.h +++ b/src/qemu/qemu_monitor_text.h @@ -213,4 +213,10 @@ int qemuMonitorTextInjectNMI(qemuMonitorPtr mon); int qemuMonitorTextScreendump(qemuMonitorPtr mon, const char *file); +int qemuMonitorTextBlockJob(qemuMonitorPtr mon, + const char *device, + unsigned long bandwidth, + virDomainBlockJobInfoPtr info, + int mode); + #endif /* QEMU_MONITOR_TEXT_H */ -- 1.7.3 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list