[PATCH v3 10/11] admin: Introduce virAdmServerSetThreadPoolParameters

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The threadpool increases the current number of threads according to the current
load, i.e. how many jobs are waiting in the queue. The count, however, is
constrained by the max and min limits of workers. The logic of this new API
works like this:
    1) setting the minimum
        a) When the limit is increased, depending on the current number of
           threads, new threads are possibly spawned if the current number of
           threads is less than the new minimum limit
        b) Decreasing the minimum limit has no possible effect on the current
           number of threads
    2) setting the maximum
        a) Increasing the maximum limit has no immediate effect on the current
           number of threads, it only allows the threadpool to spawn more
           threads when new jobs, that would otherwise end up queued, arrive.
        b) Decreasing the maximum limit may affect the current number of
           threads, if the current number of threads is greater than the new
           maximum limit. However, there may be some ongoing time-consuming
           jobs that would effectively block this API from killing any threads.
           Therefore, this API is asynchronous with best-effort execution,
           i.e. the necessary number of workers will be terminated once they
           finish their previous job, unless other workers had already
           terminated, decreasing the count to the requested limit.
    3) setting priority workers
        - both increase and decrease in count of these workers have an
          immediate impact on the current number of workers, new ones will be
          spawned or some of them get terminated respectively.

Signed-off-by: Erik Skultety <eskultet@xxxxxxxxxx>
---
 daemon/admin.c                  | 43 +++++++++++++++++++++++
 daemon/admin_server.c           | 43 +++++++++++++++++++++++
 daemon/admin_server.h           |  5 +++
 include/libvirt/libvirt-admin.h |  5 +++
 src/admin/admin_protocol.x      | 13 ++++++-
 src/admin/admin_remote.c        | 34 ++++++++++++++++++
 src/admin_protocol-structs      |  9 +++++
 src/libvirt-admin.c             | 37 ++++++++++++++++++++
 src/libvirt_admin_private.syms  |  1 +
 src/libvirt_admin_public.syms   |  1 +
 src/libvirt_private.syms        |  1 +
 src/rpc/virnetserver.c          | 15 ++++++++
 src/rpc/virnetserver.h          |  5 +++
 src/util/virthreadpool.c        | 77 +++++++++++++++++++++++++++++++++++++++--
 src/util/virthreadpool.h        |  5 +++
 15 files changed, 290 insertions(+), 4 deletions(-)

diff --git a/daemon/admin.c b/daemon/admin.c
index 530f6bf..eb961d3 100644
--- a/daemon/admin.c
+++ b/daemon/admin.c
@@ -178,4 +178,47 @@ adminDispatchServerGetThreadpoolParameters(virNetServerPtr server ATTRIBUTE_UNUS
     virObjectUnref(srv);
     return rv;
 }
+
+static int
+adminDispatchServerSetThreadpoolParameters(virNetServerPtr server ATTRIBUTE_UNUSED,
+                                           virNetServerClientPtr client,
+                                           virNetMessagePtr msg ATTRIBUTE_UNUSED,
+                                           virNetMessageErrorPtr rerr,
+                                           struct admin_server_set_threadpool_parameters_args *args)
+{
+    int rv = -1;
+    virNetServerPtr srv = NULL;
+    virTypedParameterPtr params = NULL;
+    int nparams = 0;
+    struct daemonAdmClientPrivate *priv =
+        virNetServerClientGetPrivateData(client);
+
+    if (!(srv = virNetDaemonGetServer(priv->dmn, args->server.name))) {
+        virReportError(VIR_ERR_NO_SERVER,
+                       _("no server with matching name '%s' found"),
+                       args->server.name);
+        goto cleanup;
+    }
+
+    if (virTypedParamsDeserialize((virTypedParameterRemotePtr) args->params.params_val,
+                                  args->params.params_len,
+                                  ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX,
+                                  &params,
+                                  &nparams) < 0)
+        goto cleanup;
+
+    /* Apply the decoded parameters to the server that was looked up above. */
+    if (adminDaemonSetThreadPoolParameters(srv, params,
+                                           nparams, args->flags) < 0)
+        goto cleanup;
+
+    rv = 0;
+ cleanup:
+    if (rv < 0)
+        virNetMessageSaveError(rerr);
+
+    virTypedParamsFree(params, nparams);
+    virObjectUnref(srv);
+    return rv;
+}
 #include "admin_dispatch.h"
