[PATCH 0/8] fetch submodules in parallel

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Changes since v4 (diff below):
* Some functions wanted to be static (Thanks Ramsay!)
* The patch to factor out return code handling has been dropped as
  the return code handling is slightly different in finish_command and
  the parallel case.
* We can handle signals a bit more gracefully now.
* More documentation in run-command.h 
* I thought it was a good idea to introduce `sigchain_pop_common`.

Jonathan Nieder (1):
  submodule.c: write "Fetching submodule <foo>" to stderr

Stefan Beller (7):
  xread: poll on non blocking fds
  xread_nonblock: add functionality to read from fds without blocking
  strbuf: add strbuf_read_once to read without blocking
  sigchain: add command to pop all common signals
  run-command: add an asynchronous parallel child processor
  fetch_populated_submodules: use new parallel job processing
  submodules: allow parallel fetching, add tests and documentation

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 git-compat-util.h               |   1 +
 run-command.c                   | 348 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  63 ++++++++
 sigchain.c                      |   9 ++
 sigchain.h                      |   1 +
 strbuf.c                        |  11 ++
 strbuf.h                        |   9 ++
 submodule.c                     | 127 +++++++++++----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  70 +++++---
 test-run-command.c              |  24 +++
 wrapper.c                       |  35 +++-
 16 files changed, 675 insertions(+), 64 deletions(-)

diff --git a/run-command.c b/run-command.c
index 494e1f8..df84985 100644
--- a/run-command.c
+++ b/run-command.c
@@ -234,35 +234,6 @@ static inline void set_cloexec(int fd)
 		fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
-static int determine_return_value(int wait_status,
-				  int *result,
-				  int *error_code,
-				  const char *argv0)
-{
-	if (WIFSIGNALED(wait_status)) {
-		*result = WTERMSIG(wait_status);
-		if (*result != SIGINT && *result != SIGQUIT)
-			error("%s died of signal %d", argv0, *result);
-		/*
-		 * This return value is chosen so that code & 0xff
-		 * mimics the exit code that a POSIX shell would report for
-		 * a program that died from this signal.
-		 */
-		*result += 128;
-	} else if (WIFEXITED(wait_status)) {
-		*result = WEXITSTATUS(wait_status);
-		/*
-		 * Convert special exit code when execvp failed.
-		 */
-		if (*result == 127) {
-			*result = -1;
-			*error_code = ENOENT;
-		}
-	} else
-		return -1;
-	return 0;
-}
-
 static int wait_or_whine(pid_t pid, const char *argv0)
 {
 	int status, code = -1;
@@ -275,12 +246,29 @@ static int wait_or_whine(pid_t pid, const char *argv0)
 	if (waiting < 0) {
 		failed_errno = errno;
 		error("waitpid for %s failed: %s", argv0, strerror(errno));
+	} else if (waiting != pid) {
+		error("waitpid is confused (%s)", argv0);
+	} else if (WIFSIGNALED(status)) {
+		code = WTERMSIG(status);
+		if (code != SIGINT && code != SIGQUIT)
+			error("%s died of signal %d", argv0, code);
+		/*
+		 * This return value is chosen so that code & 0xff
+		 * mimics the exit code that a POSIX shell would report for
+		 * a program that died from this signal.
+		 */
+		code += 128;
+	} else if (WIFEXITED(status)) {
+		code = WEXITSTATUS(status);
+		/*
+		 * Convert special exit code when execvp failed.
+		 */
+		if (code == 127) {
+			code = -1;
+			failed_errno = ENOENT;
+		}
 	} else {
-		if (waiting != pid || (determine_return_value(status,
-							      &code,
-							      &failed_errno,
-							      argv0) < 0))
-			error("waitpid is confused (%s)", argv0);
+		error("waitpid is confused (%s)", argv0);
 	}
 
 	clear_child_for_cleanup(pid);
@@ -888,46 +876,67 @@ struct parallel_processes {
 	 */
 	struct pollfd *pfd;
 
+	unsigned shutdown : 1;
+
 	int output_owner;
 	struct strbuf buffered_output; /* of finished children */
-};
+} parallel_processes_struct;
 
