This patch helps address the CPU cost of loading the index by creating multiple threads to divide the work of loading and converting the cache entries across all available CPU cores. It accomplishes this by having the primary thread loop across the index file tracking the offset and (for V4 indexes) expanding the name. It creates a thread to process each block of entries as it comes to them. Once the threads are complete and the cache entries are loaded, the rest of the extensions can be loaded and processed normally on the primary thread. Performance impact: read cache .git/index times on a synthetic repo with: 100,000 entries FALSE TRUE Savings %Savings 0.014798767 0.009580433 0.005218333 35.26% 1,000,000 entries FALSE TRUE Savings %Savings 0.240896533 0.1751243 0.065772233 27.30% read cache .git/index times on an actual repo with: ~3M entries FALSE TRUE Savings %Savings 0.59898098 0.4513169 0.14766408 24.65% Signed-off-by: Ben Peart <Ben.Peart@xxxxxxxxxxxxx> --- Notes: Base Ref: master Web-Diff: https://github.com/benpeart/git/commit/67a700419b Checkout: git fetch https://github.com/benpeart/git read-index-multithread-v1 && git checkout 67a700419b Documentation/config.txt | 8 ++ config.c | 13 +++ config.h | 1 + read-cache.c | 218 ++++++++++++++++++++++++++++++++++----- 4 files changed, 216 insertions(+), 24 deletions(-) diff --git a/Documentation/config.txt b/Documentation/config.txt index 1c42364988..3344685cc4 100644 --- a/Documentation/config.txt +++ b/Documentation/config.txt @@ -899,6 +899,14 @@ relatively high IO latencies. When enabled, Git will do the index comparison to the filesystem data in parallel, allowing overlapping IO's. Defaults to true. +core.fastIndex:: + Enable parallel index loading ++ +This can speed up operations like 'git diff' and 'git status' especially +when the index is very large. When enabled, Git will do the index +loading from the on disk format to the in-memory format in parallel. +Defaults to true. + core.createObject:: You can set this to 'link', in which case a hardlink followed by a delete of the source are used to make sure that object creation diff --git a/config.c b/config.c index 9a0b10d4bc..883092fdd3 100644 --- a/config.c +++ b/config.c @@ -2289,6 +2289,19 @@ int git_config_get_fsmonitor(void) return 0; } +int git_config_get_fast_index(void) +{ + int val; + + if (!git_config_get_maybe_bool("core.fastindex", &val)) + return val; + + if (getenv("GIT_FASTINDEX_TEST")) + return 1; + + return -1; /* default value */ +} + NORETURN void git_die_config_linenr(const char *key, const char *filename, int linenr) { diff --git a/config.h b/config.h index ab46e0165d..74ca4e7db5 100644 --- a/config.h +++ b/config.h @@ -250,6 +250,7 @@ extern int git_config_get_untracked_cache(void); extern int git_config_get_split_index(void); extern int git_config_get_max_percent_split_change(void); extern int git_config_get_fsmonitor(void); +extern int git_config_get_fast_index(void); /* This dies if the configured or default date is in the future */ extern int git_config_get_expiry(const char *key, const char **output); diff --git a/read-cache.c b/read-cache.c index 7b1354d759..0fa7e1a04c 100644 --- a/read-cache.c +++ b/read-cache.c @@ -24,6 +24,10 @@ #include "utf8.h" #include "fsmonitor.h" +#ifndef min +#define min(a,b) (((a) < (b)) ? (a) : (b)) +#endif + /* Mask for the name length in ce_flags in the on-disk index */ #define CE_NAMEMASK (0x0fff) @@ -1889,16 +1893,203 @@ static size_t estimate_cache_size(size_t ondisk_size, unsigned int entries) return ondisk_size + entries * per_entry; } +static unsigned long load_cache_entry_block(struct index_state *istate, struct mem_pool *ce_mem_pool, int offset, int nr, void *mmap, unsigned long start_offset, struct strbuf *previous_name) +{ + int i; + unsigned long src_offset = start_offset; + + for (i = offset; i < offset + nr; i++) { + struct ondisk_cache_entry *disk_ce; + struct cache_entry *ce; + unsigned long consumed; + + disk_ce = (struct ondisk_cache_entry *)((char *)mmap + src_offset); + ce = create_from_disk(ce_mem_pool, disk_ce, &consumed, previous_name); + set_index_entry(istate, i, ce); + + src_offset += consumed; + } + return src_offset - start_offset; +} + +static unsigned long load_all_cache_entries(struct index_state *istate, void *mmap, size_t mmap_size, unsigned long src_offset) +{ + struct strbuf previous_name_buf = STRBUF_INIT, *previous_name; + unsigned long consumed; + + if (istate->version == 4) { + previous_name = &previous_name_buf; + mem_pool_init(&istate->ce_mem_pool, + estimate_cache_size_from_compressed(istate->cache_nr)); + } else { + previous_name = NULL; + mem_pool_init(&istate->ce_mem_pool, + estimate_cache_size(mmap_size, istate->cache_nr)); + } + + consumed = load_cache_entry_block(istate, istate->ce_mem_pool, 0, istate->cache_nr, mmap, src_offset, previous_name); + strbuf_release(&previous_name_buf); + return consumed; +} + +#ifdef NO_PTHREADS + +#define load_cache_entries load_all_cache_entries + +#else + +#include "thread-utils.h" + +/* +* Mostly randomly chosen maximum thread counts: we +* cap the parallelism to online_cpus() threads, and we want +* to have at least 7500 cache entries per thread for it to +* be worth starting a thread. +*/ +#define THREAD_COST (7500) + +struct load_cache_entries_thread_data +{ + pthread_t pthread; + struct index_state *istate; + struct mem_pool *ce_mem_pool; + int offset, nr; + void *mmap; + unsigned long start_offset; + struct strbuf previous_name_buf; + struct strbuf *previous_name; + unsigned long consumed; /* return # of bytes in index file processed */ +}; + +/* +* A thread proc to run the load_cache_entries() computation +* across multiple background threads. +*/ +static void *load_cache_entries_thread(void *_data) +{ + struct load_cache_entries_thread_data *p = _data; + + p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->nr, p->mmap, p->start_offset, p->previous_name); + return NULL; +} + +static unsigned long load_cache_entries(struct index_state *istate, void *mmap, size_t mmap_size, unsigned long src_offset) +{ + struct strbuf previous_name_buf = STRBUF_INIT, *previous_name; + struct load_cache_entries_thread_data *data; + int threads, cpus, thread_nr; + unsigned long consumed; + int i, thread; + + cpus = online_cpus(); + threads = istate->cache_nr / THREAD_COST; + if (threads > cpus) + threads = cpus; + + /* enable testing with fewer than default minimum of entries */ + if ((istate->cache_nr > 1) && (threads < 2) && getenv("GIT_FASTINDEX_TEST")) + threads = 2; + + if (threads < 2 || !git_config_get_fast_index()) + return load_all_cache_entries(istate, mmap, mmap_size, src_offset); + + mem_pool_init(&istate->ce_mem_pool, 0); + if (istate->version == 4) + previous_name = &previous_name_buf; + else + previous_name = NULL; + + thread_nr = (istate->cache_nr + threads - 1) / threads; + data = xcalloc(threads, sizeof(struct load_cache_entries_thread_data)); + + /* loop through index entries starting a thread for every thread_nr entries */ + consumed = thread = 0; + for (i = 0; ; i++) { + struct ondisk_cache_entry *ondisk; + const char *name; + unsigned int flags; + + /* we've reached the begining of a block of cache entries, kick off a thread to process them */ + if (0 == i % thread_nr) { + struct load_cache_entries_thread_data *p = &data[thread]; + + p->istate = istate; + p->offset = i; + p->nr = min(thread_nr, istate->cache_nr - i); + + /* create a mem_pool for each thread */ + if (istate->version == 4) + mem_pool_init(&p->ce_mem_pool, + estimate_cache_size_from_compressed(p->nr)); + else + mem_pool_init(&p->ce_mem_pool, + estimate_cache_size(mmap_size, p->nr)); + + p->mmap = mmap; + p->start_offset = src_offset; + if (previous_name) { + strbuf_addbuf(&p->previous_name_buf, previous_name); + p->previous_name = &p->previous_name_buf; + } + + if (pthread_create(&p->pthread, NULL, load_cache_entries_thread, p)) + die("unable to create load_cache_entries_thread"); + if (++thread == threads || p->nr != thread_nr) + break; + } + + ondisk = (struct ondisk_cache_entry *)((char *)mmap + src_offset); + + /* On-disk flags are just 16 bits */ + flags = get_be16(&ondisk->flags); + + if (flags & CE_EXTENDED) { + struct ondisk_cache_entry_extended *ondisk2; + ondisk2 = (struct ondisk_cache_entry_extended *)ondisk; + name = ondisk2->name; + } else + name = ondisk->name; + + if (!previous_name) { + size_t len; + + /* v3 and earlier */ + len = flags & CE_NAMEMASK; + if (len == CE_NAMEMASK) + len = strlen(name); + src_offset += (flags & CE_EXTENDED) ? + ondisk_cache_entry_extended_size(len) : + ondisk_cache_entry_size(len); + } else + src_offset += (name - ((char *)ondisk)) + expand_name_field(previous_name, name); + } + + for (i = 0; i < threads; i++) { + struct load_cache_entries_thread_data *p = data + i; + if (pthread_join(p->pthread, NULL)) + die("unable to join load_cache_entries_thread"); + mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool); + strbuf_release(&p->previous_name_buf); + consumed += p->consumed; + } + + free(data); + strbuf_release(&previous_name_buf); + + return consumed; +} + +#endif + /* remember to discard_cache() before reading a different cache! */ int do_read_index(struct index_state *istate, const char *path, int must_exist) { - int fd, i; + int fd; struct stat st; unsigned long src_offset; struct cache_header *hdr; void *mmap; size_t mmap_size; - struct strbuf previous_name_buf = STRBUF_INIT, *previous_name; if (istate->initialized) return istate->cache_nr; @@ -1935,29 +2126,8 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist) istate->cache = xcalloc(istate->cache_alloc, sizeof(*istate->cache)); istate->initialized = 1; - if (istate->version == 4) { - previous_name = &previous_name_buf; - mem_pool_init(&istate->ce_mem_pool, - estimate_cache_size_from_compressed(istate->cache_nr)); - } else { - previous_name = NULL; - mem_pool_init(&istate->ce_mem_pool, - estimate_cache_size(mmap_size, istate->cache_nr)); - } - src_offset = sizeof(*hdr); - for (i = 0; i < istate->cache_nr; i++) { - struct ondisk_cache_entry *disk_ce; - struct cache_entry *ce; - unsigned long consumed; - - disk_ce = (struct ondisk_cache_entry *)((char *)mmap + src_offset); - ce = create_from_disk(istate->ce_mem_pool, disk_ce, &consumed, previous_name); - set_index_entry(istate, i, ce); - - src_offset += consumed; - } - strbuf_release(&previous_name_buf); + src_offset += load_cache_entries(istate, mmap, mmap_size, src_offset); istate->timestamp.sec = st.st_mtime; istate->timestamp.nsec = ST_MTIME_NSEC(st); base-commit: 29d9e3e2c47dd4b5053b0a98c891878d398463e3 -- 2.18.0.windows.1