[PATCHv4 00/14] fetch submodules in parallel and a preview on parallel "submodule update"

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Today there was lots of discussion on the correct way of reading the
strbufs as well as some discussion on the structure of the
asynchronous parallel process loop.

Patches 1-8 bring parallel fetching of submodules; they have had good exposure
to review, and the feedback has been incorporated.

Patches 9-14 bring parallel submodule updates.
Patch 14 is not ready yet (it still has test suite failures), but the preceding
cleanups in patches 9-13 can be reviewed without wasting time.

Any feedback is welcome.
Thanks,
Stefan

Diff to v3 below. The patches can also be found at [1]
[1] https://github.com/stefanbeller/git/tree/submodulec_nonthreaded_parallel_4

Jonathan Nieder (1):
  submodule: Send "Fetching submodule <foo>" to standard error

Stefan Beller (13):
  xread: poll on non blocking fds
  xread_nonblock: add functionality to read from fds without blocking
  strbuf: add strbuf_read_once to read without blocking
  run-command: factor out return value computation
  run-command: add an asynchronous parallel child processor
  fetch_populated_submodules: use new parallel job processing
  submodules: allow parallel fetching, add tests and documentation
  submodule-config: Untangle logic in parse_config
  submodule config: keep update strategy around
  git submodule update: cmd_update_recursive
  git submodule update: cmd_update_clone
  git submodule update: cmd_update_fetch
  Rewrite submodule update in C

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 builtin/submodule--helper.c     | 251 +++++++++++++++++++++++++++++
 git-compat-util.h               |   1 +
 git-submodule.sh                | 339 ++++++++++++++--------------------------
 run-command.c                   | 320 ++++++++++++++++++++++++++++++++++---
 run-command.h                   |  36 +++++
 strbuf.c                        |  11 ++
 strbuf.h                        |   9 ++
 submodule-config.c              |  85 +++++-----
 submodule-config.h              |   1 +
 submodule.c                     | 120 +++++++++-----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  70 ++++++---
 test-run-command.c              |  24 +++
 wrapper.c                       |  35 ++++-
 18 files changed, 987 insertions(+), 356 deletions(-)

diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index baa7563..b79117a 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -382,10 +382,14 @@ static int update_next_task(void *data,
 		argv_array_pushf(&cp->env_array, "sm_path=%s", sub->path);
 		argv_array_pushf(&cp->env_array, "name=%s", sub->name);
 		argv_array_pushf(&cp->env_array, "url=%s", sub->url);
+		argv_array_pushf(&cp->env_array, "sha1=%s", sha1_to_hex(ce->sha1));
 		argv_array_pushf(&cp->env_array, "update_module=%s", update_module);
 
 		cp->git_cmd = 1;
+		cp->no_stdin = 1;
 		cp->stdout_to_stderr = 1;
+		cp->err = -1;
+		argv_array_init(&cp->args);
 		argv_array_push(&cp->args, "submodule");
 		if (!file_exists(sm_gitdir))
 			argv_array_push(&cp->args, "update_clone");
diff --git a/run-command.c b/run-command.c
index 06d5a5d..494e1f8 100644
--- a/run-command.c
+++ b/run-command.c
@@ -276,8 +276,10 @@ static int wait_or_whine(pid_t pid, const char *argv0)
 		failed_errno = errno;
 		error("waitpid for %s failed: %s", argv0, strerror(errno));
 	} else {
-		if (waiting != pid
-		   || (determine_return_value(status, &code, &failed_errno, argv0) < 0))
+		if (waiting != pid || (determine_return_value(status,
+							      &code,
+							      &failed_errno,
+							      argv0) < 0))
 			error("waitpid is confused (%s)", argv0);
 	}
 
@@ -870,7 +872,6 @@ struct parallel_processes {
 
 	int max_processes;
 	int nr_processes;
-	unsigned all_tasks_started : 1;
 
 	get_next_task_fn get_next_task;
 	start_failure_fn start_failure;
@@ -899,9 +900,9 @@ void default_start_failure(void *data,
 	struct strbuf sb = STRBUF_INIT;
 
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, "%s ", cp->argv[i]);
+		strbuf_addf(&sb, " %s", cp->argv[i]);
 
-	die_errno("Starting a child failed:\n%s", sb.buf);
+	die_errno("Starting a child failed:%s", sb.buf);
 }
 
 void default_return_value(void *data,
@@ -915,12 +916,12 @@ void default_return_value(void *data,
 		return;
 
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, "%s ", cp->argv[i]);
+		strbuf_addf(&sb, " %s", cp->argv[i]);
 
-	die_errno("A child failed with return code:\n%s\n%d", sb.buf, result);
+	die_errno("A child failed with return code %d:%s", result, sb.buf);
 }
 
