[PATCH v4 06/13] Add support for non-blocking calls in client RPC

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



When a client wants to send a keepalive message it needs to do so in a
non-blocking way to avoid blocking its event loop. This patch adds
dontBlock flag which says that the call should be processed without
blocking. Such calls do not have a thread waiting for the result
associated with them. This means, that sending such call fails if no
thread is dispatching and writing to the socket would block. In case
there is a thread waiting for its (normal) call to finish, sending
non-blocking call just pushes it into the queue and lets the dispatching
thread send it. The thread which has the buck tries to send all
non-blocking calls in the queue in a best effort way---if sending them
would block or there's an error on the socket, non-blocking calls are
simply removed from the queue and discarded.  In case a non-blocking
call is partially sent but sending the rest of it would block, it is
moved into client's unfinishedCall and left for future delivery.  Every
sending attempt first sends the rest of unfinishedCall and than
continues with other queued calls.
---
Notes:
    Version 4:
    - correctly handle partially sent non-blocking calls that would block
    
    Version 3:
    - no changes
    
    Version 2:
    - no changes

 src/rpc/virnetclient.c |  261 ++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 210 insertions(+), 51 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 085dc8d..58ba66d 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -55,6 +55,7 @@ struct _virNetClientCall {
 
     virNetMessagePtr msg;
     bool expectReply;
+    bool dontBlock;
 
     virCond cond;
 
@@ -86,6 +87,9 @@ struct _virNetClient {
     int wakeupSendFD;
     int wakeupReadFD;
 
+    /* Unfinished call that needs to be finished before any of the calls in
+     * the queue can be processed */
+    virNetClientCallPtr unfinishedCall;
     /* List of threads currently waiting for dispatch */
     virNetClientCallPtr waitDispatch;
 
@@ -94,6 +98,11 @@ struct _virNetClient {
 };
 
 
+static int virNetClientSendInternal(virNetClientPtr client,
+                                    virNetMessagePtr msg,
+                                    bool expectReply,
+                                    bool dontBlock);
+
 static void virNetClientLock(virNetClientPtr client)
 {
     virMutexLock(&client->lock);
@@ -743,26 +752,42 @@ static ssize_t
 virNetClientIOHandleOutput(virNetClientPtr client)
 {
     virNetClientCallPtr thecall = client->waitDispatch;
+    ssize_t ret = -1;
 
     while (thecall &&
            thecall->mode != VIR_NET_CLIENT_MODE_WAIT_TX)
         thecall = thecall->next;
 
+    /* If there is an unfinished non-blocking call, process it first */
+    if (client->unfinishedCall) {
+        client->unfinishedCall->next = thecall;
+        thecall = client->unfinishedCall;
+    }
+
     if (!thecall)
-        return -1; /* Shouldn't happen, but you never know... */
+        goto cleanup; /* Shouldn't happen, but you never know... */
 
     while (thecall) {
-        ssize_t ret = virNetClientIOWriteMessage(client, thecall);
+        ret = virNetClientIOWriteMessage(client, thecall);
         if (ret < 0)
-            return ret;
+            goto cleanup;
 
-        if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
-            return 0; /* Blocking write, to back to event loop */
+        if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX) {
+            /* Blocking write, go back to event loop */
+            ret = 0;
+            goto cleanup;
+        }
 
         thecall = thecall->next;
     }
 
-    return 0; /* No more calls to send, all done */
+    ret = 0; /* No more calls to send, all done */
+
+cleanup:
+    if (client->unfinishedCall)
+        client->unfinishedCall->next = NULL;
+
+    return ret;
 }
 
 static ssize_t
@@ -845,6 +870,91 @@ virNetClientIOHandleInput(virNetClientPtr client)
 }
 
 
+static void
+virNetClientDiscardNonBlocking(virNetClientPtr client,
+                               virNetClientCallPtr thiscall,
+                               bool error)
+{
+    virNetClientCallPtr call = client->waitDispatch;
+    virNetClientCallPtr prev = NULL;
+
+    if (client->unfinishedCall) {
+        client->unfinishedCall->next = call;
+        call = client->unfinishedCall;
+    }
+
+    while (call) {
+        virNetClientCallPtr next = call->next;
+
+        if (!call->dontBlock) {
+            prev = call;
+            goto skip;
+        }
+
+        /* We can't remove nonblocking call which was already partially sent
+         * to the remote party (unless there was an error in which case we
+         * won't be able to send anything anymore anyway); we store it in
+         * unfinishedCall and when someone needs to send something in the
+         * future, it will first send the rest of the unfinishedCall.
+         */
+        if (!error &&
+            call->mode != VIR_NET_CLIENT_MODE_COMPLETE &&
+            call->msg->bufferOffset > 0) {
+            VIR_DEBUG("Can't finish nonblocking call %p without blocking",
+                      call);
+            if (call == client->unfinishedCall)
+                goto skip;
+
+            client->unfinishedCall = call;
+            goto next;
+        }
+
+        /* We should never free thiscall since it will be freed by the caller.
+         * We shouldn't remove thiscall from the queue either since that is
+         * handled elsewhere.
+         */
+        if (call == thiscall) {
+            prev = call;
+            goto skip;
+        }
+
+        /* Remove and free completed calls or calls that we didn't even get to
+         * without blocking or error.
+         */
+        if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE) {
+            const char *action;
+            if (call->msg->bufferOffset > 0)
+                action = "finish";
+            else
+                action = "send";
+
+            VIR_DEBUG("Can't %s nonblocking call %p without %s",
+                      action, call, error ? "error" : "blocking");
+        }
+
+        if (call == client->unfinishedCall) {
+            client->unfinishedCall = NULL;
+            virNetMessageFree(call->msg);
+            VIR_FREE(call);
+            goto skip;
+        }
+
+        virNetMessageFree(call->msg);
+        VIR_FREE(call);
+
+next:
+        if (prev)
+            prev->next = next;
+        else
+            client->waitDispatch = next;
+skip:
+        call = next;
+    }
+
+    if (client->unfinishedCall)
+        client->unfinishedCall->next = NULL;
+}
+
 /*
  * Process all calls pending dispatch/receive until we
  * get a reply to our own call. Then quit and pass the buck
@@ -854,7 +964,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
                                    virNetClientCallPtr thiscall)
 {
     struct pollfd fds[2];
-    int ret;
+    int pollret;
+    bool error;
+    int ret = -1;
 
     fds[0].fd = virNetSocketGetFD(client->sock);
     fds[1].fd = client->wakeupReadFD;
@@ -877,11 +989,19 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         fds[1].events = fds[1].revents = 0;
 
         fds[1].events = POLLIN;
+        if (client->unfinishedCall)
+            fds[0].events = POLLOUT;
+
         while (tmp) {
             if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
                 fds[0].events |= POLLIN;
             if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
                 fds[0].events |= POLLOUT;
+            /* We don't want to sleep in poll if any of the calls is
+             * non-blocking
+             */
+            if (tmp->dontBlock)
+                timeout = 0;
 
             tmp = tmp->next;
         }
