2016-05-04 21:27+0200, Radim Krčmář: > And I completely forgot that we can preallocate the whole tree and use a > effective packed storage thanks to that. My first guess is that it > would be make sense with double the memory of our bitmap. Scans and > insertion would be slower than for a per-vcpu list, but much faster than > with a dynamically allocated structure. I'll think a bit about that. The preallocate allocated radix tree has * fast insertion (few atomic bit writes at places that are computed only with arithmetics = no pointer traversal) * feasible lockless removal (not sure how fast it will be, but it only slows down the read side, and if we can guarantee that all vcpus are stopped, then it's trivial) * compatible with bitmap (a subset of the tree is 1:1 legacy bitmap) * very fast search if pages are allocated in the same range (on par with list) * only ~2 times slower search that bitmap when the bitmap is full * high memory consumption (roughly double the bitmap) The only reason why a dynamically allocated tree would be used instead is unacceptable memory consumption, but we already lived with a bitmap, so I think it is worth a try. I wrote a dirty, buggy an unoptimized userspace implementation. (Code attached if you're interested.) Especially the bitmap is crude, so real numbers will look differently, but asymptotic behaviour should remain. 
If we consider a slot with 10000000 pages, which is 38 GB of tracked memory, and datasets from 100 to 10000000 random dirty frames, then timing for codes that compact [l]ist, [t]ree and [b]itmap into a dirty list, are: % ./radix_tree 10000000 100 kB size l:0 t:1513 b:1220 l: 642 ns (1.00, 78.05, 45977.38) t: 50108 ns (0.01, 1.00, 589.08) b: 29517475 ns (0.00, 0.00, 1.00) % ./radix_tree 10000000 1000 l: 9542 ns (1.00, 43.29, 3347.11) t: 413110 ns (0.02, 1.00, 77.31) b: 31938143 ns (0.00, 0.01, 1.00) % ./radix_tree 10000000 10000 l: 105859 ns (1.00, 21.07, 273.75) t: 2230429 ns (0.05, 1.00, 12.99) b: 28978510 ns (0.00, 0.08, 1.00) % ./radix_tree 10000000 100000 l: 984729 ns (1.00, 12.16, 25.53) t: 11970068 ns (0.08, 1.00, 2.10) b: 25144204 ns (0.04, 0.48, 1.00) % ./radix_tree 10000000 1000000 kB size l:7812 t:1513 b:1220 l: 6420041 ns (1.00, 5.69, 4.13) t: 36511925 ns (0.18, 1.00, 0.73) b: 26523022 ns (0.24, 1.38, 1.00) % ./radix_tree 10000000 10000000 kB size l:78125 t:1513 b:1220 l: 38180627 ns (1.00, 1.56, 1.31) t: 59681915 ns (0.64, 1.00, 0.84) b: 50079550 ns (0.76, 1.19, 1.00) The tree is 589.08 times faster than bitmap with 100 "dirty pages" and list is 78.05 times faster than the tree. Speedups get smaller as the increases and the tree ends up performing worse than a bitmap. The tree performs much better when the dataset isn't random, i.e. dirty frames are a seqence from 0 to the size of the dataset: % ./radix_tree 10000000 1000 seq l: 8928 ns (1.00, 1.93, 3366.93) t: 17198 ns (0.52, 1.00, 1747.88) b: 30059972 ns (0.00, 0.00, 1.00) % ./radix_tree 10000000 10000 seq l: 96469 ns (1.00, 1.36, 271.40) t: 131124 ns (0.74, 1.00, 199.67) b: 26181839 ns (0.00, 0.01, 1.00) % ./radix_tree 10000000 100000 seq l: 1256119 ns (1.00, 1.28, 24.71) t: 1609232 ns (0.78, 1.00, 19.29) b: 31036003 ns (0.04, 0.05, 1.00)
/*
 * Userspace benchmark of a preallocated, packed radix-8 tree.
 *
 * The tree is stored as a flat byte array: packed[n] is node n, and bit c of
 * that byte is set when child c of node n contains at least one inserted
 * value.  The children of node n are nodes n * 8 + 1 + c, so no pointers are
 * stored.  The lowest level of internal nodes is a plain bitmap of the
 * inserted values; it starts at byte bitmap_offset / 8.
 *
 * The program compares three ways of producing a list of inserted values:
 * copying a ready-made list, walking the tree, and scanning the bitmap.
 */
#define _POSIX_C_SOURCE 200809L /* clock_gettime() and CLOCK_MONOTONIC */

#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

typedef uint8_t u8;
typedef uint64_t u64;
typedef int16_t s16;

const u8 radix_shift = 3;
const u8 radix = 1 << 3;

/*
 * bitmap_offset: number of bits that precede the leaf bitmap in packed[];
 *                always a multiple of 8.  Nodes 0..bitmap_offset are internal,
 *                leaves are numbered from bitmap_offset + 1.
 * bitmap_size:   number of values the tree can hold (valid values are
 *                0..bitmap_size - 1).
 * packed:        one byte per internal node.
 */
struct packed_radix_8_tree {
	u64 bitmap_offset;
	u64 bitmap_size;
	u8 packed[];
};

/* Set bit 'bit' (0..7) in *data.  Not atomic. */
static inline void set_bit(u8 *data, u8 bit)
{
	*data |= (u8)(1u << bit);
}

/*
 * Return the index of the first set bit in *data at position >= bit and
 * < size, or a value >= size when there is none.
 */
static inline u8 find_next_bit(u8 *data, u8 size, u8 bit)
{
	while (bit < size && !(*data & (1u << bit)))
		bit++;
	return bit;
}

/* Return the index of the first set bit in *data, or size when none is set. */
static inline u8 find_first_bit(u8 *data, u8 size)
{
	return find_next_bit(data, size, 0);
}

/* Return the node number of child 'child' of 'node' in a tree of the given fanout. */
static inline u64 child_node(u8 fanout, u64 node, u8 child)
{
	return node * fanout + 1 + child;
}

/*
 * Replace *node with its parent and return which child of that parent it was.
 * *node must not be the root (0).
 *
 * cn = child_node(radix, node, ci);
 * n = cn;
 * i = parent_node(radix, &n);
 * i == ci && n == node;
 */
static inline u8 parent_node(u8 fanout, u64 *node)
{
	u64 child_mask = (u64)fanout - 1;
	u8 child = (u8)((*node - 1) & child_mask);

	*node = (*node - 1) / fanout;
	return child;
}

/* Map a value to the number of the (virtual) leaf node that represents it. */
u64 packed_radix_8_tree_encode_leaf(struct packed_radix_8_tree *tree, u64 value)
{
	return value + tree->bitmap_offset + 1;
}

/* Inverse of packed_radix_8_tree_encode_leaf(). */
u64 packed_radix_8_tree_decode_leaf(struct packed_radix_8_tree *tree, u64 leaf)
{
	return leaf - tree->bitmap_offset - 1;
}

/*
 * Mark 'value' as present by setting the matching bit in every node on the
 * path from its leaf to the root.  Values outside 0..bitmap_size - 1 are
 * ignored, because their nodes lie outside the allocated array.
 */
void packed_radix_8_tree_insert(struct packed_radix_8_tree *tree, u64 value)
{
	u64 node;

	if (value >= tree->bitmap_size)
		return;

	node = packed_radix_8_tree_encode_leaf(tree, value);
	do {
		u8 child = parent_node(radix, &node);

		set_bit(&tree->packed[node], child);
	} while (node);
}

/*
 * Descend from internal node 'node' along the lowest set bits and return the
 * leaf that is reached, or -1ULL when 'node' has no bit set.
 */
u64 __packed_radix_8_tree_find_first_leaf(struct packed_radix_8_tree *tree, u64 node)
{
	/*
	 * Leaves start at bitmap_offset + 1, so node bitmap_offset itself is
	 * still an internal node and has to be descended into.
	 */
	while (node <= tree->bitmap_offset) {
		u8 child = find_first_bit(&tree->packed[node], radix);

		if (child == radix)
			return -1ULL;
		node = child_node(radix, node, child);
	}
	return node;
}

/* Return the leaf of the smallest inserted value, or -1ULL for an empty tree. */
u64 packed_radix_8_tree_find_first_leaf(struct packed_radix_8_tree *tree)
{
	return __packed_radix_8_tree_find_first_leaf(tree, 0);
}

/*
 * Return the leaf that follows 'leaf' in ascending order, or -1ULL when
 * 'leaf' was the last one.  'leaf' must be a leaf number, i.e. a value
 * produced by packed_radix_8_tree_encode_leaf() or by a previous find call.
 */
u64 packed_radix_8_tree_find_next_leaf(struct packed_radix_8_tree *tree, u64 leaf)
{
	u64 node = leaf;
	u8 child;

	/* Climb until some ancestor has a set bit to the right of our path. */
	do {
		child = parent_node(radix, &node);
		child = find_next_bit(&tree->packed[node], radix, child + 1);
	} while (node && child == radix);

	if (child == radix)
		return -1ULL;

	return __packed_radix_8_tree_find_first_leaf(tree,
						     child_node(radix, node, child));
}

/*
 * Allocate a zeroed tree able to hold the values 0..size - 1.
 *
 * Returns NULL when the required size cannot be represented or the
 * allocation fails.  The caller releases the tree with free().
 */
struct packed_radix_8_tree *
packed_radix_8_tree_alloc(u64 size)
{
	struct packed_radix_8_tree *tree;
	u64 bitmap_size = radix;
	u64 tree_size = 0;
	u64 bytes;

	/* Sum the bits of every level above the one that can hold 'size' bits. */
	while (bitmap_size < size) {
		if (bitmap_size > UINT64_MAX / radix)
			return NULL;
		tree_size += bitmap_size;
		bitmap_size *= radix;
	}

	/*
	 * tree_size is a multiple of 8, but size need not be: round the bitmap
	 * up to whole bytes, and always keep at least the root byte.
	 */
	bytes = tree_size / 8 + size / 8 + (size % 8 ? 1 : 0);
	if (bytes == 0)
		bytes = 1;
	if (bytes > SIZE_MAX - sizeof(*tree))
		return NULL;

	tree = calloc(1, sizeof(*tree) + (size_t)bytes);
	if (!tree)
		return NULL;

	tree->bitmap_offset = tree_size;
	tree->bitmap_size = size;

	return tree;
}

/* Iterate 'leaf' over all leaves of 'tree' in ascending order. */
#define for_each_packed_radix_8_tree_leaf(tree, leaf) \
	for ((leaf) = packed_radix_8_tree_find_first_leaf(tree); \
	     (leaf) != -1ULL; \
	     (leaf) = packed_radix_8_tree_find_next_leaf((tree), (leaf)))

/*
 * Return the index of the first set bit in 'bitmap' at position >= bit.
 * The result is >= size when no set bit below 'size' was found.  The scan
 * is deliberately naive (one bit at a time).
 */
u64 bitmap_find_next_bit(u8 *bitmap, u64 size, u64 bit)
{
	while (bit < size) {
		u8 offset;

		/* Walk the remaining bits of the current byte. */
		for (offset = bit % 8;
		     offset < 8 && !(bitmap[bit / 8] & (1u << offset));
		     bit++, offset++)
			;

		if (offset < 8)
			break;
	}
	return bit;
}

/* Iterate 'bit' over all set bits of the leaf bitmap embedded in 'tree'. */
#define for_each_packed_radix_8_tree_bit(tree, bit) \
	for ((bit) = bitmap_find_next_bit(&(tree)->packed[(tree)->bitmap_offset / radix], (tree)->bitmap_size, 0); \
	     (bit) < (tree)->bitmap_size; \
	     (bit) = bitmap_find_next_bit(&(tree)->packed[(tree)->bitmap_offset / radix], (tree)->bitmap_size, (bit) + 1))

/* Return the time from *begin to *end in nanoseconds. */
static u64 elapsed_ns(const struct timespec *begin, const struct timespec *end)
{
	int64_t sec = (int64_t)end->tv_sec - (int64_t)begin->tv_sec;
	int64_t nsec = (int64_t)end->tv_nsec - (int64_t)begin->tv_nsec;

	return (u64)(sec * 1000000000LL + nsec);
}

/*
 * Write every value stored in 'tree' to 'list' by walking the tree and
 * return the elapsed time in nanoseconds.  'list' must have room for one
 * entry per distinct inserted value.
 */
u64 time_tree(struct packed_radix_8_tree *tree, u64 *list)
{
	struct timespec begin, end;
	u64 leaf, count = 0;

	clock_gettime(CLOCK_MONOTONIC, &begin);
	for_each_packed_radix_8_tree_leaf(tree, leaf) {
		list[count++] = packed_radix_8_tree_decode_leaf(tree, leaf);
	}
	clock_gettime(CLOCK_MONOTONIC, &end);

	return elapsed_ns(&begin, &end);
}

/*
 * Write every value stored in 'tree' to 'list' by scanning only the leaf
 * bitmap and return the elapsed time in nanoseconds.  'list' must have room
 * for one entry per distinct inserted value.
 */
u64 time_bitmap(struct packed_radix_8_tree *tree, u64 *list)
{
	struct timespec begin, end;
	u64 bit, count = 0;

	clock_gettime(CLOCK_MONOTONIC, &begin);
	for_each_packed_radix_8_tree_bit(tree, bit) {
		list[count++] = bit;
	}
	clock_gettime(CLOCK_MONOTONIC, &end);

	return elapsed_ns(&begin, &end);
}

/*
 * Copy 'size' entries from 'list' to 'dest' and return the elapsed time in
 * nanoseconds.
 */
u64 time_list(u64 *list, u64 size, u64 *dest)
{
	struct timespec begin, end;
	u64 i;

	clock_gettime(CLOCK_MONOTONIC, &begin);
	for (i = 0; i < size; i++)
		dest[i] = list[i];
	clock_gettime(CLOCK_MONOTONIC, &end);

	return elapsed_ns(&begin, &end);
}

/* Parse an unsigned decimal number; return 0 on success, -1 on bad input. */
static int parse_u64(const char *text, u64 *out)
{
	char *end;
	unsigned long long parsed;

	errno = 0;
	parsed = strtoull(text, &end, 10);
	if (end == text || *end != '\0' || errno == ERANGE)
		return -1;

	*out = parsed;
	return 0;
}

/*
 * Usage: radix_tree <slot-size> <dirty-count> [seq]
 *
 * Inserts <dirty-count> values below <slot-size> (random, or 0, 1, 2, ...
 * when a third argument is given) and prints how long the list copy, tree
 * walk and bitmap scan take, together with their ratios.
 */
int main(int argc, char **argv)
{
	const char *prog = argc > 0 ? argv[0] : "radix_tree";
	struct packed_radix_8_tree *tree;
	u64 size, count, i;
	u64 *list, *dest;
	u64 tl, tt, tb;
	int sequential;

	if (argc < 3 || parse_u64(argv[1], &size) || parse_u64(argv[2], &count)) {
		fprintf(stderr, "usage: %s <slot-size> <dirty-count> [seq]\n", prog);
		return EXIT_FAILURE;
	}

	sequential = argc > 3;
	if (size == 0) {
		/* rand() % size below would divide by zero. */
		fprintf(stderr, "%s: slot size must be non-zero\n", prog);
		return EXIT_FAILURE;
	}
	if (sequential && count > size) {
		fprintf(stderr, "%s: sequential count must not exceed slot size\n", prog);
		return EXIT_FAILURE;
	}
	if (count > SIZE_MAX / sizeof(*list)) {
		fprintf(stderr, "%s: dirty count too large\n", prog);
		return EXIT_FAILURE;
	}

	/* Request at least one element so that a zero count is not mistaken for failure. */
	list = malloc(sizeof(*list) * (size_t)(count ? count : 1));
	dest = malloc(sizeof(*dest) * (size_t)(count ? count : 1));
	tree = packed_radix_8_tree_alloc(size);
	if (!list || !dest || !tree) {
		fprintf(stderr, "%s: out of memory\n", prog);
		free(tree);
		free(dest);
		free(list);
		return EXIT_FAILURE;
	}

	printf("kB size l:%"PRIu64" t:%"PRIu64" b:%"PRIu64"\n",
	       (u64)(sizeof(*list) * count / 1024),
	       (tree->bitmap_size + tree->bitmap_offset) / 8 / 1024,
	       size / 8 / 1024);

	srand((unsigned)(size + count));
	for (i = 0; i < count; i++) {
		u64 val = sequential ? i : (u64)rand() % size;

		packed_radix_8_tree_insert(tree, val);
		list[i] = val;
	}

	tl = time_list(list, count, dest);
	tt = time_tree(tree, dest);
	tb = time_bitmap(tree, dest);

	printf("l: %10"PRIu64" ns (%4.2f, %6.2f, %8.2f)\n", tl, 1.0, (double)tt/tl, (double)tb/tl);
	printf("t: %10"PRIu64" ns (%4.2f, %6.2f, %8.2f)\n", tt, (double)tl/tt, 1.0, (double)tb/tt);
	printf("b: %10"PRIu64" ns (%4.2f, %6.2f, %8.2f)\n", tb, (double)tl/tb, (double)tt/tb, 1.0);

	free(tree);
	free(dest);
	free(list);

	return EXIT_SUCCESS;
}