Re: [libvirt] PATCH: 17/25: Concurrent client dispatch in libvirtd

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Jan 13, 2009 at 05:45:43PM +0000, Daniel P. Berrange wrote:
> Historically libvirtd was single threaded, serializing all
> requests across clients. A recent patch allowed multiple
> threads, so multiple clients could run in parallel. A single
> client was still serialized.
> 
> This patch removes that final restriction, allowing a single
> client to have multiple in-flight RPC requests & replies.
> Each client now has 3 variables
> 
>  - rx: zero or one. If ready for, or in process of reading
>    a message this will be non-null. If we're throttling the
>    client requests, it'll be NULL. Once completely read, moved
>    to the 'dx' queue.
>  - dx: zero or many. Requests read off wire currently waiting
>    to be picked up for processing by a worker thread. Once a
>    worker is available the message is removed from the 'dx'
>    queue for duration of processing. A reply is put on the
>    'tx' queue once a call is finished
>  - tx: zero or many. Replies in process of, or ready to be,
>    sent back to a client. Also includes any asynchronous
>    event notifications to be sent.
> 
> The 'max_client_requests' configuration parameter controls
> how many RPC request+reply calls can be processed in parallel.
> Once this limit is reached, no more requests will be read off
> the wire until a reply has been completely transmitted.
> 
> Each request requires up to 256 KB of memory, thus memory usage
> for I/O is bounded by 'max_client_requests * max_clients * 256k'
> 
> Compatibility:
> 
>  - old client -> old server - everything serialized
>  - old client -> new server - client never sends a new
>    request until its first is finished, so effectively
>    serialized, and no compatibility problems
>  - new client -> old server - client sends many requests
>    without waiting for replies. The server will only
>    read and process one at a time, so effectively
>    serialized, and no compatibility problems
>  - new client -> new server - fully parallelized
> 
> The code has been stress tested by running 500 concurrent
> clients and fixing the crashes, deadlocks and memory leaks.
> Seems reasonably robust now.
> 
>  libvirtd.aug      |    2 
>  libvirtd.conf     |   16 +
>  qemud.c           |  630 ++++++++++++++++++++++++++++++++----------------------
>  qemud.h           |   66 +++--
>  remote.c          |   48 +++-
>  test_libvirtd.aug |   26 ++
>  6 files changed, 502 insertions(+), 286 deletions(-)

Updated patch to remove the s/X_OK/R_OK/ change that's now in
CVS

Daniel

diff --git a/qemud/libvirtd.aug b/qemud/libvirtd.aug
--- a/qemud/libvirtd.aug
+++ b/qemud/libvirtd.aug
@@ -53,6 +53,8 @@ module Libvirtd =
    let processing_entry = int_entry "min_workers"
                         | int_entry "max_workers"
                         | int_entry "max_clients"
+                        | int_entry "max_requests"
+                        | int_entry "max_client_requests"
 
    let logging_entry = int_entry "log_level"
                      | str_entry "log_filters"
diff --git a/qemud/libvirtd.conf b/qemud/libvirtd.conf
--- a/qemud/libvirtd.conf
+++ b/qemud/libvirtd.conf
@@ -247,6 +247,22 @@
 #min_workers = 5
 #max_workers = 20
 
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impacts
+# memory usage, currently each request requires 256 KB of
+# memory. So by default up to 5 MB of memory is used
+#
+# XXX this isn't actually enforced yet, only the per-client
+# limit is used so far
+#max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameters
+#max_client_requests = 5
+
 #################################################################
 #
 # Logging controls
diff --git a/qemud/qemud.c b/qemud/qemud.c
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -138,6 +138,11 @@ static int min_workers = 5;
 static int max_workers = 20;
 static int max_clients = 20;
 
+/* Total number of 'in-process' RPC calls allowed across all clients */
+static int max_requests = 20;
+/* Total number of 'in-process' RPC calls allowed by a single client*/
+static int max_client_requests = 5;
+
 #define DH_BITS 1024
 
 static sig_atomic_t sig_errors = 0;
@@ -162,9 +167,36 @@ static void sig_handler(int sig, siginfo
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudRegisterClientEvent(struct qemud_server *server,
-                                    struct qemud_client *client,
-                                    int removeFirst);
+
+
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+                            struct qemud_client_message *msg)
+{
+    struct qemud_client_message *tmp = *queue;
+
+    if (tmp) {
+        while (tmp->next)
+            tmp = tmp->next;
+        tmp->next = msg;
+    } else {
+        *queue = msg;
+    }
+}
+
+static struct qemud_client_message *
+qemudClientMessageQueuePop(struct qemud_client_message **queue)
+{
+    struct qemud_client_message *tmp = *queue;
+
+    if (tmp)
+        *queue = tmp->next;
+    else
+        *queue = NULL;
+
+    tmp->next = NULL;
+    return tmp;
+}
 
 static int
 remoteCheckCertFile(const char *type, const char *file)
