
PostgreSQL Recipes: Asynchronous Task Scheduler

To set up the asynchronous task scheduler, we need postgres itself and its pg_task extension. (I link to my own postgres fork because I made some changes that I have not yet been able to get into the original repository. You can also use a ready-made setup.)

(The original PostgreSQL has a bug in PL/pgSQL because of which my scheduler does not work correctly when an uncaught exception occurs in a task written in PL/pgSQL. I described this bug here and fixed it in my fork here.)

Installing the scheduler does not require creating an extension in each database. Instead, simply add it to the configuration file:

shared_preload_libraries = 'pg_task' 

After PostgreSQL is restarted, the scheduler creates task tables in all databases, on behalf of the database users and in the default schemas of those users.
If you want the scheduler to run only for specific databases, list them in the configuration file:

 pg_task.database = 'database1,database2' 

If you want the scheduler to run as a user other than the database's own user, that can also be set:

 pg_task.database = 'database1:user1,database2:user2' 
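The value format above can be read as a comma-separated list of `database` or `database:user` items. The following is only a minimal sketch of that reading, not the extension's actual parser: the function name `parse_database_setting` is hypothetical, and representing a missing user as `None` (meaning "use the scheduler's default") is an assumption.

```python
from typing import List, Optional, Tuple


def parse_database_setting(value: str) -> List[Tuple[str, Optional[str]]]:
    """Split 'db1:user1,db2' into [('db1', 'user1'), ('db2', None)]."""
    pairs = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate stray commas or an empty setting
        database, sep, user = item.partition(":")
        # None stands for "no user given" (assumption: the default applies).
        pairs.append((database.strip(), user.strip() if sep else None))
    return pairs


print(parse_database_setting("database1:user1,database2:user2"))
print(parse_database_setting("database1,database2"))
```

The first call yields both database/user pairs; the second yields the two databases with no explicit user.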

If the scheduler tables should be created somewhere other than the users' default schema, you can set the schema like this:

 pg_task_schema.database1 = schema3 

If the scheduler's table should have a different name, set it like this:

 pg_task_table.database1 = table3 

By default, the scheduler checks for tasks every 1000 ms, but this can be changed per database:

 pg_task_period.database1 = 100
 pg_task_period.database2 = 10

So, the scheduler creates, if they do not exist yet, a schema (when necessary) and a task table with the following columns:

 id BIGSERIAL NOT NULL PRIMARY KEY,
 dt TIMESTAMP NOT NULL DEFAULT NOW(), -- planned start time
 start TIMESTAMP,
 stop TIMESTAMP,
 queue TEXT NOT NULL DEFAULT 'default', -- task queue name
 max INT, -- maximum number of tasks executed simultaneously in the queue
 pid INT,
 request TEXT NOT NULL, -- SQL to execute
 response TEXT, -- result of the task
 state TEXT NOT NULL DEFAULT 'QUEUE', -- task state
 timeout INTERVAL,
 delete BOOLEAN NOT NULL DEFAULT false,
 repeat INTERVAL, -- repeat interval
 drift BOOLEAN NOT NULL DEFAULT true -- how the next repeat time is computed

In fact, the scheduler starts several background worker processes. One tracks changes in the configuration file and starts or stops the scheduler's other background processes as needed. In addition, there is one background worker per database, which checks the scheduled tasks in that database and starts task execution when necessary.

For example, if we want a task to be executed as soon as possible, we run the SQL command

 INSERT INTO task (request) VALUES ('SELECT now()') 

The task scheduler writes the result of the task to the response column in text form. If the result has several columns, the scheduler adds a header with the column names and their types. If the result has several rows, all of them are written to the response column.

If we want a task to run later, for example in 5 minutes, we write the planned time to the dt column

 INSERT INTO task (dt, request) VALUES (now() + '5 min'::INTERVAL, 'SELECT now()') 

and if we want the task to run at a specific time, we write

 INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()') 
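The three inserts above differ only in the planned time dt. As a rough model of the periodic check (a sketch under assumptions, not the extension's real query), a task can be treated as due once its dt has passed and it is still in the default 'QUEUE' state. The function name `due_tasks`, the dictionary layout, and the oldest-first ordering are all hypothetical.

```python
from datetime import datetime, timedelta


def due_tasks(tasks, now):
    """Return queued tasks whose planned time has arrived, oldest first."""
    # Assumption: only tasks still in the 'QUEUE' state are candidates.
    ready = [t for t in tasks if t["state"] == "QUEUE" and t["dt"] <= now]
    return sorted(ready, key=lambda t: t["dt"])


now = datetime(2019, 6, 30, 12, 0, 0)
tasks = [
    {"id": 1, "dt": now, "state": "QUEUE"},                         # as soon as possible
    {"id": 2, "dt": now + timedelta(minutes=5), "state": "QUEUE"},  # in 5 minutes
    {"id": 3, "dt": datetime(2019, 7, 1), "state": "QUEUE"},        # at a specific time
]

print([t["id"] for t in due_tasks(tasks, now)])                         # [1]
print([t["id"] for t in due_tasks(tasks, now + timedelta(minutes=5))])  # [1, 2]
```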

If we need the task to run every 5 minutes, we write

 INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()') 

If you instead write

 INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false) 

then the next repetition starts 5 minutes after the task has completed (and not 5 minutes after its scheduled time, as by default).
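The difference between the two repeat policies comes down to which timestamp the interval is added to. The sketch below only illustrates that arithmetic; the function `next_run` and its `from_completion` flag are hypothetical names, and how the flag corresponds to the drift column is as described in the text above.

```python
from datetime import datetime, timedelta


def next_run(planned, finished, repeat, from_completion):
    """Compute the next planned time of a repeating task."""
    # Either count the interval from when the task finished,
    # or from the time it was originally scheduled for.
    base = finished if from_completion else planned
    return base + repeat


planned = datetime(2019, 7, 1, 0, 0, 0)
finished = planned + timedelta(seconds=90)  # the task took 90 seconds
repeat = timedelta(minutes=5)

print(next_run(planned, finished, repeat, from_completion=False))  # 2019-07-01 00:05:00
print(next_run(planned, finished, repeat, from_completion=True))   # 2019-07-01 00:06:30
```

Counting from the scheduled time keeps runs on a fixed grid, while counting from completion guarantees a full interval of rest between runs.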

When an exception occurs while a task is running, it is caught and written in text form to the response column, and the task is assigned the corresponding state.

For example:

 INSERT INTO task (request) VALUES ('SELECT 1/0') 

If no more than 2 tasks from a given queue should be executed at the same time, we create the tasks with the command

 INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()') 

Suppose many tasks have accumulated in this queue and they are being executed 2 at a time. If you then create a task with the command

 INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()') 

then it will be executed as soon as possible, ahead of all the other tasks in this queue, so max works as something like a priority.
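One way to picture why a larger max behaves like a priority is to assume that each task compares the number of tasks currently running in its queue against its own max value. This is an assumed model for illustration, not the extension's confirmed logic; `can_start` is a hypothetical helper, and treating a NULL max as "no limit" is also an assumption.

```python
from typing import Optional


def can_start(running_in_queue: int, task_max: Optional[int]) -> bool:
    """Decide whether a task may start, given its own max for the queue."""
    if task_max is None:
        return True  # assumption: no max means no concurrency limit
    return running_in_queue < task_max


running = 2  # two tasks from 'queue' are already executing

print(can_start(running, 2))  # False: tasks with max = 2 must wait
print(can_start(running, 3))  # True: the task with max = 3 starts right away
```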

Source: https://habr.com/ru/post/456722/

