Kyotaro HORIGUCHI <horiguchi.kyotaro@xxxxxxxxxxxxx> writes: > The attached patch is the revised version. Hmm, this still isn't right --- testing shows that you had the comparison rule right the first time. Looking at what we've got here, it's already a substantial fraction of what would be needed to provide a compiler-independent implementation of the int128-based aggregate logic in numeric.c. With that in mind, I propose that we extract the relevant stuff into a new header file that is designed as general-purpose int128 support. Something like the attached. I also attach the test program I put together to verify it. On my Fedora 25 laptop, it appears that the hand-rolled implementation is actually respectably fast compared to gcc's "native" functionality; the test program runs in ~2m for 1B iterations with the native logic, and ~2.5m with the hand-rolled logic. Allowing for overhead and the fact that we're doing the arithmetic twice, we're probably within 2X of the native code. Not bad at all. I'm not entirely sure what to do with the test program: 1. discard it 2. commit it as utils/adt/int128.c, as suggested in its comment 3. commit it somewhere else, maybe src/tools/. Thoughts? regards, tom lane
/*------------------------------------------------------------------------- * * int128.h * Roll-our-own 128-bit integer arithmetic. * * We make use of the native int128 type if there is one, otherwise * implement things the hard way based on two int64 halves. * * Copyright (c) 2017, PostgreSQL Global Development Group * * src/include/utils/int128.h * *------------------------------------------------------------------------- */ #ifndef INT128_H #define INT128_H /* * For testing purposes, use of native int128 can be switched on/off by * predefining USE_NATIVE_INT128. */ #ifndef USE_NATIVE_INT128 #ifdef HAVE_INT128 #define USE_NATIVE_INT128 1 #else #define USE_NATIVE_INT128 0 #endif #endif #if USE_NATIVE_INT128 typedef int128 INT128; /* * Add an unsigned int64 value into an INT128 variable. */ static inline void int128_add_uint64(INT128 *i128, uint64 v) { *i128 += v; } /* * Add a signed int64 value into an INT128 variable. */ static inline void int128_add_int64(INT128 *i128, int64 v) { *i128 += v; } /* * Add the 128-bit product of two int64 values into an INT128 variable. * * XXX with a stupid compiler, this could actually be less efficient than * the other implementation; maybe we should do it by hand always? */ static inline void int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) { *i128 += (int128) x *(int128) y; } /* * Compare two INT128 values, return -1, 0, or +1. */ static inline int int128_compare(INT128 x, INT128 y) { if (x < y) return -1; if (x > y) return 1; return 0; } #else /* !USE_NATIVE_INT128 */ /* * We lay out the INT128 structure with the same content and byte ordering * that a native int128 type would (probably) have. This makes no difference * for ordinary use of INT128, but allows union'ing INT128 with int128 for * testing purposes. 
*/ typedef struct { #ifdef WORDS_BIGENDIAN int64 hi; /* most significant 64 bits, including sign */ uint64 lo; /* least significant 64 bits, without sign */ #else uint64 lo; /* least significant 64 bits, without sign */ int64 hi; /* most significant 64 bits, including sign */ #endif } INT128; /* * Add an unsigned int64 value into an INT128 variable. */ static inline void int128_add_uint64(INT128 *i128, uint64 v) { /* * First add the value to the .lo part, then check to see if a carry needs * to be propagated into the .hi part. A carry is needed if both inputs * have high bits set, or if just one input has high bit set while the new * .lo part doesn't. Remember that .lo part is unsigned; we cast to * signed here just as a cheap way to check the high bit. */ uint64 oldlo = i128->lo; i128->lo += v; if (((int64) v < 0 && (int64) oldlo < 0) || (((int64) v < 0 || (int64) oldlo < 0) && (int64) i128->lo >= 0)) i128->hi++; } /* * Add a signed int64 value into an INT128 variable. */ static inline void int128_add_int64(INT128 *i128, int64 v) { /* * This is much like the above except that the carry logic differs for * negative v. Ordinarily we'd need to subtract 1 from the .hi part * (corresponding to adding the sign-extended bits of v to it); but if * there is a carry out of the .lo part, that cancels and we do nothing. */ uint64 oldlo = i128->lo; i128->lo += v; if (v >= 0) { if ((int64) oldlo < 0 && (int64) i128->lo >= 0) i128->hi++; } else { if (!((int64) oldlo < 0 || (int64) i128->lo >= 0)) i128->hi--; } } /* * INT64_AU32 extracts the most significant 32 bits of int64 as int64, while * INT64_AL32 extracts the least significant 32 bits as uint64. */ #define INT64_AU32(i64) ((i64) >> 32) #define INT64_AL32(i64) ((i64) & UINT64CONST(0xFFFFFFFF)) /* * Add the 128-bit product of two int64 values into an INT128 variable. 
*/ static inline void int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) { /* INT64_AU32 must use arithmetic right shift */ StaticAssertStmt(((int64) -1 >> 1) == (int64) -1, "arithmetic right shift is needed"); /*---------- * Form the 128-bit product x * y using 64-bit arithmetic. * Considering each 64-bit input as having 32-bit high and low parts, * we can compute * * x * y = ((x.hi << 32) + x.lo) * (((y.hi << 32) + y.lo) * = (x.hi * y.hi) << 64 + * (x.hi * y.lo) << 32 + * (x.lo * y.hi) << 32 + * x.lo * y.lo * * Each individual product is of 32-bit terms so it won't overflow when * computed in 64-bit arithmetic. Then we just have to shift it to the * correct position while adding into the 128-bit result. We must also * keep in mind that the "lo" parts must be treated as unsigned. *---------- */ /* No need to work hard if product must be zero */ if (x != 0 && y != 0) { int64 x_u32 = INT64_AU32(x); uint64 x_l32 = INT64_AL32(x); int64 y_u32 = INT64_AU32(y); uint64 y_l32 = INT64_AL32(y); int64 tmp; /* the first term */ i128->hi += x_u32 * y_u32; /* the second term: sign-extend it only if x is negative */ tmp = x_u32 * y_l32; if (x < 0) i128->hi += INT64_AU32(tmp); else i128->hi += ((uint64) tmp) >> 32; int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); /* the third term: sign-extend it only if y is negative */ tmp = x_l32 * y_u32; if (y < 0) i128->hi += INT64_AU32(tmp); else i128->hi += ((uint64) tmp) >> 32; int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); /* the fourth term: always unsigned */ int128_add_uint64(i128, x_l32 * y_l32); } } /* * Compare two INT128 values, return -1, 0, or +1. */ static inline int int128_compare(INT128 x, INT128 y) { if (x.hi < y.hi) return -1; if (x.hi > y.hi) return 1; if (x.lo < y.lo) return -1; if (x.lo > y.lo) return 1; return 0; } #endif /* USE_NATIVE_INT128 */ #endif /* INT128_H */
/*-------------------------------------------------------------------------
 *
 * int128.c
 *    Testbed for roll-our-own 128-bit integer arithmetic.
 *
 * This file is not meant to be compiled into the backend; rather, it builds
 * a standalone test program that compares the behavior of an implementation
 * in int128.h to an (assumed correct) int128 native type.
 *
 * Copyright (c) 2017, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *    src/backend/utils/adt/int128.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

/*
 * By default, we test the non-native implementation in int128.h; but
 * by predefining USE_NATIVE_INT128 to 1, you can test the native
 * implementation, just to be sure.
 */