diff --git a/daemon/admin_server.c b/daemon/admin_server.c
index e7d48b8..573558e 100644
--- a/daemon/admin_server.c
+++ b/daemon/admin_server.c
@@ -32,6 +32,7 @@
 #include "virnetserver.h"
 #include "virstring.h"
 #include "virthreadpool.h"
+#include "virtypedparam.h"
 
 #define VIR_FROM_THIS VIR_FROM_ADMIN
 
@@ -135,3 +136,45 @@ adminDaemonGetThreadPoolParameters(virNetServerPtr srv,
     virTypedParamsFree(tmpparams, *nparams);
     return ret;
 }
+
+int
+adminDaemonSetThreadPoolParameters(virNetServerPtr srv,
+                                   virTypedParameterPtr params,
+                                   int nparams,
+                                   unsigned int flags)
+{
+    long long int minWorkers = -1;  /* stays < 0 when not in @params */
+    long long int maxWorkers = -1;  /* stays < 0 when not in @params */
+    long long int prioWorkers = -1; /* stays < 0 when not in @params */
+    virTypedParameterPtr param = NULL;
+
+    virCheckFlags(0, -1);
+
+    if (virTypedParamsValidate(params, nparams,
+                               VIR_THREADPOOL_WORKERS_MIN,
+                               VIR_TYPED_PARAM_UINT,
+                               VIR_THREADPOOL_WORKERS_MAX,
+                               VIR_TYPED_PARAM_UINT,
+                               VIR_THREADPOOL_WORKERS_PRIORITY,
+                               VIR_TYPED_PARAM_UINT,
+                               NULL) < 0)
+        return -1;
+
+    if ((param = virTypedParamsGet(params, nparams,
+                                  VIR_THREADPOOL_WORKERS_MIN)))
+        minWorkers = param->value.ui;
+
+    if ((param = virTypedParamsGet(params, nparams,
+                                  VIR_THREADPOOL_WORKERS_MAX)))
+        maxWorkers = param->value.ui;
+
+    if ((param = virTypedParamsGet(params, nparams,
+                                  VIR_THREADPOOL_WORKERS_PRIORITY)))
+        prioWorkers = param->value.ui;
+
+    if (virNetServerSetThreadPoolParameters(srv, minWorkers,
+                                            maxWorkers, prioWorkers) < 0)
+        return -1;
+
+    return 0;
+}
diff --git a/daemon/admin_server.h b/daemon/admin_server.h
index 7aa7727..076e21a 100644
--- a/daemon/admin_server.h
+++ b/daemon/admin_server.h
@@ -40,5 +40,10 @@ adminDaemonGetThreadPoolParameters(virNetServerPtr srv,
                                    virTypedParameterPtr *params,
                                    int *nparams,
                                    unsigned int flags);
+int
+adminDaemonSetThreadPoolParameters(virNetServerPtr srv,
+                                   virTypedParameterPtr params,
+                                   int nparams,
+                                   unsigned int flags);
 
 #endif /* __LIBVIRTD_ADMIN_SERVER_H__ */
diff --git a/include/libvirt/libvirt-admin.h b/include/libvirt/libvirt-admin.h
index 39c2c02..db17663 100644
--- a/include/libvirt/libvirt-admin.h
+++ b/include/libvirt/libvirt-admin.h
@@ -176,6 +176,11 @@ int virAdmServerGetThreadPoolParameters(virAdmServerPtr srv,
                                         int *nparams,
                                         unsigned int flags);
 
+int virAdmServerSetThreadPoolParameters(virAdmServerPtr srv,
+                                        virTypedParameterPtr params,
+                                        int nparams,
+                                        unsigned int flags);
+
 # ifdef __cplusplus
 }
 # endif
diff --git a/src/admin/admin_protocol.x b/src/admin/admin_protocol.x
index 46b1b5c..e2123b6 100644
--- a/src/admin/admin_protocol.x
+++ b/src/admin/admin_protocol.x
@@ -110,6 +110,12 @@ struct admin_server_get_threadpool_parameters_ret {
     admin_typed_param params<ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX>;
 };
 
+struct admin_server_set_threadpool_parameters_args {
+    admin_nonnull_server server;
+    admin_typed_param params<ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX>;
+    unsigned int flags;
+};
+
 /* Define the program number, protocol version and procedure numbers here. */
 const ADMIN_PROGRAM = 0x06900690;
 const ADMIN_PROTOCOL_VERSION = 1;
