On 04/15/2016 09:51 AM, Michal Privoznik wrote: > Currently we have two separate functions for handling read from > a stream. One is supposed to be low level and reads data in this > self allocating chunk of memory. The other read function then > copies data over from the chunk into a user buffer. There are two > memcpy() involved even though a single would be sufficient. > Moreover, since we are copying just data, we can't process > alternative stream packets in the latter function, like stream > seeks. > > In my testing, this proved two times faster then implementation s/then/than the/ > which uses IO vectors. Can I "assume" this testing covers the reverted patch scenario. IOW: I think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to be reopened... Might have been "nice" to indicate/summarize what this algorithm does as opposed to the other. I think you started at the end of the first paragraph, but I'm not 100% sure - I guess it's easier for me if it's explicitly said, such as: In virNetClientStreamQueuePacket instead of ... In virNetClientStreamRecvPacket instead of ... instead of implicitly said if you know the code. The functions are just tough to read without (more) knowledge (than I have about them) of how they are designed to function. Since he had a hand in the above bug, hopefully Martin can take a look at this patch. > > Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> > --- > src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++--------------------- > 1 file changed, 54 insertions(+), 52 deletions(-) > > diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c > index b428f4b..34989a9 100644 > --- a/src/rpc/virnetclientstream.c > +++ b/src/rpc/virnetclientstream.c > @@ -49,9 +49,7 @@ struct _virNetClientStream { > * time by stopping consuming any incoming data > * off the socket.... > */ > - char *incoming; > - size_t incomingOffset; > - size_t incomingLength; > + virNetMessagePtr rx; > bool incomingEOF; > > virNetClientStreamEventCallback cb; > @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) > if (!st->cb) > return; > > - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); > + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents); > > - if (((st->incomingOffset || st->incomingEOF) && > + if (((st->rx || st->incomingEOF) && > (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || > (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { > VIR_DEBUG("Enabling event timer"); > @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) > > if (st->cb && > (st->cbEvents & VIR_STREAM_EVENT_READABLE) && > - (st->incomingOffset || st->incomingEOF)) > + (st->rx || st->incomingEOF)) > events |= VIR_STREAM_EVENT_READABLE; > if (st->cb && > (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) > events |= VIR_STREAM_EVENT_WRITABLE; > > - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); > + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx); > if (events) { > virNetClientStreamEventCallback cb = st->cb; > void *cbOpaque = st->cbOpaque; > @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj) > virNetClientStreamPtr st = obj; > > virResetError(&st->err); > - VIR_FREE(st->incoming); > + while (st->rx) { > + virNetMessagePtr msg = st->rx; > + virNetMessageQueueServe(&st->rx); > + virNetMessageFree(msg); > + } > virObjectUnref(st->prog); > } > > @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, > int virNetClientStreamQueuePacket(virNetClientStreamPtr st, > virNetMessagePtr msg) > { > - int ret = -1; > - size_t need; > + virNetMessagePtr tmp_msg; > + > + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg); > + > + /* Unfortunately, we must allocate new message as the one we > + * get in @msg is going to be cleared later in the process. */ > + > + if (!(tmp_msg = virNetMessageNew(false))) > + return -1; > + > + /* Copy header */ > + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header)); > + > + /* Steal message buffer */ > + tmp_msg->buffer = msg->buffer; > + tmp_msg->bufferLength = msg->bufferLength; > + tmp_msg->bufferOffset = msg->bufferOffset; > + msg->buffer = NULL; > + msg->bufferLength = msg->bufferOffset = 0; > > virObjectLock(st); > - need = msg->bufferLength - msg->bufferOffset; > - if (need) { > - size_t avail = st->incomingLength - st->incomingOffset; > - if (need > avail) { > - size_t extra = need - avail; > - if (VIR_REALLOC_N(st->incoming, > - st->incomingLength + extra) < 0) { > - VIR_DEBUG("Out of memory handling stream data"); > - goto cleanup; > - } > - st->incomingLength += extra; > - } > > - memcpy(st->incoming + st->incomingOffset, > - msg->buffer + msg->bufferOffset, > - msg->bufferLength - msg->bufferOffset); > - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); > - } else { > - st->incomingEOF = true; > - } > + virNetMessageQueuePush(&st->rx, tmp_msg); > > - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", > - st->incomingOffset, st->incomingLength, > - st->incomingEOF); > virNetClientStreamEventTimerUpdate(st); > > - ret = 0; > - > - cleanup: > virObjectUnlock(st); > - return ret; > + return 0; > } > > > @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, > bool nonblock) > { > int rv = -1; > + size_t want; > + > VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", > st, client, data, nbytes, nonblock); > virObjectLock(st); > - if (!st->incomingOffset && !st->incomingEOF) { > + if (!st->rx && !st->incomingEOF) { > virNetMessagePtr msg; > int ret; > > @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, > goto cleanup; > } > > - VIR_DEBUG("After IO %zu", st->incomingOffset); > - if (st->incomingOffset) { > - int want = st->incomingOffset; > - if (want > nbytes) > - want = nbytes; > - memcpy(data, st->incoming, want); > - if (want < st->incomingOffset) { > - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); > - st->incomingOffset -= want; > - } else { > - VIR_FREE(st->incoming); > - st->incomingOffset = st->incomingLength = 0; > + VIR_DEBUG("After IO rx=%p", st->rx); > + want = nbytes; > + while (want && st->rx) { So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that is 'expected'... > + virNetMessagePtr msg = st->rx; > + size_t len = want; > + > + if (len > msg->bufferLength - msg->bufferOffset) > + len = msg->bufferLength - msg->bufferOffset; > + > + if (!len) > + break; > + > + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len); > + want -= len; > + msg->bufferOffset += len; > + > + if (msg->bufferOffset == msg->bufferLength) { > + virNetMessageQueueServe(&st->rx); > + virNetMessageFree(msg); Nothing needs to be done with want here? I guess this shows my lack of depth of understanding of these algorithms... Big black box that I hope works without me needing to intervene! John > } > - rv = want; > - } else { > - rv = 0; > } > + rv = nbytes - want; > > virNetClientStreamEventTimerUpdate(st); > > -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list