[PATCHv2] fetch submodules in parallel

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This replaces the last patch of the "Parallel git submodule fetching"
series. Changes:

* have correct return code in submodule fetching when one submodule fails
* use poll instead of select now
* broke down into more smaller functions instead of one giant.
  (I think it is an improvement, but I wouldn't be surprised if someone objects)
* closed memory leaks
* document the need for stdout_to_stderr

I don't deem it RFC-ish any more but good to go.

Any feedback welcome!
Thanks,
Stefan


Stefan Beller (1):
  fetch: fetch submodules in parallel

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 run-command.c                   | 278 ++++++++++++++++++++++++++++++++++++----
 run-command.h                   |  36 ++++++
 strbuf.c                        |  31 +++++
 strbuf.h                        |   1 +
 submodule.c                     | 119 ++++++++++++-----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  19 +++
 test-run-command.c              |  24 ++++
 12 files changed, 490 insertions(+), 59 deletions(-)

Interdiff to RFCv1:

diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt
index d432f98..6b109f6 100644
--- a/Documentation/fetch-options.txt
+++ b/Documentation/fetch-options.txt
@@ -105,7 +105,7 @@ ifndef::git-pull[]
 	Number of parallel children to be used for fetching submodules.
 	Each will fetch from different submodules, such that fetching many
 	submodules will be faster. By default submodules will be fetched
-	one at a time
+	one at a time.
 
 --no-recurse-submodules::
 	Disable recursive fetching of submodules (this has the same effect as
diff --git a/builtin/fetch.c b/builtin/fetch.c
index a1520bb..f28eac6 100644
--- a/builtin/fetch.c
+++ b/builtin/fetch.c
@@ -101,7 +101,7 @@ static struct option builtin_fetch_options[] = {
 	OPT_SET_INT('n', NULL, &tags,
 		    N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
 	OPT_INTEGER('j', "jobs", &max_children,
-		    N_("number of threads used for fetching")),
+		    N_("number of submodules fetched in parallel")),
 	OPT_BOOL('p', "prune", &prune,
 		 N_("prune remote-tracking branches no longer on remote")),
 	{ OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
diff --git a/builtin/pull.c b/builtin/pull.c
index bc117e9..f0af196 100644
--- a/builtin/pull.c
+++ b/builtin/pull.c
@@ -179,7 +179,7 @@ static struct option pull_options[] = {
 		N_("control recursive fetching of submodules"),
 		PARSE_OPT_OPTARG),
 	OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
-		N_("number of threads used for fetching submodules"),
+		N_("number of submodules pulled in parallel"),
 		PARSE_OPT_OPTARG),
 	OPT_BOOL(0, "dry-run", &opt_dry_run,
 		N_("dry run")),
diff --git a/run-command.c b/run-command.c
index b8ff67b..6f6f9fb 100644
--- a/run-command.c
+++ b/run-command.c
@@ -232,6 +232,35 @@ static inline void set_cloexec(int fd)
 		fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
+static int determine_return_value(int wait_status,
+				  int *result,
+				  int *error_code,
+				  const char *argv0)
+{
+	if (WIFSIGNALED(wait_status)) {
+		*result = WTERMSIG(wait_status);
+		if (*result != SIGINT && *result != SIGQUIT)
+			error("%s died of signal %d", argv0, *result);
+		/*
+		 * This return value is chosen so that code & 0xff
+		 * mimics the exit code that a POSIX shell would report for
+		 * a program that died from this signal.
+		 */
+		*result += 128;
+	} else if (WIFEXITED(wait_status)) {
+		*result = WEXITSTATUS(wait_status);
+		/*
+		 * Convert special exit code when execvp failed.
+		 */
+		if (*result == 127) {
+			*result = -1;
+			*error_code = ENOENT;
+		}
+	} else
+		return 1;
+	return 0;
+}
+
 static int wait_or_whine(pid_t pid, const char *argv0)
 {
 	int status, code = -1;
@@ -244,29 +273,10 @@ static int wait_or_whine(pid_t pid, const char *argv0)
 	if (waiting < 0) {
 		failed_errno = errno;
 		error("waitpid for %s failed: %s", argv0, strerror(errno));
-	} else if (waiting != pid) {
-		error("waitpid is confused (%s)", argv0);
-	} else if (WIFSIGNALED(status)) {
-		code = WTERMSIG(status);
-		if (code != SIGINT && code != SIGQUIT)
-			error("%s died of signal %d", argv0, code);
-		/*
-		 * This return value is chosen so that code & 0xff
-		 * mimics the exit code that a POSIX shell would report for
-		 * a program that died from this signal.
-		 */
-		code += 128;
-	} else if (WIFEXITED(status)) {
-		code = WEXITSTATUS(status);
-		/*
-		 * Convert special exit code when execvp failed.
-		 */
-		if (code == 127) {
-			code = -1;
-			failed_errno = ENOENT;
-		}
 	} else {
-		error("waitpid is confused (%s)", argv0);
+		if (waiting != pid
+		   || (determine_return_value(status, &code, &failed_errno, argv0) < 0))
+			error("waitpid is confused (%s)", argv0);
 	}
 
 	clear_child_for_cleanup(pid);
@@ -853,146 +863,226 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	return finish_command(cmd);
 }
 
-int run_processes_async(int n, get_next_task fn, void *data)
+static void unblock_fd(int fd)
 {
-	int i, wait_status;
-	pid_t pid;
+	int flags = fcntl(fd, F_GETFL);
+	if (flags < 0) {
+		warning("Could not get file status flags, "
+			"output will be degraded");
+		return;
+	}
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
+			warning("Could not set file status flags, "
+			"output will be degraded");
+		return;
+	}
+}
 
-	/* no more tasks. Also set when aborting early. */
-	int all_tasks_started = 0;
-	int nr_processes = 0;
-	int child_in_foreground = 0;
-	struct timeval timeout;
-	struct child_process *children = xcalloc(n, sizeof(*children));
-	char *slots = xcalloc(n, sizeof(*slots));
-	struct strbuf *err = xcalloc(n, sizeof(*err));
-	fd_set fdset;
-	int maxfd;
-	struct strbuf finished_children = STRBUF_INIT;
-	int flags;
-	for (i = 0; i < n; i++)
-		strbuf_init(&err[i], 0);
-
-	while (!all_tasks_started || nr_processes > 0) {
-		/* Start new processes. */
-		while (!all_tasks_started && nr_processes < n) {
-			for (i = 0; i < n; i++)
-				if (!slots[i])
-					break; /* found an empty slot */
-			if (i == n)
-				die("BUG: bookkeeping is hard");
-
-			if (fn(data, &children[i], &err[i])) {
-				all_tasks_started = 1;
-				break;
-			}
-			if (start_command(&children[i]))
-				die(_("Could not start child process"));
-			flags = fcntl(children[i].err, F_GETFL);
-			fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK);
-			nr_processes++;
-			slots[i] = 1;
-		}
+struct parallel_processes {
+	int max_number_processes;
+	void *data;
+	get_next_task fn;
+	handle_child_starting_failure fn_err;
+	handle_child_return_value fn_exit;
+
+	int nr_processes;
+	int all_tasks_started;
+	int foreground_child;
+	char *slots;
+	struct child_process *children;
+	struct pollfd *pfd;
+	struct strbuf *err;
+	struct strbuf finished_children;
+};
+
+static void run_processes_parallel_init(struct parallel_processes *pp,
+					int n, void *data,
+					get_next_task fn,
+					handle_child_starting_failure fn_err,
+					handle_child_return_value fn_exit)
+{
+	int i;
+
+	pp->max_number_processes = n;
+	pp->data = data;
+	pp->fn = fn;
+	pp->fn_err = fn_err;
+	pp->fn_exit = fn_exit;
+
+	pp->nr_processes = 0;
+	pp->all_tasks_started = 0;
+	pp->foreground_child = 0;
+	pp->slots = xcalloc(n, sizeof(*pp->slots));
+	pp->children = xcalloc(n, sizeof(*pp->children));
+	pp->pfd = xcalloc(n, sizeof(*pp->pfd));
+	pp->err = xcalloc(n, sizeof(*pp->err));
+	strbuf_init(&pp->finished_children, 0);
+
+	for (i = 0; i < n; i++) {
+		strbuf_init(&pp->err[i], 0);
+		pp->pfd[i].events = POLLIN;
+		pp->pfd[i].fd = -1;
+	}
+}
+
+static void run_processes_parallel_cleanup(struct parallel_processes *pp)
+{
+	int i;
+	for (i = 0; i < pp->max_number_processes; i++)
+		strbuf_release(&pp->err[i]);
+
+	free(pp->children);
+	free(pp->slots);
+	free(pp->pfd);
+	free(pp->err);
+	strbuf_release(&pp->finished_children);
+}
 
-		/* prepare data for select call */
-		FD_ZERO(&fdset);
-		maxfd = 0;
-		for (i = 0; i < n; i++) {
-			if (!slots[i])
-				continue;
-			FD_SET(children[i].err, &fdset);
-			if (children[i].err > maxfd)
-				maxfd = children[i].err;
+static void run_processes_parallel_start_new(struct parallel_processes *pp)
+{
+	int i;
+	/* Start new processes. */
+	while (!pp->all_tasks_started
+	       && pp->nr_processes < pp->max_number_processes) {
+		for (i = 0; i < pp->max_number_processes; i++)
+			if (!pp->slots[i])
+				break; /* found an empty slot */
+		if (i == pp->max_number_processes)
+			die("BUG: bookkeeping is hard");
+
+		if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) {
+			pp->all_tasks_started = 1;
+			break;
 		}
-		timeout.tv_sec = 0;
-		timeout.tv_usec = 500000;
+		if (start_command(&pp->children[i]))
+			pp->fn_err(pp->data, &pp->children[i], &pp->err[i]);
 
-		i = select(maxfd + 1, &fdset, NULL, NULL, &timeout);
-		if (i < 0) {
-			if (errno == EINTR)
-				/* A signal was caught; try again */
-				continue;
-			else if (errno == ENOMEM)
-				die_errno("BUG: keeping track of fds is hard");
-			else if (errno == EINVAL)
-				die_errno("BUG: invalid arguments to select");
-			else if (errno == EBADF)
-				die_errno("BUG: keeping track of fds is hard");
-			else
-				die_errno("Unknown error with select");
+		unblock_fd(pp->children[i].err);
+
+		pp->nr_processes++;
+		pp->slots[i] = 1;
+		pp->pfd[i].fd = pp->children[i].err;
+	}
+}
+
+static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
+{
+	int i;
+	i = poll(pp->pfd, pp->max_number_processes, 100);
+	if (i < 0) {
+		if (errno == EINTR)
+			/* A signal was caught; try again */
+			return -1;
+		else {
+			run_processes_parallel_cleanup(pp);
+			die_errno("poll");
 		}
+	}
 
-		/* Buffer output from all pipes. */
-		for (i = 0; i < n; i++) {
-			if (!slots[i])
-				continue;
-			if (FD_ISSET(children[i].err, &fdset))
-				strbuf_read_noblock(&err[i], children[i].err, 0);
-			if (child_in_foreground == i) {
-				fputs(err[i].buf, stderr);
-				strbuf_reset(&err[i]);
-				fflush(stderr);
-			}
+	/* Buffer output from all pipes. */
+	for (i = 0; i < pp->max_number_processes; i++) {
+		if (!pp->slots[i])
+			continue;
+		if (pp->pfd[i].revents & POLLIN)
+			strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
+		if (pp->foreground_child == i) {
+			fputs(pp->err[i].buf, stderr);
+			strbuf_reset(&pp->err[i]);
 		}
+	}
+	return 0;
+}
 
-		/* Collect finished child processes. */
-		while (nr_processes > 0) {
-			pid = waitpid(-1, &wait_status, WNOHANG);
-			if (pid == 0)
-				/* no child finished */
-				break;
-
-			if (pid < 0) {
-				if (errno == EINTR)
-					break; /* just try again  next time */
-				if (errno == EINVAL || errno == ECHILD)
-					die_errno("wait");
-			} else {
-				/* Find the finished child. */
-				for (i = 0; i < n; i++)
-					if (slots[i] && pid == children[i].pid)
-						break;
-				if (i == n)
-					/* waitpid returned another process id which
-					 * we are not waiting on, so ignore it*/
+
+static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
+{
+	int i = 0;
+	pid_t pid;
+	int wait_status, code;
+	int n = pp->max_number_processes;
+	/* Collect finished child processes. */
+	while (pp->nr_processes > 0) {
+		pid = waitpid(-1, &wait_status, WNOHANG);
+		if (pid == 0)
+			return; /* no child finished */
+
+		if (pid < 0) {
+			if (errno == EINTR)
+				return; /* just try again  next time */
+			if (errno == EINVAL || errno == ECHILD)
+				die_errno("wait");
+		} else {
+			/* Find the finished child. */
+			for (i = 0; i < pp->max_number_processes; i++)
+				if (pp->slots[i] && pid == pp->children[i].pid)
 					break;
-			}
+			if (i == pp->max_number_processes)
+				/*
+				 * waitpid returned another process id
+				 * which we are not waiting for.
+				 */
+				return;
+		}
+		strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
 
-			strbuf_read_noblock(&err[i], children[i].err, 0);
-			argv_array_clear(&children[i].args);
-			argv_array_clear(&children[i].env_array);
+		if (determine_return_value(wait_status, &code, &errno,
+					   pp->children[i].argv[0]) < 0)
+			error("waitpid is confused (%s)",
+			      pp->children[i].argv[0]);
 
-			slots[i] = 0;
-			nr_processes--;
+		pp->fn_exit(pp->data, &pp->children[i], code);
 
-			if (i != child_in_foreground) {
-				strbuf_addbuf(&finished_children, &err[i]);
-				strbuf_reset(&err[i]);
-			} else {
-				fputs(err[i].buf, stderr);
-				strbuf_reset(&err[i]);
+		argv_array_clear(&pp->children[i].args);
+		argv_array_clear(&pp->children[i].env_array);
 
-				/* Output all other finished child processes */
-				fputs(finished_children.buf, stderr);
-				strbuf_reset(&finished_children);
+		pp->nr_processes--;
+		pp->slots[i] = 0;
+		pp->pfd[i].fd = -1;
 
-				/*
-				 * Pick next process to output live.
-				 * There can be no active process if n==1
-				 * NEEDSWORK:
-				 * For now we pick it randomly by doing a round
-				 * robin. Later we may want to pick the one with
-				 * the most output or the longest or shortest
-				 * running process time.
-				 */
-				for (i = 0; i < n; i++)
-					if (slots[(child_in_foreground + i) % n])
-						break;
-				child_in_foreground = (child_in_foreground + i) % n;
-				fputs(err[child_in_foreground].buf, stderr);
-				strbuf_reset(&err[child_in_foreground]);
-			}
+		if (i != pp->foreground_child) {
+			strbuf_addbuf(&pp->finished_children, &pp->err[i]);
+			strbuf_reset(&pp->err[i]);
+		} else {
+			fputs(pp->err[i].buf, stderr);
+			strbuf_reset(&pp->err[i]);
+
+			/* Output all other finished child processes */
+			fputs(pp->finished_children.buf, stderr);
+			strbuf_reset(&pp->finished_children);
+
+			/*
+			 * Pick next process to output live.
+			 * NEEDSWORK:
+			 * For now we pick it randomly by doing a round
+			 * robin. Later we may want to pick the one with
+			 * the most output or the longest or shortest
+			 * running process time.
+			 */
+			for (i = 0; i < n; i++)
+				if (pp->slots[(pp->foreground_child + i) % n])
+					break;
+			pp->foreground_child = (pp->foreground_child + i) % n;
+			fputs(pp->err[pp->foreground_child].buf, stderr);
+			strbuf_reset(&pp->err[pp->foreground_child]);
 		}
 	}
+}
+
+int run_processes_parallel(int n, void *data,
+			   get_next_task fn,
+			   handle_child_starting_failure fn_err,
+			   handle_child_return_value fn_exit)
+{
+	struct parallel_processes pp;
+	run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit);
+
+	while (!pp.all_tasks_started || pp.nr_processes > 0) {
+		run_processes_parallel_start_new(&pp);
+		if (run_processes_parallel_buffer_stderr(&pp))
+			continue;
+		run_processes_parallel_collect_finished(&pp);
+	}
+	run_processes_parallel_cleanup(&pp);
+
 	return 0;
 }
diff --git a/run-command.h b/run-command.h
index 8f53ad6..0487f71 100644
--- a/run-command.h
+++ b/run-command.h
@@ -120,32 +120,39 @@ int start_async(struct async *async);
 int finish_async(struct async *async);
 
 /**
- * Return 0 if the next child is ready to run.
- * This callback takes care to initialize the child process and preload the
- * out and error channel. The preloading of these outpout channels is useful
- * if you want to have a message printed directly before the output of the
- * child process.
+ * This callback should initialize the child process and preload the
+ * error channel. The preloading of is useful if you want to have a message
+ * printed directly before the output of the child process.
+ * You MUST set stdout_to_stderr.
  *
+ * Return 0 if the next child is ready to run.
  * Return != 0 if there are no more tasks to be processed.
  */
 typedef int (*get_next_task)(void *data,
 			     struct child_process *cp,
 			     struct strbuf *err);
 
+typedef void (*handle_child_starting_failure)(void *data,
+					      struct child_process *cp,
+					      struct strbuf *err);
+
+typedef void (*handle_child_return_value)(void *data,
+					  struct child_process *cp,
+					  int result);
+
 /**
- * Runs up to n processes at the same time. Whenever a process can
- * be started, the callback `get_next_task` is called to obtain the
- * data fed to the child process.
+ * Runs up to n processes at the same time. Whenever a process can be
+ * started, the callback `get_next_task` is called to obtain the data
+ * fed to the child process.
  *
  * The children started via this function run in parallel and their output
- * to both stdout and stderr is buffered, while one of the children will
- * directly output to stdout/stderr.
- *
- * This leads to a problem with output from processes which put out to
- * stdout/err alternatingly as the buffering will not be able to replay
- * the
+ * to stderr is buffered, while one of the children will directly output
+ * to stderr.
  */
 
-int run_processes_async(int n, get_next_task fn, void *data);
+int run_processes_parallel(int n, void *data,
+			   get_next_task fn,
+			   handle_child_starting_failure,
+			   handle_child_return_value);
 
 #endif
diff --git a/submodule.c b/submodule.c
index 6d757c6..a0e06e8 100644
--- a/submodule.c
+++ b/submodule.c
@@ -623,17 +623,32 @@ struct submodule_parallel_fetch {
 	const char *prefix;
 	int command_line_option;
 	int quiet;
+	int result;
 };
-#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL}
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
 
 int get_next_submodule(void *data, struct child_process *cp,
 		       struct strbuf *err);
 
+void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err)
+{
+	struct submodule_parallel_fetch *spf = data;
+	spf->result = 1;
+}
+
+void handle_submodule_fetch_finish( void *data, struct child_process *cp, int retvalue)
+{
+	struct submodule_parallel_fetch *spf = data;
+
+	if (retvalue)
+		spf->result = 1;
+}
+
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
 			       int quiet, int max_parallel_jobs)
 {
-	int i, result = 0;
+	int i;
 	struct submodule_parallel_fetch spf = SPF_INIT;
 	spf.work_tree = get_git_work_tree();
 	spf.command_line_option = command_line_option;
@@ -652,12 +667,15 @@ int fetch_populated_submodules(const struct argv_array *options,
 	/* default value, "--submodule-prefix" and its value are added later */
 
 	calculate_changed_submodule_paths();
-	run_processes_async(max_parallel_jobs, get_next_submodule, &spf);
+	run_processes_parallel(max_parallel_jobs, &spf,
+			       get_next_submodule,
+			       handle_submodule_fetch_start_err,
+			       handle_submodule_fetch_finish);
 
 	argv_array_clear(&spf.args);
 out:
 	string_list_clear(&changed_submodule_paths, 1);
-	return result;
+	return spf.result;
 }
 
 int get_next_submodule(void *data, struct child_process *cp,
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 0970fb0..37c89b9 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -48,18 +48,22 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
 '
 
 cat >expect <<-EOF
-Now running instance 0
-Hello World
-Now running instance 1
-Hello World
-Now running instance 2
-Hello World
-Now running instance 3
-Hello World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
 EOF
 
 test_expect_success 'run_command runs in parallel' '
-	test-run-command run-command-async sh -c "echo Hello World >&2;" 2>actual &&
+	test-run-command run-command-async sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
 	test_cmp expect actual
 '
 
diff --git a/test-run-command.c b/test-run-command.c
index 4817f6e..71fd3ca 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -16,9 +16,9 @@
 #include <errno.h>
 
 static int number_callbacks;
-int run_processes_async_next(void *data,
-			     struct child_process *cp,
-			     struct strbuf *err)
+int parallel_next(void *data,
+		  struct child_process *cp,
+		  struct strbuf *err)
 {
 	struct child_process *d = data;
 	if (number_callbacks >= 4)
@@ -28,7 +28,7 @@ int run_processes_async_next(void *data,
 	cp->stdout_to_stderr = 1;
 	cp->no_stdin = 1;
 	cp->err = -1;
-	strbuf_addf(err, "Now running instance %d\n", number_callbacks);
+	strbuf_addf(err, "preloaded output of a child\n");
 	number_callbacks++;
 	return 0;
 }
@@ -51,7 +51,8 @@ int main(int argc, char **argv)
 		exit(run_command(&proc));
 
 	if (!strcmp(argv[1], "run-command-async"))
-		exit(run_processes_async(4, run_processes_async_next, &proc));
+		exit(run_processes_parallel(4, &proc, parallel_next,
+					 NULL, NULL));
 
 	fprintf(stderr, "check usage\n");
 	return 1;

-- 
2.6.0.rc0.131.gf624c3d

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]