-static void run_processes_parallel_init(struct parallel_processes *pp,
+static void pp_init(struct parallel_processes *pp,
 					int n, void *data,
 					get_next_task_fn get_next_task,
 					start_failure_fn start_failure,
@@ -941,7 +942,6 @@ static void run_processes_parallel_init(struct parallel_processes *pp,
 	pp->return_value = return_value ? return_value : default_return_value;
 
 	pp->nr_processes = 0;
-	pp->all_tasks_started = 0;
 	pp->output_owner = 0;
 	pp->children = xcalloc(n, sizeof(*pp->children));
 	pp->pfd = xcalloc(n, sizeof(*pp->pfd));
@@ -954,9 +954,10 @@ static void run_processes_parallel_init(struct parallel_processes *pp,
 	}
 }
 
-static void run_processes_parallel_cleanup(struct parallel_processes *pp)
+static void pp_cleanup(struct parallel_processes *pp)
 {
 	int i;
+
 	for (i = 0; i < pp->max_processes; i++)
 		strbuf_release(&pp->children[i].err);
 
@@ -976,7 +977,8 @@ static void set_nonblocking(int fd)
 			"output will be degraded");
 }
 
-static void run_processes_parallel_start_one(struct parallel_processes *pp)
+/* returns 0 if a process was started, 1 if there was no further task to start */
+static int pp_start_one(struct parallel_processes *pp)
 {
 	int i;
 
@@ -988,10 +990,9 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp)
 
 	if (!pp->get_next_task(pp->data,
 			       &pp->children[i].process,
-			       &pp->children[i].err)) {
-		pp->all_tasks_started = 1;
-		return;
-	}
+			       &pp->children[i].err))
+		return 1;
+
 	if (start_command(&pp->children[i].process))
 		pp->start_failure(pp->data,
 				  &pp->children[i].process,
@@ -1002,23 +1003,17 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp)
 	pp->nr_processes++;
 	pp->children[i].in_use = 1;
 	pp->pfd[i].fd = pp->children[i].process.err;
+	return 0;
 }
 
-static void run_processes_parallel_start_as_needed(struct parallel_processes *pp)
-{
-	while (pp->nr_processes < pp->max_processes &&
-	       !pp->all_tasks_started)
-		run_processes_parallel_start_one(pp);
-}
-
-static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
+static void pp_buffer_stderr(struct parallel_processes *pp)
 {
 	int i;
 
 	while ((i = poll(pp->pfd, pp->max_processes, 100)) < 0) {
 		if (errno == EINTR)
 			continue;
-		run_processes_parallel_cleanup(pp);
+		pp_cleanup(pp);
 		die_errno("poll");
 	}
 
@@ -1033,7 +1028,7 @@ static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
 	}
 }
 
-static void run_processes_parallel_output(struct parallel_processes *pp)
+static void pp_output(struct parallel_processes *pp)
 {
 	int i = pp->output_owner;
 	if (pp->children[i].in_use &&
@@ -1043,7 +1038,7 @@ static void run_processes_parallel_output(struct parallel_processes *pp)
 	}
 }
 
