On Wed, Dec 01, 2010 at 05:32:44PM +0800, Hu Tao wrote:
> Hi Eric,
>
> Thanks for your careful review of these patches. I'll post v4 patches
> tomorrow fixing all problems you pointed out.
>
> >
> > daemon/libvirtd.c already has a notion of worker threads; I'm wondering
> > how much overlap there is between your implementation and that one. A
> > better proof that this would be a useful API addition would be to have
> > the next patch in the series convert libvirtd.c over to using this API.
>
> OK. Will be in v4.
>
> <...snip...>
>
> > > +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker)
> > > +{
> > > + if (maxWorker < 0)
> > > + return -1;
> > > +
> > > + pthread_mutex_lock(&pool->mutex);
> > > + pool->nMaxWorker = maxWorker;
> > > + pthread_mutex_unlock(&pool->mutex);
> >
> > Does this do the right thing if maxWorker < pool->nMaxWorker, or does it
> > silently lose existing workers?
>
> In the case where maxWorker < pool->nMaxWorker and there are pool->nMaxWorker
> threads running, (pool->nMaxWorker - maxWorker) threads will exit after
> the new nMaxWorker is set.
>
> <...snip...>
>
> > > +
> > > +typedef void (*virWorkerFunc)(void *);
> >
> > pthread_create() takes a function that can return void*. Should worker
> > functions be allowed to return a value?
>
> The threadpool doesn't care about the return value, nor does it have any
> way to pass the return value to the threadpool creator, so it's meaningless
> for worker functions to return a value.
>
> Another example is virThreadFunc, which doesn't return a value either.
I've needed a thread pool implementation for an unrelated piece
of work I'm doing on libvirt. I took your impl here, and updated
it to follow libvirt naming style, use the appropriate internal APIs,
and hide the struct definitions from the header. Take a look at
the files attached.
Regards,
Daniel
/*
 * threadpool.h: public interface of a simple worker thread pool.
 *
 * The pool structure itself is opaque; only a pointer type is exposed.
 */
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include <pthread.h>
/* Opaque pool type; the struct layout lives in the implementation file. */
typedef struct _virThreadPool virThreadPool;
typedef virThreadPool *virThreadPoolPtr;
/*
 * Callback run by a worker thread for each queued job.
 *   jobdata: the pointer that was handed to virThreadPoolSendJob()
 *   opaque:  the pointer that was handed to virThreadPoolNew()
 */
typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
/*
 * Create a pool and start minWorkers worker threads right away.
 * Further workers are started by virThreadPoolSendJob() while the
 * worker count is below maxWorkers.  Returns NULL on failure.
 */
virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
size_t maxWorkers,
virThreadPoolJobFunc func,
void *opaque);
/*
 * NOTE(review): declared here, but no definition is visible in the
 * accompanying implementation -- confirm whether it is still intended.
 */
void virThreadPoolShutdown(virThreadPoolPtr pool);
/*
 * Ask all workers to quit, wait for them to exit, then release the pool.
 */
void virThreadPoolFree(virThreadPoolPtr pool);
/*
 * Append a job to the pool's queue and wake a worker.
 * Returns 0 on success, -1 on failure (including when the pool is
 * already quitting).
 */
int virThreadPoolSendJob(virThreadPoolPtr pool,
void *jobdata);
#endif
#include <config.h>
#include "threadpool.h"
#include "memory.h"
#include "threads.h"
#include "virterror_internal.h"
#define VIR_FROM_THIS VIR_FROM_NONE
/*
 * One queued unit of work.  Jobs form a singly linked list whose head is
 * stored in the pool; workers pop from the head and virThreadPoolSendJob()
 * appends at the tail.
 */
typedef struct _virThreadPoolJob virThreadPoolJob;
typedef virThreadPoolJob *virThreadPoolJobPtr;
struct _virThreadPoolJob {
/* Next job in the queue, or NULL for the last one. */
virThreadPoolJobPtr next;
/* Caller-supplied pointer passed as the first argument of the job function. */
void *data;
};
/*
 * Pool state.  Every function in this file accesses these fields while
 * holding 'mutex'.
 */
