By running the doTunnelSendAll code in a separate thread, the main thread can do qemuMigrationWaitForCompletion as with normal migration. This in turn ensures that job signals work correctly and that progress monitoring can be done * src/qemu/qemu_migration.c: Runn tunnelled migration in separate thread --- src/qemu/qemu_migration.c | 95 ++++++++++++++++++++++++++++++++++++++------- 1 files changed, 81 insertions(+), 14 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index b8e595e..5413186 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -1289,44 +1289,101 @@ cleanup: #define TUNNEL_SEND_BUF_SIZE 65536 -static int doTunnelSendAll(virStreamPtr st, - int sock) +typedef struct _qemuMigrationIOThread qemuMigrationIOThread; +typedef qemuMigrationIOThread * qemuMigrationIOThreadPtr; +struct _qemuMigrationIOThread { + virThread thread; + virStreamPtr st; + int sock; + virError err; +}; + +static void qemuMigrationIOFunc(void *arg) { + qemuMigrationIOThreadPtr data = arg; char *buffer; int nbytes = TUNNEL_SEND_BUF_SIZE; if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) { virReportOOMError(); - virStreamAbort(st); - return -1; + virStreamAbort(data->st); + goto error; } for (;;) { - nbytes = saferead(sock, buffer, TUNNEL_SEND_BUF_SIZE); + nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE); if (nbytes < 0) { virReportSystemError(errno, "%s", _("tunnelled migration failed to read from qemu")); - virStreamAbort(st); + virStreamAbort(data->st); VIR_FREE(buffer); - return -1; + goto error; } else if (nbytes == 0) /* EOF; get out of here */ break; - if (virStreamSend(st, buffer, nbytes) < 0) { + if (virStreamSend(data->st, buffer, nbytes) < 0) { VIR_FREE(buffer); - return -1; + goto error; } } VIR_FREE(buffer); - if (virStreamFinish(st) < 0) - /* virStreamFinish set the error for us */ - return -1; + if (virStreamFinish(data->st) < 0) + goto error; - return 0; + return; + +error: + virCopyLastError(&data->err); + virResetLastError(); +} + + +static qemuMigrationIOThreadPtr +qemuMigrationStartTunnel(virStreamPtr st, + int sock) +{ + qemuMigrationIOThreadPtr io; + + if (VIR_ALLOC(io) < 0) { + virReportOOMError(); + return NULL; + } + + io->st = st; + io->sock = sock; + + if (virThreadCreate(&io->thread, true, + qemuMigrationIOFunc, + io) < 0) { + VIR_FREE(io); + return NULL; + } + + return io; +} + +static int +qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io) +{ + int rv = -1; + virThreadJoin(&io->thread); + + /* Forward error from the IO thread, to this thread */ + if (io->err.code != VIR_ERR_OK) { + virSetError(&io->err); + virResetError(&io->err); + goto cleanup; + } + + rv = 0; + +cleanup: + VIR_FREE(io); + return rv; } @@ -1351,6 +1408,7 @@ static int doTunnelMigrate(struct qemud_driver *driver, unsigned int background_flags = QEMU_MONITOR_MIGRATE_BACKGROUND; int ret = -1; qemuMigrationCookiePtr mig = NULL; + qemuMigrationIOThreadPtr iothread = NULL; if (!qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_UNIX) && !qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_EXEC)) { @@ -1486,7 +1544,16 @@ static int doTunnelMigrate(struct qemud_driver *driver, goto cancel; } - ret = doTunnelSendAll(st, client_sock); + if (!(iothread = qemuMigrationStartTunnel(st, client_sock))) + goto cancel; + + ret = qemuMigrationWaitForCompletion(driver, vm); + + /* Close now to ensure the IO thread quits & is joinable in next method */ + VIR_FORCE_CLOSE(client_sock); + + if (qemuMigrationStopTunnel(iothread) < 0) + ret = -1; if (ret == 0 && qemuMigrationBakeCookie(mig, driver, vm, cookieout, cookieoutlen, 0) < 0) -- 1.7.4.4 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list