Re: [PATCH 8/9] virnetclientstream: Process stream messages later

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




On 04/15/2016 09:51 AM, Michal Privoznik wrote:
> Currently we have two separate functions for handling read from
> a stream. One is supposed to be low level and reads data in this
> self allocating chunk of memory. The other read function then
> copies data over from the chunk into a user buffer. There are two
> memcpy() involved even though a single would be sufficient.
> Moreover, since we are copying just data, we can't process
> alternative stream packets in the latter function, like stream
> seeks.
> 
> In my testing, this proved two times faster then implementation

s/then/than the/

> which uses IO vectors.

Can I "assume" this testing covers the reverted patch scenario? IOW: I
think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to
be reopened...

It would have been "nice" to summarize what this algorithm does as
opposed to the other one. I think you started to at the end of the first
paragraph, but I'm not 100% sure - I guess it's easier for me if it's
explicitly said, such as:

In virNetClientStreamQueuePacket instead of ...

In virNetClientStreamRecvPacket instead of ...

rather than left implicit, which only works if you already know the code.

The functions are just tough to read without (more) knowledge (than I
have about them) of how they are designed to function. Since he had a
hand in the above bug, hopefully Martin can take a look at this patch.

> 
> Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
> ---
>  src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
>  1 file changed, 54 insertions(+), 52 deletions(-)
> 
> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
> index b428f4b..34989a9 100644
> --- a/src/rpc/virnetclientstream.c
> +++ b/src/rpc/virnetclientstream.c
> @@ -49,9 +49,7 @@ struct _virNetClientStream {
>       * time by stopping consuming any incoming data
>       * off the socket....
>       */
> -    char *incoming;
> -    size_t incomingOffset;
> -    size_t incomingLength;
> +    virNetMessagePtr rx;
>      bool incomingEOF;
>  
>      virNetClientStreamEventCallback cb;
> @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
>      if (!st->cb)
>          return;
>  
> -    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
> +    VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
>  
> -    if (((st->incomingOffset || st->incomingEOF) &&
> +    if (((st->rx || st->incomingEOF) &&
>           (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
>          (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
>          VIR_DEBUG("Enabling event timer");
> @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>  
>      if (st->cb &&
>          (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
> -        (st->incomingOffset || st->incomingEOF))
> +        (st->rx || st->incomingEOF))
>          events |= VIR_STREAM_EVENT_READABLE;
>      if (st->cb &&
>          (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
>          events |= VIR_STREAM_EVENT_WRITABLE;
>  
> -    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
> +    VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
>      if (events) {
>          virNetClientStreamEventCallback cb = st->cb;
>          void *cbOpaque = st->cbOpaque;
> @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
>      virNetClientStreamPtr st = obj;
>  
>      virResetError(&st->err);
> -    VIR_FREE(st->incoming);
> +    while (st->rx) {
> +        virNetMessagePtr msg = st->rx;
> +        virNetMessageQueueServe(&st->rx);
> +        virNetMessageFree(msg);
> +    }
>      virObjectUnref(st->prog);
>  }
>  
> @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
>  int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>                                    virNetMessagePtr msg)
>  {
> -    int ret = -1;
> -    size_t need;
> +    virNetMessagePtr tmp_msg;
> +
> +    VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
> +
> +    /* Unfortunately, we must allocate new message as the one we
> +     * get in @msg is going to be cleared later in the process. */
> +
> +    if (!(tmp_msg = virNetMessageNew(false)))
> +        return -1;
> +
> +    /* Copy header */
> +    memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
> +
> +    /* Steal message buffer */
> +    tmp_msg->buffer = msg->buffer;
> +    tmp_msg->bufferLength = msg->bufferLength;
> +    tmp_msg->bufferOffset = msg->bufferOffset;
> +    msg->buffer = NULL;
> +    msg->bufferLength = msg->bufferOffset = 0;
>  
>      virObjectLock(st);
> -    need = msg->bufferLength - msg->bufferOffset;
> -    if (need) {
> -        size_t avail = st->incomingLength - st->incomingOffset;
> -        if (need > avail) {
> -            size_t extra = need - avail;
> -            if (VIR_REALLOC_N(st->incoming,
> -                              st->incomingLength + extra) < 0) {
> -                VIR_DEBUG("Out of memory handling stream data");
> -                goto cleanup;
> -            }
> -            st->incomingLength += extra;
> -        }
>  
> -        memcpy(st->incoming + st->incomingOffset,
> -               msg->buffer + msg->bufferOffset,
> -               msg->bufferLength - msg->bufferOffset);
> -        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
> -    } else {
> -        st->incomingEOF = true;
> -    }
> +    virNetMessageQueuePush(&st->rx, tmp_msg);
>  
> -    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
> -              st->incomingOffset, st->incomingLength,
> -              st->incomingEOF);
>      virNetClientStreamEventTimerUpdate(st);
>  
> -    ret = 0;
> -
> - cleanup:
>      virObjectUnlock(st);
> -    return ret;
> +    return 0;
>  }
>  
>  
> @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>                                   bool nonblock)
>  {
>      int rv = -1;
> +    size_t want;
> +
>      VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
>                st, client, data, nbytes, nonblock);
>      virObjectLock(st);
> -    if (!st->incomingOffset && !st->incomingEOF) {
> +    if (!st->rx && !st->incomingEOF) {
>          virNetMessagePtr msg;
>          int ret;
>  
> @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>              goto cleanup;
>      }
>  
> -    VIR_DEBUG("After IO %zu", st->incomingOffset);
> -    if (st->incomingOffset) {
> -        int want = st->incomingOffset;
> -        if (want > nbytes)
> -            want = nbytes;
> -        memcpy(data, st->incoming, want);
> -        if (want < st->incomingOffset) {
> -            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
> -            st->incomingOffset -= want;
> -        } else {
> -            VIR_FREE(st->incoming);
> -            st->incomingOffset = st->incomingLength = 0;
> +    VIR_DEBUG("After IO rx=%p", st->rx);
> +    want = nbytes;
> +    while (want && st->rx) {

So if 'st->rx == NULL', the loop is skipped and 'rv = nbytes - want;'
ends up as 0 - I assume that is 'expected'...

> +        virNetMessagePtr msg = st->rx;
> +        size_t len = want;
> +
> +        if (len > msg->bufferLength - msg->bufferOffset)
> +            len = msg->bufferLength - msg->bufferOffset;
> +
> +        if (!len)
> +            break;
> +
> +        memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
> +        want -= len;
> +        msg->bufferOffset += len;
> +
> +        if (msg->bufferOffset == msg->bufferLength) {
> +            virNetMessageQueueServe(&st->rx);
> +            virNetMessageFree(msg);

Does nothing need to be done with 'want' here? I guess this shows my lack
of depth of understanding of these algorithms... Big black box that I hope
works without me needing to intervene!

John

>          }
> -        rv = want;
> -    } else {
> -        rv = 0;
>      }
> +    rv = nbytes - want;
>  
>      virNetClientStreamEventTimerUpdate(st);
>  
> 

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]