Re: [PATCH 11/21] qemu: migration: src: qemuSock for running thread

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




On 11/18/2015 01:13 PM, Pavel Boldin wrote:
> Tunnelled drive mirroring requires an active thread to accept incoming
> connections from the QEMU and pumping them to the remote host through
> the tunnel.
> 
> For this, we need to split thread's QEMU socket initialization from
> the start of the thread and introduce qemuMigrationSetQEMUSocket
> to specify it later.
> 

This is a whole lot more going that isn't explained.... e.g. 's', 'f',
'u'...  The polling loop would now seem to "wait" for a data socket to
be created/added.

Also, even though it adds patches, perhaps would have been easier to
understand by renaming the fields first, then flip-flopping the order,
then splitting the setting of qemuSock until the 'u' is seen.


> Signed-off-by: Pavel Boldin <pboldin@xxxxxxxxxxxx>
> ---
>  src/qemu/qemu_migration.c | 93 ++++++++++++++++++++++++++++++-----------------
>  1 file changed, 59 insertions(+), 34 deletions(-)
> 
> diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
> index d95cd66..61e78c5 100644
> --- a/src/qemu/qemu_migration.c
> +++ b/src/qemu/qemu_migration.c
> @@ -3991,14 +3991,15 @@ typedef struct _qemuMigrationIOThread qemuMigrationIOThread;
>  typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr;
>  struct _qemuMigrationIOThread {
>      virThread thread;
> -    virStreamPtr st;
> -    int sock;
> +    virStreamPtr qemuStream;
> +    int qemuSock;
>      virError err;
>      int wakeupRecvFD;
>      int wakeupSendFD;
>  };
>  
> -static void qemuMigrationIOFunc(void *arg)
> +static void
> +qemuMigrationIOFunc(void *arg)
>  {
>      qemuMigrationIOThreadPtr data = arg;
>      char *buffer = NULL;
> @@ -4006,21 +4007,18 @@ static void qemuMigrationIOFunc(void *arg)
>      int timeout = -1;
>      virErrorPtr err = NULL;
>  
> -    VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d",
> -              data->st, data->sock);
> +    VIR_DEBUG("Running migration tunnel; qemuStream=%p", data->qemuStream);

Since sock is "sent" to fds[0], thus isn't no longer used. So why is it
passed?

