[PATCH v1 3/3] Add initial parallel version of unpack_trees()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

unpack_trees() currently performs an n-way merge by walking the trees
serially.  Teach it an optional parallel mode: when the new
core.parallelunpacktrees config setting is enabled, each subtree
discovered during the traversal is pushed onto an unbounded
multi-producer/multi-consumer queue and processed by a pool of worker
threads (one per online CPU).  Object database access and the unpack
helpers that are not yet thread safe are serialized with mutexes.

Signed-off-by: Ben Peart <benpeart@xxxxxxxxxxxxx>
---
 cache.h        |   1 +
 config.c       |   5 +
 environment.c  |   1 +
 unpack-trees.c | 313 ++++++++++++++++++++++++++++++++++++++++++++++++-
 unpack-trees.h |  30 +++++
 5 files changed, 348 insertions(+), 2 deletions(-)

diff --git a/cache.h b/cache.h
index d49092d94d..4bfa35c497 100644
--- a/cache.h
+++ b/cache.h
@@ -815,6 +815,7 @@ extern int fsync_object_files;
 extern int core_preload_index;
 extern int core_commit_graph;
 extern int core_apply_sparse_checkout;
+extern int core_parallel_unpack_trees;
 extern int precomposed_unicode;
 extern int protect_hfs;
 extern int protect_ntfs;
diff --git a/config.c b/config.c
index f4a208a166..34d5506588 100644
--- a/config.c
+++ b/config.c
@@ -1346,6 +1346,11 @@ static int git_default_core_config(const char *var, const char *value)
 					 var, value);
 	}
 
+	if (!strcmp(var, "core.parallelunpacktrees")) {
+		core_parallel_unpack_trees = git_config_bool(var, value);
+		return 0;
+	}
+
 	/* Add other config variables here and to Documentation/config.txt. */
 	return 0;
 }
diff --git a/environment.c b/environment.c
index 2a6de2330b..1eb0a05074 100644
--- a/environment.c
+++ b/environment.c
@@ -68,6 +68,7 @@ char *notes_ref_name;
 int grafts_replace_parents = 1;
 int core_commit_graph;
 int core_apply_sparse_checkout;
+int core_parallel_unpack_trees;
 int merge_log_config = -1;
 int precomposed_unicode = -1; /* see probe_utf8_pathname_composition() */
 unsigned long pack_size_limit_cfg;
diff --git a/unpack-trees.c b/unpack-trees.c
index 1f58efc6bb..2333626efd 100644
--- a/unpack-trees.c
+++ b/unpack-trees.c
@@ -17,6 +17,7 @@
 #include "submodule-config.h"
 #include "fsmonitor.h"
 #include "fetch-object.h"
