allow interleaved parallel write to a single file, using a record size equal to the io buffer size (1MB). Signed-off-by: Claudio Fontana <cfontana@xxxxxxx> --- src/util/iohelper.c | 3 + src/util/virfile.c | 151 +++++++++++++++++++++++++++++--------------- src/util/virfile.h | 2 + 3 files changed, 106 insertions(+), 50 deletions(-) diff --git a/src/util/iohelper.c b/src/util/iohelper.c index 055540c8c4..dcbdda366f 100644 --- a/src/util/iohelper.c +++ b/src/util/iohelper.c @@ -85,6 +85,9 @@ main(int argc, char **argv) if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0) goto error; + if (VIR_CLOSE(fd) < 0) + goto error; + return 0; error: diff --git a/src/util/virfile.c b/src/util/virfile.c index 201d7f4e64..f9ae7d94c4 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -4761,6 +4761,9 @@ struct runIOParams { const char *fdinname; int fdout; const char *fdoutname; + int idx; + int nchannels; + off_t total; }; /** @@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p) off_t total = 0; size_t buflen = 1024*1024; char *buf = virFileDirectBufferNew(&base, buflen); + int diskfd = p.isWrite ? 
p.fdout : p.fdin; if (!buf) { virReportSystemError(errno, _("Failed to allocate aligned memory in function %s"), __FUNCTION__); return -5; } - + if (p.idx >= 0) { + if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) { + virReportSystemError(errno, "%s", _("Failed to lseek to file channel offset")); + return -6; + } + } while (1) { ssize_t got; @@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p) break; total += got; - + if (p.idx >= 0 && !p.isWrite && total > p.total) { + /* do not write to socket too much for this channel, according to CLIA */ + off_t difference = total - p.total; + got -= difference; + total -= difference; + } /* handle last write size align in direct case */ if (got < buflen && p.isDirect && p.isWrite) { ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got); @@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p) virReportSystemError(errno, _("Unable to write %s"), p.fdoutname); return -3; } - if (!p.isBlockDev) { + if (!p.isBlockDev && p.idx < 0) { off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR); if (off < 0) { virReportSystemError(errno, "%s", _("Failed to lseek to get current file offset")); @@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p) } if (nwritten > got) { off -= nwritten - got; + total -= nwritten - got; } if (ftruncate(p.fdout, off) < 0) { virReportSystemError(errno, _("Unable to truncate %s"), p.fdoutname); @@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p) virReportSystemError(errno, _("Unable to write %s"), p.fdoutname); return -3; } + if (p.idx >= 0) { + if (!p.isWrite && total >= p.total) { + /* done for this channel */ + break; + } + /* move channel cursor to the next record */ + if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) { + virReportSystemError(errno, "%s", _("Failed to lseek to next channel record")); + return -7; + } + } } return total; } /** - * virFileDiskCopy: run IO to copy data between storage and a pipe or socket. 
- * - * @disk_fd: the already open regular file or block device - * @disk_path: the pathname corresponding to disk_fd (for error reporting) - * @remote_fd: the pipe or socket - * Use -1 to auto-choose between STDIN or STDOUT. - * @remote_path: the pathname corresponding to remote_fd (for error reporting) - * - * Note that the direction of the transfer is detected based on the @disk_fd - * file access mode (man 2 open). Therefore @disk_fd must be opened with - * O_RDONLY or O_WRONLY. O_RDWR is not supported. - * - * virFileDiskCopy always closes the file descriptor disk_fd, - * and any error during close(2) is reported and considered a failure. - * - * Returns: bytes transferred or < 0 on failure. + * virFileDiskCopyChannel: like virFileDiskCopy, channel interleaved read/write + * ... + * @idx: channel index + * @nchannels: total number of channels */ off_t -virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path) +virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path, + int idx, int nchannels, off_t total) { - int ret = -1; - off_t total = 0; + off_t new_total = -1; struct stat sb; struct runIOParams p; int oflags = -1; + if ((nchannels == 0) || + (nchannels > 0 && idx >= nchannels) || + (nchannels > 0 && idx < 0) || + (nchannels < 0 && idx >= 0)) { + virReportSystemError(EINVAL, "%s", _("Invalid channel arguments")); + goto out; + } + p.idx = idx; + p.nchannels = nchannels; + p.total = total; + oflags = fcntl(disk_fd, F_GETFL); if (oflags < 0) { virReportSystemError(errno, _("unable to determine access mode of %s"), disk_path); - goto cleanup; + goto out; } if (fstat(disk_fd, &sb) < 0) { virReportSystemError(errno, _("unable to stat file descriptor %d path %s"), disk_fd, disk_path); - goto cleanup; + goto out; } p.isBlockDev = S_ISBLK(sb.st_mode); p.isDirect = O_DIRECT && (oflags & O_DIRECT); @@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, 
const char *r default: virReportSystemError(EINVAL, _("Unable to process file with flags %d"), (oflags & O_ACCMODE)); - goto cleanup; + goto out; } if (!p.isBlockDev && p.isDirect) { off_t off = lseek(disk_fd, 0, SEEK_CUR); if (off < 0) { virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable file")); - goto cleanup; + goto out; } if (virFileDirectAlign(off) != off) { /* we could write some zeroes, but maybe it is safer to just fail */ virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with unaligned file pointer")); - goto cleanup; + goto out; } } - total = runIOCopy(p); - if (total < 0) - goto cleanup; - - /* Ensure all data is written */ - if (virFileDataSync(p.fdout) < 0) { - if (errno != EINVAL && errno != EROFS) { - /* fdatasync() may fail on some special FDs, e.g. pipes */ - virReportSystemError(errno, _("unable to fsync %s"), p.fdoutname); - goto cleanup; + new_total = runIOCopy(p); + if (new_total < 0) + goto out; + + if (p.idx < 0 && p.isWrite) { + /* without channels we can run the fdatasync here */ + if (virFileDataSync(disk_fd) < 0) { + if (errno != EINVAL && errno != EROFS) { + virReportSystemError(errno, _("unable to fsyncdata %s"), p.fdoutname); + new_total = -1; + goto out; + } } } - ret = 0; - - cleanup: - if (VIR_CLOSE(disk_fd) < 0 && ret == 0) { - virReportSystemError(errno, _("Unable to close %s"), disk_path); - ret = -1; - } - return ret; + out: + return new_total; } #else /* WIN32 */ off_t -virFileDiskCopy(int disk_fd G_GNUC_UNUSED, - const char *disk_path G_GNUC_UNUSED, - int remote_fd G_GNUC_UNUSED, - const char *remote_path G_GNUC_UNUSED) +virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED, + const char *disk_path G_GNUC_UNUSED, + int remote_fd G_GNUC_UNUSED, + const char *remote_path G_GNUC_UNUSED, + int idx G_GNUC_UNUSED, + int nchannels G_GNUC_UNUSED, + off_t total G_GNUC_UNUSED) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("virFileDiskCopy unsupported on this platform")); + _("virFileDiskCopyChannel unsupported on 
this platform")); return -1; } #endif /* WIN32 */ + +/** + * virFileDiskCopy: run IO to copy data between storage and a pipe or socket. + * + * @disk_fd: the already open regular file or block device + * @disk_path: the pathname corresponding to disk_fd (for error reporting) + * @remote_fd: the pipe or socket + * Use -1 to auto-choose between STDIN or STDOUT. + * @remote_path: the pathname corresponding to remote_fd (for error reporting) + * + * Note that the direction of the transfer is detected based on the @disk_fd + * file access mode (man 2 open). Therefore @disk_fd must be opened with + * O_RDONLY or O_WRONLY. O_RDWR is not supported. + * + * virFileDiskCopy does not close @disk_fd: the caller must close it + * and treat any error reported by close(2) as a failure. + * + * Returns: bytes transferred or < 0 on failure. + */ + +off_t +virFileDiskCopy(int disk_fd, const char *disk_path, + int remote_fd, const char *remote_path) +{ + return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path, + -1, -1, 0); +} diff --git a/src/util/virfile.h b/src/util/virfile.h index 844261e0a4..4d75389c84 100644 --- a/src/util/virfile.h +++ b/src/util/virfile.h @@ -394,3 +394,5 @@ int virFileSetCOW(const char *path, virTristateBool state); off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path); +off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path, + int idx, int nchannels, off_t total); -- 2.26.2