On 04/21/2016 10:28 AM, Michal Privoznik wrote: > On 20.04.2016 15:57, John Ferlan wrote: >> >> >> On 04/15/2016 09:51 AM, Michal Privoznik wrote: >>> Currently we have two separate functions for handling read from >>> a stream. One is supposed to be low level and reads data in this >>> self allocating chunk of memory. The other read function then >>> copies data over from the chunk into a user buffer. There are two >>> memcpy() involved even though a single would be sufficient. >>> Moreover, since we are copying just data, we can't process >>> alternative stream packets in the latter function, like stream >>> seeks. >>> >>> In my testing, this proved two times faster then implementation >> >> s/then/than the/ >> >>> which uses IO vectors. >> >> Can I "assume" this testing covers the reverted patch scenario. IOW: I >> think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to >> be reopened... > > This showed two times faster than even IO vectors implementation. > >> >> Might have been "nice" to indicate/summarize what this algorithm does as >> opposed to the other. I think you started at the end of the first >> paragraph, but I'm not 100% sure - I guess it's easier for me if it's >> explicitly said, such as: >> >> In virNetClientStreamQueuePacket instead of ... >> >> In virNetClientStreamRecvPacket instead of ... >> >> instead of implicitly said if you know the code. > > Something like this? > > There are two functions on the client that handle incoming stream data. > The first one virNetClientStreamQueuePacket() is a low level function > that just process the incoming stream data from the socket and store it ...just processes ... and stores ... > into an internal structure. This happens in the client event loop > therefore the shorter the callbacks are, the better. The second function > virNetClientStreamRecvPacket() then handles copying data from internal > structure into a client provided buffer. Change introduced in this New paragraph before "Change" > commit makes just that: new queue for incoming stream packets is ...a new receive (rx) queue... > introduced. Then instead of copying data into intermediate internal > buffer and then copying them into user buffer, incoming stream messages > are enqueued into the queue and data are copied just once - in the upper ... are queue... ... data is copied... > layer function virNetClientStreamRecvPacket(). In the end, there's just > one copying of data and therefore shorter event loop callback. This > should boost the performance which has proven to be the case in my testing. > > Having said that, I don't think there's any need for reopening the bug > since we are not hurting performance here. > The only reason I suggested is I think technically the revert makes the previous changes essentially NULL and void. Since that commit was connected with a bug #, I just wanted to be sure "process wise" we're covered... It's not that important though. OK... I see , instead of allocating and copying data from incoming stream socket into a buffer to only be copied again into the client buffer, we'll "steal" the entire buffer destined for the client from the incoming stream socket and then create a queue for the client to copy - seems OK to me... ACK for 7-8 John BTW: I know it's existing, but virNetMessageQueueServe caused me to go look and see it's really a virNetMessageQueuePop to complement the virNetMessageQueuePush (sigh) >> >> The functions are just tough to read without (more) knowledge (than I >> have about them) of how they are designed to function. Since he had a >> hand in the above bug, hopefully Martin can take a look at this patch. >> >>> >>> Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> >>> --- >>> src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++--------------------- >>> 1 file changed, 54 insertions(+), 52 deletions(-) >>> >>> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c >>> index b428f4b..34989a9 100644 >>> --- a/src/rpc/virnetclientstream.c >>> +++ b/src/rpc/virnetclientstream.c >>> @@ -49,9 +49,7 @@ struct _virNetClientStream { >>> * time by stopping consuming any incoming data >>> * off the socket.... >>> */ >>> - char *incoming; >>> - size_t incomingOffset; >>> - size_t incomingLength; >>> + virNetMessagePtr rx; >>> bool incomingEOF; >>> >>> virNetClientStreamEventCallback cb; >>> @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) >>> if (!st->cb) >>> return; >>> >>> - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); >>> + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents); >>> >>> - if (((st->incomingOffset || st->incomingEOF) && >>> + if (((st->rx || st->incomingEOF) && >>> (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || >>> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { >>> VIR_DEBUG("Enabling event timer"); >>> @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) >>> >>> if (st->cb && >>> (st->cbEvents & VIR_STREAM_EVENT_READABLE) && >>> - (st->incomingOffset || st->incomingEOF)) >>> + (st->rx || st->incomingEOF)) >>> events |= VIR_STREAM_EVENT_READABLE; >>> if (st->cb && >>> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) >>> events |= VIR_STREAM_EVENT_WRITABLE; >>> >>> - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); >>> + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx); >>> if (events) { >>> virNetClientStreamEventCallback cb = st->cb; >>> void *cbOpaque = st->cbOpaque; >>> @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj) >>> virNetClientStreamPtr st = obj; >>> >>> virResetError(&st->err); >>> - VIR_FREE(st->incoming); >>> + while (st->rx) { >>> + virNetMessagePtr msg = st->rx; >>> + virNetMessageQueueServe(&st->rx); >>> + virNetMessageFree(msg); >>> + } >>> virObjectUnref(st->prog); >>> } >>> >>> @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, >>> int virNetClientStreamQueuePacket(virNetClientStreamPtr st, >>> virNetMessagePtr msg) >>> { >>> - int ret = -1; >>> - size_t need; >>> + virNetMessagePtr tmp_msg; >>> + >>> + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg); >>> + >>> + /* Unfortunately, we must allocate new message as the one we >>> + * get in @msg is going to be cleared later in the process. */ >>> + >>> + if (!(tmp_msg = virNetMessageNew(false))) >>> + return -1; >>> + >>> + /* Copy header */ >>> + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header)); >>> + >>> + /* Steal message buffer */ >>> + tmp_msg->buffer = msg->buffer; >>> + tmp_msg->bufferLength = msg->bufferLength; >>> + tmp_msg->bufferOffset = msg->bufferOffset; >>> + msg->buffer = NULL; >>> + msg->bufferLength = msg->bufferOffset = 0; >>> >>> virObjectLock(st); >>> - need = msg->bufferLength - msg->bufferOffset; >>> - if (need) { >>> - size_t avail = st->incomingLength - st->incomingOffset; >>> - if (need > avail) { >>> - size_t extra = need - avail; >>> - if (VIR_REALLOC_N(st->incoming, >>> - st->incomingLength + extra) < 0) { >>> - VIR_DEBUG("Out of memory handling stream data"); >>> - goto cleanup; >>> - } >>> - st->incomingLength += extra; >>> - } >>> >>> - memcpy(st->incoming + st->incomingOffset, >>> - msg->buffer + msg->bufferOffset, >>> - msg->bufferLength - msg->bufferOffset); >>> - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); >>> - } else { >>> - st->incomingEOF = true; >>> - } >>> + virNetMessageQueuePush(&st->rx, tmp_msg); >>> >>> - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", >>> - st->incomingOffset, st->incomingLength, >>> - st->incomingEOF); >>> virNetClientStreamEventTimerUpdate(st); >>> >>> - ret = 0; >>> - >>> - cleanup: >>> virObjectUnlock(st); >>> - return ret; >>> + return 0; >>> } >>> >>> >>> @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, >>> bool nonblock) >>> { >>> int rv = -1; >>> + size_t want; >>> + >>> VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", >>> st, client, data, nbytes, nonblock); >>> virObjectLock(st); >>> - if (!st->incomingOffset && !st->incomingEOF) { >>> + if (!st->rx && !st->incomingEOF) { >>> virNetMessagePtr msg; >>> int ret; >>> >>> @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, >>> goto cleanup; >>> } >>> >>> - VIR_DEBUG("After IO %zu", st->incomingOffset); >>> - if (st->incomingOffset) { >>> - int want = st->incomingOffset; >>> - if (want > nbytes) >>> - want = nbytes; >>> - memcpy(data, st->incoming, want); >>> - if (want < st->incomingOffset) { >>> - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); >>> - st->incomingOffset -= want; >>> - } else { >>> - VIR_FREE(st->incoming); >>> - st->incomingOffset = st->incomingLength = 0; >>> + VIR_DEBUG("After IO rx=%p", st->rx); >>> + want = nbytes; >>> + while (want && st->rx) { >> >> So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that >> is 'expected'... > > Yes. Calling virStreamRecv() on client side will basically boil down to > calling this function. And return value of this function will become > return value of the wrapper. As described in the docs, virStreamRecv() > and this virNetClientStreamRecvPacket() returns number of bytes read > from stream. In case there's no incoming data, there's nothing we can > read from and therefore we should return 0. > >> >>> + virNetMessagePtr msg = st->rx; >>> + size_t len = want; >>> + >>> + if (len > msg->bufferLength - msg->bufferOffset) >>> + len = msg->bufferLength - msg->bufferOffset; >>> + >>> + if (!len) >>> + break; >>> + >>> + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len); >>> + want -= len; >>> + msg->bufferOffset += len; >>> + >>> + if (msg->bufferOffset == msg->bufferLength) { >>> + virNetMessageQueueServe(&st->rx); >>> + virNetMessageFree(msg); >> >> Nothing needs to be done with want here? I guess this shows my lack of >> depth of understanding of these algorithms... Big black box that I hope >> works without me needing to intervene! > > No. This does nothing more than: if the head of linked list of incoming > stream messages is fully read (*), then pop the message at the head and > move to the other message in the queue (list). In that case, I haven't > copied any data to user, therefore I should not change @want. > > (*) - It may happen, that users will read less bytes than there is in > incoming message. For instance, incoming stream packet (message) can be > 1024 bytes in size, but user will read 1 byte at the time from stream. > > Hope my explanation makes it clear(-er) to you. > > Michal > -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list