Re: [PATCH 03/10] run-command: add an asynchronous parallel child processor

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Stefan Beller <sbeller@xxxxxxxxxx> writes:

>  time -->
>  output: |---A---|   |-B-|   |----C-----------|   |-D-|   |-E-|

Be nice and distribute the line evenly around "C".  Same for thread
2 below.

> diff --git a/run-command.c b/run-command.c
> index c892e9a..3af97ab 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -3,6 +3,7 @@
>  #include "exec_cmd.h"
>  #include "sigchain.h"
>  #include "argv-array.h"
> +#include "thread-utils.h"
>  
>  void child_process_init(struct child_process *child)
>  {
> @@ -862,3 +863,230 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>  	close(cmd->out);
>  	return finish_command(cmd);
>  }
> +
> +struct parallel_processes {
> +	int max_number_processes;
> +	void *data;

> +	get_next_task fn;
> +	handle_child_starting_failure fn_err;
> +	handle_child_return_value fn_exit;

The 'fn' feels really misnamed, especially when compared with the
other fields.  fn_task or something, perhaps.

Also I think we call a function type we define with a name that ends
with _fn, e.g.

    typedef void (*show_commit_fn)(struct commit *, void *);
    void traverse_commit_list(struct rev_info *revs,
                              show_commit_fn show_commit,
                              show_object_fn show_object,
                              void *data)

So perhaps

	get_next_task_fn get_next_task;
        start_failure_fn start_failure;
        return_value_fn return_value;

or something like that.

> +	int nr_processes;
> +	int all_tasks_started;
> +	int foreground_child;
> +	char *slots;

What does slots[i] mean?  Whatever explanation you would use as an
answer to that question, I'd name the field after the key words used
in the explanation.  For example, if it means "children[i] is in use
with a process", then the code would be a lot happier if the field
is called in_use[] or something.

But do not just rename the field yet...

> +	struct child_process *children;
> +	struct pollfd *pfd;
> +	struct strbuf *err;

struct pollfd needs to be a contiguous array of nr_processes
elements because that is what poll(2) takes, but other per-child
fields would be easier to grasp if you did it like so:

	struct parallel_processes {
        	...
                struct {
                	int in_use;
                        struct child_process child;
                        struct strbuf err;
                        ... /* maybe other per-child field later */
		} *children;
		...
	}

> +	struct strbuf finished_children;

A strbuf that holds "finished_children"?  Does it hold "what
finished_children said"?

We care about good naming because clearly named variables and fields
make the code easier to read.

> +static void unblock_fd(int fd)

I would probably have called this "set_nonblocking()".

> +{
> +	int flags = fcntl(fd, F_GETFL);
> +	if (flags < 0) {
> +		warning("Could not get file status flags, "
> +			"output will be degraded");
> +		return;
> +	}
> +	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
> +			warning("Could not set file status flags, "
> +			"output will be degraded");

The first line of "warning(" is indented one level too deep.

> +		return;
> +	}

Wouldn't it be easier to follow if you did

	fn(...)
	{
                if (flags < 0)
                        warn();
                else if (fcntrl() < 0)
                        warn();
	}

> +static void run_processes_parallel_start_new(struct parallel_processes *pp)
> +{
> +	int i;
> +	/* Start new processes. */

Drop this comment and replace it with a blank line.

> +	while (!pp->all_tasks_started
> +	       && pp->nr_processes < pp->max_number_processes) {

Remove "&& " and add " &&" at the end of the previous line.

> +		for (i = 0; i < pp->max_number_processes; i++)
> +			if (!pp->slots[i])
> +				break; /* found an empty slot */

The comment does not help us at all.  if (...) break; tells as much,
and it does not tell us what it means that slot[] being empty at all.

		for (...)
			if (!pp->children[i].in_use)
				break;

would be a lot easier to follow without any comment.

> +		if (i == pp->max_number_processes)
> +			die("BUG: bookkeeping is hard");
> +
> +		if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) {

This use pattern (and the explanation in run-command.h) suggests
that the name of the field should be s/fn/more_task/; or something
along that line (and flip the return value, i.e. more_task() returns
yes if it did grab another task, no if there is no more task).

> +			pp->all_tasks_started = 1;
> +			break;
> +		}
> +		if (start_command(&pp->children[i]))
> +			pp->fn_err(pp->data, &pp->children[i], &pp->err[i]);

Can fn_err be NULL here?  Shouldn't it (to give a default behaviour
to lazy or bog-standard callers)?

> +		unblock_fd(pp->children[i].err);
> +
> +		pp->nr_processes++;
> +		pp->slots[i] = 1;
> +		pp->pfd[i].fd = pp->children[i].err;
> +	}
> +}
> +
> +static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
> +{
> +	int i;

Have a blank line here between decls and the first statement.

> +	i = poll(pp->pfd, pp->max_number_processes, 100);

Give a symbolic constant for this 100ms, e.g. OUTPUT_POLL_INTERVAL
or something.

> +	if (i < 0) {
> +		if (errno == EINTR)
> +			/* A signal was caught; try again */
> +			return -1;
> +		else {
> +			run_processes_parallel_cleanup(pp);
> +			die_errno("poll");
> +		}
> +	}

Shouldn't this be more like

	while ((i = poll()) < 0)  {
        	if (errno == EINTR)
                	continue;
		cleanup;
		die;
	}

The caller after all was willing to wait for some time, and we were
interrupted before that time came.

> +	/* Buffer output from all pipes. */
> +	for (i = 0; i < pp->max_number_processes; i++) {
> +		if (!pp->slots[i])
> +			continue;
> +		if (pp->pfd[i].revents & POLLIN)
> +			strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
> +		if (pp->foreground_child == i) {
> +			fputs(pp->err[i].buf, stderr);
> +			strbuf_reset(&pp->err[i]);
> +		}

Even if we own the output channel, we may not have read anything
yet---poll() may have said that pfd[i] is not ready, or
read_nonblock() may have returned EWOULDBLOCK.  Perhaps check not
just that i owns the output but err[i].len is not zero?

I think output should be done outside the loop and make the comment
before the loop match what the loop actually does.

	/* Buffer output from all pipes. */
	for (i = 0; ...) {
		if (pp->children[i].in_use && (pp->pfd[i].revents & POLLIN))
			strbuf_read_nonblock();
	}

	/* Drain the output from the owner of the output channel */
	if (pp->children[pp->output_owner].in_use &&
	    pp->children[pp->output_owner].err.len) {
	    	fputs(...);
                strbuf_reset(...);
	}

> +static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
> +{
> +	int i = 0;
> +	pid_t pid;
> +	int wait_status, code;
> +	int n = pp->max_number_processes;
> +	/* Collect finished child processes. */

Drop this comment and replace it with a blank line.

> +	while (pp->nr_processes > 0) {
> +		pid = waitpid(-1, &wait_status, WNOHANG);
> +		if (pid == 0)
> +			return; /* no child finished */

Do we need that comment?

> +		if (pid < 0) {
> +			if (errno == EINTR)
> +				return; /* just try again  next time */

Can we get EINTR here (we are passing WNOHANG above)?

> +			if (errno == EINVAL || errno == ECHILD)
> +				die_errno("wait");

What should happen when we get an error not listed here?  'i' is
left as initialized to 0 and we do strbuf_read_nonblock() for the
first child (which may not even be running)?

You can sweep this bug under the rug by returning here, but I
suspect that you would just want

	if (pid < 0)
		die_errno();

And that would allow you to dedent the else clause below.

> +		} else {
> +			/* Find the finished child. */
> +			for (i = 0; i < pp->max_number_processes; i++)
> +				if (pp->slots[i] && pid == pp->children[i].pid)
> +					break;

Hmm, you are relying on the fact that a valid pid can never be 0, so
you can just use pp->children[i].child.pid to see if a "slot" is
occupied without even using pp->slots[] (or pp->children[i].in_use).

> +			if (i == pp->max_number_processes)
> +				/*
> +				 * waitpid returned another process id
> +				 * which we are not waiting for.
> +				 */
> +				return;
> +		}
> +		strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);

This is to read leftover output?

As discussed elsewhere, read_nonblock() will have to have "read
some, not necessarily to the end" semantics to serve the caller in
run_processes_parallel_buffer_stderr(), so you'd need a loop around
it here to read until you see EOF.

Or you may be able to just call strbuf_read() and the function may
do the right thing to read things through to the EOF.  It depends on
how you redo the patch [2/10].

> +		if (determine_return_value(wait_status, &code, &errno,
> +					   pp->children[i].argv[0]) < 0)
> +			error("waitpid is confused (%s)",
> +			      pp->children[i].argv[0]);
> +
> +		pp->fn_exit(pp->data, &pp->children[i], code);

You are clobbering errno by calling determine_return_value() but you
do not use the returned value anywhere.  Intended?  Or should that
be given to fn_exit() for error reporting?

> +		argv_array_clear(&pp->children[i].args);
> +		argv_array_clear(&pp->children[i].env_array);
> +
> +		pp->nr_processes--;
> +		pp->slots[i] = 0;
> +		pp->pfd[i].fd = -1;

Mental note: here the "slot" is cleared for the child 'i'.

> +		if (i != pp->foreground_child) {
> +			strbuf_addbuf(&pp->finished_children, &pp->err[i]);
> +			strbuf_reset(&pp->err[i]);

OK, so the idea is that pp->child[i].err holds the entire output for
any process that does not own the output channel until it dies, and
they are appended to pp->finished_children.  That suggests that the
name of the "finished" field should have "output" somewhere in it.

> +		} else {

Mental note: this side of if/else is what happens to the process
that used to own the output channel.

> +			fputs(pp->err[i].buf, stderr);
> +			strbuf_reset(&pp->err[i]);

... and it just flushes the final part of the output.

> +			/* Output all other finished child processes */
> +			fputs(pp->finished_children.buf, stderr);
> +			strbuf_reset(&pp->finished_children);

If there is any, that is.

> +			/*
> +			 * Pick next process to output live.
> +			 * NEEDSWORK:
> +			 * For now we pick it randomly by doing a round
> +			 * robin. Later we may want to pick the one with
> +			 * the most output or the longest or shortest
> +			 * running process time.
> +			 */
> +			for (i = 0; i < n; i++)
> +				if (pp->slots[(pp->foreground_child + i) % n])
> +					break;
> +			pp->foreground_child = (pp->foreground_child + i) % n;

... and then picks a new owner of the output channel.

Up to this point it looks sensible.

> +			fputs(pp->err[pp->foreground_child].buf, stderr);
> +			strbuf_reset(&pp->err[pp->foreground_child]);

I do not think these two lines need to be here, especially if you
follow the above advice of separating buffering and draining.

> +int run_processes_parallel(int n, void *data,
> +			   get_next_task fn,
> +			   handle_child_starting_failure fn_err,
> +			   handle_child_return_value fn_exit)
> +{
> +	struct parallel_processes pp;
> +	run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit);
> +
> +	while (!pp.all_tasks_started || pp.nr_processes > 0) {

The former is true as long as more_task() says there may be more.
The latter is true as long as we have something already running.

In either case, we should keep collecting and spawning as needed.

> +		run_processes_parallel_start_new(&pp);

But calling start_new() unconditionally feels sloppy.  It should at
least be something like

	if (pp.nr_processes < pp.max_processes &&
	    !pp.all_task_started)
		start_new_process()

no?

> diff --git a/test-run-command.c b/test-run-command.c
> index 89c7de2..70b6c7a 100644
> --- a/test-run-command.c
> +++ b/test-run-command.c
> @@ -30,6 +50,10 @@ int main(int argc, char **argv)
>  	if (!strcmp(argv[1], "run-command"))
>  		exit(run_command(&proc));
>  
> +	if (!strcmp(argv[1], "run-command-parallel-4"))
> +		exit(run_processes_parallel(4, &proc, parallel_next,
> +					 NULL, NULL));

So not only fn_err, but fn_exit is optional.  You'd need to update
the code above to allow these.

> +
>  	fprintf(stderr, "check usage\n");
>  	return 1;
>  }

It was a fun read.  There were tons of niggles, but I didn't see
anything fundamentally unsalvageable.

Thanks.
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]