@@ -160,5 +166,10 @@ enum admin_procedure {
     /**
      * @generate: none
      */
-    ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6
+    ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6,
+
+    /**
+     * @generate: none
+     */
+    ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7
 };
diff --git a/src/admin/admin_remote.c b/src/admin/admin_remote.c
index 64942e7..c2cb6b6 100644
--- a/src/admin/admin_remote.c
+++ b/src/admin/admin_remote.c
@@ -267,3 +267,37 @@ remoteAdminServerGetThreadPoolParameters(virAdmServerPtr srv,
     virObjectUnlock(priv);
     return rv;
 }
+
+static int
+remoteAdminServerSetThreadPoolParameters(virAdmServerPtr srv,
+                                         virTypedParameterPtr params,
+                                         int nparams,
+                                         unsigned int flags)
+{
+    remoteAdminPrivPtr priv = srv->conn->privateData;
+    int rv = -1;
+    admin_server_set_threadpool_parameters_args args;
+
+    args.flags = flags;
+    make_nonnull_server(&args.server, srv);
+
+    virObjectLock(priv); /* released in cleanup on every exit path */
+
+    if (virTypedParamsSerialize(params, nparams,
+                                (virTypedParameterRemotePtr *) &args.params.params_val,
+                                &args.params.params_len,
+                                0) < 0)
+        goto cleanup;
+
+    if (call(srv->conn, 0, ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS,
+             (xdrproc_t)xdr_admin_server_set_threadpool_parameters_args, (char *) &args,
+             (xdrproc_t)xdr_void, (char *) NULL) == -1)
+        goto cleanup;
+
+    rv = 0;
+ cleanup:
+    virTypedParamsRemoteFree((virTypedParameterRemotePtr) args.params.params_val,
+                             args.params.params_len);
+    virObjectUnlock(priv);
+    return rv;
+}
diff --git a/src/admin_protocol-structs b/src/admin_protocol-structs
index 8a7bf88..d0a0f1a 100644
--- a/src/admin_protocol-structs
+++ b/src/admin_protocol-structs
@@ -61,6 +61,14 @@ struct admin_server_get_threadpool_parameters_ret {
                 admin_typed_param * params_val;
         } params;
 };
