Resurrecting an old thread.
We (AWS) have seen this wraparound during pg_upgrade more often recently
with customers who have millions of large objects in their databases.
On 6/11/18 1:14 PM, Tom Lane wrote:
Andres Freund <andres@xxxxxxxxxxx> writes:
I suspect the issue is that pg_resetwal does:
if (set_xid != 0)
{
ControlFile.checkPointCopy.nextXid = set_xid;
/*
* For the moment, just set oldestXid to a value that will force
* immediate autovacuum-for-wraparound. It's not clear whether adding
* user control of this is useful, so let's just do something that's
* reasonably safe. The magic constant here corresponds to the
* maximum allowed value of autovacuum_freeze_max_age.
*/
ControlFile.checkPointCopy.oldestXid = set_xid - 2000000000;
if (ControlFile.checkPointCopy.oldestXid < FirstNormalTransactionId)
ControlFile.checkPointCopy.oldestXid += FirstNormalTransactionId;
ControlFile.checkPointCopy.oldestXidDB = InvalidOid;
}
but we have a code path that doesn't check for oldestXidDB being
InvalidOid. Not great.
Hm, I think I'd define the problem as "pg_resetwal is violating the
expectation that oldestXidDB be valid".
However, this just explains the basically-cosmetic issue that the
complaint message mentions OID 0. It doesn't really get us to the
answer to why Alexander is seeing a failure. It might be useful
to see pg_controldata output for the old cluster, as well as
"select datname, datfrozenxid from pg_database" output from the
old cluster.
Unfortunately I don't have pg_controldata output from the old clusters
either. I would like to be able to artificially create an "old" cluster
that fails during pg_upgrade in that way.
One of the things in my way is that when using pg_resetwal to push
NextXID far into the future (for example, to bring the old cluster close
to wraparound), the postmaster won't start because the pg_xact files
covering that XID range don't exist. Should pg_resetwal create the files
in the gap between the old NextXID and the new one?
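For the sake of discussion, a minimal sketch of what that could look like
(not part of the attached patch; make_pg_xact_segments() is a made-up name
and the constants assume the standard CLOG SLRU layout):

/*
 * Sketch only: after advancing NextXID, create zero-filled pg_xact
 * segments covering the gap so the postmaster can start.  Assumes this
 * sits in pg_resetwal.c (postgres_fe.h and common/logging.h already
 * included) and the stock CLOG layout: 2 status bits per transaction,
 * BLCKSZ-sized pages, 32 pages per SLRU segment, 4-hex-digit segment
 * names as produced by SlruFileName().
 */
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

#define XACTS_PER_SEGMENT	(BLCKSZ * 4 * 32)	/* 1,048,576 with 8kB blocks */

static void
make_pg_xact_segments(TransactionId old_next_xid, TransactionId new_next_xid)
{
	uint32		segno;

	for (segno = old_next_xid / XACTS_PER_SEGMENT;
		 segno <= new_next_xid / XACTS_PER_SEGMENT;
		 segno++)
	{
		char		path[MAXPGPATH];
		int			fd;

		snprintf(path, sizeof(path), "pg_xact/%04X", segno);
		fd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0600);
		if (fd < 0)
		{
			if (errno == EEXIST)
				continue;		/* keep whatever is already there */
			pg_log_error("could not create file \"%s\": %m", path);
			exit(1);
		}
		/* reads as all zeroes; none of the skipped XIDs is ever looked up */
		if (ftruncate(fd, (off_t) 32 * BLCKSZ) != 0 || close(fd) != 0)
		{
			pg_log_error("could not extend file \"%s\": %m", path);
			exit(1);
		}
	}
}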
One thing I do have is a patch that provides a workaround for the
problem, as well as a substantial speed improvement for the case at hand.
The patch adds the following options to pg_upgrade, pg_dump and pg_restore.
Option added to pg_dump:
--blob-in-parallel
This option requires --schema-only. It causes pg_dump to emit the BLOB
metadata TOC entries with SECTION_DATA instead of SECTION_PRE_DATA, which
moves the statements that create the large object metadata (lo_create(OID)
and ALTER LARGE OBJECT) into the parallel phase of pg_restore, so the
metadata is created by several workers at once. In my tests a database
containing only large objects is upgraded in roughly 1/#cores of the time.
Option added to pg_restore:
--blob-batch-size=N
With this option pg_restore tries to put N BLOB TOC entries into one
transaction. This is done per parallel worker, and a batch is committed
as soon as an entry of a different object type comes along, so only
consecutive BLOB TOC entries are ever batched. With a sufficiently high
max_locks_per_transaction (each large object in a batch holds a lock until
commit), --blob-batch-size=1000 reduces the number of XIDs consumed for
upgrading 10M large objects from roughly 20M (two per large object, one
for each of the two statements mentioned above) to about 10K.
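To illustrate, this is roughly the SQL stream one parallel worker sends to
the database (or writes to the output file with pg_restore -f) when run
with --blob-batch-size=3; the OIDs and owner are made up for the example:

--
-- Start BLOB restore batch
--
BEGIN;

SELECT pg_catalog.lo_create('16393');
ALTER LARGE OBJECT 16393 OWNER TO joe;

SELECT pg_catalog.lo_create('16394');
ALTER LARGE OBJECT 16394 OWNER TO joe;

SELECT pg_catalog.lo_create('16395');
ALTER LARGE OBJECT 16395 OWNER TO joe;

--
-- BLOB batch size reached
--
COMMIT;
BEGIN;

and so on, until a TOC entry of another type or the end of the worker's
work list closes the last batch with an "End BLOB restore batch" COMMIT.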
Options added to pg_upgrade:
--blob-in-parallel forwarded to pg_dump
--blob-batch-size=N forwarded to pg_restore
--restore-jobs=N forwarded as --jobs=N to pg_restore
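A typical invocation would then look something like this (the bindir and
datadir arguments are placeholders):

pg_upgrade -b $OLD_BINDIR -B $NEW_BINDIR -d $OLD_DATADIR -D $NEW_DATADIR \
    --blob-in-parallel --blob-batch-size=1000 --restore-jobs=8

--restore-jobs is separate from the existing --jobs option because the
latter parallelizes across databases (and file transfer), while
pg_restore's --jobs parallelizes the restore within a single database.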
Patch is attached.
Regards, Jan
--
Jan Wieck
Principal Database Engineer
Amazon Web Services
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index c6059fc..fdcb5e7 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -865,6 +865,11 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
WaitForCommands(AH, pipefd);
/*
+ * Close a possibly still-open BLOB batch transaction.
+ */
+ CommitBlobTransaction((Archive *)AH);
+
+ /*
* Disconnect from database and clean up.
*/
set_cancel_slot_archive(slot, NULL);
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 97941fa..2bedd02 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -201,6 +201,8 @@ typedef struct Archive
int numWorkers; /* number of parallel processes */
char *sync_snapshot_id; /* sync snapshot id for parallel operation */
+ int blobBatchSize; /* # of blobs to restore per transaction */
+ bool blobInParallel; /* place "BLOB" TEs in SECTION_DATA */
/* info needed for string escaping */
int encoding; /* libpq code for client_encoding */
@@ -268,6 +270,7 @@ extern void WriteData(Archive *AH, const void *data, size_t dLen);
extern int StartBlob(Archive *AH, Oid oid);
extern int EndBlob(Archive *AH, Oid oid);
+extern void CommitBlobTransaction(Archive *AH);
extern void CloseArchive(Archive *AH);
extern void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 5b9fbe8..f772410 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -68,6 +68,10 @@ typedef struct _parallelReadyList
} ParallelReadyList;
+static int blobBatchCount = 0;
+static bool blobInXact = false;
+
+
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const int compression, bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupWorkerPtr);
@@ -259,6 +263,23 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
return (Archive *) AH;
}
+void
+CommitBlobTransaction(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ if (blobInXact)
+ {
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "-- End BLOB restore batch\n");
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "COMMIT;\n\n");
+
+ blobBatchCount = 0;
+ blobInXact = false;
+ }
+}
+
/* Public */
void
CloseArchive(Archive *AHX)
@@ -266,6 +287,8 @@ CloseArchive(Archive *AHX)
int res = 0;
ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ CommitBlobTransaction(AHX);
+
AH->ClosePtr(AH);
/* Close the output */
@@ -3546,6 +3569,59 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
{
RestoreOptions *ropt = AH->public.ropt;
+ /* We restore BLOBs in batches to reduce XID consumption */
+ if (strcmp(te->desc, "BLOB") == 0 && AH->public.blobBatchSize > 0)
+ {
+ if (blobInXact)
+ {
+ /* We are inside a BLOB restore transaction */
+ if (blobBatchCount >= AH->public.blobBatchSize)
+ {
+ /*
+ * We did reach the batch size with the previous BLOB.
+ * Commit and start a new batch.
+ */
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "-- BLOB batch size reached\n");
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "COMMIT;\n");
+ ahprintf(AH, "BEGIN;\n\n");
+
+ blobBatchCount = 1;
+ }
+ else
+ {
+ /* This one still fits into the current batch */
+ blobBatchCount++;
+ }
+ }
+ else
+ {
+ /* Not inside a transaction, start a new batch */
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "-- Start BLOB restore batch\n");
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "BEGIN;\n\n");
+
+ blobBatchCount = 1;
+ blobInXact = true;
+ }
+ }
+ else
+ {
+ /* Not a BLOB. If we have a BLOB batch open, close it. */
+ if (blobInXact)
+ {
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "-- End BLOB restore batch\n");
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "COMMIT;\n\n");
+
+ blobBatchCount = 0;
+ blobInXact = false;
+ }
+ }
+
/* Select owner, schema, tablespace and default AM as necessary */
_becomeOwner(AH, te);
_selectOutputSchema(AH, te->namespace);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index bce8d54..b327e89 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -323,6 +323,7 @@ main(int argc, char **argv)
int numWorkers = 1;
int compressLevel = -1;
int plainText = 0;
+ bool blobInParallel = false;
ArchiveFormat archiveFormat = archUnknown;
ArchiveMode archiveMode;
@@ -391,6 +392,7 @@ main(int argc, char **argv)
{"no-sync", no_argument, NULL, 7},
{"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
{"rows-per-insert", required_argument, NULL, 10},
+ {"blob-in-parallel", no_argument, NULL, 11},
{NULL, 0, NULL, 0}
};
@@ -607,6 +609,10 @@ main(int argc, char **argv)
dopt.dump_inserts = (int) rowsPerInsert;
break;
+ case 11: /* place "BLOB" TEs in SECTION_DATA */
+ blobInParallel = true;
+ break;
+
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit_nicely(1);
@@ -711,6 +717,10 @@ main(int argc, char **argv)
if (archiveFormat != archDirectory && numWorkers > 1)
fatal("parallel backup only supported by the directory format");
+ /* blobInParallel is only possible with schemaOnly */
+ if (!dopt.schemaOnly && blobInParallel)
+ fatal("blob-in-parallel only supported in schema-only dump");
+
/* Open the output file */
fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
archiveMode, setupDumpWorker);
@@ -733,6 +743,7 @@ main(int argc, char **argv)
fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
fout->numWorkers = numWorkers;
+ fout->blobInParallel = blobInParallel;
/*
* Open the database using the Archiver, so it knows about it. Errors mean
@@ -1043,6 +1054,8 @@ help(const char *progname)
printf(_(" --use-set-session-authorization\n"
" use SET SESSION AUTHORIZATION commands instead of\n"
" ALTER OWNER commands to set ownership\n"));
+ printf(_(" --blob-in-parallel dump blob metadata in the parallel section\n"
+ " (requires --schema-only)\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=DBNAME database to dump\n"));
@@ -3400,7 +3413,8 @@ dumpBlob(Archive *fout, BlobInfo *binfo)
ARCHIVE_OPTS(.tag = binfo->dobj.name,
.owner = binfo->rolname,
.description = "BLOB",
- .section = SECTION_PRE_DATA,
+ .section = fout->blobInParallel ?
+ SECTION_DATA : SECTION_PRE_DATA,
.createStmt = cquery->data,
.dropStmt = dquery->data));
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index e47d0b8..43d5507 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -63,6 +63,7 @@ main(int argc, char **argv)
int c;
int exit_code;
int numWorkers = 1;
+ int blobBatchSize = 0;
Archive *AH;
char *inputFileSpec;
static int disable_triggers = 0;
@@ -124,6 +125,7 @@ main(int argc, char **argv)
{"no-publications", no_argument, &no_publications, 1},
{"no-security-labels", no_argument, &no_security_labels, 1},
{"no-subscriptions", no_argument, &no_subscriptions, 1},
+ {"blob-batch-size", required_argument, NULL, 4},
{NULL, 0, NULL, 0}
};
@@ -249,7 +251,7 @@ main(int argc, char **argv)
case 'v': /* verbose */
opts->verbose = 1;
- pg_logging_set_level(PG_LOG_INFO);
+ pg_logging_set_level(PG_LOG_DEBUG);
break;
case 'w':
@@ -284,6 +286,10 @@ main(int argc, char **argv)
set_dump_section(optarg, &(opts->dumpSections));
break;
+ case 4: /* # of blobs to restore per transaction */
+ blobBatchSize = atoi(optarg);
+ break;
+
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit_nicely(1);
@@ -371,6 +377,12 @@ main(int argc, char **argv)
exit_nicely(1);
}
+ if (blobBatchSize < 0)
+ {
+ pg_log_error("invalid blob batch size");
+ exit(1);
+ }
+
opts->disable_triggers = disable_triggers;
opts->enable_row_security = enable_row_security;
opts->noDataForFailedTables = no_data_for_failed_tables;
@@ -438,6 +450,7 @@ main(int argc, char **argv)
SortTocFromFile(AH);
AH->numWorkers = numWorkers;
+ AH->blobBatchSize = blobBatchSize;
if (opts->tocSummary)
PrintTOCSummary(AH);
@@ -510,6 +523,7 @@ usage(const char *progname)
printf(_(" --use-set-session-authorization\n"
" use SET SESSION AUTHORIZATION commands instead of\n"
" ALTER OWNER commands to set ownership\n"));
+ printf(_(" --blob-batch-size=NUM attempt to restore NUM BLOBs per transaction\n"));
printf(_("\nConnection options:\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 0ffe171..975da00 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -54,8 +54,9 @@ generate_old_dump(void)
parallel_exec_prog(log_file_name, NULL,
"\"%s/pg_dump\" %s --schema-only --quote-all-identifiers "
- "--binary-upgrade --format=custom %s --file=\"%s\" %s",
+ "--binary-upgrade --format=custom %s %s --file=\"%s\" %s",
new_cluster.bindir, cluster_conn_opts(&old_cluster),
+ user_opts.blob_in_parallel ? "--blob-in-parallel " : "",
log_opts.verbose ? "--verbose" : "",
sql_file_name, escaped_connstr.data);
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index a05675c..3e6c08d 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -56,6 +56,9 @@ parseCommandLine(int argc, char *argv[])
{"socketdir", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'},
{"clone", no_argument, NULL, 1},
+ {"blob-batch-size", required_argument, NULL, 2},
+ {"blob-in-parallel", no_argument, NULL, 3},
+ {"restore-jobs", required_argument, NULL, 4},
{NULL, 0, NULL, 0}
};
@@ -67,6 +70,7 @@ parseCommandLine(int argc, char *argv[])
time_t run_time = time(NULL);
user_opts.transfer_mode = TRANSFER_MODE_COPY;
+ user_opts.restore_jobs = 1;
os_info.progname = get_progname(argv[0]);
@@ -209,6 +213,18 @@ parseCommandLine(int argc, char *argv[])
user_opts.transfer_mode = TRANSFER_MODE_CLONE;
break;
+ case 2:
+ user_opts.blob_batch_size = atoi(optarg);
+ break;
+
+ case 3:
+ user_opts.blob_in_parallel = true;
+ break;
+
+ case 4:
+ user_opts.restore_jobs = atoi(optarg);
+ break;
+
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
os_info.progname);
@@ -309,6 +325,10 @@ usage(void)
printf(_(" -v, --verbose enable verbose internal logging\n"));
printf(_(" -V, --version display version information, then exit\n"));
printf(_(" --clone clone instead of copying files to new cluster\n"));
+ printf(_(" --blob-batch-size=NUM number of blobs to restore per transaction\n"));
+ printf(_(" --blob-in-parallel have pg_dump put the BLOB metadata into the\n"
+ "parallel section\n"));
+ printf(_(" --restore-jobs=NUM number of simultaneous processes or threads to use for pg_restore\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\n"
"Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index cb0aff3..c6310ed 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -388,10 +388,14 @@ create_new_objects(void)
parallel_exec_prog(log_file_name,
NULL,
"\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+ "--blob-batch-size %d "
+ "--jobs %d --verbose "
"--dbname template1 \"%s\"",
new_cluster.bindir,
cluster_conn_opts(&new_cluster),
create_opts,
+ user_opts.blob_batch_size,
+ user_opts.restore_jobs,
sql_file_name);
}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 63574b5..32c0abe 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -299,6 +299,9 @@ typedef struct
* changes */
transferMode transfer_mode; /* copy files or link them? */
int jobs; /* number of processes/threads to use */
+ int blob_batch_size;/* number of BLOBs to restore per xact */
+ bool blob_in_parallel; /* dump BLOB metadata in data section */
+ int restore_jobs; /* number of procs/threads for pg_restore */
char *socketdir; /* directory to use for Unix sockets */
} UserOpts;