#ifndef USE_NATIVE_INT128
#define USE_NATIVE_INT128 0
#endif

#include "int128.h"

/*
 * Three views of the same 128 bits: the native type, the type under test,
 * and the two explicit 64-bit halves.  We assume the parts of this union
 * are laid out compatibly.
 *
 * The "hl" member must be a struct, not a union: hi and lo have to occupy
 * the two distinct halves of the value.  Were they overlaid on each other,
 * every store to one would clobber the other and only 64 of the 128 bits
 * would ever be set or compared.
 */
typedef union
{
    int128      i128;
    INT128      I128;
    struct
    {
#ifdef WORDS_BIGENDIAN
        int64       hi;
        uint64      lo;
#else
        uint64      lo;
        int64       hi;
#endif
    }           hl;
} test128;


/*
 * Control version of comparator: returns -1, 0, or +1 using native int128.
 */
static inline int
my_int128_compare(int128 x, int128 y)
{
    if (x < y)
        return -1;
    if (x > y)
        return 1;
    return 0;
}

/*
 * Get a random uint64 value.
 * We don't assume random() is good for more than 16 bits.
 */
static uint64
get_random_uint64(void)
{
    uint64      x;

    x = (uint64) (random() & 0xFFFF) << 48;
    x |= (uint64) (random() & 0xFFFF) << 32;
    x |= (uint64) (random() & 0xFFFF) << 16;
    x |= (uint64) (random() & 0xFFFF);
    return x;
}

