Re: [RFC PATCH 2/3] run-commands: add an async queue processor

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Stefan Beller <sbeller@xxxxxxxxxx> writes:

> This adds functionality to do work in parallel.
>
> The whole life cycle of such a thread pool would look like
>
>     struct task_queue * tq = create_task_queue(32); // no of threads
>     for (...)
>         add_task(tq, process_one_item_function, item); // non blocking
>     ...
>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>     if (ret)
>         die ("Not all items were processed");
>
> The caller must take care of handling the output.
>
> Signed-off-by: Stefan Beller <sbeller@xxxxxxxxxx>
> ---
>
> I sent this a while ago to the list, no comments on it :(

The primary reason, I suspect, is that you sent it to the wrong set
of people.  Submodule folks have largely been working in the scripted
ones, and may not necessarily be the ones who are most familiar with
the run-command infrastructure.

"shortlog --no-merges" tells me that the obvious suspects are j6t
and peff.

> The core functionality stayed the same, but I hope to have improved the naming and
> location of the code.
>
> The WIP is only for the NO_PTHREADS case.

>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>  run-command.h |  30 +++++++++
>  2 files changed, 230 insertions(+), 12 deletions(-)
>
> diff --git a/run-command.c b/run-command.c
> index 28e1d55..4029011 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -4,6 +4,21 @@
>  #include "sigchain.h"
>  #include "argv-array.h"
>  
> +#ifdef NO_PTHREADS
> +
> +#else
> +
> +#include "thread-utils.h"
> +
> +#include <pthread.h>
> +#include <semaphore.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +
> +#endif
> +
> +#include "git-compat-util.h"
> +

This goes against the way we have been organizing the header files.

http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265

> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>  
>  #endif
>  
> +void setup_main_thread()

void setup_main_thread(void)

> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>  	close(cmd->out);
>  	return finish_command(cmd);
>  }
> +
> +#ifndef NO_PTHREADS
> +struct job_list {
> +	int (*fct)(struct task_queue *aq, void *task);
> +	void *task;
> +	struct job_list *next;
> +};
> +#endif
> +
> +struct task_queue {
> +#ifndef NO_PTHREADS
> +	/*
> +	 * To avoid deadlocks always aquire the semaphores with lowest priority

acquire.

> +	 * first, priorities are in descending order as listed.
> +	 *
> +	 * The `mutex` is a general purpose lock for modifying data in the async
> +	 * queue, such as adding a new task or adding a return value from
> +	 * an already run task.
> +	 *
> +	 * `workingcount` and `freecount` are opposing semaphores, the sum of
> +	 * their values should equal `max_threads` at any time while the `mutex`
> +	 * is available.
> +	 */
> +	sem_t mutex;
> +	sem_t workingcount;
> +	sem_t freecount;
> +
> +	pthread_t *threads;
> +	unsigned max_threads;
> +
> +	struct job_list *first;
> +	struct job_list *last;
> +#endif
> +	int early_return;
> +};
> +
> +#ifndef NO_PTHREADS
> +
> +static void get_task(struct task_queue *aq,
> +		     int (**fct)(struct task_queue *aq, void *task),
> +		     void **task,
> +		     int *early_return)
> +{
> +	struct job_list *job;
> +
> +	sem_wait(&aq->workingcount);
> +	sem_wait(&aq->mutex);
> +
> +	if (!aq->first)
> +		die("BUG: internal error with dequeuing jobs for threads");
> +	job = aq->first;
> +	*fct = job->fct;
> +	*task = job->task;
> +	aq->early_return |= *early_return;
> +	*early_return = aq->early_return;
> +	aq->first = job->next;
> +	if (!aq->first)
> +		aq->last = NULL;
> +
> +	sem_post(&aq->freecount);
> +	sem_post(&aq->mutex);
> +
> +	free(job);
> +}
> +
> +static void* dispatcher(void *args)

static void *dispatcher(....)

> +{
> +	void *task;
> +	int (*fct)(struct task_queue *aq, void *data);

s/data/task/?

> +	int early_return = 0;
> +	struct task_queue *aq = args;
> +
> +	get_task(aq, &fct, &task, &early_return);
> +	while (fct || early_return != 0) {
> +		early_return = fct(aq, task);
> +		get_task(aq, &fct, &task, &early_return);
> +	}

If the func said "we are done, you may stop dispatching now", do you
still want to do another get_task()?

> +	pthread_exit(0);
> +}
> +#endif
> +
> +struct task_queue *create_task_queue(unsigned max_threads)
> +{
> +	struct task_queue *aq = xmalloc(sizeof(*aq));
> +
> +#ifndef NO_PTHREADS
> +	int i;
> +	if (!max_threads)
> +		aq->max_threads = online_cpus();
> +	else
> +		aq->max_threads = max_threads;
> +
> +	sem_init(&aq->mutex, 0, 1);
> +	sem_init(&aq->workingcount, 0, 0);
> +	sem_init(&aq->freecount, 0, aq->max_threads);
> +	aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
> +
> +	for (i = 0; i < aq->max_threads; i++)
> +		pthread_create(&aq->threads[i], 0, &dispatcher, aq);
> +
> +	aq->first = NULL;
> +	aq->last = NULL;


Shouldn't these be initialized before letting threads call into
dispatcher?  The workingcount semaphore that is initialized to 0 may
prevent them from peeking into these pointers and barfing, but still...

> +
> +	setup_main_thread();
> +#endif
> +	aq->early_return = 0;
> +
> +	return aq;
> +}
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]