On 11/18/2015 01:13 PM, Pavel Boldin wrote: > Tunnelled drive mirroring requires an active thread to accept incoming > connections from the QEMU and pumping them to the remote host through > the tunnel. > > For this, we need to split thread's QEMU socket initialization from > the start of the thread and introduce qemuMigrationSetQEMUSocket > to specify it later. > There is a whole lot more going on here that isn't explained.... e.g. 's', 'f', 'u'... The polling loop would now seem to "wait" for a data socket to be created/added. Also, even though it adds patches, perhaps it would have been easier to understand by renaming the fields first, then flip-flopping the order, then splitting the setting of qemuSock until the 'u' is seen. > Signed-off-by: Pavel Boldin <pboldin@xxxxxxxxxxxx> > --- > src/qemu/qemu_migration.c | 93 ++++++++++++++++++++++++++++++----------------- > 1 file changed, 59 insertions(+), 34 deletions(-) > > diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c > index d95cd66..61e78c5 100644 > --- a/src/qemu/qemu_migration.c > +++ b/src/qemu/qemu_migration.c > @@ -3991,14 +3991,15 @@ typedef struct _qemuMigrationIOThread qemuMigrationIOThread; > typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr; > struct _qemuMigrationIOThread { > virThread thread; > - virStreamPtr st; > - int sock; > + virStreamPtr qemuStream; > + int qemuSock; > virError err; > int wakeupRecvFD; > int wakeupSendFD; > }; > > -static void qemuMigrationIOFunc(void *arg) > +static void > +qemuMigrationIOFunc(void *arg) > { > qemuMigrationIOThreadPtr data = arg; > char *buffer = NULL; > @@ -4006,21 +4007,18 @@ static void qemuMigrationIOFunc(void *arg) > int timeout = -1; > virErrorPtr err = NULL; > > - VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d", > - data->st, data->sock); > + VIR_DEBUG("Running migration tunnel; qemuStream=%p", data->qemuStream); Since sock is "sent" to fds[0], it is thus no longer used. So why is it passed? 
> > if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) > goto abrt; > > - fds[0].fd = data->sock; > - fds[1].fd = data->wakeupRecvFD; > + fds[0].fd = data->wakeupRecvFD; > + fds[1].fd = -1; > + fds[0].events = fds[1].events = POLLIN; > > for (;;) { > int ret; > > - fds[0].events = fds[1].events = POLLIN; > - fds[0].revents = fds[1].revents = 0; > - Don't think we want to lose the revents = 0. According to how I read the man page, because the fds[1] = -1, it'd be set to 0 anyway on return. But once fds[1] is set - since we're polling two fd's here - how do you guarantee in the following code that you wouldn't be "rereading and resending" on fds[1] if fds[0] is what tripped the poll? > ret = poll(fds, ARRAY_CARDINALITY(fds), timeout); > > if (ret < 0) { > @@ -4040,30 +4038,36 @@ static void qemuMigrationIOFunc(void *arg) > break; > } > > - if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) { > - char stop = 0; > + if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) { > + char action = 0; > > - if (saferead(data->wakeupRecvFD, &stop, 1) != 1) { > + if (saferead(data->wakeupRecvFD, &action, 1) != 1) { > virReportSystemError(errno, "%s", > _("failed to read from wakeup fd")); > goto abrt; > } > > - VIR_DEBUG("Migration tunnel was asked to %s", > - stop ? 
"abort" : "finish"); > - if (stop) { > - goto abrt; > - } else { > - timeout = 0; > + VIR_DEBUG("Migration tunnel was asked to %c", action); > + switch (action) { > + case 's': > + goto abrt; > + break; > + case 'f': > + timeout = 0; > + break; > + case 'u': > + fds[1].fd = data->qemuSock; > + VIR_DEBUG("qemuSock set %d", data->qemuSock); > + break; > } > } > > - if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) { > + if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) { > int nbytes; > > - nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE); > + nbytes = saferead(data->qemuSock, buffer, TUNNEL_SEND_BUF_SIZE); > if (nbytes > 0) { > - if (virStreamSend(data->st, buffer, nbytes) < 0) > + if (virStreamSend(data->qemuStream, buffer, nbytes) < 0) > goto error; > } else if (nbytes < 0) { > virReportSystemError(errno, "%s", > @@ -4076,10 +4080,9 @@ static void qemuMigrationIOFunc(void *arg) > } > } > > - if (virStreamFinish(data->st) < 0) > - goto error; > + virStreamFinish(data->qemuStream); It would seem to me we shouldn't be losing this goto error on failure. John > > - VIR_FORCE_CLOSE(data->sock); > + VIR_FORCE_CLOSE(data->qemuSock); > VIR_FREE(buffer); > > return; > @@ -4090,7 +4093,7 @@ static void qemuMigrationIOFunc(void *arg) > virFreeError(err); > err = NULL; > } > - virStreamAbort(data->st); > + virStreamAbort(data->qemuStream); > if (err) { > virSetError(err); > virFreeError(err); > @@ -4099,7 +4102,7 @@ static void qemuMigrationIOFunc(void *arg) > error: > /* Let the source qemu know that the transfer cant continue anymore. > * Don't copy the error for EPIPE as destination has the actual error. 
*/ > - VIR_FORCE_CLOSE(data->sock); > + VIR_FORCE_CLOSE(data->qemuSock); > if (!virLastErrorIsSystemErrno(EPIPE)) > virCopyLastError(&data->err); > virResetLastError(); > @@ -4108,8 +4111,7 @@ static void qemuMigrationIOFunc(void *arg) > > > static qemuMigrationIOThreadPtr > -qemuMigrationStartTunnel(virStreamPtr st, > - int sock) > +qemuMigrationStartTunnel(virStreamPtr qemuStream) > { > qemuMigrationIOThreadPtr io = NULL; > int wakeupFD[2] = { -1, -1 }; > @@ -4123,8 +4125,8 @@ qemuMigrationStartTunnel(virStreamPtr st, > if (VIR_ALLOC(io) < 0) > goto error; > > - io->st = st; > - io->sock = sock; > + io->qemuStream = qemuStream; > + io->qemuSock = -1; > io->wakeupRecvFD = wakeupFD[0]; > io->wakeupSendFD = wakeupFD[1]; > > @@ -4149,10 +4151,10 @@ static int > qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error) > { > int rv = -1; > - char stop = error ? 1 : 0; > + char action = error ? 's' : 'f'; > > /* make sure the thread finishes its job and is joinable */ > - if (safewrite(io->wakeupSendFD, &stop, 1) != 1) { > + if (safewrite(io->wakeupSendFD, &action, 1) != 1) { > virReportSystemError(errno, "%s", > _("failed to wakeup migration tunnel")); > goto cleanup; > @@ -4180,6 +4182,26 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error) > } > > static int > +qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock) > +{ > + int rv = -1; > + char action = 'u'; > + > + io->qemuSock = sock; > + > + if (safewrite(io->wakeupSendFD, &action, 1) != 1) { > + virReportSystemError(errno, "%s", > + _("failed to update migration tunnel")); > + goto error; > + } > + > + rv = 0; > + > + error: > + return rv; > +} > + > +static int > qemuMigrationConnect(virQEMUDriverPtr driver, > virDomainObjPtr vm, > qemuMigrationSpecPtr spec) > @@ -4422,7 +4444,10 @@ qemuMigrationRun(virQEMUDriverPtr driver, > } > > if (spec->fwdType != MIGRATION_FWD_DIRECT) { > - if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, fd))) > + if (!(iothread = 
qemuMigrationStartTunnel(spec->fwd.stream))) > + goto cancel; > + > + if (qemuMigrationSetQEMUSocket(iothread, fd) < 0) > goto cancel; > /* If we've created a tunnel, then the 'fd' will be closed in the > * qemuMigrationIOFunc as data->sock. > -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list