[PATCH] rpc: RH1026137: Fix slow volume download (virsh vol-download)

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Use an I/O vector (iovec) instead of one huge memory buffer, as suggested
in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
calling memmove() on big buffers, so performance doesn't degrade when the
source (virNetClientStreamQueuePacket()) is faster than the sink
(virNetClientStreamRecvPacket()).
---
 src/rpc/virnetclientstream.c |  134 +++++++++++++++++++++++++----------------
 1 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..18c6e8b 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    char *incoming;
-    size_t incomingOffset;
-    size_t incomingLength;
+    struct iovec *incomingVec; /* I/O Vector to hold data */
+    size_t writeVec;           /* Vectors produced */
+    size_t readVec;            /* Vectors consumed */
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
 
-    if (((st->incomingOffset || st->incomingEOF) &&
+    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        (st->incomingOffset || st->incomingEOF))
+        ((st->readVec < st->writeVec) || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
+              st->readVec, st->writeVec);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incoming);
+    VIR_FREE(st->incomingVec);
     virObjectUnref(st->prog);
 }
 
@@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
     int ret = -1;
-    size_t need;
+    struct iovec iov;
+    char *base;
+    size_t piece, pieces, length, offset = 0, size = 1024*1024;
 
     virObjectLock(st);
-    need = msg->bufferLength - msg->bufferOffset;
-    if (need) {
-        size_t avail = st->incomingLength - st->incomingOffset;
-        if (need > avail) {
-            size_t extra = need - avail;
-            if (VIR_REALLOC_N(st->incoming,
-                              st->incomingLength + extra) < 0) {
-                VIR_DEBUG("Out of memory handling stream data");
-                goto cleanup;
-            }
-            st->incomingLength += extra;
-        }
 
-        memcpy(st->incoming + st->incomingOffset,
-               msg->buffer + msg->bufferOffset,
-               msg->bufferLength - msg->bufferOffset);
-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
-    } else {
+    length = msg->bufferLength - msg->bufferOffset;
+
+    if (length == 0) {
         st->incomingEOF = true;
+        goto end;
     }
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
-              st->incomingOffset, st->incomingLength,
-              st->incomingEOF);
+    pieces = (length + size - 1) / size;
+    for (piece = 0; piece < pieces; piece++) {
+        if (size > length - offset)
+            size = length - offset;
+
+        if (VIR_ALLOC_N(base, size)) {
+            VIR_DEBUG("Allocation failed");
+            goto cleanup;
+        }
+
+        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
+        iov.iov_base = base;
+        iov.iov_len = size;
+        offset += size;
+
+        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
+            VIR_DEBUG("Append failed");
+            VIR_FREE(base);
+            goto cleanup;
+        }
+        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", st->readVec, st->writeVec, size);
+    }
+
+ end:
     virNetClientStreamEventTimerUpdate(st);
-
     ret = 0;
 
  cleanup:
+    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
+              st->readVec, st->writeVec, st->incomingEOF);
     virObjectUnlock(st);
     return ret;
 }
@@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  size_t nbytes,
                                  bool nonblock)
 {
-    int rv = -1;
+    int ret = -1;
+    size_t partial, offset;
+
+    virObjectLock(st);
+
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
-    virObjectLock(st);
-    if (!st->incomingOffset && !st->incomingEOF) {
+
+    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
         virNetMessagePtr msg;
-        int ret;
+        int rv;
 
         if (nonblock) {
             VIR_DEBUG("Non-blocking mode and no data available");
-            rv = -2;
+            ret = -2;
             goto cleanup;
         }
 
@@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
 
         VIR_DEBUG("Dummy packet to wait for stream data");
         virObjectUnlock(st);
-        ret = virNetClientSendWithReplyStream(client, msg, st);
+        rv = virNetClientSendWithReplyStream(client, msg, st);
         virObjectLock(st);
         virNetMessageFree(msg);
 
-        if (ret < 0)
+        if (rv < 0)
             goto cleanup;
     }
 
-    VIR_DEBUG("After IO %zu", st->incomingOffset);
-    if (st->incomingOffset) {
-        int want = st->incomingOffset;
-        if (want > nbytes)
-            want = nbytes;
-        memcpy(data, st->incoming, want);
-        if (want < st->incomingOffset) {
-            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
-            st->incomingOffset -= want;
+    offset = 0;
+    partial = nbytes;
+
+    while (st->incomingVec && (st->readVec < st->writeVec)) {
+        struct iovec *iov = st->incomingVec + st->readVec;
+
+        if (!iov || !iov->iov_base) {
+            VIR_DEBUG("NULL pointer");
+            goto cleanup;
+        }
+
+        if (partial < iov->iov_len) {
+            memcpy(data+offset, iov->iov_base, partial);
+            memmove(iov->iov_base, (char*)iov->iov_base+partial, iov->iov_len-partial);
+            iov->iov_len -= partial;
+            offset += partial;
+            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
+            break;
         } else {
-            VIR_FREE(st->incoming);
-            st->incomingOffset = st->incomingLength = 0;
+            memcpy(data+offset, iov->iov_base, iov->iov_len);
+            VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
+            partial -= iov->iov_len;
+            offset += iov->iov_len;
+            VIR_FREE(iov->iov_base);
+            iov->iov_len = 0;
+            st->readVec++;
         }
-        rv = want;
-    } else {
-        rv = 0;
+
+        VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu", offset, st->readVec, st->writeVec);
     }
 
+    ret = offset;
     virNetClientStreamEventTimerUpdate(st);
 
  cleanup:
     virObjectUnlock(st);
-    return rv;
+    return ret;
 }
 
 
-- 
1.7.1

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list




[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]