[PATCH 8/9] virnetclientstream: Process stream messages later

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Currently we have two separate functions for handling read from
a stream. One is supposed to be low level and reads data in this
self allocating chunk of memory. The other read function then
copies data over from the chunk into a user buffer. There are two
memcpy() involved even though a single would be sufficient.
Moreover, since we are copying just data, we can't process
alternative stream packets in the latter function, like stream
seeks.

In my testing, this proved two times faster then implementation
which uses IO vectors.

Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
---
 src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
 1 file changed, 54 insertions(+), 52 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..34989a9 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,7 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    char *incoming;
-    size_t incomingOffset;
-    size_t incomingLength;
+    virNetMessagePtr rx;
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+    VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
 
-    if (((st->incomingOffset || st->incomingEOF) &&
+    if (((st->rx || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        (st->incomingOffset || st->incomingEOF))
+        (st->rx || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+    VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incoming);
+    while (st->rx) {
+        virNetMessagePtr msg = st->rx;
+        virNetMessageQueueServe(&st->rx);
+        virNetMessageFree(msg);
+    }
     virObjectUnref(st->prog);
 }
 
@@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
 int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
-    int ret = -1;
-    size_t need;
+    virNetMessagePtr tmp_msg;
+
+    VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
+
+    /* Unfortunately, we must allocate new message as the one we
+     * get in @msg is going to be cleared later in the process. */
+
+    if (!(tmp_msg = virNetMessageNew(false)))
+        return -1;
+
+    /* Copy header */
+    memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
+
+    /* Steal message buffer */
+    tmp_msg->buffer = msg->buffer;
+    tmp_msg->bufferLength = msg->bufferLength;
+    tmp_msg->bufferOffset = msg->bufferOffset;
+    msg->buffer = NULL;
+    msg->bufferLength = msg->bufferOffset = 0;
 
     virObjectLock(st);
-    need = msg->bufferLength - msg->bufferOffset;
-    if (need) {
-        size_t avail = st->incomingLength - st->incomingOffset;
-        if (need > avail) {
-            size_t extra = need - avail;
-            if (VIR_REALLOC_N(st->incoming,
-                              st->incomingLength + extra) < 0) {
-                VIR_DEBUG("Out of memory handling stream data");
-                goto cleanup;
-            }
-            st->incomingLength += extra;
-        }
 
-        memcpy(st->incoming + st->incomingOffset,
-               msg->buffer + msg->bufferOffset,
-               msg->bufferLength - msg->bufferOffset);
-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
-    } else {
-        st->incomingEOF = true;
-    }
+    virNetMessageQueuePush(&st->rx, tmp_msg);
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
-              st->incomingOffset, st->incomingLength,
-              st->incomingEOF);
     virNetClientStreamEventTimerUpdate(st);
 
-    ret = 0;
-
- cleanup:
     virObjectUnlock(st);
-    return ret;
+    return 0;
 }
 
 
@@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  bool nonblock)
 {
     int rv = -1;
+    size_t want;
+
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
     virObjectLock(st);
-    if (!st->incomingOffset && !st->incomingEOF) {
+    if (!st->rx && !st->incomingEOF) {
         virNetMessagePtr msg;
         int ret;
 
@@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
             goto cleanup;
     }
 
-    VIR_DEBUG("After IO %zu", st->incomingOffset);
-    if (st->incomingOffset) {
-        int want = st->incomingOffset;
-        if (want > nbytes)
-            want = nbytes;
-        memcpy(data, st->incoming, want);
-        if (want < st->incomingOffset) {
-            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
-            st->incomingOffset -= want;
-        } else {
-            VIR_FREE(st->incoming);
-            st->incomingOffset = st->incomingLength = 0;
+    VIR_DEBUG("After IO rx=%p", st->rx);
+    want = nbytes;
+    while (want && st->rx) {
+        virNetMessagePtr msg = st->rx;
+        size_t len = want;
+
+        if (len > msg->bufferLength - msg->bufferOffset)
+            len = msg->bufferLength - msg->bufferOffset;
+
+        if (!len)
+            break;
+
+        memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
+        want -= len;
+        msg->bufferOffset += len;
+
+        if (msg->bufferOffset == msg->bufferLength) {
+            virNetMessageQueueServe(&st->rx);
+            virNetMessageFree(msg);
         }
-        rv = want;
-    } else {
-        rv = 0;
     }
+    rv = nbytes - want;
 
     virNetClientStreamEventTimerUpdate(st);
 
-- 
2.7.3

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]