
Symfony + RabbitMQ: A Quick Start for Beginners

Good day, friends.
Today I want to talk about how to work with RabbitMQ in Symfony, and a little about some of the pitfalls. At the end I will share a few interesting points about the rabbit for those who are completely new to the topic.

I will not explain RabbitMQ itself, so if you are not familiar with it yet, read the following translations:

Article 1
Article 2
Article 3
Article 4
Article 5
Do not be put off by the examples in Perl or Python: they are not scary, and everything is quite clear from the source code.

Besides, everything is described in enough detail. When I read these articles myself, it was enough to run the code in my head to understand what it does and why.

If you already know what a consumer is and why you need to call $em->clear() and gc_collect_cycles() in it and then close the database connection, you will most likely learn nothing new. This article is for those who do not want to dig into the AMQP protocol but need queues right now, and who for some reason picked RabbitMQ without much thought rather than, say, the lightweight beanstalkd.
If you have a microservice architecture and are waiting for me to explain how to wire up communication between components over AMQP and how to do RPC elegantly, then I have been waiting for such an article on Habr for a long time myself...

Our task: send email messages through a queue using RabbitMQ, and make the process fault tolerant. If the mail server times out or something else breaks, the task must be retried after 30 seconds.

So, let's install our bundle.

I'm too lazy to describe how to copy the composer require command and add a line to AppKernel.

I really hope you have done this yourself and are ready to start configuring the bundle.

If not, here is a complete guide for beginners.
RabbitMQ installation:
    echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
    wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
    sudo apt-get update
    sudo apt-get install rabbitmq-server
    sudo rabbitmq-plugins enable rabbitmq_management

Now you can open localhost:15672, log in as guest / guest, and see a lot of cool things that you will soon understand, and feel like a real pro.

Now install the bundle itself:

 composer require php-amqplib/rabbitmq-bundle 

And register it in our application:

    // app/AppKernel.php
    public function registerBundles()
    {
        $bundles = array(
            // ... the bundles you already have
            new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
        );

        return $bundles;
    }

That's all.



Here is the bundle configuration we need:

    old_sound_rabbit_mq:
        connections:
            default:
                host: 'localhost'
                port: 5672
                user: 'guest'
                password: 'guest'
                vhost: '/'
                lazy: false
                connection_timeout: 3
                read_write_timeout: 3
                keepalive: false
                heartbeat: 0
                use_socket: true
        producers:
            send_email:
                connection: default
                exchange_options: { name: 'notification.v1.send_email', type: direct }
        consumers:
            send_email:
                connection: default
                exchange_options: { name: 'notification.v1.send_email', type: direct }
                queue_options: { name: 'notification.v1.send_email' }
                callback: app.consumer.mail_sender

Pay close attention to producers and consumers here. In short: a producer is what sends messages through RabbitMQ to a consumer, and a consumer in turn is the thing that receives and processes those messages. exchange_options are the options for the exchange (you did read the RabbitMQ articles linked at the beginning, right?), and queue_options are the options for the queue. Also note the callback in the consumer: it is the ID of a service that implements ConsumerInterface (an execute method that takes the message as its argument).

Since this service does not exist yet, starting the application or compiling the container will throw a DI exception saying that the requested service was not found. So let's create the service:

    # app/config/services.yml
    services:
        app.consumer.mail_sender:
            class: AppBundle\Consumer\MailSenderConsumer

And the class itself:

    namespace AppBundle\Consumer;

    use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
    use PhpAmqpLib\Message\AMQPMessage;

    /**
     * Class MailSenderConsumer
     */
    class MailSenderConsumer implements ConsumerInterface
    {
        /**
         * @param AMQPMessage $msg
         * @return void
         */
        public function execute(AMQPMessage $msg)
        {
            // Pretend to send the email: just print the message body
            echo 'Trying to send message: '.$msg->getBody().PHP_EOL;
            echo 'Sent successfully!...';
        }
    }

Well, you are not offended that I left out how to work with SwiftMailer, are you? :) What matters here is that a string is delivered asynchronously through a message queue; how we process that string is our own business. Mail is just an example use case.

How do we pass a string to our consumer? Let's create a test command:

    namespace AppBundle\Command;

    use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
    use Symfony\Component\Console\Input\InputInterface;
    use Symfony\Component\Console\Output\OutputInterface;

    class TestConsumerCommand extends ContainerAwareCommand
    {
        /**
         * {@inheritdoc}
         */
        protected function configure()
        {
            $this
                ->setName('app:test-consumer')
                ->setDescription('Hello PhpStorm');
        }

        /**
         * {@inheritdoc}
         */
        protected function execute(InputInterface $input, OutputInterface $output)
        {
            // The producer service ID is built from the producer name in the bundle config
            $this->getContainer()
                ->get('old_sound_rabbit_mq.send_email_producer')
                ->publish('Message to send by email...');
        }
    }

Again, I apologize for not making a controller with a pretty form for you: I am too lazy for that, and it would be unnecessary anyway. I am also a frugal slacker who likes to draw and daydream about theory and application architecture. But I digress.

Now we start our consumer and tell it to wait for messages from RabbitMQ:

 bin/console rabbitmq:consumer send_email -vvv 

And send it a message with our test command:

 bin/console app:test-consumer 

Now, in the output of the rabbitmq:consumer process, we can see our message, and the pseudo-send completed successfully.

Now let's see how to implement deferred processing of messages in case of errors. I will not use the RabbitMQ plugin for delayed messages. Instead we will create a new queue whose messages have a lifetime of 30 seconds, and configure it so that a message that expires is moved back to the main queue through its exchange.

Just add a new producer:

    producers:
        send_email:
            connection: default
            exchange_options: { name: 'notification.v1.send_email', type: direct }
        delayed_send_email:
            connection: default
            exchange_options:
                name: 'notification.v1.send_email_delayed_30000'
                type: direct
            queue_options:
                name: 'notification.v1.send_email_delayed_30000'
                arguments:
                    x-message-ttl: ['I', 30000]
                    x-dead-letter-exchange: ['S', 'notification.v1.send_email']
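
To make the mechanism concrete, here is a toy in-memory model of what the two queue arguments do. It is not RabbitMQ code and uses no AMQP library: the class DelayQueueModel and its methods are made up for illustration, and time is passed in by hand as milliseconds. Messages sit in the delayed queue until their TTL runs out, and only then are they handed over to the main queue.

```php
<?php

/**
 * Toy model of a queue with a message TTL and a dead-letter target.
 * Hypothetical class for illustration only; the real work is done by RabbitMQ.
 */
final class DelayQueueModel
{
    /** @var array[] each item is ['body' => string, 'expiresAt' => int] */
    private $messages = [];

    /** @var int */
    private $ttlMs;

    public function __construct(int $ttlMs)
    {
        $this->ttlMs = $ttlMs;
    }

    /** Store a message together with the moment it expires (the x-message-ttl part). */
    public function publish(string $body, int $nowMs): void
    {
        $this->messages[] = ['body' => $body, 'expiresAt' => $nowMs + $this->ttlMs];
    }

    /**
     * Remove expired messages from this queue and return their bodies,
     * which models handing them over to the dead-letter exchange.
     *
     * @return string[]
     */
    public function collectExpired(int $nowMs): array
    {
        $expired = [];
        $alive = [];
        foreach ($this->messages as $message) {
            if ($message['expiresAt'] <= $nowMs) {
                $expired[] = $message['body'];
            } else {
                $alive[] = $message;
            }
        }
        $this->messages = $alive;

        return $expired;
    }
}

$delayed = new DelayQueueModel(30000);
$delayed->publish('bad', 0);

$mainQueue = [];
// After 10 seconds nothing has expired yet.
$mainQueue = array_merge($mainQueue, $delayed->collectExpired(10000));
// After 30 seconds the message expires and lands in the main queue.
$mainQueue = array_merge($mainQueue, $delayed->collectExpired(30000));
```

Note that nothing consumes from the delayed queue: it only exists so that messages can expire in it.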

Now let's change the consumer logic:

    namespace AppBundle\Consumer;

    use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
    use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
    use PhpAmqpLib\Message\AMQPMessage;

    /**
     * Class MailSenderConsumer
     */
    class MailSenderConsumer implements ConsumerInterface
    {
        private $delayedProducer;

        /**
         * MailSenderConsumer constructor.
         * @param ProducerInterface $delayedProducer
         */
        public function __construct(ProducerInterface $delayedProducer)
        {
            $this->delayedProducer = $delayedProducer;
        }

        /**
         * @param AMQPMessage $msg
         * @return void
         */
        public function execute(AMQPMessage $msg)
        {
            $body = $msg->getBody();
            echo 'Trying to send message '.$body.' ...'.PHP_EOL;

            try {
                // Simulate a failure for the test message "bad"
                if ($body == 'bad') {
                    throw new \Exception();
                }
                echo 'Sent successfully...'.PHP_EOL;
            } catch (\Exception $exception) {
                echo 'ERROR'.PHP_EOL;
                // Republish to the delayed queue; the message returns after 30 seconds
                $this->delayedProducer->publish($body);
            }
        }
    }

In general, it is better to use LoggerInterface for output: it is cleaner and scales better.
But we are lazy and do not want to create extra layers, right? Just keep it in mind.

Now we need to inject the producer for the delayed queue into the consumer:

    # app/config/services.yml
    services:
        app.consumer.mail_sender:
            class: AppBundle\Consumer\MailSenderConsumer
            arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']

And change the command:

    namespace AppBundle\Command;

    use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
    use Symfony\Component\Console\Input\InputInterface;
    use Symfony\Component\Console\Output\OutputInterface;

    class TestConsumerCommand extends ContainerAwareCommand
    {
        /**
         * {@inheritdoc}
         */
        protected function configure()
        {
            $this
                ->setName('app:test-consumer')
                ->setDescription('Hello PhpStorm');
        }

        /**
         * {@inheritdoc}
         */
        protected function execute(InputInterface $input, OutputInterface $output)
        {
            $producer = $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer');

            $producer->publish('Hello, world...'); // processed normally
            $producer->publish('bad');             // makes the consumer fail
        }
    }

Now, along with the normal message, it will also send a bad one.

If we run it, we will see the following output:

    Trying to send message Hello, world... ...
    Sent successfully...
    Trying to send message bad ...
    ERROR

After 30 seconds, the failed message is processed again:

    Trying to send message bad ...
    ERROR

And so on, endlessly. The logic for a maximum number of attempts and the like is left for you to work out. Next come a few tips for production and some caveats.
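
An attempt limit could look roughly like the sketch below. It is not part of the bundle: the JSON envelope with an attempt field, the MAX_ATTEMPTS constant, and the helper names encodeTask and nextAttemptBody are all assumptions made for illustration. The idea is that the message carries its own attempt counter, and the consumer decides whether to republish it to the delayed queue or give up.

```php
<?php

const MAX_ATTEMPTS = 5; // assumed limit, pick your own

/** Wrap a payload in an envelope that carries the attempt number. */
function encodeTask(string $payload, int $attempt = 1): string
{
    return json_encode(['payload' => $payload, 'attempt' => $attempt]);
}

/**
 * Decide what to do with a failed task.
 * Returns the body to republish to the delayed queue, or null to give up.
 */
function nextAttemptBody(string $body): ?string
{
    $task = json_decode($body, true);
    if (!is_array($task) || !isset($task['payload'], $task['attempt'])) {
        return null; // malformed message: retrying will not help
    }
    if ($task['attempt'] >= MAX_ATTEMPTS) {
        return null; // limit reached: log it and drop it (or park it elsewhere)
    }

    return encodeTask($task['payload'], $task['attempt'] + 1);
}

$first = encodeTask('bad');
$second = nextAttemptBody($first);                        // attempt 2
$last = nextAttemptBody(encodeTask('bad', MAX_ATTEMPTS)); // null, give up
```

In the consumer's catch block you would then call nextAttemptBody($msg->getBody()) and publish the result through the delayed producer only when it is not null.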

Now the tips for production:

1) Staying on the topic of maximum processing attempts: know every possible exception in the context you work with, 102% of them! Be able to tell when reprocessing is needed and when it is not; otherwise, say hello to garbage in the logs and no understanding of what is going on. If a broken task is circulating in RabbitMQ alongside normal tasks with real data, you can hardly throw out the broken ones without hacks, without updating the consumer code or restarting the consumer. So think this through up front. In our case it would be correct to catch only something like SMTPTimeOutException.
With this model it is also important to understand that each stage should carry one "global responsibility for changing the state of something". Do not give your worker too many risky tasks. Consider an example with 1C: if a product is added or changed successfully in 1C, we write something to our database, for example the date of the last successful (or unsuccessful) synchronization. That is, two databases are updated here: the 1C database and your application's database. Suppose everything was created successfully in 1C, and then, while the "date of last successful synchronization" field is being updated in our database, an error occurs because the database server does not respond. The task is postponed and repeated until the database starts responding. Meanwhile, the "subtask" of creating the entity in 1C succeeds again on every attempt in which the write to the site database fails, which is wrong.
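
A sketch of the first point: retry only on failures that are known to be temporary, and treat everything else as permanent. The exception classes SmtpTimeoutException and InvalidRecipientException and the function processTask are hypothetical stand-ins, not SwiftMailer or bundle classes.

```php
<?php

// Hypothetical exception types standing in for whatever your mailer throws.
class SmtpTimeoutException extends \RuntimeException {}
class InvalidRecipientException extends \InvalidArgumentException {}

/**
 * Run a task and report the outcome:
 * 'done'  - processed successfully
 * 'retry' - temporary failure, republish to the delayed queue
 * 'drop'  - permanent failure, retrying would loop forever
 */
function processTask(callable $send): string
{
    try {
        $send();

        return 'done';
    } catch (SmtpTimeoutException $e) {
        return 'retry'; // the only failure we know to be temporary
    } catch (\Exception $e) {
        return 'drop';  // log it with full context instead of requeueing
    }
}

$ok = processTask(function () {});
$timeout = processTask(function () { throw new SmtpTimeoutException('timeout'); });
$broken = processTask(function () { throw new InvalidRecipientException('no such user'); });
```

Only the 'retry' outcome should lead to a publish on the delayed producer; a 'drop' is where the broken task leaves the system for good.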

2) Read about durability, since we are using RabbitMQ. P.S.: it is exposed as a true/false "durable" flag in the bundle config, specifically in exchange_options and queue_options.

3) Always close the database connection after a task has been executed. Also clear the EntityManager and then run the garbage collector to clean up references. That is, in the end our consumer should look something like this:

    namespace AppBundle\Consumer;

    use Doctrine\ORM\EntityManagerInterface;
    use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
    use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
    use PhpAmqpLib\Message\AMQPMessage;

    class MailSenderConsumer implements ConsumerInterface
    {
        private $delayedProducer;
        private $entityManager;

        /**
         * MailSenderConsumer constructor.
         * @param ProducerInterface $delayedProducer
         * @param EntityManagerInterface $entityManager
         */
        public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager)
        {
            $this->delayedProducer = $delayedProducer;
            $this->entityManager = $entityManager;
            gc_enable();
        }

        /**
         * @param AMQPMessage $msg
         * @return void
         */
        public function execute(AMQPMessage $msg)
        {
            $body = $msg->getBody();
            echo 'Trying to send message '.$body.' ...'.PHP_EOL;

            try {
                if ($body == 'bad') {
                    throw new \Exception();
                }
                echo 'Sent successfully...'.PHP_EOL;
            } catch (\Exception $exception) {
                echo 'ERROR'.PHP_EOL;
                $this->delayedProducer->publish($body);
            }

            // Detach managed entities, drop the DB connection, then collect cycles
            $this->entityManager->clear();
            $this->entityManager->getConnection()->close();
            gc_collect_cycles();
        }
    }

The consumer runs as a daemon, so constantly holding references in it and keeping the database connection open is bad. With MySQL, you will eventually get the "MySQL server has gone away" error.

4) Think hard about how your delayed message model might unexpectedly kill your business. For example, we have a mechanism that, when a product changes in the admin panel, pushes these changes through a queue to 1C. Now imagine the situation: the administrator changes a product, and task #1 is created to apply the same change in the 1C database. The 1C server does not respond, so the task keeps being postponed until everything works again. Meanwhile the administrator decides to fix something else in the same product and does so. Task #2 is queued.

Now imagine tasks #1 and #2 being attempted and postponed in turn.

What if 1C comes back just in time for task #2? The task completes and pushes the latest changes. Then task #1 runs and overwrites them with stale data :)
The way out: send a timestamp as a version, and if a task is "from the past", throw it away.
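
A minimal sketch of that check, assuming each task carries the product ID and the time of the change, and that the consumer can look up the version it applied last. The array-based store, the field names, and the function applyIfNewer are illustrative; in a real consumer the last applied version would live in a database.

```php
<?php

/**
 * Apply a task only if it is newer than what was already applied.
 *
 * @param array $applied map of product ID => last applied version (timestamp)
 * @param array $task    ['productId' => int, 'version' => int, 'data' => mixed]
 * @return bool true if the task was applied, false if it was stale
 */
function applyIfNewer(array &$applied, array $task): bool
{
    $productId = $task['productId'];
    if (isset($applied[$productId]) && $applied[$productId] >= $task['version']) {
        return false; // task "from the past": throw it away
    }
    // ... push $task['data'] to the external system here ...
    $applied[$productId] = $task['version'];

    return true;
}

$applied = [];
$task1 = ['productId' => 42, 'version' => 1000, 'data' => 'old price'];
$task2 = ['productId' => 42, 'version' => 2000, 'data' => 'new price'];

// Task #2 happens to succeed first, then the postponed task #1 arrives.
$secondApplied = applyIfNewer($applied, $task2);
$firstApplied = applyIfNewer($applied, $task1);
```

Here the late task #1 is rejected because version 2000 has already been applied for the same product.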

5) If you are going asynchronous, read up on the architectural problems that come with it: race conditions, inconsistency between consumers on different machines, and so on.

6) Version your queues... Wow, how that helps in real production. In fact, we did exactly that in this example.

7) Maybe you do not need RabbitMQ and the whole AMQP protocol at all. Take a look at beanstalkd.

8) Run the consumer and any other PHP daemons under supervisor, and enable full logging of process crashes in it. It also has a web interface for managing all of this, which is very convenient. There will always be problems.

Source: https://habr.com/ru/post/338950/