+struct admin_server_set_threadpool_parameters_args {
+        admin_nonnull_server       server;
+        struct {
+                u_int              params_len;
+                admin_typed_param * params_val;
+        } params;
+        u_int                      flags;
+};
 enum admin_procedure {
         ADMIN_PROC_CONNECT_OPEN = 1,
         ADMIN_PROC_CONNECT_CLOSE = 2,
@@ -68,4 +76,5 @@ enum admin_procedure {
         ADMIN_PROC_CONNECT_LIST_SERVERS = 4,
         ADMIN_PROC_CONNECT_LOOKUP_SERVER = 5,
         ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6,
+        ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7,
 };
diff --git a/src/libvirt-admin.c b/src/libvirt-admin.c
index c528f79..5a6c2a4 100644
--- a/src/libvirt-admin.c
+++ b/src/libvirt-admin.c
@@ -720,3 +720,40 @@ virAdmServerGetThreadPoolParameters(virAdmServerPtr srv,
     virDispatchError(NULL);
     return -1;
 }
+
+/**
+ * virAdmServerSetThreadPoolParameters:
+ * @srv: a valid server object reference
+ * @params: pointer to threadpool parameter objects
+ * @nparams: number of parameters in @params
+ * @flags: extra flags; not used yet, so callers should always pass 0
+ *
+ * Change server threadpool parameters according to @params. Note that some
+ * tunables are read-only, thus any attempt to set them will result in a
+ * failure.
+ *
+ * Returns 0 on success, -1 in case of an error.
+ */
+int
+virAdmServerSetThreadPoolParameters(virAdmServerPtr srv,
+                                    virTypedParameterPtr params,
+                                    int nparams,
+                                    unsigned int flags)
+{
+    VIR_DEBUG("srv=%p, params=%p, nparams=%d, flags=%x",
+              srv, params, nparams, flags);
+
+    virResetLastError();
+
+    virCheckAdmServerReturn(srv, -1);
+    virCheckNonNullArgGoto(params, error);
+
+    if (remoteAdminServerSetThreadPoolParameters(srv, params,
+                                                 nparams, flags) < 0)
+        goto error;
+
+    return 0;
+ error:
+    virDispatchError(NULL);
+    return -1;
+}
diff --git a/src/libvirt_admin_private.syms b/src/libvirt_admin_private.syms
index b05067c..b150d8a 100644
--- a/src/libvirt_admin_private.syms
+++ b/src/libvirt_admin_private.syms
@@ -14,6 +14,7 @@ xdr_admin_connect_lookup_server_ret;
 xdr_admin_connect_open_args;
 xdr_admin_server_get_threadpool_parameters_args;
 xdr_admin_server_get_threadpool_parameters_ret;
+xdr_admin_server_set_threadpool_parameters_args;
 
 # datatypes.h
 virAdmConnectClass;
diff --git a/src/libvirt_admin_public.syms b/src/libvirt_admin_public.syms
index 0a12b5f..0a16444 100644
--- a/src/libvirt_admin_public.syms
+++ b/src/libvirt_admin_public.syms
@@ -26,4 +26,5 @@ LIBVIRT_ADMIN_1.3.0 {
         virAdmServerGetThreadPoolParameters;
         virAdmServerFree;
         virAdmConnectLookupServer;
+        virAdmServerSetThreadPoolParameters;
 };
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index e9a2bce..824b1fa 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -2355,6 +2355,7 @@ virThreadPoolGetMinWorkers;
 virThreadPoolGetPriorityWorkers;
 virThreadPoolNewFull;
 virThreadPoolSendJob;
+virThreadPoolSetParameters;
 
 
 # util/virtime.h
diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
index 3878547..57bd95c 100644
--- a/src/rpc/virnetserver.c
+++ b/src/rpc/virnetserver.c
@@ -899,3 +899,18 @@ virNetServerGetThreadPoolParameters(virNetServerPtr srv,
     virObjectUnlock(srv);
     return 0;
 }
+
+int
+virNetServerSetThreadPoolParameters(virNetServerPtr srv,
+                                    long long int minWorkers,
+                                    long long int maxWorkers,
+                                    long long int prioWorkers)
+{
+    int ret;
+
+    virObjectLock(srv); /* held while the worker pool is reconfigured */
+    ret = virThreadPoolSetParameters(srv->workers, minWorkers,
+                                     maxWorkers, prioWorkers);
+    virObjectUnlock(srv);
+    return ret;
+}
diff --git a/src/rpc/virnetserver.h b/src/rpc/virnetserver.h
index 6f17d1c..8b304f6 100644
--- a/src/rpc/virnetserver.h
+++ b/src/rpc/virnetserver.h
@@ -97,4 +97,9 @@ int virNetServerGetThreadPoolParameters(virNetServerPtr srv,
                                         size_t *nPrioWorkers,
                                         size_t *jobQueueDepth);
 
+int virNetServerSetThreadPoolParameters(virNetServerPtr srv,
+                                        long long int minWorkers,
+                                        long long int maxWorkers,
+                                        long long int prioWorkers);
+
 #endif /* __VIR_NET_SERVER_H__ */
diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c
index fec8620..c553477 100644
--- a/src/util/virthreadpool.c
+++ b/src/util/virthreadpool.c
@@ -73,6 +73,7 @@ struct _virThreadPool {
     size_t nWorkers;
     virThreadPtr workers;
 
+    size_t maxPrioWorkers;
     size_t nPrioWorkers;
     virThreadPtr prioWorkers;
     virCond prioCond;
@@ -84,12 +85,22 @@ struct virThreadPoolWorkerData {
     bool priority;
 };
 
+/* Return true when the current number of workers @count exceeds @limit,
+ * i.e. when the calling worker is expected to quit.
+ */
+static inline bool virThreadPoolWorkerQuitHelper(size_t count, size_t limit)
+{
+    return count > limit;
+}
+
 static void virThreadPoolWorker(void *opaque)
 {
     struct virThreadPoolWorkerData *data = opaque;
     virThreadPoolPtr pool = data->pool;
     virCondPtr cond = data->cond;
     bool priority = data->priority;
+    size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
+    size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers;
     virThreadPoolJobPtr job = NULL;
 
     VIR_FREE(data);
@@ -97,6 +108,14 @@ static void virThreadPoolWorker(void *opaque)
     virMutexLock(&pool->mutex);
 
     while (1) {
+        /* In order to support async worker termination, we need to ensure that
+         * both busy and free workers know if they need to terminate. Thus,
+         * busy workers need to check for this fact before they start waiting for
+         * another job (and before taking another one from the queue); and
+         * free workers need to check for this right after waking up.
+         */
+        if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
+            goto out;
         while (!pool->quit &&
                ((!priority && !pool->jobList.head) ||
                 (priority && !pool->jobList.firstPrio))) {
@@ -109,6 +128,9 @@ static void virThreadPoolWorker(void *opaque)
             }
             if (!priority)
                 pool->freeWorkers--;
+
+            if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
+                goto out;
         }
 
         if (pool->quit)
@@ -160,12 +182,12 @@ static void virThreadPoolWorker(void *opaque)
 static int
 virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority)
 {
-    virThreadPtr workers = priority ? pool->prioWorkers : pool->workers;
+    virThreadPtr *workers = priority ? &pool->prioWorkers : &pool->workers;
     size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
     size_t i = 0;
     struct virThreadPoolWorkerData *data = NULL;
 
-    if (VIR_EXPAND_N(workers, *curWorkers, gain) < 0)
+    if (VIR_EXPAND_N(*workers, *curWorkers, gain) < 0)
         return -1;
 
     for (i = 0; i < gain; i++) {
@@ -176,7 +198,7 @@ virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority)
         data->cond = priority ? &pool->prioCond : &pool->cond;
         data->priority = priority;
 
-        if (virThreadCreateFull(&workers[i],
+        if (virThreadCreateFull(&(*workers)[i],
                                 false,
                                 virThreadPoolWorker,
                                 pool->jobFuncName,
@@ -226,6 +248,7 @@ virThreadPoolNewFull(size_t minWorkers,
 
     pool->minWorkers = minWorkers;
     pool->maxWorkers = maxWorkers;
+    pool->maxPrioWorkers = prioWorkers;
 
     if (virThreadPoolExpand(pool, minWorkers, false) < 0)
         goto error;
@@ -399,3 +422,51 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
     virMutexUnlock(&pool->mutex);
     return -1;
 }
+
+int
+virThreadPoolSetParameters(virThreadPoolPtr pool,
+                           long long int minWorkers,
+                           long long int maxWorkers,
+                           long long int prioWorkers)
+{
+    virMutexLock(&pool->mutex);
+
+    /* Negative means "unchanged"; validate the limits as they will end up. */
+    if ((minWorkers >= 0 ? (size_t) minWorkers : pool->minWorkers) >
+        (maxWorkers >= 0 ? (size_t) maxWorkers : pool->maxWorkers)) {
+        virReportError(VIR_ERR_INVALID_ARG, "%s",
+                       _("minWorkers cannot be larger than maxWorkers"));
+        goto error;
+    }
+
+    if (minWorkers >= 0) {
+        if ((size_t) minWorkers > pool->nWorkers &&
+            virThreadPoolExpand(pool, minWorkers - pool->nWorkers,
+                                false) < 0)
+            goto error;
+        pool->minWorkers = minWorkers;
+    }
+
+    if (maxWorkers >= 0) {
+        pool->maxWorkers = maxWorkers;
+        virCondBroadcast(&pool->cond); /* idle workers re-check the limit */
+    }
+
+    if (prioWorkers >= 0) {
+        if ((size_t) prioWorkers < pool->nPrioWorkers) {
+            virCondBroadcast(&pool->prioCond);
+        } else if ((size_t) prioWorkers > pool->nPrioWorkers &&
+                   virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers,
+                                       true) < 0) {
+            goto error;
+        }
+        pool->maxPrioWorkers = prioWorkers;
+    }
+
+    virMutexUnlock(&pool->mutex);
+    return 0;
+
+ error:
+    virMutexUnlock(&pool->mutex);
+    return -1;
+}
diff --git a/src/util/virthreadpool.h b/src/util/virthreadpool.h
index bc0c907..e1f362f 100644
--- a/src/util/virthreadpool.h
+++ b/src/util/virthreadpool.h
@@ -57,4 +57,9 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
                          void *jobdata) ATTRIBUTE_NONNULL(1)
                                         ATTRIBUTE_RETURN_CHECK;
 
+int virThreadPoolSetParameters(virThreadPoolPtr pool,
+                               long long int minWorkers,
+                               long long int maxWorkers,
+                               long long int prioWorkers);
+
 #endif
-- 
2.4.11

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]