On 05/16/2017 10:04 AM, Michal Privoznik wrote: > Basically, what is needed here is to introduce new message type > for the messages passed between the event loop callbacks and the > worker thread that does all the I/O. The idea is that instead of > a queue of read buffers we will have a queue where "hole of size > X" messages appear. That way the even loop callbacks can just s/even/event/ > check the head of the queue and see if the worker thread is in > data or a hole section and how long the section is. > > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> > --- > src/storage/storage_util.c | 4 +- > src/util/virfdstream.c | 239 ++++++++++++++++++++++++++++++++++++++++----- > src/util/virfdstream.h | 1 + > 3 files changed, 220 insertions(+), 24 deletions(-) > [...] > diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c > index 4b42939e7..ba209025a 100644 > --- a/src/util/virfdstream.c > +++ b/src/util/virfdstream.c [...] > static ssize_t > virFDStreamThreadDoRead(virFDStreamDataPtr fdst, > + bool sparse, > const int fdin, > const int fdout, > const char *fdinname, > const char *fdoutname, > + size_t *dataLen, > size_t buflen) > { > virFDStreamMsgPtr msg = NULL; > + int inData = 0; > + long long sectionLen = 0; > char *buf = NULL; > ssize_t got; > > + if (sparse && *dataLen == 0) { > + if (virFileInData(fdin, &inData, §ionLen) < 0) > + goto error; > + > + if (inData) > + *dataLen = sectionLen; > + } > + > if (VIR_ALLOC(msg) < 0) > goto error; > > - if (VIR_ALLOC_N(buf, buflen) < 0) > - goto error; > - > - if ((got = saferead(fdin, buf, buflen)) < 0) { > - virReportSystemError(errno, > - _("Unable to read %s"), > - fdinname); > - goto error; > + if (sparse && *dataLen == 0) { > + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; > + msg->stream.hole.len = sectionLen; > + got = sectionLen; > + > + /* HACK. The message queue is one directional. So caller HACK or "By design" > + * cannot make us skip the hole. Do that for them instead. */ > + if (sectionLen && > + lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) { > + virReportSystemError(errno, > + _("unable to seek in %s"), > + fdinname); > + goto error; > + } [...] > +static int > +virFDStreamSendHole(virStreamPtr st, > + long long length, > + unsigned int flags) > +{ > + virFDStreamDataPtr fdst = st->privateData; > + virFDStreamMsgPtr msg = NULL; > + off_t off; > + int ret = -1; > + > + virCheckFlags(0, -1); > + > + virObjectLock(fdst); > + if (fdst->length) { > + if (length > fdst->length - fdst->offset) > + length = fdst->length - fdst->offset; > + fdst->offset += length; > + } > + > + if (fdst->thread) { > + /* Things are a bit complicated here. But bear with me. If FDStream is s/But bear with me.// > + * in a read mode, then if the message at the queue head is HOLE, just > + * pop it. The thread has lseek()-ed anyway. If however, the FDStream However, if the FDStream Reviewed-by: John Ferlan <jferlan@xxxxxxxxxx> John > + * is in write mode, then tell the thread to do the lseek() for us. > + * Under no circumstances we can do the lseek() ourselves here. We > + * might mess up file position for the thread. */ > + if (fdst->threadDoRead) { > + msg = fdst->msg; > + if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) { > + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", > + _("Invalid stream hole")); > + goto cleanup; > + } > + > + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe"); > + } else { > + if (VIR_ALLOC(msg) < 0) > + goto cleanup; > + > + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; > + msg->stream.hole.len = length; > + virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe"); > + msg = NULL; > + } [...] -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list