Re: [PATCH v3 1/5] Add a threadpool implementation

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 11/30/2010 12:14 AM, Hu Tao wrote:
> ---
>  src/Makefile.am       |    1 +
>  src/util/threadpool.c |  140 +++++++++++++++++++++++++++++++++++++++++++++++++
>  src/util/threadpool.h |   35 ++++++++++++
>  3 files changed, 176 insertions(+), 0 deletions(-)
>  create mode 100644 src/util/threadpool.c
>  create mode 100644 src/util/threadpool.h
> 
> diff --git a/src/Makefile.am b/src/Makefile.am
> index a9a1986..5febd76 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -76,6 +76,7 @@ UTIL_SOURCES =							\
>  		util/uuid.c util/uuid.h				\
>  		util/util.c util/util.h				\
>  		util/xml.c util/xml.h				\
> +		util/threadpool.c util/threadpool.h		\
>  		util/virtaudit.c util/virtaudit.h               \
>  		util/virterror.c util/virterror_internal.h
>  
> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> new file mode 100644
> index 0000000..4bf0f8d
> --- /dev/null
> +++ b/src/util/threadpool.c
> @@ -0,0 +1,140 @@

Copyright header?

> +#include <config.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <string.h>
> +
> +#include "threadpool.h"
> +
> +static void *workerHandleJob(void *data)
> +{
> +    struct virData *localData = NULL;
> +    struct virWorkerPool *pool = data;
> +
> +    pthread_mutex_lock(&pool->mutex);

We should be using virMutexLock here, so as to also be portable to mingw.

> +
> +    while (1) {
> +        while (!pool->quit && !pool->dataList) {
> +            pool->nFreeWorker++;
> +            pthread_cond_signal(&pool->worker_cond);

Likewise, virCondSignal here.

> +            pthread_cond_wait(&pool->cond, &pool->mutex);

and virCondWait.

> +            pool->nFreeWorker--;
> +
> +            if (pool->nWorker > pool->nMaxWorker)
> +                goto out;
> +        }
> +
> +        while ((localData = pool->dataList) != NULL) {
> +            pool->dataList = pool->dataList->next;
> +            localData->next = NULL;
> +
> +            pthread_mutex_unlock(&pool->mutex);
> +
> +            (pool->func)(localData->data);
> +            free(localData);

VIR_FREE().

> +
> +            pthread_mutex_lock(&pool->mutex);
> +        }
> +
> +        if (pool->quit)
> +            break;
> +    }
> +
> +out:
> +    pool->nWorker--;
> +    if (pool->nWorker == 0)
> +        pthread_cond_signal(&pool->quit_cond);
> +    pthread_mutex_unlock(&pool->mutex);
> +
> +    return NULL;
> +}
> +
> +struct virWorkerPool *virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func)
> +{
> +    struct virWorkerPool *pool;
> +    pthread_t pid;
> +    int i;
> +
> +    if (nWorker < 0)
> +        return NULL;
> +
> +    if (nWorker > maxWorker)
> +        return NULL;

daemon/libvirtd.c already has a notion of worker threads; I'm wondering
how much overlap there is between your implementation and that one.  A
better proof that this would be a useful API addition would be to have
the next patch in the series convert libvirtd.c over to using this API.

> +
> +    pool = malloc(sizeof(*pool));

Run 'make syntax-check' - it would have complained about this.  Use
VIR_ALLOC or VIR_ALLOC_N instead of malloc.

> +    if (!pool)
> +        return NULL;
> +
> +    memset(pool, 0, sizeof(*pool));
> +    pool->func = func;
> +    pthread_mutex_init(&pool->mutex, NULL);

virMutexInit()

> +    pthread_cond_init(&pool->cond, NULL);
> +    pthread_cond_init(&pool->worker_cond, NULL);
> +    pthread_cond_init(&pool->quit_cond, NULL);

virCondInit()

> +
> +    for (i = 0; i < nWorker; i++) {
> +        pthread_create(&pid, NULL, workerHandleJob, pool);

virThreadCreate()

> +    }
> +
> +    pool->nFreeWorker = 0;
> +    pool->nWorker = nWorker;
> +    pool->nMaxWorker = maxWorker;
> +
> +    return pool;
> +}
> +
> +void virWorkerPoolFree(struct virWorkerPool *pool)
> +{
> +    pthread_mutex_lock(&pool->mutex);
> +    pool->quit = 1;

Use <stdbool.h> and bool if a value will only ever be 0 or 1.

> +    if (pool->nWorker > 0) {
> +        pthread_cond_broadcast(&pool->cond);
> +        pthread_cond_wait(&pool->quit_cond, &pool->mutex);
> +    }
> +    pthread_mutex_unlock(&pool->mutex);
> +    free(pool);

VIR_FREE()

> +}
> +
> +int virWorkerPoolSendJob(struct virWorkerPool *pool, void *data)
> +{
> +    pthread_t pid;
> +    struct virData *localData;
> +
> +    localData = malloc(sizeof(*localData));

VIR_ALLOC()

> +    if (!localData)
> +        return -1;
> +
> +    localData->data = data;
> +
> +    pthread_mutex_lock(&pool->mutex);
> +    if (pool->quit) {
> +        pthread_mutex_unlock(&pool->mutex);
> +        free(localData);
> +        return -1;
> +    }
> +
> +    localData->next = pool->dataList;
> +    pool->dataList = localData;
> +
> +    if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) {
> +        pthread_create(&pid, NULL, workerHandleJob, pool);
> +        pool->nWorker++;
> +    }
> +
> +    pthread_cond_signal(&pool->cond);
> +
> +    pthread_mutex_unlock(&pool->mutex);
> +
> +    return 0;
> +}
> +
> +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker)
> +{
> +    if (maxWorker < 0)
> +        return -1;
> +
> +    pthread_mutex_lock(&pool->mutex);
> +    pool->nMaxWorker = maxWorker;
> +    pthread_mutex_unlock(&pool->mutex);

Does this do the right thing if maxWorker < pool->nMaxWorker, or does it
silently lose existing workers?

> +
> +    return 0;
> +}
> diff --git a/src/util/threadpool.h b/src/util/threadpool.h
> new file mode 100644
> index 0000000..5ff3a6b
> --- /dev/null
> +++ b/src/util/threadpool.h
> @@ -0,0 +1,35 @@

