Distributed transactions between RabbitMQ and MS SQL

Message queues are a convenient way to implement asynchronous communication between two systems. Even if one of the systems is down, the other does not notice and simply keeps sending it messages, which are processed once the failed system is back up. An MS SQL table can serve as a message queue, but that solution does not scale particularly well.

However, as soon as the message queue lives in a separate system (we use RabbitMQ), we run into problems with transactions. For example, if we want to save a record in the database stating that we sent a message to RabbitMQ, it is not easy to ensure that the record is saved only if the message was actually sent. Read on to see how we dealt with this problem.

In some scenarios, you can simply send the message to RabbitMQ after the SQL transaction has completed. For example, if we need to send an email with a password during registration, and the page shown after registration has a "resend email" button, we can easily do without any transaction and, if sending the message fails, just show the user a notification.

You can also send the message right before committing the SQL transaction. In this case we can roll back the SQL transaction if sending the message fails, but the commit of the SQL transaction may still fail after the message has been sent successfully. If it is acceptable for you that, on rare occasions, a message is delivered to the receiving system while the sending system has no record of it, I recommend this method, since it is very simple to implement.
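
The "publish right before commit" ordering can be sketched roughly as follows. This is only an illustration under stated assumptions: SQLite stands in for MS SQL, `publish` is a hypothetical callable that wraps whatever RabbitMQ client you use, and the `sent_log` table name is invented for the example.

```python
import sqlite3


def save_and_publish(conn, publish, email):
    """Write the 'sent' record, publish the message, and only then commit.

    `publish` is a hypothetical callable standing in for a RabbitMQ publish;
    it is expected to raise if the broker does not accept the message.
    """
    try:
        # The sqlite3 module opens a transaction implicitly before this INSERT.
        conn.execute("INSERT INTO sent_log (email) VALUES (?)", (email,))
        # If publishing fails, the exception handler rolls the record back.
        publish({"email": email})
        # The commit can still fail after a successful publish. That is the
        # rare case in which the receiver gets a message that the sender
        # has no record of.
        conn.commit()
    except Exception:
        conn.rollback()
        raise
```

If `publish` raises, the inserted row is rolled back, so the database never claims a send that did not happen. The opposite failure (message sent, commit lost) is not covered by this ordering.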

In scenarios where a failed transaction is guaranteed to be retried, you need not worry that the sender has no record of the send (moreover, you can send the message at any point, not just right before the commit). However, message processing must be made idempotent, so that what is a single message from the sending system's point of view is not processed twice by the receiving system.

For example, we need to send a customer an email and record this in the database. Customer data is stored in the CRM system, which communicates with the email gateway through a RabbitMQ queue. Sending is performed by a task that has a unique identifier and a list of customers to whom the message must be sent. If processing fails for a customer (for example, because of a SQL timeout), the task will try to send the message again after a while. In this scenario we can send the message to RabbitMQ before the transaction completes, but when the email gateway processes a message it must store the unique task identifier and the customer's number in the list. If the email gateway database already contains a message with the same task identifier and customer number, we do not send it again.
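
A minimal sketch of such a duplicate check on the email gateway side is shown below. It assumes SQLite in place of the gateway's real database, and the table, field and function names (`processed`, `task_id`, `customer_no`, `send_email`) are hypothetical, chosen only for the illustration.

```python
import sqlite3


def make_gateway_db():
    """Create an in-memory stand-in for the email gateway database."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed ("
        " task_id TEXT NOT NULL,"
        " customer_no INTEGER NOT NULL,"
        " PRIMARY KEY (task_id, customer_no))"
    )
    conn.commit()
    return conn


def handle_message(conn, message, send_email):
    """Send the email unless this (task, customer) pair was already handled.

    Returns True if the email was sent, False if the message was a duplicate.
    """
    try:
        # The primary key rejects a second row for the same pair.
        conn.execute(
            "INSERT INTO processed (task_id, customer_no) VALUES (?, ?)",
            (message["task_id"], message["customer_no"]),
        )
    except sqlite3.IntegrityError:
        conn.rollback()
        return False
    try:
        send_email(message["address"], message["body"])
    except Exception:
        # Sending failed: forget the pair so that a retry can go through.
        conn.rollback()
        raise
    conn.commit()
    return True
```

Relying on the primary key rather than on a separate "does it exist?" query keeps the check and the insert in one statement, so two workers handling the same message at once cannot both pass the check.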

To keep the email gateway independent of exactly how the CRM sends messages, the CRM should transmit not the task identifier and customer number but an idempotency key: a unique value generated from this data. For other ways of sending an email, the idempotency key will be formed differently. With this approach the email gateway does not need to know anything about how messages get sent to it; all that matters is that the sender transmits a key that uniquely identifies the message.
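
One possible way to derive such a key, shown purely as an illustration: a name-based UUID (version 5) computed from the task identifier and customer number. The namespace value and the function name are assumptions made for this example, not part of the original system.

```python
import uuid

# Hypothetical namespace for keys produced by the CRM task sender.
# Any fixed UUID works as long as it never changes.
CRM_TASK_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "crm-tasks.example.invalid")


def task_idempotency_key(task_id, customer_no):
    """Return a deterministic key for one (task, customer) pair.

    The same inputs always give the same key, so a retried send carries
    the key that the email gateway has already seen.
    """
    name = f"task:{task_id}:customer:{customer_no}"
    return str(uuid.uuid5(CRM_TASK_NAMESPACE, name))
```

The gateway then stores and compares only the opaque key. Another sender can build its keys from completely different data, as long as it uses its own namespace so that keys from different senders do not collide.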

It is not always possible to guarantee that a failed SQL transaction will be retried after a while. Nor is there always data from which a unique idempotency key can be formed. And it is advisable to make the processing of a queue message idempotent in any case, because even without duplicate messages a single message can be processed several times if the call to RabbitMQ's Ack method fails. To solve the problem in the general case, we need something like a distributed transaction between RabbitMQ and MS SQL, plus an automatically generated idempotency key. Both can be achieved as follows:
  1. Within the SQL transaction, a unique message identifier is saved to a dedicated table in the database.
  2. After the INSERT has executed, but before the SQL transaction is committed, the message is placed in an intermediate queue. Among other things, the message carries the unique identifier that was saved to the database.
  3. A separate task processes the intermediate queue and checks whether the unique message identifier is present in the database.
  4. If it is, the message is moved to the queue that the receiving system processes. So that old identifiers do not accumulate in the auxiliary table, the identifier is deleted from the database once the message has been moved (if the deletion fails, the system keeps working; an extra record simply remains in the database).
  5. If the transaction has not yet completed when the database is queried for the record with the unique identifier, the query waits for the transaction to complete and only then returns the record. So no additional logic is needed to wait for the transaction to complete.
  6. If the unique identifier is not in the database, the transaction was rolled back, and the message is discarded.
  7. The unique message identifier is used as the idempotency key in the receiving system.
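
The steps above can be sketched in a single process as follows. This is a simplified model under stated assumptions, not the original implementation: SQLite replaces MS SQL, two `deque` objects replace the RabbitMQ queues, and all names (`outgoing_ids`, `send_in_transaction`, `relay_once`) are hypothetical. Because everything runs on one connection, the blocking read from step 5 is not reproduced; only the commit and rollback paths are shown.

```python
import sqlite3
import uuid
from collections import deque


def open_db():
    """In-memory stand-in for the sender's database with the auxiliary table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE outgoing_ids (message_id TEXT PRIMARY KEY)")
    conn.commit()
    return conn


def send_in_transaction(conn, intermediate, payload, business_work):
    """Steps 1-2: store the identifier, enqueue the message, then commit."""
    message_id = str(uuid.uuid4())
    try:
        conn.execute(
            "INSERT INTO outgoing_ids (message_id) VALUES (?)", (message_id,)
        )
        # The message is published before the commit, on purpose.
        intermediate.append({"id": message_id, "payload": payload})
        business_work(conn)  # the rest of the transaction; may raise
        conn.commit()
    except Exception:
        # The message stays in the intermediate queue, but its identifier
        # disappears from the table, so the relay will discard it.
        conn.rollback()
        raise
    return message_id


def relay_once(conn, intermediate, target):
    """Steps 3-6: forward one message if its identifier was committed."""
    if not intermediate:
        return None
    message = intermediate.popleft()
    row = conn.execute(
        "SELECT 1 FROM outgoing_ids WHERE message_id = ?", (message["id"],)
    ).fetchone()
    if row is None:
        return "discarded"  # step 6: the sending transaction was rolled back
    target.append(message)  # step 4: hand over to the receiving system's queue
    conn.execute("DELETE FROM outgoing_ids WHERE message_id = ?", (message["id"],))
    conn.commit()
    return "forwarded"
```

The receiving system would then use `message["id"]` as its idempotency key (step 7). Note that the relay forwards before deleting the identifier, so a crash between the two operations can only produce a duplicate in the target queue, which the idempotency key absorbs.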

This approach guarantees that the sending system retains information about the sent message. If the transaction that sends the message already creates a record with a unique identifier, you can use that record and do without the auxiliary table.

Here a question may arise: "How is this better than using a database table as a queue? You still have to run auxiliary queries against the database." The point is that if a database table is used as a queue, then fetching the last unprocessed message requires queries like "SELECT TOP 1 * FROM Messages WHERE Status = 'New'". If we want to process messages in several threads, then to ensure that one message is not processed by two different threads we have to use a Serializable transaction to fetch the message and change its status. In a Serializable transaction, the query for the last unprocessed message locks all records with the 'New' status, and nobody can add new messages until the fetching transaction ends.

But such a transaction is prone to deadlocks: two threads can read the last unprocessed message at the same time, each taking a shared lock, and then, when trying to update the message status, neither can upgrade its lock to an exclusive one, so one of the transactions is rolled back. Therefore an update lock must be taken when reading the message. As a result, the queue becomes a bottleneck, since only one thread at a time can access it (for either writing or reading).

With the approach described above, all queries against the auxiliary table (insert, lookup and delete) use a known unique key and lock only one record in the database. Therefore, with multi-threaded message processing there is no bottleneck where several threads wait for a lock to be released in order to add or fetch a message.

Source: https://habr.com/ru/post/257949/

