Re: [PATCH RFC 2/8] Introduce virStream{Recv,Send}Offset

On Mon, 2016-02-01 at 14:49 +0000, Daniel P. Berrange wrote:
> On Fri, Jan 29, 2016 at 02:26:53PM +0100, Michal Privoznik wrote:
> > When dealing with sparse files we need to be able to jump over
> > holes as there's no added value in reading/writing them. For
> > that, we need new set of send and receive APIs that will have
> > @offset argument. Sending data to a stream would be easy - just
> > say from which offset we are sending data. Reading is a bit
> > tricky - we need read function which can detect holes and thus
> > when requested to read from one it will set @offset to new value
> > that contains data.
> > 
> > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
> > ---
> >  include/libvirt/libvirt-stream.h |   8 +++
> >  src/driver-stream.h              |  13 +++++
> >  src/libvirt-stream.c             | 113 +++++++++++++++++++++++++++++++++++++++
> >  src/libvirt_public.syms          |   6 +++
> >  4 files changed, 140 insertions(+)
> > 
> > diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
> > index 831640d..5a2bde3 100644
> > --- a/include/libvirt/libvirt-stream.h
> > +++ b/include/libvirt/libvirt-stream.h
> > @@ -40,11 +40,19 @@ int virStreamRef(virStreamPtr st);
> >  int virStreamSend(virStreamPtr st,
> >                    const char *data,
> >                    size_t nbytes);
> > +int virStreamSendOffset(virStreamPtr stream,
> > +                        unsigned long long offset,
> > +                        const char *data,
> > +                        size_t nbytes);
> >  
> >  int virStreamRecv(virStreamPtr st,
> >                    char *data,
> >                    size_t nbytes);
> >  
> > +int virStreamRecvOffset(virStreamPtr stream,
> > +                        unsigned long long *offset,
> > +                        char *data,
> > +                        size_t nbytes);
> >  
> >  /**
> >   * virStreamSourceFunc:
> > diff --git a/src/driver-stream.h b/src/driver-stream.h
> > index 85b4e3b..5419b85 100644
> > --- a/src/driver-stream.h
> > +++ b/src/driver-stream.h
> > @@ -31,9 +31,20 @@ typedef int
> >                      size_t nbytes);
> >  
> >  typedef int
> > +(*virDrvStreamSendOffset)(virStreamPtr st,
> > +                          unsigned long long offset,
> > +                          const char *data,
> > +                          size_t nbytes);
> > +
> > +typedef int
> >  (*virDrvStreamRecv)(virStreamPtr st,
> >                      char *data,
> >                      size_t nbytes);
> > +typedef int
> > +(*virDrvStreamRecvOffset)(virStreamPtr st,
> > +                          unsigned long long *offset,
> > +                          char *data,
> > +                          size_t nbytes);
> >  
> >  typedef int
> >  (*virDrvStreamEventAddCallback)(virStreamPtr stream,
> > @@ -60,7 +71,9 @@ typedef virStreamDriver *virStreamDriverPtr;
> >  
> >  struct _virStreamDriver {
> >      virDrvStreamSend streamSend;
> > +    virDrvStreamSendOffset streamSendOffset;
> >      virDrvStreamRecv streamRecv;
> > +    virDrvStreamRecvOffset streamRecvOffset;
> >      virDrvStreamEventAddCallback streamEventAddCallback;
> >      virDrvStreamEventUpdateCallback streamEventUpdateCallback;
> >      virDrvStreamEventRemoveCallback streamEventRemoveCallback;
> > diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
> > index c16f586..1df188c 100644
> > --- a/src/libvirt-stream.c
> > +++ b/src/libvirt-stream.c
> > @@ -192,6 +192,58 @@ virStreamSend(virStreamPtr stream,
> >  
> >  
> >  /**
> > + * virStreamSendOffset:
> > + * @stream: pointer to the stream object
> > + * @offset: <something>
> > + * @data: buffer to write to stream
> > + * @nbytes: size of @data buffer
> > + *
> > + * Sends some data down the pipe.
> > + *
> > + * Returns the number of bytes written, which may be less
> > + * than requested.
> > + *
> > + * Returns -1 upon error, at which time the stream will
> > + * be marked as aborted, and the caller should now release
> > + * the stream with virStreamFree.
> > + *
> > + * Returns -2 if the outgoing transmit buffers are full &
> > + * the stream is marked as non-blocking.
> > + */
> > +int
> > +virStreamSendOffset(virStreamPtr stream,
> > +                    unsigned long long offset,
> > +                    const char *data,
> > +                    size_t nbytes)
> > +{
> > +    VIR_DEBUG("stream=%p, offset=%llu, data=%p, nbytes=%zu",
> > +              stream, offset, data, nbytes);
> > +
> > +    virResetLastError();
> > +
> > +    virCheckStreamReturn(stream, -1);
> > +    virCheckNonNullArgGoto(data, error);
> > +
> > +    if (stream->driver &&
> > +        stream->driver->streamSendOffset) {
> > +        int ret;
> > +        ret = (stream->driver->streamSendOffset)(stream, offset, data, nbytes);
> > +        if (ret == -2)
> > +            return -2;
> > +        if (ret < 0)
> > +            goto error;
> > +        return ret;
> > +    }
> > +
> > +    virReportUnsupportedError();
> > +
> > + error:
> > +    virDispatchError(stream->conn);
> > +    return -1;
> > +}
> > +
> > +
> > +/**
> >   * virStreamRecv:
> >   * @stream: pointer to the stream object
> >   * @data: buffer to read into from stream
> > @@ -285,6 +337,67 @@ virStreamRecv(virStreamPtr stream,
> >  
> >  
> >  /**
> > + * virStreamRecvOffset:
> > + * @stream: pointer to the stream object
> > + * @offset: <something>
> > + * @data: buffer to write to stream
> > + * @nbytes: size of @data buffer
> > + *
> > + * Receive some data from stream. On return set offset to next location.
> > + *
> > + * Returns the number of bytes read, which may be less
> > + * than requested.
> > + *
> > + * Returns 0 when either the end of the stream is reached, or
> > + * there are no data to be sent at current @offset. In case of
> > + * the former, the stream should be finished by calling
> > + * virStreamFinish(). However, in case of the latter, @offset
> > + * should be set to new position where interesting data is.
> > + * Failing to do so will result in assumption that there is no
> > + * data left.
> > + *
> > + * Returns -1 upon error, at which time the stream will
> > + * be marked as aborted, and the caller should now release
> > + * the stream with virStreamFree.
> > + *
> > + * Returns -2 if there is no data pending to be read & the
> > + * stream is marked as non-blocking.
> > + */
> 
> It is not entirely clear from these docs, but from the impl
> against 'fdstream', you seem to be treating 'offset' as an
> input and output parameter.
> 
> I don't think that is going to fly wrt the RPC protocol.
> The reads are explicitly asynchronous wrt the remote server
> transmission of data. ie once the stream is open, the server
> will push out the data in a continuous stream. There is no
> "read" operation requested against the server. So it is not
> possible to send an 'offset' across to the server. So the
> 'offset' really needs to be an output only parameter.
> 
> I think it is important that we consider the design additions
> to the wire protocol at the same time as the API design we're
> considering.
> 
> From an RPC layer POV we currently have VIR_NET_STREAM packets
> to transmit the data payload. There is no offset information
> provided in these packets, as it is designed as a continuous
> stream of data. So from this POV adding offset to the send/recv
> methods in the API is not a good fit for the wire protocol.
> 
> We use the status field to distinguish between a packet with
> payload and a packet without data. ie VIR_NET_CONTINUE status
> indicates that there is a byte[] payload. We could introduce
> a new status field VIR_NET_SKIP to represent a hole in the
> stream, and the payload would simply be the size of the hole.
> 
> Regards,
> Daniel

Daniel, Michal:

I started looking at this topic last week after Daniel pointed me at
Michal's thread back in December re: "Sparse image volDownload".  I
don't have any proposed patches yet.  Still reading code, building my
mental map thereof.  I was happy to see Michal's RFC and Daniel's
responses as it helps in this endeavor.  So to check my perceived
understanding so far I'd like to discuss Michal's proposed APIs and
possible implementations in the context of Daniel's comment on the wire
protocol -- specifically the notion of a VIR_NET_SKIP packet to
represent holes.  I have also looked at the virsh volume
upload/download use of virStream{Send,Recv}All() but will comment on
that separately in response to Daniel's response to message 0/8.

I think that Michal's proposed APIs could work atop a wire protocol
that sends holes and data chunks separately based on a couple of
different interpretations of the 'offset' parameter.

1)  If 'offset' is interpreted as "offset from the beginning of the
stream", then I think the stream would need to track the amount of data
and hole sizes already sent -- effectively the offset from start of the
stream of the next chunk to send/receive.  Then for
virStreamSendOffset() the 'offset' parameter would need to be >=
current "stream offset".  If equal, just send the data, otherwise send
a hole of size offset-"stream offset", followed by the data.  This
would require that the client also track or be able to query the
cumulative data+holes sent to compute the offset.

2) Or, one could interpret offset as relative to the data+holes already
sent, constrained to >=0.  Greater than zero indicates a hole to send
before any data -- a hole could be at end of the stream data source.
This means less bookkeeping for the client and stream.

In either case, the added stream driver function would be
"streamSendHole()" rather than streamSendOffset().
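
To make the two interpretations concrete, here is a minimal sketch of how
a hole size could be derived from 'offset' in each case.  The struct and
function names are hypothetical and are not existing libvirt APIs; the
only thing taken from the discussion above is the arithmetic.

```c
#include <stdbool.h>

/* Hypothetical per-stream bookkeeping: total data + hole bytes sent so far. */
struct stream_pos {
    unsigned long long sent;
};

/* Interpretation 1: 'offset' is absolute, counted from the start of the
 * stream.  The hole is whatever lies between the current stream position
 * and 'offset'.  Returns false if 'offset' points backwards. */
static bool
hole_from_absolute(const struct stream_pos *pos,
                   unsigned long long offset,
                   unsigned long long *hole)
{
    if (offset < pos->sent)
        return false;
    *hole = offset - pos->sent;
    return true;
}

/* Interpretation 2: 'offset' is relative to the data + holes already
 * sent, so it is the hole length itself (0 means "no hole"). */
static unsigned long long
hole_from_relative(unsigned long long offset)
{
    return offset;
}

/* Advance the bookkeeping after a hole and 'nbytes' of data went out. */
static void
stream_pos_advance(struct stream_pos *pos,
                   unsigned long long hole,
                   unsigned long long nbytes)
{
    pos->sent += hole + nbytes;
}
```

With interpretation 1 both ends need something like stream_pos; with
interpretation 2 only the caller's notion of "where I am" matters.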

For compatibility we only want to send a hole if the remote end
supports it.  If not, we'd need to expand the offset/hole to the
equivalent number of zero data bytes somewhere in the stack. I haven't
looked at how libvirt negotiates protocol support yet.
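
As a rough sketch of the fallback described above, a hole could be
expanded into zero bytes in bounded chunks.  The callback type and both
function names are made up for illustration; in libvirt the sink would
presumably be the existing send path, and a real version would also have
to deal with the -2 "would block" return, which this sketch simply
treats as failure.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical sink: returns the number of bytes accepted, or < 0 on error. */
typedef int (*send_fn)(void *opaque, const char *data, size_t nbytes);

/* Send 'hole' zero bytes through 'send', coping with short writes.
 * Returns 0 on success, -1 if the sink fails or makes no progress. */
static int
send_hole_as_zeros(send_fn send, void *opaque, unsigned long long hole)
{
    char zeros[4096];

    memset(zeros, 0, sizeof(zeros));
    while (hole > 0) {
        size_t want = hole < sizeof(zeros) ? (size_t)hole : sizeof(zeros);
        int got = send(opaque, zeros, want);

        if (got <= 0)
            return -1;
        hole -= (unsigned long long)got;
    }
    return 0;
}

/* Demo sink for trying the function out: accepts at most 1000 bytes per
 * call, checks they are all zero, and counts them in *opaque. */
static int
counting_sink(void *opaque, const char *data, size_t nbytes)
{
    unsigned long long *total = opaque;
    size_t n = nbytes < 1000 ? nbytes : 1000;
    size_t i;

    for (i = 0; i < n; i++) {
        if (data[i] != 0)
            return -1;
    }
    *total += n;
    return (int)n;
}
```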

The current virStreamSend() could be enhanced to scan data buffers for
sufficiently long runs of zeroes and convert them to holes, again only
if the remote end supports receiving holes.
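
A minimal sketch of such a scan is below.  The function name and the
'min_run' threshold are assumptions; what counts as "sufficiently long"
is left open here.

```c
#include <stdbool.h>
#include <stddef.h>

/* Find the first run of at least 'min_run' zero bytes in buf[0..len).
 * On success, store where the run starts and how long it is. */
static bool
find_zero_run(const char *buf, size_t len, size_t min_run,
              size_t *start, size_t *run_len)
{
    size_t i = 0;

    while (i < len) {
        size_t j;

        if (buf[i] != 0) {
            i++;
            continue;
        }
        /* buf[i] is zero: measure how far the run extends. */
        j = i;
        while (j < len && buf[j] == 0)
            j++;
        if (j - i >= min_run) {
            *start = i;
            *run_len = j - i;
            return true;
        }
        i = j;  /* run too short, keep scanning after it */
    }
    return false;
}
```

The caller would send buf[0..start) as data, the run as a hole, and then
rescan the remainder of the buffer.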

Similar alternative interpretations exist for virStreamRecvOffset().
However, I think that for receiving holes, we want a driver function
that returns a hole along with any subsequent data as we don't know
where the holes will occur in the incoming stream.  Or we could ask for
a hole before each chunk of data with the expectation that we might not
get one.

The receiving client might not need to track data+holes received
because it would be able to use SEEK_SET if 'offset' represents offset
from start of stream or SEEK_CUR when it represents offset from
data+holes received so far.  Of course, this assumes that the data sink
is capable of seeking at all.  If not, the client would not want to use
virStreamRecvOffset() unless it wants to expand holes on the wire into
the corresponding run of zeros itself.  We can do that in
virStreamRecv() without changing the API.
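
For illustration, this is roughly what the receiving client could do
with a local file as the data sink.  The helper names are hypothetical;
only open() and lseek() are real calls.

```c
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>

/* Open (and truncate) a local file to use as the data sink. */
static int
sink_open(const char *path)
{
    return open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600);
}

/* A pipe or socket cannot seek; lseek() fails with ESPIPE there. */
static int
sink_is_seekable(int fd)
{
    return lseek(fd, 0, SEEK_CUR) != (off_t)-1;
}

/* 'hole' counted from the data + holes received so far: SEEK_CUR. */
static int
sink_skip_relative(int fd, unsigned long long hole)
{
    return lseek(fd, (off_t)hole, SEEK_CUR) == (off_t)-1 ? -1 : 0;
}

/* 'offset' counted from the start of the stream: SEEK_SET. */
static int
sink_seek_absolute(int fd, unsigned long long offset)
{
    return lseek(fd, (off_t)offset, SEEK_SET) == (off_t)-1 ? -1 : 0;
}
```

Note that lseek() past the end does not by itself extend the file, so a
hole at the very end of the stream would need something like ftruncate()
to take effect.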

Looking further down the stack:

In virNetClientCallDispatchStream(), the switch on
client->msg.header.status will need a case added to handle VIR_NET_SKIP
packets.  I think this can be the same path as for VIR_NET_CONTINUE --
that is, let virNetClientStreamQueuePacket() handle it.  The latter
function would need to recheck the msg.header.status and in the case of
VIR_NET_SKIP, insert a special "hole-iovec" in the incoming queue
representing a hole -- perhaps a 0 or ~0 value for iov_base.  Inserting
an iovec to represent a hole preserves the sequencing of the stream and
plays nice with virNetClientStreamRecvPacket().
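
A small sketch of the "hole-iovec" idea, using a NULL iov_base as the
marker and iov_len as the hole size.  The queue type and helpers are
invented for the example and do not reflect how virNetClientStream
actually stores incoming data.

```c
#include <stdbool.h>
#include <stddef.h>
#include <sys/uio.h>

#define QUEUE_MAX 16

/* Hypothetical incoming queue: data and holes kept in arrival order. */
struct incoming_queue {
    struct iovec iov[QUEUE_MAX];
    size_t n;
};

/* A hole is an iovec with no buffer; iov_len carries the hole size. */
static bool
iovec_is_hole(const struct iovec *iov)
{
    return iov->iov_base == NULL && iov->iov_len > 0;
}

/* Queue a data payload (as for a VIR_NET_CONTINUE packet). */
static bool
queue_push_data(struct incoming_queue *q, void *data, size_t len)
{
    if (q->n == QUEUE_MAX || data == NULL)
        return false;
    q->iov[q->n].iov_base = data;
    q->iov[q->n].iov_len = len;
    q->n++;
    return true;
}

/* Queue a hole (as for the proposed VIR_NET_SKIP packet). */
static bool
queue_push_hole(struct incoming_queue *q, size_t hole_len)
{
    if (q->n == QUEUE_MAX || hole_len == 0)
        return false;
    q->iov[q->n].iov_base = NULL;
    q->iov[q->n].iov_len = hole_len;
    q->n++;
    return true;
}
```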

On the client side, in the while() loop that consumes incoming
iovecs, virNetClientStreamRecvPacket() could check for iovecs
representing holes and handle them appropriately.  It appears that we
can change the virNetClientStreamRecvPacket() API to return holes --
e.g., add a 'size_t *hole' parameter.  The function is listed in
libvirt_remote.syms, private to libvirt according to the comments
there.

On encountering a "hole-iovec" virNetClientStreamRecvPacket() would
terminate any partial data buffer and pass the received data up to the
caller.  Returning the hole should probably be deferred until the next
call, so that if we return a hole and data together, the hole always
precedes the accompanying data and can be used to compute the seek
offset to that data.

If virNetClientStreamRecvPacket() has been called from the existing
virStreamRecv() -- e.g., NULL 'hole' parameter -- it would expand as
much of the hole as will fit into the data buffer as zeroes and adjust
the hole-iovec to leave any remainder on the incoming queue.  I.e., the
same way it handles data packets that don't fit into the data buffer.  We
could enhance virStreamRecv() to receive and expand the holes, but
virNetClientStreamRecvPacket() already has the logic to handle iovecs
larger than the passed in buffer.
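
The expansion step for the NULL 'hole' case could look something like
the following.  This is only a sketch: expand_hole() is a made-up helper,
and it assumes the hole is held in an iovec whose iov_len is the number
of hole bytes still pending.

```c
#include <stddef.h>
#include <string.h>
#include <sys/uio.h>

/* Zero-fill as much of 'data' as the pending hole covers and shrink the
 * hole by that amount, so any remainder stays queued for the next call.
 * Returns the number of zero bytes written into 'data'. */
static size_t
expand_hole(struct iovec *hole, char *data, size_t nbytes)
{
    size_t n = hole->iov_len < nbytes ? hole->iov_len : nbytes;

    memset(data, 0, n);
    hole->iov_len -= n;
    return n;
}
```

Once iov_len reaches 0 the hole-iovec would be dropped from the queue,
just like a fully consumed data iovec.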

...

I haven't looked closely at the local stream{Send,Recv} handlers yet
other than to note that they are virFDStream functions.  Since these
are using fds, I'm assuming they can be enhanced or parallel APIs
defined to query whether the fd supports seeking at all and, in the
case of receiving from a local FDStream whether the fd supports
SEEK_{DATA,HOLE}.  I suppose the support for data/hole seeks could be
hidden behind the API.  When such seeks are not supported the function
could scan the data stream for sufficiently long runs of zeroes and
return them as holes for Recv or expand holes/offsets to runs of zeros
for Send.  Note that one could also scan the data chunks for sequences
of zeroes worth transmitting as holes to keep them off the wire and
perhaps end up with a destination file that is "sparser" than the
original.
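
For reference, a probe along these lines could sit behind such an API.
The function name and return convention are made up.  SEEK_DATA and
SEEK_HOLE are Linux/glibc extensions to lseek(), so the sketch reports
"unsupported" where they are not available at build time.

```c
#ifndef _GNU_SOURCE
# define _GNU_SOURCE   /* exposes SEEK_DATA / SEEK_HOLE on glibc */
#endif
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Find the next data extent at or after 'pos'.  Moves the fd's offset.
 * Returns 1 and fills in [*data_start, *data_end) if data was found,
 * 0 if only a hole remains up to EOF, and -1 if the fd cannot seek,
 * the seek failed, or SEEK_DATA/SEEK_HOLE are not available. */
static int
next_data_extent(int fd, off_t pos, off_t *data_start, off_t *data_end)
{
#if defined(SEEK_DATA) && defined(SEEK_HOLE)
    off_t start, end;

    start = lseek(fd, pos, SEEK_DATA);
    if (start == (off_t)-1)
        return errno == ENXIO ? 0 : -1;  /* ENXIO: no data past 'pos' */

    end = lseek(fd, start, SEEK_HOLE);   /* EOF counts as a hole */
    if (end == (off_t)-1)
        return -1;

    *data_start = start;
    *data_end = end;
    return 1;
#else
    (void)fd;
    (void)pos;
    (void)data_start;
    (void)data_end;
    return -1;
#endif
}
```

A -1 here is where the caller would fall back to scanning for runs of
zeroes.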

It also appears that the remote ends use FDStreams as sinks/sources, so
they might be addressed by these or similar changes?

Does that all make any sense?

Regards,
Lee

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



