On 14/09/14 18:55, Tom Lane wrote:
> Are you watching the state in a loop inside a single plpgsql function?
> If so, I wonder whether the problem is that the plpgsql function's
> snapshot isn't changing. From memory, marking the function VOLATILE
> would help if that's the issue.

The function is VOLATILE. I attached 2 versions of it. fn-old.sql does not work because once a slave has disconnected, it drops out and does not come back. fn.sql uses dblink to work around the problem, but it consumes 2 db connections.

The intent of the function is to be called between operations that may cause slaves to lag behind. If the lag is below a certain limit, it simply returns. Otherwise, it waits until the lag drops below a second limit.

If it were a VOLATILE problem, the function would not be able to see when a slave drops out, nor changes in the data. But it does see these changes. Only when a slave comes back online is it not seen in the current transaction.

Torsten
-- wait_for_streaming_lag(low_water_mark, high_water_mark, tmout)
--
-- Throttle helper meant to be called between operations that may make
-- streaming standbys fall behind.
--
--   * If the largest flush lag (in bytes of WAL, as computed by
--     pg_xlog_location_diff) is at or below high_water_mark, return at once.
--   * Otherwise poll once per second until the lag is at or below
--     low_water_mark.
--
-- Parameters:
--   low_water_mark   lag (bytes) that must be reached once waiting has begun
--   high_water_mark  lag (bytes) tolerated on the very first check
--   tmout            maximum time to wait, measured from function entry
--
-- Returns: the last measured maximum lag in bytes (0 when no standby is known).
-- Raises:  SQLSTATE 'TF001' when tmout elapses before the lag has dropped.
--
-- Side effects:
--   * creates (if missing) and maintains the session temp table "lag";
--   * calls pg_stat_clear_snapshot(), so statistics views read later in the
--     calling transaction are re-fetched instead of served from the
--     transaction's cached snapshot.
CREATE OR REPLACE FUNCTION wait_for_streaming_lag(
    low_water_mark  BIGINT   DEFAULT 1000000,
    high_water_mark BIGINT   DEFAULT 20000000,
    tmout           INTERVAL DEFAULT '4h'
) RETURNS BIGINT AS $def$
DECLARE
    r              RECORD;
    water_mark     BIGINT;
    -- Wall-clock time at function entry. now() would be the transaction
    -- start, which makes the timeout fire early in a long transaction.
    started_at     TIMESTAMPTZ := clock_timestamp();
    -- Caller's message level, restored after the quiet CREATE TABLE below.
    saved_min_msgs TEXT := current_setting('client_min_messages');
BEGIN
    -- Suppress the "relation already exists, skipping" notice.
    SET LOCAL client_min_messages TO ERROR;
    CREATE TEMP TABLE IF NOT EXISTS lag (
        gen              INT,
        application_name TEXT,
        client_addr      INET,
        flush_location   TEXT,
        lmd              TIMESTAMP
    );
    -- Put back what the caller had instead of forcing NOTICE.
    PERFORM set_config('client_min_messages', saved_min_msgs, true);

    water_mark := $2;           -- use high_water_mark for the first loop

    LOOP
        -- Statistics views are served from a snapshot that is kept until the
        -- end of the transaction. Without discarding it, standbys that
        -- (re)connect after the first iteration never show up in
        -- pg_stat_replication, while disconnects are still noticed.
        PERFORM pg_stat_clear_snapshot();

        -- Build the next generation of the lag table:
        --   ord = 1  standbys currently listed in pg_stat_replication
        --   ord = 2  standbys remembered from the previous generation
        -- DISTINCT ON keeps one row per (application_name, client_addr),
        -- preferring the live row, then the smallest flush position.
        WITH g AS (
            SELECT max(gen) AS gen
              FROM lag
        ),
        seen AS (
            SELECT 1 AS ord,
                   application_name,
                   client_addr,
                   flush_location,
                   clock_timestamp() AS lmd
              FROM pg_stat_replication
             UNION ALL
            SELECT 2 AS ord,
                   application_name,
                   client_addr,
                   flush_location,
                   lmd
              FROM lag
        )
        INSERT INTO lag (gen, application_name, client_addr, flush_location, lmd)
        SELECT coalesce(g.gen + 1, 1),
               rx.application_name,
               rx.client_addr,
               rx.flush_location,
               rx.lmd
          FROM (SELECT DISTINCT ON (application_name, client_addr)
                       application_name,
                       client_addr,
                       flush_location,
                       lmd
                  FROM seen
                 ORDER BY application_name,
                          client_addr,
                          ord ASC,
                          pg_xlog_location_diff(flush_location, '0/0') ASC) rx
         CROSS JOIN g;

        -- Keep only the generation just written.
        DELETE FROM lag WHERE gen < (SELECT max(gen) FROM lag);
        -- Forget standbys that have not been seen live for five minutes.
        DELETE FROM lag WHERE lmd < clock_timestamp() - '5min'::INTERVAL;

        SELECT INTO r
               coalesce(max(pg_xlog_location_diff(pg_current_xlog_location(),
                                                  flush_location)), 0) AS lag,
               clock_timestamp() - started_at AS tm
          FROM lag;

        EXIT WHEN r.lag <= water_mark;

        IF r.tm > $3 THEN
            RAISE EXCEPTION USING
                MESSAGE = 'Timeout while waiting for streaming lag to drop below ' || $1,
                ERRCODE = 'TF001';
        END IF;

        -- Once we have started waiting, require the stricter limit.
        water_mark := $1;
        PERFORM pg_sleep(1);
    END LOOP;

    RETURN r.lag;
