On 04/20/2017 06:01 AM, Michal Privoznik wrote: > This is just a wrapper over new functions that have been just > introduced: virStreamRecvFlags(), virStreamHoleSize(). It's very > similar to virStreamRecvAll() except it handles sparse streams > well. You could have some API name adjustments not only in the commit message but also within code. > > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> > --- > include/libvirt/libvirt-stream.h | 28 ++++++++- > src/libvirt-stream.c | 119 +++++++++++++++++++++++++++++++++++++++ > src/libvirt_public.syms | 1 + > 3 files changed, 145 insertions(+), 3 deletions(-) > > diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h > index 23fcc26..e5f5126 100644 > --- a/include/libvirt/libvirt-stream.h > +++ b/include/libvirt/libvirt-stream.h > @@ -102,9 +102,9 @@ int virStreamSendAll(virStreamPtr st, > * @nbytes: size of the data array > * @opaque: optional application provided data > * > - * The virStreamSinkFunc callback is used together > - * with the virStreamRecvAll function for libvirt to > - * provide the data that has been received. > + * The virStreamSinkFunc callback is used together with the > + * virStreamRecvAll or virStreamSparseRecvAll functions for > + * libvirt to provide the data that has been received. > * > * The callback will be invoked multiple times, > * providing data in small chunks. The application > @@ -127,6 +127,28 @@ int virStreamRecvAll(virStreamPtr st, > virStreamSinkFunc handler, > void *opaque); > > +/** > + * virStreamSinkHoleFunc: > + * @st: the stream object > + * @length: stream hole size > + * @opaque: optional application provided data > + * > + * This callback is used together with the virStreamSparseRecvAll > + * function for libvirt to provide the size of a hole that > + * occurred in the stream. The callback may be invoked multiple times as holes are found during processing a stream. The application should create the hole in the stream target and then return. 
A return value of -1 at any time will abort the receive operation. > + * > + * Returns 0 on success, > + * -1 upon error > + */ > +typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, > + unsigned long long length, > + void *opaque); > + > +int virStreamSparseRecvAll(virStreamPtr stream, > + virStreamSinkFunc handler, > + virStreamSinkHoleFunc holeHandler, > + void *opaque); > + > typedef enum { > VIR_STREAM_EVENT_READABLE = (1 << 0), > VIR_STREAM_EVENT_WRITABLE = (1 << 1), > diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c > index 1162d33..81190cc 100644 > --- a/src/libvirt-stream.c > +++ b/src/libvirt-stream.c > @@ -660,6 +660,125 @@ virStreamRecvAll(virStreamPtr stream, > > > /** > + * virStreamSparseRecvAll: > + * @stream: pointer to the stream object > + * @handler: sink callback for writing data to application > + * @holeHandler: stream hole callback for skipping holes > + * @opaque: application defined data > + * > + * Receive the entire data stream, sending the data to the > + * requested data sink. This is simply a convenient alternative s/sink./sink @handler and calling the skip @holeHandler to generate holes for sparse stream targets. > + * to virStreamRecv, for apps that do blocking-I/O. s/virStreamRecv/virStreamRecvAll/ > + * > + * An example using this with a hypothetical file download > + * API looks like: > + * > + * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) { > + * int *fd = opaque; > + * > + * return write(*fd, buf, nbytes); > + * } > + * > + * int myskip(virStreamPtr st, unsigned long long offset, void *opaque) { > + * int *fd = opaque; > + * > + * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; > + * } > + * > + * virStreamPtr st = virStreamNew(conn, 0); > + * int fd = open("demo.iso", O_WRONLY); > + * > + * virConnectDownloadSparseFile(conn, st); > + * if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) { > + * ...report an error ... 
> + * goto done; > + * } > + * if (virStreamFinish(st) < 0) > + * ...report an error... > + * virStreamFree(st); > + * close(fd); > + * > + * Note that @opaque data is shared between both @handler and > + * @holeHandler callbacks. > + * > + * Returns 0 if all the data was successfully received. The caller > + * should invoke virStreamFinish(st) to flush the stream upon > + * success and then virStreamFree s/Free/Free(). > + * > + * Returns -1 upon any error, with virStreamAbort() already > + * having been called, so the caller need only call > + * virStreamFree() s/,  so/, so/ s/Free()/Free()./ (i.e. lose the extra space and add a period) Could "virStreamFree()." go on the previous line? > + */ > +int > +virStreamSparseRecvAll(virStreamPtr stream, > + virStreamSinkFunc handler, > + virStreamSinkHoleFunc holeHandler, > + void *opaque) > +{ > + char *bytes = NULL; > + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; > + const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE; > + int ret = -1; > + > + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", > + stream, handler, holeHandler, opaque); > + > + virResetLastError(); > + > + virCheckStreamReturn(stream, -1); > + virCheckNonNullArgGoto(handler, cleanup); > + virCheckNonNullArgGoto(holeHandler, cleanup); > + > + if (stream->flags & VIR_STREAM_NONBLOCK) { > + virReportError(VIR_ERR_OPERATION_INVALID, "%s", > + _("data sinks cannot be used for non-blocking streams")); > + goto cleanup; > + } > + > + if (VIR_ALLOC_N(bytes, want) < 0) > + goto cleanup; > + > + for (;;) { > + int got, offset = 0; > + unsigned long long holeLen; > + got = virStreamRecvFlags(stream, bytes, want, flags); > + if (got == -3) { > + if (virStreamHoleSize(stream, &holeLen) < 0) { > + virStreamAbort(stream); > + goto cleanup; > + } > + > + if (holeHandler(stream, holeLen, opaque) < 0) { > + virStreamAbort(stream); > + goto cleanup; > + } We could "continue;" here, right? 
Although, I suppose the "while (offset < got)" check below, i.e. (0 < -3), would fail anyway, but if someone changed that someday and didn't read this part, it could break. An ACK ends up being a bit dependent upon previous patches, but what's here seems fine to me. John > + } else if (got < 0) { > + goto cleanup; > + } else if (got == 0) { > + break; > + } > + while (offset < got) { > + int done; > + done = (handler)(stream, bytes + offset, got - offset, opaque); > + if (done < 0) { > + virStreamAbort(stream); > + goto cleanup; > + } > + offset += done; > + } > + } > + ret = 0; > + > + cleanup: > + VIR_FREE(bytes); > + > + if (ret != 0) > + virDispatchError(stream->conn); > + > + return ret; > +} > + > +/** > + * virStreamEventAddCallback: > + * @stream: pointer to the stream object > + * @events: set of events to monitor > diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms > index 0e34eee..008dc59 100644 > --- a/src/libvirt_public.syms > +++ b/src/libvirt_public.syms > @@ -764,6 +764,7 @@ LIBVIRT_3.3.0 { > virStreamHoleSize; > virStreamRecvFlags; > virStreamSkip; > + virStreamSparseRecvAll; > } LIBVIRT_3.1.0; > > # .... define new API here using predicted next version number .... > -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list