struct _virThreadPool {
/* Set to 1 by virThreadPoolFree(); workers leave their loop once it is set. */
int quit;
/* Job callback and the opaque pointer passed as its second argument. */
virThreadPoolJobFunc jobFunc;
void *jobOpaque;
/* Head of the pending job list (NULL when the queue is empty). */
virThreadPoolJobPtr jobList;
virMutex mutex;
/* Workers wait here for jobs; signalled on submit, broadcast on quit. */
virCond cond;
/* Signalled by the last worker to exit; virThreadPoolFree() waits on it. */
virCond quit_cond;
/* Upper bound checked before virThreadPoolSendJob() starts another worker. */
size_t maxWorkers;
/* Number of workers currently blocked waiting on 'cond'. */
size_t freeWorkers;
/* Number of worker threads; decremented by each worker as it exits. */
size_t nWorkers;
/* Array of thread handles filled in by virThreadCreate(). */
virThreadPtr workers;
};
/*
 * Main loop of a worker thread.
 *
 * @opaque: the owning virThreadPoolPtr.
 *
 * The worker sleeps on pool->cond until a job is queued or the pool is
 * asked to quit.  Each job is popped from the head of the list and run
 * with the pool mutex released, so other workers and submitters can make
 * progress while the callback executes.  On exit the worker decrements
 * nWorkers and, if it was the last one, signals quit_cond so that
 * virThreadPoolFree() can proceed.
 */
static void virThreadPoolWorker(void *opaque)
{
    virThreadPoolPtr pool = opaque;
    virThreadPoolJobPtr job;

    virMutexLock(&pool->mutex);

    while (1) {
        /* Idle until there is work to do or shutdown was requested. */
        while (!pool->quit && !pool->jobList) {
            pool->freeWorkers++;
            if (virCondWait(&pool->cond, &pool->mutex) < 0) {
                pool->freeWorkers--;
                /*
                 * Waiting failed: leave the thread entirely.  Merely
                 * breaking out of this inner loop would fall through to
                 * the job-pop code with an empty list and dereference
                 * a NULL jobList.
                 */
                goto out;
            }
            pool->freeWorkers--;
        }

        if (pool->quit)
            break;

        /* Detach the head job from the queue. */
        job = pool->jobList;
        pool->jobList = job->next;
        job->next = NULL;

        /* Run the callback without holding the pool lock. */
        virMutexUnlock(&pool->mutex);
        (pool->jobFunc)(job->data, pool->jobOpaque);
        VIR_FREE(job);
        virMutexLock(&pool->mutex);
    }

out:
    pool->nWorkers--;
    if (pool->nWorkers == 0)
        virCondSignal(&pool->quit_cond);
    virMutexUnlock(&pool->mutex);
}
/*
 * Create a thread pool.
 *
 * @minWorkers: number of worker threads started immediately
 * @maxWorkers: limit up to which virThreadPoolSendJob() may add workers
 * @func:       callback invoked for every job
 * @opaque:     pointer passed as second argument to @func
 *
 * Returns the new pool, or NULL on failure.
 *
 * Cleanup on failure is staged: only primitives that were successfully
 * initialised are destroyed, and virThreadPoolFree() (which locks the
 * mutex and waits on quit_cond) is used only once all of them exist.
 */
virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
                                  size_t maxWorkers,
                                  virThreadPoolJobFunc func,
                                  void *opaque)
{
    virThreadPoolPtr pool;
    size_t i;

    if (VIR_ALLOC(pool) < 0) {
        virReportOOMError();
        return NULL;
    }

    pool->jobFunc = func;
    pool->jobOpaque = opaque;

    if (virMutexInit(&pool->mutex) < 0)
        goto err_pool;
    if (virCondInit(&pool->cond) < 0)
        goto err_mutex;
    if (virCondInit(&pool->quit_cond) < 0)
        goto err_cond;

    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0) {
        virReportOOMError();
        goto err_quit_cond;
    }

    pool->maxWorkers = maxWorkers;
    for (i = 0; i < minWorkers; i++) {
        if (virThreadCreate(&pool->workers[i],
                            true,
                            virThreadPoolWorker,
                            pool) < 0)
            goto err_workers;
        pool->nWorkers++;
    }

    return pool;

