On Fri, Apr 19, 2019 at 11:34:54PM -0400, Gunther wrote:
...
It would be so nice if there was a way to force a specific plan for
purposes of the testing. I tried giving false data in pg_class
reltuples and relpages:
foo=# analyze tmp_q;
ANALYZE
foo=# analyze tmp_r;
ANALYZE
foo=# select relname, relpages, reltuples from pg_class where relname in ('tmp_q', 'tmp_r');
relname | relpages | reltuples
---------+----------+-------------
tmp_r | 5505039 | 1.13467e+08
tmp_q | 7 | 236
(2 rows)
foo=# update pg_class set (relpages, reltuples) = (5505039, 1.13467e+08) where relname = 'tmp_q';
UPDATE 1
foo=# update pg_class set (relpages, reltuples) = (7, 236) where relname = 'tmp_r';
UPDATE 1
but that didn't help. Somehow the planner outsmarts every such trick,
so I can't get it to follow my right outer join plan where the big
table is hashed. I am sure y'all know some way to force it.
That does not work, because the planner does not actually use these values
directly. It only computes the density (reltuples / relpages) from them and
then multiplies that by the current number of pages in the file. That
behaves much better when the table grows or shrinks between refreshes of
the pg_class values.
So what you need to do is tweak these values to skew the density in a way
that results in the desired estimate when multiplied by the actual
number of pages. For me, this did the trick:
update pg_class set (relpages, reltuples) = (1000000, 1)
where relname = 'tmp_r';
update pg_class set (relpages, reltuples) = (1, 1000000)
where relname = 'tmp_q';
after which I get a plan like this:
Hash Right Join
  Hash Cond: (...)
  ->  Seq Scan on tmp_q q
  ->  Hash
        ->  Seq Scan on tmp_r r