+#include "thread-utils.h"
 
 /*
  * Error messages expected by scripts out of plumbing commands such as
@@ -641,6 +642,98 @@ static inline int are_same_oid(struct name_entry *name_j, struct name_entry *nam
 	return name_j->oid && name_k->oid && !oidcmp(name_j->oid, name_k->oid);
 }
 
+#ifndef NO_PTHREADS
+
+struct traverse_info_parallel {
+	struct mpmcq_entry entry;
+	struct tree_desc t[MAX_UNPACK_TREES];
+	void *buf[MAX_UNPACK_TREES];
+	struct traverse_info info;
+	int n;
+	int nr_buf;
+	int ret;
+};
+
+static int traverse_trees_parallel(int n, unsigned long dirmask,
+				   unsigned long df_conflicts,
+				   struct name_entry *names,
+				   struct traverse_info *info)
+{
+	int i;
+	struct name_entry *p;
+	struct unpack_trees_options *o = info->data;
+	struct traverse_info_parallel *newinfo;
+
+	p = names;
+	while (!p->mode)
+		p++;
+
+	newinfo = xmalloc(sizeof(struct traverse_info_parallel));
+	mpmcq_entry_init(&newinfo->entry);
+	newinfo->info = *info;
+	newinfo->info.prev = info;
+	newinfo->info.pathspec = info->pathspec;
+	newinfo->info.name = *p;
+	newinfo->info.pathlen += tree_entry_len(p) + 1;
+	newinfo->info.df_conflicts |= df_conflicts;
+	newinfo->nr_buf = 0;
+	newinfo->n = n;
+
+	/*
+	 * Fetch the tree from the ODB for each peer directory in the
+	 * n commits.
+	 *
+	 * For 2- and 3-way traversals, we try to avoid hitting the
+	 * ODB twice for the same OID.  This should yield a nice speed
+	 * up in checkouts and merges when the commits are similar.
+	 *
+	 * We don't bother doing the full O(n^2) search for larger n,
+	 * because wider traversals don't happen that often and we
+	 * avoid the search setup.
+	 *
+	 * When 2 peer OIDs are the same, we just copy the tree
+	 * descriptor data.  This implicitly borrows the buffer
+	 * data from the earlier cell.
+	 */
+	for (i = 0; i < n; i++, dirmask >>= 1) {
+		if (i > 0 && are_same_oid(&names[i], &names[i - 1]))
+			newinfo->t[i] = newinfo->t[i - 1];
+		else if (i > 1 && are_same_oid(&names[i], &names[i - 2]))
+			newinfo->t[i] = newinfo->t[i - 2];
+		else {
+			const struct object_id *oid = NULL;
+			if (dirmask & 1)
+				oid = names[i].oid;
+
+			/*
+			 * fill_tree_descriptor() will load the tree from the
+			 * ODB. Accessing the ODB is not thread safe so
+			 * serialize access using the odb_mutex.
+			 */
+			pthread_mutex_lock(&o->odb_mutex);
+			newinfo->buf[newinfo->nr_buf++] =
+				fill_tree_descriptor(newinfo->t + i, oid);
+			pthread_mutex_unlock(&o->odb_mutex);
+		}
+	}
+
+	/*
+	 * We can't play games with the cache bottom as we are processing
+	 * the tree objects in parallel.
+	 * newinfo->bottom = switch_cache_bottom(&newinfo->info);
+	 */
+
+	/* All I really need here is fetch_and_add() */
+	pthread_mutex_lock(&o->work_mutex);
+	o->remaining_work++;
+	pthread_mutex_unlock(&o->work_mutex);
+	mpmcq_push(&o->queue, &newinfo->entry);
+
+	return 0;
+}
+
+#endif
+
 static int traverse_trees_recursive(int n, unsigned long dirmask,
 				    unsigned long df_conflicts,
 				    struct name_entry *names,
@@ -995,6 +1088,108 @@ static void debug_unpack_callback(int n,
 		debug_name_entry(i, names + i);
 }
 
+#ifndef NO_PTHREADS
+static int unpack_callback_parallel(int n, unsigned long mask,
+				    unsigned long dirmask,
+				    struct name_entry *names,
+				    struct traverse_info *info)
+{
+	struct cache_entry *src[MAX_UNPACK_TREES + 1] = { NULL, };
+	struct unpack_trees_options *o = info->data;
+	const struct name_entry *p = names;
+	int ret;
+
+	/* Find first entry with a real name (we could use "mask" too) */
+	while (!p->mode)
+		p++;
+
+	if (o->debug_unpack)
+		debug_unpack_callback(n, mask, dirmask, names, info);
+
+	/* Are we supposed to look at the index too? */
+	if (o->merge) {
+		while (1) {
+			int cmp;
+			struct cache_entry *ce;
+
+			if (o->diff_index_cached)
+				ce = next_cache_entry(o);
+			else
+				ce = find_cache_entry(info, p);
+
+			if (!ce)
+				break;
+			cmp = compare_entry(ce, info, p);
+			if (cmp < 0) {
+				pthread_mutex_lock(&o->unpack_index_entry_mutex);
+				ret = unpack_index_entry(ce, o);
+				pthread_mutex_unlock(&o->unpack_index_entry_mutex);
+				if (ret < 0)
+					return unpack_failed(o, NULL);
+				continue;
+			}
+			if (!cmp) {
+				if (ce_stage(ce)) {
+					/*
+					 * If we skip unmerged index
+					 * entries, we'll skip this
+					 * entry *and* the tree
+					 * entries associated with it!
+					 */
+					if (o->skip_unmerged) {
+						add_same_unmerged(ce, o);
+						return mask;
+					}
+				}
+				src[0] = ce;
+			}
+			break;
+		}
+	}
+
+	pthread_mutex_lock(&o->unpack_nondirectories_mutex);
+	ret = unpack_nondirectories(n, mask, dirmask, src, names, info);
+	pthread_mutex_unlock(&o->unpack_nondirectories_mutex);
+	if (ret < 0)
+		return -1;
+
+	if (o->merge && src[0]) {
+		if (ce_stage(src[0]))
+			mark_ce_used_same_name(src[0], o);
+		else
+			mark_ce_used(src[0], o);
+	}
+
+	/* Now handle any directories.. */
+	if (dirmask) {
+		/* special case: "diff-index --cached" looking at a tree */
+		if (o->diff_index_cached && n == 1 && dirmask == 1 &&
+		    S_ISDIR(names->mode)) {
+			int matches;
+			matches = cache_tree_matches_traversal(
+				o->src_index->cache_tree, names, info);
+			/*
+			 * Everything under the name matches; skip the
+			 * entire hierarchy.  diff_index_cached codepath
+			 * special cases D/F conflicts in such a way that
+			 * it does not do any look-ahead, so this is safe.
+			 */
+			if (matches) {
+				o->cache_bottom += matches;
+				return mask;
+			}
+		}
+
+		if (traverse_trees_parallel(n, dirmask, mask & ~dirmask, names, info) < 0)
+			return -1;
+		return mask;
+	}
+
+	return mask;
+}
+#endif
+
 static int unpack_callback(int n, unsigned long mask, unsigned long dirmask, struct name_entry *names, struct traverse_info *info)
 {
 	struct cache_entry *src[MAX_UNPACK_TREES + 1] = { NULL, };
@@ -1263,6 +1458,116 @@ static void mark_new_skip_worktree(struct exclude_list *el,
 static int verify_absent(const struct cache_entry *,
 			 enum unpack_trees_error_types,
 			 struct unpack_trees_options *);
+
+#ifndef NO_PTHREADS
+static void *traverse_trees_parallel_thread_proc(void *_data)
+{
+	struct unpack_trees_options *o = _data;
+	struct traverse_info_parallel *info;
+	int i;
+
+	while (1) {
+		info = (struct traverse_info_parallel *)mpmcq_pop(&o->queue);
+		if (!info)
+			break;
+
+		info->ret = traverse_trees(info->n, info->t, &info->info);
+		/*
+		 * We can't play games with the cache bottom as we are processing
+		 * the tree objects in parallel.
+		 * restore_cache_bottom(&info->info, info->bottom);
+		 */
+
+		for (i = 0; i < info->nr_buf; i++)
+			free(info->buf[i]);
+		/*
+		 * TODO: Can't free "info" when thread is done because it can be used
+		 * as ->prev link in child info objects.  Ref count?  Free all at end?
+		free(info);
+		 */
+
+		/* All I really need here is fetch_and_add() */
+		pthread_mutex_lock(&o->work_mutex);
+		o->remaining_work--;
+		if (o->remaining_work == 0)
+			mpmcq_cancel(&o->queue);
+		pthread_mutex_unlock(&o->work_mutex);
+	}
+
+	return NULL;
+}
+
+static void init_parallel_traverse(struct unpack_trees_options *o,
+				   struct traverse_info *info)
+{
+	/*
+	 * TODO: Add logic to bypass parallel path when not needed.
+	 *			- not enough CPU cores to help
+	 *			- 'git status' is always fast - how to detect?
+	 *			- small trees (may be able to use index size as proxy, small index likely means small commit tree)
+	 */
+	if (core_parallel_unpack_trees) {
+		int t;
+
+		mpmcq_init(&o->queue);
+		o->remaining_work = 0;
+		pthread_mutex_init(&o->unpack_nondirectories_mutex, NULL);
+		pthread_mutex_init(&o->unpack_index_entry_mutex, NULL);
+		pthread_mutex_init(&o->odb_mutex, NULL);
+		pthread_mutex_init(&o->work_mutex, NULL);
+		o->nr_threads = online_cpus();
+		o->pthreads = xcalloc(o->nr_threads, sizeof(pthread_t));
+		info->fn = unpack_callback_parallel;
+
+		for (t = 0; t < o->nr_threads; t++) {
+			if (pthread_create(&o->pthreads[t], NULL,
+					   traverse_trees_parallel_thread_proc,
+					   o))
+				die("unable to create traverse_trees_parallel_thread");
+		}
+	}
+}
+
+static void wait_parallel_traverse(struct unpack_trees_options *o)
+{
+	/*
+	 * The first tree (root directory) is processed on the main thread.
+	 * This function is called after it has completed.  If there is no
+	 * remaining work, we know we are finished.
+	 */
+	if (core_parallel_unpack_trees) {
+		int t;
+
+		pthread_mutex_lock(&o->work_mutex);
+		if (o->remaining_work == 0)
+			mpmcq_cancel(&o->queue);
+		pthread_mutex_unlock(&o->work_mutex);
+
+		for (t = 0; t < o->nr_threads; t++) {
+			if (pthread_join(o->pthreads[t], NULL))
+				die("unable to join traverse_trees_parallel_thread");
+		}
+
+		free(o->pthreads);
+		pthread_mutex_destroy(&o->work_mutex);
+		pthread_mutex_destroy(&o->odb_mutex);
+		pthread_mutex_destroy(&o->unpack_index_entry_mutex);
+		pthread_mutex_destroy(&o->unpack_nondirectories_mutex);
+		mpmcq_destroy(&o->queue);
+	}
+}
+#else
+static void init_parallel_traverse(struct unpack_trees_options *o, struct traverse_info *info)
+{
+	return;
+}
+
+static void wait_parallel_traverse(struct unpack_trees_options *o)
+{
+	return;
+}
+#endif
+
 /*
  * N-way merge "len" trees.  Returns 0 on success, -1 on failure to manipulate the
  * resulting index, -2 on failure to reflect the changes to the work tree.
@@ -1327,6 +1632,7 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options
 		const char *prefix = o->prefix ? o->prefix : "";
 		struct traverse_info info;
 		uint64_t start;
+		int ret;
 
 		setup_traverse_info(&info, prefix);
 		info.fn = unpack_callback;
@@ -1352,9 +1658,12 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options
 		}
 
 		start = getnanotime();
-		if (traverse_trees(len, t, &info) < 0)
-			goto return_failed;
+		init_parallel_traverse(o, &info);
+		ret = traverse_trees(len, t, &info);
+		wait_parallel_traverse(o);
 		trace_performance_since(start, "traverse_trees");
+		if (ret < 0)
+			goto return_failed;
 	}
 
 	/* Any left-over entries in the index? */
diff --git a/unpack-trees.h b/unpack-trees.h
index c2b434c606..b7140099fa 100644
--- a/unpack-trees.h
+++ b/unpack-trees.h
@@ -3,6 +3,11 @@
 
 #include "tree-walk.h"
 #include "argv-array.h"
+#ifndef NO_PTHREADS
+#include "git-compat-util.h"
+#include <pthread.h>
+#include "mpmcqueue.h"
+#endif
 
 #define MAX_UNPACK_TREES 8
 
@@ -80,6 +85,31 @@ struct unpack_trees_options {
 	struct index_state result;
 
 	struct exclude_list *el; /* for internal use */
+#ifndef NO_PTHREADS
+	/*
+	 * Speed up the tree traversal by adding all discovered tree objects
+	 * into a queue and have a pool of worker threads process them in
+	 * parallel.  Since there is no upper bound on the size of a tree and
+	 * each worker thread will be adding discovered tree objects to the
+	 * queue, we need an unbounded multi-producer-multi-consumer queue.
+	 */
+	struct mpmcq queue;
+
+	int nr_threads;
+	pthread_t *pthreads;
+
+	/* need a mutex as we don't have fetch_and_add() */
+	int remaining_work;
+	pthread_mutex_t work_mutex;
+
+	/* The ODB is not thread safe so we must serialize access to it */
+	pthread_mutex_t odb_mutex;
+
+	/* various functions that are not thread safe and must be serialized for now */
+	pthread_mutex_t unpack_index_entry_mutex;
+	pthread_mutex_t unpack_nondirectories_mutex;
+
+#endif
 };
 
 extern int unpack_trees(unsigned n, struct tree_desc *t,
-- 
2.17.0.gvfs.1.123.g449c066





[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]

  Powered by Linux