-- Approach 1: split a large UPDATE into steps tracked in UPDATE_INFO_STEPS and
-- run every step through dblink.
--
-- UPDATE_INFO_STEPS holds one row per step: the key range to update
-- (BEGIN_GUID .. END_GUID), the step number, and a one-character status.
-- Statuses used by the code below: 'N' = new, 'A' = claimed / in progress,
-- 'P' = processed.
CREATE TABLE UPDATE_INFO_STEPS (
    BEGIN_GUID        varchar(36),
    END_GUID          varchar(36) NOT NULL,
    STEP_NO           int,
    STATUS            char(1),
    BEGIN_UPD         timestamp,
    END_UPD           timestamp,
    ROWS_UPDATED      int,
    ROWS_UPDATED_TEXT varchar(30),
    DISCR             varchar(10)
);

ALTER TABLE UPDATE_INFO_STEPS ADD PRIMARY KEY (discr, step_no);

-- FUNC_UPDATE_INFO_STEPS claims one step for the caller.
--
-- Parameters:
--   pStep_no - step number to claim
--   pDiscr   - discriminator identifying the set of steps
-- Returns:
--   'SUCCESS' - the step was in status 'N' and has been switched to 'A'
--   'ERROR'   - the row is locked by another session (lock_not_available)
--   NULL      - no row with status 'N' matches (nothing to claim)
CREATE OR REPLACE FUNCTION func_update_info_steps(
    pStep_no int,
    pDiscr   varchar(10)
) RETURNS text AS
$BODY$
DECLARE
    lResult text;
BEGIN
    -- NOWAIT makes a concurrent claim fail immediately instead of blocking.
    SELECT 'SUCCESS'
    INTO lResult
    FROM update_info_steps
    WHERE step_no = pStep_no
      AND discr = pDiscr
      AND status = 'N'
    FOR UPDATE NOWAIT;

    UPDATE update_info_steps
    SET status = 'A',
        begin_upd = now()
    WHERE step_no = pStep_no
      AND discr = pDiscr
      AND status = 'N';

    RETURN lResult;
EXCEPTION
    WHEN lock_not_available THEN
        RETURN 'ERROR';
END;
$BODY$
LANGUAGE plpgsql VOLATILE;

-- 1. Fill UPDATE_INFO_STEPS: one row per l_count source rows.
--    Replace the <discr>, <column> and <schema>.<table_name> placeholders
--    before running.
DO LANGUAGE plpgsql $$
DECLARE
    -- Number of source rows per step.
    l_count int := 10000;
    -- Discriminator that identifies this set of steps.
    l_discr varchar(10) := '<discr>';
BEGIN
    INSERT INTO update_info_steps (
        begin_guid,
        end_guid,
        step_no,
        status,
        discr
    )
    SELECT
        min(res2.guid)  AS begin_guid,
        max(res2.guid)  AS end_guid,
        res2.step       AS step_no,
        'N'::char(1)    AS status,
        l_discr         AS discr
    FROM (
        SELECT
            res1.guid,
            -- Rows 1..l_count go to step 1, the next l_count rows to step 2, ...
            floor((res1.rownum - 1) / l_count) + 1 AS step
        FROM (
            SELECT
                <column> AS guid,
                row_number() OVER (ORDER BY <column>) AS rownum
            FROM <schema>.<table_name>
        ) AS res1
    ) AS res2
    GROUP BY res2.step;
END;
$$;

-- 2. Worker: walk the pending steps and run the UPDATE for each one.
--    Several sessions may run this block at the same time; a step is processed
--    only by the session whose claim returned 'SUCCESS'.
--    Every remote call goes through dblink, i.e. over a separate connection
--    from the one running this DO block.
--    NOTE(review): if the remote UPDATE raises an error, the step stays in
--    status 'A' - confirm how such steps should be retried.
DO LANGUAGE plpgsql $$
DECLARE
    cur         record;
    vCount_text varchar(30);
    -- Discriminator of the step set to process (same value as in part 1).
    l_discr     varchar(10) := '<discr>';
    l_upd_res   varchar(100);
