[PATCH v8 0/7] speed up index load through parallelization

From: Ben Peart <benpeart@xxxxxxxxxxxxx>

Fixed issues identified in review, the most impactful probably being plugging
some memory leaks and improving error handling.  Also added better error
messages and did some cleanup in the code I'd touched.

The biggest change in the interdiff comes from renaming ieot_offset to
ieot_start and ieot_work to ieot_blocks, in hopes of making the code easier
to read and understand.
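
As a minimal illustration of how those two fields drive the work split
across threads, here is a self-contained sketch (not code from the patch;
ieot_nr and nr_threads are made-up example values, and DIV_ROUND_UP is
reimplemented locally for the example):

#include <stdio.h>

#define DIV_ROUND_UP(n, d) (((n) + (d) - 1) / (d))

int main(void)
{
	int ieot_nr = 10;	/* total IEOT blocks in the index */
	int nr_threads = 4;	/* threads (already clamped to <= ieot_nr) */
	int ieot_start = 0;	/* starting index into the ieot array */
	int ieot_blocks = DIV_ROUND_UP(ieot_nr, nr_threads);
	int i;

	for (i = 0; i < nr_threads; i++) {
		/* the last thread may end up with fewer blocks */
		if (ieot_start + ieot_blocks > ieot_nr)
			ieot_blocks = ieot_nr - ieot_start;
		printf("thread %d: ieot blocks [%d, %d)\n",
		       i, ieot_start, ieot_start + ieot_blocks);
		ieot_start += ieot_blocks;
	}
	return 0;
}

Each thread then loads only the cache entries covered by its block range,
which is what load_cache_entries_thread() does in the interdiff below.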

Base Ref: master
Web-Diff: https://github.com/benpeart/git/commit/6caa0bac46
Checkout: git fetch https://github.com/benpeart/git read-index-multithread-v8 && git checkout 6caa0bac46


### Interdiff (v7..v8):

diff --git a/read-cache.c b/read-cache.c
index 14402a0738..7acc2c86f4 100644
--- a/read-cache.c
+++ b/read-cache.c
@@ -1901,7 +1901,7 @@ struct index_entry_offset
 struct index_entry_offset_table
 {
 	int nr;
-	struct index_entry_offset entries[0];
+	struct index_entry_offset entries[FLEX_ARRAY];
 };
 
 #ifndef NO_PTHREADS
@@ -1935,9 +1935,7 @@ static void *load_index_extensions(void *_data)
 		 * extension name (4-byte) and section length
 		 * in 4-byte network byte order.
 		 */
-		uint32_t extsize;
-		memcpy(&extsize, p->mmap + src_offset + 4, 4);
-		extsize = ntohl(extsize);
+		uint32_t extsize = get_be32(p->mmap + src_offset + 4);
 		if (read_index_extension(p->istate,
 					 p->mmap + src_offset,
 					 p->mmap + src_offset + 8,
@@ -2015,8 +2013,8 @@ struct load_cache_entries_thread_data
 	int offset;
 	const char *mmap;
 	struct index_entry_offset_table *ieot;
-	int ieot_offset;        /* starting index into the ieot array */
-	int ieot_work;          /* count of ieot entries to process */
+	int ieot_start;		/* starting index into the ieot array */
+	int ieot_blocks;	/* count of ieot entries to process */
 	unsigned long consumed;	/* return # of bytes in index file processed */
 };
 
@@ -2030,8 +2028,9 @@ static void *load_cache_entries_thread(void *_data)
 	int i;
 
 	/* iterate across all ieot blocks assigned to this thread */
-	for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) {
-		p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);
+	for (i = p->ieot_start; i < p->ieot_start + p->ieot_blocks; i++) {
+		p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool,
+			p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);
 		p->offset += p->ieot->entries[i].nr;
 	}
 	return NULL;
@@ -2040,48 +2039,45 @@ static void *load_cache_entries_thread(void *_data)
 static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size,
 			unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot)
 {
-	int i, offset, ieot_work, ieot_offset, err;
+	int i, offset, ieot_blocks, ieot_start, err;
 	struct load_cache_entries_thread_data *data;
 	unsigned long consumed = 0;
-	int nr;
 
 	/* a little sanity checking */
 	if (istate->name_hash_initialized)
 		BUG("the name hash isn't thread safe");
 
 	mem_pool_init(&istate->ce_mem_pool, 0);
-	data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));
 
 	/* ensure we have no more threads than we have blocks to process */
 	if (nr_threads > ieot->nr)
 		nr_threads = ieot->nr;
-	data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));
+	data = xcalloc(nr_threads, sizeof(*data));
 
-	offset = ieot_offset = 0;
-	ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads);
+	offset = ieot_start = 0;
+	ieot_blocks = DIV_ROUND_UP(ieot->nr, nr_threads);
 	for (i = 0; i < nr_threads; i++) {
 		struct load_cache_entries_thread_data *p = &data[i];
-		int j;
+		int nr, j;
 
-		if (ieot_offset + ieot_work > ieot->nr)
-			ieot_work = ieot->nr - ieot_offset;
+		if (ieot_start + ieot_blocks > ieot->nr)
+			ieot_blocks = ieot->nr - ieot_start;
 
 		p->istate = istate;
 		p->offset = offset;
 		p->mmap = mmap;
 		p->ieot = ieot;
-		p->ieot_offset = ieot_offset;
-		p->ieot_work = ieot_work;
+		p->ieot_start = ieot_start;
+		p->ieot_blocks = ieot_blocks;
 
 		/* create a mem_pool for each thread */
 		nr = 0;
-		for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work; j++)
+		for (j = p->ieot_start; j < p->ieot_start + p->ieot_blocks; j++)
 			nr += p->ieot->entries[j].nr;
 		if (istate->version == 4) {
 			mem_pool_init(&p->ce_mem_pool,
 				estimate_cache_size_from_compressed(nr));
-		}
-		else {
+		} else {
 			mem_pool_init(&p->ce_mem_pool,
 				estimate_cache_size(mmap_size, nr));
 		}
@@ -2091,9 +2087,9 @@ static unsigned long load_cache_entries_threaded(struct index_state *istate, con
 			die(_("unable to create load_cache_entries thread: %s"), strerror(err));
 
 		/* increment by the number of cache entries in the ieot block being processed */
-		for (j = 0; j < ieot_work; j++)
-			offset += ieot->entries[ieot_offset + j].nr;
-		ieot_offset += ieot_work;
+		for (j = 0; j < ieot_blocks; j++)
+			offset += ieot->entries[ieot_start + j].nr;
+		ieot_start += ieot_blocks;
 	}
 
 	for (i = 0; i < nr_threads; i++) {
@@ -2201,10 +2197,12 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
 	if (extension_offset && nr_threads > 1)
 		ieot = read_ieot_extension(mmap, mmap_size, extension_offset);
 
-	if (ieot)
+	if (ieot) {
 		src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot);
-	else
+		free(ieot);
+	} else {
 		src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
+	}
 #else
 	src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
 #endif
@@ -2705,9 +2703,9 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 	struct strbuf previous_name_buf = STRBUF_INIT, *previous_name;
 	int drop_cache_tree = istate->drop_cache_tree;
 	off_t offset;
-	int ieot_work = 1;
+	int ieot_entries = 1;
 	struct index_entry_offset_table *ieot = NULL;
-	int nr;
+	int nr, nr_threads;
 
 	for (i = removed = extended = 0; i < entries; i++) {
 		if (cache[i]->ce_flags & CE_REMOVE)
@@ -2742,20 +2740,24 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 		return -1;
 
 #ifndef NO_PTHREADS
-	if ((nr = git_config_get_index_threads()) != 1) {
+	nr_threads = git_config_get_index_threads();
+	if (nr_threads != 1) {
 		int ieot_blocks, cpus;
 
 		/*
 		 * ensure default number of ieot blocks maps evenly to the
-		 * default number of threads that will process them
+		 * default number of threads that will process them leaving
+		 * room for the thread to load the index extensions.
 		 */
-		if (!nr) {
+		if (!nr_threads) {
 			ieot_blocks = istate->cache_nr / THREAD_COST;
 			cpus = online_cpus();
 			if (ieot_blocks > cpus - 1)
 				ieot_blocks = cpus - 1;
 		} else {
-			ieot_blocks = nr;
+			ieot_blocks = nr_threads;
+			if (ieot_blocks > istate->cache_nr)
+				ieot_blocks = istate->cache_nr;
 		}
 
 		/*
@@ -2765,13 +2767,17 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 		if (ieot_blocks > 1) {
 			ieot = xcalloc(1, sizeof(struct index_entry_offset_table)
 				+ (ieot_blocks * sizeof(struct index_entry_offset)));
-			ieot->nr = 0;
-			ieot_work = DIV_ROUND_UP(entries, ieot_blocks);
+			ieot_entries = DIV_ROUND_UP(entries, ieot_blocks);
 		}
 	}
 #endif
 
-	offset = lseek(newfd, 0, SEEK_CUR) + write_buffer_len;
+	offset = lseek(newfd, 0, SEEK_CUR);
+	if (offset < 0) {
+		free(ieot);
+		return -1;
+	}
+	offset += write_buffer_len;
 	nr = 0;
 	previous_name = (hdr_version == 4) ? &previous_name_buf : NULL;
 
@@ -2794,7 +2800,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 
 			drop_cache_tree = 1;
 		}
-		if (ieot && i && (i % ieot_work == 0)) {
+		if (ieot && i && (i % ieot_entries == 0)) {
 			ieot->entries[ieot->nr].nr = nr;
 			ieot->entries[ieot->nr].offset = offset;
 			ieot->nr++;
@@ -2806,7 +2812,12 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 			if (previous_name)
 				previous_name->buf[0] = 0;
 			nr = 0;
-			offset = lseek(newfd, 0, SEEK_CUR) + write_buffer_len;
+			offset = lseek(newfd, 0, SEEK_CUR);
+			if (offset < 0) {
+				free(ieot);
+				return -1;
+			}
+			offset += write_buffer_len;
 		}
 		if (ce_write_entry(&c, newfd, ce, previous_name, (struct ondisk_cache_entry *)&ondisk) < 0)
 			err = -1;
@@ -2822,16 +2833,23 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 	}
 	strbuf_release(&previous_name_buf);
 
-	if (err)
+	if (err) {
+		free(ieot);
 		return err;
+	}
 
 	/* Write extension data here */
-	offset = lseek(newfd, 0, SEEK_CUR) + write_buffer_len;
+	offset = lseek(newfd, 0, SEEK_CUR);
+	if (offset < 0) {
+		free(ieot);
+		return -1;
+	}
+	offset += write_buffer_len;
 	the_hash_algo->init_fn(&eoie_c);
 
 	/*
 	 * Lets write out CACHE_EXT_INDEXENTRYOFFSETTABLE first so that we
-	 * can minimze the number of extensions we have to scan through to
+	 * can minimize the number of extensions we have to scan through to
 	 * find it during load.  Write it out regardless of the
 	 * strip_extensions parameter as we need it when loading the shared
 	 * index.
@@ -2844,6 +2862,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 		err = write_index_ext_header(&c, &eoie_c, newfd, CACHE_EXT_INDEXENTRYOFFSETTABLE, sb.len) < 0
 			|| ce_write(&c, newfd, sb.buf, sb.len) < 0;
 		strbuf_release(&sb);
+		free(ieot);
 		if (err)
 			return -1;
 	}
@@ -3460,14 +3479,18 @@ static struct index_entry_offset_table *read_ieot_extension(const char *mmap, si
 
        /* validate the version is IEOT_VERSION */
        ext_version = get_be32(index);
-       if (ext_version != IEOT_VERSION)
+       if (ext_version != IEOT_VERSION) {
+	       error("invalid IEOT version %d", ext_version);
 	       return NULL;
+       }
        index += sizeof(uint32_t);
 
        /* extension size - version bytes / bytes per entry */
        nr = (extsize - sizeof(uint32_t)) / (sizeof(uint32_t) + sizeof(uint32_t));
-       if (!nr)
+       if (!nr) {
+	       error("invalid number of IEOT entries %d", nr);
 	       return NULL;
+       }
        ieot = xmalloc(sizeof(struct index_entry_offset_table)
 	       + (nr * sizeof(struct index_entry_offset)));
        ieot->nr = nr;
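
For context when reading that last hunk: the extension body that
read_ieot_extension() parses is, per the index-format documentation added
in this series, a 32-bit version number followed by (offset, count) pairs,
all in network byte order.  A rough C view of the layout (these struct
names are made up for illustration and do not appear in the patch):

#include <stdint.h>

struct ieot_entry_disk {
	uint32_t offset;	/* byte offset of the first cache entry in this block */
	uint32_t nr;		/* number of cache entries in this block */
};

struct ieot_disk {
	uint32_t version;			/* must be IEOT_VERSION (1) */
	struct ieot_entry_disk entries[];	/* (extsize - 4) / 8 entries */
};

That is why nr is computed above as
(extsize - sizeof(uint32_t)) / (sizeof(uint32_t) + sizeof(uint32_t)).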


### Patches

Ben Peart (6):
  read-cache: clean up casting and byte decoding
  eoie: add End of Index Entry (EOIE) extension
  config: add new index.threads config setting
  read-cache: load cache extensions on a worker thread
  ieot: add Index Entry Offset Table (IEOT) extension
  read-cache: load cache entries on worker threads

Nguyễn Thái Ngọc Duy (1):
  read-cache.c: optimize reading index format v4
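
For anyone testing the series: the index.threads setting added by
"config: add new index.threads config setting" can be exercised like this
(values as described in the config documentation that patch adds):

	git config index.threads 0	# or "true": auto-detect the number of CPUs
	git config index.threads 1	# or "false": disable multithreading
	git config index.threads 4	# spawn exactly 4 threads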

 Documentation/config.txt                 |   7 +
 Documentation/technical/index-format.txt |  41 ++
 config.c                                 |  18 +
 config.h                                 |   1 +
 read-cache.c                             | 774 +++++++++++++++++++----
 t/README                                 |   5 +
 t/t1700-split-index.sh                   |  13 +-
 7 files changed, 739 insertions(+), 120 deletions(-)


base-commit: fe8321ec057f9231c26c29b364721568e58040f7
-- 
2.18.0.windows.1