@@ -913,8 +1033,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
 
     repoll:
-        ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
-        if (ret < 0 && errno == EAGAIN)
+        pollret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
+        if (pollret < 0 && errno == EAGAIN)
             goto repoll;
 
         ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
@@ -936,7 +1056,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
             }
         }
 
-        if (ret < 0) {
+        if (pollret < 0) {
             if (errno == EWOULDBLOCK)
                 continue;
             virReportSystemError(errno,
@@ -954,8 +1074,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
                 goto error;
         }
 
-        /* Iterate through waiting threads and if
-         * any are complete then tell 'em to wakeup
+        /* All calls in the queue have been sent or sending would block, remove
+         * nonblocking calls since we did all we could for them.
+         */
+        error = !!(fds[0].revents & (POLLHUP | POLLERR));
+        virNetClientDiscardNonBlocking(client, thiscall, error);
+
+        /* Iterate through waiting calls and if any are complete, tell
+         * their threads to wake up.
          */
         tmp = client->waitDispatch;
         prev = NULL;
@@ -983,39 +1109,39 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
 
         /* Now see if *we* are done */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
-            /* We're at head of the list already, so
-             * remove us
-             */
-            client->waitDispatch = thiscall->next;
-            VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
-            /* See if someone else is still waiting
-             * and if so, then pass the buck ! */
-            if (client->waitDispatch) {
-                VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
-                virCondSignal(&client->waitDispatch->cond);
-            }
-            return 0;
+            VIR_DEBUG("Giving up the buck %p %p", thiscall, thiscall->next);
+            ret = 0;
+            goto pass;
         }
 
-
         if (fds[0].revents & (POLLHUP | POLLERR)) {
             virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
                         _("received hangup / error event on socket"));
             goto error;
         }
-    }
 
+        if (thiscall->dontBlock && thiscall != client->unfinishedCall) {
+            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("Can't send nonblocking call without blocking"));
+            VIR_DEBUG("Giving up the buck %p %p", thiscall, thiscall->next);
+            goto pass;
+        }
+    }
 
 error:
+    virNetClientDiscardNonBlocking(client, thiscall, true);
+    VIR_DEBUG("Giving up the buck due to I/O error %p %p",
+              thiscall, thiscall->next);
+
+pass:
     client->waitDispatch = thiscall->next;
-    VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch);
     /* See if someone else is still waiting
      * and if so, then pass the buck ! */
     if (client->waitDispatch) {
         VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
         virCondSignal(&client->waitDispatch->cond);
     }
