[PATCH 7/9] Revert "rpc: Fix slow volume download (virsh vol-download)"

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This reverts commit d9c9e138f22c48626f719f880920e04c639e0177.

Unfortunately, things are going to be handled differently so this
commit must go.

Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
---
 src/rpc/virnetclientstream.c | 152 +++++++++++++++----------------------------
 1 file changed, 53 insertions(+), 99 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 64e9cd2..b428f4b 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    struct iovec *incomingVec; /* I/O Vector to hold data */
-    size_t writeVec;           /* Vectors produced */
-    size_t readVec;            /* Vectors consumed */
+    char *incoming;
+    size_t incomingOffset;
+    size_t incomingLength;
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
+    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
 
-    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
+    if (((st->incomingOffset || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,14 +110,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        ((st->readVec < st->writeVec) || st->incomingEOF))
+        (st->incomingOffset || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
-              st->readVec, st->writeVec);
+    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -162,7 +161,7 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incomingVec);
+    VIR_FREE(st->incoming);
     virObjectUnref(st->prog);
 }
 
@@ -266,50 +265,38 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
     int ret = -1;
-    struct iovec iov;
-    char *base;
-    size_t piece, pieces, length, offset = 0, size = 1024*1024;
+    size_t need;
 
     virObjectLock(st);
+    need = msg->bufferLength - msg->bufferOffset;
+    if (need) {
+        size_t avail = st->incomingLength - st->incomingOffset;
+        if (need > avail) {
+            size_t extra = need - avail;
+            if (VIR_REALLOC_N(st->incoming,
+                              st->incomingLength + extra) < 0) {
+                VIR_DEBUG("Out of memory handling stream data");
+                goto cleanup;
+            }
+            st->incomingLength += extra;
+        }
 
-    length = msg->bufferLength - msg->bufferOffset;
-
-    if (length == 0) {
+        memcpy(st->incoming + st->incomingOffset,
+               msg->buffer + msg->bufferOffset,
+               msg->bufferLength - msg->bufferOffset);
+        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+    } else {
         st->incomingEOF = true;
-        goto end;
     }
 
-    pieces = VIR_DIV_UP(length, size);
-    for (piece = 0; piece < pieces; piece++) {
-        if (size > length - offset)
-            size = length - offset;
-
-        if (VIR_ALLOC_N(base, size)) {
-            VIR_DEBUG("Allocation failed");
-            goto cleanup;
-        }
-
-        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
-        iov.iov_base = base;
-        iov.iov_len = size;
-        offset += size;
-
-        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
-            VIR_DEBUG("Append failed");
-            VIR_FREE(base);
-            goto cleanup;
-        }
-        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
-                  st->readVec, st->writeVec, size);
-    }
-
- end:
+    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
+              st->incomingOffset, st->incomingLength,
+              st->incomingEOF);
     virNetClientStreamEventTimerUpdate(st);
+
     ret = 0;
 
  cleanup:
-    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
-              st->readVec, st->writeVec, st->incomingEOF);
     virObjectUnlock(st);
     return ret;
 }
@@ -374,21 +361,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  size_t nbytes,
                                  bool nonblock)
 {
-    int ret = -1;
-    size_t partial, offset;
-
-    virObjectLock(st);
-
+    int rv = -1;
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
-
-    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
+    virObjectLock(st);
+    if (!st->incomingOffset && !st->incomingEOF) {
         virNetMessagePtr msg;
-        int rv;
+        int ret;
 
         if (nonblock) {
             VIR_DEBUG("Non-blocking mode and no data available");
-            ret = -2;
+            rv = -2;
             goto cleanup;
         }
 
@@ -404,66 +387,37 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
 
         VIR_DEBUG("Dummy packet to wait for stream data");
         virObjectUnlock(st);
-        rv = virNetClientSendWithReplyStream(client, msg, st);
+        ret = virNetClientSendWithReplyStream(client, msg, st);
         virObjectLock(st);
         virNetMessageFree(msg);
 
-        if (rv < 0)
+        if (ret < 0)
             goto cleanup;
     }
 
-    offset = 0;
-    partial = nbytes;
-
-    while (st->incomingVec && (st->readVec < st->writeVec)) {
-        struct iovec *iov = st->incomingVec + st->readVec;
-
-        if (!iov || !iov->iov_base) {
-            virReportError(VIR_ERR_INTERNAL_ERROR,
-                           "%s", _("NULL pointer encountered"));
-            goto cleanup;
-        }
-
-        if (partial < iov->iov_len) {
-            memcpy(data+offset, iov->iov_base, partial);
-            memmove(iov->iov_base, (char*)iov->iov_base+partial,
-                    iov->iov_len-partial);
-            iov->iov_len -= partial;
-            offset += partial;
-            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
-            break;
+    VIR_DEBUG("After IO %zu", st->incomingOffset);
+    if (st->incomingOffset) {
+        int want = st->incomingOffset;
+        if (want > nbytes)
+            want = nbytes;
+        memcpy(data, st->incoming, want);
+        if (want < st->incomingOffset) {
+            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
+            st->incomingOffset -= want;
+        } else {
+            VIR_FREE(st->incoming);
+            st->incomingOffset = st->incomingLength = 0;
         }
-
-        memcpy(data+offset, iov->iov_base, iov->iov_len);
-        VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
-        partial -= iov->iov_len;
-        offset += iov->iov_len;
-        VIR_FREE(iov->iov_base);
-        iov->iov_len = 0;
-        st->readVec++;
-
-        VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu",
-                  offset, st->readVec, st->writeVec);
-    }
-
-    /* Shrink the I/O Vector buffer to free up memory. Do the
-       shrinking only when there is selected amount or more buffers to
-       free so it doesn't constantly memmove() and realloc() buffers.
-     */
-    if (st->readVec >= 16) {
-        memmove(st->incomingVec, st->incomingVec + st->readVec,
-                sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
-        VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
-        VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
-        st->readVec = 0;
+        rv = want;
+    } else {
+        rv = 0;
     }
 
-    ret = offset;
     virNetClientStreamEventTimerUpdate(st);
 
  cleanup:
     virObjectUnlock(st);
-    return ret;
+    return rv;
 }
 
 
-- 
2.7.3

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]