
Testing parallel processes


Have you ever met errors that occur from time to time in production but cannot be reproduced locally in any way? You study such a bug and suddenly realize that it only manifests itself when scripts run simultaneously in parallel. Having studied the code, you understand how to fix it so that it does not happen again. But it would be good to write a test for such a fix...

In this article I will describe my approach to testing such situations, and give several illustrative (perhaps even classical) examples of bugs that are convenient to test with this approach. All of the example bugs are real, encountered in actual work.
Looking ahead, I will say right away that at the end of the article there is a link to GitHub, where I posted a ready-made solution that makes testing parallel console processes easy.

Example number one. Parallel insertion of the same data


Task: we have an application with a PostgreSQL database, and we need to set up data import from a third-party system. Suppose there is an account (id, name) table, and the mapping of our identifiers to the external system lives in the account_import (id, external_id) table. Let's sketch out a simple message-handling mechanism.

When a message arrives, we first check whether such records already exist in our database. If they do, we update them; if not, we insert new ones.

```php
$data = json_decode($jsonInput, true); // '{"id":1,"name":"account1"}'

try {
    $connection->beginTransaction();

    // Check whether this external id is already known to us
    $stmt = $connection->prepare("SELECT id FROM account_import WHERE external_id = :external_id");
    $stmt->execute([
        ':external_id' => $data['id'],
    ]);
    $row = $stmt->fetch();

    usleep(100000); // 0.1 sec

    // If the record already exists, update it
    if ($row) {
        $stmt = $connection->prepare("UPDATE account SET name = :name WHERE id = (
            SELECT id FROM account_import WHERE external_id = :external_id
        )");
        $stmt->execute([
            ':name' => $data['name'],
            ':external_id' => $data['id'],
        ]);
        $accountId = $row['id'];
    }
    // Otherwise insert a new one
    else {
        $stmt = $connection->prepare("INSERT INTO account (name) VALUES (:name)");
        $stmt->execute([
            ':name' => $data['name'],
        ]);
        $accountId = $connection->lastInsertId();

        $stmt = $connection->prepare("INSERT INTO account_import (id, external_id) VALUES (:id, :external_id)");
        $stmt->execute([
            ':id' => $accountId,
            ':external_id' => $data['id'],
        ]);
    }

    $connection->commit();
} catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}
```

At first glance this looks fine. But if the data cannot be guaranteed to arrive in our system strictly sequentially, we may run into a problem. The 0.1-second delay in this example is there to make the problem reproduce reliably. What happens if the same data is imported in parallel? Most likely, instead of the data being inserted and then updated, there will be an attempt to insert it twice and, as a result, a primary key violation in account_import.

To fix a bug, it is best to reproduce it first, and better still to write a test that reproduces it. To do this I decided to run the commands asynchronously using bash, and wrote a simple script for it that can be used not only with PHP.

The idea is simple: we run several instances of a command in the background, wait for them to complete, and check the exit codes. If any of the exit codes is non-zero, we have found a bug. In simplified form the script looks like this:

```bash
# Command to run in parallel
COMMAND="echo -e '{\"id\":1,\"name\":\"account1\"}' | ./cli app:import"

# PIDs of the started processes
pids=()
# Actual exit codes
results=()
# Expected exit codes (zeros)
expects=()

# Start the command instances in the background; their output goes to stderr
for i in $(seq 2)
do
    eval $COMMAND 1>&2 &
    pids+=($!)
    echo -e '>>>' Process ${pids[i-1]} started 1>&2
done

# Wait for each process to finish and remember its exit code in $results
for pid in "${pids[@]}"
do
    wait $pid
    results+=($?)
    expects+=(0)
    echo -e '<<<' Process $pid finished 1>&2
done

# Compare the actual exit codes with the expected ones
result=`( IFS=$', '; echo "${results[*]}" )`
expect=`( IFS=$', '; echo "${expects[*]}" )`
if [ "$result" != "$expect" ]
then
    exit 1
fi
```
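The essence of that script — start processes in the background, `wait` on each PID, and flag any non-zero exit code — can be seen in a tiny stand-in sketch. The `cmd` function below is a hypothetical placeholder for the real command under test, not part of the project:

```shell
#!/bin/sh
# Stand-in for the command under test: exits with the code we pass in
cmd() { exit "$1"; }

# Start one "good" and one "bad" instance in the background, collecting PIDs
cmd 0 & pids="$!"
cmd 1 & pids="$pids $!"

# Wait for each PID; remember whether any exit code was non-zero
status=0
for pid in $pids; do
    wait "$pid" || status=1
done

echo "aggregate status: $status"   # prints: aggregate status: 1
```

Here the second instance fails, so the aggregate status is 1 — exactly the signal the full script turns into a test failure.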

The full version of the script is posted on GitHub.

Based on this command, we can add new assertions to PHPUnit. Everything there is straightforward, so I will not dwell on the details; the assertions are implemented in the project mentioned above. To use them, it is enough to add the AsyncTrait trait to your test.

Let's write the test.

```php
use App\Command\Initializer;
use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;

class ImportCommandTest extends TestCase
{
    use AsyncTrait;

    public function testImport()
    {
        $cli = Initializer::create();
        $command = $cli->find('app:delete');

        // Delete the account with external_id = 1,
        // so the test does not depend on the current state of the database
        $commandTester = new CommandTester($command);
        $commandTester->execute([
            'externalId' => 1,
        ]);

        $asyncCommand = new Command(
            'echo -e \'{"id":1,"name":"account1"}\' | ./cli app:import', // command under test
            dirname(__DIR__), // working directory
            2 // number of parallel instances
        );

        // Assert that the instances finish without errors
        $this->assertAsyncCommand($asyncCommand);
    }
}
```

Running the test produces this output:

```
$ ./vendor/bin/phpunit
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

F                                                                   1 / 1 (100%)

Time: 230 ms, Memory: 6.00MB

There was 1 failure:

1) ImportCommandTest::testImport
Failed asserting that command echo -e '{"id":1,"name":"account1"}' | ./cli app:import
(path: /var/www/pprocess-playground, count: 2) executed in parallel. Output:
>>> Process 18143 started
>>> Process 18144 started
Account 25 imported correctly

  [Doctrine\DBAL\Exception\UniqueConstraintViolationException]
  An exception occurred while executing 'INSERT INTO account_import (id, external_id)
  VALUES (:id, :external_id)' with params ["26", 1]:
  SQLSTATE[23505]: Unique violation: 7 ERROR: duplicate key value violates
  unique constraint "account_import_pkey"
  DETAIL: Key (external_id)=(1) already exists.

app:import
<<< Process 18143 finished
<<< Process 18144 finished

/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
/var/www/pprocess-playground/tests/ImportCommandTest.php:30

FAILURES!
Tests: 1, Assertions: 1, Failures: 1.
```

We have already discussed the reason. Now let's add explicit locking to prevent parallel execution of the critical fragment of our script (malkusch/lock is used here).

```php
$mutex = new FlockMutex(fopen(__FILE__, 'r'));
$mutex->synchronized(function () use ($connection, $data) {
    // The code from the try block above goes here
});
```

The test passes:

```
$ ./vendor/bin/phpunit
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

.                                                                   1 / 1 (100%)

Time: 361 ms, Memory: 6.00MB

OK (1 test, 1 assertion)
```
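As an aside, FlockMutex from malkusch/lock is built on file locking, the same primitive exposed by the `flock` utility. The idea can be demonstrated in shell: two background workers do a read-modify-write on a shared counter, and taking an exclusive lock makes each increment atomic (a sketch with arbitrary temp-file paths; without the lock, concurrent increments could be lost):

```shell
#!/bin/sh
COUNTER=$(mktemp)    # shared counter file
LOCK=$(mktemp)       # lock file guarding the counter
echo 0 > "$COUNTER"

worker() {
    for i in $(seq 50); do
        (
            flock -x 9                    # exclusive lock, like FlockMutex
            n=$(cat "$COUNTER")           # read
            echo $((n + 1)) > "$COUNTER"  # modify + write
        ) 9>"$LOCK"
    done
}

worker &
worker &
wait

cat "$COUNTER"   # prints 100: no increment was lost
```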

I put this and the other examples on GitHub, in case anyone finds them useful.

Example number two. Refreshing data in a table


This example is a little more interesting. Suppose we have a users (id, name) table, and we want to keep a list of currently active users in the users_active (id) table.

We will have a command that each time deletes all records from the users_active table and inserts fresh data into it.

```php
try {
    $connection->beginTransaction();
    $connection->prepare("DELETE FROM users_active")->execute();
    usleep(100000); // 0.1 sec
    $connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute();
    $connection->commit();
    $output->writeln('<info>users_active refreshed</info>');
} catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}
```

Here too, everything looks fine only at first glance. In fact, when run in parallel, we again get an error.

Let's write a test to reproduce it.

```php
use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\Command;
use PHPUnit\Framework\TestCase;

class DetectActiveUsersCommandTest extends TestCase
{
    use AsyncTrait;

    public function testImport()
    {
        $asyncCommand = new Command(
            './cli app:detect-active-users', // command under test
            dirname(__DIR__), // working directory
            2 // number of parallel instances
        );

        // Assert that the instances finish without errors
        $this->assertAsyncCommand($asyncCommand);
    }
}
```

Run the test and look at the error text:

```
$ ./vendor/bin/phpunit tests/DetectActiveUsersCommandTest.php
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

F                                                                   1 / 1 (100%)

Time: 287 ms, Memory: 4.00MB

There was 1 failure:

1) DetectActiveUsersCommandTest::testImport
Failed asserting that command ./cli app:detect-active-users
(path: /var/www/pprocess-playground, count: 2) executed in parallel. Output:
>>> Process 24717 started
>>> Process 24718 started
users_active refreshed
<<< Process 24717 finished

  [Doctrine\DBAL\Exception\UniqueConstraintViolationException]
  An exception occurred while executing 'INSERT INTO users_active (id)
  VALUES (3), (5), (6), (10)':
  SQLSTATE[23505]: Unique violation: 7 ERROR: duplicate key value violates
  unique constraint "users_active_pkey"
  DETAIL: Key (id)=(3) already exists.

app:detect-active-users
<<< Process 24718 finished

/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19
/var/www/pprocess-playground/tests/DetectActiveUsersCommandTest.php:19

FAILURES!
Tests: 1, Assertions: 1, Failures: 1.
```

From the error text it is clear that, once again, the INSERT statements run in parallel, with undesirable consequences. Let's try a row-level lock: add this line right after the transaction begins:

```php
$connection->prepare("SELECT id FROM users_active FOR UPDATE")->execute();
```

We run the test and the error is gone. Our test runs two instances of the process. Let's increase the number of instances to 3 and see what happens.

```php
$asyncCommand = new Command(
    './cli app:detect-active-users', // command under test
    dirname(__DIR__), // working directory
    3 // number of parallel instances
);
```

And we get the same error again. What happened; we added a lock, didn't we? After a little thought, you can see that such a lock only helps when the users_active table already contains rows. When 3 processes run at the same time, the picture is as follows: the first process acquires the lock; the second and third processes wait for the first process's transaction to complete. As soon as it completes, the second and third processes continue to run in parallel (the rows they queued on were deleted by the first transaction, so there is nothing left for them to lock), with the same undesirable consequences.

To fix this, let's make the lock broader. For example:

```php
$connection->prepare("SELECT id FROM users WHERE id IN (3, 5, 6, 10) FOR UPDATE")->execute();
```

Or, instead of DELETE, we could simply use TRUNCATE, which locks the entire table.
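If the TRUNCATE route is taken, the command body from above might look like this (a sketch, not tested here; in PostgreSQL, TRUNCATE acquires an ACCESS EXCLUSIVE lock on the table, so a parallel instance simply waits for the transaction to finish):

```php
try {
    $connection->beginTransaction();
    // TRUNCATE takes an ACCESS EXCLUSIVE lock, so a parallel instance
    // blocks here until our transaction commits
    $connection->prepare("TRUNCATE users_active")->execute();
    $connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute();
    $connection->commit();
} catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}
```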

Example number three. Deadlock


It happens that a command causes no problems on its own, but running two different commands that work with the same resources at the same time does. Finding the causes of such bugs is not easy. But once the cause is found, a test is exactly what will keep the problem from returning when the code changes in the future.

Let's write a couple of such commands. This is a classic deadlock case.

The first command updates the record with id = 1 first, then the one with id = 2.

```php
try {
    $connection->beginTransaction();
    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();
    usleep(100000); // 0.1 sec
    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();
    $connection->commit();
    $output->writeln('<info>Completed without deadlocks</info>');
} catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}
```

The second command updates the record with id = 2 first, then the one with id = 1.

```php
try {
    $connection->beginTransaction();
    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();
    usleep(100000); // 0.1 sec
    $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();
    $connection->commit();
    $output->writeln('<info>Completed without deadlocks</info>');
} catch (\Throwable $e) {
    $connection->rollBack();
    throw $e;
}
```

The test will look like this.

```php
use Mnvx\PProcess\AsyncTrait;
use Mnvx\PProcess\Command\CommandSet;
use PHPUnit\Framework\TestCase;

class DeadlockCommandTest extends TestCase
{
    use AsyncTrait;

    public function testImport()
    {
        $asyncCommand = new CommandSet(
            [
                // commands under test
                './cli app:deadlock-one',
                './cli app:deadlock-two',
            ],
            dirname(__DIR__), // working directory
            1 // number of parallel instances of each command
        );

        // Assert that the commands finish without errors
        $this->assertAsyncCommands($asyncCommand);
    }
}
```

The test run reveals the cause of the error:

```
$ ./vendor/bin/phpunit tests/DeadlockCommandTest.php
PHPUnit 6.1.1 by Sebastian Bergmann and contributors.

F                                                                   1 / 1 (100%)

Time: 1.19 seconds, Memory: 4.00MB

There was 1 failure:

1) DeadlockCommandTest::testImport
Failed asserting that commands ./cli app:deadlock-one, ./cli app:deadlock-two
(path: /var/www/pprocess-playground, count: 1) executed in parallel. Output:
>>> Process 5481 started: ./cli app:deadlock-one
>>> Process 5481 started: ./cli app:deadlock-two

  [Doctrine\DBAL\Exception\DriverException]
  An exception occurred while executing 'UPDATE deadlock SET value = value + 1 WHERE id = 1':
  SQLSTATE[40P01]: Deadlock detected: 7 ERROR: deadlock detected
  DETAIL: Process 5498 waits for ShareLock on transaction 294738; blocked by process 5499.
  Process 5499 waits for ShareLock on transaction 294737; blocked by process 5498.
  CONTEXT: while updating tuple (0,48) in relation "deadlock"

app:deadlock-two
Completed without deadlocks
<<< Process 5481 finished
<<< Process 5484 finished

/var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:39
/var/www/pprocess-playground/tests/DeadlockCommandTest.php:22

FAILURES!
Tests: 1, Assertions: 1, Failures: 1.
```

The problem can be fixed by adding a lock, by analogy with the first example, or by revising the database structure or the data-access algorithm.
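One common fix, to name it plainly, is consistent lock ordering: make both commands acquire the row locks in the same order, so neither can hold a row the other needs. A sketch of such a lock, added at the start of each transaction by analogy with the first example:

```php
// Both commands lock the rows in the same (ascending id) order up front,
// so the opposite-order UPDATE sequences can no longer interleave into a deadlock
$connection->prepare("SELECT id FROM deadlock WHERE id IN (1, 2) ORDER BY id FOR UPDATE")->execute();
```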

Summing up


When code is executed in parallel, unexpected situations can arise, and once you fix them it is useful to write tests. We have reviewed several such situations and written tests for them using pprocess.

Source: https://habr.com/ru/post/327292/
