
Queue management in Laravel


In my current project, many tasks run in the background. Data arrives from an external service and goes through several processing stages, each implemented as a queued job. This is convenient: you can vary the number of workers for each type of job. And if something fails, the queue simply accumulates; no data is lost, and it is processed as soon as the problem is fixed.

To hand work from one stage to the next, we simply called dispatch() at the end of each job, like this:

    class MyFirstJob extends Job
    {
        use DispatchesJobs;

        protected $data;

        public function __construct($data)
        {
            $this->data = $data;
        }

        public function handle()
        {
            $this->doSomething($this->data);
            $this->dispatch(new MySecondJob($this->data)); // Second task
        }
    }

The next stage was started in much the same way:
    class MySecondJob extends Job
    {
        use DispatchesJobs;

        protected $data;

        public function __construct($data)
        {
            $this->data = $data;
        }

        public function handle()
        {
            $this->doSomething($this->data);
            if ($this->someCondition($this->data)) {
                $this->dispatch(new MyThirdJob($this->data)); // Third task
            }
        }
    }

At first this worked well, but new processing stages were added and the chain grew. One day, when I had to add yet another stage (a new queue), I caught myself thinking that I no longer remembered exactly what was processed and in what order, and the code did not make it easy to find out. Elements of business logic had crept in: in one case a particular job is started, in another a whole set of jobs is created at once. In short, everything we “love” to see in large systems.

Uh-oh, I thought, it is time to do something. I decided it would be much more convenient to move control over the processing order (the order of dispatch() calls) into separate code. Then everything becomes logical and clear: here is the business process (the controlling code, a queue manager), and here are its individual pieces (the queues).

I did just that and am still happy with the result. Below I describe exactly what I did; I would be glad if you find the approach useful.

Queue management


We have several independent data-processing flows. To describe each algorithm separately, we create an abstract class for the queue manager.

    <?php

    namespace App\Jobs\Pipeline;

    use App\Jobs\Job;
    use Illuminate\Foundation\Bus\DispatchesJobs;

    abstract class PipelineAbstract
    {
        use DispatchesJobs;

        /**
         * Entry point: starts the whole processing flow.
         *
         * @param array $params
         * @return PipelineAbstract
         */
        public function start(array $params)
        {
            $this->next(null, $params);

            return $this;
        }

        /**
         * Decides which job (if any) follows the one that just finished.
         *
         * @param Job|null $currentJob The job that just finished, or null at the start
         * @param array $params Set of parameters for starting new jobs
         */
        abstract public function next(?Job $currentJob, array $params);

        /**
         * @param Job $job
         */
        protected function startJob(Job $job)
        {
            $this->dispatch($job);
        }
    }

The next() method is where the business process is implemented. startJob() is just a thin wrapper around dispatch(), added just in case. start() is called wherever the whole processing flow needs to be initiated (where data from the external service arrives).

An example of business logic implementation:

    <?php

    namespace App\Jobs\Pipeline;

    use App\Jobs\Job;
    use App\Jobs\MyFirstJob;
    use App\Jobs\MySecondJob;
    use App\Jobs\MyThirdJob;

    class ProcessDataPipeline extends PipelineAbstract
    {
        /**
         * @inheritdoc
         */
        public function next(?Job $currentJob, array $params)
        {
            // Start first job
            if ($currentJob === null) {
                $this->startJob(new MyFirstJob($params, $this));
            }

            if ($currentJob instanceof MyFirstJob) {
                $this->startJob(new MySecondJob($params, $this));
            }

            if ($currentJob instanceof MySecondJob) {
                if ($this->someCondition($params)) {
                    $this->startJob(new MyThirdJob($params, $this));
                }
            }
        }
    }

That is all the pipeline needs. What remains is to replace the place where MyFirstJob is launched.

Before:

    $this->dispatch(new MyFirstJob($data));

After:

    (new ProcessDataPipeline())->start($data);

And instead of dispatching the following jobs directly, each job calls its next() method.

Before:

    $this->dispatch(new MySecondJob($data));

After:

    $this->next($data);

I almost forgot: the base job class has to be extended for this. As the code above shows, when instantiating a job we now pass it the pipeline along with the data.

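    <?php

    namespace App\Jobs;

    use App\Jobs\Pipeline\PipelineAbstract;

    abstract class Job
    {
        /**
         * Pipeline that decides which job runs next; set by concrete jobs.
         *
         * @var PipelineAbstract|null
         */
        protected $pipeline;

        /**
         * @param array $params
         */
        public function next(array $params)
        {
            if ($this->pipeline) {
                $this->pipeline->next($this, $params);
            }
        }
    }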

The constructors of the concrete jobs accept a pipeline instance, so that the business-logic steps (the next() calls) are handled by the right pipeline implementation.

    class MyFirstJob extends Job
    {
        protected $data;

        /**
         * @param mixed $data
         * @param PipelineAbstract|null $pipeline
         */
        public function __construct($data, PipelineAbstract $pipeline = null)
        {
            $this->data = $data;
            $this->pipeline = $pipeline;
        }
    }
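To follow the control flow without a running Laravel application, here is a minimal framework-free sketch of the same idea. Everything in it is an assumption made for illustration: the class names ending in Sketch are hypothetical, jobs run synchronously instead of going through a real queue, and the needs_third key merely stands in for someCondition().

```php
<?php

// Hypothetical base job: keeps the data and the pipeline that routes it.
abstract class JobSketch
{
    /** @var string[] Names of executed jobs, in execution order. */
    public static $log = [];

    protected $data;
    protected $pipeline;

    public function __construct(array $data, ?PipelineSketch $pipeline = null)
    {
        $this->data = $data;
        $this->pipeline = $pipeline;
    }

    abstract public function handle(): void;

    // Hand control back to the pipeline, which decides what runs next.
    protected function next(array $params): void
    {
        if ($this->pipeline) {
            $this->pipeline->next($this, $params);
        }
    }
}

class FirstJobSketch extends JobSketch
{
    public function handle(): void
    {
        JobSketch::$log[] = 'first';
        // Data for the next stage is produced by this stage.
        $this->next($this->data + ['parsed' => true]);
    }
}

class SecondJobSketch extends JobSketch
{
    public function handle(): void
    {
        JobSketch::$log[] = 'second';
        $this->next($this->data);
    }
}

class ThirdJobSketch extends JobSketch
{
    public function handle(): void
    {
        JobSketch::$log[] = 'third';
        $this->next($this->data);
    }
}

// Hypothetical pipeline: the only place that knows the order of the stages.
class PipelineSketch
{
    public function start(array $params): self
    {
        $this->next(null, $params);

        return $this;
    }

    public function next(?JobSketch $currentJob, array $params): void
    {
        if ($currentJob === null) {
            $this->startJob(new FirstJobSketch($params, $this));
        } elseif ($currentJob instanceof FirstJobSketch) {
            $this->startJob(new SecondJobSketch($params, $this));
        } elseif ($currentJob instanceof SecondJobSketch) {
            // Stand-in for someCondition(): the third stage is optional.
            if (!empty($params['needs_third'])) {
                $this->startJob(new ThirdJobSketch($params, $this));
            }
        }
    }

    protected function startJob(JobSketch $job): void
    {
        // A real pipeline would call $this->dispatch($job) here.
        $job->handle();
    }
}

(new PipelineSketch())->start(['needs_third' => true]);
echo implode(' -> ', JobSketch::$log), PHP_EOL; // prints "first -> second -> third"
```

Note that no job names its successor: each one only calls next(), and the whole sequence, including the optional third stage, is readable in one method of the pipeline.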

Now that really is everything. The result resembles a chain of responsibility. I tried to explain the idea in simple terms. If you would like to do the same, I have published a working example of the implementation, which may be more convenient than a description in words.

What else



By the way, recent versions of Laravel include a similar tool, withChain(), which guarantees that jobs run in strict sequence. In simple cases this is enough. But when there are conditions for starting one job or another, or when the data for the next job is produced by the previous one, a more general mechanism is still needed, such as the one described in this article.
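For comparison, a fixed chain of the three jobs from this article might look roughly like the sketch below. It assumes that the job classes use Laravel's Dispatchable and Queueable traits, which the code in this article does not show, and dispatchFixedChain() is a hypothetical helper name introduced only for the example.

```php
<?php

use App\Jobs\MyFirstJob;
use App\Jobs\MySecondJob;
use App\Jobs\MyThirdJob;

/**
 * Hypothetical helper: queues the three jobs as one fixed chain.
 * Assumes each job class uses Laravel's Dispatchable and Queueable traits.
 */
function dispatchFixedChain(array $data): void
{
    // The arguments of dispatch() go to MyFirstJob's constructor. The chained
    // jobs run afterwards, in order, and only if the previous job succeeded.
    MyFirstJob::withChain([
        new MySecondJob($data),
        new MyThirdJob($data),
    ])->dispatch($data);
}
```

The whole list is built before the first job starts, so MyThirdJob cannot be skipped based on a condition evaluated later, and the chained jobs receive $data as it was at dispatch time rather than whatever the previous stage produced.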

Source: https://habr.com/ru/post/354364/

