I wrote: > Looking at what we've got here, it's already a substantial fraction of > what would be needed to provide a compiler-independent implementation > of the int128-based aggregate logic in numeric.c. With that in mind, > I propose that we extract the relevant stuff into a new header file > that is designed as general-purpose int128 support. Something like the > attached. I also attach the test program I put together to verify it. Here's a fleshed-out patch for the original problem based on that. I found that direct int64-to-int128 coercions were also needed to handle some of the steps in timestamp.c, so I added those to int128.h. I think it would be reasonable to back-patch this, although it would need some adjustments for the back branches since we only recently got rid of the float-timestamp option. Also I'd not be inclined to depend on native int128 any further back than it was already in use. regards, tom lane
diff --git a/src/backend/utils/adt/int128.c b/src/backend/utils/adt/int128.c index ...8bc0663 . *** a/src/backend/utils/adt/int128.c --- b/src/backend/utils/adt/int128.c *************** *** 0 **** --- 1,181 ---- + /*------------------------------------------------------------------------- + * + * int128.c + * Testbed for roll-our-own 128-bit integer arithmetic. + * + * This file is not meant to be compiled into the backend; rather, it builds + * a standalone test program that compares the behavior of an implementation + * in int128.h to an (assumed correct) int128 native type. + * + * Copyright (c) 2017, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/utils/adt/int128.c + * + *------------------------------------------------------------------------- + */ + + #include "postgres.h" + + /* + * By default, we test the non-native implementation in int128.h; but + * by predefining USE_NATIVE_INT128 to 1, you can test the native + * implementation, just to be sure. + */ + #ifndef USE_NATIVE_INT128 + #define USE_NATIVE_INT128 0 + #endif + + #include "utils/int128.h" + + /* + * We assume the parts of this union are laid out compatibly. + */ + typedef union + { + int128 i128; + INT128 I128; + union + { + #ifdef WORDS_BIGENDIAN + int64 hi; + uint64 lo; + #else + uint64 lo; + int64 hi; + #endif + } hl; + } test128; + + + /* + * Control version of comparator. + */ + static inline int + my_int128_compare(int128 x, int128 y) + { + if (x < y) + return -1; + if (x > y) + return 1; + return 0; + } + + /* + * Get a random uint64 value. + * We don't assume random() is good for more than 16 bits. + */ + static uint64 + get_random_uint64(void) + { + uint64 x; + + x = (uint64) (random() & 0xFFFF) << 48; + x |= (uint64) (random() & 0xFFFF) << 32; + x |= (uint64) (random() & 0xFFFF) << 16; + x |= (uint64) (random() & 0xFFFF); + return x; + } + + /* + * Main program. + * + * You can give it a loop count if you don't like the default 1B iterations. 
+ */ + int + main(int argc, char **argv) + { + long count; + + if (argc >= 2) + count = strtol(argv[1], NULL, 0); + else + count = 1000000000; + + while (count-- > 0) + { + int64 x = get_random_uint64(); + int64 y = get_random_uint64(); + int64 z = get_random_uint64(); + test128 t1; + test128 t2; + + /* check unsigned addition */ + t1.hl.hi = x; + t1.hl.lo = y; + t2 = t1; + t1.i128 += (int128) (uint64) z; + int128_add_uint64(&t2.I128, (uint64) z); + + if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo) + { + printf("%016lX%016lX + unsigned %lX\n", x, y, z); + printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check signed addition */ + t1.hl.hi = x; + t1.hl.lo = y; + t2 = t1; + t1.i128 += (int128) z; + int128_add_int64(&t2.I128, z); + + if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo) + { + printf("%016lX%016lX + signed %lX\n", x, y, z); + printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check multiplication */ + t1.i128 = (int128) x *(int128) y; + + t2.hl.hi = t2.hl.lo = 0; + int128_add_int64_mul_int64(&t2.I128, x, y); + + if (t1.hl.hi != t2.hl.hi || t1.hl.lo != t2.hl.lo) + { + printf("%lX * %lX\n", x, y); + printf("native = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("result = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check comparison */ + t1.hl.hi = x; + t1.hl.lo = y; + t2.hl.hi = z; + t2.hl.lo = get_random_uint64(); + + if (my_int128_compare(t1.i128, t2.i128) != + int128_compare(t1.I128, t2.I128)) + { + printf("comparison failure: %d vs %d\n", + my_int128_compare(t1.i128, t2.i128), + int128_compare(t1.I128, t2.I128)); + printf("arg1 = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("arg2 = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + + /* check case with identical hi parts; above will hardly ever hit it */ + t2.hl.hi = x; + + if (my_int128_compare(t1.i128, 
t2.i128) != + int128_compare(t1.I128, t2.I128)) + { + printf("comparison failure: %d vs %d\n", + my_int128_compare(t1.i128, t2.i128), + int128_compare(t1.I128, t2.I128)); + printf("arg1 = %016lX%016lX\n", t1.hl.hi, t1.hl.lo); + printf("arg2 = %016lX%016lX\n", t2.hl.hi, t2.hl.lo); + return 1; + } + } + + return 0; + } diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 4be1999..88ecb78 100644 *** a/src/backend/utils/adt/timestamp.c --- b/src/backend/utils/adt/timestamp.c *************** *** 33,38 **** --- 33,40 ---- #include "utils/array.h" #include "utils/builtins.h" #include "utils/datetime.h" + #include "utils/int128.h" + /* * gcc's -ffast-math switch breaks routines that expect exact results from *************** timestamptz_cmp_timestamp(PG_FUNCTION_AR *** 2288,2302 **** /* * interval_relop - is interval1 relop interval2 */ ! static inline TimeOffset interval_cmp_value(const Interval *interval) { ! TimeOffset span; ! span = interval->time; ! span += interval->month * INT64CONST(30) * USECS_PER_DAY; ! span += interval->day * INT64CONST(24) * USECS_PER_HOUR; return span; } --- 2290,2324 ---- /* * interval_relop - is interval1 relop interval2 + * + * Interval comparison is based on converting interval values to a linear + * representation expressed in the units of the time field (microseconds, + * in the case of integer timestamps) with days assumed to be always 24 hours + * and months assumed to be always 30 days. To avoid overflow, we need a + * wider-than-int64 datatype for the linear representation, so use INT128. */ ! ! static inline INT128 interval_cmp_value(const Interval *interval) { ! INT128 span; ! int64 dayfraction; ! int64 days; ! /* ! * Separate time field into days and dayfraction, then add the month and ! * day fields to the days part. We cannot overflow int64 days here. ! */ ! dayfraction = interval->time % USECS_PER_DAY; ! days = interval->time / USECS_PER_DAY; ! days += interval->month * INT64CONST(30); ! 
days += interval->day; ! ! /* Widen dayfraction to 128 bits */ ! span = int64_to_int128(dayfraction); ! ! /* Scale up days to microseconds, forming a 128-bit product */ ! int128_add_int64_mul_int64(&span, days, USECS_PER_DAY); return span; } *************** interval_cmp_value(const Interval *inter *** 2304,2313 **** static int interval_cmp_internal(Interval *interval1, Interval *interval2) { ! TimeOffset span1 = interval_cmp_value(interval1); ! TimeOffset span2 = interval_cmp_value(interval2); ! return ((span1 < span2) ? -1 : (span1 > span2) ? 1 : 0); } Datum --- 2326,2335 ---- static int interval_cmp_internal(Interval *interval1, Interval *interval2) { ! INT128 span1 = interval_cmp_value(interval1); ! INT128 span2 = interval_cmp_value(interval2); ! return int128_compare(span1, span2); } Datum *************** Datum *** 2384,2392 **** interval_hash(PG_FUNCTION_ARGS) { Interval *interval = PG_GETARG_INTERVAL_P(0); ! TimeOffset span = interval_cmp_value(interval); ! return DirectFunctionCall1(hashint8, Int64GetDatumFast(span)); } /* overlaps_timestamp() --- implements the SQL OVERLAPS operator. --- 2406,2423 ---- interval_hash(PG_FUNCTION_ARGS) { Interval *interval = PG_GETARG_INTERVAL_P(0); ! INT128 span = interval_cmp_value(interval); ! int64 span64; ! /* ! * Use only the least significant 64 bits for hashing. The upper 64 bits ! * seldom add any useful information, and besides we must do it like this ! * for compatibility with hashes calculated before use of INT128 was ! * introduced. ! */ ! span64 = int128_to_int64(span); ! ! return DirectFunctionCall1(hashint8, Int64GetDatumFast(span64)); } /* overlaps_timestamp() --- implements the SQL OVERLAPS operator. diff --git a/src/include/utils/int128.h b/src/include/utils/int128.h index ...876610b . 
*** a/src/include/utils/int128.h --- b/src/include/utils/int128.h *************** *** 0 **** --- 1,274 ---- + /*------------------------------------------------------------------------- + * + * int128.h + * Roll-our-own 128-bit integer arithmetic. + * + * We make use of the native int128 type if there is one, otherwise + * implement things the hard way based on two int64 halves. + * + * Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/include/utils/int128.h + * + *------------------------------------------------------------------------- + */ + #ifndef INT128_H + #define INT128_H + + /* + * For testing purposes, use of native int128 can be switched on/off by + * predefining USE_NATIVE_INT128. + */ + #ifndef USE_NATIVE_INT128 + #ifdef HAVE_INT128 + #define USE_NATIVE_INT128 1 + #else + #define USE_NATIVE_INT128 0 + #endif + #endif + + + #if USE_NATIVE_INT128 + + typedef int128 INT128; + + /* + * Add an unsigned int64 value into an INT128 variable. + */ + static inline void + int128_add_uint64(INT128 *i128, uint64 v) + { + *i128 += v; + } + + /* + * Add a signed int64 value into an INT128 variable. + */ + static inline void + int128_add_int64(INT128 *i128, int64 v) + { + *i128 += v; + } + + /* + * Add the 128-bit product of two int64 values into an INT128 variable. + * + * XXX with a stupid compiler, this could actually be less efficient than + * the other implementation; maybe we should do it by hand always? + */ + static inline void + int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) + { + *i128 += (int128) x *(int128) y; + } + + /* + * Compare two INT128 values, return -1, 0, or +1. + */ + static inline int + int128_compare(INT128 x, INT128 y) + { + if (x < y) + return -1; + if (x > y) + return 1; + return 0; + } + + /* + * Widen int64 to INT128. + */ + static inline INT128 + int64_to_int128(int64 v) + { + return (INT128) v; + } + + /* + * Convert INT128 to int64 (losing any high-order bits). 
+ * This also works fine for casting down to uint64. + */ + static inline int64 + int128_to_int64(INT128 val) + { + return (int64) val; + } + + #else /* !USE_NATIVE_INT128 */ + + /* + * We lay out the INT128 structure with the same content and byte ordering + * that a native int128 type would (probably) have. This makes no difference + * for ordinary use of INT128, but allows union'ing INT128 with int128 for + * testing purposes. + */ + typedef struct + { + #ifdef WORDS_BIGENDIAN + int64 hi; /* most significant 64 bits, including sign */ + uint64 lo; /* least significant 64 bits, without sign */ + #else + uint64 lo; /* least significant 64 bits, without sign */ + int64 hi; /* most significant 64 bits, including sign */ + #endif + } INT128; + + /* + * Add an unsigned int64 value into an INT128 variable. + */ + static inline void + int128_add_uint64(INT128 *i128, uint64 v) + { + /* + * First add the value to the .lo part, then check to see if a carry needs + * to be propagated into the .hi part. A carry is needed if both inputs + * have high bits set, or if just one input has high bit set while the new + * .lo part doesn't. Remember that .lo part is unsigned; we cast to + * signed here just as a cheap way to check the high bit. + */ + uint64 oldlo = i128->lo; + + i128->lo += v; + if (((int64) v < 0 && (int64) oldlo < 0) || + (((int64) v < 0 || (int64) oldlo < 0) && (int64) i128->lo >= 0)) + i128->hi++; + } + + /* + * Add a signed int64 value into an INT128 variable. + */ + static inline void + int128_add_int64(INT128 *i128, int64 v) + { + /* + * This is much like the above except that the carry logic differs for + * negative v. Ordinarily we'd need to subtract 1 from the .hi part + * (corresponding to adding the sign-extended bits of v to it); but if + * there is a carry out of the .lo part, that cancels and we do nothing. 
+ */ + uint64 oldlo = i128->lo; + + i128->lo += v; + if (v >= 0) + { + if ((int64) oldlo < 0 && (int64) i128->lo >= 0) + i128->hi++; + } + else + { + if (!((int64) oldlo < 0 || (int64) i128->lo >= 0)) + i128->hi--; + } + } + + /* + * INT64_AU32 extracts the most significant 32 bits of int64 as int64, while + * INT64_AL32 extracts the least significant 32 bits as uint64. + */ + #define INT64_AU32(i64) ((i64) >> 32) + #define INT64_AL32(i64) ((i64) & UINT64CONST(0xFFFFFFFF)) + + /* + * Add the 128-bit product of two int64 values into an INT128 variable. + */ + static inline void + int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) + { + /* INT64_AU32 must use arithmetic right shift */ + StaticAssertStmt(((int64) -1 >> 1) == (int64) -1, + "arithmetic right shift is needed"); + + /*---------- + * Form the 128-bit product x * y using 64-bit arithmetic. + * Considering each 64-bit input as having 32-bit high and low parts, + * we can compute + * + * x * y = ((x.hi << 32) + x.lo) * ((y.hi << 32) + y.lo) + * = (x.hi * y.hi) << 64 + + * (x.hi * y.lo) << 32 + + * (x.lo * y.hi) << 32 + + * x.lo * y.lo + * + * Each individual product is of 32-bit terms so it won't overflow when + * computed in 64-bit arithmetic. Then we just have to shift it to the + * correct position while adding into the 128-bit result. We must also + * keep in mind that the "lo" parts must be treated as unsigned. 
+ *---------- + */ + + /* No need to work hard if product must be zero */ + if (x != 0 && y != 0) + { + int64 x_u32 = INT64_AU32(x); + uint64 x_l32 = INT64_AL32(x); + int64 y_u32 = INT64_AU32(y); + uint64 y_l32 = INT64_AL32(y); + int64 tmp; + + /* the first term */ + i128->hi += x_u32 * y_u32; + + /* the second term: sign-extend it only if x is negative */ + tmp = x_u32 * y_l32; + if (x < 0) + i128->hi += INT64_AU32(tmp); + else + i128->hi += ((uint64) tmp) >> 32; + int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); + + /* the third term: sign-extend it only if y is negative */ + tmp = x_l32 * y_u32; + if (y < 0) + i128->hi += INT64_AU32(tmp); + else + i128->hi += ((uint64) tmp) >> 32; + int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); + + /* the fourth term: always unsigned */ + int128_add_uint64(i128, x_l32 * y_l32); + } + } + + /* + * Compare two INT128 values, return -1, 0, or +1. + */ + static inline int + int128_compare(INT128 x, INT128 y) + { + if (x.hi < y.hi) + return -1; + if (x.hi > y.hi) + return 1; + if (x.lo < y.lo) + return -1; + if (x.lo > y.lo) + return 1; + return 0; + } + + /* + * Widen int64 to INT128. + */ + static inline INT128 + int64_to_int128(int64 v) + { + INT128 val; + + val.lo = (uint64) v; + val.hi = (v < 0) ? -INT64CONST(1) : INT64CONST(0); + return val; + } + + /* + * Convert INT128 to int64 (losing any high-order bits). + * This also works fine for casting down to uint64. 
+ */ + static inline int64 + int128_to_int64(INT128 val) + { + return (int64) val.lo; + } + + #endif /* USE_NATIVE_INT128 */ + + #endif /* INT128_H */ diff --git a/src/test/regress/expected/interval.out b/src/test/regress/expected/interval.out index 946c97a..d4e6a00 100644 *** a/src/test/regress/expected/interval.out --- b/src/test/regress/expected/interval.out *************** SELECT '' AS fortyfive, r1.*, r2.* *** 207,212 **** --- 207,248 ---- | 34 years | 6 years (45 rows) + -- intervals out of 64bit linear value in microseconds + CREATE TABLE INTERVAL_TBL_OF (f1 interval); + INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 days 2147483647 months'); + INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('1 year'); + INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 days -2147483648 months'); + SELECT '' AS "64bits", r1.*, r2.* + FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2 + WHERE r1.f1 > r2.f1 + ORDER BY r1.f1, r2.f1; + 64bits | f1 | f1 + --------+----------------------------------------+------------------------------------------- + | 1 year | -178956970 years -8 mons -2147483648 days + | 178956970 years 7 mons 2147483647 days | -178956970 years -8 mons -2147483648 days + | 178956970 years 7 mons 2147483647 days | 1 year + (3 rows) + + CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1); + SET enable_seqscan to false; + EXPLAIN SELECT '' AS "64bits_2", f1 + FROM INTERVAL_TBL_OF r1 ORDER BY f1; + QUERY PLAN + -------------------------------------------------------------------------------------------------------- + Index Only Scan using interval_tbl_of_f1_idx on interval_tbl_of r1 (cost=0.13..12.18 rows=3 width=48) + (1 row) + + SELECT '' AS "64bits_2", f1 + FROM INTERVAL_TBL_OF r1 ORDER BY f1; + 64bits_2 | f1 + ----------+------------------------------------------- + | -178956970 years -8 mons -2147483648 days + | 1 year + | 178956970 years 7 mons 2147483647 days + (3 rows) + + SET enable_seqscan to default; + DROP TABLE INTERVAL_TBL_OF; -- Test multiplication and 
division with intervals. -- Floating point arithmetic rounding errors can lead to unexpected results, -- though the code attempts to do the right thing and round up to days and diff --git a/src/test/regress/sql/interval.sql b/src/test/regress/sql/interval.sql index cff9ada..aa21673 100644 *** a/src/test/regress/sql/interval.sql --- b/src/test/regress/sql/interval.sql *************** SELECT '' AS fortyfive, r1.*, r2.* *** 59,64 **** --- 59,81 ---- WHERE r1.f1 > r2.f1 ORDER BY r1.f1, r2.f1; + -- intervals out of 64bit linear value in microseconds + CREATE TABLE INTERVAL_TBL_OF (f1 interval); + INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 days 2147483647 months'); + INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('1 year'); + INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 days -2147483648 months'); + SELECT '' AS "64bits", r1.*, r2.* + FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2 + WHERE r1.f1 > r2.f1 + ORDER BY r1.f1, r2.f1; + CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1); + SET enable_seqscan to false; + EXPLAIN SELECT '' AS "64bits_2", f1 + FROM INTERVAL_TBL_OF r1 ORDER BY f1; + SELECT '' AS "64bits_2", f1 + FROM INTERVAL_TBL_OF r1 ORDER BY f1; + SET enable_seqscan to default; + DROP TABLE INTERVAL_TBL_OF; -- Test multiplication and division with intervals. -- Floating point arithmetic rounding errors can lead to unexpected results,
-- Sent via pgsql-general mailing list (pgsql-general@xxxxxxxxxxxxxx) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-general