END;
$def$ LANGUAGE plpgsql VOLATILE SECURITY invoker;
-- wait_for_streaming_lag(low_water_mark, high_water_mark, tmout)  [dblink variant]
--
-- Throttle helper meant to be called between operations that may make
-- streaming standbys fall behind.
--
--   * If the largest flush lag (in bytes of WAL, as computed by
--     pg_xlog_location_diff) is at or below high_water_mark, return at once.
--   * Otherwise poll once per second until the lag is at or below
--     low_water_mark.
--
-- Parameters:
--   low_water_mark   lag (bytes) that must be reached once waiting has begun
--   high_water_mark  lag (bytes) tolerated on the very first check
--   tmout            maximum time to wait, measured from function entry
--
-- Returns: the last measured maximum lag in bytes (0 when no standby is known).
-- Raises:  SQLSTATE 'TF001' when tmout elapses before the lag has dropped.
--
-- Side effects:
--   * opens a named dblink connection 'wait_for_streaming_lag' to the current
--     database if one is not already open; it is left open for later calls,
--     so the session holds a second database connection;
--   * creates (if missing) and maintains the session temp table "lag".
BEGIN;

CREATE OR REPLACE FUNCTION wait_for_streaming_lag(
    low_water_mark  BIGINT   DEFAULT 1000000,
    high_water_mark BIGINT   DEFAULT 20000000,
    tmout           INTERVAL DEFAULT '4h'
) RETURNS BIGINT AS $def$
DECLARE
    r              RECORD;
    water_mark     BIGINT;
    -- Wall-clock time at function entry. now() would be the transaction
    -- start, which makes the timeout fire early in a long transaction.
    started_at     TIMESTAMPTZ := clock_timestamp();
    -- Caller's message level, restored after the quiet CREATE TABLE below.
    saved_min_msgs TEXT := current_setting('client_min_messages');
BEGIN
    -- we need dblink here because pg_stat_replication at least in 9.3,
    -- although it does report replicas dropping out, it does not report
    -- replicas reconnecting if called in a transaction.
    -- NOTE(review): calling pg_stat_clear_snapshot() before each read may
    -- make the extra connection unnecessary -- verify on the target version.
    PERFORM dblink_connect('wait_for_streaming_lag',
                           'dbname=' || current_database() || ' application_name=wait_for_streaming_lag')
      WHERE NOT EXISTS (SELECT 1
                          FROM unnest(dblink_get_connections()) c(c)
                         WHERE c = 'wait_for_streaming_lag');

    -- Suppress the "relation already exists, skipping" notice.
    SET LOCAL client_min_messages TO ERROR;
    CREATE TEMP TABLE IF NOT EXISTS lag (
        gen              INT,
        application_name TEXT,
        client_addr      INET,
        flush_location   TEXT,
        lmd              TIMESTAMP
    );
    -- Put back what the caller had instead of forcing NOTICE.
    PERFORM set_config('client_min_messages', saved_min_msgs, true);

    water_mark := $2;           -- use high_water_mark for the first loop

    LOOP
        -- Build the next generation of the lag table:
        --   ord = 1  standbys currently reported over the dblink connection
        --   ord = 2  standbys remembered from the previous generation
        -- DISTINCT ON keeps one row per (application_name, client_addr),
        -- preferring the live row, then the smallest flush position.
        WITH g AS (
            SELECT max(gen) AS gen
              FROM lag
        ),
        seen AS (
            SELECT 1 AS ord,
                   application_name,
                   client_addr,
                   flush_location,
                   clock_timestamp() AS lmd
              FROM dblink('wait_for_streaming_lag', $$ SELECT application_name, client_addr, flush_location FROM pg_stat_replication $$)
                   repl(application_name TEXT, client_addr INET, flush_location TEXT)
             UNION ALL
            SELECT 2 AS ord,
                   application_name,
                   client_addr,
                   flush_location,
                   lmd
              FROM lag
        )
        INSERT INTO lag (gen, application_name, client_addr, flush_location, lmd)
        SELECT coalesce(g.gen + 1, 1),
               rx.application_name,
               rx.client_addr,
               rx.flush_location,
               rx.lmd
          FROM (SELECT DISTINCT ON (application_name, client_addr)
                       application_name,
                       client_addr,
                       flush_location,
                       lmd
                  FROM seen
                 ORDER BY application_name,
                          client_addr,
                          ord ASC,
                          pg_xlog_location_diff(flush_location, '0/0') ASC) rx
         CROSS JOIN g;

        -- Keep only the generation just written.
        DELETE FROM lag WHERE gen < (SELECT max(gen) FROM lag);
        -- Forget standbys that have not been seen live for five minutes.
        DELETE FROM lag WHERE lmd < clock_timestamp() - '5min'::INTERVAL;

        SELECT INTO r
               coalesce(max(pg_xlog_location_diff(pg_current_xlog_location(),
                                                  flush_location)), 0) AS lag,
               clock_timestamp() - started_at AS tm
          FROM lag;

        EXIT WHEN r.lag <= water_mark;

        IF r.tm > $3 THEN
            -- Message kept on one line: the mail-wrapped original embedded a
            -- line break inside the literal.
            RAISE EXCEPTION USING
                MESSAGE = 'Timeout while waiting for streaming lag to drop below ' || $1,
                ERRCODE = 'TF001';
        END IF;

        -- Once we have started waiting, require the stricter limit.
        water_mark := $1;
        PERFORM pg_sleep(1);
    END LOOP;

    RETURN r.lag;
END;
$def$ LANGUAGE plpgsql VOLATILE SECURITY invoker;

COMMIT;
-- Sent via pgsql-general mailing list (pgsql-general@xxxxxxxxxxxxxx) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-general