// Minimal pulsar bootstrap that relies on the default socket parameters.
//
// Every statement is on its own line on purpose: when this snippet was collapsed
// onto a single line, the first `//` comment swallowed everything after it, so the
// reply-stack command, the default sockets and manage() were never executed.
$pulsar = new \React\PublisherPulsar\Pulsar();

$publisherPulsarDto = new \React\PublisherPulsar\Inventory\PublisherPulsarDto();

// Module name of this pulsar.
$publisherPulsarDto->setModuleName('react:pulsar');

// Command name for the reply stack; the value here is a Laravel artisan command.
// NOTE(review): presumably the pulsar starts this command itself — confirm in the library.
$publisherPulsarDto->setReplyStackCommandName('php artisan react:pulsar-reply-stack');

// Judging by the method name, fills in the default socket parameters so that no
// explicit socket parameters object has to be supplied.
$publisherPulsarDto->initDefaultPulsarSocketsParams();

$pulsar->setPublisherPulsarDto($publisherPulsarDto);
$pulsar->manage();
// Explicit pulsar configuration: tuning values plus hand-picked socket addresses.
// Operates on the $pulsar / $publisherPulsarDto instances created earlier in this file.
//
// Every statement is on its own line on purpose: when this snippet was collapsed
// onto a single line, the first `//` comment swallowed everything after it, so only
// setPulsationIterationPeriod() actually ran.

// Period of one pulsation iteration.
// NOTE(review): the unit is not visible here (presumably seconds) — confirm.
$publisherPulsarDto->setPulsationIterationPeriod(1);

// Number of subscribers handled per iteration (per the setter name).
$publisherPulsarDto->setSubscribersPerIteration(10);

// Maximum execution time allowed for a performer container action.
// NOTE(review): unit not visible here — confirm.
$publisherPulsarDto->setPerformerContainerActionMaxExecutionTime(7);

// Logger taken from the Laravel Log facade.
// NOTE(review): the original (garbled) comment suggests that without this call
// the library logs to STDOUT — confirm.
$publisherPulsarDto->setLogger(\Log::getMonolog());

// Maximum time to wait for the reply stack result.
// NOTE(review): the original (garbled) comment relates this to subscribersPerIteration;
// exact semantics and unit are not visible here — confirm.
$publisherPulsarDto->setMaxWaitReplyStackResult(7);

// Socket addresses used by the pulsar and its reply stack.
$pulsarSocketsParams = new \React\PublisherPulsar\Inventory\PulsarSocketsParamsDto();
$pulsarSocketsParams->setReplyToReplyStackSocketAddress('tcp://127.0.0.1:6271');
$pulsarSocketsParams->setPushToReplyStackSocketAddress('tcp://127.0.0.1:6272');
$pulsarSocketsParams->setPublishSocketAddress('tcp://127.0.0.1:6273');
$pulsarSocketsParams->setPullSocketAddress('tcp://127.0.0.1:6274');
$pulsarSocketsParams->setReplyStackSocketAddress('tcp://127.0.0.1:6275');

$publisherPulsarDto->setPulsarSocketsParams($pulsarSocketsParams);

$pulsar->setPublisherPulsarDto($publisherPulsarDto);
$pulsar->manage();
// Entry point of the reply-stack command: create the reply stack and start it.
// NOTE(review): presumably this is the body of the command configured via
// setReplyStackCommandName() — confirm.
$replyStack = new \React\PublisherPulsar\ReplyStack();

$replyStack->startCommunication();
// Performer setup that relies on the default socket parameters.
//
// Every statement is on its own line on purpose: when this snippet was collapsed
// onto a single line, the `//` comment after setModuleName() swallowed the rest,
// so the DTO was never attached and $this->zmqPerformer was never assigned.
$performer = new \React\PublisherPulsar\Performer();

$performerDto = new \React\PublisherPulsar\Inventory\PerformerDto();