err_workers:
    /*
     * Some workers may already be running; everything they need is
     * initialised, so the regular teardown can stop them and free the pool.
     */
    virThreadPoolFree(pool);
    return NULL;

err_quit_cond:
    if (virCondDestroy(&pool->quit_cond) < 0) {
        /* nothing more can be done while unwinding */
    }
err_cond:
    if (virCondDestroy(&pool->cond) < 0) {
        /* nothing more can be done while unwinding */
    }
err_mutex:
    virMutexDestroy(&pool->mutex);
err_pool:
    VIR_FREE(pool);
    return NULL;
}
/*
 * Stop all workers and release the pool.
 *
 * @pool: pool to destroy; NULL is accepted and ignored.
 *
 * Sets the quit flag, wakes every idle worker and waits until the worker
 * count has dropped to zero.  Jobs still queued at that point are never
 * run; their list nodes are freed, but the caller-owned data pointers
 * are left untouched.
 */
void virThreadPoolFree(virThreadPoolPtr pool)
{
    virThreadPoolJobPtr job;

    if (!pool)
        return;

    virMutexLock(&pool->mutex);
    pool->quit = 1;
    if (pool->nWorkers > 0)
        virCondBroadcast(&pool->cond);

    /*
     * Re-check the predicate after every wakeup: condition waits may
     * return spuriously, and freeing the pool while a worker is still
     * running would be a use-after-free.
     */
    while (pool->nWorkers > 0) {
        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
            break; /* cannot wait any longer; proceed as best effort */
    }

    /* Drop jobs that were queued but never picked up. */
    while ((job = pool->jobList) != NULL) {
        pool->jobList = job->next;
        VIR_FREE(job);
    }

    VIR_FREE(pool->workers);
    virMutexUnlock(&pool->mutex);

    virMutexDestroy(&pool->mutex);
    if (virCondDestroy(&pool->quit_cond) < 0) {
        /* nothing useful to do on failure during teardown */
    }
    if (virCondDestroy(&pool->cond) < 0) {
        /* nothing useful to do on failure during teardown */
    }
    VIR_FREE(pool);
}
/*
 * Queue a job for execution by the pool.
 *
 * @pool:    target pool
 * @jobData: pointer handed to the pool's job function as first argument
 *
 * If no worker is idle and the pool is below maxWorkers, one more worker
 * thread is started before the job is queued.  The job is appended to the
 * tail of the list so jobs are picked up in submission order.
 *
 * Returns 0 on success, -1 if the pool is quitting, on allocation failure,
 * or if a needed worker thread could not be created.
 */
int virThreadPoolSendJob(virThreadPoolPtr pool,
                         void *jobData)
{
    virThreadPoolJobPtr job;
    virThreadPoolJobPtr tail;

    virMutexLock(&pool->mutex);
    if (pool->quit)
        goto error;

    if (pool->freeWorkers == 0 &&
        pool->nWorkers < pool->maxWorkers) {
        /* VIR_EXPAND_N grows the array AND bumps pool->nWorkers by one. */
        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
            virReportOOMError();
            goto error;
        }

        /* The new slot is therefore the last valid index, nWorkers - 1. */
        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
                            true,
                            virThreadPoolWorker,
                            pool) < 0) {
            /* No thread was started: undo the count bump from above. */
            pool->nWorkers--;
            goto error;
        }
    }

    if (VIR_ALLOC(job) < 0) {
        virReportOOMError();
        goto error;
    }
    job->data = jobData;

    /* Walk to the current tail and link the new job after it. */
    tail = pool->jobList;
    while (tail && tail->next)
        tail = tail->next;
    if (tail)
        tail->next = job;
    else
        pool->jobList = job;

    virCondSignal(&pool->cond);
    virMutexUnlock(&pool->mutex);
    return 0;

error:
    virMutexUnlock(&pool->mutex);
    return -1;
}
--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list