On Fri, Aug 21, 2015 at 12:05 PM, Junio C Hamano <gitster@xxxxxxxxx> wrote:
> Stefan Beller <sbeller@xxxxxxxxxx> writes:
>
>> This adds functionality to do work in parallel.
>>
>> The whole life cycle of such a thread pool would look like
>>
>>     struct task_queue * tq = create_task_queue(32); // no of threads
>>     for (...)
>>         add_task(tq, process_one_item_function, item); // non blocking
>>     ...
>>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>>     if (!tq)
>>         die ("Not all items were be processed");
>>
>> The caller must take care of handling the output.
>>
>> Signed-off-by: Stefan Beller <sbeller@xxxxxxxxxx>
>> ---
>>
>> I sent this a while ago to the list, no comments on it :(
>
> The primary reason I suspect is because you sent to a wrong set of
> people.  Submodule folks have largely been working in the scripted
> ones, and may not necessarily be the ones who are most familiar with
> the run-command infrastructure.
>
> "shortlog --no-merges" tells me that the obvious suspects are j6t
> and peff.

noted.

>
>> The core functionality stayed the same, but I hope to improved naming and
>> location of the code.
>>
>> The WIP is only for the NO_PTHREADS case.
>
>>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>>  run-command.h |  30 +++++++++
>>  2 files changed, 230 insertions(+), 12 deletions(-)
>>
>> diff --git a/run-command.c b/run-command.c
>> index 28e1d55..4029011 100644
>> --- a/run-command.c
>> +++ b/run-command.c
>> @@ -4,6 +4,21 @@
>>  #include "sigchain.h"
>>  #include "argv-array.h"
>>
>> +#ifdef NO_PTHREADS
>> +
>> +#else
>> +
>> +#include "thread-utils.h"
>> +
>> +#include <pthread.h>
>> +#include <semaphore.h>
>> +#include <stdio.h>
>> +#include <unistd.h>
>> +
>> +#endif
>> +
>> +#include "git-compat-util.h"
>> +
>
> This goes against the way we have been organizing the header files.
>
> http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265

ok

>
>> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>>
>>  #endif
>>
>> +void setup_main_thread()
>
> void setup_main_thread(void)
>
>> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>>  	close(cmd->out);
>>  	return finish_command(cmd);
>>  }
>> +
>> +#ifndef NO_PTHREADS
>> +struct job_list {
>> +	int (*fct)(struct task_queue *aq, void *task);
>> +	void *task;
>> +	struct job_list *next;
>> +};
>> +#endif
>> +
>> +struct task_queue {
>> +#ifndef NO_PTHREADS
>> +	/*
>> +	 * To avoid deadlocks always aquire the semaphores with lowest priority
>
> acquire.
>
>> +	 * first, priorites are in descending order as listed.
>> +	 *
>> +	 * The `mutex` is a general purpose lock for modifying data in the async
>> +	 * queue, such as adding a new task or adding a return value from
>> +	 * an already run task.
>> +	 *
>> +	 * `workingcount` and `freecount` are opposing semaphores, the sum of
>> +	 * their values should equal `max_threads` at any time while the `mutex`
>> +	 * is available.
>> +	 */
>> +	sem_t mutex;
>> +	sem_t workingcount;
>> +	sem_t freecount;
>> +
>> +	pthread_t *threads;
>> +	unsigned max_threads;
>> +
>> +	struct job_list *first;
>> +	struct job_list *last;
>> +#endif
>> +	int early_return;
>> +};
>> +
>> +#ifndef NO_PTHREADS
>> +
>> +static void get_task(struct task_queue *aq,
>> +		     int (**fct)(struct task_queue *aq, void *task),
>> +		     void **task,
>> +		     int *early_return)
>> +{
>> +	struct job_list *job;
>> +
>> +	sem_wait(&aq->workingcount);
>> +	sem_wait(&aq->mutex);
>> +
>> +	if (!aq->first)
>> +		die("BUG: internal error with dequeuing jobs for threads");
>> +	job = aq->first;
>> +	*fct = job->fct;
>> +	*task = job->task;
>> +	aq->early_return |= *early_return;
>> +	*early_return = aq->early_return;
>> +	aq->first = job->next;
>> +	if (!aq->first)
>> +		aq->last = NULL;
>> +
>> +	sem_post(&aq->freecount);
>> +	sem_post(&aq->mutex);
>> +
>> +	free(job);
>> +}
>> +
>> +static void* dispatcher(void *args)
>
> static void *dispatcher(....)
>
>> +{
>> +	void *task;
>> +	int (*fct)(struct task_queue *aq, void *data);
>
> s/data/task/?
>
>> +	int early_return = 0;
>> +	struct task_queue *aq = args;
>> +
>> +	get_task(aq, &fct, &task, &early_return);
>> +	while (fct || early_return != 0) {
>> +		early_return = fct(aq, task);
>> +		get_task(aq, &fct, &task, &early_return);
>> +	}
>
> If the func said "we are done, you may stop dispatching now", do you
> still want to do another get_task()?

This question shows me I messed up the readability here. `get_task` both
fetches a new task and propagates the early_return value into
aq->early_return, so that the other threads are notified. Maybe I should
do that propagation explicitly here instead of fetching another task.
>
>> +	pthread_exit(0);
>> +}
>> +#endif
>> +
>> +struct task_queue *create_task_queue(unsigned max_threads)
>> +{
>> +	struct task_queue *aq = xmalloc(sizeof(*aq));
>> +
>> +#ifndef NO_PTHREADS
>> +	int i;
>> +	if (!max_threads)
>> +		aq->max_threads = online_cpus();
>> +	else
>> +		aq->max_threads = max_threads;
>> +
>> +	sem_init(&aq->mutex, 0, 1);
>> +	sem_init(&aq->workingcount, 0, 0);
>> +	sem_init(&aq->freecount, 0, aq->max_threads);
>> +	aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
>> +
>> +	for (i = 0; i < aq->max_threads; i++)
>> +		pthread_create(&aq->threads[i], 0, &dispatcher, aq);
>> +
>> +	aq->first = NULL;
>> +	aq->last = NULL;
>
> Shouldn't these be initialized before letting threads call into
> dispatcher?  The workingcount semaphore that is initialized to 0 may
> prevent them from peeking into these pointers and barfing, but still...

They are initialized to NULL because the empty queue doesn't need a
container element. Do we usually implement queues differently?

>
>> +
>> +	setup_main_thread();
>> +#endif
>> +	aq->early_return = 0;
>> +
>> +	return aq;
>> +}