On Mon, Oct 1, 2018 at 3:46 PM Ben Peart <peartben@xxxxxxxxx> wrote:
> +/*
> + * A helper function that will load the specified range of cache entries
> + * from the memory mapped file and add them to the given index.
> + */
> +static unsigned long load_cache_entry_block(struct index_state *istate,
> +			struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap,

Please use unsigned long for offset (here and in the thread_data
struct). We should use off_t instead, but that's out of scope. At
least keep the offset type consistent in here.

> +			unsigned long start_offset, const struct cache_entry *previous_ce)

I don't think you want to pass previous_ce in. You always pass NULL
anyway. And if this function is about loading a block (i.e. at a
block boundary) then the initial previous_ce _must_ be NULL or things
break horribly.

> @@ -1959,20 +2007,125 @@ static void *load_index_extensions(void *_data)
>
>  #define THREAD_COST (10000)
>
> +struct load_cache_entries_thread_data
> +{
> +	pthread_t pthread;
> +	struct index_state *istate;
> +	struct mem_pool *ce_mem_pool;
> +	int offset;
> +	const char *mmap;
> +	struct index_entry_offset_table *ieot;
> +	int ieot_offset;	/* starting index into the ieot array */

If it's an index, maybe just name it ieot_index and we can get rid of
the comment.

> +	int ieot_work;		/* count of ieot entries to process */

Maybe instead of saving the whole "ieot" table here, add

	struct index_entry_offset *blocks;

which points to the starting block for this thread, and rename that
mysterious (to me) ieot_work to nr_blocks. The thread will have
access from blocks[0] to blocks[nr_blocks - 1].

> +	unsigned long consumed;	/* return # of bytes in index file processed */
> +};
> +
> +/*
> + * A thread proc to run the load_cache_entries() computation
> + * across multiple background threads.
> + */
> +static void *load_cache_entries_thread(void *_data)
> +{
> +	struct load_cache_entries_thread_data *p = _data;
> +	int i;
> +
> +	/* iterate across all ieot blocks assigned to this thread */
> +	for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) {
> +		p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);

Please wrap this long line.

> +		p->offset += p->ieot->entries[i].nr;
> +	}
> +	return NULL;
> +}
> +
> +static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size,
> +	unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot)
> +{
> +	int i, offset, ieot_work, ieot_offset, err;
> +	struct load_cache_entries_thread_data *data;
> +	unsigned long consumed = 0;
> +	int nr;
> +
> +	/* a little sanity checking */
> +	if (istate->name_hash_initialized)
> +		BUG("the name hash isn't thread safe");
> +
> +	mem_pool_init(&istate->ce_mem_pool, 0);
> +	data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));

We normally use sizeof(*data) instead of sizeof(struct ...).

> +
> +	/* ensure we have no more threads than we have blocks to process */
> +	if (nr_threads > ieot->nr)
> +		nr_threads = ieot->nr;
> +	data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));

Eh... reallocate the same "data"?
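Something like this (untested) is roughly what I have in mind; the
blocks/nr_blocks names are of course only a suggestion:

	struct load_cache_entries_thread_data {
		pthread_t pthread;
		struct index_state *istate;
		struct mem_pool *ce_mem_pool;
		int offset;
		const char *mmap;
		/* this thread processes blocks[0] .. blocks[nr_blocks - 1] */
		struct index_entry_offset *blocks;
		int nr_blocks;
		unsigned long consumed;
	};

	static void *load_cache_entries_thread(void *_data)
	{
		struct load_cache_entries_thread_data *p = _data;
		int i;

		for (i = 0; i < p->nr_blocks; i++) {
			p->consumed += load_cache_entry_block(p->istate,
					p->ce_mem_pool, p->offset,
					p->blocks[i].nr, p->mmap,
					p->blocks[i].offset, NULL);
			p->offset += p->blocks[i].nr;
		}
		return NULL;
	}

and in load_cache_entries_threaded(), clamp nr_threads _before_ the
one and only allocation, then hand each thread just its own slice:

	if (nr_threads > ieot->nr)
		nr_threads = ieot->nr;
	data = xcalloc(nr_threads, sizeof(*data));
	...
	p->blocks = ieot->entries + ieot_offset;
	p->nr_blocks = ieot_work;

That wraps the overlong load_cache_entry_block() call and drops the
ieot/ieot_offset members in one go.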
> +
> +	offset = ieot_offset = 0;
> +	ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads);
> +	for (i = 0; i < nr_threads; i++) {
> +		struct load_cache_entries_thread_data *p = &data[i];
> +		int j;
> +
> +		if (ieot_offset + ieot_work > ieot->nr)
> +			ieot_work = ieot->nr - ieot_offset;
> +
> +		p->istate = istate;
> +		p->offset = offset;
> +		p->mmap = mmap;
> +		p->ieot = ieot;
> +		p->ieot_offset = ieot_offset;
> +		p->ieot_work = ieot_work;
> +
> +		/* create a mem_pool for each thread */
> +		nr = 0;

Since nr is only used in this for loop, declare it in this scope
instead of declaring it for the whole function.

> +		for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work; j++)
> +			nr += p->ieot->entries[j].nr;
> +		if (istate->version == 4) {
> +			mem_pool_init(&p->ce_mem_pool,
> +				estimate_cache_size_from_compressed(nr));
> +		}
> +		else {
> +			mem_pool_init(&p->ce_mem_pool,
> +				estimate_cache_size(mmap_size, nr));
> +		}

Maybe keep this mem_pool_init code inside load_cache_entries_thread(),
similar to how you do it in load_all_cache_entries(). It's mostly to
keep this loop shorter to see (and understand); of course
parallelizing this mem_pool_init() is just noise.

> +
> +		err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p);
> +		if (err)
> +			die(_("unable to create load_cache_entries thread: %s"), strerror(err));
> +
> +		/* increment by the number of cache entries in the ieot block being processed */
> +		for (j = 0; j < ieot_work; j++)
> +			offset += ieot->entries[ieot_offset + j].nr;

I wonder if it makes things simpler if you store the cache_entry
_index_ in the entries[] array instead of storing the number of
entries. You can easily calculate nr then by doing
entries[i].index - entries[i-1].index. And you can count multiple
blocks the same way, without looping like this.
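For example (untested), assuming entries[i].index held the position
of the first cache entry of block i:

	/* number of cache entries in block i, no per-block loop needed */
	nr = (i + 1 < ieot->nr ? ieot->entries[i + 1].index
			       : istate->cache_nr) - ieot->entries[i].index;

	/* and a thread's starting cache entry falls out directly */
	p->offset = ieot->entries[ieot_offset].index;

so the counting loop below would go away entirely.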
> +		ieot_offset += ieot_work;
> +	}
> +
> +	for (i = 0; i < nr_threads; i++) {
> +		struct load_cache_entries_thread_data *p = &data[i];
> +
> +		err = pthread_join(p->pthread, NULL);
> +		if (err)
> +			die(_("unable to join load_cache_entries thread: %s"), strerror(err));
> +		mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
> +		consumed += p->consumed;
> +	}
> +
> +	free(data);
> +
> +	return consumed;
> +}
> +#endif
> +
>  /* remember to discard_cache() before reading a different cache! */
>  int do_read_index(struct index_state *istate, const char *path, int must_exist)
>  {
> -	int fd, i;
> +	int fd;
>  	struct stat st;
>  	unsigned long src_offset;
>  	const struct cache_header *hdr;
>  	const char *mmap;
>  	size_t mmap_size;
> -	const struct cache_entry *previous_ce = NULL;
>  	struct load_index_extensions p;
>  	size_t extension_offset = 0;
>  #ifndef NO_PTHREADS
> -	int nr_threads;
> +	int nr_threads, cpus;
> +	struct index_entry_offset_table *ieot = NULL;
>  #endif
>
>  	if (istate->initialized)
> @@ -2014,10 +2167,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
>  	p.mmap = mmap;
>  	p.mmap_size = mmap_size;
>
> +	src_offset = sizeof(*hdr);

OK, we've been doing this since forever; sizeof(struct cache_header)
probably does not have extra padding on any supported platform.

> @@ -2032,29 +2193,22 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
>  			nr_threads--;
>  		}
>  	}
> -#endif
> -
> -	if (istate->version == 4) {
> -		mem_pool_init(&istate->ce_mem_pool,
> -			      estimate_cache_size_from_compressed(istate->cache_nr));
> -	} else {
> -		mem_pool_init(&istate->ce_mem_pool,
> -			      estimate_cache_size(mmap_size, istate->cache_nr));
> -	}
>
> -	src_offset = sizeof(*hdr);
> -	for (i = 0; i < istate->cache_nr; i++) {
> -		struct ondisk_cache_entry *disk_ce;
> -		struct cache_entry *ce;
> -		unsigned long consumed;
> +	/*
> +	 * Locate and read the index entry offset table so that we can use it
> +	 * to multi-thread the reading of the cache entries.
> +	 */
> +	if (extension_offset && nr_threads > 1)
> +		ieot = read_ieot_extension(mmap, mmap_size, extension_offset);

You need to free ieot at some point.

>
> -		disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
> -		ce = create_from_disk(istate, disk_ce, &consumed, previous_ce);
> -		set_index_entry(istate, i, ce);
> +	if (ieot)
> +		src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot);
> +	else
> +		src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
> +#else
> +	src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
> +#endif
>
> -		src_offset += consumed;
> -		previous_ce = ce;
> -	}
>  	istate->timestamp.sec = st.st_mtime;
>  	istate->timestamp.nsec = ST_MTIME_NSEC(st);
>
> --
> 2.18.0.windows.1
>
--
Duy