This adds functionality to do work in parallel.

The whole life cycle of such a thread pool would look like

	struct task_queue *tq = create_task_queue(32); // number of threads
	for (...)
		add_task(tq, process_one_item_function, item); // blocks only while the queue is full
	...
	int ret = finish_task_queue(tq); // blocks until all tasks are done
	if (ret)
		die("Not all items could be processed");

The caller must take care of handling the output.

Signed-off-by: Stefan Beller <sbeller@xxxxxxxxxx>
---

I sent this to the list a while ago, but got no comments on it :(
The core functionality stayed the same, but I hope I improved the naming
and the location of the code.

The NO_PTHREADS case now runs each task synchronously in add_task().

 run-command.c | 218 +++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 run-command.h |  31 ++++++++
 2 files changed, 237 insertions(+), 12 deletions(-)

diff --git a/run-command.c b/run-command.c
index 28e1d55..4029011 100644
--- a/run-command.c
+++ b/run-command.c
@@ -4,6 +4,12 @@
 #include "sigchain.h"
 #include "argv-array.h"
 
+#include "thread-utils.h"
+
+#ifndef NO_PTHREADS
+#include <semaphore.h>
+#endif
+
 void child_process_init(struct child_process *child)
 {
 	memset(child, 0, sizeof(*child));
@@ -668,6 +674,28 @@ int git_atexit(void (*handler)(void))
 
 #endif
 
+#ifndef NO_PTHREADS
+/*
+ * Records the calling thread as the main thread and installs
+ * die_async() as the die routine; only the first call has an effect.
+ */
+static void setup_main_thread(void)
+{
+	if (!main_thread_set) {
+		/*
+		 * We assume that the first call comes from the
+		 * main thread.
+		 */
+		main_thread_set = 1;
+		main_thread = pthread_self();
+		pthread_key_create(&async_key, NULL);
+		pthread_key_create(&async_die_counter, NULL);
+		set_die_routine(die_async);
+		set_die_is_recursing_routine(async_die_is_recursing);
+	}
+}
+#endif
+
 int start_async(struct async *async)
 {
 	int need_in, need_out;
@@ -740,18 +768,7 @@ int start_async(struct async *async)
 	else if (async->out)
 		close(async->out);
 #else
-	if (!main_thread_set) {
-		/*
-		 * We assume that the first time that start_async is called
-		 * it is from the main thread.
-		 */
-		main_thread_set = 1;
-		main_thread = pthread_self();
-		pthread_key_create(&async_key, NULL);
-		pthread_key_create(&async_die_counter, NULL);
-		set_die_routine(die_async);
-		set_die_is_recursing_routine(async_die_is_recursing);
-	}
+	setup_main_thread();
 
 	if (proc_in >= 0)
 		set_cloexec(proc_in);
@@ -852,3 +869,180 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 		close(cmd->out);
 	return finish_command(cmd);
 }
+
+#ifndef NO_PTHREADS
+struct job_list {
+	int (*fct)(struct task_queue *aq, void *task);
+	void *task;
+	struct job_list *next;
+};
+#endif
+
+struct task_queue {
+#ifndef NO_PTHREADS
+	/*
+	 * To avoid deadlocks, always acquire the semaphores with the lowest
+	 * priority first; priorities are in descending order as listed.
+	 *
+	 * The `mutex` is a general purpose lock for modifying data in the async
+	 * queue, such as adding a new task or adding a return value from
+	 * an already run task.
+	 *
+	 * `workingcount` and `freecount` are opposing semaphores, the sum of
+	 * their values should equal `max_threads` at any time while the `mutex`
+	 * is available.
+	 */
+	sem_t mutex;
+	sem_t workingcount;
+	sem_t freecount;
+
+	pthread_t *threads;
+	unsigned max_threads;
+
+	struct job_list *first;
+	struct job_list *last;
+#endif
+	int early_return;
+};
+
+#ifndef NO_PTHREADS
+
+static void get_task(struct task_queue *aq,
+		     int (**fct)(struct task_queue *aq, void *task),
+		     void **task,
+		     int *early_return)
+{
+	struct job_list *job;
+
+	sem_wait(&aq->workingcount);
+	sem_wait(&aq->mutex);
+
+	if (!aq->first)
+		die("BUG: internal error with dequeuing jobs for threads");
+	job = aq->first;
+	*fct = job->fct;
+	*task = job->task;
+	aq->early_return |= *early_return;
+	*early_return = aq->early_return;
+	aq->first = job->next;
+	if (!aq->first)
+		aq->last = NULL;
+
+	sem_post(&aq->freecount);
+	sem_post(&aq->mutex);
+
+	free(job);
+}
+
+static void *dispatcher(void *args)
+{
+	void *task;
+	int (*fct)(struct task_queue *aq, void *data);
+	int early_return = 0;
+	struct task_queue *aq = args;
+
+	/*
+	 * Keep dequeuing until the NULL sentinel queued by
+	 * finish_task_queue() arrives. Once a task has asked for an
+	 * early return, the remaining jobs are still dequeued but not
+	 * run, so that add_task() never blocks on a full queue that no
+	 * thread drains any more.
+	 */
+	get_task(aq, &fct, &task, &early_return);
+	while (fct) {
+		if (!early_return)
+			early_return = fct(aq, task);
+		get_task(aq, &fct, &task, &early_return);
+	}
+
+	return NULL;
+}
+#endif
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *aq = xcalloc(1, sizeof(*aq));
+#ifndef NO_PTHREADS
+	unsigned i;
+
+	/*
+	 * Install the die() handlers before any worker exists, so that
+	 * no worker can call die() before they are in place.
+	 */
+	setup_main_thread();
+
+	aq->max_threads = max_threads ? max_threads : online_cpus();
+	if (sem_init(&aq->mutex, 0, 1) ||
+	    sem_init(&aq->workingcount, 0, 0) ||
+	    sem_init(&aq->freecount, 0, aq->max_threads))
+		die_errno("cannot initialize semaphores");
+	aq->threads = xmalloc(aq->max_threads * sizeof(*aq->threads));
+
+	for (i = 0; i < aq->max_threads; i++) {
+		int err = pthread_create(&aq->threads[i], NULL, dispatcher, aq);
+		if (err)
+			die("cannot create thread: %s", strerror(err));
+	}
+#endif
+	return aq;
+}
+
+void add_task(struct task_queue *aq,
+	      int (*fct)(struct task_queue *aq, void *task),
+	      void *task)
+{
+#ifndef NO_PTHREADS
+	struct job_list *job_list;
+
+	job_list = xmalloc(sizeof(*job_list));
+	job_list->task = task;
+	job_list->fct = fct;
+	job_list->next = NULL;
+
+	sem_wait(&aq->freecount);
+	sem_wait(&aq->mutex);
+
+	if (!aq->last) {
+		aq->last = job_list;
+		aq->first = aq->last;
+	} else {
+		aq->last->next = job_list;
+		aq->last = aq->last->next;
+	}
+
+	sem_post(&aq->workingcount);
+	sem_post(&aq->mutex);
+#else
+	/* Without threads, run the task now unless an earlier one failed. */
+	if (!aq->early_return)
+		aq->early_return = fct(aq, task);
+#endif
+}
+
+int finish_task_queue(struct task_queue *aq)
+{
+	int ret;
+#ifndef NO_PTHREADS
+	unsigned i;
+
+	/* One NULL sentinel per worker; each worker exits on the first it gets. */
+	for (i = 0; i < aq->max_threads; i++)
+		add_task(aq, NULL, NULL);
+
+	for (i = 0; i < aq->max_threads; i++)
+		pthread_join(aq->threads[i], NULL);
+
+	sem_destroy(&aq->mutex);
+	sem_destroy(&aq->workingcount);
+	sem_destroy(&aq->freecount);
+
+	if (aq->first)
+		die("BUG: internal error with queuing jobs for threads");
+
+	free(aq->threads);
+#endif
+	ret = aq->early_return;
+
+	free(aq);
+	return ret;
+}
diff --git a/run-command.h b/run-command.h
index 5b4425a..c2cfd49 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,35 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads process the enqueued tasks in first-in,
+ * first-out order.
+ *
+ * If `max_threads` is zero, the number of available CPU cores is used.
+ *
+ * Tasks only run in parallel when pthreads are available; with
+ * NO_PTHREADS, each task is run synchronously inside `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * Puts `fct` and its argument `task` into the task queue. With pthreads,
+ * this blocks while `max_threads` tasks are already waiting to start.
+ *
+ * `fct` must not be NULL, as NULL is used internally by
+ * `finish_task_queue` to signal shutdown. Once any `fct` returns
+ * non-zero, queued tasks that have not started yet are skipped.
+ */
+void add_task(struct task_queue *aq,
+	      int (*fct)(struct task_queue *aq, void *task),
+	      void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. Returns zero if
+ * every task returned zero; otherwise non-zero, in which case tasks
+ * queued after the failure may have been skipped.
+ */
+int finish_task_queue(struct task_queue *aq);
+
 #endif
-- 
2.5.0.264.g01b5c38.dirty

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html