Oh wow, I guess you are right.
I just ran an example where local runs make use of parallel query, but runs via FDW do not.
I have three servers:
2 x pg10
1 x pg11
I run queries on the coordinator node (pg11), which calls a foreign server to do a simple count.
When run locally, the individual nodes execute the query in parallel, and this is repeatable. But via FDW the foreign server runs a simple seq scan.
I guess this is for the same reason you mentioned regarding declared cursors.
on pg11
create schema pg10;
create schema pg10_qa;
import foreign schema pg10 from server pg10 into pg10;
import foreign schema pg10_qa from server pg10_qa into pg10_qa;
explain (analyze,verbose) SELECT COUNT(1) FROM pg10.tbl_ItemTransactions; -- this query is via FDW
QUERY PLAN
----------------------------------------------------------------------------------------------------
Foreign Scan (cost=108.53..152.69 rows=1 width=8) (actual time=6584.498..6584.500 rows=1 loops=1)
  Output: (count(1))
  Relations: Aggregate on (pg10.tbl_itemtransactions)
  Remote SQL: SELECT count(1) FROM pg10.tbl_itemtransactions
Planning Time: 0.112 ms
Execution Time: 6585.435 ms
(6 rows)

2019-02-18 09:56:48 UTC LOG: duration: 6593.046 ms plan:
        Query Text: DECLARE c1 CURSOR FOR
        SELECT count(1) FROM pg10.tbl_itemtransactions
        Aggregate (cost=768694.80..768694.81 rows=1 width=8) (actual time=6593.039..6593.039 rows=1 loops=1)
          Output: count(1)
          Buffers: shared hit=259476
          ->  Seq Scan on pg10.tbl_itemtransactions (cost=0.00..666851.04 rows=40737504 width=0) (actual time=0.024..3389.245 rows=40737601 loops=1)
                Output: tranid, transactiondate, transactionname
                Buffers: shared hit=259476
--------
on pg10 (1) -- foreign server pg10
create schema pg10;
CREATE TABLE pg10.tbl_ItemTransactions
(
TranID SERIAL
,TransactionDate TIMESTAMPTZ
,TransactionName TEXT
);
INSERT INTO pg10.tbl_ItemTransactions
(TransactionDate, TransactionName)
SELECT x, 'dbrnd'
FROM generate_series('2014-01-01 00:00:00'::timestamptz, '2016-08-01 00:00:00'::timestamptz,'2 seconds'::interval) a(x);
explain analyze SELECT count(1) FROM pg10.tbl_itemtransactions; -- this query is local
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=472650.72..472650.73 rows=1 width=8) (actual time=2576.053..2576.054 rows=1 loops=1)
  ->  Gather (cost=472650.50..472650.71 rows=2 width=8) (actual time=2575.721..2626.980 rows=3 loops=1)
        Workers Planned: 2
        Workers Launched: 2
        ->  Partial Aggregate (cost=471650.50..471650.51 rows=1 width=8) (actual time=2569.302..2569.302 rows=1 loops=3)
              ->  Parallel Seq Scan on tbl_itemtransactions (cost=0.00..429215.60 rows=16973960 width=0) (actual time=0.048..1492.144 rows=13579200 loops=3)
Planning time: 0.405 ms
Execution time: 2627.455 ms
(8 rows)
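
One way to check the declared-cursor explanation directly on this foreign server, without going through FDW, might be to ask EXPLAIN for the plan of the same DECLARE that shows up in the log above. This is only a sketch: the cursor name c1 is copied from that log, I am assuming default parallel settings on this node, and I have not pasted output because I have not captured it here. As far as I know, EXPLAIN accepts a DECLARE statement and only plans it; the expectation is that the second plan comes back without a Gather node.

```sql
-- Run on pg10 (1), the foreign server. Wrapped in a transaction only as a
-- precaution; nothing is modified and the cursor is never opened.
BEGIN;

-- Plain query: a parallel plan (Gather + Parallel Seq Scan) is expected,
-- as in the local run above.
EXPLAIN SELECT count(1) FROM pg10.tbl_itemtransactions;

-- Same query as a cursor declaration, the form seen in the server log.
-- Per the v11 docs, a query run via DECLARE CURSOR does not use parallel
-- query, so a non-parallel Aggregate over a Seq Scan is expected here.
EXPLAIN DECLARE c1 CURSOR FOR
SELECT count(1) FROM pg10.tbl_itemtransactions;

ROLLBACK;
```

If the two plans differ in this way, that would match the seq scan seen when the count comes in through postgres_fdw.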
--------
on pg10 (2) -- foreign server pg10_qa
create schema pg10_qa;
CREATE TABLE pg10_qa.tbl_ItemTransactions
(
TranID SERIAL
,TransactionDate TIMESTAMPTZ
,TransactionName TEXT
);
INSERT INTO pg10_qa.tbl_ItemTransactions
(TransactionDate, TransactionName)
SELECT x, 'dbrnd'
FROM generate_series('2014-01-01 00:00:00'::timestamptz, '2016-08-01 00:00:00'::timestamptz,'2 seconds'::interval) a(x);
explain analyze SELECT count(1) FROM pg10_qa.tbl_itemtransactions; -- this query is local
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=472650.72..472650.73 rows=1 width=8) (actual time=2568.469..2568.469 rows=1 loops=1)
  ->  Gather (cost=472650.50..472650.71 rows=2 width=8) (actual time=2568.067..2613.006 rows=3 loops=1)
        Workers Planned: 2
        Workers Launched: 2
        ->  Partial Aggregate (cost=471650.50..471650.51 rows=1 width=8) (actual time=2563.893..2563.893 rows=1 loops=3)
              ->  Parallel Seq Scan on tbl_itemtransactions (cost=0.00..429215.60 rows=16973960 width=0) (actual time=0.017..1388.417 rows=13579200 loops=3)
Planning time: 0.048 ms
Execution time: 2613.246 ms
(8 rows)
Vijay

On Mon, Feb 18, 2019 at 3:07 AM Jeff Janes <jeff.janes@xxxxxxxxx> wrote:
> On Sun, Feb 17, 2019 at 6:32 AM Vijaykumar Jain <vjain@xxxxxxxxxxxxx> wrote:
>> I am yet to figure out the reason, what we have done is implement fake columns to represent samples and giving them random numbers and keeping other bulls to fake limit.
>> Most of the queries that were impacted were the ones that did not push order by and limit to foreign servers.
>> I am also trying to upgrade pg11 to make use of parallelisation.
>
> postgres_fdw operates through declared cursors, and declared cursors inhibit parallel query. This doesn't change in v11, see https://www.postgresql.org/docs/11/when-can-parallel-query-be-used.html
>
> I'm not aware of any other changes in v11 that are likely to help you out.
>
> Cheers,
> Jeff

on mgmt
drop table if exists "flight_bookings";
drop table if exists "hotel_bookings";
drop table if exists "users";
drop table if exists "cities";

# partitions would be on shards
create table "hotel_bookings" (id serial, user_id int, booked_at timestamp, city_name text, continent text, flight_id int) partition by list (continent);
create table "flight_bookings" (id serial, user_id int, booked_at timestamp, from_city text, from_continent text, to_city text, to_continent text) partition by list (to_continent);

# non partitioned table, but hotel_bookings and flight_bookings refer it
create table "users" (id serial, name text, age int);

# create foreign tables which are on shards
create foreign table if not exists "flight_bookings1" partition of flight_bookings for values in ('Asia', 'Oceania', 'North America') server gc_bi_shard_1_primary;
create foreign table if not exists "hotel_bookings1" partition of hotel_bookings for values in ('Asia', 'Oceania', 'North America') server gc_bi_shard_1_primary;
create foreign table if not exists "flight_bookings2" partition of flight_bookings for values in ('Europe', 'Africa', 'South and Central America') server gc_bi_shard_2_primary;
create foreign table if not exists "hotel_bookings2" partition of hotel_bookings for values in ('Europe', 'Africa', 'South and Central America') server gc_bi_shard_2_primary;

# for demo purpose, create an unpartitioned table
create table if not exists hotel_bookings_u (like hotel_bookings);
create table if not exists flight_bookings_u (like flight_bookings);

# load data in unpartitioned table first
copy flight_bookings_u from '/var/tmp/pgconf-asia-demo/testdata/flight_bookings.csv' (format csv);
copy hotel_bookings_u from '/var/tmp/pgconf-asia-demo/testdata/hotel_bookings.csv' (format csv);
copy users from '/var/tmp/pgconf-asia-demo/testdata/users.csv' (format csv);

# then load the foreign partitioned tables
insert into hotel_bookings select * from hotel_bookings_u;
insert into flight_bookings select * from flight_bookings_u;

# on shard1 create partitioned tables hotel_bookings1 and flight_bookings1 and further subpartition by booked_at
create table "hotel_bookings1" (id int, user_id int, booked_at timestamp, city_name text, continent text, flight_id int) partition by range (booked_at);
create table "flight_bookings1" (id int, user_id int, booked_at timestamp, from_city text, from_continent text, to_city text, to_continent text) partition by range (booked_at);

# subpartition hotel_bookings1 by booked_at
create table hotel_bookings1_q1 partition of hotel_bookings1 for values from ('2017-01-01') to ('2017-04-01');
create table hotel_bookings1_q2 partition of hotel_bookings1 for values from ('2017-04-01') to ('2017-07-01');
create table hotel_bookings1_q3 partition of hotel_bookings1 for values from ('2017-07-01') to ('2017-10-01');
create table hotel_bookings1_q4 partition of hotel_bookings1 for values from ('2017-10-01') to ('2018-01-01');
create table hotel_bookings1_default partition of hotel_bookings1 DEFAULT;

# subpartition flight_bookings1 by booked_at
create table flight_bookings1_q1 partition of flight_bookings1 for values from ('2017-01-01') to ('2017-04-01');
create table flight_bookings1_q2 partition of flight_bookings1 for values from ('2017-04-01') to ('2017-07-01');
create table flight_bookings1_q3 partition of flight_bookings1 for values from ('2017-07-01') to ('2017-10-01');
create table flight_bookings1_q4 partition of flight_bookings1 for values from ('2017-10-01') to ('2018-01-01');
create table flight_bookings1_default partition of flight_bookings1 DEFAULT;

# on shard2 create partitioned tables hotel_bookings2 and flight_bookings2 and further subpartition by booked_at
drop table flight_bookings2;
drop table hotel_bookings2;
create table "hotel_bookings2" (id int, user_id int, booked_at timestamp, city_name text, continent text, flight_id int) partition by range (booked_at);
create table "flight_bookings2" (id int, user_id int, booked_at timestamp, from_city text, from_continent text, to_city text, to_continent text) partition by range (booked_at);

# subpartition hotel_bookings2 by booked_at
create table hotel_bookings2_q1 partition of hotel_bookings2 for values from ('2017-01-01') to ('2017-04-01');
create table hotel_bookings2_q2 partition of hotel_bookings2 for values from ('2017-04-01') to ('2017-07-01');
create table hotel_bookings2_q3 partition of hotel_bookings2 for values from ('2017-07-01') to ('2017-10-01');
create table hotel_bookings2_q4 partition of hotel_bookings2 for values from ('2017-10-01') to ('2018-01-01');
create table hotel_bookings2_default partition of hotel_bookings2 DEFAULT;

# subpartition flight_bookings2 by booked_at
create table flight_bookings2_q1 partition of flight_bookings2 for values from ('2017-01-01') to ('2017-04-01');
create table flight_bookings2_q2 partition of flight_bookings2 for values from ('2017-04-01') to ('2017-07-01');
create table flight_bookings2_q3 partition of flight_bookings2 for values from ('2017-07-01') to ('2017-10-01');
create table flight_bookings2_q4 partition of flight_bookings2 for values from ('2017-10-01') to ('2018-01-01');
create table flight_bookings2_default partition of flight_bookings2 DEFAULT;

# no filter conditions, scans all foreign tables
explain select * from flight_bookings
  join hotel_bookings on flight_bookings.id = hotel_bookings.flight_id
  join users on flight_bookings.user_id = users.id;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------
Merge Join (cost=6029.95..17762.67 rows=761410 width=269)
  Merge Cond: (hotel_bookings2.flight_id = flight_bookings2.id)
  ->  Merge Append (cost=830.18..1071.98 rows=7440 width=47)
        Sort Key: hotel_bookings2.flight_id
        ->  Foreign Scan on hotel_bookings2 (cost=339.72..404.81 rows=2893 width=49)
        ->  Foreign Scan on hotel_bookings1 (cost=490.45..592.76 rows=4547 width=46)
  ->  Materialize (cost=5199.77..5302.11 rows=20468 width=185)
        ->  Sort (cost=5199.77..5250.94 rows=20468 width=185)
              Sort Key: flight_bookings2.id
              ->  Hash Join (cost=419.00..1844.16 rows=20468 width=185)
                    Hash Cond: (flight_bookings2.user_id = users.id)
                    ->  Append (cost=100.00..1243.72 rows=20468 width=54)
                          ->  Foreign Scan on flight_bookings2 (cost=100.00..453.89 rows=7654 width=56)
                          ->  Foreign Scan on flight_bookings1 (cost=100.00..687.49 rows=12814 width=53)
                    ->  Hash (cost=194.00..194.00 rows=10000 width=41)
                          ->  Seq Scan on users (cost=0.00..194.00 rows=10000 width=41)
(16 rows)

# continent filter restricts the scan to shard1 (eliminated shard2) and then filter on booked_at further restricts scan to q1 partition on tables on shard1
explain select * from flight_bookings
  join hotel_bookings on flight_bookings.id = hotel_bookings.flight_id
  join users on flight_bookings.user_id = users.id
  where flight_bookings.to_continent = 'Asia' and hotel_bookings.continent = 'Asia'
    and hotel_bookings.booked_at <= '2017-01-02'::timestamp
    and flight_bookings.booked_at <= '2017-01-02'::timestamp;
QUERY PLAN
--------------------------------------------------------------------------------------------------
Nested Loop (cost=299.43..575.88 rows=4 width=269)
  Join Filter: (flight_bookings1.id = hotel_bookings1.flight_id)
  ->  Hash Join (cost=199.43..431.08 rows=15 width=185)
        Hash Cond: (users.id = flight_bookings1.user_id)
        ->  Seq Scan on users (cost=0.00..194.00 rows=10000 width=41)
        ->  Hash (cost=199.25..199.25 rows=15 width=59)
              ->  Append (cost=100.00..199.25 rows=15 width=59)
                    ->  Foreign Scan on flight_bookings1 (cost=100.00..199.17 rows=15 width=59)
  ->  Materialize (cost=100.00..143.91 rows=4 width=50)
        ->  Append (cost=100.00..143.89 rows=4 width=50)
              ->  Foreign Scan on hotel_bookings1 (cost=100.00..143.87 rows=4 width=50)
(11 rows)

# continent filter restricts the scan to shard1 (eliminated shard2)
explain select * from flight_bookings
  join hotel_bookings on flight_bookings.id = hotel_bookings.flight_id
  join users on flight_bookings.user_id = users.id
  where flight_bookings.to_continent = 'Asia' and hotel_bookings.continent = 'Asia';
QUERY PLAN
----------------------------------------------------------------------------------------------------
Merge Join (cost=1582.79..2244.21 rows=40517 width=269)
  Merge Cond: (hotel_bookings1.flight_id = flight_bookings1.id)
  ->  Merge Append (cost=291.62..341.45 rows=1533 width=39)
        Sort Key: hotel_bookings1.flight_id
        ->  Foreign Scan on hotel_bookings1 (cost=291.61..326.11 rows=1533 width=39)
  ->  Sort (cost=1291.17..1304.38 rows=5286 width=185)
        Sort Key: flight_bookings1.id
        ->  Hash Join (cost=419.00..964.28 rows=5286 width=185)
              Hash Cond: (flight_bookings1.user_id = users.id)
              ->  Append (cost=100.00..572.60 rows=5286 width=53)
                    ->  Foreign Scan on flight_bookings1 (cost=100.00..546.17 rows=5286 width=53)
              ->  Hash (cost=194.00..194.00 rows=10000 width=41)
                    ->  Seq Scan on users (cost=0.00..194.00 rows=10000 width=41)
(13 rows)

# on shard1, no predicates but on join results in all partition scans
explain select * from flight_bookings1
  join hotel_bookings1 on flight_bookings1.id = hotel_bookings1.flight_id;
QUERY PLAN
---------------------------------------------------------------------------------------------
Hash Join (cost=171.04..659.85 rows=4547 width=99)
  Hash Cond: (flight_bookings1_q1.id = hotel_bookings1_q1.flight_id)
  ->  Append (cost=0.00..331.21 rows=12814 width=53)
        ->  Seq Scan on flight_bookings1_q1 (cost=0.00..65.83 rows=3183 width=53)
        ->  Seq Scan on flight_bookings1_q2 (cost=0.00..65.73 rows=3173 width=53)
        ->  Seq Scan on flight_bookings1_q3 (cost=0.00..67.21 rows=3221 width=53)
        ->  Seq Scan on flight_bookings1_q4 (cost=0.00..67.35 rows=3235 width=53)
        ->  Seq Scan on flight_bookings1_default (cost=0.00..1.02 rows=2 width=66)
  ->  Hash (cost=114.20..114.20 rows=4547 width=46)
        ->  Append (cost=0.00..114.20 rows=4547 width=46)
              ->  Seq Scan on hotel_bookings1_q1 (cost=0.00..18.31 rows=931 width=39)
              ->  Seq Scan on hotel_bookings1_q2 (cost=0.00..18.54 rows=954 width=39)
              ->  Seq Scan on hotel_bookings1_q3 (cost=0.00..19.12 rows=1012 width=39)
              ->  Seq Scan on hotel_bookings1_q4 (cost=0.00..18.30 rows=930 width=39)
              ->  Seq Scan on hotel_bookings1_default (cost=0.00..17.20 rows=720 width=84)
(15 rows)

# on shard1, with predicates using booked_at on table flight_bookings1, scan restricted to sub partition (q1 and default) but all partitions for hotel_bookings1
gc_bi_shard_1=> explain select * from flight_bookings1
  join hotel_bookings1 on flight_bookings1.id = hotel_bookings1.flight_id
    and flight_bookings1.booked_at <= '2017-01-02'::timestamp;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Hash Join (cost=75.42..207.09 rows=41 width=99)
  Hash Cond: (hotel_bookings1_q1.flight_id = flight_bookings1_q1.id)
  ->  Append (cost=0.00..114.20 rows=4547 width=46)
        ->  Seq Scan on hotel_bookings1_q1 (cost=0.00..18.31 rows=931 width=39)
        ->  Seq Scan on hotel_bookings1_q2 (cost=0.00..18.54 rows=954 width=39)
        ->  Seq Scan on hotel_bookings1_q3 (cost=0.00..19.12 rows=1012 width=39)
        ->  Seq Scan on hotel_bookings1_q4 (cost=0.00..18.30 rows=930 width=39)
        ->  Seq Scan on hotel_bookings1_default (cost=0.00..17.20 rows=720 width=84)
  ->  Hash (cost=74.99..74.99 rows=35 width=53)
        ->  Append (cost=0.00..74.99 rows=35 width=53)
              ->  Seq Scan on flight_bookings1_q1 (cost=0.00..73.79 rows=34 width=53)
                    Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
              ->  Seq Scan on flight_bookings1_default (cost=0.00..1.02 rows=1 width=66)
                    Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)

# on shard1, with predicates using booked_at, scan restricted to sub partition (q1 and default) of both tables
gc_bi_shard_1=> explain select * from flight_bookings1
  join hotel_bookings1 on flight_bookings1.id = hotel_bookings1.flight_id
    and flight_bookings1.booked_at <= '2017-01-02'::timestamp
    and hotel_bookings1.booked_at <= '2017-01-02'::timestamp;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Hash Join (cost=75.42..117.63 rows=41 width=135)
  Hash Cond: (hotel_bookings1_q1.flight_id = flight_bookings1_q1.id)
  ->  Append (cost=0.00..40.87 rows=247 width=83)
        ->  Seq Scan on hotel_bookings1_q1 (cost=0.00..20.64 rows=7 width=39)
              Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
        ->  Seq Scan on hotel_bookings1_default (cost=0.00..19.00 rows=240 width=84)
              Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
  ->  Hash (cost=74.99..74.99 rows=35 width=53)
        ->  Append (cost=0.00..74.99 rows=35 width=53)
              ->  Seq Scan on flight_bookings1_q1 (cost=0.00..73.79 rows=34 width=53)
                    Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
              ->  Seq Scan on flight_bookings1_default (cost=0.00..1.02 rows=1 width=66)
                    Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
(13 rows)

# on shard1 when using predicates using the shard key, both default partition and matching partition scanned
explain select * from hotel_bookings1 where booked_at <= '2017-01-02'::timestamp;
QUERY PLAN
-----------------------------------------------------------------------------------
Append (cost=0.00..40.87 rows=247 width=83)
  ->  Seq Scan on hotel_bookings1_q1 (cost=0.00..20.64 rows=7 width=39)
        Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
  ->  Seq Scan on hotel_bookings1_default (cost=0.00..19.00 rows=240 width=84)
        Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
(5 rows)