As for the issue itself, I have a theory that I think would explain it.
It is related to the number of batches, as others speculated here.
It's not a memory leak, though; it's just that each batch requires a lot
of extra memory and we don't account for that.
The trouble is that each batch is represented by a BufFile, which is a
whopping 8272 bytes because it includes a PGAlignedBlock. Now, with 131072
batches, that's a nice 1GB of memory right there. And we don't actually
account for this memory in the hashjoin code, so it's not counted against
work_mem and we just keep increasing the number of batches.
Attached are two patches that should help us confirm that this is actually
what's happening when running the query on the actual data. The first patch
moves the BufFile stuff into a separate memory context, to make it more
obvious where the memory went. It also adds a bunch of logging to the
ExecHashIncreaseNumBatches() function.
The second patch makes sure all the BufFiles are allocated as soon as we
increase the number of batches. Otherwise we allocate them only when we
actually find a row for that batch, and I suspect the sample data shared
on this thread are somewhat correlated (I see long runs of the same UUID
value), which might slow down the memory growth. Of course, the real data
probably don't have such correlation, resulting in faster failures.
With the patches applied, I see output like this with 256k batches:
ExecutorState: 65536 total in 4 blocks; 28136 free (4 chunks); 37400 used
HashTableContext: 8192 total in 1 blocks; 7624 free (0 chunks); 568 used
hash batch files: 4404002656 total in 524302 blocks; 8387928 free (20 chunks); 4395614728 used
so it's conceivable that this is the root cause.
As for a fix, I'm not sure. I'm pretty sure we need to consider the amount
of memory for BufFile(s) when increasing the number of batches. But we
can't just stop incrementing the batches, because that would mean the
current batch may easily get bigger than work_mem :-(
I think we might cap the number of batches kept in memory, and at some
point start spilling data into an "overflow batch." So for example we'd
allow 32k batches, and then instead of increasing nbatch to 64k, we'd
create a single "overflow batch" representing batches 32k - 64k. After
processing the first 32k batches, we'd close the files and reuse the
memory for the next 32k batches. We'd read the overflow batch, split it
into the 32k batches, and just process them as usual. Of course, there
might be multiple rounds of this; for example, we might end up with 32k
concurrent batches but 128k virtual ones, which means we'd do 4 rounds of
this dance.
It's a bit inefficient, but situations like this should be rather rare,
and it's more graceful than just crashing with OOM.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 431f2442f463c16b4f8f7a4c268c1533ed1aca84 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@xxxxxxxxxxxxxxx>
Date: Sat, 20 Apr 2019 21:44:37 +0200
Subject: [PATCH 1/2] move BufFile stuff into separate context
---
src/backend/executor/nodeHash.c | 48 +++++++++++++++++++++++++++++
src/backend/executor/nodeHashjoin.c | 7 +++++
src/include/executor/hashjoin.h | 1 +
3 files changed, 56 insertions(+)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..7d36c22d61 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -498,6 +498,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
+ hashtable->fileCtx = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = space_allowed;
@@ -900,6 +901,11 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
nbatch = oldnbatch * 2;
Assert(nbatch > 1);
+ elog(WARNING, "ExecHashIncreaseNumBatches: increasing number of batches from %d to %d", oldnbatch, nbatch);
+
+ elog(LOG, "ExecHashIncreaseNumBatches ======= context stats start =======");
+ MemoryContextStats(TopMemoryContext);
+
#ifdef HJDEBUG
printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
hashtable, nbatch, hashtable->spaceUsed);
@@ -909,6 +915,14 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
if (hashtable->innerBatchFile == NULL)
{
+ MemoryContext oldctx;
+
+ hashtable->fileCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "hash batch files",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldctx = MemoryContextSwitchTo(hashtable->fileCtx);
+
/* we had no file arrays before */
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
@@ -916,9 +930,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
palloc0(nbatch * sizeof(BufFile *));
/* time to establish the temp tablespaces, too */
PrepareTempTablespaces();
+
+ MemoryContextSwitchTo(oldctx);
}
else
{
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(hashtable->fileCtx);
+
/* enlarge arrays and zero out added entries */
hashtable->innerBatchFile = (BufFile **)
repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
@@ -928,6 +948,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
(nbatch - oldnbatch) * sizeof(BufFile *));
MemSet(hashtable->outerBatchFile + oldnbatch, 0,
(nbatch - oldnbatch) * sizeof(BufFile *));
+
+ MemoryContextSwitchTo(oldctx);
+
}
MemoryContextSwitchTo(oldcxt);
@@ -999,12 +1022,19 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ MemoryContext oldctx;
+
/* dump it out */
Assert(batchno > curbatch);
+
+ oldctx = MemoryContextSwitchTo(hashtable->fileCtx);
+
ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
hashTuple->hashvalue,
&hashtable->innerBatchFile[batchno]);
+ MemoryContextSwitchTo(oldctx);
+
hashtable->spaceUsed -= hashTupleSize;
nfreed++;
}
@@ -1042,6 +1072,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
hashtable);
#endif
}
+
+ elog(LOG, "ExecHashIncreaseNumBatches ======= context stats end =======");
+ MemoryContextStats(TopMemoryContext);
}
/*
@@ -1656,13 +1689,20 @@ ExecHashTableInsert(HashJoinTable hashtable,
}
else
{
+ MemoryContext oldctx;
+
/*
* put the tuple into a temp file for later batches
*/
Assert(batchno > hashtable->curbatch);
+
+ oldctx = MemoryContextSwitchTo(hashtable->fileCtx);
+
ExecHashJoinSaveTuple(tuple,
hashvalue,
&hashtable->innerBatchFile[batchno]);
+
+ MemoryContextSwitchTo(oldctx);
}
}
@@ -2488,10 +2528,18 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
else
{
+ MemoryContext oldctx;
+
/* Put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
+
+ oldctx = MemoryContextSwitchTo(hashtable->fileCtx);
+
ExecHashJoinSaveTuple(tuple, hashvalue,
&hashtable->innerBatchFile[batchno]);
+
+ MemoryContextSwitchTo(oldctx);
+
pfree(hashTuple);
hashtable->spaceUsed -= tupleSize;
hashtable->spaceUsedSkew -= tupleSize;
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..6a546344ac 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,16 +389,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
{
+ MemoryContext oldctx;
+
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
+
+ oldctx = MemoryContextSwitchTo(hashtable->fileCtx);
+
ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
hashvalue,
&hashtable->outerBatchFile[batchno]);
+ MemoryContextSwitchTo(oldctx);
+
/* Loop around, staying in HJ_NEED_NEW_OUTER state */
continue;
}
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..ef8e4475e8 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -328,6 +328,7 @@ typedef struct HashJoinTableData
*/
BufFile **innerBatchFile; /* buffered virtual temp file per batch */
BufFile **outerBatchFile; /* buffered virtual temp file per batch */
+ MemoryContext fileCtx;
/*
* Info about the datatype-specific hash functions for the datatypes being
--
2.20.1
From 3fcf38603ebbd07e36dccadf84ab84409964fe7a Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@xxxxxxxxxxxxxxx>
Date: Sat, 20 Apr 2019 21:59:37 +0200
Subject: [PATCH 2/2] allocate BufFile eagerly
---
src/backend/executor/nodeHash.c | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 7d36c22d61..7f125d8dba 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -915,6 +915,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
if (hashtable->innerBatchFile == NULL)
{
+ int i;
MemoryContext oldctx;
hashtable->fileCtx = AllocSetContextCreate(CurrentMemoryContext,
@@ -931,10 +932,20 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
/* time to establish the temp tablespaces, too */
PrepareTempTablespaces();
+ for (i = 1; i < nbatch; i++)
+ {
+ if (!hashtable->innerBatchFile[i])
+ hashtable->innerBatchFile[i] = BufFileCreateTemp(false);
+
+ if (!hashtable->outerBatchFile[i])
+ hashtable->outerBatchFile[i] = BufFileCreateTemp(false);
+ }
+
MemoryContextSwitchTo(oldctx);
}
else
{
+ int i;
MemoryContext oldctx;
oldctx = MemoryContextSwitchTo(hashtable->fileCtx);
@@ -949,6 +960,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
MemSet(hashtable->outerBatchFile + oldnbatch, 0,
(nbatch - oldnbatch) * sizeof(BufFile *));
+ for (i = 1; i < nbatch; i++)
+ {
+ if (!hashtable->innerBatchFile[i])
+ hashtable->innerBatchFile[i] = BufFileCreateTemp(false);
+
+ if (!hashtable->outerBatchFile[i])
+ hashtable->outerBatchFile[i] = BufFileCreateTemp(false);
+ }
+
MemoryContextSwitchTo(oldctx);
}
--
2.20.1