-void default_start_failure(void *data,
-			   struct child_process *cp,
-			   struct strbuf *err)
+static int default_start_failure(void *data,
+				 struct child_process *cp,
+				 struct strbuf *err)
 {
 	int i;
-	struct strbuf sb = STRBUF_INIT;
 
+	strbuf_addstr(err, "Starting a child failed:");
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, " %s", cp->argv[i]);
+		strbuf_addf(err, " %s", cp->argv[i]);
 
-	die_errno("Starting a child failed:%s", sb.buf);
+	return 0;
 }
 
-void default_return_value(void *data,
-			  struct child_process *cp,
-			  int result)
+static int default_return_value(void *data,
+				struct child_process *cp,
+				struct strbuf *err,
+				int result)
 {
 	int i;
-	struct strbuf sb = STRBUF_INIT;
 
 	if (!result)
-		return;
+		return 0;
 
+	strbuf_addf(err, "A child failed with return code %d:", result);
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, " %s", cp->argv[i]);
+		strbuf_addf(err, " %s", cp->argv[i]);
 
-	die_errno("A child failed with return code %d:%s", result, sb.buf);
+	return 0;
 }
 
-static void pp_init(struct parallel_processes *pp,
-					int n, void *data,
-					get_next_task_fn get_next_task,
-					start_failure_fn start_failure,
-					return_value_fn return_value)
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+	int i, n = pp->max_processes;
+
+	for (i = 0; i < n; i++)
+		if (pp->children[i].in_use)
+			kill(pp->children[i].process.pid, signo);
+}
+
+static void handle_children_on_signal(int signo)
+{
+	struct parallel_processes *pp = &parallel_processes_struct;
+
+	kill_children(pp, signo);
+	sigchain_pop(signo);
+	raise(signo);
+}
+
+static struct parallel_processes *pp_init(int n, void *data,
+					  get_next_task_fn get_next_task,
+					  start_failure_fn start_failure,
+					  return_value_fn return_value)
 {
 	int i;
+	struct parallel_processes *pp = &parallel_processes_struct;
 
 	if (n < 1)
 		n = online_cpus();
@@ -952,6 +961,8 @@ static void pp_init(struct parallel_processes *pp,
 		pp->pfd[i].events = POLLIN;
 		pp->pfd[i].fd = -1;
 	}
+	sigchain_push_common(handle_children_on_signal);
+	return pp;
 }
 
 static void pp_cleanup(struct parallel_processes *pp)
@@ -964,6 +975,8 @@ static void pp_cleanup(struct parallel_processes *pp)
 	free(pp->children);
 	free(pp->pfd);
 	strbuf_release(&pp->buffered_output);
+
+	sigchain_pop_common();
 }
 
 static void set_nonblocking(int fd)
@@ -977,7 +990,12 @@ static void set_nonblocking(int fd)
 			"output will be degraded");
 }
 
-/* returns 1 if a process was started, 0 otherwise */
+/* returns
+ *  0 if a new task was started.
+ *  1 if no new job was started (get_next_task ran out of work, or a
+ *    non-critical problem occurred while starting a new command)
+ * -1 if no new job was started and the user wishes to shut down early.
+ */
 static int pp_start_one(struct parallel_processes *pp)
 {
 	int i;
@@ -993,10 +1011,14 @@ static int pp_start_one(struct parallel_processes *pp)
 			       &pp->children[i].err))
 		return 1;
 
-	if (start_command(&pp->children[i].process))
-		pp->start_failure(pp->data,
-				  &pp->children[i].process,
-				  &pp->children[i].err);
+	if (start_command(&pp->children[i].process)) {
+		int code = pp->start_failure(pp->data,
+					     &pp->children[i].process,
+					     &pp->children[i].err);
+		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+		strbuf_reset(&pp->children[i].err);
+		return code ? -1 : 1;
+	}
 
 	set_nonblocking(pp->children[i].process.err);
 
@@ -1006,11 +1028,11 @@ static int pp_start_one(struct parallel_processes *pp)
 	return 0;
 }
 
-static void pp_buffer_stderr(struct parallel_processes *pp)
+static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 {
 	int i;
 
-	while ((i = poll(pp->pfd, pp->max_processes, 100)) < 0) {
+	while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
 		if (errno == EINTR)
 			continue;
 		pp_cleanup(pp);
@@ -1038,17 +1060,18 @@ static void pp_output(struct parallel_processes *pp)
 	}
 }
 
