Custom aggregate and window functions in PostgreSQL and Oracle


In this article, we will look at how to create custom aggregate and window (in Oracle terminology, analytic) functions in the two systems. Despite differences in syntax and in the overall approach to extensibility, the mechanics of these functions are very similar. But there are differences too.

It must be admitted that custom aggregate and window functions are quite rare. Window functions in general are, for some reason, traditionally classified as “advanced” SQL and are considered difficult to understand and master. It is hard enough to get to grips with the functions already available in the database!

Why, then, delve into this topic at all? I can name a few reasons:

Our working example is the calculation of the average, an analogue of the standard avg function for the numeric type (number in Oracle). We will write such a function and see how it works in aggregate and window modes and whether it can be computed by several parallel processes. Finally, we will look at a real-life example.

Aggregate functions


Let's move from simple to complex, switching between PostgreSQL and Oracle.

First, some general considerations. Any aggregate function is called for each row of the table in turn and eventually processes them all. Between calls, it needs to maintain an internal state that defines the context of its execution. When all rows have been processed, it must return the final value.

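So, we need four components: a state, a function that processes the next value, a function that returns the final result, and an initial value of the state.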


PostgreSQL


To store the state, you need to choose a suitable data type. You can use a standard one or define your own. A function that calculates the average must accumulate the sum of the values and, separately, their count. Therefore, we will create our own composite type with two fields:

CREATE TYPE average_state AS (
accum numeric,
qty numeric
);

Now we define a function to process the next value. In PostgreSQL, it is called a transition function:

CREATE OR REPLACE FUNCTION average_transition (
state average_state,
val numeric
) RETURNS average_state AS $$
BEGIN
RAISE NOTICE '%(%) + %', state.accum, state.qty, val;
RETURN ROW(state.accum+val, state.qty+1)::average_state;
END;
$$ LANGUAGE plpgsql;

The function takes the current state and the next value and returns the new state: the value is added to the sum, and the count is incremented by one.

In addition, we print the function's parameters (RAISE NOTICE), which lets us see how the work is done. Good old debug PRINT, nothing beats it.
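Because the transition function is an ordinary function, a single step can also be checked by hand. A minimal sketch, assuming the average_state type and the average_transition function defined above already exist; the input state is made up for illustration:

```sql
-- Hand-made state: sum 3 accumulated over 2 values; the next value is 3.
-- The call emits a NOTICE with its arguments and returns the new state.
SELECT average_transition(ROW(3, 2)::average_state, 3);
```

The returned composite value should be (6,3): the sum grew by the value and the count by one.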

The next function is to return the final value:

CREATE OR REPLACE FUNCTION average_final (
state average_state
) RETURNS numeric AS $$
BEGIN
RAISE NOTICE '= %(%)', state.accum, state.qty;
RETURN CASE WHEN state.qty > 0 THEN
trim(trailing '0' from ( state.accum/state.qty )::text)::numeric
END;
END;
$$ LANGUAGE plpgsql;

The function takes a state and returns the resulting number: it simply divides the accumulated sum by the count. For a zero count we return NULL (as avg does).

The trick with the trim function is needed solely for tidy output: it strips the insignificant zeros that would otherwise clutter the screen and get in the way. Like this:

SELECT 1::numeric / 2::numeric;
?column?
------------------------
0.50000000000000000000
(1 row)

In real life, these tricks, of course, are not needed.
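To see the effect of the trim call in isolation, and to check the final function without an aggregate around it, something like the following can be run. This is only a sketch; it assumes average_state and average_final exist as defined above, and the states passed in are made up:

```sql
-- Numeric division yields a long tail of zeros; the round trip through text strips it.
SELECT 1::numeric / 2::numeric AS raw,
       trim(trailing '0' from (1::numeric / 2::numeric)::text)::numeric AS trimmed;

-- The final function called directly: a non-empty state and an empty one.
SELECT average_final(ROW(15, 5)::average_state) AS non_empty,
       average_final(ROW(0, 0)::average_state)  AS empty;
```

The first query should show 0.5 in the trimmed column; the second should give 3 for the non-empty state and NULL for the empty one.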

And finally, we define the actual aggregate function. To do this, use the special command CREATE AGGREGATE:

CREATE AGGREGATE average(numeric) (
sfunc = average_transition ,
stype = average_state ,
finalfunc = average_final ,
initcond = '(0,0)'
);

This command indicates the data type for the state (stype), our two functions (sfunc and finalfunc) and the initial state value (initcond) as a string constant.
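One difference from the built-in avg is worth keeping in mind: avg ignores NULL inputs, while the transition function above is not declared STRICT, so a single NULL turns the accumulated sum into NULL. The sketch below shows one way to get avg-like behavior. The names average_transition_strict and average_strict are hypothetical and introduced only for this illustration; it relies on the documented rule that rows with NULL inputs are skipped when the transition function is strict.

```sql
-- Hypothetical strict variant of the transition function (no debug output).
CREATE OR REPLACE FUNCTION average_transition_strict (
    state average_state,
    val numeric
) RETURNS average_state AS $$
    SELECT ROW(state.accum + val, state.qty + 1)::average_state;
$$ LANGUAGE sql STRICT;

-- Hypothetical aggregate built on it; everything else is as before.
CREATE AGGREGATE average_strict(numeric) (
    sfunc     = average_transition_strict,
    stype     = average_state,
    finalfunc = average_final,
    initcond  = '(0,0)'
);

-- The NULL row is skipped by average_strict, as it is by avg.
SELECT average_strict(x), avg(x)
FROM (VALUES (1::numeric), (NULL), (3)) AS v(x);
```

Both columns should be numerically equal to 2, whereas the non-strict average over the same rows would return NULL.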

Let's try it. Almost all the examples in this article use a simple table with five rows: one, two, three, four, five. The table is created on the fly by generate_series, an indispensable tool for generating test data:

SELECT average(g.x) FROM generate_series(1,5) AS g(x);
NOTICE: 0(0) + 1
NOTICE: 1(1) + 2
NOTICE: 3(2) + 3
NOTICE: 6(3) + 4
NOTICE: 10(4) + 5
NOTICE: = 15(5)
average
---------
3
(1 row)

The result is correct, and the functions' output lets us trace the execution: the state starts at (0,0), each value is added to it in turn, and finally average_final is called on the resulting state.


Another check is on the empty set:

SELECT average(g.x) FROM generate_series(1,0) AS g(x);
NOTICE: = 0(0)
average
---------

(1 row)

Oracle


In Oracle, all extensibility is provided by the Data Cartridge mechanism. In simple terms, we will need to create an object type that implements the interface necessary for aggregation. The context is naturally represented by the attributes of this object.

CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
accum number,
qty number,
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl)
RETURN number,
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number)
RETURN number,
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number,
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number
);
/

The initial context value is determined here not by a constant, but by a separate (static, that is, not bound to a specific object instance) function ODCIAggregateInitialize.

The function called for each row is ODCIAggregateIterate.

The result is returned by the ODCIAggregateTerminate function, and note that certain flags are passed to it, which we will deal with a little later.

The interface includes one more mandatory function, ODCIAggregateMerge. We will define it, since we have no choice, but postpone the discussion for now.

Now create an object body with the implementation of the listed methods.

CREATE OR REPLACE TYPE BODY AverageImpl IS
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl)
RETURN number IS
BEGIN
actx := AverageImpl(0,0);
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
self.accum := self.accum + val;
self.qty := self.qty + 1;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
self.accum := self.accum + ctx2.accum;
self.qty := self.qty + ctx2.qty;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number IS
BEGIN
dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
RETURN ODCIConst.Success;
END;
END;
/

The implementation, for the most part, repeats everything we did for PostgreSQL, but in a slightly different syntax.

No trimming tricks are needed for the return value: Oracle cuts off insignificant zeros automatically when it outputs the value.

Note that all functions return a success indicator (the ODCIConst.Success value), while the meaningful values are passed through OUT and IN OUT parameters (which in PL/SQL are unrelated to the actual return value). In particular, any function, including ODCIAggregateTerminate, can change the attributes of its object, a reference to which is passed in the first parameter (self).

The definition of the aggregate function is as follows:

CREATE OR REPLACE FUNCTION average(val number) RETURN number
AGGREGATE USING AverageImpl;
/

Let's check. To generate values, we use the idiomatic construct with a hierarchical CONNECT BY level query:

SELECT average(level) FROM dual CONNECT BY level <= 5;
AVERAGE(LEVEL)
--------------
3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:0

Note that in PostgreSQL the messages appear before the result, and in Oracle after it. This is because RAISE NOTICE works asynchronously, while the dbms_output package buffers its output.
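The buffered dbms_output text is shown only if the client asks for it. A minimal sketch, assuming the SQL*Plus client is used (other clients have their own switch for this):

```sql
-- SQL*Plus command: print the dbms_output buffer after each statement.
SET SERVEROUTPUT ON

SELECT average(level) FROM dual CONNECT BY level <= 5;
```

With server output switched off, the query still returns 3, but none of the debug lines are displayed.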

As we can see, the zero flag was passed to the ODCIAggregateTerminate function. This means that the context is no longer required and, if desired, can be forgotten.

And the check on an empty set:

SELECT average(rownum) FROM dual WHERE 1 = 0;
AVERAGE(ROWNUM)
---------------

= 0(0) flags:0

Window Functions: OVER ()


The good news is that the aggregate function we wrote can work as a window (analytic) function without any changes.

A window function differs from an aggregate in that it does not collapse the rows into a single (aggregated) row, but is computed as if separately for each row. Syntactically, a window function call is marked by an OVER clause that specifies a frame, that is, the set of rows to be processed. In the simplest case it is written as OVER (), meaning that the function must process all the rows. The result is the same as if we computed the ordinary aggregate function and wrote the (identical) result next to every row of the result set.

In other words, the frame is static and covers all rows:

 1. 2. 3. 4. 5.
 + --- + + --- + + --- + + --- + + --- +
 |  1 |  |  1 |  |  1 |  |  1 |  |  1 |
 |  2 |  |  2 |  |  2 |  |  2 |  |  2 |
 |  3 |  |  3 |  |  3 |  |  3 |  |  3 |
 |  4 |  |  4 |  |  4 |  |  4 |  |  4 |
 |  5 |  |  5 |  |  5 |  |  5 |  |  5 |
 + --- + + --- + + --- + + --- + + --- +

PostgreSQL


Let's try:

SELECT g.x, average(g.x) OVER ()
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE: 1(1) + 2
NOTICE: 3(2) + 3
NOTICE: 6(3) + 4
NOTICE: 10(4) + 5
NOTICE: = 15(5)
x | average
---+---------
1 | 3
2 | 3
3 | 3
4 | 3
5 | 3
(5 rows)

The NOTICE output shows that everything happens exactly as it did for the ordinary aggregate function. Having received the result from average_final, PostgreSQL puts it on every row.

Oracle


SELECT level, average(level) OVER() average
FROM dual CONNECT BY level <= 5;

LEVEL AVERAGE
---------- -----------
1 3
2 3
3 3
4 3
5 3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:0

Unexpected. Instead of calculating the result once, Oracle calls ODCIAggregateTerminate N + 1 times: first for each row with flag 1 (which means that the context will still be needed) and then once more at the end. The value returned by the last call is simply ignored.

The conclusion: if ODCIAggregateTerminate contains computationally expensive logic, take care not to repeat the same work several times.

Window Functions: OVER (PARTITION BY)


The PARTITION BY clause in the frame definition is similar to the GROUP BY clause of an ordinary aggregate query. A window function with PARTITION BY is computed separately for each group of rows, and the result is attached to every row of the result set.

In this case, the frame is also static, but it is different for each group. For example, if two groups of rows are defined (the first through the second and the third through the fifth), the frame can be pictured as follows:

 1. 2. 3. 4. 5.
 + --- + + --- +
 |  1 |  |  1 |
 |  2 |  |  2 |  + --- + + --- + + --- +
 + --- + + --- + |  3 |  |  3 |  |  3 |
                 |  4 |  |  4 |  |  4 |
                 |  5 |  |  5 |  |  5 |
                 + --- + + --- + + --- +

PostgreSQL


SELECT g.x/3 part,
g.x,
average(g.x) OVER (PARTITION BY g.x/3)
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 0(0) + 3
NOTICE: 3(1) + 4
NOTICE: 7(2) + 5
NOTICE: = 12(3)
part | x | average
------+---+---------
0 | 1 | 1.5
0 | 2 | 1.5
1 | 3 | 4
1 | 4 | 4
1 | 5 | 4
(5 rows)

The calculation again occurs sequentially, but now when moving to another group of rows, the state is reset to the initial value (initcond).
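For comparison, the GROUP BY counterpart of the same grouping can be sketched as follows (assuming the average aggregate defined above). It computes the same per-group values, but collapses each group into one row instead of attaching the value to every row:

```sql
-- Same grouping expression as in PARTITION BY, but with ordinary aggregation.
SELECT g.x/3 AS part, average(g.x)
FROM generate_series(1,5) AS g(x)
GROUP BY g.x/3
ORDER BY part;
```

This should return two rows: 1.5 for part 0 and 4 for part 1. The NOTICE lines may be interleaved differently here, depending on the aggregation strategy the planner picks.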

Oracle


SELECT trunc(level/3) part,
level,
average(level) OVER(PARTITION BY trunc(level/3)) average
FROM dual CONNECT BY level <= 5;

PART LEVEL AVERAGE
---------- ---------- ----------
0 2 1.5
0 1 1.5
1 4 4
1 5 4
1 3 4
0(0) + 2
2(1) + 1
= 3(2) flags:1
= 3(2) flags:1
0(0) + 4
4(1) + 5
9(2) + 3
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:0

Interestingly, Oracle chose to reorder the rows here. This may hint at implementation details, but in any case it is entitled to do so.

Window Functions: OVER (ORDER BY)


If we add an ORDER BY clause, which specifies the sort order, to the frame definition, the function starts working in cumulative mode (for the sum function we would call it a running total).

For the first row, the frame consists of one row; for the second, of the first and second rows; for the third, of the first, second and third, and so on. In other words, the frame includes the rows from the first to the current one.

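In fact, it can be written out explicitly: OVER (ORDER BY ... RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW). Since this frame is the default whenever ORDER BY is present, it is usually omitted. With unique sort keys, as in our examples, it is equivalent to ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.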
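The equivalence of the short and the explicit forms is easy to check with a built-in function. The sketch below uses sum so that it does not depend on anything defined in this article; the column aliases are made up. With unique sort keys, as here, the ROWS and RANGE spellings of the frame give the same running total as the implicit one:

```sql
SELECT x,
       sum(x) OVER (ORDER BY x) AS implicit_frame,
       sum(x) OVER (ORDER BY x ROWS  BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rows_frame,
       sum(x) OVER (ORDER BY x RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_frame
FROM generate_series(1,5) AS g(x);
```

All three columns should contain 1, 3, 6, 10, 15.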

So, the frame ceases to be static: its head moves down, and the tail remains in place:

 1. 2. 3. 4. 5.
 + --- + + --- + + --- + + --- + + --- +
 |  1 |  |  1 |  |  1 |  |  1 |  |  1 |
 + --- + |  2 |  |  2 |  |  2 |  |  2 |
         + --- + |  3 |  |  3 |  |  3 |
                 + --- + |  4 |  |  4 |
                         + --- + |  5 |
                                 + --- +

PostgreSQL


SELECT g.x, average(g.x) OVER (ORDER BY g.x)
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 3(2) + 3
NOTICE: = 6(3)
NOTICE: 6(3) + 4
NOTICE: = 10(4)
NOTICE: 10(4) + 5
NOTICE: = 15(5)
x | average
---+---------
1 | 1
2 | 1.5
3 | 2
4 | 2.5
5 | 3
(5 rows)

As you can see, rows are still added to the context one by one, but now average_final is called after each addition, producing an intermediate result.

Oracle


SELECT level, average(level) OVER(ORDER BY level) average
FROM dual CONNECT BY level <= 5;

LEVEL AVERAGE
---------- ----------
1 1
2 1.5
3 2
4 2.5
5 3
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) + 4
= 10(4) flags:1
10(4) + 5
= 15(5) flags:1
= 15(5) flags:0

This time both systems work in the same way.

Window Functions: OVER (PARTITION BY ORDER BY)


PARTITION BY and ORDER BY clauses can be combined. Then, within each group of rows, the function works in cumulative mode, and when moving from group to group, the state is reset to its initial value.

 1. 2. 3. 4. 5.
 + --- + + --- +
 |  1 |  |  1 |
 + --- + |  2 |  + --- + + --- + + --- +
         + --- + |  3 |  |  3 |  |  3 |
                 + --- + |  4 |  |  4 |
                         + --- + |  5 |
                                 + --- +

PostgreSQL


SELECT g.x/3 part,
g.x,
average(g.x) OVER (PARTITION BY g.x/3 ORDER BY g.x)
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 0(0) + 3
NOTICE: = 3(1)
NOTICE: 3(1) + 4
NOTICE: = 7(2)
NOTICE: 7(2) + 5
NOTICE: = 12(3)
part | x | average
------+---+---------
0 | 1 | 1
0 | 2 | 1.5
1 | 3 | 3
1 | 4 | 3.5
1 | 5 | 4
(5 rows)

Oracle


SELECT trunc(level/3) part,
level,
average(level) OVER(PARTITION BY trunc(level/3) ORDER BY level) average
FROM dual CONNECT BY level <= 5;

PART LEVEL AVERAGE
---------- ---------- ----------
0 1 1
0 2 1.5
1 3 3
1 4 3.5
1 5 4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
0(0) + 3
= 3(1) flags:1
3(1) + 4
= 7(2) flags:1
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

Window functions with a sliding frame


In all the examples so far, the frame was either static or only its head moved (with the ORDER BY clause). This allowed us to compute the state sequentially, adding row after row to the context.

But the frame of a window function can also be defined so that its tail moves as well. In our example, this corresponds to a moving average. For example, OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) says that for each row of the result, the current value and the two preceding ones are averaged.

 1. 2. 3. 4. 5.
 + --- +
 |  |  + --- +
 |  |  |  |  + --- +
 |  1 |  |  1 |  |  1 |  + --- +
 + --- + |  2 |  |  2 |  |  2 |  + --- +
         + --- + |  3 |  |  3 |  |  3 |
                 + --- + |  4 |  |  4 |
                         + --- + |  5 |
                                 + --- +

Can the window function be computed in this case? It turns out that it can, though inefficiently. But by writing a little more code, we can improve the situation.

PostgreSQL


Let's see:

SELECT g.x,
average(g.x) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 3(2) + 3
NOTICE: = 6(3)
NOTICE: 0(0) + 2
NOTICE: 2(1) + 3
NOTICE: 5(2) + 4
NOTICE: = 9(3)
NOTICE: 0(0) + 3
NOTICE: 3(1) + 4
NOTICE: 7(2) + 5
NOTICE: = 12(3)
x | average
---+---------
1 | 1
2 | 1.5
3 | 2
4 | 3
5 | 4
(5 rows)

Up to the third row everything goes well, because the tail does not actually move: we simply add one more value to the existing context. But since we cannot remove a value from the context, for the fourth and fifth rows everything has to be recomputed from scratch, each time starting from the initial state.

So, it would be great to have not only the function of adding the next value, but also the function of removing the value from the state. And indeed, such a function can be created:

CREATE OR REPLACE FUNCTION average_inverse (state average_state, val numeric)
RETURNS average_state AS $$
BEGIN
RAISE NOTICE '%(%) - %', state.accum, state.qty, val;
RETURN ROW(state.accum-val, state.qty-1)::average_state;
END;
$$ LANGUAGE plpgsql;


For the window function to be able to use it, the aggregate must be recreated as follows:

DROP AGGREGATE average(numeric);
CREATE AGGREGATE average(numeric) (
-- ordinary aggregate
sfunc = average_transition ,
stype = average_state ,
finalfunc = average_final ,
initcond = '(0,0)',
-- moving-aggregate variant
msfunc = average_transition ,
minvfunc = average_inverse ,
mstype = average_state ,
mfinalfunc = average_final ,
minitcond = '(0,0)'
);

Check:

SELECT g.x,
average(g.x) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM generate_series(1,5) as g(x);

NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 3(2) + 3
NOTICE: = 6(3)
NOTICE: 6(3) - 1
NOTICE: 5(2) + 4
NOTICE: = 9(3)
NOTICE: 9(3) - 2
NOTICE: 7(2) + 5
NOTICE: = 12(3)
x | average
---+---------
1 | 1
2 | 1.5
3 | 2
4 | 3
5 | 4
(5 rows)

Now everything is in order: for the fourth and fifth rows, we remove the tail value from the state and add the new one.
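The inverse function is not tied to this particular frame: any frame whose tail moves can benefit from it. As a further sketch (assuming the average aggregate defined above), here is a centered window of up to three rows. The ORDER BY inside OVER is added to make the row order explicit:

```sql
-- Average of the previous, current and next rows.
SELECT g.x,
       average(g.x) OVER (ORDER BY g.x ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
FROM generate_series(1,5) AS g(x);
```

The averages should come out as 1.5, 2, 3, 4 and 4.5. The NOTICE lines show whether values are removed with average_inverse or the state is rebuilt from scratch.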

Oracle


Here the situation is similar. The created version of the analytical function works, but is inefficient:

SELECT level,
average(level) OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) average
FROM dual CONNECT BY level <= 5;

LEVEL AVERAGE
---------- ----------
1 1
2 1.5
3 2
4 3
5 4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
0(0) + 2
2(1) + 3
5(2) + 4
= 9(3) flags:1
0(0) + 3
3(1) + 4
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

The function to remove a value from the context is defined as follows:

MEMBER FUNCTION ODCIAggregateDelete (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
self.accum := self.accum - val;
self.qty := self.qty - 1;
RETURN ODCIConst.Success;
END;

Full code for copy-paste
CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
accum number,
qty number,
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl) RETURN number,
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number) RETURN number,
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl) RETURN number,
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number) RETURN number,
MEMBER FUNCTION ODCIAggregateDelete (self IN OUT AverageImpl, val IN number) RETURN number
);
/
CREATE OR REPLACE TYPE BODY AverageImpl IS
STATIC FUNCTION ODCIAggregateInitialize (actx IN OUT AverageImpl)
RETURN number IS
BEGIN
actx := AverageImpl(0,0);
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateIterate (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
self.accum := self.accum + val;
self.qty := self.qty + 1;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateMerge (self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
self.accum := self.accum + ctx2.accum;
self.qty := self.qty + ctx2.qty;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateTerminate (self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number IS
BEGIN
dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateDelete (self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
self.accum := self.accum - val;
self.qty := self.qty - 1;
RETURN ODCIConst.Success;
END;
END;
/


There is no need to recreate the function itself. Check:

SELECT level,
average(level) OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) average
FROM dual CONNECT BY level <= 5;

LEVEL AVERAGE
---------- ----------
1 1
2 1.5
3 2
4 3
5 4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) - 1
5(2) + 4
= 9(3) flags:1
9(3) - 2
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0

Parallelism


Both PostgreSQL and Oracle (Enterprise Edition) can compute aggregate functions in parallel. Each of the parallel processes performs its share of the work and builds an intermediate state. The coordinating process then receives these states and must combine them into a single final state.

This requires one more function, a combine function, which we will now write. In our case it simply adds up both the sums and the counts.

PostgreSQL


The function is as follows:

CREATE OR REPLACE FUNCTION average_combine (state1 average_state, state2 average_state)
RETURNS average_state AS $$
BEGIN
RAISE NOTICE '%(%) & %(%)', state1.accum, state1.qty, state2.accum, state2.qty;
RETURN ROW(state1.accum+state2.accum, state1.qty+state2.qty)::average_state;
END;
$$ LANGUAGE plpgsql;
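Like the other support functions, the combine function can be tried on its own. A minimal sketch, assuming average_state and average_combine exist as defined above; the two partial states are made up, as if one process had seen the values 1 and 2 and another 3, 4 and 5:

```sql
-- Merge two partial states: sums and counts are added pairwise.
SELECT average_combine(ROW(3, 2)::average_state, ROW(12, 3)::average_state);
```

The result should be the state (15,5), the same one a single process would reach after all five values.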

We will also remove the debug output from average_transition. With parallel execution we will be summing many more than five values, so otherwise we would get too much useless output.

Since the output is gone, there is no need for a procedural language, so we will write the function in plain SQL:

CREATE OR REPLACE FUNCTION average_transition (state average_state, val numeric)
RETURNS average_state AS $$
SELECT ROW(state.accum+val, state.qty+1)::average_state;
$$ LANGUAGE sql;

It remains to recreate the aggregate with the new function and to declare that it is safe to use in parallel mode:

DROP AGGREGATE average(numeric);
CREATE AGGREGATE average(numeric) (
-- ordinary aggregate
sfunc = average_transition ,
stype = average_state ,
finalfunc = average_final ,
combinefunc = average_combine ,
initcond = '(0,0)',
-- moving-aggregate variant
msfunc = average_transition ,
minvfunc = average_inverse ,
mstype = average_state ,
mfinalfunc = average_final ,
minitcond = '(0,0)',
-- parallel execution
parallel = safe
);

Now create a table and fill it with data. A thousand rows will suffice.

CREATE TABLE t(n) AS SELECT generate_series(1,1000)::numeric;

With the default settings, PostgreSQL does not build a parallel plan for such a table — it’s too small — but it’s easy to persuade:

SET parallel_setup_cost=0;
SET min_parallel_table_scan_size=0;

EXPLAIN(costs off) SELECT average(n) FROM t;
QUERY PLAN
------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 2
-> Partial Aggregate
-> Parallel Seq Scan on t

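In the query plan, we see two planned worker processes that perform partial aggregation (Partial Aggregate), a Gather node that collects their results, and the final aggregation (Finalize Aggregate).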


Check:

SELECT average(n) FROM t;
NOTICE: 0(0) & 281257(678)
NOTICE: 281257(678) & 127803(226)
NOTICE: 409060(904) & 91440(96)
NOTICE: = 500500(1000)
average
---------
500.5
(1 row)

Why is average_combine called three times rather than two? The point is that in PostgreSQL the coordinating process also does part of the work. So although two worker processes were launched, the work was actually done by three processes. One of them managed to process 678 rows, another 226, and the third 96 (these figures are incidental and may differ from run to run).
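To compare with serial execution, parallel workers can be switched off for the session. A sketch, assuming the table t and the aggregate from this section:

```sql
-- Forbid parallel workers: the planner falls back to a plain aggregate.
SET max_parallel_workers_per_gather = 0;

SELECT average(n) FROM t;

-- Return to the previous setting.
RESET max_parallel_workers_per_gather;
```

The result should again be 500.5, but this time only the NOTICE from average_final is expected, since there are no partial states to combine.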

Oracle


If you remember, we already wrote the ODCIAggregateMerge function at the very beginning, since it is mandatory in Oracle. The documentation insists that this function is needed not only for parallel execution but also for serial execution, although I find it hard to see why (and in practice I have never seen it invoked during serial processing).

All that remains is to declare the function safe for parallel operation:

CREATE OR REPLACE FUNCTION average(val number) RETURN number
PARALLEL_ENABLE
AGGREGATE USING AverageImpl;
/

Create a table:

CREATE TABLE t(n) AS SELECT to_number(level) FROM dual CONNECT BY level <= 1000;

Persuading Oracle is even easier than persuading PostgreSQL: a hint is enough. Here is the resulting plan (the output is heavily trimmed for simplicity):

EXPLAIN PLAN FOR SELECT /*+ PARALLEL(2) */ average(n) FROM t;
SELECT * FROM TABLE(dbms_xplan.display);

---------------------------------
| Id | Operation |
---------------------------------
| 0 | SELECT STATEMENT |
| 1 | SORT AGGREGATE |
| 2 | PX COORDINATOR |
| 3 | PX SEND QC (RANDOM) |
| 4 | SORT AGGREGATE |
| 5 | PX BLOCK ITERATOR |
| 6 | TABLE ACCESS FULL |
---------------------------------

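The plan also contains partial aggregation (the inner SORT AGGREGATE), a coordinator that collects the partial results (PX COORDINATOR), and the final aggregation (the outer SORT AGGREGATE).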


SELECT /*+ PARALLEL(2) */ average(n) FROM t;
AVERAGE(N)
----------
500.5
0(0) & 216153(657)
216153(657) & 284347(343)
= 500500(1000) flags:0

In Oracle, the coordinator does not participate in partial aggregation. Therefore, only two contexts are combined, and for the same reason we see only the output of the ODCIAggregateMerge function.

Documentation


It's time to provide links to the documentation, including the aggregate and window functions already included in the DBMS. There you can find a lot of interesting things.

PostgreSQL:


Oracle:


Example about rounding kopecks


And now the promised real-life example. I came up with this function when I had to write reports for an accounting department working under RAS (Russian Accounting Standards).

The simplest task in which the need for rounding arises is distributing total expenses (say, 100 rubles) among departments (say, 3 of them) according to some rule (say, equally):

WITH depts(name) AS (
VALUES ('A'), ('B'), ('C')
), report(dept,amount) AS (
SELECT name, 100.00 / count(*) OVER() FROM depts
)
SELECT dept, round(amount,2) FROM report;

dept | round
------+-------
A | 33.33
B | 33.33
C | 33.33
(3 rows)

This query shows the problem: the amounts must be rounded, but then a kopeck is lost. And RAS does not forgive that.

The task can be solved in different ways, but to my taste the most elegant is a window function that works in cumulative mode and takes the whole fight with the kopecks upon itself:

WITH depts(name) AS (
VALUES ('A'), ('B'), ('C')
), report(dept,amount) AS (
SELECT name, 100.00 / count(*) OVER() FROM depts
)
SELECT dept, round2(amount) OVER (ORDER BY dept) FROM report;

dept | round2
------+--------
A | 33.33
B | 33.34
C | 33.33
(3 rows)

The state of such a function includes the rounding error (r_error) and the current rounded value (amount). The function that processes the next value increases the rounding error and, once it exceeds half a kopeck, adds a kopeck to the rounded amount:

state.r_error := state.r_error + val - round(val,2);
state.amount := round(val,2) + round(state.r_error,2);
state.r_error := state.r_error - round(state.r_error,2);

And the function that produces the result simply returns the ready state.amount.

I will not give the full code of the function: using the examples already given, it is not difficult to write it.
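For readers who want a starting point, here is one possible PostgreSQL sketch assembled from the pieces described above. It is not the author's code: the names round2_state, round2_transition and round2_final, as well as the initial state '(0,0)', are assumptions made for this illustration.

```sql
-- Assumed state: the error not yet compensated and the rounded value of the current row.
CREATE TYPE round2_state AS (
    r_error numeric,
    amount  numeric
);

-- Transition function: the three assignments from the text, applied to the state.
CREATE OR REPLACE FUNCTION round2_transition (state round2_state, val numeric)
RETURNS round2_state AS $$
BEGIN
    state.r_error := state.r_error + val - round(val, 2);
    state.amount  := round(val, 2) + round(state.r_error, 2);
    state.r_error := state.r_error - round(state.r_error, 2);
    RETURN state;
END;
$$ LANGUAGE plpgsql;

-- Final function: simply returns the ready amount.
CREATE OR REPLACE FUNCTION round2_final (state round2_state)
RETURNS numeric AS $$
    SELECT state.amount;
$$ LANGUAGE sql;

CREATE AGGREGATE round2(numeric) (
    sfunc     = round2_transition,
    stype     = round2_state,
    finalfunc = round2_final,
    initcond  = '(0,0)'
);

-- The query from the text: the rounded parts should now add up to 100.00.
WITH depts(name) AS (
    VALUES ('A'), ('B'), ('C')
), report(dept, amount) AS (
    SELECT name, 100.00 / count(*) OVER () FROM depts
)
SELECT dept, round2(amount) OVER (ORDER BY dept) FROM report;
```

With these definitions the query should give 33.33, 33.34 and 33.33, as in the output above. The sketch assumes that the sort key in OVER (ORDER BY ...) is unique and that the amounts are not NULL; rows with equal sort keys fall into the same default frame and would all receive the same value.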

If you have come across interesting uses of custom aggregate or window functions, share them in the comments.

Source: https://habr.com/ru/post/351008/

