The bottom-up mergesort implementation needs to skip through sublists a lot. A recursive version could avoid that, but would require log2(n) stack frames. Explicitly manage a stack of sorted sublists of various lengths instead to avoid fast-forwarding while also keeping a lid on memory usage. While this patch was developed independently, a ranks stack is also used in https://github.com/mono/mono/blob/master/mono/eglib/sort.frag.h by the Mono project. The idea is to keep slots for log2(n_max) sorted sublists, one for each power of 2. Such a construct can accommodate lists of any length up to n_max. Since there is a known maximum number of items (effectively SIZE_MAX), we can preallocate the whole rank stack. We add items one by one, which is akin to incrementing a binary number. Make use of that by keeping track of the number of items and check bits in it instead of checking for NULL in the rank stack when checking if a sublist of a certain rank exists, in order to avoid memory accesses. The first item can go into the empty first slot as a sublist of length 2^0. The second one needs to be merged with the previous sublist and the result goes into the empty second slot as a sublist of length 2^1. The third one goes into vacated first slot and so on. At the end we merge all the sublists to get the result. The new version still performs a stable sort by making sure to put items seen earlier first when the compare function indicates equality. That's done by preferring items from sublists with a higher rank. The new merge function also tries to minimize the number of operations. Like blame.c::blame_merge(), the function doesn't set the next pointer if it already points to the right item, and it exits when it reaches the end of one of the two sublists that it's given. The old code couldn't do the latter because it kept all items in a single list. The number of comparisons stays the same, though. Here's example output of "test-tool mergesort test" for the rand distributions with the most number of comparisons with the ranks stack: $ t/helper/test-tool mergesort test | awk ' NR > 1 && $1 != "rand" {next} $7 > max[$3] {max[$3] = $7; line[$3] = $0} END {for (n in line) print line[n]} ' distribut mode n m get_next set_next compare verdict rand copy 100 32 669 420 569 OK rand dither 1023 64 9997 5396 8974 OK rand dither 1024 512 10007 6159 8983 OK rand dither 1025 256 10993 5988 9968 OK Here are the differences to the results without this patch: distribut mode n m get_next set_next compare rand copy 100 32 -515 -280 0 rand dither 1023 64 -6376 -4834 0 rand dither 1024 512 -6377 -4081 0 rand dither 1025 256 -7461 -5287 0 The numbers of get_next and set_next calls are reduced significantly. NB: These winners are different than the ones shown in the patch that introduced the unriffle mode because the addition of the unriffle_skewed mode in between changed the consumption of rand() values. Here are the distributions with the most comparisons overall with the ranks stack: $ t/helper/test-tool mergesort test | awk ' $7 > max[$3] {max[$3] = $7; line[$3] = $0} END {for (n in line) print line[n]} ' distribut mode n m get_next set_next compare verdict sawtooth unriffle_skewed 100 128 689 632 589 OK sawtooth unriffle_skewed 1023 1024 10230 10220 9207 OK sawtooth unriffle 1024 1024 10241 10240 9217 OK sawtooth unriffle_skewed 1025 2048 11266 10242 10241 OK And here the differences to before: distribut mode n m get_next set_next compare sawtooth unriffle_skewed 100 128 -495 -68 0 sawtooth unriffle_skewed 1023 1024 -6143 -10 0 sawtooth unriffle 1024 1024 -6143 0 0 sawtooth unriffle_skewed 1025 2048 -7188 -1033 0 We get a similar reduction of get_next calls here, but only a slight reduction of set_next calls, if at all. And here are the results of p0071-sort.sh before: 0071.12: llist_mergesort() unsorted 0.36(0.33+0.01) 0071.14: llist_mergesort() sorted 0.15(0.13+0.01) 0071.16: llist_mergesort() reversed 0.16(0.14+0.01) ... and here the ones with this patch: 0071.12: llist_mergesort() unsorted 0.24(0.22+0.01) 0071.14: llist_mergesort() sorted 0.12(0.10+0.01) 0071.16: llist_mergesort() reversed 0.12(0.10+0.01) NB: We can't use t/perf/run to compare revisions in one run because it uses the test-tool from the worktree, not from the revisions being tested. Signed-off-by: René Scharfe <l.s.r@xxxxxx> --- mergesort.c | 121 ++++++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/mergesort.c b/mergesort.c index e5fdf2ee4a..6216835566 100644 --- a/mergesort.c +++ b/mergesort.c @@ -1,73 +1,84 @@ #include "cache.h" #include "mergesort.h" -struct mergesort_sublist { - void *ptr; - unsigned long len; -}; - -static void *get_nth_next(void *list, unsigned long n, - void *(*get_next_fn)(const void *)) +/* Combine two sorted lists. Take from `list` on equality. */ +static void *llist_merge(void *list, void *other, + void *(*get_next_fn)(const void *), + void (*set_next_fn)(void *, void *), + int (*compare_fn)(const void *, const void *)) { - while (n-- && list) - list = get_next_fn(list); - return list; -} + void *result = list, *tail; -static void *pop_item(struct mergesort_sublist *l, - void *(*get_next_fn)(const void *)) -{ - void *p = l->ptr; - l->ptr = get_next_fn(l->ptr); - l->len = l->ptr ? (l->len - 1) : 0; - return p; + if (compare_fn(list, other) > 0) { + result = other; + goto other; + } + for (;;) { + do { + tail = list; + list = get_next_fn(list); + if (!list) { + set_next_fn(tail, other); + return result; + } + } while (compare_fn(list, other) <= 0); + set_next_fn(tail, other); + other: + do { + tail = other; + other = get_next_fn(other); + if (!other) { + set_next_fn(tail, list); + return result; + } + } while (compare_fn(list, other) > 0); + set_next_fn(tail, list); + } } +/* + * Perform an iterative mergesort using an array of sublists. + * + * n is the number of items. + * ranks[i] is undefined if n & 2^i == 0, and assumed empty. + * ranks[i] contains a sublist of length 2^i otherwise. + * + * The number of bits in a void pointer limits the number of objects + * that can be created, and thus the number of array elements necessary + * to be able to sort any valid list. + * + * Adding an item to this array is like incrementing a binary number; + * positional values for set bits correspond to sublist lengths. + */ void *llist_mergesort(void *list, void *(*get_next_fn)(const void *), void (*set_next_fn)(void *, void *), int (*compare_fn)(const void *, const void *)) { - unsigned long l; - - if (!list) - return NULL; - for (l = 1; ; l *= 2) { - void *curr; - struct mergesort_sublist p, q; + void *ranks[bitsizeof(void *)]; + size_t n = 0; + int i; - p.ptr = list; - q.ptr = get_nth_next(p.ptr, l, get_next_fn); - if (!q.ptr) - break; - p.len = q.len = l; + while (list) { + void *next = get_next_fn(list); + if (next) + set_next_fn(list, NULL); + for (i = 0; n & (1 << i); i++) + list = llist_merge(ranks[i], list, get_next_fn, + set_next_fn, compare_fn); + n++; + ranks[i] = list; + list = next; + } - if (compare_fn(p.ptr, q.ptr) > 0) - list = curr = pop_item(&q, get_next_fn); + for (i = 0; n; i++, n >>= 1) { + if (!(n & 1)) + continue; + if (list) + list = llist_merge(ranks[i], list, get_next_fn, + set_next_fn, compare_fn); else - list = curr = pop_item(&p, get_next_fn); - - while (p.ptr) { - while (p.len || q.len) { - void *prev = curr; - - if (!p.len) - curr = pop_item(&q, get_next_fn); - else if (!q.len) - curr = pop_item(&p, get_next_fn); - else if (compare_fn(p.ptr, q.ptr) > 0) - curr = pop_item(&q, get_next_fn); - else - curr = pop_item(&p, get_next_fn); - set_next_fn(prev, curr); - } - p.ptr = q.ptr; - p.len = l; - q.ptr = get_nth_next(p.ptr, l, get_next_fn); - q.len = q.ptr ? l : 0; - - } - set_next_fn(curr, NULL); + list = ranks[i]; } return list; } -- 2.33.0