On 04/20/2017 06:01 AM, Michal Privoznik wrote: > One big downside of using the pipe to transfer the data is that > we can really transfer just bare data. No metadata can be carried > through unless some formatted messages are introduced. That would > be quite painful to achieve so let's use a message queue. It's > fairly easy to exchange info between threads now that iohelper is > no longer used. > > The reason why we cannot use the FD for plain files directly is > that despite us setting noblock flag on the FD, any > read()/write() blocks regardless (which is a show stopper since > those parts of the code are run from the event loop) and poll() > reports such FD as always readable/writable - even though the > subsequent operation might block. > > The pipe is still not gone though. It is used to signal to even > loop that an event occurred (e.g. data are available for reading > in the queue, or vice versa). > > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> > --- > src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++------- > 1 file changed, 350 insertions(+), 52 deletions(-) > Very strange - compilation breaks on this patch: util/virfdstream.c: In function 'virFDStreamThread': util/virfdstream.c:551:15: error: 'got' may be used uninitialized in this function [-Werror=maybe-uninitialized] total += got; ^~ The reason this happens is that virFDStreamThreadDoWrite doesn't initialize got, so its return value is indeterminate if msg->type is not VIR_FDSTREAM_MSG_TYPE_DATA. I wish the complaint were reported in the right function... Before I forget... starting here, I'm a bit "nervous" about the claim from your ping that "Patches 01-07 are fairly trivial and are more of a bug fixes than feature implementation"... I'd almost say 1-4 are trivial, 5 is a little less trivial since you're "removing" the iohelper command and replacing it "inline", but this one has moved into altering more of the algorithm - and so close to a release... 
I see Daniel has responded to this one too ... still I'll point out a few more things... > diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c > index 7a8d65d..0350494 100644 > --- a/src/util/virfdstream.c > +++ b/src/util/virfdstream.c > @@ -49,6 +49,27 @@ > > VIR_LOG_INIT("fdstream"); > > +typedef enum { > + VIR_FDSTREAM_MSG_TYPE_DATA, > +} virFDStreamMsgType; > + > +typedef struct _virFDStreamMsg virFDStreamMsg; > +typedef virFDStreamMsg *virFDStreamMsgPtr; > +struct _virFDStreamMsg { > + virFDStreamMsgPtr next; > + > + virFDStreamMsgType type; > + > + union { > + struct { > + char *buf; > + size_t len; > + size_t offset; > + } data; > + } stream; > +}; > + > + > /* Tunnelled migration stream support */ > typedef struct virFDStreamData virFDStreamData; > typedef virFDStreamData *virFDStreamDataPtr; > @@ -80,18 +101,25 @@ struct virFDStreamData { > > /* Thread data */ > virThreadPtr thread; > + virCond threadCond; > int threadErr; > bool threadQuit; > + bool threadAbort; > + bool threadDoRead; > + virFDStreamMsgPtr msg; > }; > > static virClassPtr virFDStreamDataClass; > > +static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue); > + > static void > virFDStreamDataDispose(void *obj) > { > virFDStreamDataPtr fdst = obj; > > VIR_DEBUG("obj=%p", fdst); > + virFDStreamMsgQueueFree(&fdst->msg); > } > > static int virFDStreamDataOnceInit(void) > @@ -108,6 +136,89 @@ static int virFDStreamDataOnceInit(void) > VIR_ONCE_GLOBAL_INIT(virFDStreamData) > > > +static int > +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst, > + virFDStreamMsgPtr msg, > + int fd, > + const char *fdname) > +{ > + virFDStreamMsgPtr *tmp = &fdst->msg; > + char c = '1'; > + > + while (*tmp) > + tmp = &(*tmp)->next; > + > + *tmp = msg; > + virCondSignal(&fdst->threadCond); > + > + if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) { > + virReportSystemError(errno, > + _("Unable to write to %s"), > + fdname); > + return -1; > + } > + > + return 0; > +} > + > + > +static virFDStreamMsgPtr > 
+virFDStreamMsgQueuePop(virFDStreamDataPtr fdst, > + int fd, > + const char *fdname) > +{ > + virFDStreamMsgPtr tmp = fdst->msg; > + char c; > + > + if (tmp) { > + fdst->msg = tmp->next; > + tmp->next = NULL; > + } > + > + virCondSignal(&fdst->threadCond); > + > + if (saferead(fd, &c, sizeof(c)) != sizeof(c)) { > + virReportSystemError(errno, > + _("Unable to read from %s"), > + fdname); > + return NULL; > + } > + > + return tmp; > +} > + > + > +static void > +virFDStreamMsgFree(virFDStreamMsgPtr msg) > +{ > + if (!msg) > + return; > + > + switch (msg->type) { > + case VIR_FDSTREAM_MSG_TYPE_DATA: > + VIR_FREE(msg->stream.data.buf); > + break; > + } > + > + VIR_FREE(msg); > +} > + > + > +static void > +virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue) > +{ > + virFDStreamMsgPtr tmp = *queue; > + > + while (tmp) { > + virFDStreamMsgPtr next = tmp->next; > + virFDStreamMsgFree(tmp); > + tmp = next; > + } > + > + *queue = NULL; > +} > + > + > static int virFDStreamRemoveCallback(virStreamPtr stream) > { > virFDStreamDataPtr fdst = stream->privateData; > @@ -273,6 +384,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr; > struct _virFDStreamThreadData { > virStreamPtr st; > size_t length; > + bool doRead; > int fdin; > char *fdinname; > int fdout; > @@ -293,6 +405,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) > } > > > +static ssize_t > +virFDStreamThreadDoRead(virFDStreamDataPtr fdst, > + const int fdin, > + const int fdout, > + const char *fdinname, > + const char *fdoutname, > + size_t buflen) > +{ > + virFDStreamMsgPtr msg = NULL; > + char *buf = NULL; > + ssize_t got; got = -1; Not really required yet, but if additional code gets added... 
> + > + if (VIR_ALLOC(msg) < 0) > + goto error; > + > + if (VIR_ALLOC_N(buf, buflen) < 0) > + goto error; > + > + if ((got = saferead(fdin, buf, buflen)) < 0) { > + virReportSystemError(errno, > + _("Unable to read %s"), > + fdinname); > + goto error; > + } > + > + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; > + msg->stream.data.buf = buf; Could also go with VIR_STEAL_PTR(msg->stream.data.buf, buf); which avoids the need for the buf = NULL below. > + msg->stream.data.len = got; > + buf = NULL; > + > + virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname); > + msg = NULL; *QueuePush is not a void function, yet its return value is ignored here. What happens if safewrite fails? > + > + return got; > + > + error: > + VIR_FREE(buf); > + virFDStreamMsgFree(msg); > + return -1; > +} > + > + > +static ssize_t > +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, > + const int fdin, > + const int fdout, > + const char *fdinname, > + const char *fdoutname) > +{ > + ssize_t got; got = -1 is required here since got is indeterminate if msg->type != TYPE_DATA > + virFDStreamMsgPtr msg = fdst->msg; > + bool pop = false; > + > + switch (msg->type) { > + case VIR_FDSTREAM_MSG_TYPE_DATA: > + got = safewrite(fdout, > + msg->stream.data.buf + msg->stream.data.offset, > + msg->stream.data.len - msg->stream.data.offset); > + if (got < 0) { > + virReportSystemError(errno, > + _("Unable to write %s"), > + fdoutname); > + return -1; > + } > + > + msg->stream.data.offset += got; > + > + pop = msg->stream.data.offset == msg->stream.data.len; > + break; > + } > + > + if (pop) { > + virFDStreamMsgQueuePop(fdst, fdin, fdinname); *QueuePop is not a void function, yet its return value is ignored here... What if saferead fails? 
> + virFDStreamMsgFree(msg); > + } > + > + return got; > +} > + > + > static void > virFDStreamThread(void *opaque) > { > @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque) > int fdout = data->fdout; > char *fdoutname = data->fdoutname; > virFDStreamDataPtr fdst = st->privateData; > - char *buf = NULL; > + bool doRead = fdst->threadDoRead; > size_t buflen = 256 * 1024; > size_t total = 0; > > virObjectRef(fdst); > - > - if (VIR_ALLOC_N(buf, buflen) < 0) > - goto error; > + virObjectLock(fdst); > > while (1) { > ssize_t got; > @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque) > if (buflen == 0) > break; /* End of requested data from client */ > > - if ((got = saferead(fdin, buf, buflen)) < 0) { > - virReportSystemError(errno, > - _("Unable to read %s"), > - fdinname); > + while (doRead == (fdst->msg != NULL) && > + !fdst->threadQuit) { > + if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) { > + virReportSystemError(errno, "%s", > + _("failed to wait on condition")); > + goto error; > + } > + } > + > + if (fdst->threadQuit) { > + /* If stream abort was requested, quit early. */ > + if (fdst->threadAbort) > + goto cleanup; > + > + /* Otherwise flush buffers and quit gracefully. 
*/ > + if (doRead == (fdst->msg != NULL)) > + break; > + } > + > + if (doRead) > + got = virFDStreamThreadDoRead(fdst, > + fdin, fdout, > + fdinname, fdoutname, > + buflen); > + else > + got = virFDStreamThreadDoWrite(fdst, > + fdin, fdout, > + fdinname, fdoutname); > + > + if (got < 0) > goto error; > - } > > if (got == 0) > break; > > total += got; > - > - if (safewrite(fdout, buf, got) < 0) { > - virReportSystemError(errno, > - _("Unable to write %s"), > - fdoutname); > - goto error; > - } > } > > cleanup: > + fdst->threadQuit = true; > + virObjectUnlock(fdst); > if (!virObjectUnref(fdst)) > st->privateData = NULL; > VIR_FORCE_CLOSE(fdin); > VIR_FORCE_CLOSE(fdout); > virFDStreamThreadDataFree(data); > - VIR_FREE(buf); > return; > > error: > - virObjectLock(fdst); > fdst->threadErr = errno; > - virObjectUnlock(fdst); > goto cleanup; > } > > @@ -367,6 +574,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort) > if (!fdst->thread) > return 0; > > + fdst->threadAbort = streamAbort; > + fdst->threadQuit = true; > + virCondSignal(&fdst->threadCond); > + > /* Give the thread a chance to lock the FD stream object. 
*/ > virObjectUnlock(fdst); > virThreadJoin(fdst->thread); > @@ -380,6 +591,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort) > ret = 0; > cleanup: > VIR_FREE(fdst->thread); > + virCondDestroy(&fdst->threadCond); > return ret; > } > > @@ -426,11 +638,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) > fdst->abortCallbackDispatching = false; > } > > - /* mutex locked */ > - ret = VIR_CLOSE(fdst->fd); > if (virFDStreamJoinWorker(fdst, streamAbort) < 0) > ret = -1; > > + /* mutex locked */ > + if ((ret = VIR_CLOSE(fdst->fd)) < 0) > + virReportSystemError(errno, "%s", > + _("Unable to close")); > + > st->privateData = NULL; > > /* call the internal stream closing callback */ > @@ -467,7 +682,8 @@ virFDStreamAbort(virStreamPtr st) > static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) > { > virFDStreamDataPtr fdst = st->privateData; > - int ret; > + virFDStreamMsgPtr msg = NULL; > + int ret = -1; > > if (nbytes > INT_MAX) { > virReportSystemError(ERANGE, "%s", > @@ -495,25 +711,51 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) > nbytes = fdst->length - fdst->offset; > } > > - retry: > - ret = write(fdst->fd, bytes, nbytes); > - if (ret < 0) { > - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR > - if (errno == EAGAIN || errno == EWOULDBLOCK) { > - VIR_WARNINGS_RESET > - ret = -2; > - } else if (errno == EINTR) { > - goto retry; > - } else { > - ret = -1; > - virReportSystemError(errno, "%s", > + if (fdst->thread) { > + char *buf; > + > + if (fdst->threadQuit) { > + virReportSystemError(EBADF, "%s", > _("cannot write to stream")); > + return -1; Either call virObjectUnlock(fdst) before returning, or use goto cleanup (otherwise fdst remains locked). > + } > + > + if (VIR_ALLOC(msg) < 0 || > + VIR_ALLOC_N(buf, nbytes) < 0) > + goto cleanup; > + > + memcpy(buf, bytes, nbytes); > + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; > + msg->stream.data.buf = buf; > + msg->stream.data.len = nbytes; > + > + virFDStreamMsgQueuePush(fdst, 
msg, fdst->fd, "pipe"); *QueuePush is not a void function, yet its return value is ignored here... What happens if safewrite fails? > + msg = NULL; > + ret = nbytes; > + } else { > + retry: > + ret = write(fdst->fd, bytes, nbytes); > + if (ret < 0) { > + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR > + if (errno == EAGAIN || errno == EWOULDBLOCK) { > + VIR_WARNINGS_RESET > + ret = -2; > + } else if (errno == EINTR) { > + goto retry; > + } else { > + ret = -1; > + virReportSystemError(errno, "%s", > + _("cannot write to stream")); > + } Should there be a goto cleanup here so that we avoid any chance that fdst->length > 0 and thus fdst->offset gets decremented by 1 or 2? Or is that just me thinking it could happen without actually checking whether fdst->length can be nonzero here? > } > - } else if (fdst->length) { > - fdst->offset += ret; > } > > + if (fdst->length) > + fdst->offset += ret; > + > + cleanup: > virObjectUnlock(fdst); > + virFDStreamMsgFree(msg); > return ret; > } > > @@ -521,7 +763,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) > static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) > { > virFDStreamDataPtr fdst = st->privateData; > - int ret; > + int ret = -1; > > if (nbytes > INT_MAX) { > virReportSystemError(ERANGE, "%s", > @@ -547,24 +789,70 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) > nbytes = fdst->length - fdst->offset; > } > > - retry: > - ret = read(fdst->fd, bytes, nbytes); > - if (ret < 0) { > - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR > - if (errno == EAGAIN || errno == EWOULDBLOCK) { > - VIR_WARNINGS_RESET > - ret = -2; > - } else if (errno == EINTR) { > - goto retry; > - } else { > - ret = -1; > - virReportSystemError(errno, "%s", > - _("cannot read from stream")); > + if (fdst->thread) { > + virFDStreamMsgPtr msg = NULL; > + > + while (!(msg = fdst->msg)) { > + if (fdst->threadQuit) { > + if (nbytes) { > + virReportSystemError(EBADF, "%s", > + _("stream is not open")); > + } else { > + ret = 0; > + } > + goto 
cleanup; > + } else { > + virObjectUnlock(fdst); > + virCondSignal(&fdst->threadCond); > + virObjectLock(fdst); > + } > + } > + > + if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) { > + /* Nope, nope, I'm outta here */ > + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", > + _("unexpected message type")); > + goto cleanup; > + } > + > + if (nbytes > msg->stream.data.len - msg->stream.data.offset) > + nbytes = msg->stream.data.len - msg->stream.data.offset; > + > + memcpy(bytes, > + msg->stream.data.buf + msg->stream.data.offset, > + nbytes); > + > + msg->stream.data.offset += nbytes; > + if (msg->stream.data.offset == msg->stream.data.len) { > + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe"); *QueuePop is not a void... what if saferead fails. > + virFDStreamMsgFree(msg); > + } > + > + ret = nbytes; > + > + } else { > + retry: > + ret = read(fdst->fd, bytes, nbytes); > + if (ret < 0) { > + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR > + if (errno == EAGAIN || errno == EWOULDBLOCK) { > + VIR_WARNINGS_RESET > + ret = -2; > + } else if (errno == EINTR) { > + goto retry; > + } else { > + ret = -1; > + virReportSystemError(errno, "%s", > + _("cannot read from stream")); > + } > + goto cleanup; I think I know the answer to my question from virFDStreamWrite now.. John > } > - } else if (fdst->length) { > - fdst->offset += ret; > } > > + if (fdst->length) > + fdst->offset += ret; > + > + cleanup: > virObjectUnlock(fdst); > return ret; > } > @@ -609,11 +897,19 @@ static int virFDStreamOpenInternal(virStreamPtr st, > st->privateData = fdst; > > if (threadData) { > + fdst->threadDoRead = threadData->doRead; > + > /* Create the thread after fdst and st were initialized. > * The thread worker expects them to be that way. 
*/ > if (VIR_ALLOC(fdst->thread) < 0) > goto error; > > + if (virCondInit(&fdst->threadCond) < 0) { > + virReportSystemError(errno, "%s", > + _("cannot initialize condition variable")); > + goto error; > + } > + > if (virThreadCreate(fdst->thread, > true, > virFDStreamThread, > @@ -782,6 +1078,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, > VIR_STRDUP(threadData->fdoutname, "pipe") < 0) > goto error; > tmpfd = pipefds[0]; > + threadData->doRead = true; > } else { > threadData->fdin = pipefds[0]; > threadData->fdout = fd; > @@ -789,6 +1086,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, > VIR_STRDUP(threadData->fdoutname, path) < 0) > goto error; > tmpfd = pipefds[1]; > + threadData->doRead = false; > } > } > > -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list