
Partitioning and live snapshots of data in PostgreSQL

Although partitioning has been covered here before, I want to return to the topic and describe my own experience with it, which grew out of the need to run analytical processing over large amounts of data. Besides partitioning, I will look at a deliberately simplified implementation of "snapshots" of aggregate queries that are updated automatically whenever the source data changes.

One of the main requirements for the system was the use of free software, so the choice fell on PostgreSQL. When I started the project I knew PostgreSQL only superficially, but I was quite familiar with Oracle Database. Since the task was analytical processing, I wanted analogues of Oracle options such as Partitioning and Materialized Views. After getting acquainted with what PostgreSQL offered, it became clear that this functionality would, one way or another, have to be written by hand.

Of course, a full implementation of Materialized Views, including query rewrite, was out of the question. For my needs, automatically updated aggregate snapshots over a single table were quite enough (support for table joins will probably be added in the near future). For partitioning, I planned to use the well-documented approach based on inherited tables, with insertion controlled by a trigger. I considered using Rules to manage partitioning, but rejected the idea because single-row inserts dominated in my case.
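The inheritance-plus-trigger approach mentioned above can be sketched in isolation as follows. This is only a minimal illustration, not the code generated later in the article; the names demo_events, demo_events_201305 and demo_events_route are hypothetical.

```sql
-- Hypothetical names throughout; not part of the article's scripts.
create table demo_events (
    id         bigint    not null,
    event_time timestamp not null,
    primary key (id)
);

-- Child table for May 2013. The check constraint documents the range
-- and lets the planner skip the table when constraint exclusion applies.
create table demo_events_201305 (
    check (event_time >= date '2013-05-01' and event_time < date '2013-06-01'),
    primary key (id)
) inherits (demo_events);

create or replace function demo_events_route() returns trigger as $$
begin
    if NEW.event_time >= date '2013-05-01' and NEW.event_time < date '2013-06-01' then
        insert into demo_events_201305 values (NEW.*);
        return null;  -- suppress the insert into the parent table
    end if;
    return NEW;       -- no matching child: the row stays in the parent
end;
$$ language plpgsql;

create trigger demo_events_before_insert
    before insert on demo_events
    for each row execute procedure demo_events_route();

insert into demo_events values (1, '2013-05-10 12:00'), (2, '2013-07-01 00:00');

select count(*) from only demo_events;    -- rows kept in the parent
select count(*) from demo_events_201305;  -- rows routed to the child
```

With these two sample rows, the May row should end up in the child table and the July row in the parent.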

I began, of course, with tables for storing metadata:
ps_tables.sql

    create sequence ps_table_seq;

    create table ps_table (
        id bigint default nextval('ps_table_seq') not null,
        name varchar(50) not null unique,
        primary key(id)
    );

    create sequence ps_column_seq;

    create table ps_column (
        id bigint default nextval('ps_column_seq') not null,
        table_id bigint not null references ps_table(id),
        name varchar(50) not null,
        parent_name varchar(50),
        type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
        unique (table_id, name),
        primary key(id)
    );

    create table ps_range_partition (
        table_id bigint not null references ps_table(id),
        type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
        start_value date not null,
        end_value date not null,
        primary key(table_id, start_value)
    );

    create table ps_snapshot (
        snapshot_id bigint not null references ps_table(id),
        table_id bigint not null references ps_table(id),
        type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
        primary key(snapshot_id)
    );


Everything is quite obvious here. The only thing worth mentioning is the column types:
Type      Description
date      Column containing the calendar date used for data partitioning and aggregation (the PostgreSQL date and timestamp types are supported)
key       Key used in the group by clause for data aggregation (all PostgreSQL integer types are supported)
nullable  Key used in data aggregation that may contain null values
sum       Sum of the values
min       Minimum value
max       Maximum value
cnt       Count of non-null values
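To make the column types concrete, here is a small self-contained query that computes one column of each kind over a few made-up rows. The sample data and the output column names are assumptions for illustration only.

```sql
-- Hypothetical sample rows; "value" plays the role of the aggregated parent column.
with sample(customer_id, event_time, value) as (
    values (1, timestamp '2013-05-01 10:00', 10),
           (1, timestamp '2013-05-20 09:00', null),
           (1, timestamp '2013-05-25 18:30', 30),
           (2, timestamp '2013-06-02 08:00', 5)
)
select customer_id                           as key_column,   -- type 'key'
       date_trunc('month', event_time)::date as date_column,  -- type 'date', monthly snapshot
       coalesce(sum(value), 0)               as sum_column,   -- type 'sum'
       min(value)                            as min_column,   -- type 'min'
       max(value)                            as max_column,   -- type 'max'
       count(value)                          as cnt_column    -- type 'cnt', nulls are not counted
from sample
group by customer_id, date_trunc('month', event_time)
order by 1, 2;
```

For customer 1 in May the null row contributes nothing to the sum and is not counted, so the group yields a sum of 40, a minimum of 10, a maximum of 30 and a count of 2.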

The core of the whole solution is a function that regenerates the trigger functions for the table containing the source data:

ps_trigger_regenerate(bigint)

    create or replace function ps_trigger_regenerate(in p_table bigint) returns void as $$
    declare
        l_sql text;
        l_table_name varchar(50);
        l_date_column varchar(50);
        l_flag boolean;
        tabs record;
        columns record;
    begin
        select name into l_table_name from ps_table where id = p_table;

        -- Insert trigger: maintain every snapshot, then route the row to its partition
        l_sql := 'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||
                 'as $'|| '$ ' ||
                 'begin ';
        for tabs in
            select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type
            from ps_snapshot a, ps_table b
            where a.table_id = p_table and b.id = a.snapshot_id
        loop
            l_flag = FALSE;
            l_sql := l_sql || 'update ' || tabs.table_name || ' set ';
            for columns in
                select name, parent_name, type_name
                from ps_column
                where table_id = tabs.id and not type_name in ('date', 'key', 'nullable')
            loop
                if l_flag then l_sql := l_sql || ', '; end if;
                l_flag := TRUE;
                if columns.type_name = 'sum' then
                    l_sql := l_sql || columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
                end if;
                if columns.type_name = 'min' then
                    l_sql := l_sql || columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
                end if;
                if columns.type_name = 'max' then
                    l_sql := l_sql || columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
                end if;
                if columns.type_name = 'cnt' then
                    l_sql := l_sql || columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
                end if;
            end loop;
            l_flag = FALSE;
            l_sql := l_sql || 'where ';
            for columns in
                select name, parent_name, type_name
                from ps_column
                where table_id = tabs.id and type_name in ('date', 'key', 'nullable')
            loop
                if l_flag then l_sql := l_sql || 'and '; end if;
                l_flag := TRUE;
                if columns.type_name = 'date' then
                    l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
                end if;
                if columns.type_name = 'key' then
                    l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' ';
                end if;
                if columns.type_name = 'nullable' then
                    l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
                end if;
            end loop;
            -- No aggregate row yet for this key: insert one
            l_sql := l_sql || '; ' ||
                     'if not FOUND then ' ||
                     'insert into ' || tabs.table_name || '(';
            l_flag = FALSE;
            for columns in select name, type_name from ps_column where table_id = tabs.id loop
                if l_flag then l_sql := l_sql || ', '; end if;
                l_flag := TRUE;
                l_sql := l_sql || columns.name;
            end loop;
            l_sql := l_sql || ') values (';
            l_flag = FALSE;
            for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id loop
                if l_flag then l_sql := l_sql || ', '; end if;
                l_flag := TRUE;
                if columns.type_name = 'date' then
                    l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
                elsif columns.type_name = 'cnt' then
                    l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';
                elsif columns.type_name in ('nullable', 'sum') then
                    l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
                else
                    l_sql := l_sql || 'NEW.' || columns.parent_name;
                end if;
            end loop;
            l_sql := l_sql || '); ' ||
                     'end if; ';
        end loop;

        -- Partition routing: one range check per registered partition
        select name into l_date_column from ps_column where table_id = p_table and type_name = 'date';
        for tabs in
            select to_char(start_value, 'YYYYMMDD') as start_value,
                   to_char(end_value, 'YYYYMMDD') as end_value,
                   type_name
            from ps_range_partition
            where table_id = p_table
            order by start_value desc
        loop
            l_sql := l_sql ||
                     'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
                     'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
                     'return null; ' ||
                     'end if; ';
        end loop;
        l_sql := l_sql || 'return NEW; '||
                 'end; '||
                 '$'||'$ language plpgsql';
        execute l_sql;

        -- Raise trigger: rejects update and delete when min/max aggregates exist
        l_sql := 'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||
                 'as $'|| '$ ' ||
                 'begin ' ||
                 'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' ||
                 'end; '||
                 '$'||'$ language plpgsql';
        execute l_sql;

        -- Delete trigger: subtract the deleted row from the sum and cnt aggregates
        l_sql := 'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||
                 'as $'|| '$ ' ||
                 'begin ';
        for tabs in
            select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type
            from ps_snapshot a, ps_table b
            where a.table_id = p_table and b.id = a.snapshot_id
        loop
            l_flag = FALSE;
            l_sql := l_sql || 'update ' || tabs.table_name || ' set ';
            for columns in
                select name, parent_name, type_name
                from ps_column
                where table_id = tabs.id and type_name in ('sum', 'cnt')
            loop
                if l_flag then l_sql := l_sql || ', '; end if;
                l_flag := TRUE;
                if columns.type_name = 'sum' then
                    l_sql := l_sql || columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
                end if;
                if columns.type_name = 'cnt' then
                    l_sql := l_sql || columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
                end if;
            end loop;
            l_flag = FALSE;
            l_sql := l_sql || 'where ';
            -- The aggregate row is located through OLD: NEW carries no row in a delete trigger
            for columns in
                select name, parent_name, type_name
                from ps_column
                where table_id = tabs.id and type_name in ('date', 'key', 'nullable')
            loop
                if l_flag then l_sql := l_sql || 'and '; end if;
                l_flag := TRUE;
                if columns.type_name = 'date' then
                    l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), OLD.' || columns.parent_name || ') ';
                end if;
                if columns.type_name = 'key' then
                    l_sql := l_sql || columns.name || ' = OLD.' || columns.parent_name || ' ';
                end if;
                if columns.type_name = 'nullable' then
                    l_sql := l_sql || columns.name || ' = coalesce(OLD.' || columns.parent_name || ', 0)';
                end if;
            end loop;
            l_sql := l_sql || '; ';
        end loop;
        l_sql := l_sql || 'return null; '||
                 'end; '||
                 '$'||'$ language plpgsql';
        execute l_sql;

        -- Update trigger: replace the old contribution with the new one
        l_sql := 'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||
                 'as $'|| '$ ' ||
                 'begin ';
        for tabs in
            select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type
            from ps_snapshot a, ps_table b
            where a.table_id = p_table and b.id = a.snapshot_id
        loop
            l_flag = FALSE;
            l_sql := l_sql || 'update ' || tabs.table_name || ' set ';
            for columns in
                select name, parent_name, type_name
                from ps_column
                where table_id = tabs.id and type_name in ('sum', 'cnt')
            loop
                if l_flag then l_sql := l_sql || ', '; end if;
                l_flag := TRUE;
                if columns.type_name = 'sum' then
                    l_sql := l_sql || columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
                end if;
                if columns.type_name = 'cnt' then
                    l_sql := l_sql || columns.name || ' = ' || columns.name ||
                             ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' ||
                             ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
                end if;
            end loop;
            l_flag = FALSE;
            l_sql := l_sql || 'where ';
            for columns in
                select name, parent_name, type_name
                from ps_column
                where table_id = tabs.id and type_name in ('date', 'key', 'nullable')
            loop
                if l_flag then l_sql := l_sql || 'and '; end if;
                l_flag := TRUE;
                if columns.type_name = 'date' then
                    l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
                end if;
                if columns.type_name = 'key' then
                    l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' ';
                end if;
                if columns.type_name = 'nullable' then
                    l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
                end if;
            end loop;
            l_sql := l_sql || '; ';
        end loop;
        l_sql := l_sql || 'return null; '||
                 'end; '||
                 '$'||'$ language plpgsql';
        execute l_sql;
    end;
    $$ language plpgsql;


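Despite its intimidating appearance, this function is quite simple. Its job is to generate, from the available metadata, the four functions used to build the triggers:

  - ps_TABLE_insert_trigger(): handles row insertion (updates the snapshots and routes the row to its partition)
  - ps_TABLE_raise_trigger(): forbids updates and deletes by raising an exception
  - ps_TABLE_update_trigger(): adjusts the snapshots when a row is updated
  - ps_TABLE_delete_trigger(): adjusts the snapshots when a row is deleted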

Here TABLE stands for the name of the table containing the source data. A typical definition of the ps_TABLE_insert_trigger() function looks like this:

    create or replace function ps_data_insert_trigger() returns trigger as $$
    begin
        update data_month
        set sum_field = sum_field + NEW.sum_field,
            min_field = least(min_field, NEW.min_field)
        where date_field = date_trunc('month', NEW.date_field)
          and key_field = NEW.key_field;
        if not FOUND then
            insert into data_month(date_field, key_field, sum_field, min_field)
            values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
        end if;
        if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and
           NEW.date_field < to_date('20130201', 'YYYYMMDD') then
            insert into data_20130101 values (NEW.*);
            return null;
        end if;
        return NEW;
    end;
    $$ language plpgsql;

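In reality the function is somewhat more complicated, because null values are handled specially, but the example above is adequate as an illustration. The logic of the code is straightforward:

  1. Try to update the matching aggregate row in the snapshot.
  2. If no such row exists, insert a new one.
  3. Check the row against each partition range in turn; on a match, insert the row into that partition and return null.
  4. If no partition matches, return NEW, which lets the row go into the main table.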

The last step means that if no suitable partition is found, the row is added to the main table. In practice this is quite convenient: even if a partition was not created in advance, or data arrives with a wrong date, the insert still succeeds. Later you can inspect the contents of the main table by running the query:

    select * from only data

Then create the missing partitions (as shown below, the data is moved automatically from the main table into the newly created partition). In such cases the number of rows that missed their partition is usually small, so the cost of moving them is negligible.
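One way to see which partitions are missing is to group the rows left in the main table by period. The sketch below is self-contained and uses hypothetical tables (stray_demo and one monthly child) in place of the article's data table.

```sql
-- Hypothetical tables standing in for "data" and one of its partitions.
create table stray_demo (id bigint, date_field timestamp not null);
create table stray_demo_20130101 (
    check (date_field >= date '2013-01-01' and date_field < date '2013-02-01')
) inherits (stray_demo);

insert into stray_demo_20130101 values (1, '2013-01-15');
-- Rows that found no partition and therefore stayed in the parent table:
insert into stray_demo values (2, '2013-02-03'), (3, '2013-02-20'), (4, '2013-03-01');

-- "only" restricts the scan to the parent, so each result row names a missing month.
select date_trunc('month', date_field)::date as partition_start,
       count(*)                              as stray_rows
from only stray_demo
group by 1
order by 1;
```

Each returned partition_start is a candidate start date for a call to ps_add_range_partition with the 'month' type.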

Now we need the supporting functions. Let's start with the function that creates a new partition:

ps_add_range_partition(varchar, varchar, varchar, date)

    create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar,
                                                      in p_type varchar, in p_start date) returns void as $$
    declare
        l_sql text;
        l_end date;
        l_start_str varchar(10);
        l_end_str varchar(10);
        l_table bigint;
        l_flag boolean;
        columns record;
    begin
        -- Only one date column per table is allowed
        perform 1
        from ps_table a, ps_column b
        where a.id = b.table_id and lower(a.name) = lower(p_table)
          and b.type_name = 'date' and lower(b.name) <> lower(p_column);
        if FOUND then
            raise EXCEPTION 'Conflict DATE columns';
        end if;

        -- The new range must not overlap an existing partition
        l_end := p_start + ('1 ' || p_type)::INTERVAL;
        perform 1
        from ps_table a, ps_range_partition b
        where a.id = b.table_id and lower(a.name) = lower(p_table)
          and (( p_start >= b.start_value and p_start < b.end_value ) or
               ( b.start_value >= p_start and b.start_value < l_end ));
        if FOUND then
            raise EXCEPTION 'Range intervals intersects';
        end if;

        -- Register the metadata
        perform 1 from ps_table where lower(name) = lower(p_table);
        if not FOUND then
            insert into ps_table(name) values (lower(p_table));
        end if;
        select id into l_table from ps_table where lower(name) = lower(p_table);

        perform 1 from ps_column
        where table_id = l_table and type_name = 'date' and lower(name) = lower(p_column);
        if not FOUND then
            insert into ps_column(table_id, name, type_name) values (l_table, lower(p_column), 'date');
        end if;

        insert into ps_range_partition(table_id, type_name, start_value, end_value)
        values (l_table, p_type, p_start, l_end);

        l_start_str = to_char(p_start, 'YYYYMMDD');
        l_end_str = to_char(l_end, 'YYYYMMDD');

        -- Create the inherited table with a range check and a copy of the primary key
        l_sql := 'create table ' || p_table || '_' || l_start_str || '(' ||
                 'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
                 p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
                 'primary key (';
        l_flag := FALSE;
        for columns in
            select f.name as name
            from ( select ps_array_to_set(a.conkey) as nn
                   from pg_constraint a, pg_class b
                   where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c,
                 ( select d.attname as name, d.attnum as nn
                   from pg_attribute d, pg_class e
                   where e.oid = d.attrelid and e.relname = p_table ) f
            where f.nn = c.nn
            order by f.nn
        loop
            if l_flag then l_sql := l_sql || ', '; end if;
            l_flag := TRUE;
            l_sql := l_sql || columns.name;
        end loop;
        l_sql := l_sql || ')) inherits (' || p_table || ')';
        execute l_sql;

        l_sql := 'create index ' || p_table || '_' || l_start_str || '_date on ' ||
                 p_table || '_' || l_start_str || '(' || p_column || ')';
        execute l_sql;

        perform ps_trigger_regenerate(l_table);

        -- Drop the triggers while existing rows are moved into the new partition
        execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;

        l_sql := 'insert into ' || p_table || '_' || l_start_str || ' ' ||
                 'select * from ' || p_table || ' where ' ||
                 p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
                 p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
        execute l_sql;

        l_sql := 'delete from only ' || p_table || ' where ' ||
                 p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
                 p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
        execute l_sql;

        -- Re-create the triggers
        l_sql := 'create trigger ps_' || p_table || '_before_insert ' ||
                 'before insert on ' || p_table || ' for each row ' ||
                 'execute procedure ps_' || p_table || '_insert_trigger()';
        execute l_sql;

        perform 1
        from ps_snapshot a, ps_column b
        where b.table_id = a.snapshot_id and a.table_id = l_table
          and b.type_name in ('min', 'max');
        if FOUND then
            l_sql := 'create trigger ps_' || p_table || '_after_update ' ||
                     'after update on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_after_delete ' ||
                     'after delete on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
                     'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
                     'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
        else
            l_sql := 'create trigger ps_' || p_table || '_after_update ' ||
                     'after update on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_update_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_after_delete ' ||
                     'after delete on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_delete_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
                     'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_update_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
                     'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_delete_trigger()';
            execute l_sql;
        end if;
    end;
    $$ language plpgsql;


Here, after validating the input, we add the necessary metadata and then create the inherited table. Next we regenerate the trigger functions by calling ps_trigger_regenerate, move the rows that match the partitioning condition into the new partition with a dynamic query, and re-create the triggers themselves.

Two points caused some difficulty.

  1. It took some effort to work out how to add a day, week, month or year to the start date, depending on the input parameter p_type:

     l_end := p_start + ('1 ' || p_type)::INTERVAL;

  2. Since the primary key is not inherited, I had to query the system catalogs to get the list of primary key columns of the source table (storing a description of the primary key in my own metadata seemed inappropriate):

     select f.name as name
     from ( select ps_array_to_set(a.conkey) as nn
            from pg_constraint a, pg_class b
            where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c,
          ( select d.attname as name, d.attnum as nn
            from pg_attribute d, pg_class e
            where e.oid = d.attrelid and e.relname = p_table ) f
     where f.nn = c.nn
     order by f.nn
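The interval arithmetic from the first item can be tried on its own. The query below is a small sketch that applies the same expression to each supported partition type, with a fixed start date chosen only for illustration.

```sql
-- date + interval yields a timestamp, so the result is cast back to date,
-- which is what the assignment to the date variable l_end does implicitly.
select unit,
       (date '2013-05-01' + ('1 ' || unit)::interval)::date as range_end
from (values ('day'), ('week'), ('month'), ('year')) as u(unit);
```

For the start date 2013-05-01 this gives 2013-05-02 for day, 2013-05-08 for week, 2013-06-01 for month and 2014-05-01 for year.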


Note also that before creating an index on the partitioning key of the new partition, it would be worth checking whether that column is already the leading column of the primary key, so as not to create a redundant index.
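Such a check is not part of the scripts shown here. A possible sketch, assuming a hypothetical table pk_demo whose primary key starts with the date column, reads the first element of conkey from pg_constraint:

```sql
-- Hypothetical table: the partitioning column leads the primary key.
create table pk_demo (
    event_time timestamp not null,
    id         bigint    not null,
    primary key (event_time, id)
);

-- conkey lists the key columns in key order; conkey[1] is the leading one.
select a.attname = 'event_time' as date_column_leads_pk
from pg_constraint c
join pg_attribute a
  on a.attrelid = c.conrelid
 and a.attnum   = c.conkey[1]
where c.contype = 'p'
  and c.conrelid = 'pk_demo'::regclass;
```

When the query returns true, the primary key index already covers lookups by the date column and the separate index could be skipped.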

The function that deletes a partition is much simpler and needs no special comment:

ps_del_range_partition(varchar, date)

    create or replace function ps_del_range_partition(in p_table varchar, in p_start date) returns void as $$
    declare
        l_sql text;
        l_start_str varchar(10);
        l_table bigint;
    begin
        select id into l_table from ps_table where lower(name) = lower(p_table);
        l_start_str = to_char(p_start, 'YYYYMMDD');

        delete from ps_range_partition where table_id = l_table and start_value = p_start;
        perform ps_trigger_regenerate(l_table);

        -- Move the rows back into the main table
        l_sql := 'insert into ' || p_table || ' ' ||
                 'select * from ' || p_table || '_' || l_start_str;
        execute l_sql;

        -- Nothing left to maintain: drop the triggers, functions and metadata
        perform 1
        from ( select 1 from ps_range_partition where table_id = l_table
               union all
               select 1 from ps_snapshot where table_id = l_table ) a;
        if not FOUND then
            execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
            execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
            execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
            execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
            execute 'drop function ps_' || p_table || '_raise_trigger() cascade';
            execute 'drop function ps_' || p_table || '_update_trigger() cascade';
            execute 'drop function ps_' || p_table || '_delete_trigger() cascade';
            delete from ps_column where table_id = l_table;
            delete from ps_table where id = l_table;
        end if;

        perform 1 from ps_range_partition where table_id = l_table;
        if not FOUND then
            delete from ps_column where table_id = l_table and type_name = 'date';
        end if;

        execute 'drop table ' || p_table || '_' || l_start_str;
    end;
    $$ language plpgsql;


When a partition is deleted, its data is of course not lost but moved to the main table (the insert trigger is regenerated beforehand so that it no longer routes rows to the partition being dropped, because, as it turned out, the only keyword does not work in an insert statement).

It remains to add the functions that manage "live" data snapshots:

ps_add_snapshot_column(varchar, varchar, varchar, varchar)

    create or replace function ps_add_snapshot_column(in p_snapshot varchar, in p_column varchar,
                                                      in p_parent varchar, in p_type varchar) returns void as $$
    declare
        l_table bigint;
    begin
        perform 1 from ps_table where lower(name) = lower(p_snapshot);
        if not FOUND then
            insert into ps_table(name) values (lower(p_snapshot));
        end if;
        select id into l_table from ps_table where lower(name) = lower(p_snapshot);

        insert into ps_column(table_id, name, parent_name, type_name)
        values (l_table, lower(p_column), lower(p_parent), p_type);
    end;
    $$ language plpgsql;


ps_add_snapshot(varchar, varchar, varchar)

    create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar, in p_type varchar) returns void as $$
    declare
        l_sql text;
        l_table bigint;
        l_snapshot bigint;
        l_flag boolean;
        columns record;
    begin
        select id into l_snapshot from ps_table where lower(name) = lower(p_snapshot);

        perform 1 from ps_column where table_id = l_snapshot and type_name in ('date', 'key');
        if not FOUND then
            raise EXCEPTION 'Key columns not found';
        end if;

        perform 1 from ps_column where table_id = l_snapshot and not type_name in ('date', 'key', 'nullable');
        if not FOUND then
            raise EXCEPTION 'Aggregate columns not found';
        end if;

        perform 1 from ps_table where lower(name) = lower(p_table);
        if not FOUND then
            insert into ps_table(name) values (lower(p_table));
        end if;
        select id into l_table from ps_table where lower(name) = lower(p_table);

        insert into ps_snapshot(table_id, snapshot_id, type_name) values (l_table, l_snapshot, p_type);
        perform ps_trigger_regenerate(l_table);

        -- Create the snapshot table
        l_sql := 'create table ' || p_snapshot || ' (';
        l_flag := FALSE;
        for columns in select name, type_name from ps_column where table_id = l_snapshot loop
            if l_flag then l_sql := l_sql || ', '; end if;
            l_flag := TRUE;
            if columns.type_name = 'date' then
                l_sql := l_sql || columns.name || ' date not null';
            else
                l_sql := l_sql || columns.name || ' bigint not null';
            end if;
        end loop;
        l_sql := l_sql || ', primary key (';
        l_flag := FALSE;
        for columns in
            select name from ps_column
            where table_id = l_snapshot and type_name in ('date', 'key', 'nullable')
        loop
            if l_flag then l_sql := l_sql || ', '; end if;
            l_flag := TRUE;
            l_sql := l_sql || columns.name;
        end loop;
        l_sql := l_sql || '))';
        execute l_sql;

        -- Re-create the triggers on the source table
        execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;

        l_sql := 'create trigger ps_' || p_table || '_before_insert ' ||
                 'before insert on ' || p_table || ' for each row ' ||
                 'execute procedure ps_' || p_table || '_insert_trigger()';
        execute l_sql;

        perform 1
        from ps_snapshot a, ps_column b
        where b.table_id = a.snapshot_id and a.table_id = l_table
          and b.type_name in ('min', 'max');
        if FOUND then
            l_sql := 'create trigger ps_' || p_table || '_after_update ' ||
                     'after update on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_after_delete ' ||
                     'after delete on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
        else
            l_sql := 'create trigger ps_' || p_table || '_after_update ' ||
                     'after update on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_update_trigger()';
            execute l_sql;
            l_sql := 'create trigger ps_' || p_table || '_after_delete ' ||
                     'after delete on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_delete_trigger()';
            execute l_sql;
        end if;

        -- Fill the snapshot from the data that already exists
        l_sql := 'insert into ' || p_snapshot || '(';
        l_flag := FALSE;
        for columns in select name from ps_column where table_id = l_snapshot loop
            if l_flag then l_sql := l_sql || ', '; end if;
            l_flag := TRUE;
            l_sql := l_sql || columns.name;
        end loop;
        l_sql := l_sql || ') select ';
        l_flag := FALSE;
        for columns in select parent_name as name, type_name from ps_column where table_id = l_snapshot loop
            if l_flag then l_sql := l_sql || ', '; end if;
            l_flag := TRUE;
            if columns.type_name = 'date' then
                l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
            end if;
            if columns.type_name = 'key' then
                l_sql := l_sql || columns.name;
            end if;
            if columns.type_name = 'nullable' then
                l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
            end if;
            if columns.type_name = 'sum' then
                l_sql := l_sql || 'sum(' || columns.name || ')';
            end if;
            if columns.type_name = 'min' then
                l_sql := l_sql || 'min(' || columns.name || ')';
            end if;
            if columns.type_name = 'max' then
                l_sql := l_sql || 'max(' || columns.name || ')';
            end if;
            if columns.type_name = 'cnt' then
                l_sql := l_sql || 'count(' || columns.name || ')';
            end if;
        end loop;
        -- Leading space keeps "from" apart from the last select item when it is a plain column name
        l_sql := l_sql || ' from ' || p_table || ' group by ';
        l_flag := FALSE;
        for columns in
            select parent_name as name, type_name from ps_column
            where table_id = l_snapshot and type_name in ('date', 'key', 'nullable')
        loop
            if l_flag then l_sql := l_sql || ', '; end if;
            l_flag := TRUE;
            if columns.type_name = 'date' then
                l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
            else
                l_sql := l_sql || columns.name;
            end if;
        end loop;
        execute l_sql;
    end;
    $$ language plpgsql;


ps_del_snapshot(varchar)

    create or replace function ps_del_snapshot(in p_snapshot varchar) returns void as $$
    declare
        l_sql text;
        p_table varchar(50);
        l_table bigint;
        l_snapshot bigint;
    begin
        select a.table_id, c.name into l_table, p_table
        from ps_snapshot a, ps_table b, ps_table c
        where b.id = a.snapshot_id and c.id = a.table_id
          and lower(b.name) = lower(p_snapshot);
        select id into l_snapshot from ps_table where lower(name) = lower(p_snapshot);

        delete from ps_snapshot where snapshot_id = l_snapshot;
        delete from ps_column where table_id = l_snapshot;
        delete from ps_table where id = l_snapshot;

        execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;

        perform 1
        from ( select 1 from ps_range_partition where table_id = l_table
               union all
               select 1 from ps_snapshot where table_id = l_table ) a;
        if not FOUND then
            execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
            execute 'drop function if exists ps_' || p_table || '_raise_trigger() cascade';
            execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade';
            execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
        else
            perform ps_trigger_regenerate(l_table);

            l_sql := 'create trigger ps_' || p_table || '_before_insert ' ||
                     'before insert on ' || p_table || ' for each row ' ||
                     'execute procedure ps_' || p_table || '_insert_trigger()';
            execute l_sql;

            perform 1
            from ps_snapshot a, ps_column b
            where b.table_id = a.snapshot_id and a.table_id = l_table
              and b.type_name in ('min', 'max');
            if FOUND then
                l_sql := 'create trigger ps_' || p_table || '_after_update ' ||
                         'after update on ' || p_table || ' for each row ' ||
                         'execute procedure ps_' || p_table || '_raise_trigger()';
                execute l_sql;
                l_sql := 'create trigger ps_' || p_table || '_after_delete ' ||
                         'after delete on ' || p_table || ' for each row ' ||
                         'execute procedure ps_' || p_table || '_raise_trigger()';
                execute l_sql;
            else
                l_sql := 'create trigger ps_' || p_table || '_after_update ' ||
                         'after update on ' || p_table || ' for each row ' ||
                         'execute procedure ps_' || p_table || '_update_trigger()';
                execute l_sql;
                l_sql := 'create trigger ps_' || p_table || '_after_delete ' ||
                         'after delete on ' || p_table || ' for each row ' ||
                         'execute procedure ps_' || p_table || '_delete_trigger()';
                execute l_sql;
            end if;
        end if;

        execute 'drop table if exists ' || p_snapshot;
    end;
    $$ language plpgsql;


Here, too, there is nothing fundamentally new. The only thing worth noting is that when the 'min' or 'max' aggregates are used, the triggers are created with the ps_TABLE_raise_trigger() function, which forbids deletes and updates on the table the snapshot is built on. I did this because I could not come up with an implementation that updates these aggregates with acceptable performance when update and delete statements run against the source table.
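The difference between the aggregates can be seen on a tiny example. The sketch below uses hypothetical tables (agg_demo_src, agg_demo_snap) and performs by hand what a delete trigger would have to do: the sum can be corrected from the deleted value alone, while the maximum has to be recomputed by rescanning the remaining rows of the group.

```sql
-- Hypothetical source and snapshot tables.
create table agg_demo_src  (grp int, val int);
create table agg_demo_snap (grp int primary key, val_sum bigint, val_max int);

insert into agg_demo_src values (1, 10), (1, 30), (1, 20);
insert into agg_demo_snap
select grp, sum(val), max(val) from agg_demo_src group by grp;

-- Delete the row that currently holds the maximum.
delete from agg_demo_src where grp = 1 and val = 30;

-- sum: the deleted value is all that is needed.
update agg_demo_snap set val_sum = val_sum - 30 where grp = 1;

-- max: the deleted value says nothing about the new maximum,
-- so the whole group has to be scanned again.
update agg_demo_snap s
set val_max = (select max(d.val) from agg_demo_src d where d.grp = s.grp)
where s.grp = 1;

select grp, val_sum, val_max from agg_demo_snap;  -- 1, 30, 20
```

On a large source table, a rescan like this for every deleted or updated row is exactly the cost the raise trigger avoids.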

Let's see how it all works. Create a test table:

    create sequence test_seq;

    create table test (
        id bigint default nextval('test_seq') not null,
        event_time timestamp not null,
        customer_id bigint not null,
        value bigint not null,
        primary key(id)
    );

Now, to add a partition, just run the following query:

    select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))

As a result, an inherited table test_20130501 is created, and all rows for May will automatically go into it.
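One way to confirm where rows actually land is the system column tableoid. The sketch below assumes that the scripts from this article (including the helper ps_array_to_set, which is not shown here) are installed and that the May partition has just been created; the inserted rows are made up.

```sql
-- Assumes the "test" table and the test_20130501 partition from the steps above.
insert into test(event_time, customer_id, value)
values ('2013-05-15 10:00', 1, 100),   -- falls inside the May partition
       ('2013-06-02 08:30', 1, 50);    -- no partition for June yet

-- tableoid::regclass shows the physical table each row is stored in.
select tableoid::regclass as stored_in, id, event_time
from test
order by id;
```

Under these assumptions the May row should be reported as stored in test_20130501 and the June row in test itself.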

To delete a partition, run the following query:

    select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))


Creating a snapshot is slightly more involved, since we first have to define the columns we are interested in:

    -- arguments: snapshot table, snapshot column, source (parent) column, column type
    select ps_add_snapshot_column('test_month', 'customer_id', 'customer_id', 'key');
    select ps_add_snapshot_column('test_month', 'event_time', 'event_time', 'date');
    select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum');
    select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt');
    select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max');
    select ps_add_snapshot('test', 'test_month', 'month');


As a result, an automatically updated table is created, based on the following query:

    select customer_id,
           date_trunc('month', event_time),
           sum(value) as value_sum,
           count(value) as value_cnt,
           max(value) as value_max
    from test
    group by customer_id, date_trunc('month', event_time)
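A simple consistency check, assuming the test table and the test_month snapshot from the steps above exist, is to compare the snapshot with a freshly computed aggregate. This is a sketch for verification only and is not part of the article's scripts.

```sql
-- Rows present in the snapshot but not in a direct aggregation of the source.
-- An empty result suggests the snapshot matches the source data.
select customer_id, event_time, value_sum, value_cnt, value_max
from test_month
except
select customer_id,
       date_trunc('month', event_time)::date,
       sum(value),
       count(value),
       max(value)
from test
group by customer_id, date_trunc('month', event_time);
```

Running the comparison in the other direction as well (swapping the two selects) would also catch groups that are missing from the snapshot.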

You can remove the snapshot by running the following query:

 select ps_del_snapshot('test_month') 

That is all for today. The scripts are available on GitHub.

Source: https://habr.com/ru/post/177957/

