On 04/27/2017 07:48 PM, Daniel P. Berrange wrote:
> On Thu, Apr 20, 2017 at 12:01:35PM +0200, Michal Privoznik wrote:
>> One big downside of using the pipe to transfer the data is that
>> we can really transfer just bare data. No metadata can be carried
>> through unless some formatted messages are introduced. That would
>> be quite painful to achieve, so let's use a message queue. It's
>> fairly easy to exchange info between threads now that iohelper is
>> no longer used.
>>
>> The reason why we cannot use the FD for plain files directly is
>> that despite us setting the non-blocking flag on the FD, any
>> read()/write() blocks regardless (which is a showstopper since
>> those parts of the code are run from the event loop) and poll()
>> reports such an FD as always readable/writable, even though the
>> subsequent operation might block.
>>
>> The pipe is still not gone though. It is used to signal to the event
>> loop that an event occurred (e.g. data are available for reading
>> in the queue, or vice versa).
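The queue-plus-pipe arrangement described above can be illustrated with a minimal sketch. It assumes hypothetical msg_t/msgq_t types rather than libvirt's virFDStreamMsg structures, omits locking (in the patch the caller holds the stream object lock), and uses plain write()/read() in place of safewrite()/saferead(). What it shows is that writing one byte per pushed message and reading one byte per popped message keeps the pipe readable, as reported by poll(), exactly while the queue is non-empty.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical message type; the real virFDStreamMsg carries more. */
typedef struct msg {
    struct msg *next;
    char *buf;
    size_t len;
} msg_t;

/* Hypothetical queue: a FIFO plus a pipe used only for wakeups. */
typedef struct {
    msg_t *head;
    int wakeup[2];   /* [0]: read end polled by the consumer,
                        [1]: write end used by the producer */
} msgq_t;

static int msgq_init(msgq_t *q)
{
    q->head = NULL;
    return pipe(q->wakeup);
}

/* Append a message, then write one wakeup byte so the pipe holds
 * one byte per queued message. Locking is left to the caller. */
static int msgq_push(msgq_t *q, msg_t *m)
{
    msg_t **tail = &q->head;
    char c = '1';

    while (*tail)
        tail = &(*tail)->next;
    m->next = NULL;
    *tail = m;

    return write(q->wakeup[1], &c, 1) == 1 ? 0 : -1;
}

/* Detach the head message and consume its wakeup byte. Returns NULL
 * if the queue is empty or the byte could not be read. */
static msg_t *msgq_pop(msgq_t *q)
{
    msg_t *m = q->head;
    char c;

    if (!m)
        return NULL;
    if (read(q->wakeup[0], &c, 1) != 1)
        return NULL;
    q->head = m->next;
    m->next = NULL;
    return m;
}

/* Non-blocking check of what an event loop would see via poll(). */
static int msgq_readable(msgq_t *q)
{
    struct pollfd pfd = { .fd = q->wakeup[0], .events = POLLIN };

    return poll(&pfd, 1, 0) == 1 && (pfd.revents & POLLIN);
}
```

Because the pipe only ever carries one byte per message, the event loop never has to inspect the queue to decide whether to wake up; the queue itself carries the data and any metadata.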
>>
>> Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
>> ---
>>  src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
>>  1 file changed, 350 insertions(+), 52 deletions(-)
>>
>
>> +static int
>> +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
>> +                        virFDStreamMsgPtr msg,
>> +                        int fd,
>> +                        const char *fdname)
>> +{
>> +    virFDStreamMsgPtr *tmp = &fdst->msg;
>> +    char c = '1';
>> +
>> +    while (*tmp)
>> +        tmp = &(*tmp)->next;
>> +
>> +    *tmp = msg;
>> +    virCondSignal(&fdst->threadCond);
>> +
>> +    if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
>> +        virReportSystemError(errno,
>> +                             _("Unable to write to %s"),
>> +                             fdname);
>> +        return -1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +
>> +static virFDStreamMsgPtr
>> +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
>> +                       int fd,
>> +                       const char *fdname)
>> +{
>> +    virFDStreamMsgPtr tmp = fdst->msg;
>> +    char c;
>> +
>> +    if (tmp) {
>> +        fdst->msg = tmp->next;
>> +        tmp->next = NULL;
>> +    }
>> +
>> +    virCondSignal(&fdst->threadCond);
>> +
>> +    if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
>> +        virReportSystemError(errno,
>> +                             _("Unable to read from %s"),
>> +                             fdname);
>> +        return NULL;
>> +    }
>> +
>> +    return tmp;
>> +}
>
> Both these methods signal the condition.
>
>> +static ssize_t
>> +virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
>> +                        const int fdin,
>> +                        const int fdout,
>> +                        const char *fdinname,
>> +                        const char *fdoutname,
>> +                        size_t buflen)
>> +{
>> +    virFDStreamMsgPtr msg = NULL;
>> +    char *buf = NULL;
>> +    ssize_t got;
>> +
>> +    if (VIR_ALLOC(msg) < 0)
>> +        goto error;
>> +
>> +    if (VIR_ALLOC_N(buf, buflen) < 0)
>> +        goto error;
>> +
>> +    if ((got = saferead(fdin, buf, buflen)) < 0) {
>> +        virReportSystemError(errno,
>> +                             _("Unable to read %s"),
>> +                             fdinname);
>> +        goto error;
>> +    }
>> +
>> +    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
>> +    msg->stream.data.buf = buf;
>> +    msg->stream.data.len = got;
>> +    buf = NULL;
>> +
>> +    virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
>> +    msg = NULL;
>> +
>> +    return got;
>> +
>> + error:
>> +    VIR_FREE(buf);
>> +    virFDStreamMsgFree(msg);
>> +    return -1;
>> +}
>> +
>> +
>> +static ssize_t
>> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
>> +                         const int fdin,
>> +                         const int fdout,
>> +                         const char *fdinname,
>> +                         const char *fdoutname)
>> +{
>> +    ssize_t got;
>> +    virFDStreamMsgPtr msg = fdst->msg;
>> +    bool pop = false;
>> +
>> +    switch (msg->type) {
>> +    case VIR_FDSTREAM_MSG_TYPE_DATA:
>> +        got = safewrite(fdout,
>> +                        msg->stream.data.buf + msg->stream.data.offset,
>> +                        msg->stream.data.len - msg->stream.data.offset);
>> +        if (got < 0) {
>> +            virReportSystemError(errno,
>> +                                 _("Unable to write %s"),
>> +                                 fdoutname);
>> +            return -1;
>> +        }
>> +
>> +        msg->stream.data.offset += got;
>> +
>> +        pop = msg->stream.data.offset == msg->stream.data.len;
>> +        break;
>> +    }
>> +
>> +    if (pop) {
>> +        virFDStreamMsgQueuePop(fdst, fdin, fdinname);
>> +        virFDStreamMsgFree(msg);
>> +    }
>> +
>> +    return got;
>> +}
>
> Both these methods call into the Pop/Push functions which
> signal the condition.
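On the question of one thread both signalling and waiting: a pthread_cond_signal() issued while no thread is blocked on the condition is simply lost, and a waiter that re-checks its predicate in a loop treats any unneeded wakeup as a no-op. The minimal pthreads sketch below uses a hypothetical single-slot mailbox_t whose `full` flag stands in for `fdst->msg != NULL`, and plain pthread calls in place of virCondWait()/virCondSignal(). Two threads share one condition variable, and each side both signals and waits; it illustrates the predicate-loop pattern and is not the patch's code.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical single-slot mailbox; 'full' plays the role of
 * fdst->msg != NULL in the patch. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;   /* one condition shared by both sides */
    bool full;
    bool quit;
    int value;
} mailbox_t;

static int mailbox_init(mailbox_t *mb)
{
    mb->full = false;
    mb->quit = false;
    mb->value = 0;
    if (pthread_mutex_init(&mb->lock, NULL) != 0)
        return -1;
    return pthread_cond_init(&mb->cond, NULL) == 0 ? 0 : -1;
}

/* Producer (like the reading worker): waits while the slot is full,
 * fills it and signals. If nobody is waiting, the signal is lost,
 * which is harmless because every waiter re-checks its predicate. */
static void *producer(void *opaque)
{
    mailbox_t *mb = opaque;
    int i;

    pthread_mutex_lock(&mb->lock);
    for (i = 1; i <= 1000; i++) {
        while (mb->full)
            pthread_cond_wait(&mb->cond, &mb->lock);
        mb->value = i;
        mb->full = true;
        pthread_cond_signal(&mb->cond);
    }
    mb->quit = true;
    pthread_cond_signal(&mb->cond);
    pthread_mutex_unlock(&mb->lock);
    return NULL;
}

/* Consumer (stand-in for the other side): takes each value, empties
 * the slot and signals. Returns the sum of all consumed values. */
static long consume_all(mailbox_t *mb)
{
    long sum = 0;

    pthread_mutex_lock(&mb->lock);
    for (;;) {
        while (!mb->full && !mb->quit)
            pthread_cond_wait(&mb->cond, &mb->lock);
        if (!mb->full)
            break;              /* quit requested and slot drained */
        sum += mb->value;
        mb->full = false;
        pthread_cond_signal(&mb->cond);
    }
    pthread_mutex_unlock(&mb->lock);
    return sum;
}
```

With only two threads, each signal can only reach the other party. If more threads waited on the same condition for different predicates, pthread_cond_broadcast() or separate condition variables would be the safer choice, since a single signal could wake a thread whose predicate is still false.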
>
>> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>>      int fdout = data->fdout;
>>      char *fdoutname = data->fdoutname;
>>      virFDStreamDataPtr fdst = st->privateData;
>> -    char *buf = NULL;
>> +    bool doRead = fdst->threadDoRead;
>>      size_t buflen = 256 * 1024;
>>      size_t total = 0;
>>
>>      virObjectRef(fdst);
>> -
>> -    if (VIR_ALLOC_N(buf, buflen) < 0)
>> -        goto error;
>> +    virObjectLock(fdst);
>>
>>      while (1) {
>>          ssize_t got;
>> @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
>>          if (buflen == 0)
>>              break; /* End of requested data from client */
>>
>> -        if ((got = saferead(fdin, buf, buflen)) < 0) {
>> -            virReportSystemError(errno,
>> -                                 _("Unable to read %s"),
>> -                                 fdinname);
>> +        while (doRead == (fdst->msg != NULL) &&
>> +               !fdst->threadQuit) {
>> +            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
>> +                virReportSystemError(errno, "%s",
>> +                                     _("failed to wait on condition"));
>> +                goto error;
>> +            }
>> +        }
>> +
>> +        if (fdst->threadQuit) {
>> +            /* If stream abort was requested, quit early. */
>> +            if (fdst->threadAbort)
>> +                goto cleanup;
>> +
>> +            /* Otherwise flush buffers and quit gracefully. */
>> +            if (doRead == (fdst->msg != NULL))
>> +                break;
>> +        }
>> +
>> +        if (doRead)
>> +            got = virFDStreamThreadDoRead(fdst,
>> +                                          fdin, fdout,
>> +                                          fdinname, fdoutname,
>> +                                          buflen);
>> +        else
>> +            got = virFDStreamThreadDoWrite(fdst,
>> +                                           fdin, fdout,
>> +                                           fdinname, fdoutname);
>> +
>> +        if (got < 0)
>
>
> This method waits on the condition.
>
> So unless I'm mistaken, this thread is signaling & waiting on
> the same condition, which feels wrong. Generally different
> threads would signal vs wait.
>

I hear what you're saying, but I don't think this is a problem,
precisely because of the way we wait on the condition. Assume this
thread is doing reads, that is, it reads from fdin (an actual file on
a disk) and feeds the message queue with the data.
Now, it reads some data, pushes it to the end of the message queue
(fdst->msg != NULL at that point) and goes back to the while loop
above. Effectively, that loop swallows all the spurious wakeups for as
long as fdst->msg != NULL: each one just re-checks the predicate and
waits again.

Now assume the thread is doing writes (it reads data from the queue
and writes it into a file on a disk). Again, as long as there are
some messages to be written (that is, fdst->msg != NULL), control
won't even reach virCondWait(). And if it does, it's because
fdst->msg == NULL, in which case there is no data to be written.

But something has just come to mind whilst writing these lines: I
wonder if we can ditch the condition entirely and rely on the pipe +
poll(). I mean, this worker would use the pipe to signal to the event
loop that there is a message waiting for it in the queue. The question
is how thread-safe this approach would be.

Michal

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list
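On the idea of ditching the condition: a minimal sketch of a worker that sleeps in poll() on a pipe instead of in pthread_cond_wait(). It assumes a hypothetical chan_t with a second, worker-facing pipe (the patch only has the one pipe towards the event loop), and it keeps a mutex around the shared state, because the pipe only replaces the wakeup; it does not make concurrent access to the queue safe by itself. Each post writes one byte; the worker drains one byte per wakeup and then re-reads the shared state under the lock, so a byte left over from an earlier post only causes a harmless extra pass.

```c
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical shared state: the mutex still guards the fields, the
 * pipe only replaces pthread_cond_wait()/pthread_cond_signal(). */
typedef struct {
    pthread_mutex_t lock;
    int pending;        /* stands in for the length of the queue */
    long processed;     /* items the worker has "handled" */
    bool quit;
    int notify[2];      /* [1]: written by posters, [0]: polled by worker */
} chan_t;

static int chan_init(chan_t *ch)
{
    ch->pending = 0;
    ch->processed = 0;
    ch->quit = false;
    if (pthread_mutex_init(&ch->lock, NULL) != 0)
        return -1;
    return pipe(ch->notify);
}

/* Poster side: update state under the lock first, then write the
 * wakeup byte, so the worker never wakes up before the change. */
static int chan_post(chan_t *ch, bool quit)
{
    char c = '1';

    pthread_mutex_lock(&ch->lock);
    if (quit)
        ch->quit = true;
    else
        ch->pending++;
    pthread_mutex_unlock(&ch->lock);
    return write(ch->notify[1], &c, 1) == 1 ? 0 : -1;
}

/* Worker side: block in poll() until a wakeup byte arrives, drain
 * it, then take everything pending under the lock. */
static void *worker(void *opaque)
{
    chan_t *ch = opaque;

    for (;;) {
        struct pollfd pfd = { .fd = ch->notify[0], .events = POLLIN };
        char c;
        bool quit;

        if (poll(&pfd, 1, -1) < 0)
            break;
        if (read(ch->notify[0], &c, 1) != 1)
            break;

        pthread_mutex_lock(&ch->lock);
        ch->processed += ch->pending;
        ch->pending = 0;
        quit = ch->quit;
        pthread_mutex_unlock(&ch->lock);

        if (quit)
            break;
    }
    return NULL;
}
```

A single-byte write() to a pipe is atomic (it is below PIPE_BUF), so several posters could share the write end. The ordering in chan_post() matters: writing the byte before updating the state could let the worker wake, find nothing new, and go back to sleep.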