// Placeholder value: replace with the name of the service that contains this performer.
$performerDto->setModuleName("YourServiceNameContainingPerformer");

$performer->setPerformerDto($performerDto);

// Judging by the method name, fills in the default performer socket parameters.
$performer->initDefaultPerformerSocketsParams();

// Keep the configured performer on the enclosing object for later use.
$this->zmqPerformer = $performer;
// Performer setup with an explicit logger and explicit socket addresses.
// Operates on the $performer / $performerDto instances created earlier in this file.
//
// Every statement is on its own line on purpose: when this snippet was collapsed
// onto a single line, the `//` comment after the socket params constructor swallowed
// the rest, so no address was set and $this->zmqPerformer was never assigned.

// Logger taken from the Laravel Log facade.
$performerDto->setLogger(\Log::getMonolog());

// Addresses of the ZMQ sockets (Publish/Subscribe, Push/Pull, Request/Reply).
// The ports are the same as the publish (6273), pull (6274) and reply-stack (6275)
// addresses used in the pulsar socket configuration elsewhere in this file.
$performerSocketParams = new \React\PublisherPulsar\Inventory\PerformerSocketsParamsDto();
$performerSocketParams->setPublisherPulsarSocketAddress('tcp://127.0.0.1:6273');
$performerSocketParams->setPushPulsarSocketAddress('tcp://127.0.0.1:6274');
$performerSocketParams->setRequestPulsarRsSocketAddress('tcp://127.0.0.1:6275');

$performerDto->setSocketsParams($performerSocketParams);

$performer->setPerformerDto($performerDto);

// Keep the configured performer on the enclosing object for later use.
$this->zmqPerformer = $performer;
// NOTE(review): judging by the method name, this connects to the pulsar and waits
// until it grants permission to act — confirm the blocking behaviour in the library.
$this->zmqPerformer->connectToPulsarAndWaitPermissionToAct();
// Report the outcome of the performed action back through the performer.
// NOTE(review): $e is not defined in this fragment — presumably it runs inside a
// catch block; isUserRateLimitExceeded() / isDailyLimitExceeded() are defined elsewhere.
if (isUserRateLimitExceeded()) {
    // User rate limit hit: report the failure and raise the slow-down flag.
    $result = new ActionResultingPushDto();
    $result->setActionCompleteCorrectly(false);
    $result->setSlowDown(true);
    $result->setErrorMessage($e->getMessage());
    $result->setErrorReason(GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED);

    $this->zmqPerformer->pushActionResultInfo($result);
} elseif (isDailyLimitExceeded()) {
    // Daily limit hit: report the failure together with a sleep period.
    $result = new ActionResultingPushDto();
    $result->setActionCompleteCorrectly(false);

    $sleepForPeriod = new ErrorSleepForPeriod();
    // 60 * 60 * 1000000 — presumably one hour in microseconds; confirm the unit.
    $sleepForPeriod->setSleepPeriod((60 * 60 * 1000000));
    $result->setSleepForPeriod($sleepForPeriod);

    $result->setErrorMessage($e->getMessage());
    $result->setErrorReason(GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED);

    $this->zmqPerformer->pushActionResultInfo($result);
} else {
    // Neither limit was hit: push the result without any pulsar correction
    // behaviour (per the method name).
    $this->zmqPerformer->pushActionResultInfoWithoutPulsarCorrectionBehavior();
}
// Attach a custom publisher-to-subscribers DTO to the pulsar configuration.
// The class name and setYourProperty() are placeholders for your own DTO
// (per the class name, one that extends the library's PublisherToSubscribersDto).
$publisherToSubscribersDto = new YourNameExtendedByPublisherToSubscribersDto();
$publisherToSubscribersDto->setYourProperty();

$publisherPulsarDto->setPublisherToSubscribersDto($publisherToSubscribersDto);
Source: https://habr.com/ru/post/304536/
All Articles