>  
>      if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0)
>          goto abrt;
>  
> -    fds[0].fd = data->sock;
> -    fds[1].fd = data->wakeupRecvFD;
> +    fds[0].fd = data->wakeupRecvFD;
> +    fds[1].fd = -1;
> +    fds[0].events = fds[1].events = POLLIN;
>  
>      for (;;) {
>          int ret;
>  
> -        fds[0].events = fds[1].events = POLLIN;
> -        fds[0].revents = fds[1].revents = 0;
> -

Don't think we want to lose the revents = 0.

According to how I read the man page, because the fds[1] = -1, it'd be
set to 0 anyway on return. But once fds[1] is set - since we're polling
two fd's here - how do you guarantee in the following code that you
wouldn't be "rereading and resending" on fds[1] if fds[0] is what
tripped the poll?


>          ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
>  
>          if (ret < 0) {
> @@ -4040,30 +4038,36 @@ static void qemuMigrationIOFunc(void *arg)
>              break;
>          }
>  
> -        if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
> -            char stop = 0;
> +        if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
> +            char action = 0;
>  
> -            if (saferead(data->wakeupRecvFD, &stop, 1) != 1) {
> +            if (saferead(data->wakeupRecvFD, &action, 1) != 1) {
>                  virReportSystemError(errno, "%s",
>                                       _("failed to read from wakeup fd"));
>                  goto abrt;
>              }
>  
> -            VIR_DEBUG("Migration tunnel was asked to %s",
> -                      stop ? "abort" : "finish");
> -            if (stop) {
> -                goto abrt;
> -            } else {
> -                timeout = 0;
> +            VIR_DEBUG("Migration tunnel was asked to %c", action);
> +            switch (action) {
> +                case 's':
> +                    goto abrt;
> +                    break;
> +                case 'f':
> +                    timeout = 0;
> +                    break;
> +                case 'u':
> +                    fds[1].fd = data->qemuSock;
> +                    VIR_DEBUG("qemuSock set %d", data->qemuSock);
> +                    break;
>              }
>          }
>  
> -        if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
> +        if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
>              int nbytes;
>  
> -            nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
> +            nbytes = saferead(data->qemuSock, buffer, TUNNEL_SEND_BUF_SIZE);
>              if (nbytes > 0) {
> -                if (virStreamSend(data->st, buffer, nbytes) < 0)
> +                if (virStreamSend(data->qemuStream, buffer, nbytes) < 0)
>                      goto error;
>              } else if (nbytes < 0) {
>                  virReportSystemError(errno, "%s",
> @@ -4076,10 +4080,9 @@ static void qemuMigrationIOFunc(void *arg)
>          }
>      }
>  
> -    if (virStreamFinish(data->st) < 0)
> -        goto error;
> +    virStreamFinish(data->qemuStream);

It would seem to me we shouldn't be losing this goto error on failure.

John
>  
> -    VIR_FORCE_CLOSE(data->sock);
> +    VIR_FORCE_CLOSE(data->qemuSock);
>      VIR_FREE(buffer);
>  
>      return;
> @@ -4090,7 +4093,7 @@ static void qemuMigrationIOFunc(void *arg)
>          virFreeError(err);
>          err = NULL;
>      }
> -    virStreamAbort(data->st);
> +    virStreamAbort(data->qemuStream);
>      if (err) {
>          virSetError(err);
>          virFreeError(err);
> @@ -4099,7 +4102,7 @@ static void qemuMigrationIOFunc(void *arg)
>   error:
>      /* Let the source qemu know that the transfer cant continue anymore.
>       * Don't copy the error for EPIPE as destination has the actual error. */
> -    VIR_FORCE_CLOSE(data->sock);
> +    VIR_FORCE_CLOSE(data->qemuSock);
>      if (!virLastErrorIsSystemErrno(EPIPE))
>          virCopyLastError(&data->err);
>      virResetLastError();
> @@ -4108,8 +4111,7 @@ static void qemuMigrationIOFunc(void *arg)
>  
>  
>  static qemuMigrationIOThreadPtr
> -qemuMigrationStartTunnel(virStreamPtr st,
> -                         int sock)
> +qemuMigrationStartTunnel(virStreamPtr qemuStream)
>  {
>      qemuMigrationIOThreadPtr io = NULL;
>      int wakeupFD[2] = { -1, -1 };
> @@ -4123,8 +4125,8 @@ qemuMigrationStartTunnel(virStreamPtr st,
>      if (VIR_ALLOC(io) < 0)
>          goto error;
>  
> -    io->st = st;
> -    io->sock = sock;
> +    io->qemuStream = qemuStream;
> +    io->qemuSock = -1;
>      io->wakeupRecvFD = wakeupFD[0];
>      io->wakeupSendFD = wakeupFD[1];
>  
> @@ -4149,10 +4151,10 @@ static int
>  qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
>  {
>      int rv = -1;
> -    char stop = error ? 1 : 0;
> +    char action = error ? 's' : 'f';
>  
>      /* make sure the thread finishes its job and is joinable */
> -    if (safewrite(io->wakeupSendFD, &stop, 1) != 1) {
> +    if (safewrite(io->wakeupSendFD, &action, 1) != 1) {
>          virReportSystemError(errno, "%s",
>                               _("failed to wakeup migration tunnel"));
>          goto cleanup;
> @@ -4180,6 +4182,26 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
>  }
>  
>  static int
> +qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock)
> +{
> +    int rv = -1;
> +    char action = 'u';
> +
> +    io->qemuSock = sock;
> +
> +    if (safewrite(io->wakeupSendFD, &action, 1) != 1) {
> +        virReportSystemError(errno, "%s",
> +                             _("failed to update migration tunnel"));
> +        goto error;
> +    }
> +
> +    rv = 0;
> +
> + error:
> +    return rv;
> +}
> +
> +static int
>  qemuMigrationConnect(virQEMUDriverPtr driver,
>                       virDomainObjPtr vm,
>                       qemuMigrationSpecPtr spec)
> @@ -4422,7 +4444,10 @@ qemuMigrationRun(virQEMUDriverPtr driver,
>      }
>  
>      if (spec->fwdType != MIGRATION_FWD_DIRECT) {
> -        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, fd)))
> +        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream)))
> +            goto cancel;
> +
> +        if (qemuMigrationSetQEMUSocket(iothread, fd) < 0)
>              goto cancel;
>          /* If we've created a tunnel, then the 'fd' will be closed in the
>           * qemuMigrationIOFunc as data->sock.
> 

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]