
Process management: staying within an API's RPS (QPS) limit

Figure: Structural and functional diagram of the module

I want to introduce Publisher Pulsar (GitHub), a module I developed that lets processes synchronize their actions dynamically.

One particular use case: there are many processes (tens, hundreds, or thousands) that access the Google Analytics API independently of each other.

GA, meanwhile, limits requests to 10 queries per second from a single IP address.

If calls to the API are not regulated, some processes will repeatedly get the rate-limit-exceeded error. They will then either be unable to complete their task without the API data, or keep retrying in a loop, which creates problems for the other processes and increases the number of errors further. The result is chaos, with no predictability in the share of data received correctly versus the share of failed API calls.

So the task is to avoid exceeding the RPS (QPS) limit, so that all processes can receive their data correctly.

This is harder because GA may also be called at arbitrary moments by various third-party processes that share the same IP addresses.

That raises a question. Should you pay for costlier redundancy, or set more modest hard limits in a static token bucket (for example, https://github.com/bandwidth-throttle/token-bucket)?

This module addresses exactly that. It strikes a balance between redundancy and maximum efficiency by dynamically adjusting the parameters of its own activity. In other words, it is one way of implementing a dynamic token bucket.
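To make the static/dynamic contrast concrete, here is a minimal PHP 8 sketch of a dynamic token bucket. It is not code from Publisher Pulsar, which coordinates separate processes over ZMQ rather than keeping a bucket inside one process. The class `DynamicTokenBucket` and its parameters are hypothetical, and so is the adaptation policy. That policy is AIMD: halve the refill rate after a rate-limit error, add a fixed step after each success. Time is passed in explicitly, in seconds, so the behavior is deterministic.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative dynamic token bucket (not Publisher Pulsar code).
 * Tokens refill continuously at $rate per second up to $capacity.
 * The rate itself adapts: halved on a rate-limit error, raised by
 * $step after each success (an assumed AIMD policy).
 */
final class DynamicTokenBucket
{
    private float $tokens;
    private float $rate;
    private float $lastRefill;

    public function __construct(
        private float $capacity,
        private float $maxRate,
        private float $minRate,
        private float $step,
        float $now
    ) {
        $this->tokens = $capacity; // start full
        $this->rate = $maxRate;
        $this->lastRefill = $now;
    }

    /** Adds the tokens accrued since the last refill at the current rate. */
    private function refill(float $now): void
    {
        $elapsed = max(0.0, $now - $this->lastRefill);
        $this->tokens = min($this->capacity, $this->tokens + $elapsed * $this->rate);
        $this->lastRefill = $now;
    }

    /** True if one request may be made at time $now (consumes a token). */
    public function tryConsume(float $now): bool
    {
        $this->refill($now);
        if ($this->tokens >= 1.0) {
            $this->tokens -= 1.0;
            return true;
        }
        return false;
    }

    /** Multiplicative decrease after a "rate limit exceeded" error. */
    public function onRateLimitError(float $now): void
    {
        $this->refill($now); // settle tokens at the old rate first
        $this->rate = max($this->minRate, $this->rate / 2);
    }

    /** Additive increase after a successful request. */
    public function onSuccess(float $now): void
    {
        $this->refill($now);
        $this->rate = min($this->maxRate, $this->rate + $this->step);
    }

    public function rate(): float
    {
        return $this->rate;
    }
}

// Usage: GA-style limit of 10 requests per second, burst of at most 10.
$bucket = new DynamicTokenBucket(10.0, 10.0, 1.0, 0.5, 0.0);
$burst = 0;
for ($i = 0; $i < 15; $i++) {
    $burst += $bucket->tryConsume(0.0) ? 1 : 0; // only 10 of 15 pass at t = 0
}
$bucket->onRateLimitError(0.0); // e.g. third-party traffic on the same IP: rate 10 -> 5
$afterOneSecond = 0;
for ($i = 0; $i < 15; $i++) {
    $afterOneSecond += $bucket->tryConsume(1.0) ? 1 : 0; // 5 tokens accrued at the reduced rate
}
```

A static token bucket is the special case where `onRateLimitError()` and `onSuccess()` are never called. Its rate then stays at whatever conservative value was configured up front.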


The system is designed to work like a pulsar: it sends regular ("pulsing") publications to its subscribers.

Using the module comes down to the following steps:

1. Configure the Pulsar and run it as a daemon.

2. Configure the code of the process (service) that accesses the API so that it connects to the Pulsar. More generally, this applies to any process performing an action that needs to be synchronized. Before performing the action (for example, a request to the API), the process contacts the Pulsar and waits for permission. Only after receiving permission does it perform the action.

As a result, the Pulsar, according to its settings, lets only a limited number of processes (for example, 10) be subscribers at the same time. The first 10 to leave the FIFO queue become subscribers, while the remaining N wait in the ZMQ queue.

Once the required number of processes have become subscribers, the Pulsar sends them permission. They can then perform their action (for example, access the API).

Thus, the limit will be observed regardless of the number of processes running in parallel (within the capabilities of the ZMQ stack).

3. Afterwards, the subscriber (performer) must send the Pulsar a message about the action taken: whether any errors occurred or everything went fine.

This feedback matters because errors can arise from the number of actions performed simultaneously. When they do, the Pulsar can temporarily correct its behavior until the situation normalizes (the errors disappear). It can reduce the number of subscribers or increase the interval between publications (action permissions). It can even stop working for a while if an error requires a break in actions, for example when the daily limit is exceeded (403 dailyLimitExceeded).
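The cycle in steps 1 to 3 can be illustrated with a single-process PHP 8 simulation. It only mimics the described behavior:

- admit up to N waiting processes from a FIFO queue;
- "publish" permission to them;
- collect the outcomes they report;
- adapt.

In the module itself, permission travels over ZMQ sockets. Here, a direct callback stands in for both directions. The classes `ActionOutcome` and `PulsarSimulation` are hypothetical. The adaptation policy is an assumption, not the module's actual algorithm. On a slow-down request, it halves the number of subscribers and doubles the interval. On a pause request, it skips the requested number of pulsations. Once the errors are gone, it recovers step by step.

```php
<?php
declare(strict_types=1);

/** Outcome a performer reports after its action (hypothetical shape). */
final class ActionOutcome
{
    public function __construct(
        public bool $ok,
        public bool $slowDown = false,
        public int $pausePulses = 0 // e.g. after a daily-limit error
    ) {}
}

/** Single-process simulation of the pulsing cycle; no ZMQ involved. */
final class PulsarSimulation
{
    public int $perIteration;   // subscribers admitted per pulsation
    public float $interval;     // interval between pulsations, in seconds
    public int $errors = 0;     // failed actions reported in the last pulsation
    private int $pausedUntil = 0;

    public function __construct(private int $maxPerIteration, private float $baseInterval)
    {
        $this->perIteration = $maxPerIteration;
        $this->interval = $baseInterval;
    }

    /**
     * Runs pulsation number $pulse: grants permission to up to $perIteration
     * waiting processes (FIFO), lets each act via $act(int $id): ActionOutcome,
     * then adapts. Returns the ids that were granted permission.
     */
    public function pulse(int $pulse, SplQueue $waiting, callable $act): array
    {
        if ($pulse < $this->pausedUntil) {
            return []; // paused: nobody gets permission
        }
        $granted = [];
        while (count($granted) < $this->perIteration && !$waiting->isEmpty()) {
            $granted[] = $waiting->dequeue(); // first come, first served
        }
        $this->errors = 0;
        $slowDown = false;
        $pause = 0;
        foreach ($granted as $id) {
            $outcome = $act($id);
            $this->errors += $outcome->ok ? 0 : 1;
            $slowDown = $slowDown || $outcome->slowDown;
            $pause = max($pause, $outcome->pausePulses);
        }
        if ($pause > 0) {
            $this->pausedUntil = $pulse + 1 + $pause; // skip the next $pause pulsations
        } elseif ($slowDown) {
            $this->perIteration = max(1, intdiv($this->perIteration, 2));
            $this->interval *= 2;
        } elseif ($this->errors === 0) {
            // errors are gone: recover towards the configured settings
            $this->perIteration = min($this->maxPerIteration, $this->perIteration + 1);
            $this->interval = max($this->baseInterval, $this->interval / 2);
        }
        return $granted;
    }
}

// Usage: 25 processes wait; at most 10 may act per pulsation.
$waiting = new SplQueue();
foreach (range(1, 25) as $id) {
    $waiting->enqueue($id);
}
$pulsar = new PulsarSimulation(10, 1.0);
// Process 3 hits the per-second limit and asks for a slow-down.
$first = $pulsar->pulse(0, $waiting, fn (int $id) => new ActionOutcome($id !== 3, $id === 3));
// The next pulsation admits only 5 processes; all succeed.
$second = $pulsar->pulse(1, $waiting, fn (int $id) => new ActionOutcome(true));
```

However many processes are queued, at most `perIteration` of them act per pulsation, and that number shrinks as soon as a performer reports a load-related error.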

1) Setting up and starting the Pulsar:

There are two modes: out of the box and customized. The out-of-the-box mode also uses default socket addresses. That is only appropriate when the Pulsar and the workers run on the same machine.

Out of the box:
    $pulsar = new \React\PublisherPulsar\Pulsar();
    $publisherPulsarDto = new \React\PublisherPulsar\Inventory\PublisherPulsarDto();
    $publisherPulsarDto->setModuleName('react:pulsar'); // module name
    // Shell command that starts the ReplyStack process (here a Laravel artisan command)
    $publisherPulsarDto->setReplyStackCommandName('php artisan react:pulsar-reply-stack');
    $publisherPulsarDto->initDefaultPulsarSocketsParams(); // default (local) socket addresses
    $pulsar->setPublisherPulsarDto($publisherPulsarDto);
    $pulsar->manage(); // run the Pulsar

With custom settings:
    // $pulsar and $publisherPulsarDto are created as in the snippet above
    $publisherPulsarDto->setPulsationIterationPeriod(1); // period of one pulsation, i.e. the interval between publications
    $publisherPulsarDto->setSubscribersPerIteration(10); // how many processes may become subscribers (get permission) per pulsation
    $publisherPulsarDto->setPerformerContainerActionMaxExecutionTime(7); // maximum execution time of a performer's action
    $publisherPulsarDto->setLogger(\Log::getMonolog()); // Monolog logger (e.g. with StreamHandlers); if not set, a default Logger writes to STDOUT
    $publisherPulsarDto->setMaxWaitReplyStackResult(7); // maximum time to wait for ReplyStack to gather subscribersPerIteration subscribers

    // Socket addresses
    $pulsarSocketsParams = new \React\PublisherPulsar\Inventory\PulsarSocketsParamsDto();
    $pulsarSocketsParams->setReplyToReplyStackSocketAddress('tcp://127.0.0.1:6271');
    $pulsarSocketsParams->setPushToReplyStackSocketAddress('tcp://127.0.0.1:6272');
    $pulsarSocketsParams->setPublishSocketAddress('tcp://127.0.0.1:6273');
    $pulsarSocketsParams->setPullSocketAddress('tcp://127.0.0.1:6274');
    $pulsarSocketsParams->setReplyStackSocketAddress('tcp://127.0.0.1:6275');
    $publisherPulsarDto->setPulsarSocketsParams($pulsarSocketsParams);
    $pulsar->setPublisherPulsarDto($publisherPulsarDto);
    $pulsar->manage();

Code of the ReplyStack script (the process started by the command passed to setReplyStackCommandName()):

    $replyStack = new \React\PublisherPulsar\ReplyStack();
    $replyStack->startCommunication();

Note: the Pulsar must be started before the processes that connect to it. Otherwise those processes will knock on addresses the Pulsar is not yet listening on. They will simply hang, waiting for a response that never comes.

2) Setting up the performer (subscriber) code:

Add the module's Performer object to the process code:

Out of the box:
    $performer = new \React\PublisherPulsar\Performer();
    $performerDto = new \React\PublisherPulsar\Inventory\PerformerDto();
    $performerDto->setModuleName("YourServiceNameContainingPerformer"); // name of the service that contains the performer
    $performer->setPerformerDto($performerDto);
    $performer->initDefaultPerformerSocketsParams(); // default (local) socket addresses
    $this->zmqPerformer = $performer;

With custom settings:
    // $performer and $performerDto are created as in the snippet above
    $performerDto->setLogger(\Log::getMonolog());
    // Addresses of the Pulsar's ZMQ sockets (Publish/Subscribe, Push/Pull, Request/Reply);
    // they must match the addresses configured on the Pulsar side
    $performerSocketParams = new \React\PublisherPulsar\Inventory\PerformerSocketsParamsDto();
    $performerSocketParams->setPublisherPulsarSocketAddress('tcp://127.0.0.1:6273'); // Pulsar's publish socket
    $performerSocketParams->setPushPulsarSocketAddress('tcp://127.0.0.1:6274'); // Pulsar's pull socket
    $performerSocketParams->setRequestPulsarRsSocketAddress('tcp://127.0.0.1:6275'); // ReplyStack socket
    $performerDto->setSocketsParams($performerSocketParams);
    $performer->setPerformerDto($performerDto);
    $this->zmqPerformer = $performer;

Then, at the appropriate place right before the target action that needs synchronization or coordination, call the method that obtains permission from the Pulsar:

    $this->zmqPerformer->connectToPulsarAndWaitPermissionToAct();


3) After the target action has been executed, send a result message saying whether any errors occurred, for example like this:

    // isUserRateLimitExceeded() and isDailyLimitExceeded() stand for your own checks of the API
    // response; $e is the exception caught from the failed API call
    if (isUserRateLimitExceeded()) {
        // Too many simultaneous requests: ask the Pulsar to slow down
        $result = new ActionResultingPushDto();
        $result->setActionCompleteCorrectly(false);
        $result->setSlowDown(true);
        $result->setErrorMessage($e->getMessage());
        $result->setErrorReason(GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED);
        $this->zmqPerformer->pushActionResultInfo($result);
    } elseif (isDailyLimitExceeded()) {
        // Daily quota exhausted: ask the Pulsar to pause
        $result = new ActionResultingPushDto();
        $result->setActionCompleteCorrectly(false);
        $sleepForPeriod = new ErrorSleepForPeriod();
        $sleepForPeriod->setSleepPeriod(60 * 60 * 1000000); // one hour, if the period is in microseconds
        $result->setSleepForPeriod($sleepForPeriod);
        $result->setErrorMessage($e->getMessage());
        $result->setErrorReason(GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED);
        $this->zmqPerformer->pushActionResultInfo($result);
    } else {
        // Success (or an error unrelated to load): report without changing the Pulsar's behavior
        $this->zmqPerformer->pushActionResultInfoWithoutPulsarCorrectionBehavior();
    }

This also helps when only some of the processes are connected to the Pulsar while the rest work in an arbitrary, chaotic way. The feedback mechanism reduces the number of errors produced by the combined activity of ordered and unordered processes.
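The checks `isUserRateLimitExceeded()` and `isDailyLimitExceeded()` in step 3 are left to the reader. Below is a minimal PHP sketch of one way to implement them. The function `classifyGaError()` and its return values are hypothetical. The sketch assumes the JSON error body used by Google's v3-style APIs, where reasons such as `userRateLimitExceeded`, `rateLimitExceeded`, `quotaExceeded` and `dailyLimitExceeded` appear in `error.errors[].reason`. Other APIs or API versions may report errors in a different format.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper for step 3: decides which correction to request from
 * the Pulsar, given the HTTP status and body of a Google API response.
 * Assumes the v3-style error body
 * {"error": {"code": 403, "errors": [{"reason": "..."}], "message": "..."}}.
 */
function classifyGaError(int $httpStatus, string $body): string
{
    if ($httpStatus < 400) {
        return 'none'; // success: report without Pulsar correction
    }
    $decoded = json_decode($body, true);
    $reasons = [];
    foreach ($decoded['error']['errors'] ?? [] as $error) {
        if (is_array($error) && isset($error['reason'])) {
            $reasons[] = $error['reason'];
        }
    }
    if (in_array('dailyLimitExceeded', $reasons, true)) {
        return 'sleep'; // quota for the day is used up: pause for a long period
    }
    foreach (['userRateLimitExceeded', 'rateLimitExceeded', 'quotaExceeded'] as $reason) {
        if (in_array($reason, $reasons, true)) {
            return 'slow_down'; // load-related: fewer subscribers, longer interval
        }
    }
    return 'none'; // errors unrelated to load need no Pulsar correction
}

$body = '{"error":{"errors":[{"domain":"usageLimits","reason":"userRateLimitExceeded",'
    . '"message":"User Rate Limit Exceeded"}],"code":403,"message":"User Rate Limit Exceeded"}}';
$decision = classifyGaError(403, $body); // 'slow_down'
```

With such a helper, `isUserRateLimitExceeded()` in the step 3 code corresponds to a `'slow_down'` result and `isDailyLimitExceeded()` to `'sleep'`.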

***

As mentioned above, the module can also be used for any periodic delivery of information to processes. To do this, it is enough to initialize your own class, extended from PublisherToSubscribersDto, which contains the logic for controlling the processes that will receive it.

That is, when initializing the daemon in step 1), add:

    $publisherToSubscribersDto = new YourNameExtendedByPublisherToSubscribersDto();
    $publisherToSubscribersDto->setYourProperty();
    $publisherPulsarDto->setPublisherToSubscribersDto($publisherToSubscribersDto);

This object will then be passed to the processes.

Source: https://habr.com/ru/post/304536/

