[PATCH 03/12] client rpc: Don't drop non-blocking calls

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Until now, non-blocking calls were dropped whenever sending them would
block. When a client sends lots of stream calls (which are not supposed
to generate any reply), the assumption that having other calls in the
queue is sufficient to get a reply from the server does not hold. I
tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372, but that
attempt failed and the commit was reverted.

With this patch, non-blocking calls are never dropped (unless the
connection is being closed) and will always be sent.
---
 src/rpc/virnetclient.c |  164 +++++++++++++++++++++---------------------------
 1 file changed, 71 insertions(+), 93 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 3e661d2..614b469 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -58,7 +58,6 @@ struct _virNetClientCall {
     bool expectReply;
     bool nonBlock;
     bool haveThread;
-    bool sentSomeData;
 
     virCond cond;
 
@@ -108,6 +107,10 @@ struct _virNetClient {
 };
 
 
+static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+                                               virNetClientCallPtr thiscall);
+
+
 static void virNetClientLock(virNetClientPtr client)
 {
     virMutexLock(&client->lock);
@@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client)
 
     virNetClientLock(client);
 
-    /* If there is a thread polling for data on the socket, set wantClose flag
-     * and wake the thread up or just immediately close the socket when no-one
-     * is polling on it.
+    client->wantClose = true;
+
+    /* If there is a thread polling for data on the socket, wake the thread up
+     * otherwise try to pass the buck to a possibly waiting thread. If no
+     * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
+     * queue and close the client because we set client->wantClose.
      */
-    if (client->waitDispatch) {
+    if (client->haveTheBuck) {
         char ignore = 1;
         size_t len = sizeof(ignore);
 
-        client->wantClose = true;
         if (safewrite(client->wakeupSendFD, &ignore, len) != len)
             VIR_ERROR(_("failed to wake up polling thread"));
     } else {
-        virNetClientCloseLocked(client);
+        virNetClientIOEventLoopPassTheBuck(client, NULL);
     }
 
     virNetClientUnlock(client);
@@ -972,8 +977,6 @@ virNetClientIOWriteMessage(virNetClientPtr client,
         ret = virNetSocketWrite(client->sock,
                                 thecall->msg->buffer + thecall->msg->bufferOffset,
                                 thecall->msg->bufferLength - thecall->msg->bufferOffset);
-        if (ret > 0 || virNetSocketHasPendingData(client->sock))
-            thecall->sentSomeData = true;
         if (ret <= 0)
             return ret;
 
@@ -1197,71 +1200,43 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
 }
 
 
-static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
-                                                     void *opaque)
+static bool
+virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call,
+                                         void *opaque)
 {
     virNetClientCallPtr thiscall = opaque;
 
-    if (call == thiscall)
-        return false;
-
-    if (!call->nonBlock)
-        return false;
-
-    if (call->sentSomeData) {
-        /*
-         * If some data has been sent we must keep it in the list,
-         * but still wakeup any thread
-         */
-        if (call->haveThread) {
-            VIR_DEBUG("Waking up sleep %p", call);
-            virCondSignal(&call->cond);
-        } else {
-            VIR_DEBUG("Keeping unfinished call %p in the list", call);
-        }
-        return false;
-    } else {
-        /*
-         * If no data has been sent, we can remove it from the list.
-         * Wakup any thread, otherwise free the caller ourselves
-         */
-        if (call->haveThread) {
-            VIR_DEBUG("Waking up sleep %p", call);
-            virCondSignal(&call->cond);
-        } else {
-            VIR_DEBUG("Removing call %p", call);
-            if (call->expectReply)
-                VIR_WARN("Got a call expecting a reply but without a waiting thread");
-            ignore_value(virCondDestroy(&call->cond));
-            VIR_FREE(call->msg);
-            VIR_FREE(call);
-        }
+    if (call != thiscall && call->nonBlock && call->haveThread) {
+        VIR_DEBUG("Waking up sleep %p", call);
+        call->haveThread = false;
+        virCondSignal(&call->cond);
         return true;
     }
+
+    return false;
 }
 
 
-static void
-virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
-                                 virNetClientCallPtr thiscall)
+static bool
+virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call,
+                                 void *opaque)
 {
-    if (!client->waitDispatch)
-        return;
+    virNetClientCallPtr thiscall = opaque;
 
-    if (client->waitDispatch == thiscall) {
-        /* just pretend nothing was sent and the caller will free the call */
-        thiscall->sentSomeData = false;
-    } else {
-        virNetClientCallPtr call = client->waitDispatch;
-        virNetClientCallRemove(&client->waitDispatch, call);
-        ignore_value(virCondDestroy(&call->cond));
-        VIR_FREE(call->msg);
-        VIR_FREE(call);
-    }
+    if (call == thiscall)
+        return false;
+
+    VIR_DEBUG("Removing call %p", call);
+    ignore_value(virCondDestroy(&call->cond));
+    VIR_FREE(call->msg);
+    VIR_FREE(call);
+    return true;
 }
 
 