-static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
+static void pp_collect_finished(struct parallel_processes *pp)
 {
 	int i = 0;
 	pid_t pid;
@@ -1063,17 +1058,11 @@ static void run_processes_parallel_collect_finished(struct parallel_processes *p
 			    pid == pp->children[i].process.pid)
 				break;
 		if (i == pp->max_processes)
-			/*
-			 * waitpid returned another process id
-			 * which we are not waiting for.
-			 */
-			return;
-
-		if (strbuf_read_once(&pp->children[i].err,
-				     pp->children[i].process.err, 0) < 0 &&
-		    errno != EAGAIN)
-			die_errno("strbuf_read_once");
+			die("BUG: found a child process we were not aware of");
 
+		if (strbuf_read(&pp->children[i].err,
+				pp->children[i].process.err, 0) < 0)
+			die_errno("strbuf_read");
 
 		if (determine_return_value(wait_status, &code, &errno,
 					   pp->children[i].process.argv[0]) < 0)
@@ -1122,18 +1111,20 @@ int run_processes_parallel(int n, void *data,
 			   return_value_fn return_value)
 {
 	struct parallel_processes pp;
-	run_processes_parallel_init(&pp, n, data,
-				    get_next_task,
-				    start_failure,
-				    return_value);
-
-	while (!pp.all_tasks_started || pp.nr_processes > 0) {
-		run_processes_parallel_start_as_needed(&pp);
-		run_processes_parallel_buffer_stderr(&pp);
-		run_processes_parallel_output(&pp);
-		run_processes_parallel_collect_finished(&pp);
+	pp_init(&pp, n, data, get_next_task, start_failure, return_value);
+
+	while (1) {
+		while (pp.nr_processes < pp.max_processes &&
+		       !pp_start_one(&pp))
+			; /* nothing */
+		if (!pp.nr_processes)
+			break;
+		pp_buffer_stderr(&pp);
+		pp_output(&pp);
+		pp_collect_finished(&pp);
 	}
-	run_processes_parallel_cleanup(&pp);
+
+	pp_cleanup(&pp);
 
 	return 0;
 }
diff --git a/run-command.h b/run-command.h
index 0c1b363..3807fd1 100644
--- a/run-command.h
+++ b/run-command.h
@@ -155,6 +155,4 @@ int run_processes_parallel(int n, void *data,
 			   start_failure_fn,
 			   return_value_fn);
 
-void run_processes_parallel_schedule_error(struct strbuf *err);
-
 #endif
diff --git a/strbuf.h b/strbuf.h
index 4d4e5b1..ea69665 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -367,8 +367,11 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
 extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
 
 /**
- * Same as strbuf_read, just returns non-blockingly by ignoring EAGAIN.
- * The fd must have set O_NONBLOCK.
+ * Read from a file descriptor that is marked as O_NONBLOCK without
+ * blocking.  Returns the number of new bytes appended to the sb.
+ * Negative return value signals there was an error returned from
+ * underlying read(2), in which case the caller should check errno.
+ * e.g. errno == EAGAIN when the read may have blocked.
  */
 extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint);
 
diff --git a/submodule-config.c b/submodule-config.c
index 0298a60..8b8c7d1 100644
--- a/submodule-config.c
+++ b/submodule-config.c
@@ -258,93 +258,72 @@ static int parse_config(const char *var, const char *value, void *data)
 	if (!name_and_item_from_var(var, &name, &item))
 		return 0;
 
-	submodule = lookup_or_create_by_name(me->cache, me->gitmodules_sha1,
-			name.buf);
+	submodule = lookup_or_create_by_name(me->cache,
+					     me->gitmodules_sha1,
+					     name.buf);
 
 	if (!strcmp(item.buf, "path")) {
-		struct strbuf path = STRBUF_INIT;
-		if (!value) {
+		if (!value)
 			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (!me->overwrite && submodule->path != NULL) {
+		else if (!me->overwrite && submodule->path != NULL)
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"path");
-			goto release_return;
+		else {
+			if (submodule->path)
+				cache_remove_path(me->cache, submodule);
+			free((void *) submodule->path);
+			submodule->path = xstrdup(value);
+			cache_put_path(me->cache, submodule);
 		}
-
-		if (submodule->path)
-			cache_remove_path(me->cache, submodule);
-		free((void *) submodule->path);
-		strbuf_addstr(&path, value);
-		submodule->path = strbuf_detach(&path, NULL);
-		cache_put_path(me->cache, submodule);
 	} else if (!strcmp(item.buf, "fetchrecursesubmodules")) {
 		/* when parsing worktree configurations we can die early */
 		int die_on_error = is_null_sha1(me->gitmodules_sha1);
 		if (!me->overwrite &&
-		    submodule->fetch_recurse != RECURSE_SUBMODULES_NONE) {
+		    submodule->fetch_recurse != RECURSE_SUBMODULES_NONE)
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"fetchrecursesubmodules");
-			goto release_return;
-		}
-
-		submodule->fetch_recurse = parse_fetch_recurse(var, value,
+		else
+			submodule->fetch_recurse = parse_fetch_recurse(
+								var, value,
 								die_on_error);
 	} else if (!strcmp(item.buf, "ignore")) {
-		struct strbuf ignore = STRBUF_INIT;
-		if (!me->overwrite && submodule->ignore != NULL) {
+		if (!value)
+			ret = config_error_nonbool(var);
+		else if (!me->overwrite && submodule->ignore != NULL)
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"ignore");
-			goto release_return;
-		}
-		if (!value) {
-			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (strcmp(value, "untracked") && strcmp(value, "dirty") &&
-		    strcmp(value, "all") && strcmp(value, "none")) {
+		else if (strcmp(value, "untracked") &&
+			 strcmp(value, "dirty") &&
+			 strcmp(value, "all") &&
+			 strcmp(value, "none"))
 			warning("Invalid parameter '%s' for config option "
 					"'submodule.%s.ignore'", value, var);
-			goto release_return;
+		else {
+			free((void *) submodule->ignore);
+			submodule->ignore = xstrdup(value);
 		}
-
-		free((void *) submodule->ignore);
-		strbuf_addstr(&ignore, value);
-		submodule->ignore = strbuf_detach(&ignore, NULL);
 	} else if (!strcmp(item.buf, "url")) {
-		struct strbuf url = STRBUF_INIT;
 		if (!value) {
 			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (!me->overwrite && submodule->url != NULL) {
+		} else if (!me->overwrite && submodule->url != NULL) {
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"url");
-			goto release_return;
+		} else {
+			free((void *) submodule->url);
+			submodule->url = xstrdup(value);
 		}
-
-		free((void *) submodule->url);
-		strbuf_addstr(&url, value);
-		submodule->url = strbuf_detach(&url, NULL);
 	} else if (!strcmp(item.buf, "update")) {
-		struct strbuf update = STRBUF_INIT;
-		if (!value) {
+		if (!value)
 			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (!me->overwrite && submodule->update != NULL) {
+		else if (!me->overwrite && submodule->update != NULL)
 			warn_multiple_config(me->commit_sha1, submodule->name,
-					"update");
-			goto release_return;
+					     "update");
+		else {
+			free((void *)submodule->update);
+			submodule->update = xstrdup(value);
 		}
-
-		free((void *) submodule->update);
-		strbuf_addstr(&update, value);
-		submodule->update = strbuf_detach(&update, NULL);
 	}
 
-release_return:
 	strbuf_release(&name);
 	strbuf_release(&item);
 
diff --git a/submodule.c b/submodule.c
index d15364f..fdaf3e4 100644
--- a/submodule.c
+++ b/submodule.c
@@ -650,10 +650,12 @@ int fetch_populated_submodules(const struct argv_array *options,
 {
 	int i;
 	struct submodule_parallel_fetch spf = SPF_INIT;
+
 	spf.work_tree = get_git_work_tree();
 	spf.command_line_option = command_line_option;
 	spf.quiet = quiet;
 	spf.prefix = prefix;
+
 	if (!spf.work_tree)
 		goto out;
 
@@ -738,12 +740,11 @@ int get_next_submodule(void *data, struct child_process *cp,
 		if (is_directory(git_dir)) {
 			child_process_init(cp);
 			cp->dir = strbuf_detach(&submodule_path, NULL);
+			cp->env = local_repo_env;
 			cp->git_cmd = 1;
-			cp->no_stdout = 1;
 			cp->no_stdin = 1;
 			cp->stdout_to_stderr = 1;
 			cp->err = -1;
-			cp->env = local_repo_env;
 			if (!spf->quiet)
 				strbuf_addf(err, "Fetching submodule %s%s\n",
 					    spf->prefix, ce->name);
diff --git a/wrapper.c b/wrapper.c
index 54ce231..41a21e1 100644
--- a/wrapper.c
+++ b/wrapper.c
@@ -206,16 +206,10 @@ ssize_t xread(int fd, void *buf, size_t len)
 				continue;
 			if (errno == EAGAIN || errno == EWOULDBLOCK) {
 				struct pollfd pfd;
-				int i;
 				pfd.events = POLLIN;
 				pfd.fd = fd;
-				i = poll(&pfd, 1, 100);
-				if (i < 0) {
-					if (errno == EINTR || errno == ENOMEM)
-						continue;
-					else
-						die_errno("poll");
-				}
+				/* We deliberately ignore the return value */
+				poll(&pfd, 1, -1);
 			}
 		}
 		return nr;
@@ -225,13 +219,13 @@ ssize_t xread(int fd, void *buf, size_t len)
 /*
  * xread_nonblock() is the same a read(), but it automatically restarts read()
  * interrupted operations (EINTR). xread_nonblock() DOES NOT GUARANTEE that
- * "len" bytes is read even if the data is available.
+ * "len" bytes is read. EWOULDBLOCK is turned into EAGAIN.
  */
 ssize_t xread_nonblock(int fd, void *buf, size_t len)
 {
 	ssize_t nr;
 	if (len > MAX_IO_SIZE)
-	    len = MAX_IO_SIZE;
+		len = MAX_IO_SIZE;
 	while (1) {
 		nr = read(fd, buf, len);
 		if (nr < 0) {

-- 
2.5.0.272.ga84127c.dirty

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Announce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]