The current method consists of a master thread serving chunks of objects to work threads whenever they are done with their previous chunk. The difficulty is determining the best chunk size: making it too large results in poor load balancing, while making it too small has a negative effect on pack size because of the increased number of chunk boundaries and poor delta window utilization. This patch implements a completely different approach: the work is initially split into large chunks distributed uniformly amongst all threads, and whenever a thread is done, it steals half of the remaining work from the thread with the largest number of unprocessed objects. This has the advantage of greatly reducing the number of chunk boundaries while achieving almost perfect load balancing. Signed-off-by: Nicolas Pitre <nico@xxxxxxx> --- builtin-pack-objects.c | 117 +++++++++++++++++++++++++++++++++++------------- 1 files changed, 85 insertions(+), 32 deletions(-) diff --git a/builtin-pack-objects.c b/builtin-pack-objects.c index 5002cc6..fcc1901 100644 --- a/builtin-pack-objects.c +++ b/builtin-pack-objects.c @@ -1479,10 +1479,10 @@ static unsigned long free_unpacked(struct unpacked *n) return freed_mem; } -static void find_deltas(struct object_entry **list, unsigned list_size, +static void find_deltas(struct object_entry **list, unsigned *list_size, int window, int depth, unsigned *processed) { - uint32_t i = 0, idx = 0, count = 0; + uint32_t i, idx = 0, count = 0; unsigned int array_size = window * sizeof(struct unpacked); struct unpacked *array; unsigned long mem_usage = 0; @@ -1490,11 +1490,23 @@ static void find_deltas(struct object_entry **list, unsigned list_size, array = xmalloc(array_size); memset(array, 0, array_size); - do { - struct object_entry *entry = list[i++]; + for (;;) { + struct object_entry *entry = *list++; struct unpacked *n = array + idx; int j, max_depth, best_base = -1; + progress_lock(); + if (!*list_size) { + progress_unlock(); + break; + } + (*list_size)--; + if 
(!entry->preferred_base) { + (*processed)++; + display_progress(progress_state, *processed); + } + progress_unlock(); + mem_usage -= free_unpacked(n); n->entry = entry; @@ -1512,11 +1524,6 @@ static void find_deltas(struct object_entry **list, unsigned list_size, if (entry->preferred_base) goto next; - progress_lock(); - (*processed)++; - display_progress(progress_state, *processed); - progress_unlock(); - /* * If the current object is at pack edge, take the depth the * objects that depend on the current object into account @@ -1576,7 +1583,7 @@ static void find_deltas(struct object_entry **list, unsigned list_size, count++; if (idx >= window) idx = 0; - } while (i < list_size); + } for (i = 0; i < window; ++i) { free_delta_index(array[i].index); @@ -1591,6 +1598,7 @@ struct thread_params { pthread_t thread; struct object_entry **list; unsigned list_size; + unsigned remaining; int window; int depth; unsigned *processed; @@ -1612,10 +1620,10 @@ static void *threaded_find_deltas(void *arg) pthread_mutex_lock(&data_ready); pthread_mutex_unlock(&data_request); - if (!me->list_size) + if (!me->remaining) return NULL; - find_deltas(me->list, me->list_size, + find_deltas(me->list, &me->remaining, me->window, me->depth, me->processed); } } @@ -1624,57 +1632,102 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size, int window, int depth, unsigned *processed) { struct thread_params *target, p[delta_search_threads]; - int i, ret; - unsigned chunk_size; + int i, ret, active_threads = 0; if (delta_search_threads <= 1) { - find_deltas(list, list_size, window, depth, processed); + find_deltas(list, &list_size, window, depth, processed); return; } pthread_mutex_lock(&data_provider); pthread_mutex_lock(&data_ready); + /* Start work threads. 
*/ for (i = 0; i < delta_search_threads; i++) { p[i].window = window; p[i].depth = depth; p[i].processed = processed; + p[i].remaining = 0; ret = pthread_create(&p[i].thread, NULL, threaded_find_deltas, &p[i]); if (ret) die("unable to create thread: %s", strerror(ret)); + active_threads++; } - /* this should be auto-tuned somehow */ - chunk_size = window * 1000; + /* Then partition the work amongst them. */ + for (i = 0; i < delta_search_threads; i++) { + unsigned sub_size = list_size / (delta_search_threads - i); - do { - unsigned sublist_size = chunk_size; - if (sublist_size > list_size) - sublist_size = list_size; + pthread_mutex_lock(&data_provider); + target = data_requester; + if (!sub_size) { + pthread_mutex_unlock(&data_ready); + pthread_join(target->thread, NULL); + active_threads--; + continue; + } /* try to split chunks on "path" boundaries */ - while (sublist_size < list_size && list[sublist_size]->hash && - list[sublist_size]->hash == list[sublist_size-1]->hash) - sublist_size++; + while (sub_size < list_size && list[sub_size]->hash && + list[sub_size]->hash == list[sub_size-1]->hash) + sub_size++; + + target->list = list; + target->list_size = sub_size; + target->remaining = sub_size; + pthread_mutex_unlock(&data_ready); + list += sub_size; + list_size -= sub_size; + } + + /* + * Now let's wait for work completion. Each time a thread is done + * with its work, we steal half of the remaining work from the + * thread with the largest number of unprocessed objects and give + * it to that newly idle thread. This ensures good load balancing + * until the remaining object list segments are simply too short + * to be worth splitting anymore. 
+ */ + do { + struct thread_params *victim = NULL; + unsigned sub_size = 0; pthread_mutex_lock(&data_provider); target = data_requester; - target->list = list; - target->list_size = sublist_size; + + progress_lock(); + for (i = 0; i < delta_search_threads; i++) + if (p[i].remaining > 2*window && + (!victim || victim->remaining < p[i].remaining)) + victim = &p[i]; + if (victim) { + sub_size = victim->remaining / 2; + list = victim->list + victim->list_size - sub_size; + while (sub_size && list[0]->hash && + list[0]->hash == list[-1]->hash) { + list++; + sub_size--; + } + target->list = list; + victim->list_size -= sub_size; + victim->remaining -= sub_size; + } + progress_unlock(); + + target->list_size = sub_size; + target->remaining = sub_size; pthread_mutex_unlock(&data_ready); - list += sublist_size; - list_size -= sublist_size; - if (!sublist_size) { + if (!sub_size) { pthread_join(target->thread, NULL); - i--; + active_threads--; } - } while (i); + } while (active_threads); } #else -#define ll_find_deltas find_deltas +#define ll_find_deltas(l, s, w, d, p) find_deltas(l, &s, w, d, p) #endif static void prepare_pack(int window, int depth) -- 1.5.3.7.2184.ge321d-dirty - To unsubscribe from this list: send the line "unsubscribe git" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html