Re: [PATCH v7 7/7] read-cache: load cache entries on worker threads

On Mon, Oct 1, 2018 at 3:46 PM Ben Peart <peartben@xxxxxxxxx> wrote:
> +/*
> + * A helper function that will load the specified range of cache entries
> + * from the memory mapped file and add them to the given index.
> + */
> +static unsigned long load_cache_entry_block(struct index_state *istate,
> +                       struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap,

Please use unsigned long for offset (here and in the thread_data
struct). We should really be using off_t instead, but that's out of
scope. At least keep the offset type consistent in here.
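
Something like this (just a sketch of the signature change I have in
mind):

     static unsigned long load_cache_entry_block(struct index_state *istate,
                             struct mem_pool *ce_mem_pool, unsigned long offset,
                             int nr, const char *mmap,
                             unsigned long start_offset,
                             const struct cache_entry *previous_ce);

and correspondingly "unsigned long offset;" in
load_cache_entries_thread_data.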

> +                       unsigned long start_offset, const struct cache_entry *previous_ce)

I don't think you want to pass previous_ce in. You always pass NULL
anyway. And if this function is about loading a block (i.e. starting
at a block boundary) then the initial previous_ce _must_ be NULL or
things break horribly.
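
In other words, drop the parameter and make the invariant explicit
inside the function, roughly:

     static unsigned long load_cache_entry_block(struct index_state *istate,
                             struct mem_pool *ce_mem_pool, unsigned long offset,
                             int nr, const char *mmap,
                             unsigned long start_offset)
     {
             /*
              * A block starts at an entry boundary, so there is
              * nothing to delta against.
              */
             const struct cache_entry *previous_ce = NULL;
             ...
     }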

> @@ -1959,20 +2007,125 @@ static void *load_index_extensions(void *_data)
>
>  #define THREAD_COST            (10000)
>
> +struct load_cache_entries_thread_data
> +{
> +       pthread_t pthread;
> +       struct index_state *istate;
> +       struct mem_pool *ce_mem_pool;
> +       int offset;
> +       const char *mmap;
> +       struct index_entry_offset_table *ieot;
> +       int ieot_offset;        /* starting index into the ieot array */

If it's an index, maybe just name it ieot_index and we can get rid of
the comment.

> +       int ieot_work;          /* count of ieot entries to process */

Maybe, instead of saving the whole "ieot" table here, add

     struct index_entry_offset *blocks;

which points to the starting block for this thread, and rename the
(to me) mysterious ieot_work to nr_blocks. The thread then has access
to everything from blocks[0] to blocks[nr_blocks - 1].
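
The struct would then look something like this (untested sketch;
field names are just my suggestion, and offset is unsigned long per
the earlier comment):

     struct load_cache_entries_thread_data {
             pthread_t pthread;
             struct index_state *istate;
             struct mem_pool *ce_mem_pool;
             unsigned long offset;
             const char *mmap;
             struct index_entry_offset *blocks; /* first block for this thread */
             int nr_blocks;                     /* number of blocks to process */
             unsigned long consumed;
     };

and the thread proc simply iterates from blocks[0] to
blocks[nr_blocks - 1].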

> +       unsigned long consumed; /* return # of bytes in index file processed */
> +};
> +
> +/*
> + * A thread proc to run the load_cache_entries() computation
> + * across multiple background threads.
> + */
> +static void *load_cache_entries_thread(void *_data)
> +{
> +       struct load_cache_entries_thread_data *p = _data;
> +       int i;
> +
> +       /* iterate across all ieot blocks assigned to this thread */
> +       for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) {
> +               p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);

Please wrap this long line.

> +               p->offset += p->ieot->entries[i].nr;
> +       }
> +       return NULL;
> +}
> +
> +static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size,
> +                       unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot)
> +{
> +       int i, offset, ieot_work, ieot_offset, err;
> +       struct load_cache_entries_thread_data *data;
> +       unsigned long consumed = 0;
> +       int nr;
> +
> +       /* a little sanity checking */
> +       if (istate->name_hash_initialized)
> +               BUG("the name hash isn't thread safe");
> +
> +       mem_pool_init(&istate->ce_mem_pool, 0);
> +       data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));

We normally write sizeof(*data) instead of sizeof(struct ...).
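
That is:

     data = xcalloc(nr_threads, sizeof(*data));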

> +
> +       /* ensure we have no more threads than we have blocks to process */
> +       if (nr_threads > ieot->nr)
> +               nr_threads = ieot->nr;
> +       data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));

Eh... this allocates the same "data" a second time, leaking the first
allocation?
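
I think you want to clamp nr_threads first and allocate exactly once,
i.e.:

     /* ensure we have no more threads than we have blocks to process */
     if (nr_threads > ieot->nr)
             nr_threads = ieot->nr;
     data = xcalloc(nr_threads, sizeof(*data));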

> +
> +       offset = ieot_offset = 0;
> +       ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads);
> +       for (i = 0; i < nr_threads; i++) {
> +               struct load_cache_entries_thread_data *p = &data[i];
> +               int j;
> +
> +               if (ieot_offset + ieot_work > ieot->nr)
> +                       ieot_work = ieot->nr - ieot_offset;
> +
> +               p->istate = istate;
> +               p->offset = offset;
> +               p->mmap = mmap;
> +               p->ieot = ieot;
> +               p->ieot_offset = ieot_offset;
> +               p->ieot_work = ieot_work;
> +
> +               /* create a mem_pool for each thread */
> +               nr = 0;

Since nr is only used in this for loop, declare it in this scope
instead of declaring it for the whole function.
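
That is, something like:

     for (i = 0; i < nr_threads; i++) {
             struct load_cache_entries_thread_data *p = &data[i];
             int j, nr = 0;
             ...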

> +               for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work; j++)
> +                       nr += p->ieot->entries[j].nr;
> +               if (istate->version == 4) {
> +                       mem_pool_init(&p->ce_mem_pool,
> +                               estimate_cache_size_from_compressed(nr));
> +               }
> +               else {
> +                       mem_pool_init(&p->ce_mem_pool,
> +                               estimate_cache_size(mmap_size, nr));
> +               }

Maybe keep this mem_pool_init code inside load_cache_entries_thread(),
similar to how load_all_cache_entries() initializes its own pool. It's
mostly to keep this loop shorter and easier to read; of course,
running mem_pool_init() itself in parallel is just noise.
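
A rough sketch of what I mean, using the patch's current field names
(this also assumes mmap_size gets stashed in the thread data, which
the current patch does not do):

     static void *load_cache_entries_thread(void *_data)
     {
             struct load_cache_entries_thread_data *p = _data;
             int i, nr = 0;

             /* size the pool from the blocks assigned to this thread */
             for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++)
                     nr += p->ieot->entries[i].nr;
             if (p->istate->version == 4)
                     mem_pool_init(&p->ce_mem_pool,
                                   estimate_cache_size_from_compressed(nr));
             else
                     mem_pool_init(&p->ce_mem_pool,
                                   estimate_cache_size(p->mmap_size, nr));
             ...
     }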

> +
> +               err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p);
> +               if (err)
> +                       die(_("unable to create load_cache_entries thread: %s"), strerror(err));
> +
> +               /* increment by the number of cache entries in the ieot block being processed */
> +               for (j = 0; j < ieot_work; j++)
> +                       offset += ieot->entries[ieot_offset + j].nr;

I wonder if it would make things simpler to store the cache_entry
_index_ in the entries[] array instead of storing the number of
entries. You could then easily calculate nr as entries[i].index -
entries[i-1].index, and count across multiple blocks the same way,
without looping like this.
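
E.g., if entries[k].index were the index of the first cache entry in
block k (a hypothetical field; the end of the last block is implied
by istate->cache_nr):

     /* number of entries in block i */
     nr = entries[i + 1].index - entries[i].index;

     /* number of entries in blocks i..j-1, no loop needed */
     nr = entries[j].index - entries[i].index;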

> +               ieot_offset += ieot_work;
> +       }
> +
> +       for (i = 0; i < nr_threads; i++) {
> +               struct load_cache_entries_thread_data *p = &data[i];
> +
> +               err = pthread_join(p->pthread, NULL);
> +               if (err)
> +                       die(_("unable to join load_cache_entries thread: %s"), strerror(err));
> +               mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
> +               consumed += p->consumed;
> +       }
> +
> +       free(data);
> +
> +       return consumed;
> +}
> +#endif
> +
>  /* remember to discard_cache() before reading a different cache! */
>  int do_read_index(struct index_state *istate, const char *path, int must_exist)
>  {
> -       int fd, i;
> +       int fd;
>         struct stat st;
>         unsigned long src_offset;
>         const struct cache_header *hdr;
>         const char *mmap;
>         size_t mmap_size;
> -       const struct cache_entry *previous_ce = NULL;
>         struct load_index_extensions p;
>         size_t extension_offset = 0;
>  #ifndef NO_PTHREADS
> -       int nr_threads;
> +       int nr_threads, cpus;
> +       struct index_entry_offset_table *ieot = NULL;
>  #endif
>
>         if (istate->initialized)
> @@ -2014,10 +2167,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
>         p.mmap = mmap;
>         p.mmap_size = mmap_size;
>
> +       src_offset = sizeof(*hdr);

OK, we've been doing this since forever; sizeof(struct cache_header)
probably does not have extra padding on any supported platform.
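
If we wanted to be paranoid, a compile-time check would pin that
down; a sketch assuming C11 _Static_assert is available (git's
BUILD_ASSERT_OR_ZERO could serve the same purpose):

     _Static_assert(sizeof(struct cache_header) == 3 * sizeof(uint32_t),
                    "cache_header must not contain padding");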

> @@ -2032,29 +2193,22 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
>                         nr_threads--;
>                 }
>         }
> -#endif
> -
> -       if (istate->version == 4) {
> -               mem_pool_init(&istate->ce_mem_pool,
> -                             estimate_cache_size_from_compressed(istate->cache_nr));
> -       } else {
> -               mem_pool_init(&istate->ce_mem_pool,
> -                             estimate_cache_size(mmap_size, istate->cache_nr));
> -       }
>
> -       src_offset = sizeof(*hdr);
> -       for (i = 0; i < istate->cache_nr; i++) {
> -               struct ondisk_cache_entry *disk_ce;
> -               struct cache_entry *ce;
> -               unsigned long consumed;
> +       /*
> +        * Locate and read the index entry offset table so that we can use it
> +        * to multi-thread the reading of the cache entries.
> +        */
> +       if (extension_offset && nr_threads > 1)
> +               ieot = read_ieot_extension(mmap, mmap_size, extension_offset);

You need to free ieot at some point.
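
E.g. once the threaded load is done:

     if (ieot) {
             src_offset += load_cache_entries_threaded(istate, mmap, mmap_size,
                                                       src_offset, nr_threads, ieot);
             free(ieot);
     }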

>
> -               disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
> -               ce = create_from_disk(istate, disk_ce, &consumed, previous_ce);
> -               set_index_entry(istate, i, ce);
> +       if (ieot)
> +               src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot);
> +       else
> +               src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
> +#else
> +       src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
> +#endif
>
> -               src_offset += consumed;
> -               previous_ce = ce;
> -       }
>         istate->timestamp.sec = st.st_mtime;
>         istate->timestamp.nsec = ST_MTIME_NSEC(st);
>
> --
> 2.18.0.windows.1
>
-- 
Duy


