Re: pg_upgrade and wraparound

Resurrecting an old thread.

We (AWS) have seen this wraparound during pg_upgrade more often recently with customers who have millions of large objects in their databases.

On 6/11/18 1:14 PM, Tom Lane wrote:
> Andres Freund <andres@xxxxxxxxxxx> writes:
>> I suspect the issue is that pg_resetwal does:
>> 	if (set_xid != 0)
>> 	{
>> 		ControlFile.checkPointCopy.nextXid = set_xid;
>>
>> 		/*
>> 		 * For the moment, just set oldestXid to a value that will force
>> 		 * immediate autovacuum-for-wraparound.  It's not clear whether adding
>> 		 * user control of this is useful, so let's just do something that's
>> 		 * reasonably safe.  The magic constant here corresponds to the
>> 		 * maximum allowed value of autovacuum_freeze_max_age.
>> 		 */
>> 		ControlFile.checkPointCopy.oldestXid = set_xid - 2000000000;
>> 		if (ControlFile.checkPointCopy.oldestXid < FirstNormalTransactionId)
>> 			ControlFile.checkPointCopy.oldestXid += FirstNormalTransactionId;
>> 		ControlFile.checkPointCopy.oldestXidDB = InvalidOid;
>> 	}
>>
>> but we have a code path that doesn't check for oldestXidDB being
>> InvalidOid.  Not great.
>
> Hm, I think I'd define the problem as "pg_resetwal is violating the
> expectation that oldestXidDB be valid".
>
> However, this just explains the basically-cosmetic issue that the
> complaint message mentions OID 0.  It doesn't really get us to the
> answer to why Alexander is seeing a failure.  It might be useful
> to see pg_controldata output for the old cluster, as well as
> "select datname, datfrozenxid from pg_database" output from the
> old cluster.

Unfortunately I don't have pg_controldata output from the old clusters either. I would like to be able to artificially create an "old" cluster that fails during pg_upgrade in that way.

One of the things in my way is that when using pg_resetwal to put NextXID far into the future (to push the old cluster close to wraparound, for example), the postmaster won't start because the pg_xact files covering that range don't exist. Should pg_resetwal create the files in the gap between the old NextXID and the new one?
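
The brute-force alternative, actually consuming the XIDs one transaction at a time, does work without touching pg_xact, but at the ~2 billion scale it is impractically slow; that is exactly why a pg_resetwal shortcut would be useful. A sketch (assumes v11 or later, where DO blocks allow transaction control, and must be run outside an explicit transaction block):

    -- Burn one XID per iteration: txid_current() forces XID
    -- assignment, COMMIT ends the transaction and implicitly
    -- starts the next one.
    DO $$
    BEGIN
        FOR i IN 1..1000000 LOOP
            PERFORM txid_current();
            COMMIT;
        END LOOP;
    END;
    $$;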

One thing I do have is a patch that provides a workaround for the problem, as well as a substantial speed improvement for the case at hand. The patch adds some options to pg_upgrade, pg_dump and pg_restore.

Option added to pg_dump:

    --blob-in-parallel

This option requires --schema-only. It causes pg_dump to emit the BLOB TOC entries with SECTION_DATA instead of SECTION_PRE_DATA, which moves the statements that create the large object metadata (lo_create(OID) and ALTER LARGE OBJECT) into the parallel phase of pg_restore, so the metadata is created in parallel. In my tests, a database containing only large objects is upgraded in roughly 1/#cores of the time.
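
For reference, the metadata statements in question look like this for a single large object (OID and owner here are illustrative):

    -- One pair per large object; with --blob-in-parallel these run
    -- in pg_restore's parallel phase instead of serially up front.
    SELECT pg_catalog.lo_create('16440');
    ALTER LARGE OBJECT 16440 OWNER TO postgres;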

Option added to pg_restore:

    --blob-batch-size=N

With this option pg_restore tries to put N BLOB TOC entries into one transaction. Batching is per parallel worker, and a batch is committed whenever the object type changes, so only BLOB TOC entries are ever batched. Without batching, each large object consumes two XIDs (one for the lo_create() call and one for the ALTER LARGE OBJECT), so with a sufficiently high max_locks_per_transaction, --blob-batch-size=1000 reduces the number of XIDs consumed for upgrading 10M large objects from 20M to about 10K.
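
The restore stream a worker executes then looks like this (the batch markers are the ones the patch emits; OIDs again illustrative):

    --
    -- Start BLOB restore batch
    --
    BEGIN;

    SELECT pg_catalog.lo_create('16440');
    ALTER LARGE OBJECT 16440 OWNER TO postgres;
    -- ... 999 more BLOB entries ...

    --
    -- BLOB batch size reached
    --
    COMMIT;
    BEGIN;
    -- ... next batch; after the last BLOB entry:
    --
    -- End BLOB restore batch
    --
    COMMIT;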

Options added to pg_upgrade:

    --blob-in-parallel           forwarded to pg_dump
    --blob-batch-size=N          forwarded to pg_restore
    --restore-jobs=N             forwarded as --jobs=N to pg_restore


Patch is attached.


Regards, Jan


--
Jan Wieck
Principal Database Engineer
Amazon Web Services
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index c6059fc..fdcb5e7 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -865,6 +865,11 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
 	WaitForCommands(AH, pipefd);
 
 	/*
+	 * Close a possibly open BLOB batch transaction.
+	 */
+	CommitBlobTransaction((Archive *) AH);
+
+	/*
 	 * Disconnect from database and clean up.
 	 */
 	set_cancel_slot_archive(slot, NULL);
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 97941fa..2bedd02 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -201,6 +201,8 @@ typedef struct Archive
 
 	int			numWorkers;		/* number of parallel processes */
 	char	   *sync_snapshot_id;	/* sync snapshot id for parallel operation */
+	int			blobBatchSize;	/* # of blobs to restore per transaction */
+	bool		blobInParallel;	/* place "BLOB" TEs in SECTION_DATA */
 
 	/* info needed for string escaping */
 	int			encoding;		/* libpq code for client_encoding */
@@ -268,6 +270,7 @@ extern void WriteData(Archive *AH, const void *data, size_t dLen);
 extern int	StartBlob(Archive *AH, Oid oid);
 extern int	EndBlob(Archive *AH, Oid oid);
 
+extern void CommitBlobTransaction(Archive *AH);
 extern void CloseArchive(Archive *AH);
 
 extern void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 5b9fbe8..f772410 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -68,6 +68,10 @@ typedef struct _parallelReadyList
 } ParallelReadyList;
 
 
+static int		blobBatchCount = 0;
+static bool		blobInXact = false;
+
+
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const int compression, bool dosync, ArchiveMode mode,
 							   SetupWorkerPtrType setupWorkerPtr);
@@ -259,6 +263,23 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
 	return (Archive *) AH;
 }
 
+void
+CommitBlobTransaction(Archive *AHX)
+{
+	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+	if (blobInXact)
+	{
+		ahprintf(AH, "--\n");
+		ahprintf(AH, "-- End BLOB restore batch\n");
+		ahprintf(AH, "--\n");
+		ahprintf(AH, "COMMIT;\n\n");
+
+		blobBatchCount = 0;
+		blobInXact = false;
+	}
+}
+
 /* Public */
 void
 CloseArchive(Archive *AHX)
@@ -266,6 +287,8 @@ CloseArchive(Archive *AHX)
 	int			res = 0;
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
 
+	CommitBlobTransaction(AHX);
+
 	AH->ClosePtr(AH);
 
 	/* Close the output */
@@ -3546,6 +3569,59 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
+	/* We restore BLOBs in batches to reduce XID consumption */
+	if (strcmp(te->desc, "BLOB") == 0 && AH->public.blobBatchSize > 0)
+	{
+		if (blobInXact)
+		{
+			/* We are inside a BLOB restore transaction */
+			if (blobBatchCount >= AH->public.blobBatchSize)
+			{
+				/*
+				 * We reached the batch size with the previous BLOB.
+				 * Commit and start a new batch.
+				 */
+				ahprintf(AH, "--\n");
+				ahprintf(AH, "-- BLOB batch size reached\n");
+				ahprintf(AH, "--\n");
+				ahprintf(AH, "COMMIT;\n");
+				ahprintf(AH, "BEGIN;\n\n");
+
+				blobBatchCount = 1;
+			}
+			else
+			{
+				/* This one still fits into the current batch */
+				blobBatchCount++;
+			}
+		}
+		else
+		{
+			/* Not inside a transaction, start a new batch */
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "-- Start BLOB restore batch\n");
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "BEGIN;\n\n");
+
+			blobBatchCount = 1;
+			blobInXact = true;
+		}
+	}
+	else
+	{
+		/* Not a BLOB. If we have a BLOB batch open, close it. */
+		if (blobInXact)
+		{
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "-- End BLOB restore batch\n");
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "COMMIT;\n\n");
+
+			blobBatchCount = 0;
+			blobInXact = false;
+		}
+	}
+
 	/* Select owner, schema, tablespace and default AM as necessary */
 	_becomeOwner(AH, te);
 	_selectOutputSchema(AH, te->namespace);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index bce8d54..b327e89 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -323,6 +323,7 @@ main(int argc, char **argv)
 	int			numWorkers = 1;
 	int			compressLevel = -1;
 	int			plainText = 0;
+	bool		blobInParallel = false;
 	ArchiveFormat archiveFormat = archUnknown;
 	ArchiveMode archiveMode;
 
@@ -391,6 +392,7 @@ main(int argc, char **argv)
 		{"no-sync", no_argument, NULL, 7},
 		{"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
 		{"rows-per-insert", required_argument, NULL, 10},
+		{"blob-in-parallel", no_argument, NULL, 11},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -607,6 +609,10 @@ main(int argc, char **argv)
 				dopt.dump_inserts = (int) rowsPerInsert;
 				break;
 
+			case 11:			/* place "BLOB" TEs in SECTION_DATA */
+				blobInParallel = true;
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -711,6 +717,10 @@ main(int argc, char **argv)
 	if (archiveFormat != archDirectory && numWorkers > 1)
 		fatal("parallel backup only supported by the directory format");
 
+	/* blobInParallel is only possible with schemaOnly */
+	if (!dopt.schemaOnly && blobInParallel)
+		fatal("blob-in-parallel only supported in schema-only dump");
+
 	/* Open the output file */
 	fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
 						 archiveMode, setupDumpWorker);
@@ -733,6 +743,7 @@ main(int argc, char **argv)
 	fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
 
 	fout->numWorkers = numWorkers;
+	fout->blobInParallel = blobInParallel;
 
 	/*
 	 * Open the database using the Archiver, so it knows about it. Errors mean
@@ -1043,6 +1054,8 @@ help(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --blob-in-parallel           dump blob metadata in the parallel section\n"
+			 "                               (requires --schema-only)\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=DBNAME      database to dump\n"));
@@ -3400,7 +3413,8 @@ dumpBlob(Archive *fout, BlobInfo *binfo)
 					 ARCHIVE_OPTS(.tag = binfo->dobj.name,
 								  .owner = binfo->rolname,
 								  .description = "BLOB",
-								  .section = SECTION_PRE_DATA,
+								  .section = fout->blobInParallel ?
+											 SECTION_DATA : SECTION_PRE_DATA,
 								  .createStmt = cquery->data,
 								  .dropStmt = dquery->data));
 
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index e47d0b8..43d5507 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -63,6 +63,7 @@ main(int argc, char **argv)
 	int			c;
 	int			exit_code;
 	int			numWorkers = 1;
+	int			blobBatchSize = 0;
 	Archive    *AH;
 	char	   *inputFileSpec;
 	static int	disable_triggers = 0;
@@ -124,6 +125,7 @@ main(int argc, char **argv)
 		{"no-publications", no_argument, &no_publications, 1},
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-subscriptions", no_argument, &no_subscriptions, 1},
+		{"blob-batch-size", required_argument, NULL, 4},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -284,6 +286,10 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &(opts->dumpSections));
 				break;
 
+			case 4:				/* # of blobs to restore per transaction */
+				blobBatchSize = atoi(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -371,6 +377,12 @@ main(int argc, char **argv)
 		exit_nicely(1);
 	}
 
+	if (blobBatchSize < 0)
+	{
+		pg_log_error("invalid blob batch size");
+		exit(1);
+	}
+
 	opts->disable_triggers = disable_triggers;
 	opts->enable_row_security = enable_row_security;
 	opts->noDataForFailedTables = no_data_for_failed_tables;
@@ -438,6 +450,7 @@ main(int argc, char **argv)
 		SortTocFromFile(AH);
 
 	AH->numWorkers = numWorkers;
+	AH->blobBatchSize = blobBatchSize;
 
 	if (opts->tocSummary)
 		PrintTOCSummary(AH);
@@ -510,6 +523,7 @@ usage(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --blob-batch-size=NUM        attempt to restore NUM BLOBs per transaction\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 0ffe171..975da00 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -54,8 +54,9 @@ generate_old_dump(void)
 
 		parallel_exec_prog(log_file_name, NULL,
 						   "\"%s/pg_dump\" %s --schema-only --quote-all-identifiers "
-						   "--binary-upgrade --format=custom %s --file=\"%s\" %s",
+						   "--binary-upgrade --format=custom %s %s --file=\"%s\" %s",
 						   new_cluster.bindir, cluster_conn_opts(&old_cluster),
+						   user_opts.blob_in_parallel ? "--blob-in-parallel " : "",
 						   log_opts.verbose ? "--verbose" : "",
 						   sql_file_name, escaped_connstr.data);
 
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index a05675c..3e6c08d 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -56,6 +56,9 @@ parseCommandLine(int argc, char *argv[])
 		{"socketdir", required_argument, NULL, 's'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
+		{"blob-batch-size", required_argument, NULL, 2},
+		{"blob-in-parallel", no_argument, NULL, 3},
+		{"restore-jobs", required_argument, NULL, 4},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -67,6 +70,7 @@ parseCommandLine(int argc, char *argv[])
 	time_t		run_time = time(NULL);
 
 	user_opts.transfer_mode = TRANSFER_MODE_COPY;
+	user_opts.restore_jobs = 1;
 
 	os_info.progname = get_progname(argv[0]);
 
@@ -209,6 +213,18 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.transfer_mode = TRANSFER_MODE_CLONE;
 				break;
 
+			case 2:
+				user_opts.blob_batch_size = atoi(optarg);
+				break;
+
+			case 3:
+				user_opts.blob_in_parallel = true;
+				break;
+
+			case 4:
+				user_opts.restore_jobs = atoi(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
@@ -309,6 +325,10 @@ usage(void)
 	printf(_("  -v, --verbose                 enable verbose internal logging\n"));
 	printf(_("  -V, --version                 display version information, then exit\n"));
 	printf(_("  --clone                       clone instead of copying files to new cluster\n"));
+	printf(_("  --blob-batch-size=NUM         number of blobs to restore per transaction\n"));
+	printf(_("  --blob-in-parallel            have pg_dump put the BLOB metadata into the\n"
+			 "parallel section\n"));
+	printf(_("  --restore-jobs=NUM            number of simultaneous processes or threads to use for pg_restore\n"));
 	printf(_("  -?, --help                    show this help, then exit\n"));
 	printf(_("\n"
 			 "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index cb0aff3..c6310ed 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -388,10 +388,14 @@ create_new_objects(void)
 		parallel_exec_prog(log_file_name,
 						   NULL,
 						   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+						   "--blob-batch-size %d "
+						   "--jobs %d --verbose "
 						   "--dbname template1 \"%s\"",
 						   new_cluster.bindir,
 						   cluster_conn_opts(&new_cluster),
 						   create_opts,
+						   user_opts.blob_batch_size,
+						   user_opts.restore_jobs,
 						   sql_file_name);
 	}
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 63574b5..32c0abe 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -299,6 +299,9 @@ typedef struct
 								 * changes */
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
+	int			blob_batch_size; /* number of BLOBs to restore per xact */
+	bool		blob_in_parallel; /* dump BLOB metadata in data section */
+	int			restore_jobs;	/* number of procs/threads for pg_restore */
 	char	   *socketdir;		/* directory to use for Unix sockets */
 } UserOpts;
 
