Add the qemuNBDTunnelAcceptAndPipe function, which is called to handle POLLIN on the UNIX socket connection from QEMU's NBD server. The function creates a pipe that pairs a remote stream connected to the QEMU NBD UNIX socket on the destination with a local stream connected to the incoming connection from the source QEMU's NBD. Signed-off-by: Pavel Boldin <pboldin@xxxxxxxxxxxx> --- src/qemu/qemu_migration.c | 134 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 2 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 0682fd8..0f35c13 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3987,6 +3987,9 @@ struct _qemuMigrationSpec { #define TUNNEL_SEND_BUF_SIZE 65536 +typedef struct _qemuMigrationPipe qemuMigrationPipe; +typedef qemuMigrationPipe *qemuMigrationPipePtr; + typedef struct _qemuMigrationIOThread qemuMigrationIOThread; typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr; struct _qemuMigrationIOThread { @@ -3997,9 +4000,124 @@ struct _qemuMigrationIOThread { virError err; int wakeupRecvFD; int wakeupSendFD; + qemuMigrationPipePtr pipes; + virConnectPtr dconn; + unsigned char uuid[VIR_UUID_BUFLEN]; +}; + +struct _qemuMigrationPipe { + qemuMigrationPipePtr next; + qemuMigrationIOThreadPtr data; + virStreamPtr local; + virStreamPtr remote; }; static void +qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort) +{ + virStreamEventUpdateCallback(pipe->local, 0); + virStreamEventUpdateCallback(pipe->remote, 0); + + if (abort) { + virStreamAbort(pipe->local); + virStreamAbort(pipe->remote); + } else { + virStreamFinish(pipe->local); + virStreamFinish(pipe->remote); + } + + virObjectUnref(pipe->local); + virObjectUnref(pipe->remote); +} + +static qemuMigrationPipePtr +qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote) +{ + qemuMigrationPipePtr pipe = NULL; + + if (VIR_ALLOC(pipe) < 0) + goto error; + + pipe->local = local; + pipe->remote = remote; + + return pipe; + + 
error: + virStreamEventRemoveCallback(local); + virStreamEventRemoveCallback(remote); + VIR_FREE(pipe); + return NULL; +} + + +static int +qemuNBDTunnelAcceptAndPipe(qemuMigrationIOThreadPtr data) +{ + int fd, ret; + virStreamPtr local = NULL, remote = NULL; + qemuMigrationPipePtr pipe = NULL; + + while ((fd = accept(data->unixSock, NULL, NULL)) < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + virReportSystemError( + errno, "%s", _("failed to accept connection from qemu")); + goto abrt; + } + + if (!(local = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK))) + goto abrt; + + if (!(remote = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK))) + goto abrt; + + ret = virDomainMigrateOpenTunnel(data->dconn, + remote, + data->uuid, + VIR_MIGRATE_TUNNEL_NBD); + + if (ret < 0) + goto abrt; + + if (virFDStreamOpen(local, fd) < 0) + goto abrt; + + if (!(pipe = qemuMigrationPipeCreate(local, remote))) + goto abrt; + + pipe->data = data; + pipe->next = data->pipes; + data->pipes = pipe; + + return 0; + + abrt: + VIR_FORCE_CLOSE(fd); + virStreamAbort(local); + virStreamAbort(remote); + + virObjectUnref(local); + virObjectUnref(remote); + return -1; +} + +static void +qemuMigrationPipesStop(qemuMigrationPipePtr pipe, bool abort) +{ + qemuMigrationPipePtr tmp; + + while (pipe) { + tmp = pipe->next; + + qemuMigrationPipeClose(pipe, abort); + VIR_FREE(pipe); + + pipe = tmp; + } +} + +static void qemuMigrationIOFunc(void *arg) { qemuMigrationIOThreadPtr data = arg; @@ -4081,9 +4199,14 @@ qemuMigrationIOFunc(void *arg) break; } } + + if (fds[2].revents & (POLLIN | POLLERR | POLLHUP) && + qemuNBDTunnelAcceptAndPipe(data) < 0) + goto abrt; } virStreamFinish(data->qemuStream); + qemuMigrationPipesStop(data->pipes, false); VIR_FORCE_CLOSE(data->qemuSock); VIR_FREE(buffer); @@ -4097,6 +4220,7 @@ qemuMigrationIOFunc(void *arg) err = NULL; } virStreamAbort(data->qemuStream); + qemuMigrationPipesStop(data->pipes, true); if (err) { virSetError(err); virFreeError(err); @@ -4114,7 
+4238,9 @@ qemuMigrationIOFunc(void *arg) static qemuMigrationIOThreadPtr -qemuMigrationStartTunnel(virStreamPtr qemuStream) +qemuMigrationStartTunnel(virStreamPtr qemuStream, + virConnectPtr dconn, + unsigned char uuid[VIR_UUID_BUFLEN]) { qemuMigrationIOThreadPtr io = NULL; int wakeupFD[2] = { -1, -1 }; @@ -4132,6 +4258,8 @@ qemuMigrationStartTunnel(virStreamPtr qemuStream) io->qemuSock = io->unixSock = -1; io->wakeupRecvFD = wakeupFD[0]; io->wakeupSendFD = wakeupFD[1]; + io->dconn = dconn; + memcpy(io->uuid, uuid, VIR_UUID_BUFLEN); if (virThreadCreate(&io->thread, true, qemuMigrationIOFunc, @@ -4337,7 +4465,9 @@ qemuMigrationRun(virQEMUDriverPtr driver, VIR_WARN("unable to provide data for graphics client relocation"); if (spec->fwdType != MIGRATION_FWD_DIRECT) { - if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream))) + if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, + dconn, + mig->uuid))) goto cancel; if (nmigrate_disks && -- 1.9.1 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list