Today there was a lot of discussion on the correct way of reading the strbufs, as well as some discussion on the structure of the asynchronous parallel process loop. Patches 1-8 bring parallel fetching of submodules; they have had good exposure to review, and the feedback has been incorporated. Patches 9-14 bring parallel submodule updates. Patch 14 is not ready yet (it still causes test suite failures), but the preceding cleanups in patches 9-13 can already be reviewed without wasting time. Any feedback is welcome. Thanks, Stefan The diff against v3 is below. The patches can also be found at [1] [1] https://github.com/stefanbeller/git/tree/submodulec_nonthreaded_parallel_4 Jonathan Nieder (1): submodule: Send "Fetching submodule <foo>" to standard error Stefan Beller (13): xread: poll on non blocking fds xread_nonblock: add functionality to read from fds without blocking strbuf: add strbuf_read_once to read without blocking run-command: factor out return value computation run-command: add an asynchronous parallel child processor fetch_populated_submodules: use new parallel job processing submodules: allow parallel fetching, add tests and documentation submodule-config: Untangle logic in parse_config submodule config: keep update strategy around git submodule update: cmd_update_recursive git submodule update: cmd_update_clone git submodule update: cmd_update_fetch Rewrite submodule update in C Documentation/fetch-options.txt | 7 + builtin/fetch.c | 6 +- builtin/pull.c | 6 + builtin/submodule--helper.c | 251 +++++++++++++++++++++++++++++ git-compat-util.h | 1 + git-submodule.sh | 339 ++++++++++++++-------------------------- run-command.c | 320 ++++++++++++++++++++++++++++++++++--- run-command.h | 36 +++++ strbuf.c | 11 ++ strbuf.h | 9 ++ submodule-config.c | 85 +++++----- submodule-config.h | 1 + submodule.c | 120 +++++++++----- submodule.h | 2 +- t/t0061-run-command.sh | 20 +++ t/t5526-fetch-submodules.sh | 70 ++++++--- test-run-command.c | 24 +++ wrapper.c | 35 ++++- 18 files changed, 987 insertions(+), 356 
deletions(-) diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c index baa7563..b79117a 100644 --- a/builtin/submodule--helper.c +++ b/builtin/submodule--helper.c @@ -382,10 +382,14 @@ static int update_next_task(void *data, argv_array_pushf(&cp->env_array, "sm_path=%s", sub->path); argv_array_pushf(&cp->env_array, "name=%s", sub->name); argv_array_pushf(&cp->env_array, "url=%s", sub->url); + argv_array_pushf(&cp->env_array, "sha1=%s", sha1_to_hex(ce->sha1)); argv_array_pushf(&cp->env_array, "update_module=%s", update_module); cp->git_cmd = 1; + cp->no_stdin = 1; cp->stdout_to_stderr = 1; + cp->err = -1; + argv_array_init(&cp->args); argv_array_push(&cp->args, "submodule"); if (!file_exists(sm_gitdir)) argv_array_push(&cp->args, "update_clone"); diff --git a/run-command.c b/run-command.c index 06d5a5d..494e1f8 100644 --- a/run-command.c +++ b/run-command.c @@ -276,8 +276,10 @@ static int wait_or_whine(pid_t pid, const char *argv0) failed_errno = errno; error("waitpid for %s failed: %s", argv0, strerror(errno)); } else { - if (waiting != pid - || (determine_return_value(status, &code, &failed_errno, argv0) < 0)) + if (waiting != pid || (determine_return_value(status, + &code, + &failed_errno, + argv0) < 0)) error("waitpid is confused (%s)", argv0); } @@ -870,7 +872,6 @@ struct parallel_processes { int max_processes; int nr_processes; - unsigned all_tasks_started : 1; get_next_task_fn get_next_task; start_failure_fn start_failure; @@ -899,9 +900,9 @@ void default_start_failure(void *data, struct strbuf sb = STRBUF_INIT; for (i = 0; cp->argv[i]; i++) - strbuf_addf(&sb, "%s ", cp->argv[i]); + strbuf_addf(&sb, " %s", cp->argv[i]); - die_errno("Starting a child failed:\n%s", sb.buf); + die_errno("Starting a child failed:%s", sb.buf); } void default_return_value(void *data, @@ -915,12 +916,12 @@ void default_return_value(void *data, return; for (i = 0; cp->argv[i]; i++) - strbuf_addf(&sb, "%s ", cp->argv[i]); + strbuf_addf(&sb, " %s", cp->argv[i]); - 
die_errno("A child failed with return code:\n%s\n%d", sb.buf, result); + die_errno("A child failed with return code %d:%s", result, sb.buf); } -static void run_processes_parallel_init(struct parallel_processes *pp, +static void pp_init(struct parallel_processes *pp, int n, void *data, get_next_task_fn get_next_task, start_failure_fn start_failure, @@ -941,7 +942,6 @@ static void run_processes_parallel_init(struct parallel_processes *pp, pp->return_value = return_value ? return_value : default_return_value; pp->nr_processes = 0; - pp->all_tasks_started = 0; pp->output_owner = 0; pp->children = xcalloc(n, sizeof(*pp->children)); pp->pfd = xcalloc(n, sizeof(*pp->pfd)); @@ -954,9 +954,10 @@ static void run_processes_parallel_init(struct parallel_processes *pp, } } -static void run_processes_parallel_cleanup(struct parallel_processes *pp) +static void pp_cleanup(struct parallel_processes *pp) { int i; + for (i = 0; i < pp->max_processes; i++) strbuf_release(&pp->children[i].err); @@ -976,7 +977,8 @@ static void set_nonblocking(int fd) "output will be degraded"); } -static void run_processes_parallel_start_one(struct parallel_processes *pp) +/* returns 0 if a process was started, 1 if get_next_task had no more work */ +static int pp_start_one(struct parallel_processes *pp) { int i; @@ -988,10 +990,9 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp) if (!pp->get_next_task(pp->data, &pp->children[i].process, - &pp->children[i].err)) { - pp->all_tasks_started = 1; - return; - } + &pp->children[i].err)) + return 1; + if (start_command(&pp->children[i].process)) pp->start_failure(pp->data, &pp->children[i].process, @@ -1002,23 +1003,17 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp) pp->nr_processes++; pp->children[i].in_use = 1; pp->pfd[i].fd = pp->children[i].process.err; + return 0; } -static void run_processes_parallel_start_as_needed(struct parallel_processes *pp) -{ - while (pp->nr_processes < pp->max_processes && - 
!pp->all_tasks_started) - run_processes_parallel_start_one(pp); -} - -static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp) +static void pp_buffer_stderr(struct parallel_processes *pp) { int i; while ((i = poll(pp->pfd, pp->max_processes, 100)) < 0) { if (errno == EINTR) continue; - run_processes_parallel_cleanup(pp); + pp_cleanup(pp); die_errno("poll"); } @@ -1033,7 +1028,7 @@ static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp) } } -static void run_processes_parallel_output(struct parallel_processes *pp) +static void pp_output(struct parallel_processes *pp) { int i = pp->output_owner; if (pp->children[i].in_use && @@ -1043,7 +1038,7 @@ static void run_processes_parallel_output(struct parallel_processes *pp) } } -static void run_processes_parallel_collect_finished(struct parallel_processes *pp) +static void pp_collect_finished(struct parallel_processes *pp) { int i = 0; pid_t pid; @@ -1063,17 +1058,11 @@ static void run_processes_parallel_collect_finished(struct parallel_processes *p pid == pp->children[i].process.pid) break; if (i == pp->max_processes) - /* - * waitpid returned another process id - * which we are not waiting for. 
- */ - return; - - if (strbuf_read_once(&pp->children[i].err, - pp->children[i].process.err, 0) < 0 && - errno != EAGAIN) - die_errno("strbuf_read_once"); + die("BUG: found a child process we were not aware of"); + if (strbuf_read(&pp->children[i].err, + pp->children[i].process.err, 0) < 0) + die_errno("strbuf_read"); if (determine_return_value(wait_status, &code, &errno, pp->children[i].process.argv[0]) < 0) @@ -1122,18 +1111,20 @@ int run_processes_parallel(int n, void *data, return_value_fn return_value) { struct parallel_processes pp; - run_processes_parallel_init(&pp, n, data, - get_next_task, - start_failure, - return_value); - - while (!pp.all_tasks_started || pp.nr_processes > 0) { - run_processes_parallel_start_as_needed(&pp); - run_processes_parallel_buffer_stderr(&pp); - run_processes_parallel_output(&pp); - run_processes_parallel_collect_finished(&pp); + pp_init(&pp, n, data, get_next_task, start_failure, return_value); + + while (1) { + while (pp.nr_processes < pp.max_processes && + !pp_start_one(&pp)) + ; /* nothing */ + if (!pp.nr_processes) + break; + pp_buffer_stderr(&pp); + pp_output(&pp); + pp_collect_finished(&pp); } - run_processes_parallel_cleanup(&pp); + + pp_cleanup(&pp); return 0; } diff --git a/run-command.h b/run-command.h index 0c1b363..3807fd1 100644 --- a/run-command.h +++ b/run-command.h @@ -155,6 +155,4 @@ int run_processes_parallel(int n, void *data, start_failure_fn, return_value_fn); -void run_processes_parallel_schedule_error(struct strbuf *err); - #endif diff --git a/strbuf.h b/strbuf.h index 4d4e5b1..ea69665 100644 --- a/strbuf.h +++ b/strbuf.h @@ -367,8 +367,11 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *); extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint); /** - * Same as strbuf_read, just returns non-blockingly by ignoring EAGAIN. - * The fd must have set O_NONBLOCK. + * Read from a file descriptor that is marked as O_NONBLOCK without + * blocking. 
Returns the number of new bytes appended to the sb. + * Negative return value signals there was an error returned from + * underlying read(2), in which case the caller should check errno. + * e.g. errno == EAGAIN when the read may have blocked. */ extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint); diff --git a/submodule-config.c b/submodule-config.c index 0298a60..8b8c7d1 100644 --- a/submodule-config.c +++ b/submodule-config.c @@ -258,93 +258,72 @@ static int parse_config(const char *var, const char *value, void *data) if (!name_and_item_from_var(var, &name, &item)) return 0; - submodule = lookup_or_create_by_name(me->cache, me->gitmodules_sha1, - name.buf); + submodule = lookup_or_create_by_name(me->cache, + me->gitmodules_sha1, + name.buf); if (!strcmp(item.buf, "path")) { - struct strbuf path = STRBUF_INIT; - if (!value) { + if (!value) ret = config_error_nonbool(var); - goto release_return; - } - if (!me->overwrite && submodule->path != NULL) { + else if (!me->overwrite && submodule->path != NULL) warn_multiple_config(me->commit_sha1, submodule->name, "path"); - goto release_return; + else { + if (submodule->path) + cache_remove_path(me->cache, submodule); + free((void *) submodule->path); + submodule->path = xstrdup(value); + cache_put_path(me->cache, submodule); } - - if (submodule->path) - cache_remove_path(me->cache, submodule); - free((void *) submodule->path); - strbuf_addstr(&path, value); - submodule->path = strbuf_detach(&path, NULL); - cache_put_path(me->cache, submodule); } else if (!strcmp(item.buf, "fetchrecursesubmodules")) { /* when parsing worktree configurations we can die early */ int die_on_error = is_null_sha1(me->gitmodules_sha1); if (!me->overwrite && - submodule->fetch_recurse != RECURSE_SUBMODULES_NONE) { + submodule->fetch_recurse != RECURSE_SUBMODULES_NONE) warn_multiple_config(me->commit_sha1, submodule->name, "fetchrecursesubmodules"); - goto release_return; - } - - submodule->fetch_recurse = 
parse_fetch_recurse(var, value, + else + submodule->fetch_recurse = parse_fetch_recurse( + var, value, die_on_error); } else if (!strcmp(item.buf, "ignore")) { - struct strbuf ignore = STRBUF_INIT; - if (!me->overwrite && submodule->ignore != NULL) { + if (!value) + ret = config_error_nonbool(var); + else if (!me->overwrite && submodule->ignore != NULL) warn_multiple_config(me->commit_sha1, submodule->name, "ignore"); - goto release_return; - } - if (!value) { - ret = config_error_nonbool(var); - goto release_return; - } - if (strcmp(value, "untracked") && strcmp(value, "dirty") && - strcmp(value, "all") && strcmp(value, "none")) { + else if (strcmp(value, "untracked") && + strcmp(value, "dirty") && + strcmp(value, "all") && + strcmp(value, "none")) warning("Invalid parameter '%s' for config option " "'submodule.%s.ignore'", value, var); - goto release_return; + else { + free((void *) submodule->ignore); + submodule->ignore = xstrdup(value); } - - free((void *) submodule->ignore); - strbuf_addstr(&ignore, value); - submodule->ignore = strbuf_detach(&ignore, NULL); } else if (!strcmp(item.buf, "url")) { - struct strbuf url = STRBUF_INIT; if (!value) { ret = config_error_nonbool(var); - goto release_return; - } - if (!me->overwrite && submodule->url != NULL) { + } else if (!me->overwrite && submodule->url != NULL) { warn_multiple_config(me->commit_sha1, submodule->name, "url"); - goto release_return; + } else { + free((void *) submodule->url); + submodule->url = xstrdup(value); } - - free((void *) submodule->url); - strbuf_addstr(&url, value); - submodule->url = strbuf_detach(&url, NULL); } else if (!strcmp(item.buf, "update")) { - struct strbuf update = STRBUF_INIT; - if (!value) { + if (!value) ret = config_error_nonbool(var); - goto release_return; - } - if (!me->overwrite && submodule->update != NULL) { + else if (!me->overwrite && submodule->update != NULL) warn_multiple_config(me->commit_sha1, submodule->name, - "update"); - goto release_return; + "update"); + 
else { + free((void *)submodule->update); + submodule->update = xstrdup(value); } - - free((void *) submodule->update); - strbuf_addstr(&update, value); - submodule->update = strbuf_detach(&update, NULL); } -release_return: strbuf_release(&name); strbuf_release(&item); diff --git a/submodule.c b/submodule.c index d15364f..fdaf3e4 100644 --- a/submodule.c +++ b/submodule.c @@ -650,10 +650,12 @@ int fetch_populated_submodules(const struct argv_array *options, { int i; struct submodule_parallel_fetch spf = SPF_INIT; + spf.work_tree = get_git_work_tree(); spf.command_line_option = command_line_option; spf.quiet = quiet; spf.prefix = prefix; + if (!spf.work_tree) goto out; @@ -738,12 +740,11 @@ int get_next_submodule(void *data, struct child_process *cp, if (is_directory(git_dir)) { child_process_init(cp); cp->dir = strbuf_detach(&submodule_path, NULL); + cp->env = local_repo_env; cp->git_cmd = 1; - cp->no_stdout = 1; cp->no_stdin = 1; cp->stdout_to_stderr = 1; cp->err = -1; - cp->env = local_repo_env; if (!spf->quiet) strbuf_addf(err, "Fetching submodule %s%s\n", spf->prefix, ce->name); diff --git a/wrapper.c b/wrapper.c index 54ce231..41a21e1 100644 --- a/wrapper.c +++ b/wrapper.c @@ -206,16 +206,10 @@ ssize_t xread(int fd, void *buf, size_t len) continue; if (errno == EAGAIN || errno == EWOULDBLOCK) { struct pollfd pfd; - int i; pfd.events = POLLIN; pfd.fd = fd; - i = poll(&pfd, 1, 100); - if (i < 0) { - if (errno == EINTR || errno == ENOMEM) - continue; - else - die_errno("poll"); - } + /* We deliberately ignore the return value */ + poll(&pfd, 1, -1); } } return nr; @@ -225,13 +219,13 @@ ssize_t xread(int fd, void *buf, size_t len) /* * xread_nonblock() is the same a read(), but it automatically restarts read() * interrupted operations (EINTR). xread_nonblock() DOES NOT GUARANTEE that - * "len" bytes is read even if the data is available. + * "len" bytes is read. EWOULDBLOCK is turned into EAGAIN. 
*/ ssize_t xread_nonblock(int fd, void *buf, size_t len) { ssize_t nr; if (len > MAX_IO_SIZE) - len = MAX_IO_SIZE; + len = MAX_IO_SIZE; while (1) { nr = read(fd, buf, len); if (nr < 0) { -- 2.5.0.272.ga84127c.dirty -- To unsubscribe from this list: send the line "unsubscribe git" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html