BEGIN
    FOR cur IN (
        SELECT step_no, begin_guid, end_guid
        FROM update_info_steps
        WHERE status = 'N'
          AND discr = l_discr
        ORDER BY step_no
    ) LOOP
        -- Claim the step. format() with %L quotes the literal safely.
        SELECT t.result
        INTO l_upd_res
        FROM dblink(
            '<parameters>',
            format('SELECT func_update_info_steps(%s, %L)', cur.step_no, l_discr)
        ) AS t (result text);

        IF l_upd_res = 'SUCCESS' THEN
            -- The actual data change for the key range of this step.
            -- dblink_exec returns the command status string, e.g. 'UPDATE 10000'.
            vCount_text := dblink_exec(
                '<parameters>',
                format(
                    'UPDATE FOO SET level = 42 WHERE id BETWEEN %L AND %L',
                    cur.begin_guid,
                    cur.end_guid
                )
            );

            -- Mark the step as processed and keep the command status.
            l_upd_res := dblink_exec(
                '<parameters>',
                format(
                    'UPDATE update_info_steps SET status = ''P'', end_upd = now(), rows_updated_text = %L WHERE step_no = %s AND discr = %L',
                    vCount_text,
                    cur.step_no,
                    l_discr
                )
            );
        END IF;
    END LOOP;
END;
$$;

-- Progress report: steps done / in progress / left, and percent done.
SELECT
    SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END) AS done,
    SUM(CASE status WHEN 'A' THEN 1 ELSE 0 END) AS processing,
    SUM(CASE status WHEN 'N' THEN 1 ELSE 0 END) AS left_,
    round(
        SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END)::numeric / COUNT(*) * 100::numeric,
        2
    ) AS done_proc
FROM update_info_steps
WHERE discr = '<discr>';
-- Approach 2: describe the work as rows in two tables:
--   pg_parallel_task            - one row per task
--   pg_parallel_task_statements - the SQL statements that make up a task
-- NOTE(review): the COMMENT ON texts below were reconstructed in English from
-- the column names and DDL; the original texts were lost. Confirm the wording.
CREATE TABLE IF NOT EXISTS public.pg_parallel_task (
    name          text PRIMARY KEY,
    threads_count int  NOT NULL DEFAULT 10,
    comment       text
);

COMMENT ON TABLE public.pg_parallel_task IS 'Parallel tasks';
COMMENT ON COLUMN public.pg_parallel_task.name IS 'Unique task name';
COMMENT ON COLUMN public.pg_parallel_task.threads_count IS 'Number of parallel threads. Default is 10';
COMMENT ON COLUMN public.pg_parallel_task.comment IS 'Free-form comment';

CREATE TABLE IF NOT EXISTS public.pg_parallel_task_statements (
    statement_id  bigserial PRIMARY KEY,
    task_name     text NOT NULL REFERENCES public.pg_parallel_task (name),
    sql_statement text NOT NULL,
    status        text NOT NULL
                  CHECK (status IN ('new', 'in progress', 'ok', 'error'))
                  DEFAULT 'new',
    start_time    timestamp without time zone,
    elapsed_sec   float(8),
    rows_affected bigint,
    err           text
);

COMMENT ON TABLE public.pg_parallel_task_statements IS 'Statements of parallel tasks';
COMMENT ON COLUMN public.pg_parallel_task_statements.sql_statement IS 'SQL statement text';
COMMENT ON COLUMN public.pg_parallel_task_statements.status IS 'Statement status. new|in progress|ok|error';
COMMENT ON COLUMN public.pg_parallel_task_statements.start_time IS 'Execution start time';
COMMENT ON COLUMN public.pg_parallel_task_statements.elapsed_sec IS 'Execution time, in seconds';
COMMENT ON COLUMN public.pg_parallel_task_statements.rows_affected IS 'Number of rows affected by the statement';
COMMENT ON COLUMN public.pg_parallel_task_statements.err IS 'Error text. NULL if there was no error.';

