Re: [PATCH 2/2] pack-objects: fix threaded load balancing

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 12/8/07, Nicolas Pitre <nico@xxxxxxx> wrote:
>
> The current method consists of a master thread serving chunks of objects
> to work threads when they're done with their previous chunk.  The issue
> is to determine the best chunk size: making it too large creates poor
> load balancing, while making it too small has a negative effect on pack
> size because of the increased number of chunk boundaries and poor delta
> window utilization.
>
> This patch implements a completely different approach by initially
> splitting the work in large chunks uniformly amongst all threads, and
> whenever a thread is done then it steals half of the remaining work from
> another thread with the largest amount of unprocessed objects.
>
> This has the advantage of greatly reducing the number of chunk boundaries
> with an almost perfect load balancing.
>
> Signed-off-by: Nicolas Pitre <nico@xxxxxxx>
> ---
>  builtin-pack-objects.c |  117 +++++++++++++++++++++++++++++++++++-------------
>  1 files changed, 85 insertions(+), 32 deletions(-)
>
> diff --git a/builtin-pack-objects.c b/builtin-pack-objects.c
> index 5002cc6..fcc1901 100644
> --- a/builtin-pack-objects.c
> +++ b/builtin-pack-objects.c
> @@ -1479,10 +1479,10 @@ static unsigned long free_unpacked(struct unpacked *n)
>         return freed_mem;
>  }
>
> -static void find_deltas(struct object_entry **list, unsigned list_size,
> +static void find_deltas(struct object_entry **list, unsigned *list_size,
>                         int window, int depth, unsigned *processed)
>  {
> -       uint32_t i = 0, idx = 0, count = 0;
> +       uint32_t i, idx = 0, count = 0;
>         unsigned int array_size = window * sizeof(struct unpacked);
>         struct unpacked *array;
>         unsigned long mem_usage = 0;
> @@ -1490,11 +1490,23 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
>         array = xmalloc(array_size);
>         memset(array, 0, array_size);
>
> -       do {
> -               struct object_entry *entry = list[i++];
> +       for (;;) {
> +               struct object_entry *entry = *list++;
>                 struct unpacked *n = array + idx;
>                 int j, max_depth, best_base = -1;
>
> +               progress_lock();
> +               if (!*list_size) {
> +                       progress_unlock();
> +                       break;
> +               }
> +               (*list_size)--;
> +               if (!entry->preferred_base) {
> +                       (*processed)++;
> +                       display_progress(progress_state, *processed);
> +               }
> +               progress_unlock();
> +
>                 mem_usage -= free_unpacked(n);
>                 n->entry = entry;
>
> @@ -1512,11 +1524,6 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
>                 if (entry->preferred_base)
>                         goto next;
>
> -               progress_lock();
> -               (*processed)++;
> -               display_progress(progress_state, *processed);
> -               progress_unlock();
> -
>                 /*
>                  * If the current object is at pack edge, take the depth the
>                  * objects that depend on the current object into account
> @@ -1576,7 +1583,7 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
>                         count++;
>                 if (idx >= window)
>                         idx = 0;
> -       } while (i < list_size);
> +       }
>
>         for (i = 0; i < window; ++i) {
>                 free_delta_index(array[i].index);
> @@ -1591,6 +1598,7 @@ struct thread_params {
>         pthread_t thread;
>         struct object_entry **list;
>         unsigned list_size;
> +       unsigned remaining;
>         int window;
>         int depth;
>         unsigned *processed;
> @@ -1612,10 +1620,10 @@ static void *threaded_find_deltas(void *arg)
>                 pthread_mutex_lock(&data_ready);
>                 pthread_mutex_unlock(&data_request);
>
> -               if (!me->list_size)
> +               if (!me->remaining)
>                         return NULL;
>
> -               find_deltas(me->list, me->list_size,
> +               find_deltas(me->list, &me->remaining,
>                             me->window, me->depth, me->processed);
>         }
>  }
> @@ -1624,57 +1632,102 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
>                            int window, int depth, unsigned *processed)
>  {
>         struct thread_params *target, p[delta_search_threads];
> -       int i, ret;
> -       unsigned chunk_size;
> +       int i, ret, active_threads = 0;
>
>         if (delta_search_threads <= 1) {
> -               find_deltas(list, list_size, window, depth, processed);
> +               find_deltas(list, &list_size, window, depth, processed);
>                 return;
>         }
>
>         pthread_mutex_lock(&data_provider);
>         pthread_mutex_lock(&data_ready);
>
> +       /* Start work threads. */
>         for (i = 0; i < delta_search_threads; i++) {
>                 p[i].window = window;
>                 p[i].depth = depth;
>                 p[i].processed = processed;
> +               p[i].remaining = 0;
>                 ret = pthread_create(&p[i].thread, NULL,
>                                      threaded_find_deltas, &p[i]);
>                 if (ret)
>                         die("unable to create thread: %s", strerror(ret));
> +               active_threads++;
>         }
>
> -       /* this should be auto-tuned somehow */
> -       chunk_size = window * 1000;
> +       /* Then partition the work amongst them. */
> +       for (i = 0; i < delta_search_threads; i++) {
> +               unsigned sub_size = list_size / (delta_search_threads - i);
>
> -       do {
> -               unsigned sublist_size = chunk_size;
> -               if (sublist_size > list_size)
> -                       sublist_size = list_size;
> +               pthread_mutex_lock(&data_provider);
> +               target = data_requester;
> +               if (!sub_size) {
> +                       pthread_mutex_unlock(&data_ready);
> +                       pthread_join(target->thread, NULL);
> +                       active_threads--;
> +                       continue;
> +               }
>
>                 /* try to split chunks on "path" boundaries */
> -               while (sublist_size < list_size && list[sublist_size]->hash &&
> -                      list[sublist_size]->hash == list[sublist_size-1]->hash)
> -                       sublist_size++;
> +               while (sub_size < list_size && list[sub_size]->hash &&
> +                      list[sub_size]->hash == list[sub_size-1]->hash)
> +                       sub_size++;
> +
> +               target->list = list;
> +               target->list_size = sub_size;
> +               target->remaining = sub_size;
> +               pthread_mutex_unlock(&data_ready);
>
> +               list += sub_size;
> +               list_size -= sub_size;
> +       }
> +
> +       /*
> +        * Now let's wait for work completion.  Each time a thread is done
> +        * with its work, we steal half of the remaining work from the
> +        * thread with the largest number of unprocessed objects and give
> +        * it to that newly idle thread.  This ensure good load balancing
> +        * until the remaining object list segments are simply too short
> +        * to be worth splitting anymore.
> +        */
> +       do {
> +               struct thread_params *victim = NULL;
> +               unsigned sub_size = 0;
>                 pthread_mutex_lock(&data_provider);
>                 target = data_requester;
> -               target->list = list;
> -               target->list_size = sublist_size;
> +
> +               progress_lock();
> +               for (i = 0; i < delta_search_threads; i++)
> +                       if (p[i].remaining > 2*window &&
> +                           (!victim || victim->remaining < p[i].remaining))
> +                               victim = &p[i];
> +               if (victim) {
> +                       sub_size = victim->remaining / 2;
> +                       list = victim->list + victim->list_size - sub_size;
> +                       while (sub_size && list[0]->hash &&
> +                              list[0]->hash == list[-1]->hash) {
> +                               list++;

I think you needed to copy sub_size to another variable for this loop

> +                               sub_size--;
> +                       }
> +                       target->list = list;
> +                       victim->list_size -= sub_size;
> +                       victim->remaining -= sub_size;
> +               }
> +               progress_unlock();
> +
> +               target->list_size = sub_size;
> +               target->remaining = sub_size;
>                 pthread_mutex_unlock(&data_ready);
>
> -               list += sublist_size;
> -               list_size -= sublist_size;
> -               if (!sublist_size) {
> +               if (!sub_size) {
>                         pthread_join(target->thread, NULL);
> -                       i--;
> +                       active_threads--;
>                 }
> -       } while (i);
> +       } while (active_threads);
>  }
>
>  #else
> -#define ll_find_deltas find_deltas
> +#define ll_find_deltas(l, s, w, d, p)  find_deltas(l, &s, w, d, p)
>  #endif
>
>  static void prepare_pack(int window, int depth)
> --
> 1.5.3.7.2184.ge321d-dirty
>
>


-- 
Jon Smirl
jonsmirl@xxxxxxxxx
-
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]

  Powered by Linux