Copyright header?

> +#ifndef __THREADPOOL_H__
> +#define __THREADPOOL_H__

Use of the __ namespace risks collision with the system; I'd feel better
if this were __VIR_THREADPOOL_H__.

> +
> +#include <pthread.h>

"threads.h", not <pthread.h>, so we can support mingw

> +
> +typedef void (*virWorkerFunc)(void *);

pthread_create() takes a function that can return void*.  Should worker
functions be allowed to return a value?

> +
> +struct virData {
> +    struct virData *next;
> +
> +    void *data;
> +};

We've typically used typedefs to avoid having to type 'struct virData'
everywhere else.

> +
> +struct virWorkerPool {
> +    int nWorker;
> +    int nMaxWorker;
> +    int nFreeWorker;

s/int/size_t/ when dealing with non-negative counts.

> +
> +    int quit;

s/int/bool/

> +
> +    virWorkerFunc func;
> +    struct virData *dataList;
> +
> +    pthread_mutex_t mutex;
> +    pthread_cond_t cond;
> +    pthread_cond_t worker_cond;
> +    pthread_cond_t quit_cond;

virMutex, virCond

> +};
> +
> +struct virWorkerPool *virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func);

needs ATTRIBUTE_RETURN_CHECK.

> +void virWorkerPoolFree(struct virWorkerPool *pool);
> +int virWorkerPoolSendJob(struct virWorkerPool *wp, void *data);

ATTRIBUTE_NONNULL(1)

> +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker);

ATTRIBUTE_NONNULL(1)

> +
> +#endif

-- 
Eric Blake   eblake@xxxxxxxxxx    +1-801-349-2682
Libvirt virtualization library http://libvirt.org

Attachment: signature.asc
Description: OpenPGP digital signature

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list

[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]