Use an I/O vector (iovec) instead of one huge memory buffer, as suggested in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids calling memmove() on large buffers, so performance no longer degrades when the source (virNetClientStreamQueuePacket()) produces data faster than the sink (virNetClientStreamRecvPacket()) consumes it. --- src/rpc/virnetclientstream.c | 134 +++++++++++++++++++++++++---------------- 1 files changed, 82 insertions(+), 52 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index b428f4b..18c6e8b 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,9 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - char *incoming; - size_t incomingOffset; - size_t incomingLength; + struct iovec *incomingVec; /* I/O Vector to hold data */ + size_t writeVec; /* Vectors produced */ + size_t readVec; /* Vectors consumed */ bool incomingEOF; virNetClientStreamEventCallback cb; @@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return; - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents); - if (((st->incomingOffset || st->incomingEOF) && + if ((((st->readVec < st->writeVec) || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->incomingOffset || st->incomingEOF)) + ((st->readVec < st->writeVec) || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE; - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + VIR_DEBUG("Got Timer 
dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents, + st->readVec, st->writeVec); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj; virResetError(&st->err); - VIR_FREE(st->incoming); + VIR_FREE(st->incomingVec); virObjectUnref(st->prog); } @@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { int ret = -1; - size_t need; + struct iovec iov; + char *base; + size_t piece, pieces, length, offset = 0, size = 1024*1024; virObjectLock(st); - need = msg->bufferLength - msg->bufferOffset; - if (need) { - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; - } - st->incomingLength += extra; - } - memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); - } else { + length = msg->bufferLength - msg->bufferOffset; + + if (length == 0) { st->incomingEOF = true; + goto end; } - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", - st->incomingOffset, st->incomingLength, - st->incomingEOF); + pieces = (length + size - 1) / size; + for (piece = 0; piece < pieces; piece++) { + if (size > length - offset) + size = length - offset; + + if (VIR_ALLOC_N(base, size)) { + VIR_DEBUG("Allocation failed"); + goto cleanup; + } + + memcpy(base, msg->buffer + msg->bufferOffset + offset, size); + iov.iov_base = base; + iov.iov_len = size; + offset += size; + + if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) { + VIR_DEBUG("Append failed"); + VIR_FREE(base); + goto cleanup; + } + VIR_DEBUG("Wrote piece of vector. 
readVec %zu, writeVec %zu size %zu", st->readVec, st->writeVec, size); + } + + end: virNetClientStreamEventTimerUpdate(st); - ret = 0; cleanup: + VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d", + st->readVec, st->writeVec, st->incomingEOF); virObjectUnlock(st); return ret; } @@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock) { - int rv = -1; + int ret = -1; + size_t partial, offset; + + virObjectLock(st); + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); - virObjectLock(st); - if (!st->incomingOffset && !st->incomingEOF) { + + if ((st->readVec >= st->writeVec) && !st->incomingEOF) { virNetMessagePtr msg; - int ret; + int rv; if (nonblock) { VIR_DEBUG("Non-blocking mode and no data available"); - rv = -2; + ret = -2; goto cleanup; } @@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virObjectUnlock(st); - ret = virNetClientSendWithReplyStream(client, msg, st); + rv = virNetClientSendWithReplyStream(client, msg, st); virObjectLock(st); virNetMessageFree(msg); - if (ret < 0) + if (rv < 0) goto cleanup; } - VIR_DEBUG("After IO %zu", st->incomingOffset); - if (st->incomingOffset) { - int want = st->incomingOffset; - if (want > nbytes) - want = nbytes; - memcpy(data, st->incoming, want); - if (want < st->incomingOffset) { - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); - st->incomingOffset -= want; + offset = 0; + partial = nbytes; + + while (st->incomingVec && (st->readVec < st->writeVec)) { + struct iovec *iov = st->incomingVec + st->readVec; + + if (!iov || !iov->iov_base) { + VIR_DEBUG("NULL pointer"); + goto cleanup; + } + + if (partial < iov->iov_len) { + memcpy(data+offset, iov->iov_base, partial); + memmove(iov->iov_base, (char*)iov->iov_base+partial, iov->iov_len-partial); + iov->iov_len -= partial; + offset += partial; + 
VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len); + break; } else { - VIR_FREE(st->incoming); - st->incomingOffset = st->incomingLength = 0; + memcpy(data+offset, iov->iov_base, iov->iov_len); + VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len); + partial -= iov->iov_len; + offset += iov->iov_len; + VIR_FREE(iov->iov_base); + iov->iov_len = 0; + st->readVec++; } - rv = want; - } else { - rv = 0; + + VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu", offset, st->readVec, st->writeVec); } + ret = offset; virNetClientStreamEventTimerUpdate(st); cleanup: virObjectUnlock(st); - return rv; + return ret; } -- 1.7.1 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list