Re: Out of Memory errors are frustrating as heck!

On Wed, Apr 24, 2019 at 02:36:33AM +0200, Tomas Vondra wrote:

...

> I still think the idea with an "overflow batch" is worth considering,
> because it'd allow us to keep the memory usage within work_mem. And
> after getting familiar with the hash join code again (haven't messed
> with it since 9.5 or so) I think it should not be all that difficult.
> I'll give it a try over the weekend if I get bored for a while.


OK, so I took a stab at this, and overall it seems to be workable. The
patches I have are nowhere near committable, but I think the approach
works fairly well - the memory is kept in check, and the performance is
comparable to the "balancing" approach tested before.

To explain it a bit, the idea is that we can compute how many BufFile
structures we can keep in memory - we can't use more than work_mem/2 for
that, because then we'd mostly eliminate space for the actual data. For
example with 4MB, we know we can keep 128 batches - we need 128 for
outer and inner side, so 256 in total, and 256*8kB = 2MB.
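
A rough sketch of that arithmetic, not the patch code itself. It assumes
each BufFile costs about one 8kB buffer and that every batch needs one
file for the inner and one for the outer side. The names BUFFILE_SIZE
and batches_in_memory are made up for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed per-file buffer size (8kB, the usual block size). */
#define BUFFILE_SIZE ((size_t) 8192)

/*
 * Largest power-of-two number of batches whose BufFiles (inner + outer)
 * still fit into half of work_mem. work_mem_kb is given in kilobytes.
 */
size_t
batches_in_memory(size_t work_mem_kb)
{
	size_t		limit = work_mem_kb * 1024 / 2;
	size_t		nbatch = 1;

	assert(work_mem_kb > 0);

	/* keep doubling while the doubled count still fits into the limit */
	while ((nbatch * 2) * BUFFILE_SIZE * 2 <= limit)
		nbatch *= 2;

	return nbatch;
}
```

With work_mem_kb = 4096 this returns 128, which matches the example
above: 256 files * 8kB = 2MB, i.e. half of the 4MB.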

And then, we just increase the number of batches but instead of adding
the BufFile entries, we split batches into slices that we can keep in
memory (say, the 128 batches). And we keep BufFiles for the current one
and an "overflow file" for the other slices. After processing a slice,
we simply switch to the next one, and use the overflow file as a temp
file for the first batch - we redistribute it into the other batches in
the slice and another overflow file.
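
The mapping from a batch number to a file can be sketched roughly like
this. It is a simplified standalone version modeled on the
ExecHashGetBatchIndex function in the attached patch; the name
batch_slot and the OVERFLOW_FILE constant are only illustrative.

```c
#include <assert.h>

/* Sentinel meaning "write to the overflow file" (hypothetical name). */
#define OVERFLOW_FILE (-1)

/*
 * Map a batch number to a slot in the in-memory file array, given the
 * batch that is currently being processed.
 */
int
batch_slot(int batchno, int curbatch, int nbatch, int nbatch_inmemory)
{
	int			slice;
	int			curslice;

	assert(nbatch_inmemory > 0);
	assert(batchno >= curbatch && batchno < nbatch);

	/* all batches fit into memory, so no slicing is needed */
	if (nbatch <= nbatch_inmemory)
		return batchno;

	slice = batchno / nbatch_inmemory;
	curslice = curbatch / nbatch_inmemory;

	/* batches from later slices all go to the overflow file */
	if (slice > curslice)
		return OVERFLOW_FILE;

	/* batch in the current slice, compute its index in the array */
	return batchno % nbatch_inmemory;
}
```

For example with 1024 batches and 128 in memory, batch 200 goes to the
overflow file while the first slice is processed, and maps to slot 72
once processing reaches the second slice.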

That's what the v3 patch (named 'single overflow file') does. It does
work, but unfortunately it significantly inflates the amount of data
written to temporary files. Assume we need e.g. 1024 batches, but only
128 fit into memory. That means we'll need 8 slices, and during the
first pass we'll handle 1/8 of the data and write 7/8 to the overflow
file.  Then after processing the slice and switching to the next one, we
repeat this dance - 1/8 gets processed, 6/8 written to another overflow
file. So essentially we "forward" about

   7/8 + 6/8 + 5/8 + ... + 1/8 = 28/8 = 3.5

of data between slices, and we need to re-shuffle data in each slice,
which amounts to additional 1x data. That's pretty significant overhead,
as will be clear from the measurements I'll present shortly.
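
The forwarding sum above generalizes to any number of slices. A minimal
sketch, assuming the data is spread uniformly across slices (the
function name forwarded_fraction is made up for this example):

```c
#include <assert.h>

/*
 * Amount of data (as a multiple of the data set size) forwarded between
 * slices with a single overflow file:
 *
 *    (n-1)/n + (n-2)/n + ... + 1/n
 */
double
forwarded_fraction(int nslices)
{
	double		total = 0.0;
	int			i;

	assert(nslices > 0);

	for (i = 1; i < nslices; i++)
		total += (double) i / nslices;

	return total;
}
```

For 8 slices this gives 3.5, as computed above. In closed form the sum
is (n-1)/2, so the forwarded volume grows linearly with the number of
slices.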

But luckily, there's a simple solution to this - instead of writing the
data into a single overflow file, we can create one overflow file for
each slice. That will leave us with the ~1x of additional writes when
distributing data into batches in the current slice, but it eliminates
the main source of write amplification - avalanche-like forwarding of
data between slices.

This relaxes the memory limit a bit again, because we can't really keep
the number of overflow files constrained by work_mem, but we should only
need a few of them (far fewer than when adding one file per batch right
away). For example with 128 in-memory batches, this reduces the amount
of necessary memory 128x.
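
To illustrate the 128x figure, here is a small sketch comparing the
memory needed for one file per batch with the memory needed for one
overflow file per slice. It assumes 8kB per BufFile and counts both the
inner and outer side; the function names are hypothetical, and the files
for the batches of the current slice are left out of the comparison.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed per-file buffer size (8kB). */
#define BUFFILE_SIZE ((size_t) 8192)

/* BufFile memory when every batch has its own file (inner + outer) */
size_t
mem_file_per_batch(int nbatch)
{
	assert(nbatch > 0);

	return (size_t) nbatch * BUFFILE_SIZE * 2;
}

/* BufFile memory for one overflow file per slice (inner + outer) */
size_t
mem_overflow_files(int nbatch, int nbatch_inmemory)
{
	int			nslices;

	assert(nbatch > 0 && nbatch_inmemory > 0);

	/* round up, although both values are normally powers of two */
	nslices = (nbatch + nbatch_inmemory - 1) / nbatch_inmemory;

	return (size_t) nslices * BUFFILE_SIZE * 2;
}
```

With 32768 batches and 128 in-memory batches, the first function gives
512MB and the second one 4MB, a 128x difference.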

And this is what v4 (per-slice overflow file) does, pretty much.


Two more comments, regarding memory accounting in previous patches. It
was a bit broken, because we actually need 2x the number of BufFiles. We
needed nbatch files for outer side and nbatch files for inner side, but
we only considered one of those - both when deciding when to increase
the number of batches / increase spaceAllowed, and when reporting the
memory usage. So with large number of batches the reported amount of
used memory was roughly 1/2 of the actual value :-/

The memory accounting was a bit bogus for another reason - spaceUsed
simply tracks the amount of memory for hash table contents. But at the
end we were simply adding the current space for BufFile stuff, ignoring
the fact that that's likely much larger than when the spacePeak value
got stored. For example we might have kept early spaceUsed when it was
almost work_mem, and then added the final large BufFile allocation.

I've fixed both issues in the patches attached to this message. It does
not make a huge difference in practice, but it makes it easier to
compare values between patches.
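
The corrected accounting boils down to including the BufFile memory
every time the peak is refreshed, instead of adding it once at the end.
A simplified sketch of that idea, loosely following
ExecHashUpdateSpacePeak from the attached patches (the struct and its
field names are made up, and the bucket array is ignored here):

```c
#include <assert.h>
#include <stddef.h>

/* Assumed per-file buffer size (8kB). */
#define BUFFILE_SIZE ((size_t) 8192)

/* Hypothetical stand-in for the relevant hash table fields. */
typedef struct HashMemStats
{
	size_t		space_used;		/* bytes used by hash table contents */
	size_t		space_peak;		/* highest total seen so far */
	int			nbatch;			/* current number of batches */
} HashMemStats;

/* Refresh the peak, counting BufFiles for both inner and outer side. */
void
update_space_peak(HashMemStats *st)
{
	size_t		total;

	assert(st != NULL);

	total = st->space_used + (size_t) st->nbatch * BUFFILE_SIZE * 2;

	if (total > st->space_peak)
		st->space_peak = total;
}
```

Because the total is computed at the same moment the peak is compared,
the reported value can no longer mix an early spaceUsed with a much
later (and larger) BufFile allocation.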


Now, some test results - I've repeated the simple test with uniform data
set, which is pretty much ideal for hash joins (no unexpectedly large
batches that can't be split, etc.). I've done this with 1M, 5M, 10M, 25M
and 50M rows in the large table (which gets picked for the "hash" side),
and measured how much memory gets used, how many batches, how long it
takes and how much data gets written to temp files.

See the hashjoin-test.sh script for more details.

So, here are the results with work_mem = 4MB (so the number of in-memory
batches for the last two entries is 128). The columns are:

* nbatch - the final number of batches
* memory - memory usage in kB, as reported by EXPLAIN ANALYZE
* time - duration of the query (without EXPLAIN ANALYZE) in seconds
* size - size of the large table in MB
* temp - amount of data written to temp files in MB
* amplif - write amplification (temp / size)


 1M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master             256    7681    3.3        730        899    1.23
 rebalance          256    7711    3.3        730        884    1.21
 single file       1024    4161    7.2        730       3168    4.34
 per-slice file    1024    4161    4.7        730       1653    2.26


 5M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master            2048   36353     22       3652       5276    1.44
 rebalance          512   16515     18       3652       4169    1.14
 single file       4096    4353    156       3652      53897   14.76
 per-slice file    4096    4353     28       3652       8106    2.21


 10M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master            4096   69121     61       7303      10556    1.45
 rebalance          512   24326     46       7303       7405    1.01
 single file       8192    4636    762       7303     211234   28.92
 per-slice file    8192    4636     65       7303      16278    2.23


 25M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master            8192  134657    190       7303      24279    1.33
 rebalance         1024   36611    158       7303      20024    1.10
 single file      16384    6011   4054       7303    1046174   57.32
 per-slice file   16384    6011    207       7303      39073    2.14


 50M rows
 ===================================================================
                 nbatch  memory   time  size (MB)  temp (MB)  amplif
 -------------------------------------------------------------------
 master           16384  265729    531      36500      48519    1.33
 rebalance         2048   53241    447      36500      48077    1.32
 single file          -       -      -      36500          -       -
 per-slice file   32768    8125    451      36500      78662    2.16


From those numbers it's pretty clear that the per-slice overflow file
does by far the best job of enforcing work_mem while keeping the amount
of data spilled to temp files in check. It does write a bit more data
than both master and the simple rebalancing, but that's the cost of
enforcing work_mem more strictly. It's generally a bit slower than those
two approaches, although on the largest scale it's actually a bit faster
than master. I think that's pretty acceptable, considering this is meant
to address extreme underestimates where we currently just eat memory.

The case with single overflow file performs rather poorly - I haven't
even collected data from the largest scale, but considering it spilled
1TB of temp files with a dataset half the size, that's not an issue.
(Note that this does not mean it needs 1TB of temp space, those writes
are spread over time and the files are created/closed as we go. The
system only has ~100GB of free disk space.)


Gunther, could you try the v2 and v4 patches on your data set? That
would be an interesting data point, I think.


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4d5a6872cc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -1647,12 +1646,56 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/*
+		 * Consider increasing number of batches.
+		 *
+		 * Each batch requires a non-trivial amount of memory, because BufFile
+		 * includes a PGAlignedBlock (typically 8kB buffer). So when doubling
+		 * the number of batches, we need to be careful and only allow that if
+		 * it actually has a chance of reducing memory usage.
+		 *
+		 * In particular, doubling the number of batches is pointless when
+		 *
+		 *		(spaceUsed / 2) < (nbatches * sizeof(BufFile))
+		 *
+		 * because we expect to save roughly 1/2 of memory currently used for
+		 * data (rows) at the price of doubling the memory used for BufFile.
+		 *
+		 * We can't stop adding batches entirely, because that would just mean
+		 * the batches would need more and more memory. So we need to increase
+		 * the number of batches, even if we can't enforce work_mem properly.
+		 * The goal is to minimize the overall memory usage of the hash join.
+		 *
+		 * Note: This applies mostly to cases of significant underestimates,
+		 * resulting in an explosion of the number of batches. The properly
+		 * estimated cases should generally end up using merge join based on
+		 * high cost of the batched hash join.
+		 */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			hashtable->nbatch * sizeof(PGAlignedBlock) * 2
 			> hashtable->spaceAllowed)
+		{
 			ExecHashIncreaseNumBatches(hashtable);
+
+			/*
+			 * Consider increasing the resize threshold.
+			 *
+			 * For well estimated cases this does nothing, because batches are
+			 * expected to account only for small fraction of work_mem. But if
+			 * we significantly underestimate the number of batches, we may end
+			 * up in a situation where BufFiles alone exceed work_mem. So move
+			 * the threshold a bit, until the next point where it'll make sense
+			 * to consider adding batches again.
+			 */
+			hashtable->spaceAllowed
+				= Max(hashtable->spaceAllowed,
+					  hashtable->nbatch * sizeof(PGAlignedBlock) * 3);
+		}
 	}
 	else
 	{
@@ -1893,6 +1936,21 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size	spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2330,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2381,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak used memory */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2411,8 +2471,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 								   hinstrument.nbatch, es);
 			ExplainPropertyInteger("Original Hash Batches", NULL,
 								   hinstrument.nbatch_original, es);
+			ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+								   hinstrument.nbatch_inmemory, es);
 			ExplainPropertyInteger("Peak Memory Usage", "kB",
 								   spacePeakKb, es);
 		}
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 				 hinstrument.nbuckets_original != hinstrument.nbuckets)
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
-			appendStringInfo(es->str,
-							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hinstrument.nbuckets,
-							 hinstrument.nbuckets_original,
-							 hinstrument.nbatch,
-							 hinstrument.nbatch_original,
-							 spacePeakKb);
+			if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+				appendStringInfo(es->str,
+								 "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets,
+								 hinstrument.nbuckets_original,
+								 hinstrument.nbatch,
+								 hinstrument.nbatch_original,
+								 hinstrument.nbatch_inmemory,
+								 spacePeakKb);
+			else
+				appendStringInfo(es->str,
+								 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets,
+								 hinstrument.nbuckets_original,
+								 hinstrument.nbatch,
+								 hinstrument.nbatch_original,
+								 spacePeakKb);
 		}
 		else
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
-			appendStringInfo(es->str,
-							 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-							 hinstrument.nbuckets, hinstrument.nbatch,
-							 spacePeakKb);
+			if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+				appendStringInfo(es->str,
+								 "Buckets: %d  Batches: %d (in-memory: %d)  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets, hinstrument.nbatch,
+								 hinstrument.nbatch_inmemory,
+								 spacePeakKb);
+			else
+				appendStringInfo(es->str,
+								 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets, hinstrument.nbatch,
+								 spacePeakKb);
 		}
 	}
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..044d360fd4 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	size_t		space_allowed;
 	int			nbuckets;
 	int			nbatch;
+	int			nbatch_inmemory;
 	double		rows;
 	int			num_skew_mcvs;
 	int			log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 							state->parallel_state != NULL ?
 							state->parallel_state->nparticipants - 1 : 0,
 							&space_allowed,
-							&nbuckets, &nbatch, &num_skew_mcvs);
+							&nbuckets, &nbatch, &nbatch_inmemory,
+							&num_skew_mcvs);
 
 	/* nbuckets must be a power of 2 */
 	log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
 	hashtable->nbatch = nbatch;
+	hashtable->nbatch_inmemory = nbatch_inmemory;
 	hashtable->curbatch = 0;
 	hashtable->nbatch_original = nbatch;
 	hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
+	hashtable->innerOverflowFile = NULL;
+	hashtable->outerOverflowFile = NULL;
 	hashtable->spaceUsed = 0;
 	hashtable->spacePeak = 0;
 	hashtable->spaceAllowed = space_allowed;
@@ -559,14 +563,16 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
+		int	cnt = Min(nbatch, nbatch_inmemory);
+
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
 		hashtable->innerBatchFile = (BufFile **)
-			palloc0(nbatch * sizeof(BufFile *));
+			palloc0(cnt * sizeof(BufFile *));
 		hashtable->outerBatchFile = (BufFile **)
-			palloc0(nbatch * sizeof(BufFile *));
+			palloc0(cnt * sizeof(BufFile *));
 		/* The files will not be opened until needed... */
 		/* ... but make sure we have temp tablespaces established for them */
 		PrepareTempTablespaces();
@@ -665,6 +671,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						size_t *space_allowed,
 						int *numbuckets,
 						int *numbatches,
+						int *numbatches_inmemory,
 						int *num_skew_mcvs)
 {
 	int			tupsize;
@@ -675,6 +682,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	long		max_pointers;
 	long		mppow2;
 	int			nbatch = 1;
+	int			nbatch_inmemory = 1;
 	int			nbuckets;
 	double		dbuckets;
 
@@ -795,6 +803,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									space_allowed,
 									numbuckets,
 									numbatches,
+									numbatches_inmemory,
 									num_skew_mcvs);
 			return;
 		}
@@ -831,11 +840,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 			nbatch <<= 1;
 	}
 
+	/*
+	 * See how many batches we can fit into memory (driven mostly by size
+	 * of BufFile, with PGAlignedBlock being the largest part of that).
+	 * We need one BufFile for inner and outer side, so we count it twice
+	 * for each batch, and we stop once we exceed (work_mem/2).
+	 */
+	while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+			<= (work_mem * 1024L / 2))
+		nbatch_inmemory *= 2;
+
 	Assert(nbuckets > 0);
 	Assert(nbatch > 0);
 
 	*numbuckets = nbuckets;
 	*numbatches = nbatch;
+	*numbatches_inmemory = nbatch_inmemory;
 }
 
 
@@ -857,13 +877,21 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	 */
 	if (hashtable->innerBatchFile != NULL)
 	{
-		for (i = 1; i < hashtable->nbatch; i++)
+		int nbatch = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+		for (i = 1; i < nbatch; i++)
 		{
 			if (hashtable->innerBatchFile[i])
 				BufFileClose(hashtable->innerBatchFile[i]);
 			if (hashtable->outerBatchFile[i])
 				BufFileClose(hashtable->outerBatchFile[i]);
 		}
+
+		if (hashtable->innerOverflowFile)
+			BufFileClose(hashtable->innerOverflowFile);
+
+		if (hashtable->outerOverflowFile)
+			BufFileClose(hashtable->outerOverflowFile);
 	}
 
 	/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +937,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
 	if (hashtable->innerBatchFile == NULL)
 	{
+		/* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
 		/* we had no file arrays before */
 		hashtable->innerBatchFile = (BufFile **)
 			palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +949,23 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 	else
 	{
+		int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
 		/* enlarge arrays and zero out added entries */
 		hashtable->innerBatchFile = (BufFile **)
-			repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+			repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
 		hashtable->outerBatchFile = (BufFile **)
-			repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-		MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-			   (nbatch - oldnbatch) * sizeof(BufFile *));
-		MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-			   (nbatch - oldnbatch) * sizeof(BufFile *));
+			repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+		if (oldnbatch < nbatch_tmp)
+		{
+			MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+				   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+			MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+				   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+		}
+
+		/* no need to initialize the overflow files explicitly */
 	}
 
 	MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1037,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			}
 			else
 			{
+				BufFile **batchFile;
+
 				/* dump it out */
 				Assert(batchno > curbatch);
+
+				batchFile = ExecHashGetBatchFile(hashtable, batchno,
+												 hashtable->innerBatchFile,
+												 &hashtable->innerOverflowFile);
+
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
-									  &hashtable->innerBatchFile[batchno]);
+									  batchFile);
 
 				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
@@ -1647,22 +1692,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/* Consider increasing number of batches if we filled work_mem. */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2	/* inner + outer */
 			> hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
 	{
+		BufFile **batchFile;
+
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
+		batchFile = ExecHashGetBatchFile(hashtable, batchno,
+										 hashtable->innerBatchFile,
+										 &hashtable->innerOverflowFile);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno]);
+							  batchFile);
 	}
 }
 
@@ -1893,6 +1949,100 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+	int	slice,
+		curslice;
+
+	if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+		return batchno;
+
+	slice = batchno / hashtable->nbatch_inmemory;
+	curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+	/* slices can't go backwards */
+	Assert(slice >= curslice);
+
+	/* overflow slice */
+	if (slice > curslice)
+		return -1;
+
+	/* current slice, compute index in the current array */
+	return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **batchFiles, BufFile **overflowFile)
+{
+	int		idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+	if (idx == -1)
+		return overflowFile;
+
+	return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+	memset(hashtable->innerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->innerBatchFile[0] = hashtable->innerOverflowFile;
+	hashtable->innerOverflowFile = NULL;
+
+	memset(hashtable->outerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->outerBatchFile[0] = hashtable->outerOverflowFile;
+	hashtable->outerOverflowFile = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+	int		batchidx;
+
+	hashtable->curbatch++;
+
+	/* see if we skipped to the next batch slice */
+	batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+	/* Can't be -1, current batch is in the current slice by definition. */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+	/*
+	 * If we skipped to the next slice of batches, reset the array of files
+	 * and use the overflow file as the first batch.
+	 */
+	if (batchidx == 0)
+		ExecHashSwitchToNextBatchSlice(hashtable);
+
+	return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size	spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	/* Account for slice files (inner + outer) */
+	spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2422,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2473,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak used memory */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2411,8 +2563,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2488,10 +2642,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		}
 		else
 		{
+			BufFile **batchFile;
+
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
+			batchFile = ExecHashGetBatchFile(hashtable, batchno,
+											 hashtable->innerBatchFile,
+											 &hashtable->innerOverflowFile);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+								  batchFile);
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
 			hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2801,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
 	instrument->nbuckets_original = hashtable->nbuckets_original;
 	instrument->nbatch = hashtable->nbatch;
 	instrument->nbatch_original = hashtable->nbatch_original;
+	instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
 	instrument->space_peak = hashtable->spacePeak;
 }
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..e59d4d8003 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (batchno != hashtable->curbatch &&
 					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
 				{
+					BufFile	  **batchFile;
+
 					/*
 					 * Need to postpone this outer tuple to a later batch.
 					 * Save it in the corresponding outer-batch file.
 					 */
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
+
+					batchFile = ExecHashGetBatchFile(hashtable, batchno,
+													 hashtable->outerBatchFile,
+													 &hashtable->outerOverflowFile);
+
 					ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
 										  hashvalue,
-										  &hashtable->outerBatchFile[batchno]);
+										  batchFile);
 
 					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
 					continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 	else if (curbatch < hashtable->nbatch)
 	{
-		BufFile    *file = hashtable->outerBatchFile[curbatch];
+		BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+												 hashtable->outerBatchFile,
+												 &hashtable->outerOverflowFile);
 
 		/*
 		 * In outer-join cases, we could get here even though the batch file
 		 * is empty.
 		 */
-		if (file == NULL)
+		if (*file == NULL)
 			return NULL;
 
 		slot = ExecHashJoinGetSavedTuple(hjstate,
-										 file,
+										 *file,
 										 hashvalue,
 										 hjstate->hj_OuterTupleSlot);
 		if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	BufFile    *innerFile;
 	TupleTableSlot *slot;
 	uint32		hashvalue;
+	int			batchidx;
+	int			curbatch_old;
 
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
+	curbatch_old = curbatch;
+
+	/* index of the old batch */
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+	/* has to be in the current slice of batches */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
 	if (curbatch > 0)
 	{
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * We no longer need the previous outer batch file; close it right
 		 * away to free disk space.
 		 */
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
 	}
 	else						/* we just finished the first batch */
 	{
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	 * scan, we have to rescan outer batches in case they contain tuples that
 	 * need to be reassigned.
 	 */
-	curbatch++;
+	curbatch = ExecHashSwitchToNextBatch(hashtable);
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
 	while (curbatch < nbatch &&
-		   (hashtable->outerBatchFile[curbatch] == NULL ||
-			hashtable->innerBatchFile[curbatch] == NULL))
+		   (hashtable->outerBatchFile[batchidx] == NULL ||
+			hashtable->innerBatchFile[batchidx] == NULL))
 	{
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			HJ_FILL_OUTER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			HJ_FILL_INNER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_original)
 			break;				/* must process due to rule 2 */
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_outstart)
 			break;				/* must process due to rule 3 */
 		/* We can ignore this batch. */
 		/* Release associated temp files right away. */
-		if (hashtable->innerBatchFile[curbatch])
-			BufFileClose(hashtable->innerBatchFile[curbatch]);
-		hashtable->innerBatchFile[curbatch] = NULL;
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
-		curbatch++;
+		if (hashtable->innerBatchFile[batchidx])
+			BufFileClose(hashtable->innerBatchFile[batchidx]);
+		hashtable->innerBatchFile[batchidx] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
+
+		curbatch = ExecHashSwitchToNextBatch(hashtable);
+		batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
 	}
 
 	if (curbatch >= nbatch)
+	{
+		hashtable->curbatch = curbatch_old;
 		return false;			/* no more batches */
-
-	hashtable->curbatch = curbatch;
+	}
 
 	/*
 	 * Reload the hash table with the new inner batch (which could be empty)
 	 */
 	ExecHashTableReset(hashtable);
 
-	innerFile = hashtable->innerBatchFile[curbatch];
+	innerFile = hashtable->innerBatchFile[batchidx];
 
 	if (innerFile != NULL)
 	{
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * needed
 		 */
 		BufFileClose(innerFile);
-		hashtable->innerBatchFile[curbatch] = NULL;
+		hashtable->innerBatchFile[batchidx] = NULL;
 	}
 
 	/*
 	 * Rewind outer batch file (if present), so that we can start reading it.
 	 */
-	if (hashtable->outerBatchFile[curbatch] != NULL)
+	if (hashtable->outerBatchFile[batchidx] != NULL)
 	{
-		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+		if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not rewind hash-join temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	int			num_hashclauses = list_length(hashclauses);
 	int			numbuckets;
 	int			numbatches;
+	int			numbatches_inmemory;
 	int			num_skew_mcvs;
 	size_t		space_allowed;	/* unused */
 
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 							&space_allowed,
 							&numbuckets,
 							&numbatches,
+							&numbatches_inmemory,
 							&num_skew_mcvs);
 
 	/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..ef60df5024 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
 	int		   *skewBucketNums; /* array indexes of active skew buckets */
 
 	int			nbatch;			/* number of batches */
+	int			nbatch_inmemory;	/* max number of in-memory batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
 	int			nbatch_original;	/* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
 	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
 	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+	BufFile	   *innerOverflowFile;	/* temp file for inner overflow batches */
+	BufFile	   *outerOverflowFile;	/* temp file for outer overflow batches */
+
 	/*
 	 * Info about the datatype-specific hash functions for the datatypes being
 	 * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..78389bc0cf 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 						  uint32 hashvalue,
 						  int *bucketno,
 						  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **files, BufFile **overflow);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						size_t *space_allowed,
 						int *numbuckets,
 						int *numbatches,
+						int *numbatches_inmemory,
 						int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
 	int			nbuckets_original;	/* planned number of buckets */
 	int			nbatch;			/* number of batches at end of execution */
 	int			nbatch_original;	/* planned number of batches */
+	int			nbatch_inmemory;	/* number of batches kept in memory */
 	size_t		space_peak;		/* speak memory usage in bytes */
 } HashInstrumentation;
 
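For anyone following along without the tree at hand, here is a rough standalone model of how the nbatch_inmemory limit falls out of work_mem, as described at the top of this message. It is only a sketch: MODEL_BLOCK_SIZE and model_nbatch_inmemory() are made-up names, and the 8kB constant assumes the default BLCKSZ standing in for sizeof(PGAlignedBlock).

```c
#include <assert.h>
#include <stddef.h>

/* Assumption: stands in for sizeof(PGAlignedBlock) with the default 8kB BLCKSZ. */
#define MODEL_BLOCK_SIZE ((size_t) 8192)

/*
 * Hypothetical helper, not part of the patch: returns the largest
 * power-of-two number of batches whose BufFile buffers (one inner plus
 * one outer file per batch) still fit into half of work_mem.
 */
int
model_nbatch_inmemory(size_t work_mem_kb)
{
	size_t		budget = work_mem_kb * 1024 / 2;
	int			n = 1;

	assert(work_mem_kb > 0);

	/* Keep doubling while the doubled count, times two files, still fits. */
	while ((size_t) (n * 2) * MODEL_BLOCK_SIZE * 2 <= budget)
		n *= 2;

	return n;
}
```

With work_mem = 4MB this gives 128 batches (256 files * 8kB = 2MB), which matches the example earlier in the message.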
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 								   hinstrument.nbatch, es);
 			ExplainPropertyInteger("Original Hash Batches", NULL,
 								   hinstrument.nbatch_original, es);
+			ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+								   hinstrument.nbatch_inmemory, es);
 			ExplainPropertyInteger("Peak Memory Usage", "kB",
 								   spacePeakKb, es);
 		}
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 				 hinstrument.nbuckets_original != hinstrument.nbuckets)
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
-			appendStringInfo(es->str,
-							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hinstrument.nbuckets,
-							 hinstrument.nbuckets_original,
-							 hinstrument.nbatch,
-							 hinstrument.nbatch_original,
-							 spacePeakKb);
+			if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+				appendStringInfo(es->str,
+								 "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets,
+								 hinstrument.nbuckets_original,
+								 hinstrument.nbatch,
+								 hinstrument.nbatch_original,
+								 hinstrument.nbatch_inmemory,
+								 spacePeakKb);
+			else
+				appendStringInfo(es->str,
+								 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets,
+								 hinstrument.nbuckets_original,
+								 hinstrument.nbatch,
+								 hinstrument.nbatch_original,
+								 spacePeakKb);
 		}
 		else
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
-			appendStringInfo(es->str,
-							 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-							 hinstrument.nbuckets, hinstrument.nbatch,
-							 spacePeakKb);
+			if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+				appendStringInfo(es->str,
+								 "Buckets: %d  Batches: %d (in-memory %d)  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets, hinstrument.nbatch,
+								 hinstrument.nbatch_inmemory,
+								 spacePeakKb);
+			else
+				appendStringInfo(es->str,
+								 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+								 hinstrument.nbuckets, hinstrument.nbatch,
+								 spacePeakKb);
 		}
 	}
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4364eb7cdd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	size_t		space_allowed;
 	int			nbuckets;
 	int			nbatch;
+	int			nbatch_inmemory;
 	double		rows;
 	int			num_skew_mcvs;
 	int			log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 							state->parallel_state != NULL ?
 							state->parallel_state->nparticipants - 1 : 0,
 							&space_allowed,
-							&nbuckets, &nbatch, &num_skew_mcvs);
+							&nbuckets, &nbatch, &nbatch_inmemory,
+							&num_skew_mcvs);
 
 	/* nbuckets must be a power of 2 */
 	log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
 	hashtable->nbatch = nbatch;
+	hashtable->nbatch_inmemory = nbatch_inmemory;
 	hashtable->curbatch = 0;
 	hashtable->nbatch_original = nbatch;
 	hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
+	hashtable->innerOverflowFiles = NULL;
+	hashtable->outerOverflowFiles = NULL;
 	hashtable->spaceUsed = 0;
 	hashtable->spacePeak = 0;
 	hashtable->spaceAllowed = space_allowed;
@@ -559,16 +563,30 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
+		int	cnt = Min(nbatch, nbatch_inmemory);
+
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
 		hashtable->innerBatchFile = (BufFile **)
-			palloc0(nbatch * sizeof(BufFile *));
+			palloc0(cnt * sizeof(BufFile *));
 		hashtable->outerBatchFile = (BufFile **)
-			palloc0(nbatch * sizeof(BufFile *));
+			palloc0(cnt * sizeof(BufFile *));
 		/* The files will not be opened until needed... */
 		/* ... but make sure we have temp tablespaces established for them */
+
+		/* also allocate files for overflow batches */
+		if (nbatch > nbatch_inmemory)
+		{
+			int nslices = (nbatch / nbatch_inmemory);
+
+			hashtable->innerOverflowFiles = (BufFile **)
+				palloc0(nslices * sizeof(BufFile *));
+			hashtable->outerOverflowFiles = (BufFile **)
+				palloc0(nslices * sizeof(BufFile *));
+		}
+
 		PrepareTempTablespaces();
 	}
 
@@ -665,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						size_t *space_allowed,
 						int *numbuckets,
 						int *numbatches,
+						int *numbatches_inmemory,
 						int *num_skew_mcvs)
 {
 	int			tupsize;
@@ -675,6 +694,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	long		max_pointers;
 	long		mppow2;
 	int			nbatch = 1;
+	int			nbatch_inmemory = 1;
 	int			nbuckets;
 	double		dbuckets;
 
@@ -795,6 +815,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									space_allowed,
 									numbuckets,
 									numbatches,
+									numbatches_inmemory,
 									num_skew_mcvs);
 			return;
 		}
@@ -831,11 +852,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 			nbatch <<= 1;
 	}
 
+	/*
+	 * See how many batches we can fit into memory (driven mostly by size
+	 * of BufFile, with PGAlignedBlock being the largest part of that).
+	 * We need one BufFile for inner and outer side, so we count it twice
+	 * for each batch, and we stop once we exceed (work_mem/2).
+	 */
+	while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+			<= (work_mem * 1024L / 2))
+		nbatch_inmemory *= 2;
+
 	Assert(nbuckets > 0);
 	Assert(nbatch > 0);
 
 	*numbuckets = nbuckets;
 	*numbatches = nbatch;
+	*numbatches_inmemory = nbatch_inmemory;
 }
 
 
@@ -857,13 +889,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	 */
 	if (hashtable->innerBatchFile != NULL)
 	{
-		for (i = 1; i < hashtable->nbatch; i++)
+		int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+		for (i = 1; i < n; i++)
 		{
 			if (hashtable->innerBatchFile[i])
 				BufFileClose(hashtable->innerBatchFile[i]);
 			if (hashtable->outerBatchFile[i])
 				BufFileClose(hashtable->outerBatchFile[i]);
 		}
+
+		/* number of batch slices */
+		n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+		for (i = 1; i < n; i++)
+		{
+			if (hashtable->innerOverflowFiles[i])
+				BufFileClose(hashtable->innerOverflowFiles[i]);
+
+			if (hashtable->outerOverflowFiles[i])
+				BufFileClose(hashtable->outerOverflowFiles[i]);
+		}
 	}
 
 	/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +955,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
 	if (hashtable->innerBatchFile == NULL)
 	{
+		/* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
 		/* we had no file arrays before */
 		hashtable->innerBatchFile = (BufFile **)
 			palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +967,50 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 	else
 	{
+		int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
 		/* enlarge arrays and zero out added entries */
 		hashtable->innerBatchFile = (BufFile **)
-			repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+			repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
 		hashtable->outerBatchFile = (BufFile **)
-			repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-		MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-			   (nbatch - oldnbatch) * sizeof(BufFile *));
-		MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-			   (nbatch - oldnbatch) * sizeof(BufFile *));
+			repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+		if (oldnbatch < nbatch_tmp)
+		{
+			MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+				   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+			MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+				   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+		}
+
+		if (nbatch > hashtable->nbatch_inmemory)
+		{
+			int nslices = (nbatch / hashtable->nbatch_inmemory);
+
+			if (hashtable->innerOverflowFiles == NULL)
+			{
+				hashtable->innerOverflowFiles = (BufFile **)
+					palloc0(nslices * sizeof(BufFile *));
+				hashtable->outerOverflowFiles = (BufFile **)
+					palloc0(nslices * sizeof(BufFile *));
+			}
+			else
+			{
+				hashtable->innerOverflowFiles = (BufFile **)
+					repalloc(hashtable->innerOverflowFiles,
+							 nslices * sizeof(BufFile *));
+				hashtable->outerOverflowFiles = (BufFile **)
+					repalloc(hashtable->outerOverflowFiles,
+							 nslices * sizeof(BufFile *));
+
+				/* we double the number of batches, so we know the old
+				 * value was nslices/2 exactly */
+				memset(hashtable->innerOverflowFiles + nslices/2, 0,
+					   (nslices/2) * sizeof(BufFile *));
+				memset(hashtable->outerOverflowFiles + nslices/2, 0,
+					   (nslices/2) * sizeof(BufFile *));
+			}
+		}
 	}
 
 	MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1082,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			}
 			else
 			{
+				BufFile **batchFile;
+
 				/* dump it out */
 				Assert(batchno > curbatch);
+
+				batchFile = ExecHashGetBatchFile(hashtable, batchno,
+												 hashtable->innerBatchFile,
+												 hashtable->innerOverflowFiles);
+
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
-									  &hashtable->innerBatchFile[batchno]);
+									  batchFile);
 
 				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
@@ -1647,22 +1737,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/* Consider increasing number of batches if we filled work_mem. */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2	/* inner + outer */
 			> hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
 	{
+		BufFile **batchFile;
+
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
+		batchFile = ExecHashGetBatchFile(hashtable, batchno,
+										 hashtable->innerBatchFile,
+										 hashtable->innerOverflowFiles);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno]);
+							  batchFile);
 	}
 }
 
@@ -1893,6 +1994,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+	int	slice,
+		curslice;
+
+	if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+		return batchno;
+
+	slice = batchno / hashtable->nbatch_inmemory;
+	curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+	/* slices can't go backwards */
+	Assert(slice >= curslice);
+
+	/* overflow slice */
+	if (slice > curslice)
+		return -1;
+
+	/* current slice, compute index in the current array */
+	return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **batchFiles, BufFile **overflowFiles)
+{
+	int		idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+	/* get the right overflow file */
+	if (idx == -1)
+	{
+		int slice = (batchno / hashtable->nbatch_inmemory);
+
+		return &overflowFiles[slice];
+	}
+
+	/* batch file in the current slice */
+	return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+	int	slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+	memset(hashtable->innerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+	hashtable->innerOverflowFiles[slice] = NULL;
+
+	memset(hashtable->outerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+	hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+	int		batchidx;
+
+	hashtable->curbatch++;
+
+	/* see if we skipped to the next batch slice */
+	batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+	/* Can't be -1, current batch is in the current slice by definition. */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+	/*
+	 * If we skipped to the next slice of batches, reset the array of files
+	 * and use the overflow file as the first batch.
+	 */
+	if (batchidx == 0)
+		ExecHashSwitchToNextBatchSlice(hashtable);
+
+	return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size	spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	/* Account for slice files (inner + outer) */
+	spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2475,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2526,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak used memory */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2411,8 +2616,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2488,10 +2695,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		}
 		else
 		{
+			BufFile **batchFile;
+
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
+			batchFile = ExecHashGetBatchFile(hashtable, batchno,
+											 hashtable->innerBatchFile,
+											 hashtable->innerOverflowFiles);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+								  batchFile);
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
 			hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2854,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
 	instrument->nbuckets_original = hashtable->nbuckets_original;
 	instrument->nbatch = hashtable->nbatch;
 	instrument->nbatch_original = hashtable->nbatch_original;
+	instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
 	instrument->space_peak = hashtable->spacePeak;
 }
 
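To make the slice arithmetic in the nodeHash.c changes above easier to check by hand, here is a small standalone model of the batch-number to file-slot mapping. It is a sketch under assumptions, not the patch code: ModelHashTable, model_batch_index() and model_overflow_slot() are hypothetical names that only mirror the logic of ExecHashGetBatchIndex() and ExecHashGetBatchFile().

```c
#include <assert.h>

/* Hypothetical stand-in for the few HashJoinTableData fields involved. */
typedef struct ModelHashTable
{
	int			nbatch;				/* total number of batches */
	int			nbatch_inmemory;	/* number of batches per slice */
	int			curbatch;			/* batch currently being processed */
} ModelHashTable;

/*
 * Index into the per-slice file array, or -1 when batchno belongs to a
 * later slice and has to go to that slice's overflow file.
 */
int
model_batch_index(const ModelHashTable *ht, int batchno)
{
	int			slice;
	int			curslice;

	/* Everything fits into a single slice, so the mapping is 1:1. */
	if (ht->nbatch <= ht->nbatch_inmemory)
		return batchno;

	slice = batchno / ht->nbatch_inmemory;
	curslice = ht->curbatch / ht->nbatch_inmemory;

	/* Slices can't go backwards. */
	assert(slice >= curslice);

	return (slice > curslice) ? -1 : batchno % ht->nbatch_inmemory;
}

/* Slot in the overflow-file array; only meaningful when the index is -1. */
int
model_overflow_slot(const ModelHashTable *ht, int batchno)
{
	return batchno / ht->nbatch_inmemory;
}
```

For example, with nbatch = 1024 and nbatch_inmemory = 128, batch 130 goes to overflow slot 1 while curbatch is still in slice 0, and maps to array index 2 once curbatch reaches 128.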
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..a8db71925b 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (batchno != hashtable->curbatch &&
 					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
 				{
+					BufFile	  **batchFile;
+
 					/*
 					 * Need to postpone this outer tuple to a later batch.
 					 * Save it in the corresponding outer-batch file.
 					 */
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
+
+					batchFile = ExecHashGetBatchFile(hashtable, batchno,
+													 hashtable->outerBatchFile,
+													 hashtable->outerOverflowFiles);
+
 					ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
 										  hashvalue,
-										  &hashtable->outerBatchFile[batchno]);
+										  batchFile);
 
 					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
 					continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 	else if (curbatch < hashtable->nbatch)
 	{
-		BufFile    *file = hashtable->outerBatchFile[curbatch];
+		BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+												 hashtable->outerBatchFile,
+												 hashtable->outerOverflowFiles);
 
 		/*
 		 * In outer-join cases, we could get here even though the batch file
 		 * is empty.
 		 */
-		if (file == NULL)
+		if (*file == NULL)
 			return NULL;
 
 		slot = ExecHashJoinGetSavedTuple(hjstate,
-										 file,
+										 *file,
 										 hashvalue,
 										 hjstate->hj_OuterTupleSlot);
 		if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	BufFile    *innerFile;
 	TupleTableSlot *slot;
 	uint32		hashvalue;
+	int			batchidx;
+	int			curbatch_old;
 
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
+	curbatch_old = curbatch;
+
+	/* index of the old batch */
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+	/* has to be in the current slice of batches */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
 	if (curbatch > 0)
 	{
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * We no longer need the previous outer batch file; close it right
 		 * away to free disk space.
 		 */
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
 	}
 	else						/* we just finished the first batch */
 	{
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	 * scan, we have to rescan outer batches in case they contain tuples that
 	 * need to be reassigned.
 	 */
-	curbatch++;
+	curbatch = ExecHashSwitchToNextBatch(hashtable);
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
 	while (curbatch < nbatch &&
-		   (hashtable->outerBatchFile[curbatch] == NULL ||
-			hashtable->innerBatchFile[curbatch] == NULL))
+		   (hashtable->outerBatchFile[batchidx] == NULL ||
+			hashtable->innerBatchFile[batchidx] == NULL))
 	{
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			HJ_FILL_OUTER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			HJ_FILL_INNER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_original)
 			break;				/* must process due to rule 2 */
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_outstart)
 			break;				/* must process due to rule 3 */
 		/* We can ignore this batch. */
 		/* Release associated temp files right away. */
-		if (hashtable->innerBatchFile[curbatch])
-			BufFileClose(hashtable->innerBatchFile[curbatch]);
-		hashtable->innerBatchFile[curbatch] = NULL;
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
-		curbatch++;
+		if (hashtable->innerBatchFile[batchidx])
+			BufFileClose(hashtable->innerBatchFile[batchidx]);
+		hashtable->innerBatchFile[batchidx] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
+
+		curbatch = ExecHashSwitchToNextBatch(hashtable);
+		batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
 	}
 
 	if (curbatch >= nbatch)
+	{
+		hashtable->curbatch = curbatch_old;
 		return false;			/* no more batches */
-
-	hashtable->curbatch = curbatch;
+	}
 
 	/*
 	 * Reload the hash table with the new inner batch (which could be empty)
 	 */
 	ExecHashTableReset(hashtable);
 
-	innerFile = hashtable->innerBatchFile[curbatch];
+	innerFile = hashtable->innerBatchFile[batchidx];
 
 	if (innerFile != NULL)
 	{
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * needed
 		 */
 		BufFileClose(innerFile);
-		hashtable->innerBatchFile[curbatch] = NULL;
+		hashtable->innerBatchFile[batchidx] = NULL;
 	}
 
 	/*
 	 * Rewind outer batch file (if present), so that we can start reading it.
 	 */
-	if (hashtable->outerBatchFile[curbatch] != NULL)
+	if (hashtable->outerBatchFile[batchidx] != NULL)
 	{
-		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+		if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not rewind hash-join temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	int			num_hashclauses = list_length(hashclauses);
 	int			numbuckets;
 	int			numbatches;
+	int			numbatches_inmemory;
 	int			num_skew_mcvs;
 	size_t		space_allowed;	/* unused */
 
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 							&space_allowed,
 							&numbuckets,
 							&numbatches,
+							&numbatches_inmemory,
 							&num_skew_mcvs);
 
 	/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..311a0980ee 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
 	int		   *skewBucketNums; /* array indexes of active skew buckets */
 
 	int			nbatch;			/* number of batches */
+	int			nbatch_inmemory;	/* max number of in-memory batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
 	int			nbatch_original;	/* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
 	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
 	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+	BufFile	  **innerOverflowFiles;	/* temp file for inner overflow batches */
+	BufFile	  **outerOverflowFiles;	/* temp file for outer overflow batches */
+
 	/*
 	 * Info about the datatype-specific hash functions for the datatypes being
 	 * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..bb6b24a1b4 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 						  uint32 hashvalue,
 						  int *bucketno,
 						  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **batchFiles, BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						size_t *space_allowed,
 						int *numbuckets,
 						int *numbatches,
+						int *numbatches_inmemory,
 						int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
 	int			nbuckets_original;	/* planned number of buckets */
 	int			nbatch;			/* number of batches at end of execution */
 	int			nbatch_original;	/* planned number of batches */
+	int			nbatch_inmemory;	/* number of batches kept in memory */
 	size_t		space_peak;		/* speak memory usage in bytes */
 } HashInstrumentation;
 
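To make the slice arithmetic described earlier concrete, here is a minimal standalone sketch. It is not taken from the patch: the helper names (batches_in_memory, batch_slice, batch_index_in_slice) are hypothetical, the 8kB per-BufFile figure is the one quoted in the mail, and the divide/modulo mapping is only an assumption about how something like ExecHashGetBatchIndex could map a batch number to a slot in the in-memory arrays.

```c
#include <assert.h>

/* Per-BufFile buffer size, as quoted in the mail (8kB). */
#define BUFFILE_BYTES 8192L

/*
 * How many batches can keep their BufFiles in memory if at most
 * work_mem/2 is spent on them. Each batch needs two BufFiles
 * (inner side and outer side).
 */
static int
batches_in_memory(long work_mem_bytes)
{
	return (int) ((work_mem_bytes / 2) / (2 * BUFFILE_BYTES));
}

/* Number of slices needed to cover nbatch batches (rounded up). */
static int
slice_count(int nbatch, int nbatch_inmemory)
{
	assert(nbatch_inmemory > 0);
	return (nbatch + nbatch_inmemory - 1) / nbatch_inmemory;
}

/* Hypothetical mapping: which slice a batch belongs to. */
static int
batch_slice(int batchno, int nbatch_inmemory)
{
	assert(nbatch_inmemory > 0);
	return batchno / nbatch_inmemory;
}

/* Hypothetical mapping: slot of the batch within its slice. */
static int
batch_index_in_slice(int batchno, int nbatch_inmemory)
{
	assert(nbatch_inmemory > 0);
	return batchno % nbatch_inmemory;
}
```

With work_mem = 4MB this gives 128 in-memory batches, so 1024 batches split into 8 slices, and (under the assumed mapping) batch 300 would land in slice 2 at index 44.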

Attachment: hashjoin-test.sh
Description: Bourne shell script

