On Thu, Apr 20, 2017 at 12:01:35PM +0200, Michal Privoznik wrote: > One big downside of using the pipe to transfer the data is that > we can really transfer just bare data. No metadata can be carried > through unless some formatted messages are introduced. That would > be quite painful to achieve so let's use a message queue. It's > fairly easy to exchange info between threads now that iohelper is > no longer used. > > The reason why we cannot use the FD for plain files directly is > that despite us setting noblock flag on the FD, any > read()/write() blocks regardless (which is a show stopper since > those parts of the code are run from the event loop) and poll() > reports such FD as always readable/writable - even though the > subsequent operation might block. > > The pipe is still not gone though. It is used to signal to even > loop that an event occurred (e.g. data are available for reading > in the queue, or vice versa). > > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> > --- > src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++------- > 1 file changed, 350 insertions(+), 52 deletions(-) > > +static int > +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst, > + virFDStreamMsgPtr msg, > + int fd, > + const char *fdname) > +{ > + virFDStreamMsgPtr *tmp = &fdst->msg; > + char c = '1'; > + > + while (*tmp) > + tmp = &(*tmp)->next; > + > + *tmp = msg; > + virCondSignal(&fdst->threadCond); > + > + if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) { > + virReportSystemError(errno, > + _("Unable to write to %s"), > + fdname); > + return -1; > + } > + > + return 0; > +} > + > + > +static virFDStreamMsgPtr > +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst, > + int fd, > + const char *fdname) > +{ > + virFDStreamMsgPtr tmp = fdst->msg; > + char c; > + > + if (tmp) { > + fdst->msg = tmp->next; > + tmp->next = NULL; > + } > + > + virCondSignal(&fdst->threadCond); > + > + if (saferead(fd, &c, sizeof(c)) != sizeof(c)) { > + virReportSystemError(errno, > + _("Unable to read from %s"), > + fdname); > + return NULL; > + } > + > + return tmp; > +} Both these methods signal the condition > +static ssize_t > +virFDStreamThreadDoRead(virFDStreamDataPtr fdst, > + const int fdin, > + const int fdout, > + const char *fdinname, > + const char *fdoutname, > + size_t buflen) > +{ > + virFDStreamMsgPtr msg = NULL; > + char *buf = NULL; > + ssize_t got; > + > + if (VIR_ALLOC(msg) < 0) > + goto error; > + > + if (VIR_ALLOC_N(buf, buflen) < 0) > + goto error; > + > + if ((got = saferead(fdin, buf, buflen)) < 0) { > + virReportSystemError(errno, > + _("Unable to read %s"), > + fdinname); > + goto error; > + } > + > + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; > + msg->stream.data.buf = buf; > + msg->stream.data.len = got; > + buf = NULL; > + > + virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname); > + msg = NULL; > + > + return got; > + > + error: > + VIR_FREE(buf); > + virFDStreamMsgFree(msg); > + return -1; > +} > + > + > +static ssize_t > +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, > + const int fdin, > + const int fdout, > + const char *fdinname, > + const char *fdoutname) > +{ > + ssize_t got; > + virFDStreamMsgPtr msg = fdst->msg; > + bool pop = false; > + > + switch (msg->type) { > + case VIR_FDSTREAM_MSG_TYPE_DATA: > + got = safewrite(fdout, > + msg->stream.data.buf + msg->stream.data.offset, > + msg->stream.data.len - msg->stream.data.offset); > + if (got < 0) { > + virReportSystemError(errno, > + _("Unable to write %s"), > + fdoutname); > + return -1; > + } > + > + msg->stream.data.offset += got; > + > + pop = msg->stream.data.offset == msg->stream.data.len; > + break; > + } > + > + if (pop) { > + virFDStreamMsgQueuePop(fdst, fdin, fdinname); > + virFDStreamMsgFree(msg); > + } > + > + return got; > +} Both these methods call into the Pop/Push functions which signal the condition. > @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque) > int fdout = data->fdout; > char *fdoutname = data->fdoutname; > virFDStreamDataPtr fdst = st->privateData; > - char *buf = NULL; > + bool doRead = fdst->threadDoRead; > size_t buflen = 256 * 1024; > size_t total = 0; > > virObjectRef(fdst); > - > - if (VIR_ALLOC_N(buf, buflen) < 0) > - goto error; > + virObjectLock(fdst); > > while (1) { > ssize_t got; > @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque) > if (buflen == 0) > break; /* End of requested data from client */ > > - if ((got = saferead(fdin, buf, buflen)) < 0) { > - virReportSystemError(errno, > - _("Unable to read %s"), > - fdinname); > + while (doRead == (fdst->msg != NULL) && > + !fdst->threadQuit) { > + if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) { > + virReportSystemError(errno, "%s", > + _("failed to wait on condition")); > + goto error; > + } > + } > + > + if (fdst->threadQuit) { > + /* If stream abort was requested, quit early. */ > + if (fdst->threadAbort) > + goto cleanup; > + > + /* Otherwise flush buffers and quit gracefully. */ > + if (doRead == (fdst->msg != NULL)) > + break; > + } > + > + if (doRead) > + got = virFDStreamThreadDoRead(fdst, > + fdin, fdout, > + fdinname, fdoutname, > + buflen); > + else > + got = virFDStreamThreadDoWrite(fdst, > + fdin, fdout, > + fdinname, fdoutname); > + > + if (got < 0) This method waits on the condition. So unless I'm mistaken, this thread is signaling & waiting on the same condition, which feels wrong. Generally different threads would signal vs wait. Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :| -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list