Hi everyone,
When using logical replication with the pgoutput plugin on PG 16, we do the following:
1) SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'false')
2) Get the LSN of the last row (the Commit record)
3) SELECT * FROM pg_replication_slot_advance('test_slot_v1', <Commit LSN>);
4) Repeat.
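Steps 1 to 4 amount to a simple client loop, and the only client-side logic is step 2. Here is a minimal Python sketch of that step. It assumes each peeked row arrives as an (lsn, xid, data) tuple, with lsn as text and data as bytes. The helper name pick_advance_lsn is hypothetical and not part of PostgreSQL. It returns the LSN of the last Commit ('C') or Stream Commit ('c') message and ignores every other message type, including the two-phase ones.

```python
from typing import Optional, Sequence, Tuple

# Assumed row shape: (lsn, xid, data) as returned by
# pg_logical_slot_peek_binary_changes(), with lsn as text and data as bytes.
Row = Tuple[str, Optional[int], bytes]

COMMIT = ord("C")         # pgoutput Commit (non-streamed transaction)
STREAM_COMMIT = ord("c")  # pgoutput Stream Commit (streamed transaction)


def pick_advance_lsn(rows: Sequence[Row]) -> Optional[str]:
    """Hypothetical helper for step 2: LSN of the last commit-type row, or None."""
    for lsn, _xid, data in reversed(rows):
        if data and data[0] in (COMMIT, STREAM_COMMIT):
            return lsn
    return None  # no complete transaction in this batch: nothing safe to advance to


rows = [
    ("2/A222A20", 1112, bytes.fromhex("530000045801")),  # Stream Start
    ("2/C141BE8", 1112, b"\x45"),                         # Stream End
]
print(pick_advance_lsn(rows))  # prints None
```

The sketch deliberately does not treat a Stream End row as an advance point. In the demo, advancing to a Stream End LSN did not stop the transaction from being replayed.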
This works perfectly fine when streaming = false. When streaming is turned on, the expectation is that the same thing happens, except that the LSN passed to pg_replication_slot_advance() is that of a Stream End record, and the next call to pg_logical_slot_peek_binary_changes() should return the subsequent Stream Start record. Instead, the output starts over from the beginning of the transaction (its first Stream Start record). Observe:
*** Demo starts ***
*** Initially there are no changes, peek() returns nothing: ***
=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
-----+-----+------
(0 rows)
*** Slot status: ***
=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
test_slot_v1 | f | 2/98CE060 | 2/98CE060
(1 row)
*** Now make some changes (delete then insert a bunch of records) and call peek() ***
*** The predicate filters out Delete (\x44 = D) and Insert (\x49 = I) records, leaving Stream Start (\x53 = S), ***
*** Relation (\x52 = R), Stream End (\x45 = E; "Stream Stop" in the protocol documentation), and Stream Commit (\x63 = c) ***
=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
2/A222A20 | 1112 | \x530000045801
2/A222A20 | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/C141BE8 | 1112 | \x45
2/C141C28 | 1112 | \x530000045800
2/DF598D8 | 1112 | \x45
2/DF59950 | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
2/DF59950 | 1114 | \x530000045a01
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/108918D0 | 1114 | \x45
2/108918D0 | 1114 | \x530000045a00
2/131E1310 | 1114 | \x45
2/131E1310 | 1114 | \x530000045a00
2/137D7768 | 1114 | \x45
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)
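The data column can be decoded using the layouts in the PostgreSQL "Logical Replication Message Formats" documentation. The minimal Python sketch below covers only the two messages that matter here, and its function names are hypothetical. Stream Start carries the xid and a first-segment flag. Stream Commit carries the xid, a flags byte, the commit LSN, the transaction's end LSN and the commit timestamp. Decoding the rows above shows two things. The first Stream Start of each transaction has the flag set. Each Stream Commit row's lsn column equals the end LSN carried inside the message (2/DF59950 for xid 1112, 2/137E8448 for xid 1114).

```python
import struct
from typing import NamedTuple


class StreamStart(NamedTuple):
    xid: int
    first_segment: bool  # True only for the first streamed chunk of this xid


class StreamCommit(NamedTuple):
    xid: int
    commit_lsn: str
    end_lsn: str
    commit_ts: int  # microseconds since 2000-01-01, left undecoded


def format_lsn(value: int) -> str:
    """Render a 64-bit LSN the way PostgreSQL prints it (high/low halves in hex)."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def parse_stream_start(data: bytes) -> StreamStart:
    # Byte1('S'), Int32 xid, Int8 first-segment flag; network byte order
    if data[:1] != b"S":
        raise ValueError("not a Stream Start message")
    xid, first = struct.unpack_from("!IB", data, 1)
    return StreamStart(xid, first == 1)


def parse_stream_commit(data: bytes) -> StreamCommit:
    # Byte1('c'), Int32 xid, Int8 flags, Int64 commit LSN,
    # Int64 end LSN, Int64 commit timestamp; network byte order
    if data[:1] != b"c":
        raise ValueError("not a Stream Commit message")
    xid, _flags, commit, end, ts = struct.unpack_from("!IBQQq", data, 1)
    return StreamCommit(xid, format_lsn(commit), format_lsn(end), ts)


print(parse_stream_start(bytes.fromhex("530000045801")))
# prints StreamStart(xid=1112, first_segment=True)
print(parse_stream_commit(bytes.fromhex(
    "630000045800000000020df59918000000020df599500002aca72900f8a8")))
```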
*** It was a peek() so the status is unchanged: ***
=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
test_slot_v1 | f | 2/98CE060 | 2/98CE060
(1 row)
*** Now advance the slot to the first Stream End record: ***
=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/C141BE8');
slot_name | end_lsn
--------------+-----------
test_slot_v1 | 2/C141BE8
(1 row)
*** confirmed_flush_lsn is updated as expected: ***
=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
test_slot_v1 | f | 2/9B09D10 | 2/C141BE8
(1 row)
*** Now peek() again. It starts from before confirmed_flush_lsn, replaying rows that were already confirmed: ***
=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
2/A222A20 | 1112 | \x530000045801
2/A222A20 | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/C141BE8 | 1112 | \x45
2/C141C28 | 1112 | \x530000045800
2/DF598D8 | 1112 | \x45
2/DF59950 | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
2/DF59950 | 1114 | \x530000045a01
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/108918D0 | 1114 | \x45
2/108918D0 | 1114 | \x530000045a00
2/131E1310 | 1114 | \x45
2/131E1310 | 1114 | \x530000045a00
2/137D7768 | 1114 | \x45
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)
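LSNs in this output must be compared numerically, not as text. PostgreSQL prints an LSN as two hexadecimal halves without zero padding, so plain string comparison can give the wrong order. As text, 2/108918D0 sorts before 2/DF59950, even though it is later in the WAL. The small Python sketch below uses hypothetical helper names. It converts the text form to an integer and lists the row LSNs of the second peek() that are not past the slot's confirmed_flush_lsn.

```python
from typing import Iterable, List


def parse_lsn(text: str) -> int:
    """Convert PostgreSQL's 'high/low' hex text form of an LSN to a 64-bit int."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def at_or_before(lsns: Iterable[str], confirmed_flush_lsn: str) -> List[str]:
    """Row LSNs that are not past the slot's confirmed_flush_lsn."""
    limit = parse_lsn(confirmed_flush_lsn)
    return [lsn for lsn in lsns if parse_lsn(lsn) <= limit]


# First five row LSNs of the second peek(), and the confirmed_flush_lsn it ran with
second_peek = ["2/A222A20", "2/A222A20", "2/C141BE8", "2/C141C28", "2/DF598D8"]
print(at_or_before(second_peek, "2/C141BE8"))
# prints ['2/A222A20', '2/A222A20', '2/C141BE8']

# Text comparison gets the order wrong; numeric comparison does not
print("2/108918D0" < "2/DF59950", parse_lsn("2/108918D0") < parse_lsn("2/DF59950"))
# prints True False
```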
*** Next advance to the Stream Commit record: ***
=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/DF59950');
slot_name | end_lsn
--------------+-----------
test_slot_v1 | 2/DF59950
(1 row)
*** This time the peek() starts from the correct LSN: ***
=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
2/DF59950 | 1114 | \x530000045a01
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/108918D0 | 1114 | \x45
2/108918D0 | 1114 | \x530000045a00
2/131E1310 | 1114 | \x45
2/131E1310 | 1114 | \x530000045a00
2/137D7768 | 1114 | \x45
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(8 rows)
*** End of demo ***
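Suppose peek() is bounded with upto_nchanges and the batch stops before any Commit or Stream Commit row. The client then has no LSN it can safely advance to. One possible client-side workaround is to re-peek with a growing limit until one of three things happens: the batch contains a commit-type row, the WAL runs out (fewer rows than the limit), or a cap is reached. This is only a sketch under stated assumptions, not a statement of how PostgreSQL intends this to be used. It re-decodes the same WAL on every retry, so it trades extra decoding work for progress. peek is a hypothetical callable that runs pg_logical_slot_peek_binary_changes() with the given upto_nchanges and returns (lsn, xid, data) tuples.

```python
from typing import Callable, List, Optional, Sequence, Tuple

Row = Tuple[str, Optional[int], bytes]  # (lsn, xid, data)

# Hypothetical: runs pg_logical_slot_peek_binary_changes() with the given
# upto_nchanges value and returns the resulting rows.
PeekFn = Callable[[int], List[Row]]


def has_commit(rows: Sequence[Row]) -> bool:
    """True if the batch contains a Commit ('C') or Stream Commit ('c') message."""
    return any(data[:1] in (b"C", b"c") for _lsn, _xid, data in rows)


def peek_until_commit(peek: PeekFn, limit: int, max_limit: int) -> List[Row]:
    """Re-peek with a doubling upto_nchanges until a commit row appears.

    Stops early when the batch is smaller than the limit (no more WAL to
    decode right now) or once max_limit has been tried.
    """
    while True:
        rows = peek(limit)
        if has_commit(rows) or len(rows) < limit or limit >= max_limit:
            return rows
        limit = min(limit * 2, max_limit)
```

After it returns, the LSN of the last commit-type row, if any, is the one to pass to pg_replication_slot_advance(). If there is none, the transaction is still open and the sketch offers no safe advance point.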
The question is whether this is by design or a bug. If it is by design, could someone explain how this is meant to be used, because it's not clear. It will work eventually if the upto_nchanges argument is NULL, because once the transaction completes we get a Stream Commit record and can advance. In the meantime, however, we will have ingested a lot of duplicate records that we now have to deal with. And if upto_nchanges is not NULL, we're stuck: peek() returns one or more Stream blocks until the number of returned rows exceeds upto_nchanges, and then returns the same blocks over and over again forever, because we cannot advance and never see the Stream Commit record.
Thank you.
Guillaume.