Add and use qemuMigrationPipeEvent piped streams' event handler. It sets the appropriate event flags for each of the stream and pumps the pipe using qemuMigrationPipeIO whenever there is a data at any end. Signed-off-by: Pavel Boldin <pboldin@xxxxxxxxxxxx> --- src/qemu/qemu_migration.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 0f35c13..43f71e9 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -4010,8 +4010,28 @@ struct _qemuMigrationPipe { qemuMigrationIOThreadPtr data; virStreamPtr local; virStreamPtr remote; + + int local_flags : 4; + int remote_flags : 4; + char buffer[TUNNEL_SEND_BUF_SIZE]; }; +static int +qemuMigrationPipeIO(virStreamPtr from, virStreamPtr to, char *buffer) +{ + int done, got, offset = 0; + got = virStreamRecv(from, buffer, TUNNEL_SEND_BUF_SIZE); + + while (offset < got) { + done = virStreamSend(to, buffer + offset, got - offset); + if (done < 0) + break; + offset += done; + } + + return got; +} + static void qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort) { @@ -4030,6 +4050,55 @@ qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort) virObjectUnref(pipe->remote); } +static void +qemuMigrationPipeEvent(virStreamPtr stream, int events, void *opaque) +{ + qemuMigrationPipePtr pipe = opaque; + + if (stream == pipe->remote) + pipe->remote_flags |= events; + if (stream == pipe->local) + pipe->local_flags |= events; + + VIR_DEBUG("remote = %p, remote_flags = %x, local = %p, local_flags = %x", + pipe->remote, pipe->remote_flags, + pipe->local, pipe->local_flags); + + if (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP)) { + char dummy; + virStreamRecv(stream, &dummy, 1); + abrt: + virCopyLastError(&pipe->data->err); + qemuMigrationPipeClose(pipe, true); + if (safewrite(pipe->data->wakeupSendFD, "c", 1) != 1) { + virReportSystemError(errno, "%s", + _("failed to stop migration tunnel")); + } + return; + } + + if ((pipe->remote_flags & VIR_STREAM_EVENT_READABLE) && + (pipe->local_flags & VIR_STREAM_EVENT_WRITABLE)) { + + if (qemuMigrationPipeIO(pipe->remote, pipe->local, pipe->buffer) == -1) + goto abrt; + + pipe->remote_flags &= ~VIR_STREAM_EVENT_READABLE; + pipe->local_flags &= ~VIR_STREAM_EVENT_WRITABLE; + } + + if ((pipe->local_flags & VIR_STREAM_EVENT_READABLE) && + (pipe->remote_flags & VIR_STREAM_EVENT_WRITABLE)) { + + if (qemuMigrationPipeIO(pipe->local, pipe->remote, pipe->buffer) == -1) + goto abrt; + + pipe->local_flags &= ~VIR_STREAM_EVENT_READABLE; + pipe->remote_flags &= ~VIR_STREAM_EVENT_WRITABLE; + } +} + + static qemuMigrationPipePtr qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote) { @@ -4041,6 +4110,20 @@ qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote) pipe->local = local; pipe->remote = remote; + if (virStreamEventAddCallback(local, + VIR_STREAM_EVENT_READABLE | + VIR_STREAM_EVENT_WRITABLE, + qemuMigrationPipeEvent, + pipe, NULL) < 0) + goto error; + + if (virStreamEventAddCallback(remote, + VIR_STREAM_EVENT_READABLE | + VIR_STREAM_EVENT_WRITABLE, + qemuMigrationPipeEvent, + pipe, NULL) < 0) + goto error; + return pipe; error: @@ -4230,7 +4313,7 @@ qemuMigrationIOFunc(void *arg) /* Let the source qemu know that the transfer cant continue anymore. * Don't copy the error for EPIPE as destination has the actual error. */ VIR_FORCE_CLOSE(data->qemuSock); - if (!virLastErrorIsSystemErrno(EPIPE)) + if (data->err.code == VIR_ERR_OK && !virLastErrorIsSystemErrno(EPIPE)) virCopyLastError(&data->err); virResetLastError(); VIR_FREE(buffer); -- 1.9.1 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list