On Wed, Apr 24, 2019 at 02:36:33AM +0200, Tomas Vondra wrote:
> ...
>
> I still think the idea with an "overflow batch" is worth considering,
> because it'd allow us to keep the memory usage within work_mem. And
> after getting familiar with the hash join code again (haven't messed
> with it since 9.5 or so) I think it should not be all that difficult.
> I'll give it a try over the weekend if I get bored for a while.

OK, so I took a stab at this, and overall it seems to be workable. The
patches I have are nowhere near committable, but I think the approach
works fairly well - the memory is kept in check, and the performance is
comparable to the "balancing" approach tested before.

To explain it a bit, the idea is that we can compute how many BufFile
structures we can keep in memory - we can't use more than work_mem/2 for
that, because then we'd mostly eliminate space for the actual data. For
example with 4MB, we know we can keep 128 batches - we need 128 files
each for the outer and inner side, so 256 in total, and 256*8kB = 2MB.

And then, we just increase the number of batches, but instead of adding
the BufFile entries, we split batches into slices that we can keep in
memory (say, the 128 batches). And we keep BufFiles for the current
slice and an "overflow file" for the other slices. After processing a
slice, we simply switch to the next one, and use the overflow file as a
temp file for the first batch - we redistribute it into the other
batches in the slice and another overflow file.

That's what the v3 patch (named 'single overflow file') does. It does
work, but unfortunately it significantly inflates the amount of data
written to temporary files. Assume we need e.g. 1024 batches, but only
128 fit into memory. That means we'll need 8 slices, and during the
first pass we'll handle 1/8 of the data and write 7/8 to the overflow
file. Then after processing the slice and switching to the next one, we
repeat this dance - 1/8 gets processed, 6/8 written to another overflow
file. So essentially we "forward" about

    7/8 + 6/8 + 5/8 + ... + 1/8 = 28/8 = 3.5

of data between slices, and we need to re-shuffle data in each slice,
which amounts to additional 1x data. That's pretty significant overhead,
as will be clear from the measurements I'll present shortly.
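
The arithmetic above can be reproduced with a small standalone sketch.
This is only an illustration, not code from the patches: the helper
names are hypothetical, and the 8kB buffer per BufFile is the figure
assumed in the text.

```c
#define BUFFILE_BUFFER_BYTES 8192   /* assumed: one 8kB buffer per BufFile */

/*
 * Largest power-of-two number of batches whose BufFiles (inner + outer)
 * fit into half of work_mem. Hypothetical helper for illustration only.
 */
int
max_inmemory_batches(long work_mem_kb)
{
    long    limit = work_mem_kb * 1024L / 2;
    int     nbatch = 1;

    /* keep doubling while the doubled count would still fit */
    while ((long) (nbatch * 2) * 2 * BUFFILE_BUFFER_BYTES <= limit)
        nbatch *= 2;

    return nbatch;
}

/* Number of slices needed to cover nbatch batches (rounded up). */
int
slice_count(int nbatch, int nbatch_inmemory)
{
    return (nbatch + nbatch_inmemory - 1) / nbatch_inmemory;
}

/*
 * Data forwarded between slices with a single overflow file, as a
 * multiple of the input size: (n-1)/n + (n-2)/n + ... + 1/n.
 */
double
forwarded_volume(int nslices)
{
    double  total = 0.0;
    int     remaining;

    for (remaining = nslices - 1; remaining >= 1; remaining--)
        total += (double) remaining / nslices;

    return total;
}
```

With work_mem = 4MB this gives 128 in-memory batches, 1024 batches map
to 8 slices, and forwarded_volume(8) evaluates the 7/8 + ... + 1/8 sum.
In closed form the sum is (n - 1) / 2, so the forwarded volume grows
linearly with the number of slices.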

But luckily, there's a simple solution to this - instead of writing the
data into a single overflow file, we can create one overflow file for
each slice. That will leave us with the ~1x of additional writes when
distributing data into batches in the current slice, but it eliminates
the main source of write amplification - avalanche-like forwarding of
data between slices.

This relaxes the memory limit a bit again, because we can't really keep
the number of overflow files constrained by work_mem, but we should only
need a few of them (much less than when adding one file per batch right
away). For example with 128 in-memory batches, this reduces the amount
of necessary memory 128x.

And this is what v4 (per-slice overflow file) does, pretty much.

Two more comments, regarding memory accounting in previous patches. It
was a bit broken, because we actually need 2x the number of BufFiles. We
needed nbatch files for outer side and nbatch files for inner side, but
we only considered one of those - both when deciding when to increase
the number of batches / increase spaceAllowed, and when reporting the
memory usage. So with large number of batches the reported amount of
used memory was roughly 1/2 of the actual value :-/

The memory accounting was a bit bogus for another reason - spaceUsed
simply tracks the amount of memory for hash table contents. But at the
end we were simply adding the current space for BufFile stuff, ignoring
the fact that that's likely much larger than when the spacePeak value
got stored. For example we might have kept early spaceUsed when it was
almost work_mem, and then added the final large BufFile allocation.

I've fixed both issues in the patches attached to this message. It does
not make a huge difference in practice, but it makes it easier to
compare values between patches.

Now, some test results - I've repeated the simple test with uniform data
set, which is pretty much ideal for hash joins (no unexpectedly large
batches that can't be split, etc.).
I've done this with 1M, 5M, 10M, 25M and 50M rows in the large table
(which gets picked for the "hash" side), and measured how much memory
gets used, how many batches, how long it takes and how much data gets
written to temp files. See the hashjoin-test.sh script for more details.

So, here are the results with work_mem = 4MB (so the number of in-memory
batches for the last two entries is 128). The columns are:

* nbatch - the final number of batches
* memory - memory usage (kB), as reported by explain analyze
* time - duration of the query (without explain analyze) in seconds
* size - size of the large table
* temp - amount of data written to temp files
* amplif - write amplification (temp / size)


1M rows
===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
-------------------------------------------------------------------
master              256    7681    3.3        730        899    1.23
rebalance           256    7711    3.3        730        884    1.21
single file        1024    4161    7.2        730       3168    4.34
per-slice file     1024    4161    4.7        730       1653    2.26


5M rows
===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
-------------------------------------------------------------------
master             2048   36353     22       3652       5276    1.44
rebalance           512   16515     18       3652       4169    1.14
single file        4096    4353    156       3652      53897   14.76
per-slice file     4096    4353     28       3652       8106    2.21


10M rows
===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
-------------------------------------------------------------------
master             4096   69121     61       7303      10556    1.45
rebalance           512   24326     46       7303       7405    1.01
single file        8192    4636    762       7303     211234   28.92
per-slice file     8192    4636     65       7303      16278    2.23


25M rows
===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
-------------------------------------------------------------------
master             8192  134657    190       7303      24279    1.33
rebalance          1024   36611    158       7303      20024    1.10
single file       16384    6011   4054       7303    1046174   57.32
per-slice file    16384    6011    207       7303      39073    2.14


50M rows
===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
-------------------------------------------------------------------
master            16384  265729    531      36500      48519    1.33
rebalance          2048   53241    447      36500      48077    1.32
single file           -       -      -      36500          -       -
per-slice file    32768    8125    451      36500      78662    2.16


From those numbers it's pretty clear that the per-slice overflow file
does by far the best job of enforcing work_mem and minimizing the amount
of data spilled to temp files. It does write a bit more data than both
master and the simple rebalancing, but that's the cost of enforcing
work_mem more strictly. It's generally a bit slower than those two
approaches, although on the largest scale it's actually a bit faster
than master. I think that's pretty acceptable, considering this is meant
to address extreme underestimates where we currently just eat memory.

The case with single overflow file performs rather poorly - I haven't
even collected data from the largest scale, but considering it spilled
1TB of temp files with a dataset half the size, that's not an issue.
(Note that this does not mean it needs 1TB of temp space, those writes
are spread over time and the files are created/closed as we go. The
system only has ~100GB of free disk space.)

Gunther, could you try the v2 and v4 patches on your data set? That
would be an interesting data point, I think.


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4d5a6872cc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 /* ----------------------------------------------------------------
  *      ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
     if (hashtable->nbuckets != hashtable->nbuckets_optimal)
         ExecHashIncreaseNumBuckets(hashtable);
-    /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-    hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-    if (hashtable->spaceUsed > hashtable->spacePeak)
-        hashtable->spacePeak = hashtable->spaceUsed;
+    /* refresh info about peak used memory */
+    ExecHashUpdateSpacePeak(hashtable);
     hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -1647,12 +1646,56 @@ ExecHashTableInsert(HashJoinTable hashtable,
         /* Account for space used, and back off if we've used too much */
         hashtable->spaceUsed += hashTupleSize;
-        if (hashtable->spaceUsed > hashtable->spacePeak)
-            hashtable->spacePeak = hashtable->spaceUsed;
+
+        /* refresh info about peak used memory */
+        ExecHashUpdateSpacePeak(hashtable);
+
+        /*
+         * Consider increasing number of batches.
+         *
+         * Each batch requires a non-trivial amount of memory, because BufFile
+         * includes a PGAlignedBlock (typically 8kB buffer). So when doubling
+         * the number of batches, we need to be careful and only allow that if
+         * it actually has a chance of reducing memory usage.
+         *
+         * In particular, doubling the number of batches is pointless when
+         *
+         *      (spaceUsed / 2) < (nbatches * sizeof(BufFile))
+         *
+         * because we expect to save roughly 1/2 of memory currently used for
+         * data (rows) at the price of doubling the memory used for BufFile.
+         *
+         * We can't stop adding batches entirely, because that would just mean
+         * the batches would need more and more memory. So we need to increase
+         * the number of batches, even if we can't enforce work_mem properly.
+         * The goal is to minimize the overall memory usage of the hash join.
+         *
+         * Note: This applies mostly to cases of significant underestimates,
+         * resulting in an explosion of the number of batches. The properly
+         * estimated cases should generally end up using merge join based on
+         * high cost of the batched hash join.
+         */
         if (hashtable->spaceUsed +
-            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+            hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+            hashtable->nbatch * sizeof(PGAlignedBlock) * 2
             > hashtable->spaceAllowed)
+        {
             ExecHashIncreaseNumBatches(hashtable);
+
+            /*
+             * Consider increasing the resize threshold.
+             *
+             * For well estimated cases this does nothing, because batches are
+             * expected to account only for small fraction of work_mem. But if
+             * we significantly underestimate the number of batches, we may end
+             * up in a situation where BufFile alone exceed work_mem. So move
+             * the threshold a bit, until the next point where it'll make sense
+             * to consider adding batches again.
+             */
+            hashtable->spaceAllowed
+                = Max(hashtable->spaceAllowed,
+                      hashtable->nbatch * sizeof(PGAlignedBlock) * 3);
+        }
     }
     else
     {
@@ -1893,6 +1936,21 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
     }
 }
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+    Size    spaceUsed = hashtable->spaceUsed;
+
+    /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+    spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+    /* Account for memory used for batch files (inner + outer) */
+    spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+    if (spaceUsed > hashtable->spacePeak)
+        hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *      scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2330,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
             + mcvsToUse * sizeof(int);
         hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
             + mcvsToUse * sizeof(int);
-        if (hashtable->spaceUsed > hashtable->spacePeak)
-            hashtable->spacePeak = hashtable->spaceUsed;
+
+        /* refresh info about peak used memory */
+        ExecHashUpdateSpacePeak(hashtable);
         /*
          * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2381,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
             hashtable->nSkewBuckets++;
             hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
             hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-            if (hashtable->spaceUsed > hashtable->spacePeak)
-                hashtable->spacePeak = hashtable->spaceUsed;
+
+            /* refresh info about peak used memory */
+            ExecHashUpdateSpacePeak(hashtable);
         }
         free_attstatsslot(&sslot);
@@ -2411,8 +2471,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
     /* Account for space used, and back off if we've used too much */
     hashtable->spaceUsed += hashTupleSize;
     hashtable->spaceUsedSkew += hashTupleSize;
-    if (hashtable->spaceUsed > hashtable->spacePeak)
-        hashtable->spacePeak = hashtable->spaceUsed;
+
+    /* refresh info about peak used memory */
+    ExecHashUpdateSpacePeak(hashtable);
+
     while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
         ExecHashRemoveNextSkewBucket(hashtable);
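
The threshold logic added to ExecHashTableInsert in the patch above can
be tried out in isolation with a sketch like the following. It is a
simplified model under stated assumptions, not the patch itself: the
HashSketch struct and maybe_increase_batches() are hypothetical names,
the bucket array term is left out, sizeof(PGAlignedBlock) is assumed to
be 8kB, and doubling the batches is assumed to move exactly half of the
tuples out of memory.

```c
#include <stddef.h>

#define BUFFILE_BUFFER_BYTES ((size_t) 8192)   /* assumed sizeof(PGAlignedBlock) */

/* Hypothetical stand-in for the few HashJoinTable fields used here. */
typedef struct HashSketch
{
    size_t  space_used;     /* bytes of tuples in the hash table */
    size_t  space_allowed;  /* current limit, initially work_mem */
    int     nbatch;         /* current number of batches */
} HashSketch;

/*
 * Returns 1 if the number of batches was doubled, 0 otherwise.
 * Assumes (optimistically) that doubling evicts half of the tuples.
 */
int
maybe_increase_batches(HashSketch *ht)
{
    /* buffers for inner + outer batch files */
    size_t  file_bytes = (size_t) ht->nbatch * BUFFILE_BUFFER_BYTES * 2;
    size_t  file_floor;

    if (ht->space_used + file_bytes <= ht->space_allowed)
        return 0;

    ht->nbatch *= 2;
    ht->space_used /= 2;

    /* move the threshold so that the files alone cannot exceed it */
    file_floor = (size_t) ht->nbatch * BUFFILE_BUFFER_BYTES * 3;
    if (ht->space_allowed < file_floor)
        ht->space_allowed = file_floor;

    return 1;
}
```

Starting from 1024 batches, 3MB of tuples and a 4MB limit, one call
doubles the batches to 2048 and raises the limit to 2048 * 8kB * 3 =
48MB. In this simplified model the next doubling then happens only once
the tuples alone exceed nbatch * 8kB.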
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 799a22e9d5..c957043599 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es) hinstrument.nbatch, es); ExplainPropertyInteger("Original Hash Batches", NULL, hinstrument.nbatch_original, es); + ExplainPropertyInteger("In-Memory Hash Batches", NULL, + hinstrument.nbatch_original, es); ExplainPropertyInteger("Peak Memory Usage", "kB", spacePeakKb, es); } @@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es) hinstrument.nbuckets_original != hinstrument.nbuckets) { appendStringInfoSpaces(es->str, es->indent * 2); - appendStringInfo(es->str, - "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", - hinstrument.nbuckets, - hinstrument.nbuckets_original, - hinstrument.nbatch, - hinstrument.nbatch_original, - spacePeakKb); + if (hinstrument.nbatch != hinstrument.nbatch_inmemory) + appendStringInfo(es->str, + "Buckets: %d (originally %d) Batches: %d (originally %d, in-memory %d) Memory Usage: %ldkB\n", + hinstrument.nbuckets, + hinstrument.nbuckets_original, + hinstrument.nbatch, + hinstrument.nbatch_original, + hinstrument.nbatch_inmemory, + spacePeakKb); + else + appendStringInfo(es->str, + "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", + hinstrument.nbuckets, + hinstrument.nbuckets_original, + hinstrument.nbatch, + hinstrument.nbatch_original, + spacePeakKb); } else { appendStringInfoSpaces(es->str, es->indent * 2); - appendStringInfo(es->str, - "Buckets: %d Batches: %d Memory Usage: %ldkB\n", - hinstrument.nbuckets, hinstrument.nbatch, - spacePeakKb); + if (hinstrument.nbatch != hinstrument.nbatch_inmemory) + appendStringInfo(es->str, + "Buckets: %d Batches: %d (in-memory: %d) Memory Usage: %ldkB\n", + hinstrument.nbuckets, hinstrument.nbatch, + hinstrument.nbatch_inmemory, + spacePeakKb); + else + 
appendStringInfo(es->str, + "Buckets: %d Batches: %d Memory Usage: %ldkB\n", + hinstrument.nbuckets, hinstrument.nbatch, + spacePeakKb); } } } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 6ffaa751f2..044d360fd4 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, static void ExecParallelHashMergeCounters(HashJoinTable hashtable); static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); +static void ExecHashUpdateSpacePeak(HashJoinTable hashtable); /* ---------------------------------------------------------------- * ExecHash @@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node) if (hashtable->nbuckets != hashtable->nbuckets_optimal) ExecHashIncreaseNumBuckets(hashtable); - /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ - hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); hashtable->partialTuples = hashtable->totalTuples; } @@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) size_t space_allowed; int nbuckets; int nbatch; + int nbatch_inmemory; double rows; int num_skew_mcvs; int log2_nbuckets; @@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) state->parallel_state != NULL ? 
state->parallel_state->nparticipants - 1 : 0, &space_allowed, - &nbuckets, &nbatch, &num_skew_mcvs); + &nbuckets, &nbatch, &nbatch_inmemory, + &num_skew_mcvs); /* nbuckets must be a power of 2 */ log2_nbuckets = my_log2(nbuckets); @@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) hashtable->nSkewBuckets = 0; hashtable->skewBucketNums = NULL; hashtable->nbatch = nbatch; + hashtable->nbatch_inmemory = nbatch_inmemory; hashtable->curbatch = 0; hashtable->nbatch_original = nbatch; hashtable->nbatch_outstart = nbatch; @@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; + hashtable->innerOverflowFile = NULL; + hashtable->outerOverflowFile = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; hashtable->spaceAllowed = space_allowed; @@ -559,14 +563,16 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) if (nbatch > 1 && hashtable->parallel_state == NULL) { + int cnt = Min(nbatch, nbatch_inmemory); + /* * allocate and initialize the file arrays in hashCxt (not needed for * parallel case which uses shared tuplestores instead of raw files) */ hashtable->innerBatchFile = (BufFile **) - palloc0(nbatch * sizeof(BufFile *)); + palloc0(cnt * sizeof(BufFile *)); hashtable->outerBatchFile = (BufFile **) - palloc0(nbatch * sizeof(BufFile *)); + palloc0(cnt * sizeof(BufFile *)); /* The files will not be opened until needed... */ /* ... 
but make sure we have temp tablespaces established for them */ PrepareTempTablespaces(); @@ -665,6 +671,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t *space_allowed, int *numbuckets, int *numbatches, + int *numbatches_inmemory, int *num_skew_mcvs) { int tupsize; @@ -675,6 +682,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, long max_pointers; long mppow2; int nbatch = 1; + int nbatch_inmemory = 1; int nbuckets; double dbuckets; @@ -795,6 +803,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, space_allowed, numbuckets, numbatches, + numbatches_inmemory, num_skew_mcvs); return; } @@ -831,11 +840,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, nbatch <<= 1; } + /* + * See how many batches we can fit into memory (driven mostly by size + * of BufFile, with PGAlignedBlock being the largest part of that). + * We need one BufFile for inner and outer side, so we count it twice + * for each batch, and we stop once we exceed (work_mem/2). 
+ */ + while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2 + <= (work_mem * 1024L / 2)) + nbatch_inmemory *= 2; + Assert(nbuckets > 0); Assert(nbatch > 0); *numbuckets = nbuckets; *numbatches = nbatch; + *numbatches_inmemory = nbatch_inmemory; } @@ -857,13 +877,21 @@ ExecHashTableDestroy(HashJoinTable hashtable) */ if (hashtable->innerBatchFile != NULL) { - for (i = 1; i < hashtable->nbatch; i++) + int nbatch = Min(hashtable->nbatch, hashtable->nbatch_inmemory); + + for (i = 1; i < nbatch; i++) { if (hashtable->innerBatchFile[i]) BufFileClose(hashtable->innerBatchFile[i]); if (hashtable->outerBatchFile[i]) BufFileClose(hashtable->outerBatchFile[i]); } + + if (hashtable->innerOverflowFile) + BufFileClose(hashtable->innerOverflowFile); + + if (hashtable->outerOverflowFile) + BufFileClose(hashtable->outerOverflowFile); } /* Release working memory (batchCxt is a child, so it goes away too) */ @@ -909,6 +937,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) if (hashtable->innerBatchFile == NULL) { + /* XXX nbatch=1, no need to deal with nbatch_inmemory here */ + /* we had no file arrays before */ hashtable->innerBatchFile = (BufFile **) palloc0(nbatch * sizeof(BufFile *)); @@ -919,15 +949,23 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory); + /* enlarge arrays and zero out added entries */ hashtable->innerBatchFile = (BufFile **) - repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *)); + repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *)); hashtable->outerBatchFile = (BufFile **) - repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *)); - MemSet(hashtable->innerBatchFile + oldnbatch, 0, - (nbatch - oldnbatch) * sizeof(BufFile *)); - MemSet(hashtable->outerBatchFile + oldnbatch, 0, - (nbatch - oldnbatch) * sizeof(BufFile *)); + repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *)); + + if (oldnbatch < nbatch_tmp) + { + 
MemSet(hashtable->innerBatchFile + oldnbatch, 0, + (nbatch_tmp - oldnbatch) * sizeof(BufFile *)); + MemSet(hashtable->outerBatchFile + oldnbatch, 0, + (nbatch_tmp - oldnbatch) * sizeof(BufFile *)); + } + + /* no need to initialize the overflow files explicitly */ } MemoryContextSwitchTo(oldcxt); @@ -999,11 +1037,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + BufFile **batchFile; + /* dump it out */ Assert(batchno > curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + &hashtable->innerOverflowFile); + ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, - &hashtable->innerBatchFile[batchno]); + batchFile); hashtable->spaceUsed -= hashTupleSize; nfreed++; @@ -1647,22 +1692,33 @@ ExecHashTableInsert(HashJoinTable hashtable, /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); + + /* Consider increasing number of batches if we filled work_mem. 
*/ if (hashtable->spaceUsed + - hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + + Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */ > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); } else { + BufFile **batchFile; + /* * put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + &hashtable->innerOverflowFile); + ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + batchFile); } } @@ -1893,6 +1949,100 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable, } } +int +ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno) +{ + int slice, + curslice; + + if (hashtable->nbatch <= hashtable->nbatch_inmemory) + return batchno; + + slice = batchno / hashtable->nbatch_inmemory; + curslice = hashtable->curbatch / hashtable->nbatch_inmemory; + + /* slices can't go backwards */ + Assert(slice >= curslice); + + /* overflow slice */ + if (slice > curslice) + return -1; + + /* current slice, compute index in the current array */ + return (batchno % hashtable->nbatch_inmemory); +} + +BufFile ** +ExecHashGetBatchFile(HashJoinTable hashtable, int batchno, + BufFile **batchFiles, BufFile **overflowFile) +{ + int idx = ExecHashGetBatchIndex(hashtable, batchno); + + if (idx == -1) + return overflowFile; + + return &batchFiles[idx]; +} + +void +ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable) +{ + memset(hashtable->innerBatchFile, 0, + hashtable->nbatch_inmemory * sizeof(BufFile *)); + + hashtable->innerBatchFile[0] = hashtable->innerOverflowFile; + hashtable->innerOverflowFile = NULL; + + memset(hashtable->outerBatchFile, 0, + hashtable->nbatch_inmemory * sizeof(BufFile *)); + + hashtable->outerBatchFile[0] = hashtable->outerOverflowFile; + hashtable->outerOverflowFile = NULL; +} + +int 
+ExecHashSwitchToNextBatch(HashJoinTable hashtable) +{ + int batchidx; + + hashtable->curbatch++; + + /* see if we skipped to the next batch slice */ + batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch); + + /* Can't be -1, current batch is in the current slice by definition. */ + Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory); + + /* + * If we skipped to the next slice of batches, reset the array of files + * and use the overflow file as the first batch. + */ + if (batchidx == 0) + ExecHashSwitchToNextBatchSlice(hashtable); + + return hashtable->curbatch; +} + +static void +ExecHashUpdateSpacePeak(HashJoinTable hashtable) +{ + Size spaceUsed = hashtable->spaceUsed; + + /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ + spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); + + /* Account for memory used for batch files (inner + outer) */ + spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) * + sizeof(PGAlignedBlock) * 2; + + /* Account for slice files (inner + outer) */ + spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) * + sizeof(PGAlignedBlock) * 2; + + if (spaceUsed > hashtable->spacePeak) + hashtable->spacePeak = spaceUsed; +} + /* * ExecScanHashBucket * scan a hash bucket for matches to the current outer tuple @@ -2272,8 +2422,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) + mcvsToUse * sizeof(int); hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) + mcvsToUse * sizeof(int); - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); /* * Create a skew bucket for each MCV hash value. 
@@ -2322,8 +2473,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) hashtable->nSkewBuckets++; hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); } free_attstatsslot(&sslot); @@ -2411,8 +2563,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; hashtable->spaceUsedSkew += hashTupleSize; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); + while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) ExecHashRemoveNextSkewBucket(hashtable); @@ -2488,10 +2642,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } else { + BufFile **batchFile; + /* Put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + &hashtable->innerOverflowFile); + ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + batchFile); pfree(hashTuple); hashtable->spaceUsed -= tupleSize; hashtable->spaceUsedSkew -= tupleSize; @@ -2640,6 +2801,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument, instrument->nbuckets_original = hashtable->nbuckets_original; instrument->nbatch = hashtable->nbatch; instrument->nbatch_original = hashtable->nbatch_original; + instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory); instrument->space_peak = hashtable->spacePeak; } diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 5922e60eed..e59d4d8003 100644 --- a/src/backend/executor/nodeHashjoin.c +++ 
b/src/backend/executor/nodeHashjoin.c @@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (batchno != hashtable->curbatch && node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO) { + BufFile **batchFile; + /* * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. */ Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->outerBatchFile, + &hashtable->outerOverflowFile); + ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), hashvalue, - &hashtable->outerBatchFile[batchno]); + batchFile); /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; @@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, } else if (curbatch < hashtable->nbatch) { - BufFile *file = hashtable->outerBatchFile[curbatch]; + BufFile **file = ExecHashGetBatchFile(hashtable, curbatch, + hashtable->outerBatchFile, + &hashtable->outerOverflowFile); /* * In outer-join cases, we could get here even though the batch file * is empty. */ - if (file == NULL) + if (*file == NULL) return NULL; slot = ExecHashJoinGetSavedTuple(hjstate, - file, + *file, hashvalue, hjstate->hj_OuterTupleSlot); if (!TupIsNull(slot)) @@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) BufFile *innerFile; TupleTableSlot *slot; uint32 hashvalue; + int batchidx; + int curbatch_old; nbatch = hashtable->nbatch; curbatch = hashtable->curbatch; + curbatch_old = curbatch; + + /* index of the old batch */ + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); + + /* has to be in the current slice of batches */ + Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory); if (curbatch > 0) { @@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * We no longer need the previous outer batch file; close it right * away to free disk space. 
*/ - if (hashtable->outerBatchFile[curbatch]) - BufFileClose(hashtable->outerBatchFile[curbatch]); - hashtable->outerBatchFile[curbatch] = NULL; + if (hashtable->outerBatchFile[batchidx]) + BufFileClose(hashtable->outerBatchFile[batchidx]); + hashtable->outerBatchFile[batchidx] = NULL; } else /* we just finished the first batch */ { @@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * scan, we have to rescan outer batches in case they contain tuples that * need to be reassigned. */ - curbatch++; + curbatch = ExecHashSwitchToNextBatch(hashtable); + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); + while (curbatch < nbatch && - (hashtable->outerBatchFile[curbatch] == NULL || - hashtable->innerBatchFile[curbatch] == NULL)) + (hashtable->outerBatchFile[batchidx] == NULL || + hashtable->innerBatchFile[batchidx] == NULL)) { - if (hashtable->outerBatchFile[curbatch] && + if (hashtable->outerBatchFile[batchidx] && HJ_FILL_OUTER(hjstate)) break; /* must process due to rule 1 */ - if (hashtable->innerBatchFile[curbatch] && + if (hashtable->innerBatchFile[batchidx] && HJ_FILL_INNER(hjstate)) break; /* must process due to rule 1 */ - if (hashtable->innerBatchFile[curbatch] && + if (hashtable->innerBatchFile[batchidx] && nbatch != hashtable->nbatch_original) break; /* must process due to rule 2 */ - if (hashtable->outerBatchFile[curbatch] && + if (hashtable->outerBatchFile[batchidx] && nbatch != hashtable->nbatch_outstart) break; /* must process due to rule 3 */ /* We can ignore this batch. */ /* Release associated temp files right away. 
*/ - if (hashtable->innerBatchFile[curbatch]) - BufFileClose(hashtable->innerBatchFile[curbatch]); - hashtable->innerBatchFile[curbatch] = NULL; - if (hashtable->outerBatchFile[curbatch]) - BufFileClose(hashtable->outerBatchFile[curbatch]); - hashtable->outerBatchFile[curbatch] = NULL; - curbatch++; + if (hashtable->innerBatchFile[batchidx]) + BufFileClose(hashtable->innerBatchFile[batchidx]); + hashtable->innerBatchFile[batchidx] = NULL; + if (hashtable->outerBatchFile[batchidx]) + BufFileClose(hashtable->outerBatchFile[batchidx]); + hashtable->outerBatchFile[batchidx] = NULL; + + curbatch = ExecHashSwitchToNextBatch(hashtable); + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); } if (curbatch >= nbatch) + { + hashtable->curbatch = curbatch_old; return false; /* no more batches */ - - hashtable->curbatch = curbatch; + } /* * Reload the hash table with the new inner batch (which could be empty) */ ExecHashTableReset(hashtable); - innerFile = hashtable->innerBatchFile[curbatch]; + innerFile = hashtable->innerBatchFile[batchidx]; if (innerFile != NULL) { @@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * needed */ BufFileClose(innerFile); - hashtable->innerBatchFile[curbatch] = NULL; + hashtable->innerBatchFile[batchidx] = NULL; } /* * Rewind outer batch file (if present), so that we can start reading it. 
*/ - if (hashtable->outerBatchFile[curbatch] != NULL) + if (hashtable->outerBatchFile[batchidx] != NULL) { - if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET)) + if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not rewind hash-join temporary file: %m"))); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index c7400941ee..e324869c09 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, int num_hashclauses = list_length(hashclauses); int numbuckets; int numbatches; + int numbatches_inmemory; int num_skew_mcvs; size_t space_allowed; /* unused */ @@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, &space_allowed, &numbuckets, &numbatches, + &numbatches_inmemory, &num_skew_mcvs); /* diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index a9f9872a78..ef60df5024 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -308,6 +308,7 @@ typedef struct HashJoinTableData int *skewBucketNums; /* array indexes of active skew buckets */ int nbatch; /* number of batches */ + int nbatch_inmemory; /* max number of in-memory batches */ int curbatch; /* current batch #; 0 during 1st pass */ int nbatch_original; /* nbatch when we started inner scan */ @@ -329,6 +330,9 @@ typedef struct HashJoinTableData BufFile **innerBatchFile; /* buffered virtual temp file per batch */ BufFile **outerBatchFile; /* buffered virtual temp file per batch */ + BufFile *innerOverflowFile; /* temp file for inner overflow batches */ + BufFile *outerOverflowFile; /* temp file for outer overflow batches */ + /* * Info about the datatype-specific hash functions for the datatypes being * hashed. 
These are arrays of the same length as the number of hash join diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 8d700c06c5..78389bc0cf 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -16,6 +16,7 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +#include "storage/buffile.h" struct SharedHashJoinBatch; @@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno); +extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno); +extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno, + BufFile **files, BufFile **overflow); +extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable); +extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable); extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); @@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t *space_allowed, int *numbuckets, int *numbatches, + int *numbatches_inmemory, int *num_skew_mcvs); extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 9959c9e31f..6c53c5abd2 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation int nbuckets_original; /* planned number of buckets */ int nbatch; /* number of batches at end of execution */ int nbatch_original; /* planned number of batches */ + int nbatch_inmemory; /* number of batches kept in memory */ size_t space_peak; /* speak memory usage in bytes */ } HashInstrumentation;
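For anyone who wants to check the sizing arithmetic without building the patch, here is a minimal standalone sketch of the loop added to ExecChooseHashTableSize. It is not part of the patch: the function name max_inmemory_batches and the BLOCK_SIZE constant are made up for illustration, and BLOCK_SIZE assumes sizeof(PGAlignedBlock) is 8192 bytes (the default BLCKSZ).

```c
#include <assert.h>

/* Assumption: stands in for sizeof(PGAlignedBlock) with the default 8kB BLCKSZ. */
#define BLOCK_SIZE 8192L

/*
 * Hypothetical helper mirroring the patch's loop: keep doubling the number
 * of in-memory batches while the doubled count, with one buffer for the
 * inner side and one for the outer side, still fits into work_mem / 2.
 */
int
max_inmemory_batches(long work_mem_kb)
{
    int  nbatch_inmemory = 1;
    long budget = work_mem_kb * 1024L / 2;

    while ((long) (nbatch_inmemory * 2) * BLOCK_SIZE * 2 <= budget)
        nbatch_inmemory *= 2;

    assert(nbatch_inmemory > 0);
    return nbatch_inmemory;
}
```

With work_mem = 4MB this returns 128, which matches the 128 batches (256 buffers, 2MB) mentioned in the description of the approach.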
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 799a22e9d5..c957043599 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es) hinstrument.nbatch, es); ExplainPropertyInteger("Original Hash Batches", NULL, hinstrument.nbatch_original, es); + ExplainPropertyInteger("In-Memory Hash Batches", NULL, + hinstrument.nbatch_inmemory, es); ExplainPropertyInteger("Peak Memory Usage", "kB", spacePeakKb, es); } @@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es) hinstrument.nbuckets_original != hinstrument.nbuckets) { appendStringInfoSpaces(es->str, es->indent * 2); - appendStringInfo(es->str, - "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", - hinstrument.nbuckets, - hinstrument.nbuckets_original, - hinstrument.nbatch, - hinstrument.nbatch_original, - spacePeakKb); + if (hinstrument.nbatch != hinstrument.nbatch_inmemory) + appendStringInfo(es->str, + "Buckets: %d (originally %d) Batches: %d (originally %d, in-memory %d) Memory Usage: %ldkB\n", + hinstrument.nbuckets, + hinstrument.nbuckets_original, + hinstrument.nbatch, + hinstrument.nbatch_original, + hinstrument.nbatch_inmemory, + spacePeakKb); + else + appendStringInfo(es->str, + "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", + hinstrument.nbuckets, + hinstrument.nbuckets_original, + hinstrument.nbatch, + hinstrument.nbatch_original, + spacePeakKb); } else { appendStringInfoSpaces(es->str, es->indent * 2); - appendStringInfo(es->str, - "Buckets: %d Batches: %d Memory Usage: %ldkB\n", - hinstrument.nbuckets, hinstrument.nbatch, - spacePeakKb); + if (hinstrument.nbatch != hinstrument.nbatch_inmemory) + appendStringInfo(es->str, + "Buckets: %d Batches: %d (in-memory: %d) Memory Usage: %ldkB\n", + hinstrument.nbuckets, hinstrument.nbatch, + hinstrument.nbatch_inmemory, + spacePeakKb); + else + 
appendStringInfo(es->str, + "Buckets: %d Batches: %d Memory Usage: %ldkB\n", + hinstrument.nbuckets, hinstrument.nbatch, + spacePeakKb); } } } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 6ffaa751f2..4364eb7cdd 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, static void ExecParallelHashMergeCounters(HashJoinTable hashtable); static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); +static void ExecHashUpdateSpacePeak(HashJoinTable hashtable); /* ---------------------------------------------------------------- * ExecHash @@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node) if (hashtable->nbuckets != hashtable->nbuckets_optimal) ExecHashIncreaseNumBuckets(hashtable); - /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ - hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); hashtable->partialTuples = hashtable->totalTuples; } @@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) size_t space_allowed; int nbuckets; int nbatch; + int nbatch_inmemory; double rows; int num_skew_mcvs; int log2_nbuckets; @@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) state->parallel_state != NULL ? 
state->parallel_state->nparticipants - 1 : 0, &space_allowed, - &nbuckets, &nbatch, &num_skew_mcvs); + &nbuckets, &nbatch, &nbatch_inmemory, + &num_skew_mcvs); /* nbuckets must be a power of 2 */ log2_nbuckets = my_log2(nbuckets); @@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) hashtable->nSkewBuckets = 0; hashtable->skewBucketNums = NULL; hashtable->nbatch = nbatch; + hashtable->nbatch_inmemory = nbatch_inmemory; hashtable->curbatch = 0; hashtable->nbatch_original = nbatch; hashtable->nbatch_outstart = nbatch; @@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; + hashtable->innerOverflowFiles = NULL; + hashtable->outerOverflowFiles = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; hashtable->spaceAllowed = space_allowed; @@ -559,16 +563,30 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) if (nbatch > 1 && hashtable->parallel_state == NULL) { + int cnt = Min(nbatch, nbatch_inmemory); + /* * allocate and initialize the file arrays in hashCxt (not needed for * parallel case which uses shared tuplestores instead of raw files) */ hashtable->innerBatchFile = (BufFile **) - palloc0(nbatch * sizeof(BufFile *)); + palloc0(cnt * sizeof(BufFile *)); hashtable->outerBatchFile = (BufFile **) - palloc0(nbatch * sizeof(BufFile *)); + palloc0(cnt * sizeof(BufFile *)); /* The files will not be opened until needed... */ /* ... 
but make sure we have temp tablespaces established for them */ + + /* also allocate files for overflow batches */ + if (nbatch > nbatch_inmemory) + { + int nslices = (nbatch / nbatch_inmemory); + + hashtable->innerOverflowFiles = (BufFile **) + palloc0(nslices * sizeof(BufFile *)); + hashtable->outerOverflowFiles = (BufFile **) + palloc0(nslices * sizeof(BufFile *)); + } + PrepareTempTablespaces(); } @@ -665,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t *space_allowed, int *numbuckets, int *numbatches, + int *numbatches_inmemory, int *num_skew_mcvs) { int tupsize; @@ -675,6 +694,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, long max_pointers; long mppow2; int nbatch = 1; + int nbatch_inmemory = 1; int nbuckets; double dbuckets; @@ -795,6 +815,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, space_allowed, numbuckets, numbatches, + numbatches_inmemory, num_skew_mcvs); return; } @@ -831,11 +852,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, nbatch <<= 1; } + /* + * See how many batches we can fit into memory (driven mostly by size + * of BufFile, with PGAlignedBlock being the largest part of that). + * We need one BufFile for inner and outer side, so we count it twice + * for each batch, and we stop once we exceed (work_mem/2). 
+ */ + while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2 + <= (work_mem * 1024L / 2)) + nbatch_inmemory *= 2; + Assert(nbuckets > 0); Assert(nbatch > 0); *numbuckets = nbuckets; *numbatches = nbatch; + *numbatches_inmemory = nbatch_inmemory; } @@ -857,13 +889,27 @@ ExecHashTableDestroy(HashJoinTable hashtable) */ if (hashtable->innerBatchFile != NULL) { - for (i = 1; i < hashtable->nbatch; i++) + int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory); + + for (i = 1; i < n; i++) { if (hashtable->innerBatchFile[i]) BufFileClose(hashtable->innerBatchFile[i]); if (hashtable->outerBatchFile[i]) BufFileClose(hashtable->outerBatchFile[i]); } + + /* number of batch slices */ + n = hashtable->nbatch / hashtable->nbatch_inmemory; + + for (i = 1; i < n; i++) + { + if (hashtable->innerOverflowFiles[i]) + BufFileClose(hashtable->innerOverflowFiles[i]); + + if (hashtable->outerOverflowFiles[i]) + BufFileClose(hashtable->outerOverflowFiles[i]); + } } /* Release working memory (batchCxt is a child, so it goes away too) */ @@ -909,6 +955,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) if (hashtable->innerBatchFile == NULL) { + /* XXX nbatch=1, no need to deal with nbatch_inmemory here */ + /* we had no file arrays before */ hashtable->innerBatchFile = (BufFile **) palloc0(nbatch * sizeof(BufFile *)); @@ -919,15 +967,50 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory); + /* enlarge arrays and zero out added entries */ hashtable->innerBatchFile = (BufFile **) - repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *)); + repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *)); hashtable->outerBatchFile = (BufFile **) - repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *)); - MemSet(hashtable->innerBatchFile + oldnbatch, 0, - (nbatch - oldnbatch) * sizeof(BufFile *)); - MemSet(hashtable->outerBatchFile + oldnbatch, 0, - (nbatch - oldnbatch) * sizeof(BufFile *)); 
+ repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *)); + + if (oldnbatch < nbatch_tmp) + { + MemSet(hashtable->innerBatchFile + oldnbatch, 0, + (nbatch_tmp - oldnbatch) * sizeof(BufFile *)); + MemSet(hashtable->outerBatchFile + oldnbatch, 0, + (nbatch_tmp - oldnbatch) * sizeof(BufFile *)); + } + + if (nbatch > hashtable->nbatch_inmemory) + { + int nslices = (nbatch / hashtable->nbatch_inmemory); + + if (hashtable->innerOverflowFiles == NULL) + { + hashtable->innerOverflowFiles = (BufFile **) + palloc0(nslices * sizeof(BufFile *)); + hashtable->outerOverflowFiles = (BufFile **) + palloc0(nslices * sizeof(BufFile *)); + } + else + { + hashtable->innerOverflowFiles = (BufFile **) + repalloc(hashtable->innerOverflowFiles, + nslices * sizeof(BufFile *)); + hashtable->outerOverflowFiles = (BufFile **) + repalloc(hashtable->outerOverflowFiles, + nslices * sizeof(BufFile *)); + + /* we double the number of batches, so we know the old + * value was nslices/2 exactly */ + memset(hashtable->innerOverflowFiles + nslices/2, 0, + (nslices/2) * sizeof(BufFile *)); + memset(hashtable->outerOverflowFiles + nslices/2, 0, + (nslices/2) * sizeof(BufFile *)); + } + } } MemoryContextSwitchTo(oldcxt); @@ -999,11 +1082,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + BufFile **batchFile; + /* dump it out */ Assert(batchno > curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + hashtable->innerOverflowFiles); + ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, - &hashtable->innerBatchFile[batchno]); + batchFile); hashtable->spaceUsed -= hashTupleSize; nfreed++; @@ -1647,22 +1737,33 @@ ExecHashTableInsert(HashJoinTable hashtable, /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + 
ExecHashUpdateSpacePeak(hashtable); + + /* Consider increasing number of batches if we filled work_mem. */ if (hashtable->spaceUsed + - hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + + Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */ > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); } else { + BufFile **batchFile; + /* * put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + hashtable->innerOverflowFiles); + ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + batchFile); } } @@ -1893,6 +1994,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable, } } +int +ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno) +{ + int slice, + curslice; + + if (hashtable->nbatch <= hashtable->nbatch_inmemory) + return batchno; + + slice = batchno / hashtable->nbatch_inmemory; + curslice = hashtable->curbatch / hashtable->nbatch_inmemory; + + /* slices can't go backwards */ + Assert(slice >= curslice); + + /* overflow slice */ + if (slice > curslice) + return -1; + + /* current slice, compute index in the current array */ + return (batchno % hashtable->nbatch_inmemory); +} + +BufFile ** +ExecHashGetBatchFile(HashJoinTable hashtable, int batchno, + BufFile **batchFiles, BufFile **overflowFiles) +{ + int idx = ExecHashGetBatchIndex(hashtable, batchno); + + /* get the right overflow file */ + if (idx == -1) + { + int slice = (batchno / hashtable->nbatch_inmemory); + + return &overflowFiles[slice]; + } + + /* batch file in the current slice */ + return &batchFiles[idx]; +} + +void +ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable) +{ + int slice = (hashtable->curbatch / hashtable->nbatch_inmemory); + + memset(hashtable->innerBatchFile, 0, + hashtable->nbatch_inmemory * sizeof(BufFile *)); + + 
hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice]; + hashtable->innerOverflowFiles[slice] = NULL; + + memset(hashtable->outerBatchFile, 0, + hashtable->nbatch_inmemory * sizeof(BufFile *)); + + hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice]; + hashtable->outerOverflowFiles[slice] = NULL; +} + +int +ExecHashSwitchToNextBatch(HashJoinTable hashtable) +{ + int batchidx; + + hashtable->curbatch++; + + /* see if we skipped to the next batch slice */ + batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch); + + /* Can't be -1, current batch is in the current slice by definition. */ + Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory); + + /* + * If we skipped to the next slice of batches, reset the array of files + * and use the overflow file as the first batch. + */ + if (batchidx == 0) + ExecHashSwitchToNextBatchSlice(hashtable); + + return hashtable->curbatch; +} + +static void +ExecHashUpdateSpacePeak(HashJoinTable hashtable) +{ + Size spaceUsed = hashtable->spaceUsed; + + /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ + spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); + + /* Account for memory used for batch files (inner + outer) */ + spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) * + sizeof(PGAlignedBlock) * 2; + + /* Account for slice files (inner + outer) */ + spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) * + sizeof(PGAlignedBlock) * 2; + + if (spaceUsed > hashtable->spacePeak) + hashtable->spacePeak = spaceUsed; +} + /* * ExecScanHashBucket * scan a hash bucket for matches to the current outer tuple @@ -2272,8 +2475,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) + mcvsToUse * sizeof(int); hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) + mcvsToUse * sizeof(int); - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak 
used memory */ + ExecHashUpdateSpacePeak(hashtable); /* * Create a skew bucket for each MCV hash value. @@ -2322,8 +2526,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) hashtable->nSkewBuckets++; hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); } free_attstatsslot(&sslot); @@ -2411,8 +2616,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; hashtable->spaceUsedSkew += hashTupleSize; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); + while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) ExecHashRemoveNextSkewBucket(hashtable); @@ -2488,10 +2695,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } else { + BufFile **batchFile; + /* Put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + hashtable->innerOverflowFiles); + ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + batchFile); pfree(hashTuple); hashtable->spaceUsed -= tupleSize; hashtable->spaceUsedSkew -= tupleSize; @@ -2640,6 +2854,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument, instrument->nbuckets_original = hashtable->nbuckets_original; instrument->nbatch = hashtable->nbatch; instrument->nbatch_original = hashtable->nbatch_original; + instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory); instrument->space_peak = hashtable->spacePeak; } diff --git a/src/backend/executor/nodeHashjoin.c 
b/src/backend/executor/nodeHashjoin.c index 5922e60eed..a8db71925b 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (batchno != hashtable->curbatch && node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO) { + BufFile **batchFile; + /* * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. */ Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->outerBatchFile, + hashtable->outerOverflowFiles); + ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), hashvalue, - &hashtable->outerBatchFile[batchno]); + batchFile); /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; @@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, } else if (curbatch < hashtable->nbatch) { - BufFile *file = hashtable->outerBatchFile[curbatch]; + BufFile **file = ExecHashGetBatchFile(hashtable, curbatch, + hashtable->outerBatchFile, + hashtable->outerOverflowFiles); /* * In outer-join cases, we could get here even though the batch file * is empty. 
*/ - if (file == NULL) + if (*file == NULL) return NULL; slot = ExecHashJoinGetSavedTuple(hjstate, - file, + *file, hashvalue, hjstate->hj_OuterTupleSlot); if (!TupIsNull(slot)) @@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) BufFile *innerFile; TupleTableSlot *slot; uint32 hashvalue; + int batchidx; + int curbatch_old; nbatch = hashtable->nbatch; curbatch = hashtable->curbatch; + curbatch_old = curbatch; + + /* index of the old batch */ + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); + + /* has to be in the current slice of batches */ + Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory); if (curbatch > 0) { @@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * We no longer need the previous outer batch file; close it right * away to free disk space. */ - if (hashtable->outerBatchFile[curbatch]) - BufFileClose(hashtable->outerBatchFile[curbatch]); - hashtable->outerBatchFile[curbatch] = NULL; + if (hashtable->outerBatchFile[batchidx]) + BufFileClose(hashtable->outerBatchFile[batchidx]); + hashtable->outerBatchFile[batchidx] = NULL; } else /* we just finished the first batch */ { @@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * scan, we have to rescan outer batches in case they contain tuples that * need to be reassigned. 
*/ - curbatch++; + curbatch = ExecHashSwitchToNextBatch(hashtable); + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); + while (curbatch < nbatch && - (hashtable->outerBatchFile[curbatch] == NULL || - hashtable->innerBatchFile[curbatch] == NULL)) + (hashtable->outerBatchFile[batchidx] == NULL || + hashtable->innerBatchFile[batchidx] == NULL)) { - if (hashtable->outerBatchFile[curbatch] && + if (hashtable->outerBatchFile[batchidx] && HJ_FILL_OUTER(hjstate)) break; /* must process due to rule 1 */ - if (hashtable->innerBatchFile[curbatch] && + if (hashtable->innerBatchFile[batchidx] && HJ_FILL_INNER(hjstate)) break; /* must process due to rule 1 */ - if (hashtable->innerBatchFile[curbatch] && + if (hashtable->innerBatchFile[batchidx] && nbatch != hashtable->nbatch_original) break; /* must process due to rule 2 */ - if (hashtable->outerBatchFile[curbatch] && + if (hashtable->outerBatchFile[batchidx] && nbatch != hashtable->nbatch_outstart) break; /* must process due to rule 3 */ /* We can ignore this batch. */ /* Release associated temp files right away. 
*/ - if (hashtable->innerBatchFile[curbatch]) - BufFileClose(hashtable->innerBatchFile[curbatch]); - hashtable->innerBatchFile[curbatch] = NULL; - if (hashtable->outerBatchFile[curbatch]) - BufFileClose(hashtable->outerBatchFile[curbatch]); - hashtable->outerBatchFile[curbatch] = NULL; - curbatch++; + if (hashtable->innerBatchFile[batchidx]) + BufFileClose(hashtable->innerBatchFile[batchidx]); + hashtable->innerBatchFile[batchidx] = NULL; + if (hashtable->outerBatchFile[batchidx]) + BufFileClose(hashtable->outerBatchFile[batchidx]); + hashtable->outerBatchFile[batchidx] = NULL; + + curbatch = ExecHashSwitchToNextBatch(hashtable); + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); } if (curbatch >= nbatch) + { + hashtable->curbatch = curbatch_old; return false; /* no more batches */ - - hashtable->curbatch = curbatch; + } /* * Reload the hash table with the new inner batch (which could be empty) */ ExecHashTableReset(hashtable); - innerFile = hashtable->innerBatchFile[curbatch]; + innerFile = hashtable->innerBatchFile[batchidx]; if (innerFile != NULL) { @@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * needed */ BufFileClose(innerFile); - hashtable->innerBatchFile[curbatch] = NULL; + hashtable->innerBatchFile[batchidx] = NULL; } /* * Rewind outer batch file (if present), so that we can start reading it. 
*/ - if (hashtable->outerBatchFile[curbatch] != NULL) + if (hashtable->outerBatchFile[batchidx] != NULL) { - if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET)) + if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not rewind hash-join temporary file: %m"))); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index c7400941ee..e324869c09 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, int num_hashclauses = list_length(hashclauses); int numbuckets; int numbatches; + int numbatches_inmemory; int num_skew_mcvs; size_t space_allowed; /* unused */ @@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, &space_allowed, &numbuckets, &numbatches, + &numbatches_inmemory, &num_skew_mcvs); /* diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index a9f9872a78..311a0980ee 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -308,6 +308,7 @@ typedef struct HashJoinTableData int *skewBucketNums; /* array indexes of active skew buckets */ int nbatch; /* number of batches */ + int nbatch_inmemory; /* max number of in-memory batches */ int curbatch; /* current batch #; 0 during 1st pass */ int nbatch_original; /* nbatch when we started inner scan */ @@ -329,6 +330,9 @@ typedef struct HashJoinTableData BufFile **innerBatchFile; /* buffered virtual temp file per batch */ BufFile **outerBatchFile; /* buffered virtual temp file per batch */ + BufFile **innerOverflowFiles; /* temp file for inner overflow batches */ + BufFile **outerOverflowFiles; /* temp file for outer overflow batches */ + /* * Info about the datatype-specific hash functions for the datatypes being * hashed. 
These are arrays of the same length as the number of hash join diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 8d700c06c5..bb6b24a1b4 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -16,6 +16,7 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +#include "storage/buffile.h" struct SharedHashJoinBatch; @@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno); +extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno); +extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno, + BufFile **batchFiles, BufFile **overflowFiles); +extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable); +extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable); extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); @@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t *space_allowed, int *numbuckets, int *numbatches, + int *numbatches_inmemory, int *num_skew_mcvs); extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 9959c9e31f..6c53c5abd2 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation int nbuckets_original; /* planned number of buckets */ int nbatch; /* number of batches at end of execution */ int nbatch_original; /* planned number of batches */ + int nbatch_inmemory; /* number of batches kept in memory */ size_t space_peak; /* speak memory usage in bytes */ } HashInstrumentation;
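To make the batch-to-slice mapping easier to follow, here is a rough standalone sketch of the logic in ExecHashGetBatchIndex, detached from HashJoinTable. It is only an illustration: the function batch_index, its plain int parameters, and the OVERFLOW_SLICE constant are hypothetical names, not something the patch defines.

```c
#include <assert.h>

/* Hypothetical marker for "batch belongs to a later slice" (the patch returns -1). */
#define OVERFLOW_SLICE (-1)

/*
 * Map a batch number to an index into the in-memory file arrays.
 * Batches in the current slice get an index in [0, nbatch_inmemory);
 * batches in later slices are routed to that slice's overflow file.
 */
int
batch_index(int nbatch, int nbatch_inmemory, int curbatch, int batchno)
{
    int slice;
    int curslice;

    /* everything fits into memory, so there are no slices at all */
    if (nbatch <= nbatch_inmemory)
        return batchno;

    slice = batchno / nbatch_inmemory;
    curslice = curbatch / nbatch_inmemory;

    /* tuples are never sent back to an already processed slice */
    assert(slice >= curslice);

    if (slice > curslice)
        return OVERFLOW_SLICE;

    return batchno % nbatch_inmemory;
}
```

For example, with nbatch = 1024 and nbatch_inmemory = 128, batch 300 lands in the overflow file of slice 2 while slices 0 and 1 are being processed, and becomes index 44 of the in-memory arrays once curbatch reaches slice 2.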
Attachment:
hashjoin-test.sh
Description: Bourne shell script