account (id, name)
and the association of identifiers with the external system in the account_import (id, external_id)
table account_import (id, external_id)
. Let's outline a simple message receiving mechanism. $data = json_decode($jsonInput, true); // '{"id":1,"name":"account1"}' try { $connection->beginTransaction(); // , $stmt = $connection->prepare("SELECT id FROM account_import WHERE external_id = :external_id"); $stmt->execute([ ':external_id' => $data['id'], ]); $row = $stmt->fetch(); usleep(100000); // 0.1 sec // , if ($row) { $stmt = $connection->prepare("UPDATE account SET name = :name WHERE id = ( SELECT id FROM account_import WHERE external_id = :external_id )"); $stmt->execute([ ':name' => $data['name'], ':external_id' => $data['id'], ]); $accountId = $row['id']; } // else { $stmt = $connection->prepare("INSERT INTO account (name) VALUES (:name)"); $stmt->execute([ ':name' => $data['name'], ]); $accountId = $connection->lastInsertId(); $stmt = $connection->prepare("INSERT INTO account_import (id, external_id) VALUES (:id, :external_id)"); $stmt->execute([ ':id' => $accountId, ':external_id' => $data['id'], ]); } $connection->commit(); } catch (\Throwable $e) { $connection->rollBack(); throw $e; }
# , COMMAND=”echo -e '{\"id\":1,\"name\":\"account1\"}' | ./cli app:import” # PID- pids=() # results=() # () expects=() # stderr for i in $(seq 2) do eval $COMMAND 1>&2 & pids+=($!) ; echo -e '>>>' Process ${pids[i-1]} started 1>&2 done # $results for pid in "${pids[@]}" do wait $pid results+=($?) expects+=(0) echo -e '<<<' Process $pid finished 1>&2 done # result=`( IFS=$', '; echo "${results[*]}" )` expect=`( IFS=$', '; echo "${expects[*]}" )` if [ "$result" != "$expect" ] then exit 1 fi
AsyncTrait
to your test. use App\Command\Initializer; use Mnvx\PProcess\AsyncTrait; use Mnvx\PProcess\Command\Command; use PHPUnit\Framework\TestCase; use Symfony\Component\Console\Tester\CommandTester; class ImportCommandTest extends TestCase { use AsyncTrait; public function testImport() { $cli = Initializer::create(); $command = $cli->find('app:delete'); // c external_id = 1, // $commandTester = new CommandTester($command); $commandTester->execute([ 'externalId' => 1, ]); $asnycCommand = new Command( 'echo -e \'{"id":1,"name":"account1"}\' | ./cli app:import', // dirname(__DIR__), // , 2 // ); // $this->assertAsyncCommand($asnycCommand); } }
$ ./vendor/bin/phpunit PHPUnit 6.1.1 by Sebastian Bergmann and contributors. F 1 / 1 (100%) Time: 230 ms, Memory: 6.00MB There was 1 failure: 1) ImportCommandTest::testImport Failed asserting that command echo -e '{"id":1,"name":"account1"}' | ./cli app:import (path: /var/www/pprocess-playground, count: 2) executed in parallel. Output: >>> Process 18143 started >>> Process 18144 started Account 25 imported correctly [Doctrine\DBAL\Exception\UniqueConstraintViolationException] An exception occurred while executing 'INSERT INTO account_import (id, exte rnal_id) VALUES (:id, :external_id)' with params ["26", 1]: SQLSTATE[23505]: Unique violation: 7 : "account_import_pkey" DETAIL: "(external_id)=(1)" . ------- app:import <<< Process 18143 finished <<< Process 18144 finished . /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19 /var/www/pprocess-playground/tests/ImportCommandTest.php:30 FAILURES! Tests: 1, Assertions: 1, Failures: 1.
$mutex = new FlockMutex(fopen(__FILE__, 'r')); $mutex->synchronized(function () use ($connection, $data) { // try });
$ ./vendor/bin/phpunit PHPUnit 6.1.1 by Sebastian Bergmann and contributors. . 1 / 1 (100%) Time: 361 ms, Memory: 6.00MB OK (1 test, 1 assertion)
users (id, name)
table users (id, name)
and we want to store in the users_active (id)
table a list of currently active users.users_acitve
table and add data there again. try { $connection->beginTransaction(); $connection->prepare("DELETE FROM users_active")->execute(); usleep(100000); // 0.1 sec $connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute(); $connection->commit(); $output->writeln('<info>users_active refreshed</info>'); } catch (\Throwable $e) { $connection->rollBack(); throw $e; }
use Mnvx\PProcess\AsyncTrait; use Mnvx\PProcess\Command\Command; use PHPUnit\Framework\TestCase; class DetectActiveUsersCommandTest extends TestCase { use AsyncTrait; public function testImport() { $asnycCommand = new Command( './cli app:detect-active-users', // dirname(__DIR__), // , 2 // ); // $this->assertAsyncCommand($asnycCommand); } }
$ ./vendor/bin/phpunit tests/DetectActiveUsersCommandTest.php PHPUnit 6.1.1 by Sebastian Bergmann and contributors. F 1 / 1 (100%) Time: 287 ms, Memory: 4.00MB There was 1 failure: 1) DetectActiveUsersCommandTest::testImport Failed asserting that command ./cli app:detect-active-users (path: /var/www/pprocess-playground, count: 2) executed in parallel. Output: >>> Process 24717 started >>> Process 24718 started users_active refreshed <<< Process 24717 finished [Doctrine\DBAL\Exception\UniqueConstraintViolationException] An exception occurred while executing 'INSERT INTO users_active (id) VALUES (3), (5), (6), (10)': SQLSTATE[23505]: Unique violation: 7 : "users_active_pkey" DETAIL: "(id)=(3)" . ------- app:detect-active-users <<< Process 24718 finished . /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19 /var/www/pprocess-playground/tests/DetectActiveUsersCommandTest.php:19 FAILURES! Tests: 1, Assertions: 1, Failures: 1.
$connection->prepare("SELECT id FROM users_active FOR UPDATE")->execute();
$asnycCommand = new Command( './cli app:detect-active-users', // dirname(__DIR__), // , 3 // );
users_active
table. In the case when 3 processes work at the same time, the picture is the following: the first process gets a lock. The second and third processes are waiting for the completion of the first process transaction. As soon as the transaction is completed, the second and third processes will continue to run in parallel, with undesirable consequences. $connection->prepare("SELECT id FROM users WHERE id IN (3, 5, 6, 10) FOR UPDATE")->execute();
DELETE
we could just use TRUNCATE
, which blocks the entire table. try { $connection->beginTransaction(); $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute(); usleep(100000); // 0.1 sec $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute(); $connection->commit(); $output->writeln('<info>Completed without deadlocks</info>'); } catch (\Throwable $e) { $connection->rollBack(); throw $e; }
try { $connection->beginTransaction(); $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute(); usleep(100000); // 0.1 sec $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute(); $connection->commit(); $output->writeln('<info>Completed without deadlocks</info>'); } catch (\Throwable $e) { $connection->rollBack(); throw $e; }
use Mnvx\PProcess\AsyncTrait; use Mnvx\PProcess\Command\CommandSet; use PHPUnit\Framework\TestCase; class DeadlockCommandTest extends TestCase { use AsyncTrait; public function testImport() { $asnycCommand = new CommandSet( [ // './cli app:deadlock-one', './cli app:deadlock-two' ], dirname(__DIR__), // , 1 // ); // $this->assertAsyncCommands($asnycCommand); } }
$ ./vendor/bin/phpunit tests/DeadlockCommandTest.php PHPUnit 6.1.1 by Sebastian Bergmann and contributors. F 1 / 1 (100%) Time: 1.19 seconds, Memory: 4.00MB There was 1 failure: 1) DeadlockCommandTest::testImport Failed asserting that commands ./cli app:deadlock-one, ./cli app:deadlock-two (path: /var/www/pprocess-playground, count: 1) executed in parallel. Output: >>> Process 5481 started: ./cli app:deadlock-one >>> Process 5481 started: ./cli app:deadlock-two [Doctrine\DBAL\Exception\DriverException] An exception occurred while executing 'UPDATE deadlock SET value = value + 1 WHERE id = 1': SQLSTATE[40P01]: Deadlock detected: 7 : DETAIL: 5498 ShareLock " 294 738"; 5499. 5499 ShareLock " 294737"; 5498. HINT: . CONTEXT: (0,48) "deadlock" ------- app:deadlock-two Completed without deadlocks <<< Process 5481 finished <<< Process 5484 finished . /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:39 /var/www/pprocess-playground/tests/DeadlockCommandTest.php:22 FAILURES! Tests: 1, Assertions: 1, Failures: 1.
Source: https://habr.com/ru/post/327292/
All Articles