@@ -1042,6 +1074,8 @@ remoteCheckCertificate (gnutls_session_t
 static int
 remoteCheckAccess (struct qemud_client *client)
 {
+    struct qemud_client_message *confirm;
+
     /* Verify client certificate. */
     if (remoteCheckCertificate (client->tlssession) == -1) {
         VIR_ERROR0(_("remoteCheckCertificate: "
@@ -1051,14 +1085,25 @@ remoteCheckAccess (struct qemud_client *
                           "is set so the bad certificate is ignored"));
     }
 
+    if (client->tx) {
+        VIR_INFO("%s",
+                 _("client had unexpected data pending tx after access check"));
+        return -1;
+    }
+
+    if (VIR_ALLOC(confirm) < 0)
+        return -1;
+
     /* Checks have succeeded.  Write a '\1' byte back to the client to
      * indicate this (otherwise the socket is abruptly closed).
      * (NB. The '\1' byte is sent in an encrypted record).
      */
-    client->bufferLength = 1;
-    client->bufferOffset = 0;
-    client->buffer[0] = '\1';
-    client->mode = QEMUD_MODE_TX_PACKET;
+    confirm->async = 1;
+    confirm->bufferLength = 1;
+    confirm->bufferOffset = 0;
+    confirm->buffer[0] = '\1';
+
+    client->tx = confirm;
     return 0;
 }
 
@@ -1084,6 +1129,7 @@ int qemudGetSocketIdentity(int fd, uid_t
 }
 #endif
 
+
 static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket *sock) {
     int fd;
     struct sockaddr_storage addr;
@@ -1099,7 +1145,7 @@ static int qemudDispatchServer(struct qe
     }
 
     if (server->nclients >= max_clients) {
-        VIR_ERROR0(_("Too many active clients, dropping connection"));
+        VIR_ERROR(_("Too many active clients (%d), dropping connection"), max_clients);
         close(fd);
         return -1;
     }
@@ -1137,6 +1183,12 @@ static int qemudDispatchServer(struct qe
     client->addrlen = addrlen;
     client->server = server;
 
+    /* Prepare one for packet receive */
+    if (VIR_ALLOC(client->rx) < 0)
+        goto cleanup;
+    client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+
 #if HAVE_POLKIT
     /* Only do policy checks for non-root - allow root user
        through with no checks, as a fail-safe - root can easily
@@ -1158,9 +1210,7 @@ static int qemudDispatchServer(struct qe
 #endif
 
     if (client->type != QEMUD_SOCK_TYPE_TLS) {
-        client->mode = QEMUD_MODE_RX_HEADER;
-        client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
-
+        /* Plain socket, so prepare to read first message */
         if (qemudRegisterClientEvent (server, client, 0) < 0)
             goto cleanup;
     } else {
@@ -1180,12 +1230,12 @@ static int qemudDispatchServer(struct qe
             if (remoteCheckAccess (client) == -1)
                 goto cleanup;
 
+            /* Handshake & cert check OK,  so prepare to read first message */
             if (qemudRegisterClientEvent(server, client, 0) < 0)
                 goto cleanup;
         } else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
-            /* Most likely. */
-            client->mode = QEMUD_MODE_TLS_HANDSHAKE;
-            client->bufferLength = -1;
+            /* Most likely, need to do more handshake data */
+            client->handshake = 1;
 
             if (qemudRegisterClientEvent (server, client, 0) < 0)
                 goto cleanup;
@@ -1204,7 +1254,8 @@ static int qemudDispatchServer(struct qe
     if (client &&
         client->tlssession) gnutls_deinit (client->tlssession);
     close (fd);
-    free (client);
+    VIR_FREE(client->rx);
+    VIR_FREE(client);
     return -1;
 }
 
@@ -1216,8 +1267,7 @@ static int qemudDispatchServer(struct qe
  * We keep the libvirt connection open until any async
  * jobs have finished, then clean it up elsehwere
  */
-static void qemudDispatchClientFailure(struct qemud_server *server ATTRIBUTE_UNUSED,
-                                       struct qemud_client *client) {
+void qemudDispatchClientFailure(struct qemud_client *client) {
     virEventRemoveHandleImpl(client->watch);
 
     /* Deregister event delivery callback */
@@ -1242,7 +1292,7 @@ static struct qemud_client *qemudPending
     int i;
     for (i = 0 ; i < server->nclients ; i++) {
         virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->mode == QEMUD_MODE_WAIT_DISPATCH) {
+        if (server->clients[i]->dx) {
             /* Delibrately don't unlock client - caller wants the lock */
             return server->clients[i];
         }
@@ -1256,8 +1306,9 @@ static void *qemudWorker(void *data)
     struct qemud_server *server = data;
 
     while (1) {
-        struct qemud_client *client;
-        int len;
+        struct qemud_client *client = NULL;
+        struct qemud_client_message *reply;
+
         virMutexLock(&server->lock);
         while ((client = qemudPendingJob(server)) == NULL) {
             if (virCondWait(&server->job, &server->lock) < 0) {
@@ -1268,55 +1319,64 @@ static void *qemudWorker(void *data)
         virMutexUnlock(&server->lock);
 
         /* We own a locked client now... */
-        client->mode = QEMUD_MODE_IN_DISPATCH;
         client->refs++;
 
-        if ((len = remoteDispatchClientRequest (server, client)) == 0)
-            qemudDispatchClientFailure(server, client);
+        /* Remove out message from dispatch queue while we use it */
+        reply = qemudClientMessageQueuePop(&client->dx);
 
-        /* Set up the output buffer. */
-        client->mode = QEMUD_MODE_TX_PACKET;
-        client->bufferLength = len;
-        client->bufferOffset = 0;
+        /* This function drops the lock during dispatch,
+         * and re-acquires it before returning */
+        if (remoteDispatchClientRequest (server, client, reply) < 0) {
+            VIR_FREE(reply);
+            qemudDispatchClientFailure(client);
+            client->refs--;
+            virMutexUnlock(&client->lock);
+            continue;
+        }
+
+        /* Put reply on end of tx queue to send out  */
+        qemudClientMessageQueuePush(&client->tx, reply);
 
         if (qemudRegisterClientEvent(server, client, 1) < 0)
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
 
         client->refs--;
         virMutexUnlock(&client->lock);
-        virMutexUnlock(&server->lock);
     }
 }
 
 
-static int qemudClientReadBuf(struct qemud_server *server,
-                              struct qemud_client *client,
+/*
+ * Read data into buffer using wire decoding (plain or TLS)
+ */
+static int qemudClientReadBuf(struct qemud_client *client,
                               char *data, unsigned len) {
     int ret;
 
     /*qemudDebug ("qemudClientRead: len = %d", len);*/
 
     if (!client->tlssession) {
-        if ((ret = read (client->fd, data, len)) <= 0) {
-            if (ret == 0 || errno != EAGAIN) {
-                if (ret != 0)
-                    VIR_ERROR(_("read: %s"), strerror (errno));
-                qemudDispatchClientFailure(server, client);
-            }
+        ret = read (client->fd, data, len);
+        if (ret == -1 && (errno == EAGAIN ||
+                          errno == EINTR))
+            return 0;
+        if (ret <= 0) {
+            if (ret != 0)
+                VIR_ERROR(_("read: %s"), strerror (errno));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     } else {
         ret = gnutls_record_recv (client->tlssession, data, len);
-        if (qemudRegisterClientEvent (server, client, 1) < 0)
-            qemudDispatchClientFailure (server, client);
-        else if (ret <= 0) {
-            if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
-                             ret != GNUTLS_E_INTERRUPTED)) {
-                if (ret != 0)
-                    VIR_ERROR(_("gnutls_record_recv: %s"),
-                              gnutls_strerror (ret));
-                qemudDispatchClientFailure (server, client);
-            }
+
+        if (ret == -1 && (ret == GNUTLS_E_AGAIN &&
+                          ret == GNUTLS_E_INTERRUPTED))
+            return 0;
+        if (ret <= 0) {
+            if (ret != 0)
+                VIR_ERROR(_("gnutls_record_recv: %s"),
+                          gnutls_strerror (ret));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     }
@@ -1324,21 +1384,26 @@ static int qemudClientReadBuf(struct qem
     return ret;
 }
 
-static int qemudClientReadPlain(struct qemud_server *server,
-                                struct qemud_client *client) {
+/*
+ * Read data into buffer without decoding
+ */
+static int qemudClientReadPlain(struct qemud_client *client) {
     int ret;
-    ret = qemudClientReadBuf(server, client,
-                             client->buffer + client->bufferOffset,
-                             client->bufferLength - client->bufferOffset);
-    if (ret < 0)
-        return ret;
-    client->bufferOffset += ret;
-    return 0;
+    ret = qemudClientReadBuf(client,
+                             client->rx->buffer + client->rx->bufferOffset,
+                             client->rx->bufferLength - client->rx->bufferOffset);
+    if (ret <= 0)
+        return ret; /* -1 error, 0 eagain */
+
+    client->rx->bufferOffset += ret;
+    return ret;
 }
 
 #if HAVE_SASL
-static int qemudClientReadSASL(struct qemud_server *server,
-                               struct qemud_client *client) {
+/*
+ * Read data into buffer decoding with SASL
+ */
+static int qemudClientReadSASL(struct qemud_client *client) {
     int got, want;
 
     /* We're doing a SSF data read, so now its times to ensure
@@ -1350,30 +1415,33 @@ static int qemudClientReadSASL(struct qe
 
     /* Need to read some more data off the wire */
     if (client->saslDecoded == NULL) {
+        int ret;
         char encoded[8192];
         int encodedLen = sizeof(encoded);
-        encodedLen = qemudClientReadBuf(server, client, encoded, encodedLen);
+        encodedLen = qemudClientReadBuf(client, encoded, encodedLen);
 
         if (encodedLen < 0)
             return -1;
 
-        sasl_decode(client->saslconn, encoded, encodedLen,
-                    &client->saslDecoded, &client->saslDecodedLength);
+        ret = sasl_decode(client->saslconn, encoded, encodedLen,
+                          &client->saslDecoded, &client->saslDecodedLength);
+        if (ret != SASL_OK)
+            return -1;
 
         client->saslDecodedOffset = 0;
     }
 
     /* Some buffered decoded data to return now */
     got = client->saslDecodedLength - client->saslDecodedOffset;
-    want = client->bufferLength - client->bufferOffset;
+    want = client->rx->bufferLength - client->rx->bufferOffset;
 
     if (want > got)
         want = got;
 
-    memcpy(client->buffer + client->bufferOffset,
+    memcpy(client->rx->buffer + client->rx->bufferOffset,
            client->saslDecoded + client->saslDecodedOffset, want);
     client->saslDecodedOffset += want;
-    client->bufferOffset += want;
+    client->rx->bufferOffset += want;
 
     if (client->saslDecodedOffset == client->saslDecodedLength) {
         client->saslDecoded = NULL;
@@ -1384,132 +1452,125 @@ static int qemudClientReadSASL(struct qe
 }
 #endif
 
-static int qemudClientRead(struct qemud_server *server,
-                           struct qemud_client *client) {
+/*
+ * Read as much data off wire as possible till we fill our
+ * buffer, or would block on I/O
+ */
+static int qemudClientRead(struct qemud_client *client) {
 #if HAVE_SASL
     if (client->saslSSF & QEMUD_SASL_SSF_READ)
-        return qemudClientReadSASL(server, client);
+        return qemudClientReadSASL(client);
     else
 #endif
-        return qemudClientReadPlain(server, client);
+        return qemudClientReadPlain(client);
 }
 
 
-static void qemudDispatchClientRead(struct qemud_server *server, struct qemud_client *client) {
-    unsigned int len;
+/*
+ * Read data until we get a complete message to process
+ */
+static void qemudDispatchClientRead(struct qemud_server *server,
+                                    struct qemud_client *client) {
     /*qemudDebug ("qemudDispatchClientRead: mode = %d", client->mode);*/
 
-    switch (client->mode) {
-    case QEMUD_MODE_RX_HEADER: {
+readmore:
+    if (qemudClientRead(client) < 0)
+        return; /* Error, or blocking */
+
+    if (client->rx->bufferOffset < client->rx->bufferLength)
+        return; /* Not read enough */
+
+    /* Either done with length word header */
+    if (client->rx->bufferLength == REMOTE_MESSAGE_HEADER_XDR_LEN) {
+        int len;
         XDR x;
 
-        if (qemudClientRead(server, client) < 0)
-            return; /* Error, or blocking */
+        xdrmem_create(&x, client->rx->buffer, client->rx->bufferLength, XDR_DECODE);
 
-        if (client->bufferOffset < client->bufferLength)
-            return; /* Not read enough */
-
-        xdrmem_create(&x, client->buffer, client->bufferLength, XDR_DECODE);
-
-        if (!xdr_u_int(&x, &len)) {
+        if (!xdr_int(&x, &len)) {
             xdr_destroy (&x);
             DEBUG0("Failed to decode packet length");
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
         xdr_destroy (&x);
 
+        /* Length includes the size of the length word itself */
+        len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
+
         if (len > REMOTE_MESSAGE_MAX) {
             DEBUG("Packet length %u too large", len);
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
 
         /* Length include length of the length field itself, so
          * check minimum size requirements */
-        if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) {
+        if (len <= 0) {
             DEBUG("Packet length %u too small", len);
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        client->mode = QEMUD_MODE_RX_PAYLOAD;
-        client->bufferLength = len - REMOTE_MESSAGE_HEADER_XDR_LEN;
-        client->bufferOffset = 0;
+        /* Prepare to read rest of message */
+        client->rx->bufferLength += len;
 
         if (qemudRegisterClientEvent(server, client, 1) < 0) {
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        /* Fall through */
-    }
+        /* Try and read payload immediately instead of going back
+           into poll() because chances are the data is already
+           waiting for us */
+        goto readmore;
+    } else {
+        /* Move completed message to the end of the dispatch queue */
+        qemudClientMessageQueuePush(&client->dx, client->rx);
+        client->rx = NULL;
+        client->nrequests++;
 
-    case QEMUD_MODE_RX_PAYLOAD: {
-        if (qemudClientRead(server, client) < 0)
-            return; /* Error, or blocking */
+        /* Possibly need to create another receive buffer */
+        if ((client->nrequests < max_client_requests &&
+             VIR_ALLOC(client->rx) < 0)) {
+            qemudDispatchClientFailure(client);
+        } else {
+            if (client->rx)
+                client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
 
-        if (client->bufferOffset < client->bufferLength)
-            return; /* Not read enough */
-
-        client->mode = QEMUD_MODE_WAIT_DISPATCH;
-        if (qemudRegisterClientEvent(server, client, 1) < 0)
-            qemudDispatchClientFailure(server, client);
-
-        virCondSignal(&server->job);
-
-        break;
-    }
-
-    case QEMUD_MODE_TLS_HANDSHAKE: {
-        int ret;
-
-        /* Continue the handshake. */
-        ret = gnutls_handshake (client->tlssession);
-        if (ret == 0) {
-            /* Finished.  Next step is to check the certificate. */
-            if (remoteCheckAccess (client) == -1)
-                qemudDispatchClientFailure (server, client);
-            else if (qemudRegisterClientEvent (server, client, 1) < 0)
-                qemudDispatchClientFailure (server, client);
-        } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
-            VIR_ERROR(_("TLS handshake failed: %s"),
-                      gnutls_strerror (ret));
-            qemudDispatchClientFailure (server, client);
-        } else {
-            if (qemudRegisterClientEvent (server ,client, 1) < 0)
-                qemudDispatchClientFailure (server, client);
+            if (qemudRegisterClientEvent(server, client, 1) < 0)
+                qemudDispatchClientFailure(client);
+            else
+                /* Tell one of the workers to get on with it... */
+                virCondSignal(&server->job);
         }
-
-        break;
-    }
-
-    default:
-        DEBUG("Got unexpected data read while in %d mode", client->mode);
-        qemudDispatchClientFailure(server, client);
     }
 }
 
 
-static int qemudClientWriteBuf(struct qemud_server *server,
-                               struct qemud_client *client,
+/*
+ * Send a chunk of data using wire encoding (plain or TLS)
+ */
+static int qemudClientWriteBuf(struct qemud_client *client,
                                const char *data, int len) {
     int ret;
     if (!client->tlssession) {
-        if ((ret = safewrite(client->fd, data, len)) == -1) {
+        if ((ret = write(client->fd, data, len)) == -1) {
+            if (errno == EAGAIN || errno == EINTR)
+                return 0;
             VIR_ERROR(_("write: %s"), strerror (errno));
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return -1;
         }
     } else {
         ret = gnutls_record_send (client->tlssession, data, len);
-        if (qemudRegisterClientEvent (server, client, 1) < 0)
-            qemudDispatchClientFailure (server, client);
-        else if (ret < 0) {
-            if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
-                VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
-                qemudDispatchClientFailure (server, client);
-            }
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED ||
+                ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     }
@@ -1517,42 +1578,49 @@ static int qemudClientWriteBuf(struct qe
 }
 
 
-static int qemudClientWritePlain(struct qemud_server *server,
-                                 struct qemud_client *client) {
-    int ret = qemudClientWriteBuf(server, client,
-                                  client->buffer + client->bufferOffset,
-                                  client->bufferLength - client->bufferOffset);
-    if (ret < 0)
-        return -1;
-    client->bufferOffset += ret;
-    return 0;
+/*
+ * Send client->tx using no encoding
+ */
+static int qemudClientWritePlain(struct qemud_client *client) {
+    int ret = qemudClientWriteBuf(client,
+                                  client->tx->buffer + client->tx->bufferOffset,
+                                  client->tx->bufferLength - client->tx->bufferOffset);
+    if (ret <= 0)
+        return ret; /* -1 error, 0 = egain */
+    client->tx->bufferOffset += ret;
+    return ret;
 }
 
 
 #if HAVE_SASL
-static int qemudClientWriteSASL(struct qemud_server *server,
-                                struct qemud_client *client) {
+/*
+ * Send client->tx using SASL encoding
+ */
+static int qemudClientWriteSASL(struct qemud_client *client) {
     int ret;
 
     /* Not got any pending encoded data, so we need to encode raw stuff */
     if (client->saslEncoded == NULL) {
         int err;
         err = sasl_encode(client->saslconn,
-                          client->buffer + client->bufferOffset,
-                          client->bufferLength - client->bufferOffset,
+                          client->tx->buffer + client->tx->bufferOffset,
+                          client->tx->bufferLength - client->tx->bufferOffset,
                           &client->saslEncoded,
                           &client->saslEncodedLength);
 
+        if (err != SASL_OK)
+            return -1;
+
         client->saslEncodedOffset = 0;
     }
 
     /* Send some of the encoded stuff out on the wire */
-    ret = qemudClientWriteBuf(server, client,
+    ret = qemudClientWriteBuf(client,
                               client->saslEncoded + client->saslEncodedOffset,
                               client->saslEncodedLength - client->saslEncodedOffset);
 
-    if (ret < 0)
-        return -1;
+    if (ret <= 0)
+        return ret; /* -1 error, 0 == egain */
 
     /* Note how much we sent */
     client->saslEncodedOffset += ret;
@@ -1561,77 +1629,101 @@ static int qemudClientWriteSASL(struct q
     if (client->saslEncodedOffset == client->saslEncodedLength) {
         client->saslEncoded = NULL;
         client->saslEncodedOffset = client->saslEncodedLength = 0;
-        client->bufferOffset = client->bufferLength;
+
+        /* Mark as complete, so caller detects completion */
+        client->tx->bufferOffset = client->tx->bufferLength;
     }
 
-    return 0;
+    return ret;
 }
 #endif
 
-static int qemudClientWrite(struct qemud_server *server,
-                            struct qemud_client *client) {
+/*
+ * Send as much data in the client->tx as possible
+ */
+static int qemudClientWrite(struct qemud_client *client) {
 #if HAVE_SASL
     if (client->saslSSF & QEMUD_SASL_SSF_WRITE)
-        return qemudClientWriteSASL(server, client);
+        return qemudClientWriteSASL(client);
     else
 #endif
-        return qemudClientWritePlain(server, client);
+        return qemudClientWritePlain(client);
 }
 
 
-void
+/*
+ * Process all queued client->tx messages until
+ * we would block on I/O
+ */
+static void
 qemudDispatchClientWrite(struct qemud_server *server,
                          struct qemud_client *client) {
-    switch (client->mode) {
-    case QEMUD_MODE_TX_PACKET: {
-        if (qemudClientWrite(server, client) < 0)
-            return;
-
-        if (client->bufferOffset == client->bufferLength) {
-            if (client->closing) {
-                qemudDispatchClientFailure (server, client);
-            } else {
-                /* Done writing, switch back to receive */
-                client->mode = QEMUD_MODE_RX_HEADER;
-                client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
-                client->bufferOffset = 0;
-
-                if (qemudRegisterClientEvent (server, client, 1) < 0)
-                    qemudDispatchClientFailure (server, client);
-            }
-        }
-        /* Still writing */
-        break;
-    }
-
-    case QEMUD_MODE_TLS_HANDSHAKE: {
+    while (client->tx) {
         int ret;
 
-        /* Continue the handshake. */
-        ret = gnutls_handshake (client->tlssession);
-        if (ret == 0) {
-            /* Finished.  Next step is to check the certificate. */
-            if (remoteCheckAccess (client) == -1)
-                qemudDispatchClientFailure (server, client);
-            else if (qemudRegisterClientEvent (server, client, 1))
-                qemudDispatchClientFailure (server, client);
-        } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
-            VIR_ERROR(_("TLS handshake failed: %s"), gnutls_strerror (ret));
-            qemudDispatchClientFailure (server, client);
-        } else {
-            if (qemudRegisterClientEvent (server, client, 1))
-                qemudDispatchClientFailure (server, client);
+        ret = qemudClientWrite(client);
+        if (ret < 0) {
+            qemudDispatchClientFailure(client);
+            return;
         }
+        if (ret == 0)
+            return; /* Would block on write EAGAIN */
 
-        break;
-    }
+        if (client->tx->bufferOffset == client->tx->bufferLength) {
+            struct qemud_client_message *reply;
 
-    default:
-        DEBUG("Got unexpected data write while in %d mode", client->mode);
-        qemudDispatchClientFailure(server, client);
+            /* Get finished reply from head of tx queue */
+            reply = qemudClientMessageQueuePop(&client->tx);
+
+            /* If its not an async message, then we have
+             * just completed an RPC request */
+            if (!reply->async)
+                client->nrequests--;
+
+            /* Move record to end of 'rx' ist */
+            if (!client->rx &&
+                client->nrequests < max_client_requests) {
+                /* Reset message record for next RX attempt */
+                client->rx = reply;
+                client->rx->bufferOffset = 0;
+                client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+            } else {
+                VIR_FREE(reply);
+            }
+
+            if (client->closing ||
+                qemudRegisterClientEvent (server, client, 1) < 0)
+                 qemudDispatchClientFailure(client);
+         }
     }
 }
 
+static void
+qemudDispatchClientHandshake(struct qemud_server *server,
+                             struct qemud_client *client) {
+    int ret;
+    /* Continue the handshake. */
+    ret = gnutls_handshake (client->tlssession);
+    if (ret == 0) {
+        /* Finished.  Next step is to check the certificate. */
+        if (remoteCheckAccess (client) == -1)
+            qemudDispatchClientFailure(client);
+        else if (qemudRegisterClientEvent (server, client, 1))
+            qemudDispatchClientFailure(client);
+    } else if (ret == GNUTLS_E_AGAIN ||
+               ret == GNUTLS_E_INTERRUPTED) {
+        /* Carry on waiting for more handshake. Update
+           the events just in case handshake data flow
+           direction has changed */
+        if (qemudRegisterClientEvent (server, client, 1))
+            qemudDispatchClientFailure(client);
+    } else {
+        /* Fatal error in handshake */
+        VIR_ERROR(_("TLS handshake failed: %s"),
+                  gnutls_strerror (ret));
+        qemudDispatchClientFailure(client);
+    }
+}
 
 static void
 qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
@@ -1642,59 +1734,66 @@ qemudDispatchClientEvent(int watch, int 
     virMutexLock(&server->lock);
 
     for (i = 0 ; i < server->nclients ; i++) {
+        virMutexLock(&server->clients[i]->lock);
         if (server->clients[i]->watch == watch) {
             client = server->clients[i];
             break;
         }
+        virMutexUnlock(&server->clients[i]->lock);
     }
 
+    virMutexUnlock(&server->lock);
+
     if (!client) {
-        virMutexUnlock(&server->lock);
         return;
     }
 
-    virMutexLock(&client->lock);
-    virMutexUnlock(&server->lock);
+    if (client->fd != fd) {
+        virMutexUnlock(&client->lock);
+        return;
+    }
 
-    if (client->fd != fd)
-        return;
+    if (events & (VIR_EVENT_HANDLE_WRITABLE | VIR_EVENT_HANDLE_READABLE)) {
+        if (client->handshake) {
+            qemudDispatchClientHandshake(server, client);
+        } else {
+            /* 'events' is a bitmask; test each flag independently */
+            if (events & VIR_EVENT_HANDLE_WRITABLE)
+                qemudDispatchClientWrite(server, client);
+            if (events & VIR_EVENT_HANDLE_READABLE)
+                qemudDispatchClientRead(server, client);
+        }
+    }
 
-    if (events == VIR_EVENT_HANDLE_WRITABLE)
-        qemudDispatchClientWrite(server, client);
-    else if (events == VIR_EVENT_HANDLE_READABLE)
-        qemudDispatchClientRead(server, client);
-    else
-        qemudDispatchClientFailure(server, client);
+    /* NB, will get HANGUP + READABLE at same time upon
+     * disconnect */
+    if (events & (VIR_EVENT_HANDLE_ERROR |
+                  VIR_EVENT_HANDLE_HANGUP))
+        qemudDispatchClientFailure(client);
+
     virMutexUnlock(&client->lock);
 }
 
-static int qemudRegisterClientEvent(struct qemud_server *server,
-                                    struct qemud_client *client,
-                                    int update) {
-    int mode;
-    switch (client->mode) {
-    case QEMUD_MODE_TLS_HANDSHAKE:
+int qemudRegisterClientEvent(struct qemud_server *server,
+                             struct qemud_client *client,
+                             int update) {
+    int mode = 0;
+
+    if (client->handshake) {
         if (gnutls_record_get_direction (client->tlssession) == 0)
-            mode = VIR_EVENT_HANDLE_READABLE;
+            mode |= VIR_EVENT_HANDLE_READABLE;
         else
-            mode = VIR_EVENT_HANDLE_WRITABLE;
-        break;
+            mode |= VIR_EVENT_HANDLE_WRITABLE;
+    } else {
+        /* If there is a message on the rx queue then
+         * we're wanting more input */
+        if (client->rx)
+            mode |= VIR_EVENT_HANDLE_READABLE;
 
-    case QEMUD_MODE_RX_HEADER:
-    case QEMUD_MODE_RX_PAYLOAD:
-        mode = VIR_EVENT_HANDLE_READABLE;
-        break;
-
-    case QEMUD_MODE_TX_PACKET:
-        mode = VIR_EVENT_HANDLE_WRITABLE;
-        break;
-
-    case QEMUD_MODE_WAIT_DISPATCH:
-        mode = 0;
-        break;
-
-    default:
-        return -1;
+        /* If there are one or more messages to send back to client,
+           then monitor for writability on socket */
+        if (client->tx)
+            mode |= VIR_EVENT_HANDLE_WRITABLE;
     }
 
     if (update) {
@@ -1760,6 +1859,29 @@ static void qemudInactiveTimer(int timer
     }
 }
 
+static void qemudFreeClient(struct qemud_client *client) {
+    while (client->rx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueuePop(&client->rx);
+        VIR_FREE(msg);
+    }
+    while (client->dx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueuePop(&client->dx);
+        VIR_FREE(msg);
+    }
+    while (client->tx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueuePop(&client->tx);
+        VIR_FREE(msg);
+    }
+
+    if (client->conn)
+        virConnectClose(client->conn);
+    virMutexDestroy(&client->lock);
+    VIR_FREE(client);
+}
+
 static int qemudRunLoop(struct qemud_server *server) {
     int timerid = -1;
     int ret = -1, i;
@@ -1796,8 +1918,11 @@ static int qemudRunLoop(struct qemud_ser
         }
 
         virMutexUnlock(&server->lock);
-        if (qemudOneLoop() < 0)
+        if (qemudOneLoop() < 0) {
+            virMutexLock(&server->lock);
+            DEBUG0("Loop iteration error, exiting\n");
             break;
+        }
         virMutexLock(&server->lock);
 
     reprocess:
@@ -1808,17 +1933,18 @@ static int qemudRunLoop(struct qemud_ser
                 && server->clients[i]->refs == 0;
             virMutexUnlock(&server->clients[i]->lock);
             if (inactive) {
-                if (server->clients[i]->conn)
-                    virConnectClose(server->clients[i]->conn);
-                virMutexDestroy(&server->clients[i]->lock);
-                VIR_FREE(server->clients[i]);
+                qemudFreeClient(server->clients[i]);
                 server->nclients--;
-                if (i < server->nclients) {
+                if (i < server->nclients)
                     memmove(server->clients + i,
                             server->clients + i + 1,
-                            server->nclients - i);
-                    goto reprocess;
+                            sizeof (*server->clients) * (server->nclients - i));
+
+                if (VIR_REALLOC_N(server->clients,
+                                  server->nclients) < 0) {
+                    ; /* ignore */
                 }
+                goto reprocess;
             }
         }
 
@@ -1843,6 +1969,7 @@ static int qemudRunLoop(struct qemud_ser
         pthread_join(thread, NULL);
         virMutexLock(&server->lock);
     }
+    VIR_FREE(server->workers);
 
     free(server->workers);
     virMutexUnlock(&server->lock);
@@ -2223,6 +2350,9 @@ remoteReadConfigFile (struct qemud_serve
     GET_CONF_INT (conf, filename, max_workers);
     GET_CONF_INT (conf, filename, max_clients);
 
+    GET_CONF_INT (conf, filename, max_requests);
+    GET_CONF_INT (conf, filename, max_client_requests);
+
     virConfFree (conf);
     return 0;
 
diff --git a/qemud/qemud.h b/qemud/qemud.h
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -65,15 +65,6 @@
 
 #define qemudDebug DEBUG
 
-enum qemud_mode {
-    QEMUD_MODE_RX_HEADER,       /* Receiving the fixed length RPC header data */
-    QEMUD_MODE_RX_PAYLOAD,      /* Receiving the variable length RPC payload data */
-    QEMUD_MODE_WAIT_DISPATCH,   /* Message received, waiting for worker to process */
-    QEMUD_MODE_IN_DISPATCH,     /* RPC call being processed */
-    QEMUD_MODE_TX_PACKET,       /* Transmitting reply to RPC call */
-    QEMUD_MODE_TLS_HANDSHAKE,   /* Performing TLS handshake */
-};
-
 /* Whether we're passing reads & writes through a sasl SSF */
 enum qemud_sasl_ssf {
     QEMUD_SASL_SSF_NONE = 0,
@@ -87,6 +78,18 @@ enum qemud_sock_type {
     QEMUD_SOCK_TYPE_TLS = 2,
 };
 
+struct qemud_client_message;
+
+struct qemud_client_message {
+    char buffer [REMOTE_MESSAGE_MAX + REMOTE_MESSAGE_HEADER_XDR_LEN];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
+    int async : 1;
+
+    struct qemud_client_message *next;
+};
+
 /* Stores the per-client connection state */
 struct qemud_client {
     virMutex lock;
@@ -97,7 +100,6 @@ struct qemud_client {
     int watch;
     int readonly:1;
     int closing:1;
-    enum qemud_mode mode;
 
     struct sockaddr_storage addr;
     socklen_t addrlen;
@@ -105,6 +107,7 @@ struct qemud_client {
     int type; /* qemud_sock_type */
     gnutls_session_t tlssession;
     int auth;
+    int handshake : 1; /* If we're in progress for TLS handshake */
 #if HAVE_SASL
     sasl_conn_t *saslconn;
     int saslSSF;
@@ -117,12 +120,20 @@ struct qemud_client {
     char *saslUsername;
 #endif
 
-    unsigned int incomingSerial;
-    unsigned int outgoingSerial;
-
-    char buffer [REMOTE_MESSAGE_MAX];
-    unsigned int bufferLength;
-    unsigned int bufferOffset;
+    /* Count of messages in 'dx' or 'tx' queue
+     * ie RPC calls in progress. Does not count
+     * async events which are not used for
+     * throttling calculations */
+    int nrequests;
+    /* Zero or one messages being received. Zero if
+     * nrequests >= max_client_requests and throttling */
+    struct qemud_client_message *rx;
+    /* Zero or many messages waiting for a worker
+     * to process them */
+    struct qemud_client_message *dx;
+    /* Zero or many messages waiting for transmit
+     * back to client, including async events */
+    struct qemud_client_message *tx;
 
     /* This is only valid if a remote open call has been made on this
      * connection, otherwise it will be NULL.  Also if remote close is
@@ -181,16 +192,20 @@ void qemudLog(int priority, const char *
 int qemudSetCloseExec(int fd);
 int qemudSetNonBlock(int fd);
 
-unsigned int
+int
 remoteDispatchClientRequest (struct qemud_server *server,
-                             struct qemud_client *client);
+                             struct qemud_client *client,
+                             struct qemud_client_message *req);
 
-void qemudDispatchClientWrite(struct qemud_server *server,
-                             struct qemud_client *client);
+int qemudRegisterClientEvent(struct qemud_server *server,
+                             struct qemud_client *client,
+                             int update);
 
-#if HAVE_POLKIT
-int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
-#endif
+void qemudDispatchClientFailure(struct qemud_client *client);
+
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+                            struct qemud_client_message *msg);
 
 int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
                             virDomainPtr dom,
@@ -198,4 +213,9 @@ int remoteRelayDomainEvent (virConnectPt
                             int detail,
                             void *opaque);
 
+
+#if HAVE_POLKIT
+int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
 #endif
+
+#endif
diff --git a/qemud/remote.c b/qemud/remote.c
--- a/qemud/remote.c
+++ b/qemud/remote.c
@@ -111,6 +111,7 @@ static const dispatch_data const dispatc
 /* Prototypes */
 static void
 remoteDispatchDomainEventSend (struct qemud_client *client,
+                               struct qemud_client_message *msg,
                                virDomainPtr dom,
                                int event,
                                int detail);
@@ -219,9 +220,10 @@ remoteDispatchConnError (remote_error *r
  * Server object is unlocked
  * Client object is locked
  */
-unsigned int
+int
 remoteDispatchClientRequest (struct qemud_server *server,
-                             struct qemud_client *client)
+                             struct qemud_client *client,
+                             struct qemud_client_message *msg)
 {
     XDR xdr;
     remote_message_header req, rep;
@@ -237,7 +239,10 @@ remoteDispatchClientRequest (struct qemu
     memset(&rerr, 0, sizeof rerr);
 
     /* Parse the header. */
-    xdrmem_create (&xdr, client->buffer, client->bufferLength, XDR_DECODE);
+    xdrmem_create (&xdr,
+                   msg->buffer + REMOTE_MESSAGE_HEADER_XDR_LEN,
+                   msg->bufferLength - REMOTE_MESSAGE_HEADER_XDR_LEN,
+                   XDR_DECODE);
 
     if (!xdr_remote_message_header (&xdr, &req))
         goto fatal_error;
@@ -333,7 +338,7 @@ rpc_error:
     rep.status = rv < 0 ? REMOTE_ERROR : REMOTE_OK;
 
     /* Serialise the return header. */
-    xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+    xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
 
     len = 0; /* We'll come back and write this later. */
     if (!xdr_int (&xdr, &len)) {
@@ -368,13 +373,17 @@ rpc_error:
         goto fatal_error;
 
     xdr_destroy (&xdr);
-    return len;
+
+    msg->bufferLength = len;
+    msg->bufferOffset = 0;
+
+    return 0;
 
 fatal_error:
     /* Seriously bad stuff happened, so we'll kill off this client
        and not send back any RPC error */
     xdr_destroy (&xdr);
-    return 0;
+    return -1;
 }
 
 int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
@@ -386,9 +395,20 @@ int remoteRelayDomainEvent (virConnectPt
     struct qemud_client *client = opaque;
     REMOTE_DEBUG("Relaying domain event %d %d", event, detail);
 
-    if(client) {
-        remoteDispatchDomainEventSend (client, dom, event, detail);
-        qemudDispatchClientWrite(client->server,client);
+    if (client) {
+        struct qemud_client_message *ev;
+
+        if (VIR_ALLOC(ev) < 0)
+            return -1;
+
+        virMutexLock(&client->lock);
+
+        remoteDispatchDomainEventSend (client, ev, dom, event, detail);
+
+        if (qemudRegisterClientEvent(client->server, client, 1) < 0)
+            qemudDispatchClientFailure(client);
+
+        virMutexUnlock(&client->lock);
     }
     return 0;
 }
@@ -4202,6 +4222,7 @@ remoteDispatchDomainEventsDeregister (st
 
 static void
 remoteDispatchDomainEventSend (struct qemud_client *client,
+                               struct qemud_client_message *msg,
                                virDomainPtr dom,
                                int event,
                                int detail)
@@ -4222,7 +4243,7 @@ remoteDispatchDomainEventSend (struct qe
     rep.status = REMOTE_OK;
 
     /* Serialise the return header and event. */
-    xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+    xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
 
     len = 0; /* We'll come back and write this later. */
     if (!xdr_int (&xdr, &len)) {
@@ -4263,9 +4284,10 @@ remoteDispatchDomainEventSend (struct qe
     xdr_destroy (&xdr);
 
     /* Send it. */
-    client->mode = QEMUD_MODE_TX_PACKET;
-    client->bufferLength = len;
-    client->bufferOffset = 0;
+    msg->async = 1;
+    msg->bufferLength = len;
+    msg->bufferOffset = 0;
+    qemudClientMessageQueuePush(&client->tx, msg);
 }
 
 /*----- Helpers. -----*/
diff --git a/qemud/test_libvirtd.aug b/qemud/test_libvirtd.aug
--- a/qemud/test_libvirtd.aug
+++ b/qemud/test_libvirtd.aug
@@ -246,6 +246,19 @@ max_clients = 20
 # of clients allowed
 min_workers = 5
 max_workers = 20
+
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impact
+# memory usage, currently each request requires 256 KB of
+# memory. So by default upto 5 MB of memory is used
+max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameter
+max_client_requests = 5
 "
 
    test Libvirtd.lns get conf =
@@ -499,3 +512,16 @@ max_workers = 20
         { "#comment" = "of clients allowed"}
         { "min_workers" = "5" }
         { "max_workers" = "20" }
+	{ "#empty" }
+        { "#comment" = "Total global limit on concurrent RPC calls. Should be" }
+        { "#comment" = "at least as large as max_workers. Beyond this, RPC requests" }
+        { "#comment" = "will be read into memory and queued. This directly impact" }
+        { "#comment" = "memory usage, currently each request requires 256 KB of" }
+        { "#comment" = "memory. So by default upto 5 MB of memory is used" }
+        { "max_requests" = "20" }
+	{ "#empty" }
+        { "#comment" = "Limit on concurrent requests from a single client" }
+        { "#comment" = "connection. To avoid one client monopolizing the server" }
+        { "#comment" = "this should be a small fraction of the global max_requests" }
+        { "#comment" = "and max_workers parameter" }
+        { "max_client_requests" = "5" }


-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|

--
Libvir-list mailing list
Libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list

[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]