/*
 * Print "<label> = <hi><lo>" as 32 hex digits.
 *
 * The halves are cast to unsigned long long so that the format specifier
 * matches the argument type regardless of whether int64 is long or
 * long long on this platform.
 */
static void
print_test128(const char *label, test128 v)
{
    printf("%s = %016llX%016llX\n", label,
           (unsigned long long) (uint64) v.hl.hi,
           (unsigned long long) v.hl.lo);
}

/*
 * Main program.
 *
 * You can give it a loop count if you don't like the default 1B iterations.
 *
 * Exit status is 0 if every iteration agreed with the native arithmetic,
 * 1 on the first mismatch (after printing the operands and both results),
 * and 2 if the loop count argument could not be parsed.
 */
int
main(int argc, char **argv)
{
    long        count = 1000000000;

    if (argc >= 2)
    {
        char       *endptr;

        count = strtol(argv[1], &endptr, 0);

        /* Don't let a typo silently run zero iterations and report success */
        if (endptr == argv[1] || *endptr != '\0')
        {
            fprintf(stderr, "invalid iteration count: \"%s\"\n", argv[1]);
            return 2;
        }
    }

    while (count-- > 0)
    {
        int64       x = get_random_uint64();
        int64       y = get_random_uint64();
        int64       z = get_random_uint64();
        test128     t1;
        test128     t2;

        /* check unsigned addition */
        t1.hl.hi = x;
        t1.hl.lo = y;
        t2 = t1;
        t1.i128 += (int128) (uint64) z;
        int128_add_uint64(&t2.I128, (uint64) z);

        if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo)
        {
            printf("%016llX%016llX + unsigned %llX\n",
                   (unsigned long long) (uint64) x,
                   (unsigned long long) (uint64) y,
                   (unsigned long long) (uint64) z);
            print_test128("native", t1);
            print_test128("result", t2);
            return 1;
        }

        /* check signed addition */
        t1.hl.hi = x;
        t1.hl.lo = y;
        t2 = t1;
        t1.i128 += (int128) z;
        int128_add_int64(&t2.I128, z);

        if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo)
        {
            printf("%016llX%016llX + signed %llX\n",
                   (unsigned long long) (uint64) x,
                   (unsigned long long) (uint64) y,
                   (unsigned long long) (uint64) z);
            print_test128("native", t1);
            print_test128("result", t2);
            return 1;
        }

        /* check multiplication */
        t1.i128 = (int128) x * (int128) y;

        t2.hl.hi = 0;
        t2.hl.lo = 0;
        int128_add_int64_mul_int64(&t2.I128, x, y);

        if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo)
        {
            printf("%llX * %llX\n",
                   (unsigned long long) (uint64) x,
                   (unsigned long long) (uint64) y);
            print_test128("native", t1);
            print_test128("result", t2);
            return 1;
        }

        /* check comparison */
        t1.hl.hi = x;
        t1.hl.lo = y;
        t2.hl.hi = z;
        t2.hl.lo = get_random_uint64();

        if (my_int128_compare(t1.i128, t2.i128) !=
            int128_compare(t1.I128, t2.I128))
        {
            printf("comparison failure: %d vs %d\n",
                   my_int128_compare(t1.i128, t2.i128),
                   int128_compare(t1.I128, t2.I128));
            print_test128("arg1", t1);
            print_test128("arg2", t2);
            return 1;
        }

        /* check case with identical hi parts; above will hardly ever hit it */
        t2.hl.hi = x;

        if (my_int128_compare(t1.i128, t2.i128) !=
            int128_compare(t1.I128, t2.I128))
        {
            printf("comparison failure: %d vs %d\n",
                   my_int128_compare(t1.i128, t2.i128),
                   int128_compare(t1.I128, t2.I128));
            print_test128("arg1", t1);
            print_test128("arg2", t2);
            return 1;
        }
    }

    return 0;
}
-- Sent via pgsql-general mailing list (pgsql-general@xxxxxxxxxxxxxx) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-general