-- Example: register task JIRA-001 with 10 threads.
INSERT INTO public.pg_parallel_task (name, threads_count)
VALUES ('JIRA-001', 10);

-- Split foo into 10 equal-sized id ranges and generate one UPDATE per range.
-- %L quotes each bound as an SQL literal, so ids containing quotes stay valid.
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT
    'JIRA-001' AS task_name,
    format(
        'UPDATE FOO SET level = 42 where id >= %L and id <= %L',
        MIN(d.id),
        MAX(d.id)
    ) AS sql_statement
FROM (
    SELECT
        id,
        NTILE(10) OVER (ORDER BY id) AS part
    FROM foo
) AS d
GROUP BY d.part;

-- The task is then started from a Liquibase changeSet.
<!-- Runs ./pgpar.sh with two arguments: "testdatabase" and the task name "JIRA-001". -->
<changeSet id="JIRA-001" author="soldatov">
    <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
        <arg value="testdatabase"/>
        <arg value="JIRA-001"/>
    </executeCommand>
</changeSet>
-- Benchmark transcript on pgbench_accounts. Query output and the bare numbers
-- that followed some statements in the source are kept as comments
-- (presumably elapsed times; the unit was lost - TODO confirm).

SELECT count(1) FROM pgbench_accounts;
--    count
-- -----------
--  500000000
-- (1 row)

SELECT pg_size_pretty(pg_total_relation_size('pgbench_accounts'));
--  pg_size_pretty
-- ----------------
--  62 Gb
-- (1 row)

-- Baseline: the whole table in one statement (intentionally no WHERE clause).
UPDATE pgbench_accounts SET abalance = 42; -- source note: 49

-- Parenthesized option list: the unparenthesized form only accepts options in
-- the fixed order FULL, FREEZE, VERBOSE, ANALYZE.
VACUUM (FULL, ANALYZE, VERBOSE) pgbench_accounts;

-- Variant A: 25 ranges by primary key, 25 threads.
INSERT INTO public.pg_parallel_task (name, threads_count)
VALUES ('JIRA-002', 25);

INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT
    'JIRA-002' AS task_name,
    format(
        'UPDATE pgbench_accounts SET abalance = 42 WHERE aid >= %L AND aid <= %L;',
        MIN(d.aid),
        MAX(d.aid)
    ) AS sql_statement
FROM (
    SELECT
        aid,
        NTILE(25) OVER (ORDER BY aid) AS part
    FROM pgbench_accounts
) AS d
GROUP BY d.part;
-- source note: 10

-- Variant B: 25 ranges by heap page number taken from ctid.
-- The task row must exist first: task_name references pg_parallel_task (name).
INSERT INTO public.pg_parallel_task (name, threads_count)
VALUES ('JIRA-002-ctid', 25);

-- (ctid::text::point)[0] is the page number of a row. Pages 0 .. max_page are
-- cut into at most 25 half-open ranges [lo_page, hi_page) and compared as
-- numbers, so every page, including page 0 and the last one, falls into exactly
-- one range. Empty ranges (fewer than 25 pages) are skipped.
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT
    'JIRA-002-ctid' AS task_name,
    format(
        'UPDATE pgbench_accounts SET abalance = 45 WHERE (ctid::text::point)[0] >= %s AND (ctid::text::point)[0] < %s;',
        r.lo_page,
        r.hi_page
    ) AS sql_statement
FROM (
    SELECT
        g.part,
        (b.max_page + 1) * (g.part - 1) / 25 AS lo_page,
        (b.max_page + 1) * g.part / 25       AS hi_page
    FROM (
        SELECT MAX((ctid::text::point)[0]::bigint) AS max_page
        FROM pgbench_accounts
    ) AS b
    CROSS JOIN generate_series(1, 25) AS g (part)
) AS r
WHERE r.hi_page > r.lo_page
ORDER BY r.part;
-- source note: 9

