Signed-off-by: Ben Peart <benpeart@xxxxxxxxxxxxx> --- cache.h | 1 + config.c | 5 + environment.c | 1 + unpack-trees.c | 313 ++++++++++++++++++++++++++++++++++++++++++++++++- unpack-trees.h | 30 +++++ 5 files changed, 348 insertions(+), 2 deletions(-) diff --git a/cache.h b/cache.h index d49092d94d..4bfa35c497 100644 --- a/cache.h +++ b/cache.h @@ -815,6 +815,7 @@ extern int fsync_object_files; extern int core_preload_index; extern int core_commit_graph; extern int core_apply_sparse_checkout; +extern int core_parallel_unpack_trees; extern int precomposed_unicode; extern int protect_hfs; extern int protect_ntfs; diff --git a/config.c b/config.c index f4a208a166..34d5506588 100644 --- a/config.c +++ b/config.c @@ -1346,6 +1346,11 @@ static int git_default_core_config(const char *var, const char *value) var, value); } + if (!strcmp(var, "core.parallelunpacktrees")) { + core_parallel_unpack_trees = git_config_bool(var, value); + return 0; + } + /* Add other config variables here and to Documentation/config.txt. 
*/ return 0; } diff --git a/environment.c b/environment.c index 2a6de2330b..1eb0a05074 100644 --- a/environment.c +++ b/environment.c @@ -68,6 +68,7 @@ char *notes_ref_name; int grafts_replace_parents = 1; int core_commit_graph; int core_apply_sparse_checkout; +int core_parallel_unpack_trees; int merge_log_config = -1; int precomposed_unicode = -1; /* see probe_utf8_pathname_composition() */ unsigned long pack_size_limit_cfg; diff --git a/unpack-trees.c b/unpack-trees.c index 1f58efc6bb..2333626efd 100644 --- a/unpack-trees.c +++ b/unpack-trees.c @@ -17,6 +17,7 @@ #include "submodule-config.h" #include "fsmonitor.h" #include "fetch-object.h" +#include "thread-utils.h" /* * Error messages expected by scripts out of plumbing commands such as @@ -641,6 +642,98 @@ static inline int are_same_oid(struct name_entry *name_j, struct name_entry *nam return name_j->oid && name_k->oid && !oidcmp(name_j->oid, name_k->oid); } +#ifndef NO_PTHREADS + +struct traverse_info_parallel { + struct mpmcq_entry entry; + struct tree_desc t[MAX_UNPACK_TREES]; + void *buf[MAX_UNPACK_TREES]; + struct traverse_info info; + int n; + int nr_buf; + int ret; +}; + +static int traverse_trees_parallel(int n, unsigned long dirmask, + unsigned long df_conflicts, + struct name_entry *names, + struct traverse_info *info) +{ + int i; + struct name_entry *p; + struct unpack_trees_options *o = info->data; + struct traverse_info_parallel *newinfo; + + p = names; + while (!p->mode) + p++; + + newinfo = xmalloc(sizeof(struct traverse_info_parallel)); + mpmcq_entry_init(&newinfo->entry); + newinfo->info = *info; + newinfo->info.prev = info; + newinfo->info.pathspec = info->pathspec; + newinfo->info.name = *p; + newinfo->info.pathlen += tree_entry_len(p) + 1; + newinfo->info.df_conflicts |= df_conflicts; + newinfo->nr_buf = 0; + newinfo->n = n; + + /* + * Fetch the tree from the ODB for each peer directory in the + * n commits. 
+ * + * For 2- and 3-way traversals, we try to avoid hitting the + * ODB twice for the same OID. This should yield a nice speed + * up in checkouts and merges when the commits are similar. + * + * We don't bother doing the full O(n^2) search for larger n, + * because wider traversals don't happen that often and we + * avoid the search setup. + * + * When 2 peer OIDs are the same, we just copy the tree + * descriptor data. This implicitly borrows the buffer + * data from the earlier cell. + */ + for (i = 0; i < n; i++, dirmask >>= 1) { + if (i > 0 && are_same_oid(&names[i], &names[i - 1])) + newinfo->t[i] = newinfo->t[i - 1]; + else if (i > 1 && are_same_oid(&names[i], &names[i - 2])) + newinfo->t[i] = newinfo->t[i - 2]; + else { + const struct object_id *oid = NULL; + if (dirmask & 1) + oid = names[i].oid; + + /* + * fill_tree_descriptor() will load the tree from the + * ODB. Accessing the ODB is not thread safe so + * serialize access using the odb_mutex. + */ + pthread_mutex_lock(&o->odb_mutex); + newinfo->buf[newinfo->nr_buf++] = + fill_tree_descriptor(newinfo->t + i, oid); + pthread_mutex_unlock(&o->odb_mutex); + } + } + + /* + * We can't play games with the cache bottom as we are processing + * the tree objects in parallel. 
+ * newinfo->bottom = switch_cache_bottom(&newinfo->info); + */ + + /* All I really need here is fetch_and_add() */ + pthread_mutex_lock(&o->work_mutex); + o->remaining_work++; + pthread_mutex_unlock(&o->work_mutex); + mpmcq_push(&o->queue, &newinfo->entry); + + return 0; +} + +#endif + static int traverse_trees_recursive(int n, unsigned long dirmask, unsigned long df_conflicts, struct name_entry *names, @@ -995,6 +1088,108 @@ static void debug_unpack_callback(int n, debug_name_entry(i, names + i); } +static int unpack_callback_parallel(int n, unsigned long mask, + unsigned long dirmask, + struct name_entry *names, + struct traverse_info *info) +{ + struct cache_entry *src[MAX_UNPACK_TREES + 1] = { + NULL, + }; + struct unpack_trees_options *o = info->data; + const struct name_entry *p = names; + + /* Find first entry with a real name (we could use "mask" too) */ + while (!p->mode) + p++; + + if (o->debug_unpack) + debug_unpack_callback(n, mask, dirmask, names, info); + + /* Are we supposed to look at the index too? */ + if (o->merge) { + while (1) { + int cmp; + struct cache_entry *ce; + + if (o->diff_index_cached) + ce = next_cache_entry(o); + else + ce = find_cache_entry(info, p); + + if (!ce) + break; + cmp = compare_entry(ce, info, p); + if (cmp < 0) { + int ret; + + pthread_mutex_lock(&o->unpack_index_entry_mutex); + ret = unpack_index_entry(ce, o); + pthread_mutex_unlock(&o->unpack_index_entry_mutex); + if (ret < 0) + return unpack_failed(o, NULL); + continue; + } + if (!cmp) { + if (ce_stage(ce)) { + /* + * If we skip unmerged index + * entries, we'll skip this + * entry *and* the tree + * entries associated with it! 
+ */ + if (o->skip_unmerged) { + add_same_unmerged(ce, o); + return mask; + } + } + src[0] = ce; + } + break; + } + } + + pthread_mutex_lock(&o->unpack_nondirectories_mutex); + int ret = unpack_nondirectories(n, mask, dirmask, src, names, info); + pthread_mutex_unlock(&o->unpack_nondirectories_mutex); + if (ret < 0) + return -1; + + if (o->merge && src[0]) { + if (ce_stage(src[0])) + mark_ce_used_same_name(src[0], o); + else + mark_ce_used(src[0], o); + } + + /* Now handle any directories.. */ + if (dirmask) { + /* special case: "diff-index --cached" looking at a tree */ + if (o->diff_index_cached && n == 1 && dirmask == 1 && + S_ISDIR(names->mode)) { + int matches; + matches = cache_tree_matches_traversal( + o->src_index->cache_tree, names, info); + /* + * Everything under the name matches; skip the + * entire hierarchy. diff_index_cached codepath + * special cases D/F conflicts in such a way that + * it does not do any look-ahead, so this is safe. + */ + if (matches) { + o->cache_bottom += matches; + return mask; + } + } + + if (traverse_trees_parallel(n, dirmask, mask & ~dirmask, names, info) < 0) + return -1; + return mask; + } + + return mask; +} + static int unpack_callback(int n, unsigned long mask, unsigned long dirmask, struct name_entry *names, struct traverse_info *info) { struct cache_entry *src[MAX_UNPACK_TREES + 1] = { NULL, }; @@ -1263,6 +1458,116 @@ static void mark_new_skip_worktree(struct exclude_list *el, static int verify_absent(const struct cache_entry *, enum unpack_trees_error_types, struct unpack_trees_options *); + +#ifndef NO_PTHREADS +static void *traverse_trees_parallel_thread_proc(void *_data) +{ + struct unpack_trees_options *o = _data; + struct traverse_info_parallel *info; + int i; + + while (1) { + info = (struct traverse_info_parallel *)mpmcq_pop(&o->queue); + if (!info) + break; + + info->ret = traverse_trees(info->n, info->t, &info->info); + /* + * We can't play games with the cache bottom as we are processing + * the tree 
objects in parallel. + * restore_cache_bottom(&info->info, info->bottom); + */ + + for (i = 0; i < info->nr_buf; i++) + free(info->buf[i]); + /* + * TODO: Can't free "info" when thread is done because it can be used + * as ->prev link in child info objects. Ref count? Free all at end? + free(info); + */ + + /* All I really need here is fetch_and_add() */ + pthread_mutex_lock(&o->work_mutex); + o->remaining_work--; + if (o->remaining_work == 0) + mpmcq_cancel(&o->queue); + pthread_mutex_unlock(&o->work_mutex); + } + + return NULL; +} + +static void init_parallel_traverse(struct unpack_trees_options *o, + struct traverse_info *info) +{ + /* + * TODO: Add logic to bypass parallel path when not needed. + * - not enough CPU cores to help + * - 'git status' is always fast - how to detect? + * - small trees (may be able to use index size as proxy, small index likely means small commit tree) + */ + if (core_parallel_unpack_trees) { + int t; + + mpmcq_init(&o->queue); + o->remaining_work = 0; + pthread_mutex_init(&o->unpack_nondirectories_mutex, NULL); + pthread_mutex_init(&o->unpack_index_entry_mutex, NULL); + pthread_mutex_init(&o->odb_mutex, NULL); + pthread_mutex_init(&o->work_mutex, NULL); + o->nr_threads = online_cpus(); + o->pthreads = xcalloc(o->nr_threads, sizeof(pthread_t)); + info->fn = unpack_callback_parallel; + + for (t = 0; t < o->nr_threads; t++) { + if (pthread_create(&o->pthreads[t], NULL, + traverse_trees_parallel_thread_proc, + o)) + die("unable to create traverse_trees_parallel_thread"); + } + } +} + +static void wait_parallel_traverse(struct unpack_trees_options *o) +{ + /* + * The first tree (root directory) is processed on the main thread. + * This function is called after it has completed. If there is no + * remaining work, we know we are finished. 
+ */ + if (core_parallel_unpack_trees) { + int t; + + pthread_mutex_lock(&o->work_mutex); + if (o->remaining_work == 0) + mpmcq_cancel(&o->queue); + pthread_mutex_unlock(&o->work_mutex); + + for (t = 0; t < o->nr_threads; t++) { + if (pthread_join(o->pthreads[t], NULL)) + die("unable to join traverse_trees_parallel_thread"); + } + + free(o->pthreads); + pthread_mutex_destroy(&o->work_mutex); + pthread_mutex_destroy(&o->odb_mutex); + pthread_mutex_destroy(&o->unpack_index_entry_mutex); + pthread_mutex_destroy(&o->unpack_nondirectories_mutex); + mpmcq_destroy(&o->queue); + } +} +#else +static void init_parallel_traverse(struct unpack_trees_options *o, struct traverse_info *info) +{ + return; +} + +static void wait_parallel_traverse(struct unpack_trees_options *o) +{ + return; +} +#endif + /* * N-way merge "len" trees. Returns 0 on success, -1 on failure to manipulate the * resulting index, -2 on failure to reflect the changes to the work tree. @@ -1327,6 +1632,7 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options const char *prefix = o->prefix ? o->prefix : ""; struct traverse_info info; uint64_t start; + int ret; setup_traverse_info(&info, prefix); info.fn = unpack_callback; @@ -1352,9 +1658,12 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options } start = getnanotime(); - if (traverse_trees(len, t, &info) < 0) - goto return_failed; + init_parallel_traverse(o, &info); + ret = traverse_trees(len, t, &info); + wait_parallel_traverse(o); trace_performance_since(start, "traverse_trees"); + if (ret < 0) + goto return_failed; } /* Any left-over entries in the index?
*/ diff --git a/unpack-trees.h b/unpack-trees.h index c2b434c606..b7140099fa 100644 --- a/unpack-trees.h +++ b/unpack-trees.h @@ -3,6 +3,11 @@ #include "tree-walk.h" #include "argv-array.h" +#ifndef NO_PTHREADS +#include "git-compat-util.h" +#include <pthread.h> +#include "mpmcqueue.h" +#endif #define MAX_UNPACK_TREES 8 @@ -80,6 +85,31 @@ struct unpack_trees_options { struct index_state result; struct exclude_list *el; /* for internal use */ +#ifndef NO_PTHREADS + /* + * Speed up the tree traversal by adding all discovered tree objects + * into a queue and have a pool of worker threads process them in + * parallel. Since there is no upper bound on the size of a tree and + * each worker thread will be adding discovered tree objects to the + * queue, we need an unbounded multi-producer-multi-consumer queue. + */ + struct mpmcq queue; + + int nr_threads; + pthread_t *pthreads; + + /* need a mutex as we don't have fetch_and_add() */ + int remaining_work; + pthread_mutex_t work_mutex; + + /* The ODB is not thread safe so we must serialize access to it */ + pthread_mutex_t odb_mutex; + + /* various functions that are not thread safe and must be serialized for now */ + pthread_mutex_t unpack_index_entry_mutex; + pthread_mutex_t unpack_nondirectories_mutex; + +#endif }; extern int unpack_trees(unsigned n, struct tree_desc *t, -- 2.17.0.gvfs.1.123.g449c066