[PATCH 6/6 (v2)] Allow non-blocking message sending on virNetClient

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: "Daniel P. Berrange" <berrange@xxxxxxxxxx>

Split the existing virNetClientSend into two parts
virNetClientSend and virNetClientSendNoReply, instead
of having a 'bool expectReply' parameter.

Add a new virNetClientSendNonBlock which returns 2 on
full send, 1 on partial send, 0 on no send, -1 on error.

If a partial send occurs, then a subsequent call to any
of the virNetClientSend* APIs will finish any outstanding
I/O.

TODO: the virNetClientEvent event handler could be used
to speed up completion of partial sends if an event loop
is present.

In v2:
   - Fix logic in virNetClientIOEventLoopRemoveNonBlocking

* src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c:
  Update for changed API
---
 src/rpc/virnetclient.c |  200 +++++++++++++++++++++++++++++++++++++++++++-----
 src/rpc/virnetclient.h |    4 +
 src/rpc/virnetsocket.c |   13 +++
 src/rpc/virnetsocket.h |    1 +
 4 files changed, 200 insertions(+), 18 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 96d1886..17105fe 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -55,6 +55,9 @@ struct _virNetClientCall {
 
     virNetMessagePtr msg;
     bool expectReply;
+    bool nonBlock;
+    bool haveThread;
+    bool sentSomeData;
 
     virCond cond;
 
@@ -86,7 +89,12 @@ struct _virNetClient {
     int wakeupSendFD;
     int wakeupReadFD;
 
-    /* List of threads currently waiting for dispatch */
+    /*
+     * List of calls currently waiting for dispatch
+     * The calls should all have threads waiting for
+     * them, except possibly the first call in the list
+     * which might be a partially sent non-blocking call.
+     */
     virNetClientCallPtr waitDispatch;
     /* True if a thread holds the buck */
     bool haveTheBuck;
@@ -648,7 +656,7 @@ virNetClientCallDispatchReply(virNetClientPtr client)
     virNetClientCallPtr thecall;
 
     /* Ok, definitely got an RPC reply now find
-       out who's been waiting for it */
+       out which waiting call is associated with it */
     thecall = client->waitDispatch;
     while (thecall &&
            !(thecall->msg->header.prog == client->msg.header.prog &&
@@ -824,6 +832,8 @@ virNetClientIOWriteMessage(virNetClientPtr client,
         ret = virNetSocketWrite(client->sock,
                                 thecall->msg->buffer + thecall->msg->bufferOffset,
                                 thecall->msg->bufferLength - thecall->msg->bufferOffset);
+        if (ret || virNetSocketHasPendingData(client->sock))
+            thecall->sentSomeData = true;
         if (ret <= 0)
             return ret;
 
@@ -1015,17 +1025,69 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
         return false;
 
     /*
-     * ...they won't actually wakeup until
+     * ...if the call being removed from the list
+     * still has a thread, then wake that thread up,
+     * otherwise free the call. The latter should
+     * only happen for calls without replies.
+     *
+     * ...the threads won't actually wakeup until
      * we release our mutex a short while
      * later...
      */
-    VIR_DEBUG("Waking up sleeping call %p", call);
-    virCondSignal(&call->cond);
+    if (call->haveThread) {
+        VIR_DEBUG("Waking up sleep %p", call);
+        virCondSignal(&call->cond);
+    } else {
+        if (call->expectReply)
+            VIR_WARN("Got a call expecting a reply but without a waiting thread");
+        ignore_value(virCondDestroy(&call->cond));
+        VIR_FREE(call);
+    }
 
     return true;
 }
 
 
+static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
+                                                     void *opaque)
+{
+    virNetClientCallPtr thiscall = opaque;
+
+    if (call == thiscall)
+        return false;
+
+    if (!call->nonBlock)
+        return false;
+
+    if (call->sentSomeData) {
+        /*
+         * If some data has been sent we must keep it in the list,
+         * but still wake up any waiting thread
+         */
+        if (call->haveThread) {
+            VIR_DEBUG("Waking up sleep %p", call);
+            virCondSignal(&call->cond);
+        }
+        return false;
+    } else {
+        /*
+         * If no data has been sent, we can remove it from the list.
+         * Wake up any thread, otherwise free the call ourselves
+         */
+        if (call->haveThread) {
+            VIR_DEBUG("Waking up sleep %p", call);
+            virCondSignal(&call->cond);
+        } else {
+            if (call->expectReply)
+                VIR_WARN("Got a call expecting a reply but without a waiting thread");
+            ignore_value(virCondDestroy(&call->cond));
+            VIR_FREE(call);
+        }
+        return true;
+    }
+}
+
+
 static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
 {
     VIR_DEBUG("Giving up the buck %p", thiscall);
@@ -1033,19 +1095,29 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
     /* See if someone else is still waiting
      * and if so, then pass the buck ! */
     while (tmp) {
-        if (tmp != thiscall) {
+        if (tmp != thiscall && tmp->haveThread) {
             VIR_DEBUG("Passing the buck to %p", tmp);
             virCondSignal(&tmp->cond);
             break;
         }
         tmp = tmp->next;
     }
+    VIR_DEBUG("No thread to pass the buck to");
+}
+
+
+static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED)
+{
+    return call->nonBlock;
 }
 
 /*
  * Process all calls pending dispatch/receive until we
  * get a reply to our own call. Then quit and pass the buck
  * to someone else.
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
  */
 static int virNetClientIOEventLoop(virNetClientPtr client,
                                    virNetClientCallPtr thiscall)
@@ -1068,6 +1140,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         if (virNetSocketHasCachedData(client->sock))
             timeout = 0;
 
+        /* If there are any non-blocking calls in the queue,
+         * then we don't want to sleep in poll()
+         */
+        if (virNetClientCallMatchPredicate(client->waitDispatch,
+                                           virNetClientIOEventLoopWantNonBlock,
+                                           NULL))
+            timeout = 0;
+
         fds[0].events = fds[0].revents = 0;
         fds[1].events = fds[1].revents = 0;
 
@@ -1116,8 +1196,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         /* If we have existing SASL decoded data, pretend
          * the socket became readable so we consume it
          */
-        if (virNetSocketHasCachedData(client->sock))
+        if (virNetSocketHasCachedData(client->sock)) {
             fds[0].revents |= POLLIN;
+        }
 
         if (fds[1].revents) {
             VIR_DEBUG("Woken up from poll by other thread");
@@ -1129,6 +1210,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         }
 
         if (ret < 0) {
+            /* XXX what's this dubious errno check doing ? */
             if (errno == EWOULDBLOCK)
                 continue;
             virReportSystemError(errno,
@@ -1146,20 +1228,33 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
                 goto error;
         }
 
-        /* Iterate through waiting threads and if
-         * any are complete then tell 'em to wakeup
+        /* Iterate through waiting calls and if any are
+         * complete, remove them from the dispatch list..
          */
         virNetClientCallRemovePredicate(&client->waitDispatch,
                                         virNetClientIOEventLoopRemoveDone,
                                         thiscall);
 
+        /* Iterate through the other waiting calls and remove any
+         * non-blocking ones that have not sent any data yet...
+         */
+        virNetClientCallRemovePredicate(&client->waitDispatch,
+                                        virNetClientIOEventLoopRemoveNonBlocking,
+                                        thiscall);
+
         /* Now see if *we* are done */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
             virNetClientCallRemove(&client->waitDispatch, thiscall);
             virNetClientIOEventLoopPassTheBuck(client, thiscall);
-            return 0;
+            return 2;
         }
 
+        /* We're not done, but we're non-blocking */
+        if (thiscall->nonBlock) {
+            VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
+            virNetClientIOEventLoopPassTheBuck(client, thiscall);
+            return thiscall->sentSomeData ? 1 : 0;
+        }
 
         if (fds[0].revents & (POLLHUP | POLLERR)) {
             virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -1218,7 +1313,31 @@ static void virNetClientIOUpdateCallback(virNetClientPtr client,
  *    a strategy in power politics when the actions of one country/
  *    nation are blamed on another, providing an opportunity for war."
  *
- * NB(5) Don't Panic!
+ * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller
+ * must *NOT* free it, if this returns '1' (ie partial send).
+ *
+ * NB(6) The following input states are valid if *no* threads
+ *       are currently executing this method
+ *
+ *   - waitDispatch == NULL,
+ *   - waitDispatch != NULL, waitDispatch.nonBlock == true
+ *
+ * The following input states are valid, if n threads are currently
+ * executing
+ *
+ *   - waitDispatch != NULL
+ *   - 0 or 1  waitDispatch.nonBlock == true, without any threads
+ *   - 0 or more waitDispatch.nonBlock == false, with threads
+ *
+ * The following output states are valid when all threads are done
+ *
+ *   - waitDispatch == NULL,
+ *   - waitDispatch != NULL, waitDispatch.nonBlock == true
+ *
+ * NB(7) Don't Panic!
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
  */
 static int virNetClientIO(virNetClientPtr client,
                           virNetClientCallPtr thiscall)
@@ -1259,14 +1378,15 @@ static int virNetClientIO(virNetClientPtr client,
         }
 
         VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall);
-        /* Two reasons we can be woken up
+        /* Three reasons we can be woken up
          *  1. Other thread has got our reply ready for us
          *  2. Other thread is all done, and it is our turn to
          *     be the dispatcher to finish waiting for
          *     our reply
+         *  3. I/O was expected to block
          */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
-            rv = 0;
+            rv = 2;
             /*
              * We avoided catching the buck and our reply is ready !
              * We've already had 'thiscall' removed from the list
@@ -1275,6 +1395,15 @@ static int virNetClientIO(virNetClientPtr client,
             goto cleanup;
         }
 
+        /* If we're non-blocking, get outta here */
+        if (thiscall->nonBlock) {
+            if (thiscall->sentSomeData)
+                rv = 1; /* In progress */
+            else
+                rv = 0; /* none at all */
+            goto cleanup;
+        }
+
         /* Grr, someone passed the buck onto us ... */
     }
 
@@ -1348,9 +1477,14 @@ done:
 }
 
 
+/*
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
+ */
 static int virNetClientSendInternal(virNetClientPtr client,
                                     virNetMessagePtr msg,
-                                    bool expectReply)
+                                    bool expectReply,
+                                    bool nonBlock)
 {
     virNetClientCallPtr call;
     int ret = -1;
@@ -1369,6 +1503,12 @@ static int virNetClientSendInternal(virNetClientPtr client,
         return -1;
     }
 
+    if (expectReply && nonBlock) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("Attempt to send an non-blocking message with a synchronous reply"));
+        return -1;
+    }
+
     if (VIR_ALLOC(call) < 0) {
         virReportOOMError();
         return -1;
@@ -1389,16 +1529,24 @@ static int virNetClientSendInternal(virNetClientPtr client,
         call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
     call->msg = msg;
     call->expectReply = expectReply;
+    call->nonBlock = nonBlock;
+    call->haveThread = true;
 
     ret = virNetClientIO(client, call);
 
 cleanup:
-    ignore_value(virCondDestroy(&call->cond));
-    VIR_FREE(call);
+    /* If partially sent, then the call is still on the dispatch queue */
+    if (ret == 1) {
+        call->haveThread = false;
+    } else {
+        ignore_value(virCondDestroy(&call->cond));
+        VIR_FREE(call);
+    }
     virNetClientUnlock(client);
     return ret;
 }
 
+
 /*
  * @msg: a message allocated on heap or stack
  *
@@ -1412,7 +1560,7 @@ cleanup:
 int virNetClientSendWithReply(virNetClientPtr client,
                               virNetMessagePtr msg)
 {
-    int ret = virNetClientSendInternal(client, msg, true);
+    int ret = virNetClientSendInternal(client, msg, true, false);
     if (ret < 0)
         return -1;
     return 0;
@@ -1432,8 +1580,24 @@ int virNetClientSendWithReply(virNetClientPtr client,
 int virNetClientSendNoReply(virNetClientPtr client,
                             virNetMessagePtr msg)
 {
-    int ret = virNetClientSendInternal(client, msg, false);
+    int ret = virNetClientSendInternal(client, msg, false, false);
     if (ret < 0)
         return -1;
     return 0;
 }
+
+/*
+ * @msg: a message allocated on the heap.
+ *
+ * Send a message asynchronously, without any reply
+ *
+ * The caller is responsible for free'ing @msg, *except* if this
+ * method returns -1. On 1, the queued call still points at @msg.
+ *
+ * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
+ */
+int virNetClientSendNonBlock(virNetClientPtr client,
+                             virNetMessagePtr msg)
+{
+    return virNetClientSendInternal(client, msg, false, true);
+}
diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h
index eef3eb3..71db543 100644
--- a/src/rpc/virnetclient.h
+++ b/src/rpc/virnetclient.h
@@ -73,6 +73,10 @@ int virNetClientSendWithReply(virNetClientPtr client,
 int virNetClientSendNoReply(virNetClientPtr client,
                             virNetMessagePtr msg);
 
+int virNetClientSendNonBlock(virNetClientPtr client,
+                             virNetMessagePtr msg);
+
+
 # ifdef HAVE_SASL
 void virNetClientSetSASLSession(virNetClientPtr client,
                                 virNetSASLSessionPtr sasl);
diff --git a/src/rpc/virnetsocket.c b/src/rpc/virnetsocket.c
index 30b8fe6..2449353 100644
--- a/src/rpc/virnetsocket.c
+++ b/src/rpc/virnetsocket.c
@@ -931,6 +931,19 @@ bool virNetSocketHasCachedData(virNetSocketPtr sock ATTRIBUTE_UNUSED)
 }
 
 
+bool virNetSocketHasPendingData(virNetSocketPtr sock ATTRIBUTE_UNUSED)
+{
+    bool hasPending = false;
+    virMutexLock(&sock->lock);
+#if HAVE_SASL
+    if (sock->saslEncoded)
+        hasPending = true;
+#endif
+    virMutexUnlock(&sock->lock);
+    return hasPending;
+}
+
+
 static ssize_t virNetSocketReadWire(virNetSocketPtr sock, char *buf, size_t len)
 {
     char *errout = NULL;
diff --git a/src/rpc/virnetsocket.h b/src/rpc/virnetsocket.h
index e444aef..508dd47 100644
--- a/src/rpc/virnetsocket.h
+++ b/src/rpc/virnetsocket.h
@@ -106,6 +106,7 @@ void virNetSocketSetSASLSession(virNetSocketPtr sock,
                                 virNetSASLSessionPtr sess);
 # endif
 bool virNetSocketHasCachedData(virNetSocketPtr sock);
+bool virNetSocketHasPendingData(virNetSocketPtr sock);
 void virNetSocketRef(virNetSocketPtr sock);
 void virNetSocketFree(virNetSocketPtr sock);
 
-- 
1.7.6.4

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]