Tunnelled drive mirroring requires an active thread to accept incoming connections from the QEMU and pumping them to the remote host through the tunnel. For this, we need to split thread's QEMU socket initialization from the start of the thread and introduce qemuMigrationSetQEMUSocket to specify it later. Signed-off-by: Pavel Boldin <pboldin@xxxxxxxxxxxx> --- src/qemu/qemu_migration.c | 93 ++++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index d95cd66..61e78c5 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3991,14 +3991,15 @@ typedef struct _qemuMigrationIOThread qemuMigrationIOThread; typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr; struct _qemuMigrationIOThread { virThread thread; - virStreamPtr st; - int sock; + virStreamPtr qemuStream; + int qemuSock; virError err; int wakeupRecvFD; int wakeupSendFD; }; -static void qemuMigrationIOFunc(void *arg) +static void +qemuMigrationIOFunc(void *arg) { qemuMigrationIOThreadPtr data = arg; char *buffer = NULL; @@ -4006,21 +4007,18 @@ static void qemuMigrationIOFunc(void *arg) int timeout = -1; virErrorPtr err = NULL; - VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d", - data->st, data->sock); + VIR_DEBUG("Running migration tunnel; qemuStream=%p", data->qemuStream); if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) goto abrt; - fds[0].fd = data->sock; - fds[1].fd = data->wakeupRecvFD; + fds[0].fd = data->wakeupRecvFD; + fds[1].fd = -1; + fds[0].events = fds[1].events = POLLIN; for (;;) { int ret; - fds[0].events = fds[1].events = POLLIN; - fds[0].revents = fds[1].revents = 0; - ret = poll(fds, ARRAY_CARDINALITY(fds), timeout); if (ret < 0) { @@ -4040,30 +4038,36 @@ static void qemuMigrationIOFunc(void *arg) break; } - if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) { - char stop = 0; + if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) { + char action = 0; - if (saferead(data->wakeupRecvFD, &stop, 1) != 1) { + if (saferead(data->wakeupRecvFD, &action, 1) != 1) { virReportSystemError(errno, "%s", _("failed to read from wakeup fd")); goto abrt; } - VIR_DEBUG("Migration tunnel was asked to %s", - stop ? "abort" : "finish"); - if (stop) { - goto abrt; - } else { - timeout = 0; + VIR_DEBUG("Migration tunnel was asked to %c", action); + switch (action) { + case 's': + goto abrt; + break; + case 'f': + timeout = 0; + break; + case 'u': + fds[1].fd = data->qemuSock; + VIR_DEBUG("qemuSock set %d", data->qemuSock); + break; } } - if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) { + if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) { int nbytes; - nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE); + nbytes = saferead(data->qemuSock, buffer, TUNNEL_SEND_BUF_SIZE); if (nbytes > 0) { - if (virStreamSend(data->st, buffer, nbytes) < 0) + if (virStreamSend(data->qemuStream, buffer, nbytes) < 0) goto error; } else if (nbytes < 0) { virReportSystemError(errno, "%s", @@ -4076,10 +4080,9 @@ static void qemuMigrationIOFunc(void *arg) } } - if (virStreamFinish(data->st) < 0) - goto error; + virStreamFinish(data->qemuStream); - VIR_FORCE_CLOSE(data->sock); + VIR_FORCE_CLOSE(data->qemuSock); VIR_FREE(buffer); return; @@ -4090,7 +4093,7 @@ static void qemuMigrationIOFunc(void *arg) virFreeError(err); err = NULL; } - virStreamAbort(data->st); + virStreamAbort(data->qemuStream); if (err) { virSetError(err); virFreeError(err); @@ -4099,7 +4102,7 @@ static void qemuMigrationIOFunc(void *arg) error: /* Let the source qemu know that the transfer cant continue anymore. * Don't copy the error for EPIPE as destination has the actual error. */ - VIR_FORCE_CLOSE(data->sock); + VIR_FORCE_CLOSE(data->qemuSock); if (!virLastErrorIsSystemErrno(EPIPE)) virCopyLastError(&data->err); virResetLastError(); @@ -4108,8 +4111,7 @@ static void qemuMigrationIOFunc(void *arg) static qemuMigrationIOThreadPtr -qemuMigrationStartTunnel(virStreamPtr st, - int sock) +qemuMigrationStartTunnel(virStreamPtr qemuStream) { qemuMigrationIOThreadPtr io = NULL; int wakeupFD[2] = { -1, -1 }; @@ -4123,8 +4125,8 @@ qemuMigrationStartTunnel(virStreamPtr st, if (VIR_ALLOC(io) < 0) goto error; - io->st = st; - io->sock = sock; + io->qemuStream = qemuStream; + io->qemuSock = -1; io->wakeupRecvFD = wakeupFD[0]; io->wakeupSendFD = wakeupFD[1]; @@ -4149,10 +4151,10 @@ static int qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error) { int rv = -1; - char stop = error ? 1 : 0; + char action = error ? 's' : 'f'; /* make sure the thread finishes its job and is joinable */ - if (safewrite(io->wakeupSendFD, &stop, 1) != 1) { + if (safewrite(io->wakeupSendFD, &action, 1) != 1) { virReportSystemError(errno, "%s", _("failed to wakeup migration tunnel")); goto cleanup; @@ -4180,6 +4182,26 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error) } static int +qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock) +{ + int rv = -1; + char action = 'u'; + + io->qemuSock = sock; + + if (safewrite(io->wakeupSendFD, &action, 1) != 1) { + virReportSystemError(errno, "%s", + _("failed to update migration tunnel")); + goto error; + } + + rv = 0; + + error: + return rv; +} + +static int qemuMigrationConnect(virQEMUDriverPtr driver, virDomainObjPtr vm, qemuMigrationSpecPtr spec) @@ -4422,7 +4444,10 @@ qemuMigrationRun(virQEMUDriverPtr driver, } if (spec->fwdType != MIGRATION_FWD_DIRECT) { - if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, fd))) + if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream))) + goto cancel; + + if (qemuMigrationSetQEMUSocket(iothread, fd) < 0) goto cancel; /* If we've created a tunnel, then the 'fd' will be closed in the * qemuMigrationIOFunc as data->sock. -- 1.9.1 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list