-static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
+static void
+virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+                                   virNetClientCallPtr thiscall)
 {
     VIR_DEBUG("Giving up the buck %p", thiscall);
     virNetClientCallPtr tmp = client->waitDispatch;
@@ -1280,14 +1255,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
     VIR_DEBUG("No thread to pass the buck to");
     if (client->wantClose) {
         virNetClientCloseLocked(client);
-        virNetClientIOEventLoopRemoveAll(client, thiscall);
+        virNetClientCallRemovePredicate(&client->waitDispatch,
+                                        virNetClientIOEventLoopRemoveAll,
+                                        thiscall);
     }
 }
 
 
-static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED)
+static bool
+virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call,
+                                    void *opaque ATTRIBUTE_UNUSED)
 {
-    return call->nonBlock;
+    return call->nonBlock && call->haveThread;
 }
 
 /*
@@ -1320,8 +1299,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         if (virNetSocketHasCachedData(client->sock) || client->wantClose)
             timeout = 0;
 
-        /* If there are any non-blocking calls in the queue,
-         * then we don't want to sleep in poll()
+        /* If there are any non-blocking calls with an associated thread
+         * in the queue, then we don't want to sleep in poll()
          */
         if (virNetClientCallMatchPredicate(client->waitDispatch,
                                            virNetClientIOEventLoopWantNonBlock,
@@ -1394,12 +1373,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
             }
 
             /* If we were woken up because a new non-blocking call was queued,
-             * we need to re-poll to check if we can send it.
+             * we need to re-poll to check if we can send it. To be precise, we
+             * will re-poll even if a blocking call arrived when unhandled
+             * non-blocking calls are still in the queue. But this can't hurt.
              */
             if (virNetClientCallMatchPredicate(client->waitDispatch,
                                                virNetClientIOEventLoopWantNonBlock,
                                                NULL)) {
-                VIR_DEBUG("New non-blocking call arrived; repolling");
+                VIR_DEBUG("The queue contains new non-blocking call(s);"
+                          " repolling");
                 continue;
             }
         }
@@ -1424,18 +1406,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         }
 
         /* Iterate through waiting calls and if any are
-         * complete, remove them from the dispatch list..
+         * complete, remove them from the dispatch list.
          */
         virNetClientCallRemovePredicate(&client->waitDispatch,
                                         virNetClientIOEventLoopRemoveDone,
                                         thiscall);
 
-        /* Iterate through waiting calls and if any are
-         * non-blocking, remove them from the dispatch list...
+        /* Iterate through waiting calls and wake up and detach threads
+         * attached to non-blocking calls.
          */
-        virNetClientCallRemovePredicate(&client->waitDispatch,
-                                        virNetClientIOEventLoopRemoveNonBlocking,
-                                        thiscall);
+        virNetClientCallMatchPredicate(client->waitDispatch,
+                                       virNetClientIOEventLoopDetachNonBlocking,
+                                       thiscall);
 
         /* Now see if *we* are done */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
@@ -1444,15 +1426,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
             return 2;
         }
 
-        /* We're not done, but we're non-blocking */
+        /* We're not done, but we're non-blocking; keep the call queued */
         if (thiscall->nonBlock) {
+            thiscall->haveThread = false;
             virNetClientIOEventLoopPassTheBuck(client, thiscall);
-            if (thiscall->sentSomeData) {
-                return 1;
-            } else {
-                virNetClientCallRemove(&client->waitDispatch, thiscall);
-                return 0;
-            }
+            return 1;
         }
 
         if (fds[0].revents & (POLLHUP | POLLERR)) {
@@ -1462,7 +1440,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         }
     }
 
-
 error:
     virNetClientCallRemove(&client->waitDispatch, thiscall);
     virNetClientIOEventLoopPassTheBuck(client, thiscall);
@@ -1614,9 +1591,11 @@ static int virNetClientIO(virNetClientPtr client,
             goto cleanup;
         }
 
-        /* If we're non-blocking, get outta here */
+        /* If we're non-blocking, we were either queued (and detached) or the
+         * call was not sent because of an error.
+         */
         if (thiscall->nonBlock) {
-            if (thiscall->sentSomeData)
+            if (!thiscall->haveThread)
                 rv = 1; /* In progress */
             else
                 rv = 0; /* none at all */
@@ -1708,7 +1687,7 @@ done:
 
 
 /*
- * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * Returns 2 if fully sent, 1 if queued (only for nonBlock==true),
  * 0 if nothing sent (only for nonBlock==true) and -1 on error
  */
 static int virNetClientSendInternal(virNetClientPtr client,
@@ -1768,16 +1747,15 @@ static int virNetClientSendInternal(virNetClientPtr client,
 
     ret = virNetClientIO(client, call);
 
-    /* If partially sent, then the call is still on the dispatch queue */
-    if (ret == 1) {
-        call->haveThread = false;
-    } else {
-        ignore_value(virCondDestroy(&call->cond));
-    }
+    /* If queued, the call will be finished and freed later by another thread;
+     * we're done. */
+    if (ret == 1)
+        return 1;
+
+    ignore_value(virCondDestroy(&call->cond));
 
 cleanup:
-    if (ret != 1)
-        VIR_FREE(call);
+    VIR_FREE(call);
     return ret;
 }
 
-- 
1.7.10.2

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]