Modern CPUs have many cores. For years, applications have sent queries to databases in parallel. A reporting query over many rows of a table runs faster when it uses several CPUs, and PostgreSQL has supported this since version 9.6.
It took 3 years to implement parallel queries: the code had to be rewritten at different stages of query execution. PostgreSQL 9.6 introduced the infrastructure for further improvements. Later versions execute other types of queries in parallel as well.
PostgreSQL developers have tried to cut the response time of the TPC-H benchmark queries. Download the benchmark and adapt it to PostgreSQL. This is an unofficial use of the TPC-H benchmark, not meant for comparing databases or hardware.
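- Generate the data: ./dbgen -s 10 creates a 23 GB database. That's enough to see the performance difference between parallel and non-parallel queries.
- Convert the .tbl files to csv with for and sed.
- Copy the .csv files to pg_tpch/dss/data.
- Generate the queries with the qgen command.
- Load the data into the database with the ./tpch.sh command.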
A parallel sequential scan may be faster not because of parallel reads, but because the work is spread across many CPU cores. In modern operating systems, PostgreSQL data files are well cached. With read-ahead, the storage can return more blocks than the PostgreSQL daemon requested. Therefore, query performance is not limited by disk I/O. The query consumes CPU cycles to read the rows and check them against the WHERE conditions.

Run a simple select query:
    tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
                                                          QUERY PLAN
    --------------------------------------------------------------------------------------------------------------------------
    Seq Scan on lineitem  (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
      Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
      Rows Removed by Filter: 1146337
    Planning Time: 0.203 ms
    Execution Time: 19035.100 ms
A sequential scan returns too many rows without aggregation, so the query is executed by a single CPU core.
If you add SUM(), you can see that two worker processes help speed up the query:
    explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
                                                          QUERY PLAN
    ----------------------------------------------------------------------------------------------------------------------------------------------------
    Finalize Aggregate  (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
      ->  Gather  (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
            Workers Planned: 2
            Workers Launched: 2
            ->  Partial Aggregate  (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
                  ->  Parallel Seq Scan on lineitem  (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
                        Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
                        Rows Removed by Filter: 382112
    Planning Time: 0.241 ms
    Execution Time: 8555.131 ms
The "Parallel Seq Scan" node produces rows for partial aggregation. The "Partial Aggregate" node reduces these rows with SUM(). At the end, the "Gather" node collects the SUM counter from each worker process.
The final result is calculated by the "Finalize Aggregate" node. If you have your own aggregate functions, do not forget to mark them as "parallel safe".
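The division of labor between the partial and final aggregation can be pictured with a small Python model. This is only an illustration: the quantity values are made up, and list slices stand in for the leader and the two worker processes, whereas PostgreSQL hands out table blocks to real processes.

```python
# Illustrative model only: plain Python lists stand in for table rows,
# and list slices stand in for the leader and two worker processes.
quantities = [17, 36, 8, 28, 24, 32, 38, 45, 49, 27]  # made-up l_quantity values


def partial_aggregate(rows):
    """What each process does: reduce its share of rows to one partial SUM."""
    return sum(rows)


def finalize_aggregate(partials):
    """What the leader does after Gather: combine the partial SUMs."""
    return sum(partials)


processes = 3  # 2 workers + 1 leader, as in the plan above
shares = [quantities[i::processes] for i in range(processes)]
partials = [partial_aggregate(share) for share in shares]
total = finalize_aggregate(partials)
print(partials, total)
```

The combined result matches a plain SUM over all rows only because SUM can be split and recombined this way, which is the property the "parallel safe" marking and a combine step have to guarantee for custom aggregates.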
The number of worker processes can be increased without restarting the server:
    alter system set max_parallel_workers_per_gather=4;
    select * from pg_reload_conf();
Now we see 4 workers in the explain output:
    tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
                                                          QUERY PLAN
    ----------------------------------------------------------------------------------------------------------------------------------------------------
    Finalize Aggregate  (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1)
      ->  Gather  (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1)
            Workers Planned: 4
            Workers Launched: 4
            ->  Partial Aggregate  (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5)
                  ->  Parallel Seq Scan on lineitem  (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5)
                        Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
                        Rows Removed by Filter: 229267
    Planning Time: 0.218 ms
    Execution Time: 5153.967 ms
What's going on here? The number of worker processes doubled, but the query became only 1.6599 times faster. The arithmetic is interesting. We had 2 worker processes and 1 leader. After the change there were 4 + 1.
Our maximum speedup from parallel processing: 5/3 = 1.66(6) times.
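The arithmetic can be checked with a few lines of Python. The execution times are copied from the two EXPLAIN ANALYZE outputs above; the function name is made up for this sketch, and the bound assumes the leader scans at the same rate as a worker.

```python
def ideal_speedup(workers_before, workers_after):
    """Upper bound on the speedup when the leader also scans rows:
    compare the number of participating processes, not just workers."""
    return (workers_after + 1) / (workers_before + 1)


# Execution times in ms, taken from the two EXPLAIN ANALYZE outputs above.
time_2_workers = 8555.131
time_4_workers = 5153.967

measured = time_2_workers / time_4_workers
bound = ideal_speedup(2, 4)
print(f"measured {measured:.4f}, bound {bound:.4f}")
```

The measured ratio stays just under the 5/3 bound, which is why doubling the workers did not double the speed.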
Query execution always begins with the leader process. The leader does all the non-parallel work and part of the parallel processing. The other processes that execute the same query are called worker processes. Parallel processing uses the dynamic background worker infrastructure (available since version 9.4). Since other parts of PostgreSQL use processes rather than threads, a query with 3 worker processes could be 4 times faster than traditional processing.
Worker processes communicate with the leader through a message queue (based on shared memory). Each process has 2 queues: one for errors and one for tuples.
The first limit is set by the max_parallel_workers_per_gather parameter. Then the executor takes worker processes from the pool bounded by the max_parallel_workers parameter. The last limit is max_worker_processes, that is, the total number of background processes.
If no worker process can be allocated, the query is processed by a single process.
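The three limits can be read as nested caps. The following Python sketch is a simplified model, not PostgreSQL's actual allocation code: the function name and the two "busy" counters are assumptions made for illustration, and busy_background is meant to count every background worker already running, parallel ones included.

```python
def available_workers(wanted, max_parallel_workers_per_gather,
                      max_parallel_workers, busy_parallel,
                      max_worker_processes, busy_background):
    """Simplified model of the three nested limits (hypothetical helper).

    wanted           -- workers the plan would like to use
    busy_parallel    -- parallel workers already running for other queries
    busy_background  -- all background workers already running
    """
    planned = min(wanted, max_parallel_workers_per_gather)
    free_parallel = max(max_parallel_workers - busy_parallel, 0)
    free_slots = max(max_worker_processes - busy_background, 0)
    # 0 means the leader runs the whole query alone.
    return min(planned, free_parallel, free_slots)


print(available_workers(4, 4, 8, 0, 8, 0))  # idle server
print(available_workers(4, 4, 8, 6, 8, 6))  # pool almost exhausted
print(available_workers(4, 4, 8, 8, 8, 8))  # nothing left
```

In this model a result of 0 corresponds to the single-process fallback described above.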
The query planner can reduce the number of worker processes depending on the size of the table or index. The parameters min_parallel_table_scan_size and min_parallel_index_scan_size control this.
    set min_parallel_table_scan_size='8MB'
    8MB table => 1 worker
    24MB table => 2 workers
    72MB table => 3 workers
    x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
Every time a table becomes 3 times larger than min_parallel_(index|table)_scan_size, Postgres adds a worker process. The number of worker processes is not cost based. A circular dependency would make a more complex implementation difficult. Instead, the planner uses simple rules.
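The tripling rule can be sketched in Python as follows. The function name is hypothetical, sizes are in megabytes, and returning 0 for tables below the threshold is an assumption of this sketch; the cap from max_parallel_workers_per_gather is left out to keep the rule itself visible.

```python
def workers_for_scan(table_mb, min_scan_mb=8):
    """Workers suggested by the size rule alone (hypothetical helper).

    One worker at the threshold size, plus one more each time the
    table is three times larger again. Integer comparisons are used
    instead of log() to avoid floating-point rounding at the boundaries.
    """
    if table_mb < min_scan_mb:
        return 0  # assumption: too small for a parallel scan
    workers = 1
    threshold = min_scan_mb * 3
    while table_mb >= threshold:
        workers += 1
        threshold *= 3
    return workers


for size in (8, 24, 72, 216):
    print(size, workers_for_scan(size))
```

For the sizes listed above this gives the same counts as the logarithmic formula: 1, 2 and 3 workers for 8MB, 24MB and 72MB.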
In practice, these rules are not always suitable for production, so you can change the number of worker processes for a particular table: ALTER TABLE ... SET (parallel_workers = N).
In addition to a long list of restrictions, there are also cost checks:
- parallel_setup_cost: avoids parallel processing of short queries. This parameter estimates the time needed to prepare memory, start the processes and exchange initial data.
- parallel_tuple_cost: communication between the leader and the workers can take time proportional to the number of tuples sent by the workers. This parameter estimates the cost of that data exchange.
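How the two parameters enter the estimate can be seen by recomputing the "Gather" cost from the EXPLAIN outputs earlier in the article. The sketch below assumes the server used the PostgreSQL defaults parallel_setup_cost = 1000 and parallel_tuple_cost = 0.1 (the article does not state the values), and the function name is made up.

```python
def gather_cost(sub_startup, sub_total, rows,
                parallel_setup_cost=1000.0, parallel_tuple_cost=0.1):
    """Estimated (startup, total) cost of a Gather node on top of a subplan.

    Assumption: default parameter values. The setup cost is paid once,
    the tuple cost once per row passed from the workers to the leader.
    """
    startup = sub_startup + parallel_setup_cost
    total = sub_total + parallel_setup_cost + parallel_tuple_cost * rows
    return startup, total


# Partial Aggregate cost and Gather row estimate from the 2-worker plan.
startup, total = gather_cost(1588701.91, 1588701.92, rows=2)
print(f"{startup:.2f}..{total:.2f}")
```

With these inputs the result agrees with the cost range shown on the "Gather" line of the 2-worker plan, and the same holds for the 4-worker plan. A query that returns few rows and runs briefly cannot amortize the fixed 1000 units, so the planner keeps it serial.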
PostgreSQL 9.6+ can execute a Nested Loop join in parallel:

    explain (costs off) select c_custkey, count(o_orderkey)
    from customer left outer join orders
      on c_custkey = o_custkey and o_comment not like '%special%deposits%'
    group by c_custkey;
                                          QUERY PLAN
    --------------------------------------------------------------------------------------
    Finalize GroupAggregate
      Group Key: customer.c_custkey
      ->  Gather Merge
            Workers Planned: 4
            ->  Partial GroupAggregate
                  Group Key: customer.c_custkey
                  ->  Nested Loop Left Join
                        ->  Parallel Index Only Scan using customer_pkey on customer
                        ->  Index Scan using idx_orders_custkey on orders
                              Index Cond: (customer.c_custkey = o_custkey)
                              Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
The gather takes place at the last stage, so Nested Loop Left Join is a parallel operation. Parallel Index Only Scan appeared only in version 10. It works in the same way as a parallel sequential scan. The condition c_custkey = o_custkey reads one order for each customer row, so it is not parallel.
Before PostgreSQL 11, each worker process builds its own hash table, and with more than four such processes performance does not improve. In PostgreSQL 11 the hash table is shared. Each worker process can use work_mem to build the hash table.
    select
        l_shipmode,
        sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count,
        sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count
    from orders, lineitem
    where o_orderkey = l_orderkey
      and l_shipmode in ('MAIL', 'AIR')
      and l_commitdate < l_receiptdate
      and l_shipdate < l_commitdate
      and l_receiptdate >= date '1996-01-01'
      and l_receiptdate < date '1996-01-01' + interval '1' year
    group by l_shipmode
    order by l_shipmode
    LIMIT 1;
                                          QUERY PLAN
    --------------------------------------------------------------------------------------
    Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
      ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
            Group Key: lineitem.l_shipmode
            ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
                  Workers Planned: 4
                  Workers Launched: 4
                  ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                        Group Key: lineitem.l_shipmode
                        ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                              Sort Key: lineitem.l_shipmode
                              Sort Method: external merge  Disk: 2304kB
                              Worker 0:  Sort Method: external merge  Disk: 2064kB
                              Worker 1:  Sort Method: external merge  Disk: 2384kB
                              Worker 2:  Sort Method: external merge  Disk: 2264kB
                              Worker 3:  Sort Method: external merge  Disk: 2336kB
                              ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                    Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                    ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                          Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                          Rows Removed by Filter: 11934691
                                    ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                          Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                          ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
    Planning Time: 0.977 ms
    Execution Time: 7923.770 ms
Query 12 of TPC-H clearly shows a parallel hash join. Each worker process takes part in building the shared hash table.
A merge join is non-parallel by nature. Do not worry if it is the last stage of the query: the rest of the query can still be executed in parallel.
    -- Query 2 from TPC-H
    explain (costs off)
    select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
    from part, supplier, partsupp, nation, region
    where p_partkey = ps_partkey
      and s_suppkey = ps_suppkey
      and p_size = 36
      and p_type like '%BRASS'
      and s_nationkey = n_nationkey
      and n_regionkey = r_regionkey
      and r_name = 'AMERICA'
      and ps_supplycost = (
        select min(ps_supplycost)
        from partsupp, supplier, nation, region
        where p_partkey = ps_partkey
          and s_suppkey = ps_suppkey
          and s_nationkey = n_nationkey
          and n_regionkey = r_regionkey
          and r_name = 'AMERICA'
      )
    order by s_acctbal desc, n_name, s_name, p_partkey
    LIMIT 100;
                                          QUERY PLAN
    ----------------------------------------------------------------------------------------------------------
    Limit
      ->  Sort
            Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
            ->  Merge Join
                  Merge Cond: (part.p_partkey = partsupp.ps_partkey)
                  Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
                  ->  Gather Merge
                        Workers Planned: 4
                        ->  Parallel Index Scan using part_pkey on part
                              Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
                  ->  Materialize
                        ->  Sort
                              Sort Key: partsupp.ps_partkey
                              ->  Nested Loop
                                    ->  Nested Loop
                                          Join Filter: (nation.n_regionkey = region.r_regionkey)
                                          ->  Seq Scan on region
                                                Filter: (r_name = 'AMERICA'::bpchar)
                                          ->  Hash Join
                                                Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                                ->  Seq Scan on supplier
                                                ->  Hash
                                                      ->  Seq Scan on nation
                                    ->  Index Scan using idx_partsupp_suppkey on partsupp
                                          Index Cond: (ps_suppkey = supplier.s_suppkey)
                  SubPlan 1
                    ->  Aggregate
                          ->  Nested Loop
                                Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                                ->  Seq Scan on region region_1
                                      Filter: (r_name = 'AMERICA'::bpchar)
                                ->  Nested Loop
                                      ->  Nested Loop
                                            ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                                  Index Cond: (part.p_partkey = ps_partkey)
                                            ->  Index Scan using supplier_pkey on supplier supplier_1
                                                  Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                      ->  Index Scan using nation_pkey on nation nation_1
                                            Index Cond: (n_nationkey = supplier_1.s_nationkey)
The "Merge Join" node is located above the "Gather Merge" node, so the merge itself does not use parallel processing. But the "Parallel Index Scan" node still helps with the part_pkey segment.
In PostgreSQL 11, partition-wise join is disabled by default because its planning is very expensive. Tables with similar partitioning can be joined partition by partition, so Postgres uses smaller hash tables. Each partition join can be parallel.
    tpch=# set enable_partitionwise_join=t;
    tpch=# explain (costs off) select * from prt1 t1, prt2 t2
    where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
    ---------------------------------------------------
    Append
      ->  Hash Join
            Hash Cond: (t2.b = t1.a)
            ->  Seq Scan on prt2_p1 t2
                  Filter: ((b >= 0) AND (b <= 10000))
            ->  Hash
                  ->  Seq Scan on prt1_p1 t1
                        Filter: (b = 0)
      ->  Hash Join
            Hash Cond: (t2_1.b = t1_1.a)
            ->  Seq Scan on prt2_p2 t2_1
                  Filter: ((b >= 0) AND (b <= 10000))
            ->  Hash
                  ->  Seq Scan on prt1_p2 t1_1
                        Filter: (b = 0)

    tpch=# set parallel_setup_cost = 1;
    tpch=# set parallel_tuple_cost = 0.01;
    tpch=# explain (costs off) select * from prt1 t1, prt2 t2
    where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                            QUERY PLAN
    -----------------------------------------------------------
    Gather
      Workers Planned: 4
      ->  Parallel Append
            ->  Parallel Hash Join
                  Hash Cond: (t2_1.b = t1_1.a)
                  ->  Parallel Seq Scan on prt2_p2 t2_1
                        Filter: ((b >= 0) AND (b <= 10000))
                  ->  Parallel Hash
                        ->  Parallel Seq Scan on prt1_p2 t1_1
                              Filter: (b = 0)
            ->  Parallel Hash Join
                  Hash Cond: (t2.b = t1.a)
                  ->  Parallel Seq Scan on prt2_p1 t2
                        Filter: ((b >= 0) AND (b <= 10000))
                  ->  Parallel Hash
                        ->  Parallel Seq Scan on prt1_p1 t1
                              Filter: (b = 0)
The main point is that a partition-wise join is parallel only if the partitions are large enough.
Parallel Append can be used instead of splitting blocks between worker processes. This is usually the case with UNION ALL queries. The disadvantage is less parallelism, because each worker process handles only one subquery.
Only 2 worker processes are running here, although 4 are allowed.
    tpch=# explain (costs off)
    select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day
    union all
    select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                               QUERY PLAN
    ------------------------------------------------------------------------------------------------
    Gather
      Workers Planned: 2
      ->  Parallel Append
            ->  Aggregate
                  ->  Seq Scan on lineitem
                        Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
            ->  Aggregate
                  ->  Seq Scan on lineitem lineitem_1
                        Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
- max_parallel_workers_per_gather: how many worker processes the executor will use for parallel execution of a plan node.
- max_worker_processes: set the total number of worker processes according to the number of CPU cores on the server.
- max_parallel_workers: the same, but for parallel worker processes.

Starting from version 9.6, parallel processing can significantly improve the performance of complex queries that scan many rows or index ranges. PostgreSQL 10 has parallel processing enabled by default. Do not forget to disable it on servers with a large OLTP workload. Sequential scans and index scans consume a lot of resources. If you are not running a report over the entire data set, queries can be made more efficient simply by adding the missing indexes or using the correct partitioning.
Source: https://habr.com/ru/post/446706/