On Mon, 2016-02-01 at 14:49 +0000, Daniel P. Berrange wrote:
> On Fri, Jan 29, 2016 at 02:26:53PM +0100, Michal Privoznik wrote:
> > When dealing with sparse files we need to be able to jump over
> > holes as there's no added value in reading/writing them. For
> > that, we need new set of send and receive APIs that will have
> > @offset argument. Sending data to a stream would be easy - just
> > say from which offset we are sending data. Reading is a bit
> > tricky - we need read function which can detect holes and thus
> > when requested to read from one it will set @offset to new value
> > that contains data.
> >
> > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
> > ---
> >  include/libvirt/libvirt-stream.h |   8 +++
> >  src/driver-stream.h              |  13 +++++
> >  src/libvirt-stream.c             | 113
> > +++++++++++++++++++++++++++++++++++++++
> >  src/libvirt_public.syms          |   6 +++
> >  4 files changed, 140 insertions(+)
> >
> > diff --git a/include/libvirt/libvirt-stream.h
> > b/include/libvirt/libvirt-stream.h
> > index 831640d..5a2bde3 100644
> > --- a/include/libvirt/libvirt-stream.h
> > +++ b/include/libvirt/libvirt-stream.h
> > @@ -40,11 +40,19 @@ int virStreamRef(virStreamPtr st);
> >  int virStreamSend(virStreamPtr st,
> >                    const char *data,
> >                    size_t nbytes);
> > +int virStreamSendOffset(virStreamPtr stream,
> > +                        unsigned long long offset,
> > +                        const char *data,
> > +                        size_t nbytes);
> >
> >  int virStreamRecv(virStreamPtr st,
> >                    char *data,
> >                    size_t nbytes);
> >
> > +int virStreamRecvOffset(virStreamPtr stream,
> > +                        unsigned long long *offset,
> > +                        char *data,
> > +                        size_t nbytes);
> >
> >  /**
> >   * virStreamSourceFunc:
> > diff --git a/src/driver-stream.h b/src/driver-stream.h
> > index 85b4e3b..5419b85 100644
> > --- a/src/driver-stream.h
> > +++ b/src/driver-stream.h
> > @@ -31,9 +31,20 @@ typedef int
> >                      size_t nbytes);
> >
> >  typedef int
> > +(*virDrvStreamSendOffset)(virStreamPtr st,
> > +                          unsigned long long offset,
> > +                          const char *data,
> > +                          size_t nbytes);
> > +
> > +typedef int
> >  (*virDrvStreamRecv)(virStreamPtr st,
> >                      char *data,
> >                      size_t nbytes);
> > +typedef int
> > +(*virDrvStreamRecvOffset)(virStreamPtr st,
> > +                          unsigned long long *offset,
> > +                          char *data,
> > +                          size_t nbytes);
> >
> >  typedef int
> >  (*virDrvStreamEventAddCallback)(virStreamPtr stream,
> > @@ -60,7 +71,9 @@ typedef virStreamDriver *virStreamDriverPtr;
> >
> >  struct _virStreamDriver {
> >      virDrvStreamSend streamSend;
> > +    virDrvStreamSendOffset streamSendOffset;
> >      virDrvStreamRecv streamRecv;
> > +    virDrvStreamRecvOffset streamRecvOffset;
> >      virDrvStreamEventAddCallback streamEventAddCallback;
> >      virDrvStreamEventUpdateCallback streamEventUpdateCallback;
> >      virDrvStreamEventRemoveCallback streamEventRemoveCallback;
> > diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
> > index c16f586..1df188c 100644
> > --- a/src/libvirt-stream.c
> > +++ b/src/libvirt-stream.c
> > @@ -192,6 +192,58 @@ virStreamSend(virStreamPtr stream,
> >
> >
> >  /**
> > + * virStreamSendOffset:
> > + * @stream: pointer to the stream object
> > + * @offset: <something>
> > + * @data: buffer to write to stream
> > + * @nbytes: size of @data buffer
> > + *
> > + * Sends some data down the pipe.
> > + *
> > + * Returns the number of bytes written, which may be less
> > + * than requested.
> > + *
> > + * Returns -1 upon error, at which time the stream will
> > + * be marked as aborted, and the caller should now release
> > + * the stream with virStreamFree.
> > + *
> > + * Returns -2 if the outgoing transmit buffers are full &
> > + * the stream is marked as non-blocking.
> > + */
> > +int
> > +virStreamSendOffset(virStreamPtr stream,
> > +                    unsigned long long offset,
> > +                    const char *data,
> > +                    size_t nbytes)
> > +{
> > +    VIR_DEBUG("stream=%p, offset=%llu, data=%p, nbytes=%zu",
> > +              stream, offset, data, nbytes);
> > +
> > +    virResetLastError();
> > +
> > +    virCheckStreamReturn(stream, -1);
> > +    virCheckNonNullArgGoto(data, error);
> > +
> > +    if (stream->driver &&
> > +        stream->driver->streamSendOffset) {
> > +        int ret;
> > +        ret = (stream->driver->streamSendOffset)(stream, offset,
> > data, nbytes);
> > +        if (ret == -2)
> > +            return -2;
> > +        if (ret < 0)
> > +            goto error;
> > +        return ret;
> > +    }
> > +
> > +    virReportUnsupportedError();
> > +
> > + error:
> > +    virDispatchError(stream->conn);
> > +    return -1;
> > +}
> > +
> > +
> > +/**
> >   * virStreamRecv:
> >   * @stream: pointer to the stream object
> >   * @data: buffer to read into from stream
> > @@ -285,6 +337,67 @@ virStreamRecv(virStreamPtr stream,
> >
> >
> >  /**
> > + * virStreamRecvOffset:
> > + * @stream: pointer to the stream object
> > + * @offset: <something>
> > + * @data: buffer to write to stream
> > + * @nbytes: size of @data buffer
> > + *
> > + * Recieve some data from stream. On return set offset to next
> > location.
> > + *
> > + * Returns the number of bytes read, which may be less
> > + * than requested.
> > + *
> > + * Returns 0 when either the end of the stream is reached, or
> > + * there are no data to be sent at current @offset. In case of
> > + * the former, the stream should be finished by calling
> > + * virStreamFinish(). However, in case of the latter, @offset
> > + * should be set to new position where interesting data is.
> > + * Failing to do so will result in assumption that there is no
> > + * data left.
> > + *
> > + * Returns -1 upon error, at which time the stream will
> > + * be marked as aborted, and the caller should now release
> > + * the stream with virStreamFree.
> > + *
> > + * Returns -2 if there is no data pending to be read & the
> > + * stream is marked as non-blocking.
> > + */
>
> It is not entirely clear from these docs, but from the impl
> against 'fdstream', you seem to treating 'offset' as an
> input and output parameter.
>
> I don't think that is going to fly wrt to the RPC protocol.
> The reads are explicitly asynchronous wrt the remote server
> transmission of data. ie once the stream is open, the server
> will push out the data in a continuous stream. There is no
> "read" operation requested against the server. So it is not
> possible to send an 'offset' across to the server. So the
> 'offset' really needs to be an output only parameter.
>
> I think it is important that we consider the design additions
> to the wire protocol at the same time as the API design we're
> considering.
>
>
> From an RPC layer POV we currently have VIR_NET_STREAM packets
> to transmit the data payload. There is no offset information
> provided in these packets, as it is designed as a continuous
> stream of data. So from this POV adding offset to the send/recv
> methods in the API is not a good fit for the wire proocol.
>
> We use the status field to distinguish between a packet with
> payload and a packet without data. ie VIR_NET_CONTINUE status
> indicates that there is a a byte[] payload. We could introduce
> a new status field VIR_NET_SKIP to represent a hole in the
> stream, and the payload would simply be the size of the hole.
>
> Regards,
> Daniel

Daniel, Michal:

I started looking at this topic last week after Daniel pointed me at
Michal's thread back in December re: "Sparse image volDownload". I
don't have any proposed patches yet. Still reading code, building my
mental map thereof. I was happy to see Michal's RFC and Daniel's
responses as it helps in this endeavor.
So to check my perceived understanding so far I'd like to discuss
Michal's proposed APIs and possible implementations in the context of
Daniel's comment on the wire protocol -- specifically the notion of a
VIR_NET_SKIP packet to represent holes. I have also looked at the virsh
volume upload/download use of virStream{Send,Recv}All() but will
comment on that separately in response to Daniel's response to message
0/8.

I think that Michal's proposed APIs could work atop a wire protocol
that sends holes and data chunks separately based on a couple of
different interpretations of the 'offset' parameter.

1) If 'offset' is interpreted as "offset from the beginning of the
stream", then I think the stream would need to track the amount of data
and hole sizes already sent -- effectively the offset from start of the
stream of the next chunk to send/receive. Then for
virStreamSendOffset() the 'offset' parameter would need to be >= the
current "stream offset". If equal, just send the data; otherwise send a
hole of size offset-"stream offset", followed by the data. This would
require that the client also track or be able to query the cumulative
data+holes sent to compute the offset.

2) Or, one could interpret offset as relative to the data+holes already
sent, constrained to >=0. Greater than zero indicates a hole to send
before any data -- a hole could be at end of the stream data source.
Less bookkeeping for the client and stream.

In either case, the added stream driver function would be
"streamSendHole()" rather than streamSendOffset().

For compatibility we only want to send a hole if the remote end
supports it. If not, we'd need to expand the offset/hole to the
equivalent number of zero data bytes somewhere in the stack. I haven't
looked at how libvirt negotiates protocol support yet.

The current virStreamSend() could be enhanced to scan data buffers for
sufficiently long runs of zeroes and convert them to holes. Again, only
if the remote end supports receiving holes.
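To illustrate that last point, here is a minimal sketch of how a sender
might split a buffer into data and hole segments. The names
(next_segment, struct segment, min_hole) are made up for this example
and are not libvirt APIs, and what counts as a "sufficiently long" run
is an assumption the caller has to pick.

```c
#include <stddef.h>

/* Kind of segment found at the start of a buffer. */
enum seg_kind { SEG_DATA, SEG_HOLE };

struct segment {
    enum seg_kind kind;
    size_t len;
};

/*
 * Hypothetical helper: classify the segment at the start of buf[0..len).
 * A run of zero bytes counts as a hole only if it is at least min_hole
 * bytes long; shorter runs stay part of the surrounding data.
 */
static struct segment
next_segment(const char *buf, size_t len, size_t min_hole)
{
    struct segment seg = { SEG_DATA, 0 };
    size_t i = 0;

    if (min_hole == 0)
        min_hole = 1;   /* avoid reporting zero-length holes */

    while (i < len) {
        size_t run = 0;

        /* Measure the zero run starting at position i. */
        while (i + run < len && buf[i + run] == 0)
            run++;

        if (run >= min_hole) {
            if (i == 0) {
                /* Buffer starts with a long zero run: report a hole. */
                seg.kind = SEG_HOLE;
                seg.len = run;
            } else {
                /* Data ends where the long zero run begins. */
                seg.len = i;
            }
            return seg;
        }

        /* Short run: keep it as data and step past one more byte. */
        i += run;
        if (i < len)
            i++;
    }

    seg.len = len;
    return seg;
}
```

A caller would loop, advancing by seg.len each time, sending SEG_DATA
segments as ordinary payload and SEG_HOLE segments as holes (or as
zeroes when the remote end cannot accept holes).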
Similar alternative interpretations exist for virStreamRecvOffset().
However, I think that for receiving holes, we want a driver function
that returns a hole along with any subsequent data as we don't know
where the holes will occur in the incoming stream. Or we could ask for
a hole before each chunk of data with the expectation that we might not
get one.

The receiving client might not need to track data+holes received
because it would be able to use SEEK_SET if 'offset' represents offset
from start of stream or SEEK_CUR when it represents offset from
data+holes received so far. Of course, this assumes that the data sink
is capable of seeking at all. If not, the client would not want to use
virStreamRecvOffset() unless it wants to expand holes on the wire into
the corresponding run of zeros itself. We can do that in
virStreamRecv() without changing the API.

Looking further down the stack:

In virNetClientCallDispatchStream(), the switch on
client->msg.header.status will need a case added to handle VIR_NET_SKIP
packets. I think this can be the same path as for VIR_NET_CONTINUE --
that is, let virNetClientStreamQueuePacket() handle it. The latter
function would need to recheck the msg.header.status and in the case of
'_SKIP, insert a special "hole-iovec" in the incoming queue
representing a hole -- perhaps a 0 or ~0 value for iov_base. Inserting
an iovec to represent a hole preserves the sequencing of the stream and
plays nice with virNetClientStreamRecvPacket().

On the client side, in the while() loop that consumes incoming iovecs,
virNetClientStreamRecvPacket() could check for iovecs representing
holes and handle them appropriately.

It appears that we can change the virNetClientStreamRecvPacket() API to
return holes -- e.g., add a 'size_t *hole' parameter. The function is
listed in libvirt_remote.syms, private to libvirt according to the
comments there.
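A rough sketch of the hole-iovec idea, mostly to check that it hangs
together. Everything here is hypothetical: struct in_queue and
queue_recv() are stand-ins invented for illustration, not the real
virNetClientStream code, and a NULL iov_base is just one of the
possible markers mentioned above.

```c
#include <stddef.h>
#include <string.h>
#include <sys/uio.h>

/* Hypothetical incoming queue; iov_base == NULL marks a hole of
 * iov_len bytes. */
struct in_queue {
    struct iovec *iov;
    size_t niov;
    size_t head;    /* index of the first unconsumed entry */
    size_t used;    /* bytes already consumed from iov[head] */
};

/*
 * Copy up to nbytes of queued data into data[]. Copying stops early
 * when a hole marker is reached. If the queue head is itself a hole,
 * it is consumed and its size is reported through *hole, with 0 data
 * bytes returned. Returning 0 with *hole == 0 means the queue is empty.
 */
static size_t
queue_recv(struct in_queue *q, char *data, size_t nbytes, size_t *hole)
{
    size_t got = 0;

    *hole = 0;
    while (q->head < q->niov && got < nbytes) {
        struct iovec *cur = &q->iov[q->head];
        size_t avail, want, n;

        if (cur->iov_base == NULL) {
            /* Hole marker: hand back any data gathered so far first. */
            if (got == 0) {
                *hole = cur->iov_len;
                q->head++;
            }
            break;
        }

        avail = cur->iov_len - q->used;
        want = nbytes - got;
        n = avail < want ? avail : want;

        memcpy(data + got, (char *)cur->iov_base + q->used, n);
        got += n;
        q->used += n;
        if (q->used == cur->iov_len) {
            /* Entry fully consumed; move to the next one. */
            q->head++;
            q->used = 0;
        }
    }
    return got;
}
```

With a queue of [data "abcd", hole 1024, data "ef"], successive calls
would yield 4 bytes, then a hole of 1024, then 2 bytes, so the ordering
of holes and data in the stream is preserved.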
On encountering a "hole-iovec", virNetClientStreamRecvPacket() would
terminate any partial data buffer and pass the received data up to the
caller. Returning the hole should probably be deferred until the next
call so that, if we return a hole and data together, the hole always
precedes its accompanying data and can be used to compute the seek
offset to that data.

If virNetClientStreamRecvPacket() has been called from the existing
virStreamRecv() -- e.g., NULL 'hole' parameter -- it would expand as
much of the hole as will fit into the data buffer as zeroes and adjust
the hole-iovec to leave any remainder on the incoming queue. I.e., like
it handles data packets that don't fit into the data buffer. We could
enhance virStreamRecv() to receive and expand the holes, but
virNetClientStreamRecvPacket() already has the logic to handle iovecs
larger than the passed-in buffer.

...

I haven't looked closely at the local stream{Send,Recv} handlers yet
other than to note that they are virFDStream functions. Since these are
using fds, I'm assuming they can be enhanced, or parallel APIs defined,
to query whether the fd supports seeking at all and, in the case of
receiving from a local FDStream, whether the fd supports
SEEK_{DATA,HOLE}.

I suppose the support for data/hole seeks could be hidden behind the
API. When such seeks are not supported the function could scan the data
stream for sufficiently long runs of zeroes and return them as holes
for Recv or expand holes/offsets to runs of zeros for Send. Note that
one could also scan the data chunks for sequences of zeroes worth
transmitting as holes to keep them off the wire and perhaps end up with
a destination file that is "sparser" than the original.

It also appears that the remote ends use FDStreams as sinks/sources, so
they might be addressed by these or similar changes?

Does that all make any sense?

Regards,
Lee

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list