Re: [PATCH v2 06/38] virfdstream: Use messages instead of pipe

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




On 04/20/2017 06:01 AM, Michal Privoznik wrote:
> One big downside of using the pipe to transfer the data is that
> we can really transfer just bare data. No metadata can be carried
> through unless some formatted messages are introduced. That would
> be quite painful to achieve so let's use a message queue. It's
> fairly easy to exchange info between threads now that iohelper is
> no longer used.
> 
> The reason why we cannot use the FD for plain files directly is
> that despite us setting noblock flag on the FD, any
> read()/write() blocks regardless (which is a show stopper since
> those parts of the code are run from the event loop) and poll()
> reports such FD as always readable/writable - even though the
> subsequent operation might block.
> 
> The pipe is still not gone though. It is used to signal to even
> loop that an event occurred (e.g. data are available for reading
> in the queue, or vice versa).
> 
> Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
> ---
>  src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
>  1 file changed, 350 insertions(+), 52 deletions(-)
> 

Very strange - compilation breaks on this patch:

util/virfdstream.c: In function 'virFDStreamThread':
util/virfdstream.c:551:15: error: 'got' may be used uninitialized in
this function [-Werror=maybe-uninitialized]
         total += got;
               ^~
The reason this happens is that virFDStreamThreadDoWrite doesn't
initialize got, so its return value is indeterminate if msg->type is not
VIR_FDSTREAM_MSG_TYPE_DATA.

Wish the complaint was in the right function...

Before I forget... starting here - perhaps a bit "nervous" about the
claim from your ping that "Patches 01-07 are fairly trivial and are more
of a bug fixes than feature implementation"...  I'd almost say 1-4 are
trivial, 5 is a little less so trivial since you're "removing" the
iohelper command and replacing it "inline", but this has moved into
altering more of the algorithm.  So close to a release...

I see Daniel has responded to this one too ... still I'll point out a
few more things...

> diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
> index 7a8d65d..0350494 100644
> --- a/src/util/virfdstream.c
> +++ b/src/util/virfdstream.c
> @@ -49,6 +49,27 @@
>  
>  VIR_LOG_INIT("fdstream");
>  
> +typedef enum {
> +    VIR_FDSTREAM_MSG_TYPE_DATA,
> +} virFDStreamMsgType;
> +
> +typedef struct _virFDStreamMsg virFDStreamMsg;
> +typedef virFDStreamMsg *virFDStreamMsgPtr;
> +struct _virFDStreamMsg {
> +    virFDStreamMsgPtr next;
> +
> +    virFDStreamMsgType type;
> +
> +    union {
> +        struct {
> +            char *buf;
> +            size_t len;
> +            size_t offset;
> +        } data;
> +    } stream;
> +};
> +
> +
>  /* Tunnelled migration stream support */
>  typedef struct virFDStreamData virFDStreamData;
>  typedef virFDStreamData *virFDStreamDataPtr;
> @@ -80,18 +101,25 @@ struct virFDStreamData {
>  
>      /* Thread data */
>      virThreadPtr thread;
> +    virCond threadCond;
>      int threadErr;
>      bool threadQuit;
> +    bool threadAbort;
> +    bool threadDoRead;
> +    virFDStreamMsgPtr msg;
>  };
>  
>  static virClassPtr virFDStreamDataClass;
>  
> +static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
> +
>  static void
>  virFDStreamDataDispose(void *obj)
>  {
>      virFDStreamDataPtr fdst = obj;
>  
>      VIR_DEBUG("obj=%p", fdst);
> +    virFDStreamMsgQueueFree(&fdst->msg);
>  }
>  
>  static int virFDStreamDataOnceInit(void)
> @@ -108,6 +136,89 @@ static int virFDStreamDataOnceInit(void)
>  VIR_ONCE_GLOBAL_INIT(virFDStreamData)
>  
>  
> +static int
> +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
> +                        virFDStreamMsgPtr msg,
> +                        int fd,
> +                        const char *fdname)
> +{
> +    virFDStreamMsgPtr *tmp = &fdst->msg;
> +    char c = '1';
> +
> +    while (*tmp)
> +        tmp = &(*tmp)->next;
> +
> +    *tmp = msg;
> +    virCondSignal(&fdst->threadCond);
> +
> +    if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
> +        virReportSystemError(errno,
> +                             _("Unable to write to %s"),
> +                             fdname);
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +
> +static virFDStreamMsgPtr
> +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
> +                       int fd,
> +                       const char *fdname)
> +{
> +    virFDStreamMsgPtr tmp = fdst->msg;
> +    char c;
> +
> +    if (tmp) {
> +        fdst->msg = tmp->next;
> +        tmp->next = NULL;
> +    }
> +
> +    virCondSignal(&fdst->threadCond);
> +
> +    if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
> +        virReportSystemError(errno,
> +                             _("Unable to read from %s"),
> +                             fdname);
> +        return NULL;
> +    }
> +
> +    return tmp;
> +}
> +
> +
> +static void
> +virFDStreamMsgFree(virFDStreamMsgPtr msg)
> +{
> +    if (!msg)
> +        return;
> +
> +    switch (msg->type) {
> +    case VIR_FDSTREAM_MSG_TYPE_DATA:
> +        VIR_FREE(msg->stream.data.buf);
> +        break;
> +    }
> +
> +    VIR_FREE(msg);
> +}
> +
> +
> +static void
> +virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
> +{
> +    virFDStreamMsgPtr tmp = *queue;
> +
> +    while (tmp) {
> +        virFDStreamMsgPtr next = tmp->next;
> +        virFDStreamMsgFree(tmp);
> +        tmp = next;
> +    }
> +
> +    *queue = NULL;
> +}
> +
> +
>  static int virFDStreamRemoveCallback(virStreamPtr stream)
>  {
>      virFDStreamDataPtr fdst = stream->privateData;
> @@ -273,6 +384,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
>  struct _virFDStreamThreadData {
>      virStreamPtr st;
>      size_t length;
> +    bool doRead;
>      int fdin;
>      char *fdinname;
>      int fdout;
> @@ -293,6 +405,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
>  }
>  
>  
> +static ssize_t
> +virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
> +                        const int fdin,
> +                        const int fdout,
> +                        const char *fdinname,
> +                        const char *fdoutname,
> +                        size_t buflen)
> +{
> +    virFDStreamMsgPtr msg = NULL;
> +    char *buf = NULL;
> +    ssize_t got;

got = -1;

Not really required yet, but if additional code gets added...

> +
> +    if (VIR_ALLOC(msg) < 0)
> +        goto error;
> +
> +    if (VIR_ALLOC_N(buf, buflen) < 0)
> +        goto error;
> +
> +    if ((got = saferead(fdin, buf, buflen)) < 0) {
> +        virReportSystemError(errno,
> +                             _("Unable to read %s"),
> +                             fdinname);
> +        goto error;
> +    }
> +
> +    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
> +    msg->stream.data.buf = buf;

Could also go with the VIR_STEAL_PTR(msg->stream.data.buf, buf);
avoiding the buf = NULL below

> +    msg->stream.data.len = got;
> +    buf = NULL;
> +
> +    virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
> +    msg = NULL;

*QueuePush is not a void. What happens if safewrite fails?

> +
> +    return got;
> +
> + error:
> +    VIR_FREE(buf);
> +    virFDStreamMsgFree(msg);
> +    return -1;
> +}
> +
> +
> +static ssize_t
> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
> +                         const int fdin,
> +                         const int fdout,
> +                         const char *fdinname,
> +                         const char *fdoutname)
> +{
> +    ssize_t got;

got = -1

is required here since got is indeterminate if msg->type != TYPE_DATA


> +    virFDStreamMsgPtr msg = fdst->msg;
> +    bool pop = false;
> +
> +    switch (msg->type) {
> +    case VIR_FDSTREAM_MSG_TYPE_DATA:
> +        got = safewrite(fdout,
> +                        msg->stream.data.buf + msg->stream.data.offset,
> +                        msg->stream.data.len - msg->stream.data.offset);
> +        if (got < 0) {
> +            virReportSystemError(errno,
> +                                 _("Unable to write %s"),
> +                                 fdoutname);
> +            return -1;
> +        }
> +
> +        msg->stream.data.offset += got;
> +
> +        pop = msg->stream.data.offset == msg->stream.data.len;
> +        break;
> +    }
> +
> +    if (pop) {
> +        virFDStreamMsgQueuePop(fdst, fdin, fdinname);

*QueuePop is not a void...  What if saferead fails?

> +        virFDStreamMsgFree(msg);
> +    }
> +
> +    return got;
> +}
> +
> +
>  static void
>  virFDStreamThread(void *opaque)
>  {
> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>      int fdout = data->fdout;
>      char *fdoutname = data->fdoutname;
>      virFDStreamDataPtr fdst = st->privateData;
> -    char *buf = NULL;
> +    bool doRead = fdst->threadDoRead;
>      size_t buflen = 256 * 1024;
>      size_t total = 0;
>  
>      virObjectRef(fdst);
> -
> -    if (VIR_ALLOC_N(buf, buflen) < 0)
> -        goto error;
> +    virObjectLock(fdst);
>  
>      while (1) {
>          ssize_t got;
> @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
>          if (buflen == 0)
>              break; /* End of requested data from client */
>  
> -        if ((got = saferead(fdin, buf, buflen)) < 0) {
> -            virReportSystemError(errno,
> -                                 _("Unable to read %s"),
> -                                 fdinname);
> +        while (doRead == (fdst->msg != NULL) &&
> +               !fdst->threadQuit) {
> +            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
> +                virReportSystemError(errno, "%s",
> +                                     _("failed to wait on condition"));
> +                goto error;
> +            }
> +        }
> +
> +        if (fdst->threadQuit) {
> +            /* If stream abort was requested, quit early. */
> +            if (fdst->threadAbort)
> +                goto cleanup;
> +
> +            /* Otherwise flush buffers and quit gracefully. */
> +            if (doRead == (fdst->msg != NULL))
> +                break;
> +        }
> +
> +        if (doRead)
> +            got = virFDStreamThreadDoRead(fdst,
> +                                          fdin, fdout,
> +                                          fdinname, fdoutname,
> +                                          buflen);
> +        else
> +            got = virFDStreamThreadDoWrite(fdst,
> +                                           fdin, fdout,
> +                                           fdinname, fdoutname);
> +
> +        if (got < 0)
>              goto error;
> -        }
>  
>          if (got == 0)
>              break;
>  
>          total += got;
> -
> -        if (safewrite(fdout, buf, got) < 0) {
> -            virReportSystemError(errno,
> -                                 _("Unable to write %s"),
> -                                 fdoutname);
> -            goto error;
> -        }
>      }
>  
>   cleanup:
> +    fdst->threadQuit = true;
> +    virObjectUnlock(fdst);
>      if (!virObjectUnref(fdst))
>          st->privateData = NULL;
>      VIR_FORCE_CLOSE(fdin);
>      VIR_FORCE_CLOSE(fdout);
>      virFDStreamThreadDataFree(data);
> -    VIR_FREE(buf);
>      return;
>  
>   error:
> -    virObjectLock(fdst);
>      fdst->threadErr = errno;
> -    virObjectUnlock(fdst);
>      goto cleanup;
>  }
>  
> @@ -367,6 +574,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
>      if (!fdst->thread)
>          return 0;
>  
> +    fdst->threadAbort = streamAbort;
> +    fdst->threadQuit = true;
> +    virCondSignal(&fdst->threadCond);
> +
>      /* Give the thread a chance to lock the FD stream object. */
>      virObjectUnlock(fdst);
>      virThreadJoin(fdst->thread);
> @@ -380,6 +591,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
>      ret = 0;
>   cleanup:
>      VIR_FREE(fdst->thread);
> +    virCondDestroy(&fdst->threadCond);
>      return ret;
>  }
>  
> @@ -426,11 +638,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
>          fdst->abortCallbackDispatching = false;
>      }
>  
> -    /* mutex locked */
> -    ret = VIR_CLOSE(fdst->fd);
>      if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
>          ret = -1;
>  
> +    /* mutex locked */
> +    if ((ret = VIR_CLOSE(fdst->fd)) < 0)
> +        virReportSystemError(errno, "%s",
> +                             _("Unable to close"));
> +
>      st->privateData = NULL;
>  
>      /* call the internal stream closing callback */
> @@ -467,7 +682,8 @@ virFDStreamAbort(virStreamPtr st)
>  static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
>  {
>      virFDStreamDataPtr fdst = st->privateData;
> -    int ret;
> +    virFDStreamMsgPtr msg = NULL;
> +    int ret = -1;
>  
>      if (nbytes > INT_MAX) {
>          virReportSystemError(ERANGE, "%s",
> @@ -495,25 +711,51 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
>              nbytes = fdst->length - fdst->offset;
>      }
>  
> - retry:
> -    ret = write(fdst->fd, bytes, nbytes);
> -    if (ret < 0) {
> -        VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> -        if (errno == EAGAIN || errno == EWOULDBLOCK) {
> -        VIR_WARNINGS_RESET
> -            ret = -2;
> -        } else if (errno == EINTR) {
> -            goto retry;
> -        } else {
> -            ret = -1;
> -            virReportSystemError(errno, "%s",
> +    if (fdst->thread) {
> +        char *buf;
> +
> +        if (fdst->threadQuit) {
> +            virReportSystemError(EBADF, "%s",
>                                   _("cannot write to stream"));
> +            return -1;

either virObjectUnlock(fdst); or goto cleanup  (otherwise fdst remains locked)

> +        }
> +
> +        if (VIR_ALLOC(msg) < 0 ||
> +            VIR_ALLOC_N(buf, nbytes) < 0)
> +            goto cleanup;
> +
> +        memcpy(buf, bytes, nbytes);
> +        msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
> +        msg->stream.data.buf = buf;
> +        msg->stream.data.len = nbytes;
> +
> +        virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");

*QueuePush is not a void...  What happens if safewrite fails?

> +        msg = NULL;
> +        ret = nbytes;
> +    } else {
> +     retry:
> +        ret = write(fdst->fd, bytes, nbytes);
> +        if (ret < 0) {
> +            VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> +            if (errno == EAGAIN || errno == EWOULDBLOCK) {
> +            VIR_WARNINGS_RESET
> +                ret = -2;
> +            } else if (errno == EINTR) {
> +                goto retry;
> +            } else {
> +                ret = -1;
> +                virReportSystemError(errno, "%s",
> +                                     _("cannot write to stream"));
> +            }

Should there be a goto cleanup here so that we avoid any chance that
fdst->length > 0 and thus fdst->offset gets decremented by 1 or 2?  Or
am I just thinking that could happen without actually checking whether
fdst->length could be non-zero here?

>          }
> -    } else if (fdst->length) {
> -        fdst->offset += ret;
>      }
>  
> +    if (fdst->length)
> +        fdst->offset += ret;
> +
> + cleanup:
>      virObjectUnlock(fdst);
> +    virFDStreamMsgFree(msg);
>      return ret;
>  }
>  
> @@ -521,7 +763,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
>  static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
>  {
>      virFDStreamDataPtr fdst = st->privateData;
> -    int ret;
> +    int ret = -1;
>  
>      if (nbytes > INT_MAX) {
>          virReportSystemError(ERANGE, "%s",
> @@ -547,24 +789,70 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
>              nbytes = fdst->length - fdst->offset;
>      }
>  
> - retry:
> -    ret = read(fdst->fd, bytes, nbytes);
> -    if (ret < 0) {
> -        VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> -        if (errno == EAGAIN || errno == EWOULDBLOCK) {
> -        VIR_WARNINGS_RESET
> -            ret = -2;
> -        } else if (errno == EINTR) {
> -            goto retry;
> -        } else {
> -            ret = -1;
> -            virReportSystemError(errno, "%s",
> -                                 _("cannot read from stream"));
> +    if (fdst->thread) {
> +        virFDStreamMsgPtr msg = NULL;
> +
> +        while (!(msg = fdst->msg)) {
> +            if (fdst->threadQuit) {
> +                if (nbytes) {
> +                    virReportSystemError(EBADF, "%s",
> +                                         _("stream is not open"));
> +                } else {
> +                    ret = 0;
> +                }
> +                goto cleanup;
> +            } else {
> +                virObjectUnlock(fdst);
> +                virCondSignal(&fdst->threadCond);
> +                virObjectLock(fdst);
> +            }
> +        }
> +
> +        if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
> +            /* Nope, nope, I'm outta here */
> +            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> +                           _("unexpected message type"));
> +            goto cleanup;
> +        }
> +
> +        if (nbytes > msg->stream.data.len - msg->stream.data.offset)
> +            nbytes = msg->stream.data.len - msg->stream.data.offset;
> +
> +        memcpy(bytes,
> +               msg->stream.data.buf + msg->stream.data.offset,
> +               nbytes);
> +
> +        msg->stream.data.offset += nbytes;
> +        if (msg->stream.data.offset == msg->stream.data.len) {
> +            virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");

*QueuePop is not a void...  What if saferead fails?

> +            virFDStreamMsgFree(msg);
> +        }
> +
> +        ret = nbytes;
> +
> +    } else {
> +     retry:
> +        ret = read(fdst->fd, bytes, nbytes);
> +        if (ret < 0) {
> +            VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> +            if (errno == EAGAIN || errno == EWOULDBLOCK) {
> +            VIR_WARNINGS_RESET
> +                ret = -2;
> +            } else if (errno == EINTR) {
> +                goto retry;
> +            } else {
> +                ret = -1;
> +                virReportSystemError(errno, "%s",
> +                                     _("cannot read from stream"));
> +            }
> +            goto cleanup;

I think I know the answer to my question from virFDStreamWrite now..


John
>          }
> -    } else if (fdst->length) {
> -        fdst->offset += ret;
>      }
>  
> +    if (fdst->length)
> +        fdst->offset += ret;
> +
> + cleanup:
>      virObjectUnlock(fdst);
>      return ret;
>  }
> @@ -609,11 +897,19 @@ static int virFDStreamOpenInternal(virStreamPtr st,
>      st->privateData = fdst;
>  
>      if (threadData) {
> +        fdst->threadDoRead = threadData->doRead;
> +
>          /* Create the thread after fdst and st were initialized.
>           * The thread worker expects them to be that way. */
>          if (VIR_ALLOC(fdst->thread) < 0)
>              goto error;
>  
> +        if (virCondInit(&fdst->threadCond) < 0) {
> +            virReportSystemError(errno, "%s",
> +                                 _("cannot initialize condition variable"));
> +            goto error;
> +        }
> +
>          if (virThreadCreate(fdst->thread,
>                              true,
>                              virFDStreamThread,
> @@ -782,6 +1078,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>                  VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
>                  goto error;
>              tmpfd = pipefds[0];
> +            threadData->doRead = true;
>          } else {
>              threadData->fdin = pipefds[0];
>              threadData->fdout = fd;
> @@ -789,6 +1086,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>                  VIR_STRDUP(threadData->fdoutname, path) < 0)
>                  goto error;
>              tmpfd = pipefds[1];
> +            threadData->doRead = false;
>          }
>      }
>  
> 

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]
  Powered by Linux