On 04/20/2017 06:01 AM, Michal Privoznik wrote: > Currently we use iohelper for virFDStream implementation. This is > because UNIX I/O can lie sometimes: even though a FD for a > file/block device is set as unblocking, actual read()/write() can > block. To avoid this, a pipe is created and one end is kept for > read/write while the other is handed over to iohelper to > write/read the data for us. Thus it's iohelper which gets blocked > and not our event loop. > > This approach has two problems: > 1) we are spawning a new process. > 2) any exchange of information between daemon and iohelper can be > done only through the pipe. > > Therefore, iohelper is replaced with an implementation in thread > which is created just for the stream lifetime. The data are still > transferred through pipe (for now), but both problems described > above are solved. > > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> > --- > src/util/virfdstream.c | 245 +++++++++++++++++++++++++++++++------------------ > src/util/virfdstream.h | 1 - > 2 files changed, 158 insertions(+), 88 deletions(-) > > diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c > index 9a4a7ff..7a8d65d 100644 > --- a/src/util/virfdstream.c > +++ b/src/util/virfdstream.c > @@ -56,8 +56,6 @@ struct virFDStreamData { > virObjectLockable parent; > > int fd; > - int errfd; > - virCommandPtr cmd; > unsigned long long offset; > unsigned long long length; > > @@ -79,6 +77,11 @@ struct virFDStreamData { > virFDStreamInternalCloseCb icbCb; > virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque; > void *icbOpaque; > + > + /* Thread data */ > + virThreadPtr thread; > + int threadErr; > + bool threadQuit; > }; > > static virClassPtr virFDStreamDataClass; > @@ -264,57 +267,123 @@ virFDStreamAddCallback(virStreamPtr st, > return ret; > } > > + > +typedef struct _virFDStreamThreadData virFDStreamThreadData; > +typedef virFDStreamThreadData *virFDStreamThreadDataPtr; > +struct _virFDStreamThreadData { > + virStreamPtr st; > + 
size_t length; > + int fdin; > + char *fdinname; > + int fdout; > + char *fdoutname; > +}; > + > + > +static void > +virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) > +{ > + if (!data) > + return; > + > + virObjectUnref(data->st); > + VIR_FREE(data->fdinname); > + VIR_FREE(data->fdoutname); > + VIR_FREE(data); > +} > + > + > +static void > +virFDStreamThread(void *opaque) > +{ > + virFDStreamThreadDataPtr data = opaque; > + virStreamPtr st = data->st; > + size_t length = data->length; > + int fdin = data->fdin; > + char *fdinname = data->fdinname; > + int fdout = data->fdout; > + char *fdoutname = data->fdoutname; > + virFDStreamDataPtr fdst = st->privateData; > + char *buf = NULL; > + size_t buflen = 256 * 1024; > + size_t total = 0; > + > + virObjectRef(fdst); > + > + if (VIR_ALLOC_N(buf, buflen) < 0) > + goto error; > + > + while (1) { > + ssize_t got; > + > + if (length && > + (length - total) < buflen) > + buflen = length - total; > + > + if (buflen == 0) > + break; /* End of requested data from client */ > + > + if ((got = saferead(fdin, buf, buflen)) < 0) { > + virReportSystemError(errno, > + _("Unable to read %s"), > + fdinname); > + goto error; > + } > + > + if (got == 0) > + break; > + > + total += got; > + > + if (safewrite(fdout, buf, got) < 0) { > + virReportSystemError(errno, > + _("Unable to write %s"), > + fdoutname); > + goto error; > + } > + } > + > + cleanup: > + if (!virObjectUnref(fdst)) > + st->privateData = NULL; > + VIR_FORCE_CLOSE(fdin); > + VIR_FORCE_CLOSE(fdout); > + virFDStreamThreadDataFree(data); > + VIR_FREE(buf); > + return; > + > + error: > + virObjectLock(fdst); > + fdst->threadErr = errno; > + virObjectUnlock(fdst); > + goto cleanup; > +} > + > + > static int > -virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort) > +virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort) Since you're touching this line anyway, the arguments should each be on a separate line... 
> { > - char buf[1024]; > - ssize_t len; > - int status; > int ret = -1; > - > - if (!fdst->cmd) > + if (!fdst->thread) > return 0; > > - if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0) > - buf[0] = '\0'; > - else > - buf[len] = '\0'; > + /* Give the thread a chance to lock the FD stream object. */ > + virObjectUnlock(fdst); > + virThreadJoin(fdst->thread); > + virObjectLock(fdst); > > - virCommandRawStatus(fdst->cmd); > - if (virCommandWait(fdst->cmd, &status) < 0) > - goto cleanup; > - > - if (status != 0) { > - if (buf[0] != '\0') { > - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf); > - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) { > - if (streamAbort) { > - /* Explicit abort request means the caller doesn't care > - if there's data left over, so skip the error */ > - goto out; > - } > - > - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", > - _("I/O helper exited " > - "before all data was processed")); > - } else { > - char *str = virProcessTranslateStatus(status); > - virReportError(VIR_ERR_INTERNAL_ERROR, > - _("I/O helper exited with %s"), > - NULLSTR(str)); > - VIR_FREE(str); > - } > + if (fdst->threadErr && !streamAbort) { > + /* errors are expected on streamAbort */ > goto cleanup; > } > > - out: > ret = 0; > cleanup: > - virCommandFree(fdst->cmd); > - fdst->cmd = NULL; > + VIR_FREE(fdst->thread); > return ret; > } > > + Should I say anything? I'm fine with it... 
:-) > static int > virFDStreamCloseInt(virStreamPtr st, bool streamAbort) > { > @@ -359,12 +428,9 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) > > /* mutex locked */ > ret = VIR_CLOSE(fdst->fd); > - if (virFDStreamCloseCommand(fdst, streamAbort) < 0) > + if (virFDStreamJoinWorker(fdst, streamAbort) < 0) > ret = -1; > > - if (VIR_CLOSE(fdst->errfd) < 0) > - VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd); > - > st->privateData = NULL; > > /* call the internal stream closing callback */ > @@ -516,14 +582,13 @@ static virStreamDriver virFDStreamDrv = { > > static int virFDStreamOpenInternal(virStreamPtr st, > int fd, > - virCommandPtr cmd, > - int errfd, > + virFDStreamThreadDataPtr threadData, > unsigned long long length) > { > virFDStreamDataPtr fdst; > > - VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu", > - st, fd, cmd, errfd, length); > + VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu", > + st, fd, threadData, length); > > if (virFDStreamDataInitialize() < 0) > return -1; > @@ -538,21 +603,39 @@ static int virFDStreamOpenInternal(virStreamPtr st, > return -1; > > fdst->fd = fd; > - fdst->cmd = cmd; > - fdst->errfd = errfd; > fdst->length = length; > > st->driver = &virFDStreamDrv; > st->privateData = fdst; > > + if (threadData) { > + /* Create the thread after fdst and st were initialized. > + * The thread worker expects them to be that way. 
*/ > + if (VIR_ALLOC(fdst->thread) < 0) > + goto error; > + > + if (virThreadCreate(fdst->thread, > + true, > + virFDStreamThread, > + threadData) < 0) > + goto error; > + } > + > return 0; > + > + error: > + VIR_FREE(fdst->thread); > + st->driver = NULL; > + st->privateData = NULL; > + virObjectUnref(fdst); > + return -1; > } > > > int virFDStreamOpen(virStreamPtr st, > int fd) > { > - return virFDStreamOpenInternal(st, fd, NULL, -1, 0); > + return virFDStreamOpenInternal(st, fd, NULL, 0); > } > > > @@ -598,7 +681,7 @@ int virFDStreamConnectUNIX(virStreamPtr st, > goto error; > } > > - if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0) > + if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0) > goto error; > return 0; > > @@ -627,11 +710,10 @@ virFDStreamOpenFileInternal(virStreamPtr st, > bool forceIOHelper) > { > int fd = -1; > - int childfd = -1; > + int pipefds[2] = { -1, -1 }; > + int tmpfd = -1; > struct stat sb; > - virCommandPtr cmd = NULL; > - int errfd = -1; > - char *iohelper_path = NULL; > + virFDStreamThreadDataPtr threadData = NULL; > > VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o", > st, path, oflags, offset, length, mode); > @@ -648,6 +730,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, > path); > return -1; > } > + tmpfd = fd; > > if (fstat(fd, &sb) < 0) { > virReportSystemError(errno, > @@ -672,7 +755,6 @@ virFDStreamOpenFileInternal(virStreamPtr st, > if ((st->flags & VIR_STREAM_NONBLOCK) && > ((!S_ISCHR(sb.st_mode) && > !S_ISFIFO(sb.st_mode)) || forceIOHelper)) { > - int fds[2] = { -1, -1 }; > > if ((oflags & O_ACCMODE) == O_RDWR) { > virReportError(VIR_ERR_INTERNAL_ERROR, > @@ -681,58 +763,47 @@ virFDStreamOpenFileInternal(virStreamPtr st, > goto error; > } > There's a comment above here that still mentions forking a helper; that comment should be changed too... 
ACK w/ a couple of minor adjustments, John > - if (pipe(fds) < 0) { > + if (pipe(pipefds) < 0) { > virReportSystemError(errno, "%s", > _("Unable to create pipe")); > goto error; > } > > - if (!(iohelper_path = virFileFindResource("libvirt_iohelper", > - abs_topbuilddir "/src", > - LIBEXECDIR))) > + if (VIR_ALLOC(threadData) < 0) > goto error; > > - cmd = virCommandNewArgList(iohelper_path, > - path, > - NULL); > - > - VIR_FREE(iohelper_path); > - > - virCommandAddArgFormat(cmd, "%llu", length); > - virCommandPassFD(cmd, fd, > - VIR_COMMAND_PASS_FD_CLOSE_PARENT); > - virCommandAddArgFormat(cmd, "%d", fd); > + threadData->st = virObjectRef(st); > + threadData->length = length; > > if ((oflags & O_ACCMODE) == O_RDONLY) { > - childfd = fds[1]; > - fd = fds[0]; > - virCommandSetOutputFD(cmd, &childfd); > + threadData->fdin = fd; > + threadData->fdout = pipefds[1]; > + if (VIR_STRDUP(threadData->fdinname, path) < 0 || > + VIR_STRDUP(threadData->fdoutname, "pipe") < 0) > + goto error; > + tmpfd = pipefds[0]; > } else { > - childfd = fds[0]; > - fd = fds[1]; > - virCommandSetInputFD(cmd, childfd); > + threadData->fdin = pipefds[0]; > + threadData->fdout = fd; > + if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 || > + VIR_STRDUP(threadData->fdoutname, path) < 0) > + goto error; > + tmpfd = pipefds[1]; > } > - virCommandSetErrorFD(cmd, &errfd); > - > - if (virCommandRunAsync(cmd, NULL) < 0) > - goto error; > - > - VIR_FORCE_CLOSE(childfd); > } > > - if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0) > + if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0) > goto error; > > return 0; > > error: > - virCommandFree(cmd); > VIR_FORCE_CLOSE(fd); > - VIR_FORCE_CLOSE(childfd); > - VIR_FORCE_CLOSE(errfd); > - VIR_FREE(iohelper_path); > + VIR_FORCE_CLOSE(pipefds[0]); > + VIR_FORCE_CLOSE(pipefds[1]); > if (oflags & O_CREAT) > unlink(path); > + virFDStreamThreadDataFree(threadData); > return -1; > } > > diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h 
> index 32a741e..34c4c3f 100644 > --- a/src/util/virfdstream.h > +++ b/src/util/virfdstream.h > @@ -24,7 +24,6 @@ > # define __VIR_FDSTREAM_H_ > > # include "internal.h" > -# include "vircommand.h" > > /* internal callback, the generic one is used up by daemon stream driver */ > /* the close callback is called with fdstream private data locked */ > -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list