On 6/7/22 11:19, Claudio Fontana wrote: > allow interleaved parallel write to a single file, > using a record size equal to the io buffer size (1MB). > > Signed-off-by: Claudio Fontana <cfontana@xxxxxxx> > --- > src/util/iohelper.c | 3 + > src/util/virfile.c | 151 +++++++++++++++++++++++++++++--------------- > src/util/virfile.h | 2 + > 3 files changed, 106 insertions(+), 50 deletions(-) > > diff --git a/src/util/iohelper.c b/src/util/iohelper.c > index 055540c8c4..dcbdda366f 100644 > --- a/src/util/iohelper.c > +++ b/src/util/iohelper.c > @@ -85,6 +85,9 @@ main(int argc, char **argv) > if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0) > goto error; > > + if (VIR_CLOSE(fd) < 0) > + goto error; > + > return 0; > > error: > diff --git a/src/util/virfile.c b/src/util/virfile.c > index 201d7f4e64..f9ae7d94c4 100644 > --- a/src/util/virfile.c > +++ b/src/util/virfile.c > @@ -4761,6 +4761,9 @@ struct runIOParams { > const char *fdinname; > int fdout; > const char *fdoutname; > + int idx; > + int nchannels; > + off_t total; > }; > > /** > @@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p) > off_t total = 0; > size_t buflen = 1024*1024; > char *buf = virFileDirectBufferNew(&base, buflen); > + int diskfd = p.isWrite ? 
p.fdout : p.fdin; > > if (!buf) { > virReportSystemError(errno, _("Failed to allocate aligned memory in function %s"), __FUNCTION__); > return -5; > } > - > + if (p.idx >= 0) { > + if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) { > + virReportSystemError(errno, "%s", _("Failed to lseek to file channel offset")); > + return -6; > + } > + } > while (1) { > ssize_t got; > > @@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p) > break; > > total += got; > - > + if (p.idx >= 0 && !p.isWrite && total > p.total) { > + /* do not write to socket too much for this channel, according to CLIA */ > + off_t difference = total - p.total; > + got -= difference; > + total -= difference; > + } > /* handle last write size align in direct case */ > if (got < buflen && p.isDirect && p.isWrite) { > ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got); > @@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p) > virReportSystemError(errno, _("Unable to write %s"), p.fdoutname); > return -3; > } > - if (!p.isBlockDev) { > + if (!p.isBlockDev && p.idx < 0) { > off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR); > if (off < 0) { > virReportSystemError(errno, "%s", _("Failed to lseek to get current file offset")); > @@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p) > } > if (nwritten > got) { > off -= nwritten - got; > + total -= nwritten - got; > } > if (ftruncate(p.fdout, off) < 0) { > virReportSystemError(errno, _("Unable to truncate %s"), p.fdoutname); > @@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p) > virReportSystemError(errno, _("Unable to write %s"), p.fdoutname); > return -3; > } > + if (p.idx >= 0) { > + if (!p.isWrite && total >= p.total) { > + /* done for this channel */ > + break; > + } > + /* move channel cursor to the next record */ > + if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) { > + virReportSystemError(errno, "%s", _("Failed to lseek to next channel record")); > + return -7; > + } > + } > } > return total; > } > > /** 
> - * virFileDiskCopy: run IO to copy data between storage and a pipe or socket. > - * > - * @disk_fd: the already open regular file or block device > - * @disk_path: the pathname corresponding to disk_fd (for error reporting) > - * @remote_fd: the pipe or socket > - * Use -1 to auto-choose between STDIN or STDOUT. > - * @remote_path: the pathname corresponding to remote_fd (for error reporting) > - * > - * Note that the direction of the transfer is detected based on the @disk_fd > - * file access mode (man 2 open). Therefore @disk_fd must be opened with > - * O_RDONLY or O_WRONLY. O_RDWR is not supported. > - * > - * virFileDiskCopy always closes the file descriptor disk_fd, > - * and any error during close(2) is reported and considered a failure. > - * > - * Returns: bytes transferred or < 0 on failure. > + * virFileDiskCopyChannel: like virFileDiskCopy, channel interleaved read/write > + * ... > + * @idx: channel index > + * @nchannels: total number of channels > */ > > off_t > -virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path) > +virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path, > + int idx, int nchannels, off_t total) > { > - int ret = -1; > - off_t total = 0; > + off_t new_total = -1; > struct stat sb; > struct runIOParams p; > int oflags = -1; > > + if ((nchannels == 0) || > + (nchannels > 0 && idx >= nchannels) || > + (nchannels > 0 && idx < 0) || > + (nchannels < 0 && idx >= 0)) { > + virReportSystemError(EINVAL, "%s", _("Invalid channel arguments")); > + goto out; > + } > + p.idx = idx; > + p.nchannels = nchannels; > + p.total = total; > + > oflags = fcntl(disk_fd, F_GETFL); > > if (oflags < 0) { > virReportSystemError(errno, > _("unable to determine access mode of %s"), > disk_path); > - goto cleanup; > + goto out; > } > if (fstat(disk_fd, &sb) < 0) { > virReportSystemError(errno, > _("unable to stat file descriptor %d path %s"), > disk_fd, disk_path); > - goto 
cleanup; > + goto out; > } > p.isBlockDev = S_ISBLK(sb.st_mode); > p.isDirect = O_DIRECT && (oflags & O_DIRECT); > @@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *r > default: > virReportSystemError(EINVAL, _("Unable to process file with flags %d"), > (oflags & O_ACCMODE)); > - goto cleanup; > + goto out; > } > if (!p.isBlockDev && p.isDirect) { > off_t off = lseek(disk_fd, 0, SEEK_CUR); > if (off < 0) { > virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable file")); > - goto cleanup; > + goto out; > } > if (virFileDirectAlign(off) != off) { > /* we could write some zeroes, but maybe it is safer to just fail */ > virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with unaligned file pointer")); > - goto cleanup; > + goto out; > } > } > - total = runIOCopy(p); > - if (total < 0) > - goto cleanup; > - > - /* Ensure all data is written */ > - if (virFileDataSync(p.fdout) < 0) { > - if (errno != EINVAL && errno != EROFS) { > - /* fdatasync() may fail on some special FDs, e.g. 
pipes */ > - virReportSystemError(errno, _("unable to fsync %s"), p.fdoutname); > - goto cleanup; > + new_total = runIOCopy(p); > + if (new_total < 0) > + goto out; > + > + if (p.idx < 0 && p.isWrite) { > + /* without channels we can run the fdatasync here */ > + if (virFileDataSync(disk_fd) < 0) { > + if (errno != EINVAL && errno != EROFS) { > + virReportSystemError(errno, _("unable to fsyncdata %s"), p.fdoutname); > + new_total = -1; > + goto out; > + } > } > } > > - ret = 0; > - > - cleanup: > - if (VIR_CLOSE(disk_fd) < 0 && ret == 0) { > - virReportSystemError(errno, _("Unable to close %s"), disk_path); > - ret = -1; > - } > - return ret; > + out: > + return new_total; > } > > #else /* WIN32 */ > > off_t > -virFileDiskCopy(int disk_fd G_GNUC_UNUSED, > - const char *disk_path G_GNUC_UNUSED, > - int remote_fd G_GNUC_UNUSED, > - const char *remote_path G_GNUC_UNUSED) > +virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED, > + const char *disk_path G_GNUC_UNUSED, > + int remote_fd G_GNUC_UNUSED, > + const char *remote_path G_GNUC_UNUSED, > + int idx G_GNUC_UNUSED, > + int nchannels G_GNUC_UNUSED, > + off_t total G_GNUC_UNUSED) > { > virReportError(VIR_ERR_INTERNAL_ERROR, "%s", > - _("virFileDiskCopy unsupported on this platform")); > + _("virFileDiskCopyChannel unsupported on this platform")); > return -1; > } > #endif /* WIN32 */ > + > +/** > + * virFileDiskCopy: run IO to copy data between storage and a pipe or socket. > + * > + * @disk_fd: the already open regular file or block device > + * @disk_path: the pathname corresponding to disk_fd (for error reporting) > + * @remote_fd: the pipe or socket > + * Use -1 to auto-choose between STDIN or STDOUT. > + * @remote_path: the pathname corresponding to remote_fd (for error reporting) > + * > + * Note that the direction of the transfer is detected based on the @disk_fd > + * file access mode (man 2 open). Therefore @disk_fd must be opened with > + * O_RDONLY or O_WRONLY. O_RDWR is not supported. 
> + * > + * virFileDiskCopy always closes the file descriptor disk_fd, > + * and any error during close(2) is reported and considered a failure. This is no longer true: with this patch virFileDiskCopy (via virFileDiskCopyChannel) no longer closes disk_fd, so the caller must close it and check for close errors itself, as the iohelper.c hunk now does with VIR_CLOSE(fd). Please update this comment accordingly. > + * > + * Returns: bytes transferred or < 0 on failure. > + */ > + > +off_t > +virFileDiskCopy(int disk_fd, const char *disk_path, > + int remote_fd, const char *remote_path) > +{ > + return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path, > + -1, -1, 0); > +} > diff --git a/src/util/virfile.h b/src/util/virfile.h > index 844261e0a4..4d75389c84 100644 > --- a/src/util/virfile.h > +++ b/src/util/virfile.h > @@ -394,3 +394,5 @@ int virFileSetCOW(const char *path, > virTristateBool state); > > off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path); > +off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path, > + int idx, int nchannels, off_t total);