-static void pp_collect_finished(struct parallel_processes *pp)
+static int pp_collect_finished(struct parallel_processes *pp)
 {
 	int i = 0;
 	pid_t pid;
 	int wait_status, code;
 	int n = pp->max_processes;
+	int result = 0;
 
 	while (pp->nr_processes > 0) {
 		pid = waitpid(-1, &wait_status, WNOHANG);
 		if (pid == 0)
-			return;
+			return 0;
 
 		if (pid < 0)
 			die_errno("wait");
@@ -1064,12 +1087,38 @@ static void pp_collect_finished(struct parallel_processes *pp)
 				pp->children[i].process.err, 0) < 0)
 			die_errno("strbuf_read");
 
-		if (determine_return_value(wait_status, &code, &errno,
-					   pp->children[i].process.argv[0]) < 0)
-			error("waitpid is confused (%s)",
-			      pp->children[i].process.argv[0]);
+		if (WIFSIGNALED(wait_status)) {
+			code = WTERMSIG(wait_status);
+			if (!pp->shutdown &&
+			    code != SIGINT && code != SIGQUIT)
+				strbuf_addf(&pp->children[i].err,
+					    "%s died of signal %d",
+					    pp->children[i].process.argv[0],
+					    code);
+			/*
+			 * This return value is chosen so that code & 0xff
+			 * mimics the exit code that a POSIX shell would report for
+			 * a program that died from this signal.
+			 */
+			code += 128;
+		} else if (WIFEXITED(wait_status)) {
+			code = WEXITSTATUS(wait_status);
+			/*
+			 * Convert special exit code when execvp failed.
+			 */
+			if (code == 127) {
+				code = -1;
+				errno = ENOENT;
+			}
+		} else
+			strbuf_addf(&pp->children[i].err,
+				    "waitpid is confused (%s)",
+				    pp->children[i].process.argv[0]);
+
 
-		pp->return_value(pp->data, &pp->children[i].process, code);
+		if (pp->return_value(pp->data, &pp->children[i].process,
+				     &pp->children[i].err, code))
+			result = 1;
 
 		argv_array_clear(&pp->children[i].process.args);
 		argv_array_clear(&pp->children[i].process.env_array);
@@ -1103,6 +1152,7 @@ static void pp_collect_finished(struct parallel_processes *pp)
 			pp->output_owner = (pp->output_owner + i) % n;
 		}
 	}
