Re: [PATCH 1/8] Thread pool impl

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Wed, Dec 01, 2010 at 05:26:27PM +0000, Daniel P. Berrange wrote:
> From: Hu Tao <hutao@xxxxxxxxxxxxxx>
> 
> * src/util/threadpool.c, src/util/threadpool.h: Thread pool
>   implementation
> * src/Makefile.am: Build thread pool
> ---
>  src/Makefile.am       |    1 +
>  src/util/threadpool.c |  178 +++++++++++++++++++++++++++++++++++++++++++++++++
>  src/util/threadpool.h |   23 ++++++
>  3 files changed, 202 insertions(+), 0 deletions(-)
>  create mode 100644 src/util/threadpool.c
>  create mode 100644 src/util/threadpool.h
> 
> diff --git a/src/Makefile.am b/src/Makefile.am
> index a9a1986..d71c644 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -73,6 +73,7 @@ UTIL_SOURCES =							\
>  		util/threads.c util/threads.h			\
>  		util/threads-pthread.h				\
>  		util/threads-win32.h				\
> +		util/threadpool.c util/threadpool.h		\
>  		util/uuid.c util/uuid.h				\
>  		util/util.c util/util.h				\
>  		util/xml.c util/xml.h				\
> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> new file mode 100644
> index 0000000..cf998bf
> --- /dev/null
> +++ b/src/util/threadpool.c
> @@ -0,0 +1,178 @@
> +
> +#include <config.h>
> +
> +#include "threadpool.h"
> +#include "memory.h"
> +#include "threads.h"
> +#include "virterror_internal.h"
> +
> +#define VIR_FROM_THIS VIR_FROM_NONE
> +
> +typedef struct _virThreadPoolJob virThreadPoolJob;
> +typedef virThreadPoolJob *virThreadPoolJobPtr;
> +
> +struct _virThreadPoolJob {
> +    virThreadPoolJobPtr next;
> +
> +    void *data;
> +};
> +
> +struct _virThreadPool {
> +    int quit;
> +
> +    virThreadPoolJobFunc jobFunc;
> +    void *jobOpaque;
> +    virThreadPoolJobPtr jobList;
> +
> +    virMutex mutex;
> +    virCond cond;
> +    virCond quit_cond;
> +
> +    size_t maxWorkers;
> +    size_t freeWorkers;
> +    size_t nWorkers;
> +    virThreadPtr workers;
> +};
> +
> +static void virThreadPoolWorker(void *opaque)
> +{
> +    virThreadPoolPtr pool = opaque;
> +
> +    virMutexLock(&pool->mutex);
> +
> +    while (1) {
> +        while (!pool->quit &&
> +               !pool->jobList) {
> +            pool->freeWorkers++;
> +            if (virCondWait(&pool->cond, &pool->mutex) < 0) {
> +                pool->freeWorkers--;
> +                break;
> +            }
> +            pool->freeWorkers--;
> +        }
> +
> +        if (pool->quit)
> +            break;
> +
> +        virThreadPoolJobPtr job = pool->jobList;
> +        pool->jobList = pool->jobList->next;
> +        job->next = NULL;
> +
> +        virMutexUnlock(&pool->mutex);
> +        (pool->jobFunc)(job->data, pool->jobOpaque);

This could race if jobFunc does something with jobOpaque unless jobFunc
is aware of this and provides a lock to protect jobOpaque.

> +        VIR_FREE(job);
> +        virMutexLock(&pool->mutex);
> +    }
> +
> +    pool->nWorkers--;
> +    if (pool->nWorkers == 0)
> +        virCondSignal(&pool->quit_cond);
> +    virMutexUnlock(&pool->mutex);
> +}
> +
> +virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
> +                                  size_t maxWorkers,
> +                                  virThreadPoolJobFunc func,
> +                                  void *opaque)
> +{
> +    virThreadPoolPtr pool;
> +    int i;
> +
> +    if (VIR_ALLOC(pool) < 0) {
> +        virReportOOMError();
> +        return NULL;
> +    }
> +
> +    pool->jobFunc = func;
> +    pool->jobOpaque = opaque;
> +
> +    if (virMutexInit(&pool->mutex) < 0)
> +        goto error;
> +    if (virCondInit(&pool->cond) < 0)
> +        goto error;
> +    if (virCondInit(&pool->quit_cond) < 0)
> +        goto error;
> +
> +    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
> +        goto error;
> +
> +    pool->maxWorkers = maxWorkers;
> +    for (i = 0; i < minWorkers; i++) {
> +        if (virThreadCreate(&pool->workers[i],
> +                            true,
> +                            virThreadPoolWorker,
> +                            pool) < 0)
> +            goto error;
> +        pool->nWorkers++;
> +    }

There will be more than maxWorkers threads created if
minWorkers > maxWorkers

> +
> +    return pool;
> +
> +error:
> +    virThreadPoolFree(pool);
> +    return NULL;
> +}
> +
> +void virThreadPoolFree(virThreadPoolPtr pool)
> +{
> +    virMutexLock(&pool->mutex);
> +    pool->quit = 1;
> +    if (pool->nWorkers > 0) {
> +        virCondBroadcast(&pool->cond);
> +        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
> +        {}
> +    }
> +    VIR_FREE(pool->workers);
> +    virMutexUnlock(&pool->mutex);
> +    VIR_FREE(pool);
> +}
> +
> +int virThreadPoolSendJob(virThreadPoolPtr pool,
> +                         void *jobData)
> +{
> +    virThreadPoolJobPtr job;
> +    virThreadPoolJobPtr tmp;
> +
> +    virMutexLock(&pool->mutex);
> +    if (pool->quit)
> +        goto error;
> +
> +    if (pool->freeWorkers == 0 &&
> +        pool->nWorkers < pool->maxWorkers) {
> +        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> +            virReportOOMError();
> +            goto error;
> +        }
> +
> +        if (virThreadCreate(&pool->workers[pool->nWorkers],
> +                            true,
> +                            virThreadPoolWorker,
> +                            pool) < 0)
> +            goto error;
> +        pool->nWorkers++;
> +    }
> +
> +    if (VIR_ALLOC(job) < 0) {
> +        virReportOOMError();
> +        goto error;
> +    }
> +
> +    job->data = jobData;
> +
> +    tmp = pool->jobList;
> +    while (tmp && tmp->next)
> +        tmp = tmp->next;
> +    if (tmp)
> +        tmp->next = job;
> +    else
> +        pool->jobList = job;
> +
> +    virCondSignal(&pool->cond);
> +    virMutexUnlock(&pool->mutex);
> +
> +    return 0;
> +
> +error:
> +    virMutexUnlock(&pool->mutex);
> +    return -1;
> +}
> diff --git a/src/util/threadpool.h b/src/util/threadpool.h
> new file mode 100644
> index 0000000..093786f
> --- /dev/null
> +++ b/src/util/threadpool.h
> @@ -0,0 +1,23 @@
> +#ifndef __THREADPOOL_H__
> +#define __THREADPOOL_H__
> +
> +#include <pthread.h>
> +
> +typedef struct _virThreadPool virThreadPool;
> +typedef virThreadPool *virThreadPoolPtr;
> +
> +typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
> +
> +virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
> +                                  size_t maxWorkers,
> +                                  virThreadPoolJobFunc func,
> +                                  void *opaque);
> +
> +void virThreadPoolShutdown(virThreadPoolPtr pool);
> +
> +void virThreadPoolFree(virThreadPoolPtr pool);
> +
> +int virThreadPoolSendJob(virThreadPoolPtr pool,
> +                         void *jobdata);
> +
> +#endif
> -- 
> 1.7.2.3

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]