Re: [RFC PATCH 2/3] run-commands: add an async queue processor

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri, Aug 21, 2015 at 12:05 PM, Junio C Hamano <gitster@xxxxxxxxx> wrote:
> Stefan Beller <sbeller@xxxxxxxxxx> writes:
>
>> This adds functionality to do work in parallel.
>>
>> The whole life cycle of such a thread pool would look like
>>
>>     struct task_queue * tq = create_task_queue(32); // no of threads
>>     for (...)
>>         add_task(tq, process_one_item_function, item); // non blocking
>>     ...
>>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>>     if (!tq)
>>         die ("Not all items were be processed");
>>
>> The caller must take care of handling the output.
>>
>> Signed-off-by: Stefan Beller <sbeller@xxxxxxxxxx>
>> ---
>>
>> I sent this a while ago to the list, no comments on it :(
>
> The primary reason I suspect is because you sent to a wrong set of
> people.  Submodule folks have largely been working in the scripted
> ones, and may not necessarily be the ones who are most familiar with
> the run-command infrastructure.
>
> "shortlog --no-merges" tells me that the obvious suspects are j6t
> and peff.

noted.

>
>> The core functionality stayed the same, but I hope to have improved the
>> naming and location of the code.
>>
>> The WIP is only for the NO_PTHREADS case.
>
>>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>>  run-command.h |  30 +++++++++
>>  2 files changed, 230 insertions(+), 12 deletions(-)
>>
>> diff --git a/run-command.c b/run-command.c
>> index 28e1d55..4029011 100644
>> --- a/run-command.c
>> +++ b/run-command.c
>> @@ -4,6 +4,21 @@
>>  #include "sigchain.h"
>>  #include "argv-array.h"
>>
>> +#ifdef NO_PTHREADS
>> +
>> +#else
>> +
>> +#include "thread-utils.h"
>> +
>> +#include <pthread.h>
>> +#include <semaphore.h>
>> +#include <stdio.h>
>> +#include <unistd.h>
>> +
>> +#endif
>> +
>> +#include "git-compat-util.h"
>> +
>
> This goes against the way we have been organizing the header files.
>
> http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265

ok
>
>> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>>
>>  #endif
>>
>> +void setup_main_thread()
>
> void setup_main_thread(void)
>
>> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>>       close(cmd->out);
>>       return finish_command(cmd);
>>  }
>> +
>> +#ifndef NO_PTHREADS
>> +struct job_list {
>> +     int (*fct)(struct task_queue *aq, void *task);
>> +     void *task;
>> +     struct job_list *next;
>> +};
>> +#endif
>> +
>> +struct task_queue {
>> +#ifndef NO_PTHREADS
>> +     /*
>> +      * To avoid deadlocks always aquire the semaphores with lowest priority
>
> acquire.
>
>> +      * first, priorites are in descending order as listed.
>> +      *
>> +      * The `mutex` is a general purpose lock for modifying data in the async
>> +      * queue, such as adding a new task or adding a return value from
>> +      * an already run task.
>> +      *
>> +      * `workingcount` and `freecount` are opposing semaphores, the sum of
>> +      * their values should equal `max_threads` at any time while the `mutex`
>> +      * is available.
>> +      */
>> +     sem_t mutex;
>> +     sem_t workingcount;
>> +     sem_t freecount;
>> +
>> +     pthread_t *threads;
>> +     unsigned max_threads;
>> +
>> +     struct job_list *first;
>> +     struct job_list *last;
>> +#endif
>> +     int early_return;
>> +};
>> +
>> +#ifndef NO_PTHREADS
>> +
>> +static void get_task(struct task_queue *aq,
>> +                  int (**fct)(struct task_queue *aq, void *task),
>> +                  void **task,
>> +                  int *early_return)
>> +{
>> +     struct job_list *job;
>> +
>> +     sem_wait(&aq->workingcount);
>> +     sem_wait(&aq->mutex);
>> +
>> +     if (!aq->first)
>> +             die("BUG: internal error with dequeuing jobs for threads");
>> +     job = aq->first;
>> +     *fct = job->fct;
>> +     *task = job->task;
>> +     aq->early_return |= *early_return;
>> +     *early_return = aq->early_return;
>> +     aq->first = job->next;
>> +     if (!aq->first)
>> +             aq->last = NULL;
>> +
>> +     sem_post(&aq->freecount);
>> +     sem_post(&aq->mutex);
>> +
>> +     free(job);
>> +}
>> +
>> +static void* dispatcher(void *args)
>
> static void *dispatcher(....)
>
>> +{
>> +     void *task;
>> +     int (*fct)(struct task_queue *aq, void *data);
>
> s/data/task/?
>
>> +     int early_return = 0;
>> +     struct task_queue *aq = args;
>> +
>> +     get_task(aq, &fct, &task, &early_return);
>> +     while (fct || early_return != 0) {
>> +             early_return = fct(aq, task);
>> +             get_task(aq, &fct, &task, &early_return);
>> +     }
>
> If the func said "we are done, you may stop dispatching now", do you
> still want to do another get_task()?

This question shows me I messed up the readability of this. `get_task` both
gets a new task and takes the value from early_return, writing it to
aq->early_return, so that other threads are notified.

Maybe I should do that explicitly here and not get the new task.

>
>> +     pthread_exit(0);
>> +}
>> +#endif
>> +
>> +struct task_queue *create_task_queue(unsigned max_threads)
>> +{
>> +     struct task_queue *aq = xmalloc(sizeof(*aq));
>> +
>> +#ifndef NO_PTHREADS
>> +     int i;
>> +     if (!max_threads)
>> +             aq->max_threads = online_cpus();
>> +     else
>> +             aq->max_threads = max_threads;
>> +
>> +     sem_init(&aq->mutex, 0, 1);
>> +     sem_init(&aq->workingcount, 0, 0);
>> +     sem_init(&aq->freecount, 0, aq->max_threads);
>> +     aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
>> +
>> +     for (i = 0; i < aq->max_threads; i++)
>> +             pthread_create(&aq->threads[i], 0, &dispatcher, aq);
>> +
>> +     aq->first = NULL;
>> +     aq->last = NULL;
>
>
> Shouldn't these be initialized before letting threads call into
> dispatcher?  The workingcount semaphore that is initialized to 0 may
> prevent them from peeking into these pointers and barfing, but still...

They are initialized to NULL as the empty queue doesn't need a
container element. Do we usually implement queues in another way?

>
>> +
>> +     setup_main_thread();
>> +#endif
>> +     aq->early_return = 0;
>> +
>> +     return aq;
>> +}
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]