Re: [PATCH v2 1/3] read-cache: speed up index load through parallelization

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Wed, Aug 29, 2018 at 5:25 PM Ben Peart <Ben.Peart@xxxxxxxxxxxxx> wrote:
> diff --git a/read-cache.c b/read-cache.c
> index 7b1354d759..c30346388a 100644
> --- a/read-cache.c
> +++ b/read-cache.c
> @@ -1889,16 +1889,229 @@ static size_t estimate_cache_size(size_t ondisk_size, unsigned int entries)
>         return ondisk_size + entries * per_entry;
>  }
>
> +/*
> + * A helper function that will load the specified range of cache entries
> + * from the memory mapped file and add them to the given index.
> + */
> +static unsigned long load_cache_entry_block(struct index_state *istate,
> +                       struct mem_pool *ce_mem_pool, int offset, int nr, void *mmap,
> +                       unsigned long start_offset, struct strbuf *previous_name)
> +{
> +       int i;
> +       unsigned long src_offset = start_offset;
> +
> +       for (i = offset; i < offset + nr; i++) {

It may be micro optimization, but since we're looping a lot and can't
trust the compiler to optimize this, maybe just calculate this upper
limit and store in a local variable to make it clear the upper limit
is known, no point of recalculating it at every iteration.

> +               struct ondisk_cache_entry *disk_ce;
> +               struct cache_entry *ce;
> +               unsigned long consumed;
> +
> +               disk_ce = (struct ondisk_cache_entry *)((char *)mmap + src_offset);
> +               ce = create_from_disk(ce_mem_pool, disk_ce, &consumed, previous_name);
> +               set_index_entry(istate, i, ce);
> +
> +               src_offset += consumed;
> +       }
> +       return src_offset - start_offset;
> +}
> +
> +static unsigned long load_all_cache_entries(struct index_state *istate,
> +                       void *mmap, size_t mmap_size, unsigned long src_offset)
> +{
> +       struct strbuf previous_name_buf = STRBUF_INIT, *previous_name;
> +       unsigned long consumed;
> +
> +       if (istate->version == 4) {
> +               previous_name = &previous_name_buf;
> +               mem_pool_init(&istate->ce_mem_pool,
> +                             estimate_cache_size_from_compressed(istate->cache_nr));
> +       } else {
> +               previous_name = NULL;
> +               mem_pool_init(&istate->ce_mem_pool,
> +                             estimate_cache_size(mmap_size, istate->cache_nr));
> +       }
> +
> +       consumed = load_cache_entry_block(istate, istate->ce_mem_pool,
> +                                       0, istate->cache_nr, mmap, src_offset, previous_name);
> +       strbuf_release(&previous_name_buf);
> +       return consumed;
> +}
> +
> +#ifdef NO_PTHREADS
> +
> +#define load_cache_entries load_all_cache_entries
> +
> +#else
> +
> +#include "thread-utils.h"

Don't include files in a middle of a file.

> +
> +/*
> +* Mostly randomly chosen maximum thread counts: we
> +* cap the parallelism to online_cpus() threads, and we want
> +* to have at least 7500 cache entries per thread for it to
> +* be worth starting a thread.
> +*/
> +#define THREAD_COST            (7500)

Isn't 7500 a bit too low? I'm still basing on webkit.git,  and 7500
entries take about 1.2ms on average. 100k files would take about 16ms
and may be more reasonable (still too low in my opinion).

