Stefan Beller <sbeller@xxxxxxxxxx> writes:

> time -->
> output: |---A---| |-B-| |----C-----------| |-D-| |-E-|

Be nice and distribute the line evenly around "C".  Same for thread
2 below.

> diff --git a/run-command.c b/run-command.c
> index c892e9a..3af97ab 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -3,6 +3,7 @@
>  #include "exec_cmd.h"
>  #include "sigchain.h"
>  #include "argv-array.h"
> +#include "thread-utils.h"
>
>  void child_process_init(struct child_process *child)
>  {
> @@ -862,3 +863,230 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>  	close(cmd->out);
>  	return finish_command(cmd);
>  }
> +
> +struct parallel_processes {
> +	int max_number_processes;
> +	void *data;
> +	get_next_task fn;
> +	handle_child_starting_failure fn_err;
> +	handle_child_return_value fn_exit;

The 'fn' feels really misnamed, especially when compared with the
other fields.  fn_task or something, perhaps.

Also I think we call a function type we define with a name that ends
with _fn, e.g.

	typedef void (*show_commit_fn)(struct commit *, void *);
	void traverse_commit_list(struct rev_info *revs,
				  show_commit_fn show_commit,
				  show_object_fn show_object,
				  void *data)

So perhaps

	get_next_task_fn get_next_task;
	start_failure_fn start_failure;
	return_value_fn return_value;

or something like that.

> +	int nr_processes;
> +	int all_tasks_started;
> +	int foreground_child;
> +	char *slots;

What does slots[i] mean?  Whatever explanation you would use as an
answer to that question, I'd name the field after the key words used
in the explanation.  For example, if it means "children[i] is in use
with a process", then the code would be a lot happier if the field
is called in_use[] or something.

But do not just rename the field yet...
> +	struct child_process *children;
> +	struct pollfd *pfd;
> +	struct strbuf *err;

struct pollfd needs to be a contiguous array of nr_processes
elements because that is what poll(2) takes, but other per-child
fields would be easier to grasp if you did it like so:

	struct parallel_processes {
		...
		struct {
			int in_use;
			struct child_process child;
			struct strbuf err;
			... /* maybe other per-child field later */
		} *children;
		...
	}

> +	struct strbuf finished_children;

A strbuf that holds "finished_children"?  Does it hold "what
finished_children said"?

We care about good naming because clearly named variables and fields
make the code easier to read.

> +static void unblock_fd(int fd)

I would probably have called this "set_nonblocking()".

> +{
> +	int flags = fcntl(fd, F_GETFL);
> +	if (flags < 0) {
> +		warning("Could not get file status flags, "
> +			"output will be degraded");
> +		return;
> +	}
> +	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
> +			warning("Could not set file status flags, "
> +			"output will be degraded");

The first line of "warning(" is indented one level too deep.

> +		return;
> +	}

Wouldn't it be easier to follow if you did

	fn(...)
	{
		if (flags < 0)
			warn();
		else if (fcntl() < 0)
			warn();
	}

> +static void run_processes_parallel_start_new(struct parallel_processes *pp)
> +{
> +	int i;
> +	/* Start new processes. */

Drop this comment and replace it with a blank line.

> +	while (!pp->all_tasks_started
> +	       && pp->nr_processes < pp->max_number_processes) {

Remove "&& " and add " &&" at the end of the previous line.

> +		for (i = 0; i < pp->max_number_processes; i++)
> +			if (!pp->slots[i])
> +				break; /* found an empty slot */

The comment does not help us at all.

	if (...)
		break;

tells as much, and it does not tell us at all what it means for
slots[i] to be empty.

	for (...)
		if (!pp->children[i].in_use)
			break;

would be a lot easier to follow without any comment.
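
To make the two shapes suggested above concrete, here is a minimal
sketch, not taken from the patch.  The names child_state, err_fd and
find_free_slot() are made up for illustration, and fprintf() stands
in for warning() so that the snippet compiles on its own.

```c
#include <fcntl.h>
#include <stdio.h>

/* Hypothetical per-child bookkeeping, grouped in one struct. */
struct child_state {
	int in_use;	/* this slot currently has a running process */
	int err_fd;	/* stands in for child.err in the real code */
};

/* Make fd non-blocking; return 0 on success, -1 after warning. */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags < 0)
		fprintf(stderr, "warning: could not get file status flags, "
			"output will be degraded\n");
	else if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		fprintf(stderr, "warning: could not set file status flags, "
			"output will be degraded\n");
	else
		return 0;
	return -1;
}

/* Return the index of the first unused slot, or -1 if all are busy. */
static int find_free_slot(const struct child_state *children, int n)
{
	int i;

	for (i = 0; i < n; i++)
		if (!children[i].in_use)
			return i;
	return -1;
}
```

Returning -1 from find_free_slot() lets the caller decide whether a
full table is a BUG or simply a reason to stop spawning.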
> +		if (i == pp->max_number_processes)
> +			die("BUG: bookkeeping is hard");
> +
> +		if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) {

This use pattern (and the explanation in run-command.h) suggests
that the name of the field should be s/fn/more_task/; or something
along that line (and flip the return value, i.e. more_task() returns
yes if it did grab another task, no if there is no more task).

> +			pp->all_tasks_started = 1;
> +			break;
> +		}
> +		if (start_command(&pp->children[i]))
> +			pp->fn_err(pp->data, &pp->children[i], &pp->err[i]);

Can fn_err be NULL here?  Shouldn't it (to give a default behaviour
to lazy or bog-standard callers)?

> +		unblock_fd(pp->children[i].err);
> +
> +		pp->nr_processes++;
> +		pp->slots[i] = 1;
> +		pp->pfd[i].fd = pp->children[i].err;
> +	}
> +}
> +
> +static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
> +{
> +	int i;

Have a blank line here between decls and the first statement.

> +	i = poll(pp->pfd, pp->max_number_processes, 100);

Give a symbolic constant for this 100ms, e.g. OUTPUT_POLL_INTERVAL
or something.

> +	if (i < 0) {
> +		if (errno == EINTR)
> +			/* A signal was caught; try again */
> +			return -1;
> +		else {
> +			run_processes_parallel_cleanup(pp);
> +			die_errno("poll");
> +		}
> +	}

Shouldn't this be more like

	while ((i = poll()) < 0) {
		if (errno == EINTR)
			continue;
		cleanup;
		die;
	}

The caller after all was willing to wait for some time, and we were
interrupted before that time came.

> +	/* Buffer output from all pipes. */
> +	for (i = 0; i < pp->max_number_processes; i++) {
> +		if (!pp->slots[i])
> +			continue;
> +		if (pp->pfd[i].revents & POLLIN)
> +			strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
> +		if (pp->foreground_child == i) {
> +			fputs(pp->err[i].buf, stderr);
> +			strbuf_reset(&pp->err[i]);
> +		}

Even if we own the output channel, we may not have read anything
yet: poll() may have said that pfd[i] is not ready, or
read_nonblock() may have returned EWOULDBLOCK.
Perhaps check not just that i owns the output but also that
err[i].len is not zero?

I think output should be done outside the loop and make the comment
before the loop match what the loop actually does.

	/* Buffer output from all pipes. */
	for (i = 0; ...) {
		if (pp->children[i].in_use &&
		    (pp->pfd[i].revents & POLLIN))
			strbuf_read_nonblock();
	}

	/* Drain the output from the owner of the output channel */
	if (pp->children[pp->output_owner].in_use &&
	    pp->children[pp->output_owner].err.len) {
		fputs(...);
		strbuf_reset(...);
	}

> +static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
> +{
> +	int i = 0;
> +	pid_t pid;
> +	int wait_status, code;
> +	int n = pp->max_number_processes;
> +	/* Collect finished child processes. */

Drop this comment and replace it with a blank line.

> +	while (pp->nr_processes > 0) {
> +		pid = waitpid(-1, &wait_status, WNOHANG);
> +		if (pid == 0)
> +			return; /* no child finished */

Do we need that comment?

> +		if (pid < 0) {
> +			if (errno == EINTR)
> +				return; /* just try again next time */

Can we get EINTR here (we are passing WNOHANG above)?

> +			if (errno == EINVAL || errno == ECHILD)
> +				die_errno("wait");

What should happen when we get an error not listed here?  'i' is
left as initialized to 0 and we do strbuf_read_nonblock() for the
first child (which may not even be running)?

You can sweep this bug under the rug by returning here, but I
suspect that you would just want

	if (pid < 0)
		die_errno();

And that would allow you to dedent the else clause below.

> +		} else {
> +			/* Find the finished child. */
> +			for (i = 0; i < pp->max_number_processes; i++)
> +				if (pp->slots[i] && pid == pp->children[i].pid)
> +					break;

Hmm, you are relying on the fact that a valid pid can never be 0, so
you can just use pp->children[i].child.pid to see if a "slot" is
occupied without even using pp->slots[] (or pp->children[i].in_use).
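
The split between buffering and draining suggested above, together
with the retry-on-EINTR loop around poll(), could be sketched
roughly as follows.  This is an illustration only: child_slot,
poll_retry(), buffer_output() and drain_owner() are hypothetical
names, a fixed-size buffer stands in for struct strbuf, and a real
caller would clean up and die where this one returns -1.

```c
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

#define OUTPUT_POLL_INTERVAL 100 /* milliseconds */

/* Hypothetical per-child state; buf/len stand in for a strbuf. */
struct child_slot {
	int in_use;
	char buf[4096];
	size_t len;
};

/* Wait for readable pipes, retrying when a signal interrupts poll(2). */
static int poll_retry(struct pollfd *pfd, int n)
{
	int ret;

	while ((ret = poll(pfd, n, OUTPUT_POLL_INTERVAL)) < 0) {
		if (errno == EINTR)
			continue;
		return -1; /* the real code would clean up and die here */
	}
	return ret;
}

/* Buffer output from all pipes that poll(2) reported as readable. */
static void buffer_output(struct child_slot *slot, struct pollfd *pfd, int n)
{
	int i;

	for (i = 0; i < n; i++) {
		size_t room = sizeof(slot[i].buf) - slot[i].len;
		ssize_t got;

		if (!slot[i].in_use || !(pfd[i].revents & POLLIN) || !room)
			continue;
		got = read(pfd[i].fd, slot[i].buf + slot[i].len, room);
		if (got > 0)
			slot[i].len += got;
	}
}

/* Drain the output from the owner of the output channel, if any. */
static size_t drain_owner(struct child_slot *slot, int owner, FILE *out)
{
	size_t len = slot[owner].len;

	if (!slot[owner].in_use || !len)
		return 0;
	fwrite(slot[owner].buf, 1, len, out);
	slot[owner].len = 0;
	return len;
}
```

With this split, the comment on each function says exactly what the
function does, and nothing is written when the owner has no
buffered output.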
> +			if (i == pp->max_number_processes)
> +				/*
> +				 * waitpid returned another process id
> +				 * which we are not waiting for.
> +				 */
> +				return;
> +		}
> +		strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);

This is to read leftover output?

As discussed elsewhere, read_nonblock() will have to have "read
some, not necessarily to the end" semantics to serve the caller in
run_processes_parallel_buffer_stderr(), so you'd need a loop around
it here to read until you see EOF.

Or you may be able to just call strbuf_read() and the function may
do the right thing to read things through to the EOF.  It depends on
how you redo the patch [2/10].

> +		if (determine_return_value(wait_status, &code, &errno,
> +					   pp->children[i].argv[0]) < 0)
> +			error("waitpid is confused (%s)",
> +			      pp->children[i].argv[0]);
> +
> +		pp->fn_exit(pp->data, &pp->children[i], code);

You are clobbering errno by calling determine_return_value() but you
do not use the returned value anywhere.  Intended?  Or should that
be given to fn_exit() for error reporting?

> +		argv_array_clear(&pp->children[i].args);
> +		argv_array_clear(&pp->children[i].env_array);
> +
> +		pp->nr_processes--;
> +		pp->slots[i] = 0;
> +		pp->pfd[i].fd = -1;

Mental note: here the "slot" is cleared for the child 'i'.

> +		if (i != pp->foreground_child) {
> +			strbuf_addbuf(&pp->finished_children, &pp->err[i]);
> +			strbuf_reset(&pp->err[i]);

OK, so the idea is that pp->children[i].err holds the entire output
for any process that does not own the output channel until it dies,
and they are appended to pp->finished_children.  That suggests that
the name of the "finished" field should have "output" somewhere in
it.

> +		} else {

Mental note: this side of if/else is what happens to the process
that used to own the output channel.

> +			fputs(pp->err[i].buf, stderr);
> +			strbuf_reset(&pp->err[i]);

... and it just flushes the final part of the output.
> +			/* Output all other finished child processes */
> +			fputs(pp->finished_children.buf, stderr);
> +			strbuf_reset(&pp->finished_children);

If there is any, that is.

> +			/*
> +			 * Pick next process to output live.
> +			 * NEEDSWORK:
> +			 * For now we pick it randomly by doing a round
> +			 * robin. Later we may want to pick the one with
> +			 * the most output or the longest or shortest
> +			 * running process time.
> +			 */
> +			for (i = 0; i < n; i++)
> +				if (pp->slots[(pp->foreground_child + i) % n])
> +					break;
> +			pp->foreground_child = (pp->foreground_child + i) % n;

... and then picks a new owner of the output channel.

Up to this point it looks sensible.

> +			fputs(pp->err[pp->foreground_child].buf, stderr);
> +			strbuf_reset(&pp->err[pp->foreground_child]);

I do not think these two lines need to be here, especially if you
follow the above advice of separating buffering and draining.

> +int run_processes_parallel(int n, void *data,
> +			   get_next_task fn,
> +			   handle_child_starting_failure fn_err,
> +			   handle_child_return_value fn_exit)
> +{
> +	struct parallel_processes pp;
> +	run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit);
> +
> +	while (!pp.all_tasks_started || pp.nr_processes > 0) {

The former is true as long as more_task() says there may be more.
The latter is true as long as we have something already running.

In either case, we should keep collecting and spawning as needed.

> +		run_processes_parallel_start_new(&pp);

But calling start_new() unconditionally feels sloppy.  It should at
least be something like

	if (pp.nr_processes < pp.max_number_processes &&
	    !pp.all_tasks_started)
		start_new_process()

no?
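
For what it is worth, the round-robin pick and the guard in front of
start_new() can both be pulled out into tiny helpers, which makes
them easy to reason about in isolation.  A rough sketch with made-up
names (pick_next_owner() and want_new_process() are not in the
patch), using a plain char array in place of pp->slots[]:

```c
/*
 * Pick the next in-use slot in round-robin order, starting the scan
 * at 'current' itself.  Returns 'current' unchanged when no slot is
 * in use, which matches what the modulo arithmetic in the patch does.
 */
static int pick_next_owner(const char *in_use, int n, int current)
{
	int i;

	for (i = 0; i < n; i++)
		if (in_use[(current + i) % n])
			return (current + i) % n;
	return current;
}

/* Spawning makes sense only when a slot is free and tasks remain. */
static int want_new_process(int nr_processes, int max_processes,
			    int all_tasks_started)
{
	return nr_processes < max_processes && !all_tasks_started;
}
```

The main loop would then call start_new() only when
want_new_process() says so, and always go on to buffer output and
collect finished children.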
> diff --git a/test-run-command.c b/test-run-command.c
> index 89c7de2..70b6c7a 100644
> --- a/test-run-command.c
> +++ b/test-run-command.c
> @@ -30,6 +50,10 @@ int main(int argc, char **argv)
>  	if (!strcmp(argv[1], "run-command"))
>  		exit(run_command(&proc));
>
> +	if (!strcmp(argv[1], "run-command-parallel-4"))
> +		exit(run_processes_parallel(4, &proc, parallel_next,
> +					 NULL, NULL));

So not only fn_err, but fn_exit is optional.  You'd need to update
the code above to allow these.

> +
>  	fprintf(stderr, "check usage\n");
>  	return 1;
>  }

It was a fun read.  There were tons of niggles, but I didn't see
anything fundamentally unsalvageable.

Thanks.