This reverts commit d9c9e138f22c48626f719f880920e04c639e0177. Unfortunately, things are going to be handled differently so this commit must go. Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> --- src/rpc/virnetclientstream.c | 152 +++++++++++++++---------------------------- 1 file changed, 53 insertions(+), 99 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 64e9cd2..b428f4b 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,9 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - struct iovec *incomingVec; /* I/O Vector to hold data */ - size_t writeVec; /* Vectors produced */ - size_t readVec; /* Vectors consumed */ + char *incoming; + size_t incomingOffset; + size_t incomingLength; bool incomingEOF; virNetClientStreamEventCallback cb; @@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return; - VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents); + VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); - if ((((st->readVec < st->writeVec) || st->incomingEOF) && + if (((st->incomingOffset || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,14 +110,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - ((st->readVec < st->writeVec) || st->incomingEOF)) + (st->incomingOffset || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE; - VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents, - st->readVec, st->writeVec); + VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -162,7 +161,7 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj; virResetError(&st->err); - VIR_FREE(st->incomingVec); + VIR_FREE(st->incoming); virObjectUnref(st->prog); } @@ -266,50 +265,38 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { int ret = -1; - struct iovec iov; - char *base; - size_t piece, pieces, length, offset = 0, size = 1024*1024; + size_t need; virObjectLock(st); + need = msg->bufferLength - msg->bufferOffset; + if (need) { + size_t avail = st->incomingLength - st->incomingOffset; + if (need > avail) { + size_t extra = need - avail; + if (VIR_REALLOC_N(st->incoming, + st->incomingLength + extra) < 0) { + VIR_DEBUG("Out of memory handling stream data"); + goto cleanup; + } + st->incomingLength += extra; + } - length = msg->bufferLength - msg->bufferOffset; - - if (length == 0) { + memcpy(st->incoming + st->incomingOffset, + msg->buffer + msg->bufferOffset, + msg->bufferLength - msg->bufferOffset); + st->incomingOffset += (msg->bufferLength - msg->bufferOffset); + } else { st->incomingEOF = true; - goto end; } - pieces = VIR_DIV_UP(length, size); - for (piece = 0; piece < pieces; piece++) { - if (size > length - offset) - size = length - offset; - - if (VIR_ALLOC_N(base, size)) { - VIR_DEBUG("Allocation failed"); - goto cleanup; - } - - memcpy(base, msg->buffer + msg->bufferOffset + offset, size); - iov.iov_base = base; - iov.iov_len = size; - offset += size; - - if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) { - VIR_DEBUG("Append failed"); - VIR_FREE(base); - goto cleanup; - } - VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", - st->readVec, st->writeVec, size); - } - - end: + VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", + st->incomingOffset, st->incomingLength, + st->incomingEOF); virNetClientStreamEventTimerUpdate(st); + ret = 0; cleanup: - VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d", - st->readVec, st->writeVec, st->incomingEOF); virObjectUnlock(st); return ret; } @@ -374,21 +361,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock) { - int ret = -1; - size_t partial, offset; - - virObjectLock(st); - + int rv = -1; VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); - - if ((st->readVec >= st->writeVec) && !st->incomingEOF) { + virObjectLock(st); + if (!st->incomingOffset && !st->incomingEOF) { virNetMessagePtr msg; - int rv; + int ret; if (nonblock) { VIR_DEBUG("Non-blocking mode and no data available"); - ret = -2; + rv = -2; goto cleanup; } @@ -404,66 +387,37 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virObjectUnlock(st); - rv = virNetClientSendWithReplyStream(client, msg, st); + ret = virNetClientSendWithReplyStream(client, msg, st); virObjectLock(st); virNetMessageFree(msg); - if (rv < 0) + if (ret < 0) goto cleanup; } - offset = 0; - partial = nbytes; - - while (st->incomingVec && (st->readVec < st->writeVec)) { - struct iovec *iov = st->incomingVec + st->readVec; - - if (!iov || !iov->iov_base) { - virReportError(VIR_ERR_INTERNAL_ERROR, - "%s", _("NULL pointer encountered")); - goto cleanup; - } - - if (partial < iov->iov_len) { - memcpy(data+offset, iov->iov_base, partial); - memmove(iov->iov_base, (char*)iov->iov_base+partial, - iov->iov_len-partial); - iov->iov_len -= partial; - offset += partial; - VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len); - break; + VIR_DEBUG("After IO %zu", st->incomingOffset); + if (st->incomingOffset) { + int want = st->incomingOffset; + if (want > nbytes) + want = nbytes; + memcpy(data, st->incoming, want); + if (want < st->incomingOffset) { + memmove(st->incoming, st->incoming + want, st->incomingOffset - want); + st->incomingOffset -= want; + } else { + VIR_FREE(st->incoming); + st->incomingOffset = st->incomingLength = 0; } - - memcpy(data+offset, iov->iov_base, iov->iov_len); - VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len); - partial -= iov->iov_len; - offset += iov->iov_len; - VIR_FREE(iov->iov_base); - iov->iov_len = 0; - st->readVec++; - - VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu", - offset, st->readVec, st->writeVec); - } - - /* Shrink the I/O Vector buffer to free up memory. Do the - shrinking only when there is selected amount or more buffers to - free so it doesn't constantly memmove() and realloc() buffers. - */ - if (st->readVec >= 16) { - memmove(st->incomingVec, st->incomingVec + st->readVec, - sizeof(*st->incomingVec)*(st->writeVec - st->readVec)); - VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec); - VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec); - st->readVec = 0; + rv = want; + } else { + rv = 0; } - ret = offset; virNetClientStreamEventTimerUpdate(st); cleanup: virObjectUnlock(st); - return ret; + return rv; } -- 2.7.3 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list