Re: [PATCH v3 1/5] Add a threadpool implementation

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Wed, Dec 01, 2010 at 05:32:44PM +0800, Hu Tao wrote:
> Hi Eric,
> 
> Thanks for your careful review of these patches. I'll post v4 patches
> tomorrow fixing all problems you pointed out.
> 
> > 
> > daemon/libvirtd.c already has a notion of worker threads; I'm wondering
> > how much overlap there is between your implementation and that one.  A
> > better proof that this would be a useful API addition would be to have
> > the next patch in the series convert libvirtd.c over to using this API.
> 
> OK. Will be in v4.
> 
> <...snip...>
> 
> > > +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker)
> > > +{
> > > +    if (maxWorker < 0)
> > > +        return -1;
> > > +
> > > +    pthread_mutex_lock(&pool->mutex);
> > > +    pool->nMaxWorker = maxWorker;
> > > +    pthread_mutex_unlock(&pool->mutex);
> > 
> > Does this do the right thing if maxWorker < pool->nMaxWorker, or does it
> > silently lose existing workers?
> 
> In the case maxWorker < pool->nMaxWorker and there are pool->nMaxWorker
> threads running, (pool->nMaxWorker - maxWorker) threads will exit after
> the new nMaxWorker set.
> 
> <...snip...>
>  
> > > +
> > > +typedef void (*virWorkerFunc)(void *);
> > 
> > pthread_create() takes a function that can return void*.  Should worker
> > functions be allowed to return a value?
> 
> threadpool doesn't care the return value, neither it has no way to pass
> the return value to threadpool creator, so it's meaningless for worker
> functions to return a value.
> 
> Another example is virThreadFunc which does't return a value neither.

I've needed a thread pool implementation for an unrelated piece
of work I'm doing on libvirt. I took your impl here, and updated
it to follow libvirt naming style, use appropriate internals APIs,
and hide the struct definitions from the header. Take a look at
the files attached.

Regards,
Daniel
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__

#include <pthread.h>

typedef struct _virThreadPool virThreadPool;
typedef virThreadPool *virThreadPoolPtr;

typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);

virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
                                  size_t maxWorkers,
                                  virThreadPoolJobFunc func,
                                  void *opaque);

void virThreadPoolShutdown(virThreadPoolPtr pool);

void virThreadPoolFree(virThreadPoolPtr pool);

int virThreadPoolSendJob(virThreadPoolPtr pool,
                         void *jobdata);

#endif
#include <config.h>

#include "threadpool.h"
#include "memory.h"
#include "threads.h"
#include "virterror_internal.h"

#define VIR_FROM_THIS VIR_FROM_NONE

typedef struct _virThreadPoolJob virThreadPoolJob;
typedef virThreadPoolJob *virThreadPoolJobPtr;

struct _virThreadPoolJob {
    virThreadPoolJobPtr next;

    void *data;
};

struct _virThreadPool {
    int quit;

    virThreadPoolJobFunc jobFunc;
    void *jobOpaque;
    virThreadPoolJobPtr jobList;

    virMutex mutex;
    virCond cond;
    virCond quit_cond;

    size_t maxWorkers;
    size_t freeWorkers;
    size_t nWorkers;
    virThreadPtr workers;
};

static void virThreadPoolWorker(void *opaque)
{
    virThreadPoolPtr pool = opaque;

    virMutexLock(&pool->mutex);

    while (1) {
        while (!pool->quit &&
               !pool->jobList) {
            pool->freeWorkers++;
            if (virCondWait(&pool->cond, &pool->mutex) < 0) {
                pool->freeWorkers--;
                break;
            }
            pool->freeWorkers--;
        }

        if (pool->quit)
            break;

        virThreadPoolJobPtr job = pool->jobList;
        pool->jobList = pool->jobList->next;
        job->next = NULL;

        virMutexUnlock(&pool->mutex);
        (pool->jobFunc)(job->data, pool->jobOpaque);
        VIR_FREE(job);
        virMutexLock(&pool->mutex);
    }

    pool->nWorkers--;
    if (pool->nWorkers == 0)
        virCondSignal(&pool->quit_cond);
    virMutexUnlock(&pool->mutex);
}

virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
                                  size_t maxWorkers,
                                  virThreadPoolJobFunc func,
                                  void *opaque)
{
    virThreadPoolPtr pool;
    int i;

    if (VIR_ALLOC(pool) < 0) {
        virReportOOMError();
        return NULL;
    }

    pool->jobFunc = func;
    pool->jobOpaque = opaque;

    if (virMutexInit(&pool->mutex) < 0)
        goto error;
    if (virCondInit(&pool->cond) < 0)
        goto error;
    if (virCondInit(&pool->quit_cond) < 0)
        goto error;

    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
        goto error;

    pool->maxWorkers = maxWorkers;
    for (i = 0; i < minWorkers; i++) {
        if (virThreadCreate(&pool->workers[i],
                            true,
                            virThreadPoolWorker,
                            pool) < 0)
            goto error;
        pool->nWorkers++;
    }

    return pool;

error:
    virThreadPoolFree(pool);
    return NULL;
}

void virThreadPoolFree(virThreadPoolPtr pool)
{
    virMutexLock(&pool->mutex);
    pool->quit = 1;
    if (pool->nWorkers > 0) {
        virCondBroadcast(&pool->cond);
        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
        {}
    }
    VIR_FREE(pool->workers);
    virMutexUnlock(&pool->mutex);
    VIR_FREE(pool);
}

int virThreadPoolSendJob(virThreadPoolPtr pool,
                         void *jobData)
{
    virThreadPoolJobPtr job;
    virThreadPoolJobPtr tmp;

    virMutexLock(&pool->mutex);
    if (pool->quit)
        goto error;

    if (pool->freeWorkers == 0 &&
        pool->nWorkers < pool->maxWorkers) {
        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
            virReportOOMError();
            goto error;
        }

        if (virThreadCreate(&pool->workers[pool->nWorkers],
                            true,
                            virThreadPoolWorker,
                            pool) < 0)
            goto error;
        pool->nWorkers++;
    }

    if (VIR_ALLOC(job) < 0) {
        virReportOOMError();
        goto error;
    }

    job->data = jobData;

    tmp = pool->jobList;
    while (tmp && tmp->next)
        tmp = tmp->next;
    if (tmp)
        tmp->next = job;
    else
        pool->jobList = job;

    virCondSignal(&pool->cond);
    virMutexUnlock(&pool->mutex);

    return 0;

error:
    virMutexUnlock(&pool->mutex);
    return -1;
}
--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list

[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]