-- Run the task from a shell (not SQL):
--   ./pgpar-linux-amd64 jdbc:postgresql://localhost:5432 soldatov password testdatabase JIRA-002
-- source note: 7
<!--
  JIRA-002-01: registers task JIRA-002 (5 threads) and generates one UPDATE per
  aid range; rows with filler IS NULL are split into 10000 ranges.
-->
<changeSet author="soldatov" id="JIRA-002-01">
    <sql>
        <![CDATA[
        INSERT INTO public.pg_parallel_task (name, threads_count)
        VALUES ('JIRA-002', 5);

        INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
        SELECT
            'JIRA-002' AS task_name,
            FORMAT(
                'UPDATE pgbench_accounts SET abalance = 42 WHERE filler IS NULL AND aid >= ''%s'' AND aid <= ''%s'';',
                MIN(d.aid),
                MAX(d.aid)
            ) AS sql_statement
        FROM (
            SELECT
                aid,
                NTILE(10000) OVER (ORDER BY aid) AS part
            FROM pgbench_accounts
            WHERE filler IS NULL
        ) AS d
        GROUP BY d.part;
        ]]>
    </sql>
</changeSet>

<!-- JIRA-002-02: runs ./pgpar.sh with arguments "pgconfdb" and "JIRA-002". -->
<changeSet author="soldatov" id="JIRA-002-02">
    <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
        <arg value="pgconfdb"/>
        <arg value="JIRA-002"/>
    </executeCommand>
</changeSet>
<!--
  JIRA-003-01: registers task JIRA-003 (2 threads) with two statements,
  a VACUUM FULL ANALYZE for each of two pgbench tables.
-->
<changeSet author="soldatov" id="JIRA-003-01">
    <sql>
        <![CDATA[
        INSERT INTO pg_parallel_task (name, threads_count)
        VALUES ('JIRA-003', 2);

        INSERT INTO pg_parallel_task_statements (task_name, sql_statement)
        VALUES
            ('JIRA-003', 'VACUUM FULL ANALYZE pgbench_accounts;'),
            ('JIRA-003', 'VACUUM FULL ANALYZE pgbench_branches;');
        ]]>
    </sql>
</changeSet>

<!-- JIRA-003-02: runs ./pgpar.sh with arguments "testdatabase" and "JIRA-003". -->
<changeSet author="soldatov" id="JIRA-003-02">
    <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
        <arg value="testdatabase"/>
        <arg value="JIRA-003"/>
    </executeCommand>
</changeSet>
-- SQL part: add a column and fill it in parallel, range by range.
ALTER TABLE pgbench_accounts ADD COLUMN account_number text;

-- Register task JIRA-004 with 5 threads.
INSERT INTO public.pg_parallel_task (name, threads_count)
VALUES ('JIRA-004', 5);

-- One UPDATE per aid range, 50000 ranges in total.
-- The buckets are ordered by aid, the same column the generated WHERE clause
-- filters on, so the MIN/MAX bounds of different buckets do not overlap.
-- NOTE(review): aid::text || filler yields NULL for rows where filler is NULL.
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT
    'JIRA-004' AS task_name,
    format(
        'UPDATE pgbench_accounts SET account_number = aid::text || filler WHERE aid >= %L AND aid <= %L;',
        MIN(d.aid),
        MAX(d.aid)
    ) AS sql_statement
FROM (
    SELECT
        aid,
        NTILE(50000) OVER (ORDER BY aid) AS part
    FROM pgbench_accounts
) AS d
GROUP BY d.part;

-- Start the task from SQL.
SELECT * FROM func_run_parallel_task('testdatabase', 'JIRA-004');
Source: https://habr.com/ru/post/351160/
All Articles