This adds infrastructure code for working through a set of tasks with a
thread pool. The whole life cycle of such a thread pool would look like
this:

    struct workdispatcher *wd;
    struct return_values *rv;

    wd = create_workdispatcher(&command_for_task, max_parallel_jobs);
    for (...) {
        prepare(pointer_to_task);
        add_task(wd, pointer_to_task);
    }
    rv = wait_workdispatcher(wd);

Signed-off-by: Stefan Beller <sbeller@xxxxxxxxxx>
---
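Notes (not meant for the commit message):

To make the intended calling convention a bit more concrete, here is a
minimal, self-contained sketch of a caller of the new API. The struct
task and the worker function process_one_task() are made up for
illustration; only create_workdispatcher(), add_task() and
wait_workdispatcher() come from this patch. Passing 0 as the thread
count falls back to the number of online cores, and in a NO_PTHREADS
build add_task() simply runs the function directly.

    #include <stdio.h>
    #include <stdlib.h>
    #include "workdispatcher.h"

    /* Hypothetical task payload, just for the example. */
    struct task {
        int id;
    };

    /* Runs in a pool thread with the pointer that was passed to add_task(). */
    static void *process_one_task(void *arg)
    {
        struct task *t = arg;
        /* ... do the actual work for task t->id here ... */
        return t;  /* returned pointers are collected in struct return_values */
    }

    int main(void)
    {
        struct workdispatcher *wd;
        struct return_values *rv;
        struct task tasks[16];
        int i;

        /* 0 means "use one thread per online core" */
        wd = create_workdispatcher(&process_one_task, 0);

        for (i = 0; i < 16; i++) {
            tasks[i].id = i;
            add_task(wd, &tasks[i]);
        }

        rv = wait_workdispatcher(wd);
        printf("%d tasks completed\n", rv->count);

        free(rv->ret);  /* the caller owns the collected return values */
        free(rv);
        return 0;
    }

With pthreads enabled, the return values are collected in completion
order, not in the order the tasks were added.
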
" + "Using one core.\n", count); + count = 1; + } + return count; +} + +void *get_task(struct workdispatcher *wd) +{ + void *ret; + struct job_list *job; + + sem_wait(&wd->workingcount); + sem_wait(&wd->mutex); + + if (!wd->first) + die("BUG: internal error with dequeuing jobs for threads"); + job = wd->first; + ret = job->item; + wd->first = job->next; + if (!wd->first) + wd->last = NULL; + + sem_post(&wd->freecount); + sem_post(&wd->mutex); + + free(job); + return ret; +} + +void* dispatcher(void *args) +{ + struct workdispatcher *wd = args; + void *job = get_task(wd); + while (job) { + void *retvalue = wd->function(job); + + sem_wait(&wd->mutex); + struct return_values *rv = wd->ret; + ALLOC_GROW(rv->ret, rv->count + 1, rv->alloc); + wd->ret->ret[rv->count++] = retvalue; + sem_post(&wd->mutex); + + job = get_task(wd); + } + + pthread_exit(0); +} +#endif + +struct workdispatcher *create_workdispatcher(void *function(void*), + unsigned max_threads) +{ + struct workdispatcher *wd = xmalloc(sizeof(*wd)); + +#ifndef NO_PTHREADS + int i; + if (!max_threads) + wd->max_threads = number_cores(); + else + wd->max_threads = max_threads; + + sem_init(&wd->mutex, 0, 1); + sem_init(&wd->workingcount, 0, 0); + sem_init(&wd->freecount, 0, wd->max_threads); + wd->threads = xmalloc(wd->max_threads * sizeof(pthread_t)); + + for (i = 0; i < wd->max_threads; i++) + pthread_create(&wd->threads[i], 0, &dispatcher, wd); + + wd->first = NULL; + wd->last = NULL; +#endif + wd->function = function; + wd->ret = xmalloc(sizeof(*wd->ret)); + wd->ret->ret = NULL; + wd->ret->count = 0; + wd->ret->alloc = 0; + + return wd; +} + +void add_task(struct workdispatcher *wd, void *job) +{ +#ifndef NO_PTHREADS + struct job_list *job_list; + + job_list = xmalloc(sizeof(*job_list)); + job_list->item = job; + job_list->next = NULL; + + sem_wait(&wd->freecount); + sem_wait(&wd->mutex); + + if (!wd->last) { + wd->last = job_list; + wd->first = wd->last; + } else { + wd->last->next = job_list; + wd->last = wd->last->next; + } + + sem_post(&wd->workingcount); + sem_post(&wd->mutex); +#else + ALLOC_GROW(wd->ret->ret, wd->ret->count + 1, wd->ret->alloc); + wd->ret->ret[wd->ret->count++] = wd->function(job); +#endif +} + +struct return_values *wait_workdispatcher(struct workdispatcher *wd) +{ + struct return_values *ret; +#ifndef NO_PTHREADS + int i; + for (i = 0; i < wd->max_threads; i++) + add_task(wd, NULL); + + for (i = 0; i < wd->max_threads; i++) + pthread_join(wd->threads[i], 0); + + sem_destroy(&wd->mutex); + sem_destroy(&wd->workingcount); + sem_destroy(&wd->freecount); + + if (wd->first) + die("BUG: internal error with queuing jobs for threads"); + + free(wd->threads); +#endif + ret = wd->ret; + + free(wd); + return ret; +} diff --git a/workdispatcher.h b/workdispatcher.h new file mode 100644 index 0000000..9f78124 --- /dev/null +++ b/workdispatcher.h @@ -0,0 +1,29 @@ +#ifndef WORKDISPATCHER +#define WORKDISPATCHER + +struct return_values { + void **ret; + int count, alloc; +}; + +/* + * Creates a struct workdispatcher, which holds a job list and assigns the + * jobs to be processed to a number of threads `maxthreads`. + * Within the threads the function `fct` is called with the pointer as + * given in add_task. + * + * If `maxthreads` is zero the number of cores available will be used. + * + * Currently this only works in environments with pthreads, in other + * environments, the task will be processed directly in `add_task`. 
+ */
+struct workdispatcher *create_workdispatcher(void *fct(void*),
+                                             unsigned maxthreads);
+
+/* Waits for all tasks to be done and frees the object. */
+struct return_values *wait_workdispatcher(struct workdispatcher *wd);
+
+/* `task` must not be NULL, as that's used internally to signal shutdown. */
+void add_task(struct workdispatcher *wd, void *task);
+
+#endif
-- 
2.5.0.239.g9728e1d.dirty

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html