Stefan Beller <sbeller@xxxxxxxxxx> writes: > This adds functionality to do work in parallel. > > The whole life cycle of such a thread pool would look like > > struct task_queue * tq = create_task_queue(32); // no of threads > for (...) > add_task(tq, process_one_item_function, item); // non blocking > ... > int ret = finish_task_queue(tq); // blocks until all tasks are done > if (!tq) > die ("Not all items were be processed"); > > The caller must take care of handling the output. > > Signed-off-by: Stefan Beller <sbeller@xxxxxxxxxx> > --- > > I sent this a while ago to the list, no comments on it :( I suspect the primary reason is that you sent it to the wrong set of people. Submodule folks have largely been working in the scripted ones, and may not necessarily be the ones who are most familiar with the run-command infrastructure. "shortlog --no-merges" tells me that the obvious suspects are j6t and peff. > The core functionality stayed the same, but I hope to improved naming and > location of the code. > > The WIP is only for the NO_PTHREADS case. > run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---- > run-command.h | 30 +++++++++ > 2 files changed, 230 insertions(+), 12 deletions(-) > > diff --git a/run-command.c b/run-command.c > index 28e1d55..4029011 100644 > --- a/run-command.c > +++ b/run-command.c > @@ -4,6 +4,21 @@ > #include "sigchain.h" > #include "argv-array.h" > > +#ifdef NO_PTHREADS > + > +#else > + > +#include "thread-utils.h" > + > +#include <pthread.h> > +#include <semaphore.h> > +#include <stdio.h> > +#include <unistd.h> > + > +#endif > + > +#include "git-compat-util.h" > + This goes against the way we have been organizing the header files.
http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265 > @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void)) > > #endif > > +void setup_main_thread() void setup_main_thread(void) > @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint) > close(cmd->out); > return finish_command(cmd); > } > + > +#ifndef NO_PTHREADS > +struct job_list { > + int (*fct)(struct task_queue *aq, void *task); > + void *task; > + struct job_list *next; > +}; > +#endif > + > +struct task_queue { > +#ifndef NO_PTHREADS > + /* > + * To avoid deadlocks always aquire the semaphores with lowest priority acquire. > + * first, priorites are in descending order as listed. > + * > + * The `mutex` is a general purpose lock for modifying data in the async > + * queue, such as adding a new task or adding a return value from > + * an already run task. > + * > + * `workingcount` and `freecount` are opposing semaphores, the sum of > + * their values should equal `max_threads` at any time while the `mutex` > + * is available. 
> + */ > + sem_t mutex; > + sem_t workingcount; > + sem_t freecount; > + > + pthread_t *threads; > + unsigned max_threads; > + > + struct job_list *first; > + struct job_list *last; > +#endif > + int early_return; > +}; > + > +#ifndef NO_PTHREADS > + > +static void get_task(struct task_queue *aq, > + int (**fct)(struct task_queue *aq, void *task), > + void **task, > + int *early_return) > +{ > + struct job_list *job; > + > + sem_wait(&aq->workingcount); > + sem_wait(&aq->mutex); > + > + if (!aq->first) > + die("BUG: internal error with dequeuing jobs for threads"); > + job = aq->first; > + *fct = job->fct; > + *task = job->task; > + aq->early_return |= *early_return; > + *early_return = aq->early_return; > + aq->first = job->next; > + if (!aq->first) > + aq->last = NULL; > + > + sem_post(&aq->freecount); > + sem_post(&aq->mutex); > + > + free(job); > +} > + > +static void* dispatcher(void *args) static void *dispatcher(....) > +{ > + void *task; > + int (*fct)(struct task_queue *aq, void *data); s/data/task/? > + int early_return = 0; > + struct task_queue *aq = args; > + > + get_task(aq, &fct, &task, &early_return); > + while (fct || early_return != 0) { > + early_return = fct(aq, task); > + get_task(aq, &fct, &task, &early_return); > + } If the func said "we are done, you may stop dispatching now", do you still want to do another get_task()? 
> + pthread_exit(0); > +} > +#endif > + > +struct task_queue *create_task_queue(unsigned max_threads) > +{ > + struct task_queue *aq = xmalloc(sizeof(*aq)); > + > +#ifndef NO_PTHREADS > + int i; > + if (!max_threads) > + aq->max_threads = online_cpus(); > + else > + aq->max_threads = max_threads; > + > + sem_init(&aq->mutex, 0, 1); > + sem_init(&aq->workingcount, 0, 0); > + sem_init(&aq->freecount, 0, aq->max_threads); > + aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t)); > + > + for (i = 0; i < aq->max_threads; i++) > + pthread_create(&aq->threads[i], 0, &dispatcher, aq); > + > + aq->first = NULL; > + aq->last = NULL; Shouldn't these be initialized before letting threads call into dispatcher? The workingcount semaphore that is initialized to 0 may prevent them from peeking into these pointers and barfing, but still... > + > + setup_main_thread(); > +#endif > + aq->early_return = 0; > + > + return aq; > +} -- To unsubscribe from this list: send the line "unsubscribe git" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html