+	return result;
 }
 
 int run_processes_parallel(int n, void *data,
@@ -1110,21 +1160,43 @@ int run_processes_parallel(int n, void *data,
 			   start_failure_fn start_failure,
 			   return_value_fn return_value)
 {
-	struct parallel_processes pp;
-	pp_init(&pp, n, data, get_next_task, start_failure, return_value);
+	int no_more_task = 0;
+	struct parallel_processes *pp;
 
+	pp = pp_init(n, data, get_next_task, start_failure, return_value);
 	while (1) {
-		while (pp.nr_processes < pp.max_processes &&
-		       !pp_start_one(&pp))
-			; /* nothing */
-		if (!pp.nr_processes)
+		int i;
+		int output_timeout = 100;
+		int spawn_cap = 4;
+
+		if (!no_more_task) {
+			for (i = 0; i < spawn_cap; i++) {
+				int code;
+				if (pp->nr_processes == pp->max_processes)
+					break;
+
+				code = pp_start_one(pp);
+				if (!code)
+					continue;
+				if (code < 0) {
+					pp->shutdown = 1;
+					kill_children(pp, SIGTERM);
+				}
+				no_more_task = 1;
+				break;
+			}
+		}
+		if (no_more_task && !pp->nr_processes)
 			break;
-		pp_buffer_stderr(&pp);
-		pp_output(&pp);
-		pp_collect_finished(&pp);
+		pp_buffer_stderr(pp, output_timeout);
+		pp_output(pp);
+		if (pp_collect_finished(pp)) {
+			kill_children(pp, SIGTERM);
+			pp->shutdown = 1;
+			no_more_task = 1;
+		}
 	}
 
-	pp_cleanup(&pp);
-
+	pp_cleanup(pp);
 	return 0;
 }
diff --git a/run-command.h b/run-command.h
index 3807fd1..1179cb0 100644
--- a/run-command.h
+++ b/run-command.h
@@ -132,13 +132,36 @@ typedef int (*get_next_task_fn)(void *data,
 				struct child_process *cp,
 				struct strbuf *err);
 
-typedef void (*start_failure_fn)(void *data,
-				 struct child_process *cp,
-				 struct strbuf *err);
-
-typedef void (*return_value_fn)(void *data,
+/**
+ * This callback is called whenever there are problems starting
+ * a new process.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non zero.
+ */
+typedef int (*start_failure_fn)(void *data,
 				struct child_process *cp,
-				int result);
+				struct strbuf *err);
+
+/**
+ * This callback is called for every child process that has finished,
+ * with that child's return code passed in result.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non zero.
+ */
+typedef int (*return_value_fn)(void *data,
+			       struct child_process *cp,
+			       struct strbuf *err,
+			       int result);
 
 /**
  * Runs up to n processes at the same time. Whenever a process can be
@@ -148,6 +171,10 @@ typedef void (*return_value_fn)(void *data,
  * The children started via this function run in parallel and their output
  * to stderr is buffered, while one of the children will directly output
  * to stderr.
+ *
+ * If start_failure_fn and return_value_fn are NULL, default handlers
+ * will be used. The default handlers will print an error message on
+ * error without issuing an emergency stop.
  */
 
 int run_processes_parallel(int n, void *data,
diff --git a/sigchain.c b/sigchain.c
index faa375d..9262307 100644
--- a/sigchain.c
+++ b/sigchain.c
@@ -50,3 +50,12 @@ void sigchain_push_common(sigchain_fun f)
 	sigchain_push(SIGQUIT, f);
 	sigchain_push(SIGPIPE, f);
 }
+
+void sigchain_pop_common(void)
+{
+	sigchain_pop(SIGINT);
+	sigchain_pop(SIGHUP);
+	sigchain_pop(SIGTERM);
+	sigchain_pop(SIGQUIT);
+	sigchain_pop(SIGPIPE);
+}
diff --git a/sigchain.h b/sigchain.h
index 618083b..138b20f 100644
--- a/sigchain.h
+++ b/sigchain.h
@@ -7,5 +7,6 @@ int sigchain_push(int sig, sigchain_fun f);
 int sigchain_pop(int sig);
 
 void sigchain_push_common(sigchain_fun f);
+void sigchain_pop_common(void);
 
 #endif /* SIGCHAIN_H */
diff --git a/submodule.c b/submodule.c
index fdaf3e4..7ab89f4 100644
--- a/submodule.c
+++ b/submodule.c
@@ -630,18 +630,25 @@ struct submodule_parallel_fetch {
 int get_next_submodule(void *data, struct child_process *cp,
 		       struct strbuf *err);
 
-void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err)
+static int fetch_start_failure(void *data, struct child_process *cp,
+			       struct strbuf *err)
 {
 	struct submodule_parallel_fetch *spf = data;
+
 	spf->result = 1;
+
+	return 0;
 }
 
-void handle_submodule_fetch_finish( void *data, struct child_process *cp, int retvalue)
+static int fetch_finish(void *data, struct child_process *cp,
+			struct strbuf *err, int retvalue)
 {
 	struct submodule_parallel_fetch *spf = data;
 
 	if (retvalue)
 		spf->result = 1;
+
+	return 0;
 }
 
 int fetch_populated_submodules(const struct argv_array *options,
@@ -671,8 +678,8 @@ int fetch_populated_submodules(const struct argv_array *options,
 	calculate_changed_submodule_paths();
 	run_processes_parallel(max_parallel_jobs, &spf,
 			       get_next_submodule,
-			       handle_submodule_fetch_start_err,
-			       handle_submodule_fetch_finish);
+			       fetch_start_failure,
+			       fetch_finish);
 
 	argv_array_clear(&spf.args);
 out:
diff --git a/test-run-command.c b/test-run-command.c
index 94c6eee..2555791 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -16,9 +16,9 @@
 #include <errno.h>
 
 static int number_callbacks;
-int parallel_next(void *data,
-		  struct child_process *cp,
-		  struct strbuf *err)
+static int parallel_next(void *data,
+			 struct child_process *cp,
+			 struct strbuf *err)
 {
 	struct child_process *d = data;
 	if (number_callbacks >= 4)


-- 
2.5.0.273.g6fa2560.dirty

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]