-    return -1;
+    return ret;
 }
 
 
@@ -1057,38 +1183,41 @@ static int virNetClientIO(virNetClientPtr client,
 {
     int rv = -1;
 
-    VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p",
+    VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d"
+              " length=%zu dontBlock=%d dispatch=%p",
               thiscall->msg->header.prog,
               thiscall->msg->header.vers,
               thiscall->msg->header.serial,
               thiscall->msg->header.proc,
               thiscall->msg->header.type,
               thiscall->msg->bufferLength,
+              thiscall->dontBlock,
               client->waitDispatch);
 
     /* Check to see if another thread is dispatching */
     if (client->waitDispatch) {
-        /* Stick ourselves on the end of the wait queue */
-        virNetClientCallPtr tmp = client->waitDispatch;
+        virNetClientCallPtr tmp;
         char ignore = 1;
-        while (tmp && tmp->next)
-            tmp = tmp->next;
-        if (tmp)
-            tmp->next = thiscall;
-        else
-            client->waitDispatch = thiscall;
 
         /* Force other thread to wakeup from poll */
         if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
-            if (tmp)
-                tmp->next = NULL;
-            else
-                client->waitDispatch = NULL;
             virReportSystemError(errno, "%s",
                                  _("failed to wake up polling thread"));
             return -1;
         }
 
+        /* Stick ourselves on the end of the wait queue */
+        tmp = client->waitDispatch;
+        while (tmp->next)
+            tmp = tmp->next;
+        tmp->next = thiscall;
+
+        if (thiscall->dontBlock) {
+            VIR_DEBUG("Sending non-blocking call while another thread is"
+                      " dispatching; it will send the call for us");
+            return 0;
+        }
+
         VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
         /* Go to sleep while other thread is working... */
         if (virCondWait(&thiscall->cond, &client->lock) < 0) {
@@ -1108,7 +1237,7 @@ static int virNetClientIO(virNetClientPtr client,
             return -1;
         }
 
-        VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall);
+        VIR_DEBUG("Woken up from sleep %p %p", client->waitDispatch, thiscall);
         /* Two reasons we can be woken up
          *  1. Other thread has got our reply ready for us
          *  2. Other thread is all done, and it is our turn to
@@ -1198,9 +1327,11 @@ done:
 }
 
 
-int virNetClientSend(virNetClientPtr client,
-                     virNetMessagePtr msg,
-                     bool expectReply)
+static int
+virNetClientSendInternal(virNetClientPtr client,
+                         virNetMessagePtr msg,
+                         bool expectReply,
+                         bool dontBlock)
 {
     virNetClientCallPtr call;
     int ret = -1;
@@ -1213,7 +1344,7 @@ int virNetClientSend(virNetClientPtr client,
 
     if (expectReply &&
         (msg->bufferLength != 0) &&
-        (msg->header.status == VIR_NET_CONTINUE)) {
+        (msg->header.status == VIR_NET_CONTINUE || dontBlock)) {
         virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
                     _("Attempt to send an asynchronous message with a synchronous reply"));
         return -1;
@@ -1226,10 +1357,15 @@ int virNetClientSend(virNetClientPtr client,
 
     virNetClientLock(client);
 
-    if (virCondInit(&call->cond) < 0) {
-        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
-                    _("cannot initialize condition variable"));
-        goto cleanup;
+    /* We don't need call->cond for non-blocking calls since there's no
+     * thread to be woken up anyway
+     */
+    if (!dontBlock) {
+        if (virCondInit(&call->cond) < 0) {
+            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("cannot initialize condition variable"));
+            goto cleanup;
+        }
     }
 
     if (msg->bufferLength)
@@ -1238,12 +1374,35 @@ int virNetClientSend(virNetClientPtr client,
         call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
     call->msg = msg;
     call->expectReply = expectReply;
+    call->dontBlock = dontBlock;
 
     ret = virNetClientIO(client, call);
 
 cleanup:
     ignore_value(virCondDestroy(&call->cond));
-    VIR_FREE(call);
+    if (ret != 0) {
+        VIR_FREE(call);
+    } else if (dontBlock) {
+        /* Only free the call if it was completed since otherwise it was just
+         * queued up and will be processed later.
+         */
+        if (call->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
+            /* We need to free the message as well since no-one is waiting for
+             * it.
+             */
+            virNetMessageFree(msg);
+            VIR_FREE(call);
+        }
+    } else {
+        VIR_FREE(call);
+    }
     virNetClientUnlock(client);
     return ret;
 }
+
+int virNetClientSend(virNetClientPtr client,
+                     virNetMessagePtr msg,
+                     bool expectReply)
+{
+    return virNetClientSendInternal(client, msg, expectReply, false);
+}
-- 
1.7.7.1

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]