> On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote: > > Sorry to miss this mail, it got buried somehow and I haven't got to it > until now since nobody pinged it. Sorry for the long wait then. > No worries and thank you for taking time to review my patch. See new patch attached as well as comments on the memory usage below. The patch is tested on latest master (e46791e003444ce825feaf5bb2a16f778ee951e5). > The only thing I would mention wrt to how it works after this patch is > that it will consume some memory that's not needed, precisely > (sizeof(struct iovec) + sizeof(void *)) * unreadMBs. It might be > worth it to do: > > memmove(st->incomingVec, st->incomingVec + st->readVec, > st->writeVec - st->readVec); > VIR_SHRINK_N(st->incomingVec, st->readVec); > st->writeVec -= st->readVec; > st->readVec = 0; > > Apart from that it's definitely *way* better approach. What do you > think of such modification? This wouldn't go instead of > 'st->readVec++', but rather at the end of this function, so it's not > done after each MB read. > I totally agree and implemented freeing of the memory in the new patch: if (st->readVec >= 16) { memmove(st->incomingVec, st->incomingVec + st->readVec, sizeof(*st->incomingVec)*(st->writeVec - st->readVec)); VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec); VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec); st->readVec = 0; } now it only frees memory in chunks of 16 to avoid doing memmove() all the time. The iovec is 16 bytes (in 64 bit Linux) so this frees 256 bytes at a time and in my testing usually everything or almost everything that is allocated. Thanks again for taking time to review and commit, and I hope the new patch below is formatted ok. 
>From 2ae95c31568eb800c1c6df3641a8ecbdc95bf268 Mon Sep 17 00:00:00 2001 From: Ossi Herrala <oherrala@xxxxxxxxx> Date: Mon, 20 Jul 2015 12:44:32 +0000 Subject: [PATCH] rpc: Fix slow volume download (virsh vol-download) Use I/O vector (iovec) instead of one huge memory buffer as suggested in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids doing memmove() to big buffers and performance doesn't degrade if source (virNetClientStreamQueuePacket()) is faster than sink (virNetClientStreamRecvPacket()). Resolves: http://bugzilla.redhat.com/1026137 --- src/rpc/virnetclientstream.c | 152 +++++++++++++++++++++++++++--------------- 1 files changed, 99 insertions(+), 53 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index b428f4b..1cc9002 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,9 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - char *incoming; - size_t incomingOffset; - size_t incomingLength; + struct iovec *incomingVec; /* I/O Vector to hold data */ + size_t writeVec; /* Vectors produced */ + size_t readVec; /* Vectors consumed */ bool incomingEOF; virNetClientStreamEventCallback cb; @@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return; - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents); - if (((st->incomingOffset || st->incomingEOF) && + if ((((st->readVec < st->writeVec) || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->incomingOffset || st->incomingEOF)) + ((st->readVec < st->writeVec) || 
st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE; - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents, + st->readVec, st->writeVec); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj; virResetError(&st->err); - VIR_FREE(st->incoming); + VIR_FREE(st->incomingVec); virObjectUnref(st->prog); } @@ -265,38 +266,50 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { int ret = -1; - size_t need; + struct iovec iov; + char *base; + size_t piece, pieces, length, offset = 0, size = 1024*1024; virObjectLock(st); - need = msg->bufferLength - msg->bufferOffset; - if (need) { - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; - } - st->incomingLength += extra; - } - memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); - } else { + length = msg->bufferLength - msg->bufferOffset; + + if (length == 0) { st->incomingEOF = true; + goto end; } - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", - st->incomingOffset, st->incomingLength, - st->incomingEOF); - virNetClientStreamEventTimerUpdate(st); + pieces = (length + size - 1) / size; + for (piece = 0; piece < pieces; piece++) { + if (size > length - offset) + size = length - offset; + + if (VIR_ALLOC_N(base, size)) { + VIR_DEBUG("Allocation failed"); + goto cleanup; + } + + memcpy(base, msg->buffer + 
msg->bufferOffset + offset, size); + iov.iov_base = base; + iov.iov_len = size; + offset += size; + if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) { + VIR_DEBUG("Append failed"); + VIR_FREE(base); + goto cleanup; + } + VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", + st->readVec, st->writeVec, size); + } + + end: + virNetClientStreamEventTimerUpdate(st); ret = 0; cleanup: + VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d", + st->readVec, st->writeVec, st->incomingEOF); virObjectUnlock(st); return ret; } @@ -361,17 +374,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock) { - int rv = -1; + int ret = -1; + size_t partial, offset; + + virObjectLock(st); + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); - virObjectLock(st); - if (!st->incomingOffset && !st->incomingEOF) { + + if ((st->readVec >= st->writeVec) && !st->incomingEOF) { virNetMessagePtr msg; - int ret; + int rv; if (nonblock) { VIR_DEBUG("Non-blocking mode and no data available"); - rv = -2; + ret = -2; goto cleanup; } @@ -387,37 +404,66 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virObjectUnlock(st); - ret = virNetClientSendWithReplyStream(client, msg, st); + rv = virNetClientSendWithReplyStream(client, msg, st); virObjectLock(st); virNetMessageFree(msg); - if (ret < 0) + if (rv < 0) goto cleanup; } - VIR_DEBUG("After IO %zu", st->incomingOffset); - if (st->incomingOffset) { - int want = st->incomingOffset; - if (want > nbytes) - want = nbytes; - memcpy(data, st->incoming, want); - if (want < st->incomingOffset) { - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); - st->incomingOffset -= want; - } else { - VIR_FREE(st->incoming); - st->incomingOffset = st->incomingLength = 0; + offset = 0; + partial = nbytes; + + while (st->incomingVec && (st->readVec < st->writeVec)) { 
+ struct iovec *iov = st->incomingVec + st->readVec; + + if (!iov || !iov->iov_base) { + virReportError(VIR_ERR_INTERNAL_ERROR, + "%s", _("NULL pointer encountered")); + goto cleanup; } - rv = want; - } else { - rv = 0; + + if (partial < iov->iov_len) { + memcpy(data+offset, iov->iov_base, partial); + memmove(iov->iov_base, (char*)iov->iov_base+partial, + iov->iov_len-partial); + iov->iov_len -= partial; + offset += partial; + VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len); + break; + } + + memcpy(data+offset, iov->iov_base, iov->iov_len); + VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len); + partial -= iov->iov_len; + offset += iov->iov_len; + VIR_FREE(iov->iov_base); + iov->iov_len = 0; + st->readVec++; + + VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu", + offset, st->readVec, st->writeVec); } + /* Shrink the I/O Vector buffer to free up memory. Do the + shrinking only when there is selected amount or more buffers to + free so it doesn't constantly memmove() and realloc() buffers. + */ + if (st->readVec >= 16) { + memmove(st->incomingVec, st->incomingVec + st->readVec, + sizeof(*st->incomingVec)*(st->writeVec - st->readVec)); + VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec); + VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec); + st->readVec = 0; + } + + ret = offset; virNetClientStreamEventTimerUpdate(st); cleanup: virObjectUnlock(st); - return rv; + return ret; } -- 1.7.1 -- Ossi Herrala -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list