> +
> +struct load_cache_entries_thread_data
> +{
> +       pthread_t pthread;
> +       struct index_state *istate;
> +       struct mem_pool *ce_mem_pool;
> +       int offset, nr;
> +       void *mmap;
> +       unsigned long start_offset;
> +       struct strbuf previous_name_buf;
> +       struct strbuf *previous_name;
> +       unsigned long consumed; /* return # of bytes in index file processed */
> +};
> +
> +/*
> +* A thread proc to run the load_cache_entries() computation
> +* across multiple background threads.
> +*/
> +static void *load_cache_entries_thread(void *_data)
> +{
> +       struct load_cache_entries_thread_data *p = _data;
> +
> +       p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool,
> +               p->offset, p->nr, p->mmap, p->start_offset, p->previous_name);
> +       return NULL;
> +}
> +
> +static unsigned long load_cache_entries(struct index_state *istate,
> +                       void *mmap, size_t mmap_size, unsigned long src_offset)
> +{
> +       struct strbuf previous_name_buf = STRBUF_INIT, *previous_name;
> +       struct load_cache_entries_thread_data *data;
> +       int nr_threads, cpus, ce_per_thread;
> +       unsigned long consumed;
> +       int i, thread;
> +
> +       nr_threads = git_config_get_index_threads();
> +       if (!nr_threads) {
> +               cpus = online_cpus();
> +               nr_threads = istate->cache_nr / THREAD_COST;
> +               if (nr_threads > cpus)
> +                       nr_threads = cpus;
> +       }
> +
> +       /* enable testing with fewer than default minimum of entries */
> +       if ((istate->cache_nr > 1) && (nr_threads < 2) && git_env_bool("GIT_INDEX_THREADS_TEST", 0))
> +               nr_threads = 2;

Please don't add more '()' than necessary. It's just harder to read.
Maybe break that "if" into two lines since it's getting long.

> +
> +       if (nr_threads < 2)
> +               return load_all_cache_entries(istate, mmap, mmap_size, src_offset);
> +
> +       /* a little sanity checking */
> +       if (istate->name_hash_initialized)
> +               die("the name hash isn't thread safe");
> +
> +       mem_pool_init(&istate->ce_mem_pool, 0);
> +       if (istate->version == 4)
> +               previous_name = &previous_name_buf;
> +       else
> +               previous_name = NULL;
> +
> +       ce_per_thread = DIV_ROUND_UP(istate->cache_nr, nr_threads);
> +       data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));
> +
> +       /*
> +        * Loop through index entries starting a thread for every ce_per_thread
> +        * entries. Exit the loop when we've created the final thread (no need
> +        * to parse the remaining entries.
> +        */
> +       consumed = thread = 0;
> +       for (i = 0; ; i++) {
> +               struct ondisk_cache_entry *ondisk;
> +               const char *name;
> +               unsigned int flags;
> +
> +               /*
> +                * we've reached the beginning of a block of cache entries,
> +                * kick off a thread to process them
> +                */
> +               if (0 == i % ce_per_thread) {

I don't get why people keep putting constants in reversed order like
this. Perhaps in the old days, it helps catch "a = 0" mistakes, but
compilers nowadays are smart enough to complain about that and this is
just hard to read.

> +                       struct load_cache_entries_thread_data *p = &data[thread];
> +
> +                       p->istate = istate;
> +                       p->offset = i;
> +                       p->nr = ce_per_thread < istate->cache_nr - i ? ce_per_thread : istate->cache_nr - i;
> +
> +                       /* create a mem_pool for each thread */
> +                       if (istate->version == 4)
> +                               mem_pool_init(&p->ce_mem_pool,
> +                                                 estimate_cache_size_from_compressed(p->nr));
> +                       else
> +                               mem_pool_init(&p->ce_mem_pool,
> +                                                 estimate_cache_size(mmap_size, p->nr));
> +
> +                       p->mmap = mmap;
> +                       p->start_offset = src_offset;
> +                       if (previous_name) {
> +                               strbuf_addbuf(&p->previous_name_buf, previous_name);
> +                               p->previous_name = &p->previous_name_buf;
> +                       }
> +
> +                       if (pthread_create(&p->pthread, NULL, load_cache_entries_thread, p))
> +                               die("unable to create load_cache_entries_thread");
> +
> +                       /* exit the loop when we've created the last thread */
> +                       if (++thread == nr_threads)
> +                               break;

I still think it's better to have an extension to avoid looping
through like this. How much time does this "for (i = 0; ; i++)" loop
cost? The first thread can't start until you've scanned to the second
block, when you have zillion of entries and about 4 cores, that could
be significant delay. Unless you break smaller blocks and have one
thread handles multiple blocks, but then you pay the cost for
synchronization. Other threads may overlap a bit, but starting all
threads at the same time would benefit more. You also can't start
loading the extensions until you've scanned through all this.

> +               }
> +
> +               ondisk = (struct ondisk_cache_entry *)((char *)mmap + src_offset);
> +
> +               /* On-disk flags are just 16 bits */
> +               flags = get_be16(&ondisk->flags);
> +
> +               if (flags & CE_EXTENDED) {
> +                       struct ondisk_cache_entry_extended *ondisk2;
> +                       ondisk2 = (struct ondisk_cache_entry_extended *)ondisk;
> +                       name = ondisk2->name;
> +               } else
> +                       name = ondisk->name;
> +
> +               if (!previous_name) {
> +                       size_t len;
> +
> +                       /* v3 and earlier */
> +                       len = flags & CE_NAMEMASK;
> +                       if (len == CE_NAMEMASK)
> +                               len = strlen(name);
> +                       src_offset += (flags & CE_EXTENDED) ?
> +                               ondisk_cache_entry_extended_size(len) :
> +                               ondisk_cache_entry_size(len);
> +               } else
> +                       src_offset += (name - ((char *)ondisk)) + expand_name_field(previous_name, name);
> +       }
> +
> +       for (i = 0; i < nr_threads; i++) {
> +               struct load_cache_entries_thread_data *p = data + i;
> +               if (pthread_join(p->pthread, NULL))
> +                       die("unable to join load_cache_entries_thread");

_()

> +               mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
> +               strbuf_release(&p->previous_name_buf);
> +               consumed += p->consumed;
> +       }
> +
> +       free(data);
> +       strbuf_release(&previous_name_buf);
> +
> +       return consumed;
> +}
> +
> +#endif
> +
-- 
Duy



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]

  Powered by Linux