To prepare the asynchronous task scheduler, we need
postgres itself and its
pg_task extension. (I gave links to my postgres fork, because I made some changes that I couldn’t get into the original repository yet. You can also use a
ready-made way .)
(In the original PostgreSQL there is an error in PL / pgSQL, due to which my scheduler does not work correctly when an uncaught exception occurs in a task written in PL / pgSQL. I described this error
here and fixed it in my fork
here .)
To install the scheduler is not required to create an extension in (each) database. Instead, simply add it to the configuration file.
shared_preload_libraries = 'pg_task'
After restarting PostgreSQL, the scheduler will create task tables on behalf of database users and default schemas for these users in all databases.
')
If you want the scheduler to run only for specific databases, you can specify them in the configuration file
pg_task.database = 'database1,database2'
If you want to run the scheduler not from database users, then this can also be set
pg_task.database = 'database1:user1,database2:user2'
If you need to create scheduler tables not in the default scheme for users, you can set it this way.
pg_task_schema.database1 = schema3
If it is required to call the table of the scheduler differently, it can be done so
pg_task_table.database1 = table3
By default, the scheduler checks tasks every 1000 ms, but this can be changed so
pg_task_period.database1 = 100 pg_task_period.database2 = 10
So, the scheduler creates (if not yet created) (a scheme, if necessary, and) a task table with such columns
id BIGSERIAL NOT NULL PRIMARY KEY,
In fact, the scheduler starts several background workflows: one to track changes in the configuration file and to start / stop if necessary for the other background processes of the scheduler. And one background workflow for each database to check the scheduled tasks in each database and start up if necessary to perform tasks.
For example, if we want to execute a task as quickly as possible, then we execute the SQL command
INSERT INTO task (request) VALUES ('SELECT now()')
The task scheduler will write the result of the task to the result column in text form. If, as a result of the task, there are several columns, the scheduler will add them to the header along with the column types. Also, as a result of the task execution there may be several lines, all of them will be added to the result column.
If we want to complete a task, for example, after 5 minutes, then we write the planned time in the appropriate column
INSERT INTO task (dt, request) VALUES (now() + '5 min':INTERVAL, 'SELECT now()')
and if we want the task to be completed at a specific time, then we will write
INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()')
If we need the task to be performed every 5 minutes, then we write
INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()')
if you write like that
INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false)
then the repetition of the task will be started 5 minutes after the completion of the task (and not after the scheduled time, as by default).
When an exception occurs during the execution of a task, it is intercepted and in text form is recorded in the result column, and the corresponding state is assigned to the task.
for example
INSERT INTO task (request) VALUES ('SELECT 1/0')
If it is necessary for no more than 2 tasks to be simultaneously executed for any task queue, then we create tasks with the command
INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()')
Let we have accumulated a lot of tasks in this queue and they are simultaneously executed by 2. If you create a task with the command
INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()')
then it will be executed as soon as possible for all other tasks in this queue, i.e. it's something like priority