Implement virStreamSkip and virStreamInData callbacks. These callbacks do no magic: they just skip a hole, or detect whether we are in a data section of a file or in a hole and how many bytes we can read until the section changes. Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> --- src/storage/storage_util.c | 4 +- src/util/virfdstream.c | 234 +++++++++++++++++++++++++++++++++++++++++---- src/util/virfdstream.h | 1 + 3 files changed, 216 insertions(+), 23 deletions(-) diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c index a2d89af..3576435 100644 --- a/src/storage/storage_util.c +++ b/src/storage/storage_util.c @@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, /* Not using O_CREAT because the file is required to already exist at * this point */ ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_WRONLY); + offset, len, false, O_WRONLY); cleanup: VIR_FREE(path); @@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, } ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_RDONLY); + offset, len, false, O_RDONLY); cleanup: VIR_FREE(path); diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index efd9199..e9b5962 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream"); typedef enum { VIR_FDSTREAM_MSG_TYPE_DATA, + VIR_FDSTREAM_MSG_TYPE_SKIP, } virFDStreamMsgType; typedef struct _virFDStreamMsg virFDStreamMsg; @@ -66,6 +67,9 @@ struct _virFDStreamMsg { size_t len; size_t offset; } data; + struct { + size_t len; + } skip; } stream; }; @@ -175,6 +179,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg) case VIR_FDSTREAM_MSG_TYPE_DATA: VIR_FREE(msg->stream.data.buf); break; + case VIR_FDSTREAM_MSG_TYPE_SKIP: + /* nada */ + break; } VIR_FREE(msg); @@ -361,6 +368,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr; struct _virFDStreamThreadData { virStreamPtr st; size_t length; + bool 
sparse; int fdin; char *fdinname; int fdout; @@ -383,32 +391,66 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) static ssize_t virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const char *fdinname, + size_t *dataLen, size_t buflen) { virFDStreamMsgPtr msg = NULL; + int inData = 0; + unsigned long long sectionLen = 0; char *buf = NULL; ssize_t got; + if (sparse && *dataLen == 0) { + if (virFileInData(fdin, &inData, &sectionLen) < 0) + goto error; + + if (inData) + *dataLen = sectionLen; + } + if (VIR_ALLOC(msg) < 0) goto error; - if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; - - if ((got = saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); - goto error; + if (sparse && *dataLen == 0) { + msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP; + msg->stream.skip.len = sectionLen; + got = sectionLen; + + /* HACK. The message queue is one directional. So caller + * cannot make us skip the hole. Do that for them instead. 
*/ + if (sectionLen && + lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdinname); + goto error; + } + } else { + if (sparse && + buflen > *dataLen) + buflen = *dataLen; + + if (VIR_ALLOC_N(buf, buflen) < 0) + goto error; + + if ((got = saferead(fdin, buf, buflen)) < 0) { + virReportSystemError(errno, + _("Unable to read %s"), + fdinname); + goto error; + } + + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf = buf; + msg->stream.data.len = got; + buf = NULL; + if (sparse) + *dataLen -= got; } - msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; - msg->stream.data.buf = buf; - msg->stream.data.len = got; - buf = NULL; - virFDStreamMsgQueuePush(fdst, msg); msg = NULL; @@ -423,11 +465,13 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst, static ssize_t virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, + bool sparse, const int fdout, const char *fdoutname) { ssize_t got; virFDStreamMsgPtr msg = fdst->msg; + off_t off; bool pop = false; switch (msg->type) { @@ -446,6 +490,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, pop = msg->stream.data.offset == msg->stream.data.len; break; + + case VIR_FDSTREAM_MSG_TYPE_SKIP: + if (!sparse) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unexpected stream skip")); + return -1; + } + + got = msg->stream.skip.len; + off = lseek(fdout, got, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdoutname); + return -1; + } + + if (ftruncate(fdout, off) < 0) { + virReportSystemError(errno, + _("unable to truncate %s"), + fdoutname); + return -1; + } + + pop = true; + break; } if (pop) { @@ -463,6 +533,7 @@ virFDStreamThread(void *opaque) virFDStreamThreadDataPtr data = opaque; virStreamPtr st = data->st; size_t length = data->length; + bool sparse = data->sparse; int fdin = data->fdin; char *fdinname = data->fdinname; int fdout = data->fdout; @@ -471,6 +542,7 @@ virFDStreamThread(void *opaque) bool doRead = 
fdst->threadDoRead; size_t buflen = 256 * 1024; size_t total = 0; + size_t dataLen = 0; virObjectRef(fdst); virObjectLock(fdst); @@ -505,9 +577,9 @@ virFDStreamThread(void *opaque) } if (doRead) - got = virFDStreamThreadDoRead(fdst, fdin, fdinname, buflen); + got = virFDStreamThreadDoRead(fdst, sparse, fdin, fdinname, &dataLen, buflen); else - got = virFDStreamThreadDoWrite(fdst, fdout, fdoutname); + got = virFDStreamThreadDoWrite(fdst, sparse, fdout, fdoutname); if (got < 0) goto error; @@ -773,6 +845,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } } + /* Shortcut, if the stream is in the trailing hole, + * return 0 immediately. */ + if (msg->type == VIR_FDSTREAM_MSG_TYPE_SKIP && + msg->stream.skip.len == 0) { + ret = 0; + goto cleanup; + } + if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) { /* Nope, nope, I'm outta here */ virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -823,11 +903,120 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } +static int +virFDStreamSkip(virStreamPtr st, + unsigned long long length) +{ + virFDStreamDataPtr fdst = st->privateData; + virFDStreamMsgPtr msg = NULL; + off_t off; + int ret = -1; + + virObjectLock(fdst); + if (fdst->length) { + if (length > fdst->length - fdst->offset) + length = fdst->length - fdst->offset; + fdst->offset += length; + } + + if (fdst->thread) { + /* Things are a bit complicated here. But bear with me. If FDStream is + * in a read mode, then if the message at the queue head is SKIP, just + * pop it. The thread has lseek()-ed anyway. If however, the FDStream + * is in write mode, then tell the thread to do the lseek() for us. + * Under no circumstances we can do the lseek() ourselves here. We + * might mess up file position for the thread. 
*/ + if (fdst->threadDoRead) { + msg = fdst->msg; + if (msg->type != VIR_FDSTREAM_MSG_TYPE_SKIP) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Invalid stream skip")); + goto cleanup; + } + + virFDStreamMsgQueuePop(fdst); + } else { + if (VIR_ALLOC(msg) < 0) + goto cleanup; + + msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP; + msg->stream.skip.len = length; + virFDStreamMsgQueuePush(fdst, msg); + msg = NULL; + } + } else { + off = lseek(fdst->fd, length, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, "%s", + _("unable to seek")); + goto cleanup; + } + + if (ftruncate(fdst->fd, off) < 0) { + virReportSystemError(errno, "%s", + _("unable to truncate")); + goto cleanup; + } + } + + ret = 0; + cleanup: + virObjectUnlock(fdst); + virFDStreamMsgFree(msg); + return ret; +} + + +static int +virFDStreamInData(virStreamPtr st, + int *inData, + unsigned long long *length) +{ + virFDStreamDataPtr fdst = st->privateData; + int ret = -1; + + virObjectLock(fdst); + + if (fdst->thread) { + virFDStreamMsgPtr msg; + + while (!(msg = fdst->msg)) { + if (fdst->threadQuit) { + *inData = *length = 0; + ret = 0; + goto cleanup; + } else { + virObjectUnlock(fdst); + virCondSignal(&fdst->threadCond); + virObjectLock(fdst); + } + } + + if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) { + *inData = 1; + *length = msg->stream.data.len - msg->stream.data.offset; + } else { + *inData = 0; + *length = msg->stream.skip.len; + } + ret = 0; + } else { + ret = virFileInData(fdst->fd, inData, length); + } + + cleanup: + virObjectUnlock(fdst); + return ret; +} + + static virStreamDriver virFDStreamDrv = { .streamSend = virFDStreamWrite, .streamRecv = virFDStreamRead, .streamFinish = virFDStreamClose, .streamAbort = virFDStreamAbort, + .streamSkip = virFDStreamSkip, + .streamInData = virFDStreamInData, .streamEventAddCallback = virFDStreamAddCallback, .streamEventUpdateCallback = virFDStreamUpdateCallback, .streamEventRemoveCallback = virFDStreamRemoveCallback @@ -969,7 +1158,8 @@ 
virFDStreamOpenFileInternal(virStreamPtr st, unsigned long long length, int oflags, int mode, - bool forceIOHelper) + bool forceIOHelper, + bool sparse) { int fd = -1; struct stat sb; @@ -1026,6 +1216,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, threadData->st = virObjectRef(st); threadData->length = length; + threadData->sparse = sparse; if ((oflags & O_ACCMODE) == O_RDONLY) { threadData->fdin = fd; @@ -1067,7 +1258,7 @@ int virFDStreamOpenFile(virStreamPtr st, } return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, false); + oflags, 0, false, false); } int virFDStreamCreateFile(virStreamPtr st, @@ -1080,7 +1271,7 @@ int virFDStreamCreateFile(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, mode, - false); + false, false); } #ifdef HAVE_CFMAKERAW @@ -1096,7 +1287,7 @@ int virFDStreamOpenPTY(virStreamPtr st, if (virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false) < 0) + false, false) < 0) return -1; fdst = st->privateData; @@ -1133,7 +1324,7 @@ int virFDStreamOpenPTY(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false); + false, false); } #endif /* !HAVE_CFMAKERAW */ @@ -1141,11 +1332,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags) { return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, true); + oflags, 0, true, sparse); } int virFDStreamSetInternalCloseCb(virStreamPtr st, diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h index 34c4c3f..887c991 100644 --- a/src/util/virfdstream.h +++ b/src/util/virfdstream.h @@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags); int virFDStreamSetInternalCloseCb(virStreamPtr st, -- 2.10.2 -- libvir-list mailing list 
libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list