mergesort: use ranks stack

The bottom-up mergesort implementation needs to skip through sublists a
lot.  A recursive version could avoid that, but would require log2(n)
stack frames.  Explicitly manage a stack of sorted sublists of various
lengths instead to avoid fast-forwarding while also keeping a lid on
memory usage.

While this patch was developed independently, a ranks stack is also used
in https://github.com/mono/mono/blob/master/mono/eglib/sort.frag.h by
the Mono project.

The idea is to keep slots for log2(n_max) sorted sublists, one for each
power of 2.  Such a construct can accommodate lists of any length up to
n_max.  Since there is a known maximum number of items (effectively
SIZE_MAX), we can preallocate the whole rank stack.

We add items one by one, which is akin to incrementing a binary number.
Make use of that by keeping track of the number of items and check bits
in it instead of checking for NULL in the rank stack when checking if a
sublist of a certain rank exists, in order to avoid memory accesses.

The first item can go into the empty first slot as a sublist of length
2^0.  The second one needs to be merged with the previous sublist and
the result goes into the empty second slot as a sublist of length 2^1.
The third one goes into vacated first slot and so on.  At the end we
merge all the sublists to get the result.

The new version still performs a stable sort by making sure to put items
seen earlier first when the compare function indicates equality.  That's
done by preferring items from sublists with a higher rank.

The new merge function also tries to minimize the number of operations.
Like blame.c::blame_merge(), the function doesn't set the next pointer
if it already points to the right item, and it exits when it reaches the
end of one of the two sublists that it's given.  The old code couldn't
do the latter because it kept all items in a single list.

The number of comparisons stays the same, though.  Here's example output
of "test-tool mergesort test" for the rand distributions with the most
number of comparisons with the ranks stack:

   $ t/helper/test-tool mergesort test | awk '
       NR > 1 && $1 != "rand" {next}
       $7 > max[$3] {max[$3] = $7; line[$3] = $0}
       END {for (n in line) print line[n]}
   '

distribut mode                    n        m get_next set_next  compare verdict
rand      copy                  100       32      669      420      569 OK
rand      dither               1023       64     9997     5396     8974 OK
rand      dither               1024      512    10007     6159     8983 OK
rand      dither               1025      256    10993     5988     9968 OK

Here are the differences to the results without this patch:

distribut mode                    n        m get_next set_next  compare
rand      copy                  100       32     -515     -280        0
rand      dither               1023       64    -6376    -4834        0
rand      dither               1024      512    -6377    -4081        0
rand      dither               1025      256    -7461    -5287        0

The numbers of get_next and set_next calls are reduced significantly.

NB: These winners are different than the ones shown in the patch that
introduced the unriffle mode because the addition of the unriffle_skewed
mode in between changed the consumption of rand() values.

Here are the distributions with the most comparisons overall with the
ranks stack:

   $ t/helper/test-tool mergesort test | awk '
       $7 > max[$3] {max[$3] = $7; line[$3] = $0}
       END {for (n in line) print line[n]}
   '

distribut mode                    n        m get_next set_next  compare verdict
sawtooth  unriffle_skewed       100      128      689      632      589 OK
sawtooth  unriffle_skewed      1023     1024    10230    10220     9207 OK
sawtooth  unriffle             1024     1024    10241    10240     9217 OK
sawtooth  unriffle_skewed      1025     2048    11266    10242    10241 OK

And here the differences to before:

distribut mode                    n        m get_next set_next  compare
sawtooth  unriffle_skewed       100      128     -495      -68        0
sawtooth  unriffle_skewed      1023     1024    -6143      -10        0
sawtooth  unriffle             1024     1024    -6143        0        0
sawtooth  unriffle_skewed      1025     2048    -7188    -1033        0

We get a similar reduction of get_next calls here, but only a slight
reduction of set_next calls, if at all.

And here are the results of p0071-sort.sh before:

0071.12: llist_mergesort() unsorted    0.36(0.33+0.01)
0071.14: llist_mergesort() sorted      0.15(0.13+0.01)
0071.16: llist_mergesort() reversed    0.16(0.14+0.01)

... and here the ones with this patch:

0071.12: llist_mergesort() unsorted    0.24(0.22+0.01)
0071.14: llist_mergesort() sorted      0.12(0.10+0.01)
0071.16: llist_mergesort() reversed    0.12(0.10+0.01)

NB: We can't use t/perf/run to compare revisions in one run because it
uses the test-tool from the worktree, not from the revisions being
tested.

Signed-off-by: René Scharfe <l.s.r@web.de>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
This commit is contained in:
René Scharfe 2021-10-01 11:22:39 +02:00 committed by Junio C Hamano
parent 40bc872adb
commit afc72b5d3a

View File

@ -1,73 +1,84 @@
#include "cache.h"
#include "mergesort.h"
struct mergesort_sublist {
void *ptr;
unsigned long len;
};
static void *get_nth_next(void *list, unsigned long n,
void *(*get_next_fn)(const void *))
/* Combine two sorted lists. Take from `list` on equality. */
static void *llist_merge(void *list, void *other,
void *(*get_next_fn)(const void *),
void (*set_next_fn)(void *, void *),
int (*compare_fn)(const void *, const void *))
{
while (n-- && list)
list = get_next_fn(list);
return list;
}
static void *pop_item(struct mergesort_sublist *l,
void *(*get_next_fn)(const void *))
{
void *p = l->ptr;
l->ptr = get_next_fn(l->ptr);
l->len = l->ptr ? (l->len - 1) : 0;
return p;
void *result = list, *tail;
if (compare_fn(list, other) > 0) {
result = other;
goto other;
}
for (;;) {
do {
tail = list;
list = get_next_fn(list);
if (!list) {
set_next_fn(tail, other);
return result;
}
} while (compare_fn(list, other) <= 0);
set_next_fn(tail, other);
other:
do {
tail = other;
other = get_next_fn(other);
if (!other) {
set_next_fn(tail, list);
return result;
}
} while (compare_fn(list, other) > 0);
set_next_fn(tail, list);
}
}
/*
* Perform an iterative mergesort using an array of sublists.
*
* n is the number of items.
* ranks[i] is undefined if n & 2^i == 0, and assumed empty.
* ranks[i] contains a sublist of length 2^i otherwise.
*
* The number of bits in a void pointer limits the number of objects
* that can be created, and thus the number of array elements necessary
* to be able to sort any valid list.
*
* Adding an item to this array is like incrementing a binary number;
* positional values for set bits correspond to sublist lengths.
*/
void *llist_mergesort(void *list,
void *(*get_next_fn)(const void *),
void (*set_next_fn)(void *, void *),
int (*compare_fn)(const void *, const void *))
{
unsigned long l;
void *ranks[bitsizeof(void *)];
size_t n = 0;
int i;
if (!list)
return NULL;
for (l = 1; ; l *= 2) {
void *curr;
struct mergesort_sublist p, q;
while (list) {
void *next = get_next_fn(list);
if (next)
set_next_fn(list, NULL);
for (i = 0; n & (1 << i); i++)
list = llist_merge(ranks[i], list, get_next_fn,
set_next_fn, compare_fn);
n++;
ranks[i] = list;
list = next;
}
p.ptr = list;
q.ptr = get_nth_next(p.ptr, l, get_next_fn);
if (!q.ptr)
break;
p.len = q.len = l;
if (compare_fn(p.ptr, q.ptr) > 0)
list = curr = pop_item(&q, get_next_fn);
for (i = 0; n; i++, n >>= 1) {
if (!(n & 1))
continue;
if (list)
list = llist_merge(ranks[i], list, get_next_fn,
set_next_fn, compare_fn);
else
list = curr = pop_item(&p, get_next_fn);
while (p.ptr) {
while (p.len || q.len) {
void *prev = curr;
if (!p.len)
curr = pop_item(&q, get_next_fn);
else if (!q.len)
curr = pop_item(&p, get_next_fn);
else if (compare_fn(p.ptr, q.ptr) > 0)
curr = pop_item(&q, get_next_fn);
else
curr = pop_item(&p, get_next_fn);
set_next_fn(prev, curr);
}
p.ptr = q.ptr;
p.len = l;
q.ptr = get_nth_next(p.ptr, l, get_next_fn);
q.len = q.ptr ? l : 0;
}
set_next_fn(curr, NULL);
list = ranks[i];
}
return list;
}