Signed-off-by: Rafael Fonseca <r4f4rfs@xxxxxxxxx>
---
 src/util/virthreadpool.c | 149 +++++++++++++--------------------------
 1 file changed, 50 insertions(+), 99 deletions(-)

diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c
index 379d2369ad..147943f395 100644
--- a/src/util/virthreadpool.c
+++ b/src/util/virthreadpool.c
@@ -59,9 +59,9 @@ struct _virThreadPool {
     virThreadPoolJobList jobList;
     size_t jobQueueDepth;
 
-    virMutex mutex;
-    virCond cond;
-    virCond quit_cond;
+    GMutex mutex;
+    GCond cond;
+    GCond quit_cond;
 
     size_t maxWorkers;
     size_t minWorkers;
@@ -72,12 +72,12 @@ struct _virThreadPool {
     size_t maxPrioWorkers;
     size_t nPrioWorkers;
     virThreadPtr prioWorkers;
-    virCond prioCond;
+    GCond prioCond;
 };
 
 struct virThreadPoolWorkerData {
     virThreadPoolPtr pool;
-    virCondPtr cond;
+    GCond *cond;
     bool priority;
 };
 
@@ -93,7 +93,7 @@ static void virThreadPoolWorker(void *opaque)
 {
     struct virThreadPoolWorkerData *data = opaque;
     virThreadPoolPtr pool = data->pool;
-    virCondPtr cond = data->cond;
+    GCond *cond = data->cond;
     bool priority = data->priority;
     size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
     size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers;
@@ -101,7 +101,7 @@ static void virThreadPoolWorker(void *opaque)
 
     VIR_FREE(data);
 
-    virMutexLock(&pool->mutex);
+    g_mutex_lock(&pool->mutex);
 
     while (1) {
         /* In order to support async worker termination, we need ensure that
@@ -117,11 +117,7 @@ static void virThreadPoolWorker(void *opaque)
                 (priority && !pool->jobList.firstPrio))) {
             if (!priority)
                 pool->freeWorkers++;
-            if (virCondWait(cond, &pool->mutex) < 0) {
-                if (!priority)
-                    pool->freeWorkers--;
-                goto out;
-            }
+            g_cond_wait(cond, &pool->mutex);
             if (!priority)
                 pool->freeWorkers--;
 
@@ -159,10 +155,10 @@ static void virThreadPoolWorker(void *opaque)
 
         pool->jobQueueDepth--;
 
-        virMutexUnlock(&pool->mutex);
+        g_mutex_unlock(&pool->mutex);
         (pool->jobFunc)(job->data, pool->jobOpaque);
         VIR_FREE(job);
-        virMutexLock(&pool->mutex);
+        g_mutex_lock(&pool->mutex);
     }
 
  out:
@@ -171,8 +167,8 @@ static void virThreadPoolWorker(void *opaque)
     else
         pool->nWorkers--;
     if (pool->nWorkers == 0 && pool->nPrioWorkers == 0)
-        virCondSignal(&pool->quit_cond);
-    virMutexUnlock(&pool->mutex);
+        g_cond_signal(&pool->quit_cond);
+    g_mutex_unlock(&pool->mutex);
 }
 
 static int
@@ -241,12 +237,9 @@ virThreadPoolNewFull(size_t minWorkers,
     pool->jobName = name;
     pool->jobOpaque = opaque;
 
-    if (virMutexInit(&pool->mutex) < 0)
-        goto error;
-    if (virCondInit(&pool->cond) < 0)
-        goto error;
-    if (virCondInit(&pool->quit_cond) < 0)
-        goto error;
+    g_mutex_init(&pool->mutex);
+    g_cond_init(&pool->cond);
+    g_cond_init(&pool->quit_cond);
 
     pool->minWorkers = minWorkers;
     pool->maxWorkers = maxWorkers;
@@ -256,8 +249,7 @@ virThreadPoolNewFull(size_t minWorkers,
         goto error;
 
     if (prioWorkers) {
-        if (virCondInit(&pool->prioCond) < 0)
-            goto error;
+        g_cond_init(&pool->prioCond);
 
         if (virThreadPoolExpand(pool, prioWorkers, true) < 0)
             goto error;
@@ -279,17 +271,17 @@ void virThreadPoolFree(virThreadPoolPtr pool)
     if (!pool)
         return;
 
-    virMutexLock(&pool->mutex);
+    g_mutex_lock(&pool->mutex);
     pool->quit = true;
     if (pool->nWorkers > 0)
-        virCondBroadcast(&pool->cond);
+        g_cond_broadcast(&pool->cond);
     if (pool->nPrioWorkers > 0) {
         priority = true;
-        virCondBroadcast(&pool->prioCond);
+        g_cond_broadcast(&pool->prioCond);
     }
 
     while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
-        ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
+        g_cond_wait(&pool->quit_cond, &pool->mutex);
 
     while ((job = pool->jobList.head)) {
         pool->jobList.head = pool->jobList.head->next;
@@ -297,13 +289,13 @@ void virThreadPoolFree(virThreadPoolPtr pool)
     }
 
     VIR_FREE(pool->workers);
-    virMutexUnlock(&pool->mutex);
-    virMutexDestroy(&pool->mutex);
-    virCondDestroy(&pool->quit_cond);
-    virCondDestroy(&pool->cond);
+    g_mutex_unlock(&pool->mutex);
+    g_mutex_clear(&pool->mutex);
+    g_cond_clear(&pool->quit_cond);
+    g_cond_clear(&pool->cond);
     if (priority) {
         VIR_FREE(pool->prioWorkers);
-        virCondDestroy(&pool->prioCond);
+        g_cond_clear(&pool->prioCond);
     }
     VIR_FREE(pool);
 }
@@ -311,68 +303,38 @@ void virThreadPoolFree(virThreadPoolPtr pool)
 
 size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool)
 {
-    size_t ret;
-
-    virMutexLock(&pool->mutex);
-    ret = pool->minWorkers;
-    virMutexUnlock(&pool->mutex);
-
-    return ret;
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+    return pool->minWorkers;
 }
 
 size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool)
 {
-    size_t ret;
-
-    virMutexLock(&pool->mutex);
-    ret = pool->maxWorkers;
-    virMutexUnlock(&pool->mutex);
-
-    return ret;
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+    return pool->maxWorkers;
 }
 
 size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool)
 {
-    size_t ret;
-
-    virMutexLock(&pool->mutex);
-    ret = pool->nPrioWorkers;
-    virMutexUnlock(&pool->mutex);
-
-    return ret;
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+    return pool->nPrioWorkers;
 }
 
 size_t virThreadPoolGetCurrentWorkers(virThreadPoolPtr pool)
 {
-    size_t ret;
-
-    virMutexLock(&pool->mutex);
-    ret = pool->nWorkers;
-    virMutexUnlock(&pool->mutex);
-
-    return ret;
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+    return pool->nWorkers;
 }
 
 size_t virThreadPoolGetFreeWorkers(virThreadPoolPtr pool)
 {
-    size_t ret;
-
-    virMutexLock(&pool->mutex);
-    ret = pool->freeWorkers;
-    virMutexUnlock(&pool->mutex);
-
-    return ret;
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+    return pool->freeWorkers;
 }
 
 size_t virThreadPoolGetJobQueueDepth(virThreadPoolPtr pool)
 {
-    size_t ret;
-
-    virMutexLock(&pool->mutex);
-    ret = pool->jobQueueDepth;
-    virMutexUnlock(&pool->mutex);
-
-    return ret;
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+    return pool->jobQueueDepth;
 }
 
 /*
@@ -384,18 +346,18 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
                          void *jobData)
 {
     virThreadPoolJobPtr job;
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
 
-    virMutexLock(&pool->mutex);
     if (pool->quit)
-        goto error;
+        return -1;
 
     if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
         pool->nWorkers < pool->maxWorkers &&
         virThreadPoolExpand(pool, 1, false) < 0)
-        goto error;
+        return -1;
 
     if (VIR_ALLOC(job) < 0)
-        goto error;
+        return -1;
 
     job->data = jobData;
     job->priority = priority;
@@ -413,16 +375,11 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
 
     pool->jobQueueDepth++;
 
-    virCondSignal(&pool->cond);
+    g_cond_signal(&pool->cond);
     if (priority)
-        virCondSignal(&pool->prioCond);
+        g_cond_signal(&pool->prioCond);
 
-    virMutexUnlock(&pool->mutex);
     return 0;
-
- error:
-    virMutexUnlock(&pool->mutex);
-    return -1;
 }
 
 int
@@ -433,15 +390,14 @@ virThreadPoolSetParameters(virThreadPoolPtr pool,
 {
     size_t max;
     size_t min;
-
-    virMutexLock(&pool->mutex);
+    g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
 
     max = maxWorkers >= 0 ? maxWorkers : pool->maxWorkers;
     min = minWorkers >= 0 ? minWorkers : pool->minWorkers;
     if (min > max) {
         virReportError(VIR_ERR_INVALID_ARG, "%s",
                        _("minWorkers cannot be larger than maxWorkers"));
-        goto error;
+        return -1;
     }
 
     if ((maxWorkers == 0 && pool->maxWorkers > 0) ||
@@ -449,37 +405,32 @@ virThreadPoolSetParameters(virThreadPoolPtr pool,
         virReportError(VIR_ERR_INVALID_ARG, "%s",
                        _("maxWorkers must not be switched from zero to non-zero"
                          " and vice versa"));
-        goto error;
+        return -1;
     }
 
     if (minWorkers >= 0) {
         if ((size_t) minWorkers > pool->nWorkers &&
             virThreadPoolExpand(pool, minWorkers - pool->nWorkers,
                                 false) < 0)
-            goto error;
+            return -1;
         pool->minWorkers = minWorkers;
     }
 
     if (maxWorkers >= 0) {
         pool->maxWorkers = maxWorkers;
-        virCondBroadcast(&pool->cond);
+        g_cond_broadcast(&pool->cond);
     }
 
     if (prioWorkers >= 0) {
         if (prioWorkers < pool->nPrioWorkers) {
-            virCondBroadcast(&pool->prioCond);
+            g_cond_broadcast(&pool->prioCond);
         } else if ((size_t) prioWorkers > pool->nPrioWorkers &&
                    virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers,
                                        true) < 0) {
-            goto error;
+            return -1;
         }
         pool->maxPrioWorkers = prioWorkers;
     }
 
-    virMutexUnlock(&pool->mutex);
     return 0;
-
- error:
-    virMutexUnlock(&pool->mutex);
-    return -1;
 }
-- 
2.25.2