
Dear Habr readers, I want to share my experience of an application-transparent migration from PgQ to an AMQP queue. Maybe it will look like reinventing the wheel, but maybe some of these thoughts will be useful. The article assumes familiarity with the basics of PgQ and RabbitMQ.
Background
It so happened historically that our project uses PgQ very actively. For all its flaws, PgQ has one undeniable advantage: transactionality with the database, which our code relies on heavily. That is, you can be sure that either the event ends up in the queue and the database is updated, or neither happens. This advantage had to be carried over to the new queue engine.
I will not describe the reasons for leaving PgQ in detail here; that is a topic for another article. I will dwell only on the transition itself.
Thinking it over: pg_amqp
Googling leads to a PostgreSQL extension, pg_amqp. It provides stored procedures for publishing to AMQP from within PostgreSQL. At the level of application logic the extension behaves exactly as we want: if the PostgreSQL transaction is rolled back, the data does not end up in AMQP; if we commit, it does.
BEGIN;
INSERT INTO some_table (...) VALUES (...);
SELECT amqp.publish(broker_id, 'amqp.direct', 'foo', 'bar');
ROLLBACK; -- rolled back: the message will not reach AMQP
In reality, though, the extension does not guarantee that the message will reach AMQP. Under the hood it simply commits sequentially: first in PostgreSQL, then in AMQP. If the connection to AMQP drops between the two commits, the message is lost. Even though the probability of this is small, there will be lost messages. And since we work with real money and trading accounts, that is unacceptable.
If losing 0.01% of messages is acceptable for you, you can stop reading here: just use pg_amqp if you want to move from PgQ to AMQP.
Getting down to reinventing the wheel
* From here on, instead of abstract AMQP I will talk about a specific implementation, RabbitMQ.
But we still have PostgreSQL, inside which transactions are fully reliable. So we can insert every message into a table transactionally, and then somehow re-send to AMQP whatever did not make it there.
No sooner said than done.
All the work with PgQ in our application was done using a single stored procedure, which I could freely change.
I created a table
CREATE TABLE amqp.message (
    id      bigint DEFAULT nextval('amqp_message_id_sequence') PRIMARY KEY,
    pid     bigint,
    queue   varchar(128),
    message text
);
and a trigger that, on insert into this table, publishes the inserted data to AMQP. The stored procedure call that used to insert events into PgQ was replaced by an insert into this table. The overhead compared to PgQ is only the publishing to AMQP, since PgQ also inserts every event into a table anyway. Why the pid column is needed, I will explain later.
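A minimal sketch of what such a trigger might look like (the function and trigger names, the broker id, and the exchange name are illustrative, not the exact ones from our project; the pid column is assumed to be filled with pg_backend_pid() by the code that inserts the row):

-- hypothetical trigger function: publish every inserted row to AMQP via pg_amqp
CREATE OR REPLACE FUNCTION amqp.message_publish() RETURNS trigger AS $$
BEGIN
    -- broker id 1 and the exchange name are placeholders;
    -- in practice the payload should also carry NEW.id and NEW.pid,
    -- so that the confirmation consumer can find this row later
    PERFORM amqp.publish(1, 'amqp.direct', NEW.queue, NEW.message);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER message_publish
    AFTER INSERT ON amqp.message
    FOR EACH ROW EXECUTE PROCEDURE amqp.message_publish();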
Now messages end up both in the table and at the RabbitMQ consumer. Messages are written to the table with full guarantees inside the PostgreSQL transaction, and almost all of them are also delivered to AMQP via pg_amqp. But how do we find out which messages arrived and which did not? And how do we keep this table at a sane size (ideally tens or hundreds of rows) so as not to lose performance?
Here RabbitMQ comes to the rescue: after all, it can duplicate a message into several queues bound to the same exchange.

So let's give one queue to our business code and use the second one to confirm receipt of each message.
No sooner said than done. We create an exchange, two queues, and a messenger that simply deletes each received message from the amqp.message table.
As a result, we have a table that stores only the messages that are currently "on the way." The table stays small, because messages are deleted almost immediately after they are inserted. Its size can be put under monitoring. And the business code of our application now works only with RabbitMQ and knows nothing about the magic under the hood.
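At this stage the messenger is trivial. A sketch of what it might execute for every confirmation it receives, plus a monitoring probe (the :confirmed_id parameter is assumed to be taken from the payload of the confirmation message; not our exact code):

-- delete the row of a message whose receipt has just been confirmed
DELETE FROM amqp.message WHERE id = :confirmed_id;

-- monitoring probe: how many messages are currently "on the way"
SELECT count(*) FROM amqp.message;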
Here is the final scheme of work.

But now an important question arises: how do we know that some message did not arrive? A row in the amqp.message table by itself does not guarantee that the message was lost; it may simply still be "on the way." We have to be sure of that before re-sending, otherwise we may create a duplicate and transfer $200 to someone instead of $100 :) And at the same time we want to detect a lost message and re-send it as quickly as possible, to minimize disruption of the message order in the queue.
Here begins the main shamanism
All our messages are numbered in ascending order, but the system is multi-threaded, and messages are not required to arrive at RabbitMQ in the same order they were written to the table. However, within one process that sends messages to AMQP they are strictly ordered. PostgreSQL lets you see the pid of the current backend process (pg_backend_pid()), and we can rely on the fact that within one pg_backend_pid() message ids grow strictly monotonically (remember that we generate the id with nextval). Consequently, when a confirmation arrives for a message with id N, all messages from the same pg_backend_pid with an id below N have not been delivered and need to be re-sent.
So we need a consumer of the confirmation queue that does only a few simple things:
- Listens to the confirmation ("messenger") queue
- For each confirmation, checks whether the amqp.message table contains messages with the same pid and an id lower than the confirmed one. If so, it re-sends them (again transactionally, through the amqp.message table); see the sketch after this list
- Removes the confirmed message from the amqp.message table by id
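A sketch of the re-send step, assuming the confirmation payload carries the id and pid of the confirmed message (parameter names are illustrative; this extends the simple delete shown earlier):

BEGIN;

-- Messages from the same backend with a smaller id were published before the
-- confirmed one; if they are still in the table, they were lost in transit.
-- Passing them through amqp.message again fires the publish trigger,
-- so they are re-sent transactionally.
WITH lost AS (
    DELETE FROM amqp.message
     WHERE pid = :confirmed_pid
       AND id  < :confirmed_id
    RETURNING queue, message
)
INSERT INTO amqp.message (pid, queue, message)
SELECT pg_backend_pid(), queue, message
  FROM lost;

-- and drop the row of the message that has just been confirmed
DELETE FROM amqp.message WHERE id = :confirmed_id;

COMMIT;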
Profit! All messages now reach the recipient, and at the same time we have completely got rid of PgQ. The code of the main application has not changed.
The total overhead:
- The auxiliary amqp.message table, into which every message is inserted and then deleted
- The messenger that removes rows from amqp.message and re-sends lost messages
Note that the messenger logic is completely crash-tolerant: you can kill it at any moment, and after a restart it will continue working without any problems.
The system does not cover the case when a postgres backend sends a message that never arrives and then never sends anything again, so nothing from that pid ever gets confirmed. I would be grateful if someone suggests how to handle this situation automatically. For now it is covered simply by monitoring, and not a single such event has occurred yet. Re-sending a message is a very rare event anyway, and we use pgbouncer, which reduces the number of distinct pg_backend_pid values.