
Asynchronous modes of the gRPC framework and how they work in C++

Our team once decided to try gRPC for our tasks. After some discussion we settled on an asynchronous client and server. However, the documentation had a working example for only one mode. Examples of the other interaction modes, the basic principles of asynchronous operations, how an asynchronous server and client work in gRPC, and much more follow below.

Introduction


The purpose of this article is to explain how the asynchronous operations of the gRPC framework work in C++ for all four modes of interaction. Readers who want to use the asynchronous gRPC modes but do not fully understand how they are implemented on the server and/or client will find an explanation below. Readers who understand the principles of an asynchronous server and client but have no working examples of all four modes at hand will also find such examples in the article. Readers who already understand everything and have working examples are invited to the comments.

The layout of the article is as follows:

  1. Differences between multi-threaded and asynchronous application models;
  2. Asynchronous application model for gRPC;
  3. Terminology used;
  4. Initial data;
  5. Examples of asynchronous interaction modes;
  6. Conclusion and list of references.

1. Differences between multi-threaded and asynchronous application models


Consider the difference between multi-threaded and asynchronous application architectures.
When more than one client connects to a server, there are two basic options for handling the connections. The first option is to create a separate thread for each connection, i.e. to serve each client in its own thread. The second option is to use the asynchronous application model. The asynchronous model is inextricably linked with the concept of an event loop. An event is some logical operation at the kernel level: a new connection appeared on a socket, a new message was received from a client, a message was sent to a client, etc. W. R. Stevens, in his book "UNIX Network Programming", writes the following about the asynchronous I/O model:
We tell the kernel to start the operation and notify us when the entire operation (including copying data from the kernel to our buffer) is complete.
Consider a simple example of a server that handles a client connection in three stages. The first stage is processing the connection. This stage involves no file system, I/O, or database operations. The second stage is a synchronous read of data from the socket. The third stage is processing the received data, under the same conditions as the first stage. For simplicity, assume that the first and third stages take 1 ms each and the second stage takes 2 ms.
Now let's say that several clients have connected to the server.
In the multi-threaded model, a separate thread is created for each connection. Each thread spends 4 ms processing a client request. However, while reading data from the socket, the thread waits for the operation to complete. Until the data from the socket has been read into the application buffer, the thread is blocked. It turns out that for 2 ms of the total 4 ms (50% of the thread's running time) the processor is idle.

Asynchronous I/O operations and the associated asynchronous application model were invented to solve the problem of the idle processor.

The asynchronous application model is based on an event queue. When an event occurs, it is placed at the end of the queue. While there are events in the queue, the processor is always busy. The I/O operations associated with the events in the queue must be asynchronous.

In the three-stage server example, the synchronous read from the socket must be replaced with an asynchronous one. Of course, until the data has been read from the socket, we cannot proceed to the third stage. However, once the read has been requested, we can retrieve other events from the queue and perform the operations associated with them; the read-completion event is placed at the end of the queue when the read finishes. For example, during the 2 ms while data is being read from the first client's socket, we can process the connections of the second and third clients and request reads from their sockets. After that, the queue receives the read-completion event for the first client.

In general, in an asynchronous model, each client request may be slower than in a multi-threaded model. However, the overall performance when using the asynchronous model is higher due to a decrease in processor idle time.
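The timing argument above can be checked with a small simulation. The sketch below is not gRPC code: it models the three-stage server with a simulated clock, and the names Event, LoopStats, run_event_loop and run_blocking are hypothetical. It assumes the timings from the example (1 ms, 2 ms, 1 ms), that all clients connect at time 0, and that an asynchronous read costs no processor time.

```cpp
#include <cassert>
#include <queue>
#include <vector>

// One entry of the toy event queue.
struct Event {
    int ready_at_ms;    // simulated time at which the event becomes available
    int seq;            // insertion order, keeps the queue FIFO on ties
    bool is_read_done;  // false: new connection, true: asynchronous read finished
};

// Orders the priority queue so that the earliest event comes out first.
struct Later {
    bool operator()(const Event& a, const Event& b) const {
        if (a.ready_at_ms != b.ready_at_ms) return a.ready_at_ms > b.ready_at_ms;
        return a.seq > b.seq;
    }
};

struct LoopStats {
    int total_ms;  // simulated time at which the last client is finished
    int idle_ms;   // simulated time the single thread spent waiting
};

// Single-threaded event loop: stage 1 and stage 3 cost 1 ms of processor time,
// the 2 ms read runs "in the kernel" and only produces a completion event.
LoopStats run_event_loop(int clients) {
    std::priority_queue<Event, std::vector<Event>, Later> events;
    int seq = 0;
    for (int i = 0; i < clients; ++i) events.push(Event{0, seq++, false});

    int now = 0;
    int idle = 0;
    while (!events.empty()) {
        Event ev = events.top();
        events.pop();
        if (ev.ready_at_ms > now) {  // nothing to do yet: the processor idles
            idle += ev.ready_at_ms - now;
            now = ev.ready_at_ms;
        }
        now += 1;  // stage 1 (connection) or stage 3 (data processing)
        if (!ev.is_read_done) events.push(Event{now + 2, seq++, true});
    }
    return LoopStats{now, idle};
}

// One thread serving the clients one after another with a blocking read.
LoopStats run_blocking(int clients) {
    return LoopStats{clients * 4, clients * 2};
}
```

Under these assumptions, three clients finish after 6 ms with no idle time in the event loop, against 12 ms with 6 ms idle for one blocking thread. The last client is done at 6 ms rather than 4 ms, which is the "each request may be slower" effect mentioned above.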

Two points deserve attention here. First, unlike the multi-threaded model, all operations in the asynchronous model are performed in one thread. This leads to the second point: if the event queue contains resource-intensive operations, the remaining operations wait in the queue while they execute. There are two ways to solve this problem. The first is to run resource-intensive operations from the queue in separate threads. The second is to split a resource-intensive operation into several parts that are called from the queue one after another.
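The second option can be sketched as follows. This is a toy task queue, not part of gRPC; Loop, sum_in_chunks and run_demo are hypothetical names. A long summation is split into chunks, and each chunk re-posts the remainder to the end of the queue, so a short task that was queued behind it runs between the chunks.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// A toy single-threaded task queue.
struct Loop {
    std::queue<std::function<void()>> tasks;
    std::vector<std::string> log;  // records the order in which work was done

    void post(std::function<void()> task) { tasks.push(std::move(task)); }

    void run() {
        while (!tasks.empty()) {
            std::function<void()> task = std::move(tasks.front());
            tasks.pop();
            task();  // a task may post further tasks while it runs
        }
    }
};

// Sums the integers in [next, end) at most `chunk` numbers at a time.
// After each chunk the rest of the work goes to the END of the queue.
void sum_in_chunks(Loop& loop, long next, long end, long chunk, long acc, long* out) {
    const long stop = std::min(end, next + chunk);
    for (; next < stop; ++next) acc += next;
    loop.log.push_back("chunk");
    if (next < end) {
        loop.post([&loop, next, end, chunk, acc, out] {
            sum_in_chunks(loop, next, end, chunk, acc, out);
        });
    } else {
        *out = acc;  // all chunks done
    }
}

// Queues a "heavy" sum of 0..9 in chunks of 5, followed by one short task.
std::vector<std::string> run_demo(long* sum) {
    Loop loop;
    loop.post([&loop, sum] { sum_in_chunks(loop, 0, 10, 5, 0, sum); });
    loop.post([&loop] { loop.log.push_back("short"); });
    loop.run();
    return loop.log;
}
```

In this sketch the short task runs after the first chunk instead of waiting for the whole sum. The chunk size is a trade-off: smaller chunks give other events a turn sooner but add queueing overhead.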

2. Asynchronous application model for gRPC


When using synchronous RPC on the server, gRPC uses a multi-threaded application model. Each client request is processed in a separate thread.

When using asynchronous RPC, gRPC uses the asynchronous application model on both the server and the client. In keeping with that model, there is an event queue. In gRPC the event queue is implemented by the grpc::CompletionQueue class. Events are added to the queue inside gRPC, while the requests to add events and the reads of events from the queue are made by the user application.

In gRPC terminology, a responder is a “means of responding to a client” on the server and a “means of receiving messages from a server” on the client.

A request to add an event to the event queue is made whenever a function of the responder interface is called. As before, an event is understood as a logically completed operation. For example, a client-connected event means that the client has already connected to the server; a message-received event means that the message is already in the application's buffer, etc.

Each request to add an event to the queue assigns a tag to the event. Tags make it possible to distinguish the events read from the queue. Strictly speaking, what is physically read from the queue is not the event itself but the tag associated with it. For example, suppose the server requests a message-received event for a client and assigns tag = 1 to it. When tag = 1 is returned from the queue, this means that the message-received event has occurred, i.e. the client's message has been read into the specified buffer.
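The role of the tag can be illustrated with a toy queue. The sketch below is not grpc::CompletionQueue; ToyCompletionQueue, PendingRead and first_completed_buffer are hypothetical names. Only the shape of Next(void** tag, bool* ok) mirrors the real interface. Here the tag is the address of the object that owns the buffer, which is the same idea as passing `this` as a tag in the examples later in the article.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <utility>

// A toy stand-in for a completion queue: it stores (tag, ok) pairs.
class ToyCompletionQueue {
public:
    // In gRPC the library does this internally when an operation completes.
    void Push(void* tag, bool ok) { events_.push(std::make_pair(tag, ok)); }

    // Returns false when the queue is empty (the real queue would block instead).
    bool Next(void** tag, bool* ok) {
        if (events_.empty()) return false;
        *tag = events_.front().first;
        *ok = events_.front().second;
        events_.pop();
        return true;
    }

private:
    std::queue<std::pair<void*, bool>> events_;
};

// An operation in flight: the buffer that the "library" fills in.
struct PendingRead {
    std::string buffer;
};

// Two reads are pending; the tag tells us which one has completed.
std::string first_completed_buffer() {
    ToyCompletionQueue cq;
    PendingRead a, b;

    // Pretend the read for b finished first, then the read for a.
    b.buffer = "from client B";
    cq.Push(&b, true);
    a.buffer = "from client A";
    cq.Push(&a, true);

    void* tag = nullptr;
    bool ok = false;
    cq.Next(&tag, &ok);
    // The tag is just the pointer we registered; cast it back to reach the buffer.
    return static_cast<PendingRead*>(tag)->buffer;
}
```

Because the queue hands back only the tag, any state needed to continue the operation has to be reachable from it, which is why the classes below keep their request, reply and status as members.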

3. Terminology used


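There are four types of methods in gRPC that can be defined in a service:

  1. One-to-one: the client sends one request and the server returns one response (unary RPC);
  2. One-to-many: the client sends one request and the server returns a stream of responses (server streaming);
  3. Many-to-one: the client sends a stream of requests and the server returns one response (client streaming);
  4. Many-to-many: both the client and the server send a stream of messages (bidirectional streaming).
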
As a matter of terminology, we agree to omit the phrase “after the next tag is received from the queue” in the descriptions below. For asynchronous gRPC operations there is a rule: one responder operation (including creation and deletion) per tag retrieved from the queue. As said above, calling a responder function makes a request to add an event to the event queue. So in terms of the asynchronous model the rule reads: one request to add an event to the queue per event read from the queue.

This means that the following sequence is correct: wait for a tag from the queue, call responder function A, wait for a tag from the queue, call responder function B, wait for a tag from the queue, delete the responder object. Any sequence with no tag retrieval between two function calls is wrong, for example: wait for a tag from the queue, call responder function A, call responder function B, wait for a tag from the queue, delete the object. This is why the phrase “after the next tag is received from the queue” is omitted below: it is always implied.
We will also use “types of methods” and “modes of interaction” as synonyms.

4. Initial data


File helloworld.proto


We take the Greeter service as the initial data and add three methods, one for each of the remaining types of interaction (file helloworld.proto):

    syntax = "proto3";

    package helloworld;

    // The greeting service definition.
    service Greeter {
      rpc SayHello (HelloRequest) returns (HelloReply) {}
      rpc GladToSeeMe (HelloRequest) returns (stream HelloReply) {}
      rpc GladToSeeYou (stream HelloRequest) returns (HelloReply) {}
      rpc BothGladToSee (stream HelloRequest) returns (stream HelloReply) {}
    }

    // The request message containing the user's name.
    message HelloRequest {
      string name = 1;
    }

    // The response message containing the greetings.
    message HelloReply {
      string message = 1;
    }

The SayHello method implements the one-to-one mode, GladToSeeMe the one-to-many mode, GladToSeeYou the many-to-one mode, and BothGladToSee the many-to-many mode.

Base class on the server


The only thing that differs among the four types of methods is the mode of interaction with the client. The responder of the corresponding type is responsible for the interaction mode. All other data (service, event queue, context, etc.) are common to all four types of methods. Therefore the common data are placed in an abstract base class, from which the classes of the individual modes inherit:

    class CommonCallData
    {
    public:
        // Data shared by all four interaction modes.
        Greeter::AsyncService* service_;
        ServerCompletionQueue* cq_;
        ServerContext ctx_;
        HelloRequest request_;
        HelloReply reply_;
        enum CallStatus { CREATE, PROCESS, FINISH };
        CallStatus status_;
        std::string prefix;

    public:
        explicit CommonCallData(Greeter::AsyncService* service, ServerCompletionQueue* cq):
            service_(service), cq_(cq), status_(CREATE), prefix("Hello ")
        {}
        virtual ~CommonCallData() {}
        // Called every time the tag of this object is returned from the queue.
        virtual void Proceed(bool = true) = 0;
    };

The abstract CommonCallData class contains the data common to all modes (some of which are initialized in the constructor), a virtual destructor, and the pure virtual function Proceed().

Base class on client


Solely for the convenience of reading the code, the classes for each type of interaction on the client will look similar to the corresponding classes on the server. Each of the interaction classes is inherited from the CommonAsyncClientCall base abstract class:

    class CommonAsyncClientCall
    {
    public:
        ClientContext context;
        HelloReply reply;
        enum CallStatus { CREATE, PROCESS, FINISH };
        CallStatus callStatus;
        Status status;

        // Prints the last reply received from the server, if any.
        void printReply(const char* from)
        {
            if (!reply.message().empty())
                std::cout << "[" << from << "]: reply message = " << reply.message() << std::endl;
            else
                std::cout << "[" << from << "]: reply message empty" << std::endl;
        }

        explicit CommonAsyncClientCall(): callStatus(CREATE) {}
        virtual ~CommonAsyncClientCall() {}
        // Called every time the tag of this object is returned from the queue.
        virtual void Proceed(bool = true) = 0;
    };

The CommonAsyncClientCall class contains the data common to all types of interaction, a constructor, a virtual destructor, the pure virtual function Proceed(), and the function printReply() for printing the server's response.

5. Examples of asynchronous interaction modes


One-to-one interaction mode


Code on the server


The implementation of the CallData class for one-to-one interaction mode will look like this:

    class CallData: public CommonCallData
    {
        ServerAsyncResponseWriter<HelloReply> responder_;
    public:
        CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq):
            CommonCallData(service, cq), responder_(&ctx_)
        {
            Proceed();
        }

        virtual void Proceed(bool = true) override
        {
            if (status_ == CREATE)
            {
                std::cout << "[Proceed11]: New responder for 1-1 mode" << std::endl;
                status_ = PROCESS;
                // Ask gRPC to deliver the next SayHello request; "this" is the tag.
                service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
            }
            else if (status_ == PROCESS)
            {
                // Create a responder for the next request before handling this one.
                new CallData(service_, cq_);
                std::cout << "[Proceed11]: request message = " << request_.name() << std::endl;
                reply_.set_message(prefix + request_.name());
                status_ = FINISH;
                responder_.Finish(reply_, Status::OK, this);
            }
            else
            {
                GPR_ASSERT(status_ == FINISH);
                std::cout << "[Proceed11]: Good Bye" << std::endl;
                delete this;
            }
        }
    };

The CallData class inherits from the base class and implements the Proceed() function. Its only data member is the responder of the corresponding type (ServerAsyncResponseWriter<HelloReply>). The arguments of the CallData constructor are passed on to the constructor of the CommonCallData base class. Since each of the four mode classes has its own implementation of Proceed(), the void* tag pointer must now be cast not to CallData but to CommonCallData:

    void* tag;  // uniquely identifies the operation that has completed
    bool ok;
    while (true)
    {
        GPR_ASSERT(cq_->Next(&tag, &ok));
        // GPR_ASSERT(ok);
        static_cast<CommonCallData*>(tag)->Proceed(ok);
    }

Note that now the GPR_ASSERT(ok) check is commented out. This will be discussed below.
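The dispatch through the base class can be tried without gRPC. The sketch below uses a plain std::queue of void* tags as a stand-in for the completion queue; ToyCall, UnaryToyCall and drive_one_call are hypothetical names, and each "operation" completes immediately by pushing its own tag. One difference from the listings above is deliberate: the tag is stored as a pointer to the base class, so that casting the void* back to the base class recovers exactly the pointer type that was stored.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Common interface: the event loop only knows about this class.
class ToyCall {
public:
    virtual ~ToyCall() {}
    virtual void Proceed(bool ok) = 0;
};

// A call that walks through CREATE -> PROCESS -> FINISH, one step per tag.
class UnaryToyCall : public ToyCall {
public:
    UnaryToyCall(std::queue<void*>* cq, std::vector<std::string>* log)
        : cq_(cq), log_(log), status_(CREATE) {
        Proceed(true);
    }

    void Proceed(bool /*ok*/) override {
        if (status_ == CREATE) {
            log_->push_back("create");
            status_ = PROCESS;
            Enqueue();  // stands in for RequestSayHello(..., tag)
        } else if (status_ == PROCESS) {
            log_->push_back("process");
            status_ = FINISH;
            Enqueue();  // stands in for responder_.Finish(..., tag)
        } else {
            log_->push_back("finish");
            delete this;  // the object owns itself, as in the article
        }
    }

private:
    enum Status { CREATE, PROCESS, FINISH };

    // Store the tag as ToyCall*, the same type the loop casts back to.
    void Enqueue() { cq_->push(static_cast<ToyCall*>(this)); }

    std::queue<void*>* cq_;
    std::vector<std::string>* log_;
    Status status_;
};

// The event loop: it never needs to know the concrete class behind a tag.
std::vector<std::string> drive_one_call() {
    std::queue<void*> cq;
    std::vector<std::string> log;
    new UnaryToyCall(&cq, &log);
    while (!cq.empty()) {
        void* tag = cq.front();
        cq.pop();
        static_cast<ToyCall*>(tag)->Proceed(true);
    }
    return log;
}
```

With single, non-virtual inheritance the two pointer values usually coincide, so passing `this` directly tends to work in practice; storing the base pointer simply removes the dependence on object layout.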

Client Code


The implementation of the AsyncClientCall class for the one-to-one interaction mode will look like this:

    class AsyncClientCall: public CommonAsyncClientCall
    {
        std::unique_ptr< ClientAsyncResponseReader<HelloReply> > responder;
    public:
        AsyncClientCall(const HelloRequest& request, CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_):
            CommonAsyncClientCall()
        {
            std::cout << "[Proceed11]: new client 1-1" << std::endl;
            responder = stub_->AsyncSayHello(&context, request, &cq_);
            // Ask for the reply and the final status; "this" is the tag.
            responder->Finish(&reply, &status, (void*)this);
            // callStatus = PROCESS;
        }

        virtual void Proceed(bool ok = true) override
        {
            GPR_ASSERT(ok);
            if (status.ok())
                printReply("Proceed11");
            std::cout << "[Proceed11]: Good Bye" << std::endl;
            delete this;
        }
    };

The AsyncClientCall class inherits from CommonAsyncClientCall and, just as on the server, has a responder that corresponds to the interaction mode and implements the pure virtual function Proceed(). The code of Proceed() matches the code that the documentation example executes for this mode in the function GreeterClient::AsyncCompleteRpc().

The code that used to be in the function GreeterClient::SayHello() is now in the constructor of the AsyncClientCall class. From the point of view of logic such an implementation is not ideal; it is done this way to keep the client similar to the server implementation. All the other client classes follow the same principle. If you wish, you can always structure the code as it is done in the documentation.

Since we moved the code from GreeterClient::SayHello() into the AsyncClientCall constructor, the GreeterClient::SayHello() function now looks like this:

    void SayHello(const std::string& user)
    {
        HelloRequest request;
        request.set_name(user);
        // The call object deletes itself when the reply has been processed.
        new AsyncClientCall(request, cq_, stub_);
    }

The code of the function GreeterClient::AsyncCompleteRpc() now looks like this:

    void AsyncCompleteRpc()
    {
        void* got_tag;
        bool ok = false;
        // Block until the next result is available in the completion queue.
        while (cq_.Next(&got_tag, &ok))
        {
            static_cast<CommonAsyncClientCall*>(got_tag)->Proceed(ok);
        }
        std::cout << "Completion queue is shutting down." << std::endl;
    }

Here you can see the obvious similarity with the code on the server. As on the server, a tag returned from the queue corresponds to a new event, in this case a response from the server. The tag is likewise cast to a pointer to the base class CommonAsyncClientCall, and the Proceed() implementation of the corresponding mode is called.

Server and client console output in one-to-one mode


The console output of the server for one-to-one mode is as follows:

    [Proceed11]: New responder for 1-1 mode
    [Proceed11]: New responder for 1-1 mode
    [Proceed11]: request message = world
    [Proceed11]: Good Bye

First, the first responder is created. After a request is received from the client, a second responder is created to handle subsequent requests. The client's request contains the message “world”. The responder's Finish() function requests that the reply be sent to the client. The next time the object is received from the queue, now in the FINISH state, it is deleted.

Client console output for one-to-one mode is as follows:

    [Proceed11]: new client 1-1
    [Proceed11]: reply message = Hello world
    [Proceed11]: Good Bye

First, a new client call object is created. When the message from the server is received, it is printed. After that, the object is deleted.

One-to-many interaction mode


Code on the server


The CallData1M class, which is responsible for the one-to-many interaction mode on the server, looks like this:

    class CallData1M: public CommonCallData
    {
        ServerAsyncWriter<HelloReply> responder_;
        unsigned mcounter;            // number of messages already written
        bool new_responder_created;
    public:
        CallData1M(Greeter::AsyncService* service, ServerCompletionQueue* cq):
            CommonCallData(service, cq), responder_(&ctx_), mcounter(0), new_responder_created(false)
        {
            Proceed();
        }

        virtual void Proceed(bool = true) override
        {
            if (status_ == CREATE)
            {
                std::cout << "[Proceed1M]: New responder for 1-M mode" << std::endl;
                service_->RequestGladToSeeMe(&ctx_, &request_, &responder_, cq_, cq_, this);
                status_ = PROCESS;
            }
            else if (status_ == PROCESS)
            {
                if (!new_responder_created)
                {
                    new CallData1M(service_, cq_);
                    new_responder_created = true;
                    std::cout << "[Proceed1M]: request message = " << request_.name() << std::endl;
                }
                // Not static: the first greeting depends on this call's request.
                const std::vector<std::string> greeting = {
                    std::string(prefix + request_.name() + "!"),
                    "I'm very glad to see you!",
                    "Haven't seen you for thousand years.",
                    "I'm server now. Call me later."};
                if (mcounter >= greeting.size())
                {
                    std::cout << "[Proceed1M]: Trying finish" << std::endl;
                    status_ = FINISH;
                    responder_.Finish(Status(), (void*)this);
                }
                else
                {
                    // Exactly one Write per tag received from the queue.
                    reply_.set_message(greeting.at(mcounter));
                    std::cout << "[Proceed1M]: Writing" << std::endl;
                    responder_.Write(reply_, (void*)this);
                    ++mcounter;
                }
            }
            else // if(status_ == FINISH)
            {
                std::cout << "[Proceed1M]: Good Bye" << std::endl;
                delete this;
            }
        }
    };

The CallData1M class inherits from the CommonCallData base class and contains three data members:

  1. responder_, the responder of the corresponding type (ServerAsyncWriter<HelloReply>);
  2. mcounter, the counter of messages already sent;
  3. new_responder_created, a flag showing that the responder for the next client has already been created.

All members of the class are initialized in the constructor's initialization list, after which Proceed() is called. Initially every responder is in the CREATE state, so we enter the if(status_ == CREATE) branch, where the current responder is registered as the tag for the event of a new client request. The responder then moves to the PROCESS state. When the next client request is received, the tag associated with this event is returned from the queue. That tag is a pointer to the responder object, now in the PROCESS state, so we enter the else if(status_ == PROCESS) branch. The first time this branch is entered, a new responder object is created to handle subsequent client requests. Then a vector with the server's responses is created. Initially mcounter is zero, so we enter the else branch, where the response text is set to the message at index mcounter in the response vector, the response is sent to the client, and mcounter is incremented.

Messages are sent to the client exactly greeting.size() times, one per call of Proceed(). We cannot send all the responses at once because that contradicts the principles of the asynchronous model on which the asynchronous modes of gRPC are built. From the documentation:
Only one write may be outstanding at any given time. This means that after calling Write, one must wait to receive tag from the completion queue BEFORE calling Write again.
In other words, only one write can be in progress at a time. That is why objects of the CallData1M class keep track of the number of messages sent.
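The "one write at a time" rule and the role of the message counter can be sketched with a toy writer. This is not the gRPC ServerAsyncWriter; ToyStreamWriter, send_all and back_to_back_write_is_rejected are hypothetical names, and where this toy merely refuses a second write, real code must not attempt one at all.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// A toy writer that allows only one outstanding write.
class ToyStreamWriter {
public:
    ToyStreamWriter() : write_outstanding_(false) {}

    // Returns false if the previous write has not completed yet.
    bool Write(const std::string& msg) {
        if (write_outstanding_) return false;
        write_outstanding_ = true;
        sent_.push_back(msg);
        return true;
    }

    // Stands in for "the tag of the write came back from the queue".
    void OnTag() { write_outstanding_ = false; }

    const std::vector<std::string>& sent() const { return sent_; }

private:
    bool write_outstanding_;
    std::vector<std::string> sent_;
};

// Sends all greetings, one Write per tag. Returns the number of tags consumed.
std::size_t send_all(ToyStreamWriter& writer, const std::vector<std::string>& greetings) {
    std::size_t counter = 0;  // plays the role of mcounter
    std::size_t tags = 0;
    while (counter < greetings.size()) {
        const bool accepted = writer.Write(greetings[counter]);
        assert(accepted);
        (void)accepted;  // silence the unused warning when NDEBUG is defined
        ++counter;
        // In the article's classes Proceed() returns here and is called
        // again only when the tag comes back from the queue.
        writer.OnTag();
        ++tags;
    }
    return tags;
}

// Two writes without a tag in between: the second one is refused.
bool back_to_back_write_is_rejected() {
    ToyStreamWriter writer;
    const bool first = writer.Write("a");
    const bool second = writer.Write("b");
    return first && !second;
}
```

The counter is what lets the object resume where it stopped: every entry into Proceed() sends exactly the next message and nothing more.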

After all the messages from the vector have been sent, we enter the if(mcounter >= greeting.size()) branch. There the responder's state is changed to FINISH and the Finish() function is called.

On the next call of Proceed() we enter the final else branch (status_ == FINISH), in which the object is deleted.

To create the very first object of the CallData1M class, add the following line to the Run() function:

 new CallData1M(&service_, cq_.get()); 

Client Code


The AsyncClientCall1M class, which is responsible for the one-to-many interaction mode on the client, looks like this:

    class AsyncClientCall1M : public CommonAsyncClientCall
    {
        std::unique_ptr< ClientAsyncReader<HelloReply> > responder;
    public:
        AsyncClientCall1M(const HelloRequest& request, CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_):
            CommonAsyncClientCall()
        {
            std::cout << "[Proceed1M]: new client 1-M" << std::endl;
            responder = stub_->AsyncGladToSeeMe(&context, request, &cq_, (void*)this);
            callStatus = PROCESS;
        }

        virtual void Proceed(bool ok = true) override
        {
            if (callStatus == PROCESS)
            {
                if (!ok)
                {
                    // ok == false: there are no more messages from the server.
                    std::cout << "[Proceed1M]: Trying finish" << std::endl;
                    responder->Finish(&status, (void*)this);
                    callStatus = FINISH;
                    return;
                }
                // Request the next message, then print the one received so far.
                responder->Read(&reply, (void*)this);
                printReply("Proceed1M");
            }
            else if (callStatus == FINISH)
            {
                std::cout << "[Proceed1M]: Good Bye" << std::endl;
                delete this;
            }
            return;
        }
    };

The AsyncClientCall1M class is inherited from CommonAsyncClientCall and contains the corresponding type of responder. In the class constructor, a request is made to the server and the responder goes to the PROCESS state.

When an object of this class is returned from the queue, its status is PROCESS. The ok flag received from the queue together with the tag is passed to the Proceed() function. An ok flag equal to false means that there will be no more messages from the server, i.e. the Finish() function has been called on the server. When Finish() should be called on the client is described in the documentation:
It is appropriate to call this method when there are no more messages to be received from the server (this can be known implicitly by the calling code, or explicitly from an earlier call to AsyncReaderInterface::Read that yielded a failed result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
In other words, Finish() is called once the client knows that the server has nothing more to send, either from the logic of the code or because ok, returned from the queue by cq->Next(&read_tag, &ok) for a read request, is false.

If ok is true, we request the next message by calling the responder's Read() function. Regarding the Read() function, the documentation states the following:
Read a message of type R into msg. Completion will be notified by tag on the associated completion queue.
That is, Read() only starts reading a message of type R into the variable msg; the completion of the read is reported by the tag returned from the queue.

After the first call of the Read() function, the variable that holds the response (reply) is still empty, because the read-completion event has not occurred yet.

When all messages from the server have been read, the ok flag is returned from the queue with the value false, the responder's Finish() function is called, and the responder moves to the FINISH state. When the object is then received from the queue in the FINISH state, it is deleted.
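The order of events on the reading side, including the empty first reply, can be traced with a toy model. The sketch below is not gRPC code; run_toy_reader is a hypothetical function that replays the tags a client like AsyncClientCall1M would see, assuming that each requested read completes before the next tag is processed and that the server's messages are non-empty.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Replays the sequence of tags seen by a streaming reader and returns
// what it would print. Each loop iteration handles exactly one tag.
std::vector<std::string> run_toy_reader(const std::vector<std::string>& server_messages) {
    std::vector<std::string> log;
    std::string reply;       // filled only when a read completes
    std::size_t next = 0;    // index of the next message the "server" sends
    bool ok = true;          // the first tag: the call has been started
    bool finishing = false;  // true after Finish() has been requested

    for (;;) {
        if (finishing) {             // tag of the Finish() operation
            log.push_back("Good Bye");
            break;
        }
        if (!ok) {                   // a read failed: the stream has ended
            log.push_back("Trying finish");
            finishing = true;
            continue;
        }
        // Request the next read, then print what reply holds right now:
        // the result of the PREVIOUS read, or nothing on the first pass.
        log.push_back(reply.empty() ? std::string("reply message empty")
                                    : "reply message = " + reply);
        // Simulate the completion of the read that was just requested.
        if (next < server_messages.size()) {
            reply = server_messages[next++];
            ok = true;
        } else {
            ok = false;
        }
    }
    return log;
}
```

For two server messages the log is: an empty reply, the two messages, "Trying finish", "Good Bye". The printed reply always lags one tag behind the read request, which is why the very first line reports an empty message.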

To create an object of the AsyncClientCall1M class, add the GladToSeeMe() function to the GreeterClient class:

    void GladToSeeMe(const std::string& user)
    {
        HelloRequest request;
        request.set_name(user);
        new AsyncClientCall1M(request, cq_, stub_);
    }

Server and client console output in one-to-many mode


The console output of the server for the one-to-many mode is as follows:

    [Proceed1M]: New responder for 1-M mode
    [Proceed1M]: New responder for 1-M mode
    [Proceed1M]: request message = client
    [Proceed1M]: Writing
    [Proceed1M]: Writing
    [Proceed1M]: Writing
    [Proceed1M]: Writing
    [Proceed1M]: Trying finish
    [Proceed1M]: Good Bye

First, the first responder is created. After a request is received from the client, a second responder is created to handle subsequent requests. The client's request contains the message “client”. Then four responses are sent to the client, after which the responder's Finish() function is called. The next time the object is received from the queue, now in the FINISH state, it is deleted.

The client console output for one-to-many mode looks like this:

    [Proceed1M]: new client 1-M
    [Proceed1M]: reply message empty
    [Proceed1M]: reply message = Hello client!
    [Proceed1M]: reply message = I'm very glad to see you!
    [Proceed1M]: reply message = Haven't seen you for thousand years.
    [Proceed1M]: reply message = I'm server now. Call me later.
    [Proceed1M]: Trying finish
    [Proceed1M]: Good Bye

First, a new client call object is created. After the first call of the responder's Read() function, the variable that holds the reply is not yet filled, so “reply message empty” is printed. The client then receives four messages from the server, after which the ok flag is returned from the queue with the value false. The responder's Finish() function is called. The next time the object is received from the queue, it is deleted.

Many-to-one interaction mode


Code on the server


The CallDataM1 class, responsible for the many-to-one interaction mode on the server, is as follows:

    class CallDataM1: public CommonCallData
    {
        ServerAsyncReader<HelloReply, HelloRequest> responder_;
        bool new_responder_created;
    public:
        CallDataM1(Greeter::AsyncService* service, ServerCompletionQueue* cq):
            CommonCallData(service, cq), responder_(&ctx_), new_responder_created(false)
        {
            Proceed();
        }

        virtual void Proceed(bool ok = true) override
        {
            if (status_ == CREATE)
            {
                std::cout << "[ProceedM1]: New responder for M-1 mode" << std::endl;
                status_ = PROCESS;
                service_->RequestGladToSeeYou(&ctx_, &responder_, cq_, cq_, this);
            }
            else if (status_ == PROCESS)
            {
                if (!new_responder_created)
                {
                    new CallDataM1(service_, cq_);
                    new_responder_created = true;
                }
                // ok == false: the client has no more messages, time to send the reply.
                if (!ok)
                {
                    std::string greeting("Hello, Client!");
                    reply_.set_message(greeting);
                    std::cout << "[ProceedM1]: Sending reply" << std::endl;
                    status_ = FINISH;
                    responder_.Finish(reply_, Status(), (void*)this);
                    return;
                }
                // Request the next message, then print the one received so far.
                responder_.Read(&request_, (void*)this);
                if (!request_.name().empty())
                    std::cout << "[ProceedM1]: request message = " << request_.name() << std::endl;
            }
            else // if(status_ == FINISH)
            {
                std::cout << "[ProceedM1]: Good Bye" << std::endl;
                delete this;
            }
        }
    };

The CallDataM1 class inherits from the CommonCallData base class and contains two data members:

  1. responder_, the responder of the corresponding type (ServerAsyncReader<HelloReply, HelloRequest>);
  2. new_responder_created, a flag showing that the responder for the next client has already been created.

All members of the class are initialized in the constructor's initialization list, after which Proceed() is called. In the CREATE state, a request to handle the next client call is made with the responder as the tag. The responder then moves to the PROCESS state. The code in the else if(status_ == PROCESS) branch of this class is very similar to that of the AsyncClientCall1M client class.

In fact, the AsyncClientCall1M client class and the CallDataM1 server class work on exactly the same principle. The server reads client messages until a tag arrives from the queue with the ok flag set to false. Just as in the one-to-many client class, an ok flag with the value false means that there will be no more messages from the client. After the ok flag with the value false is received, the responder's Finish() function is called and the responder moves to the FINISH state. When the object is received from the queue in the FINISH state, it is deleted.

To create the very first object of the CallDataM1 class, add the following line to the Run() function:

 new CallDataM1(&service_, cq_.get()); 


Client Code


The AsyncClientCallM1 class, which is responsible for the many-to-one interaction mode on the client, looks like this:

    class AsyncClientCallM1 : public CommonAsyncClientCall
    {
        std::unique_ptr< ClientAsyncWriter<HelloRequest> > responder;
        unsigned mcounter;     // number of messages already written
        bool writing_mode_;
    public:
        AsyncClientCallM1(CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_):
            CommonAsyncClientCall(), mcounter(0), writing_mode_(true)
        {
            std::cout << "[ProceedM1]: new client M-1" << std::endl;
            responder = stub_->AsyncGladToSeeYou(&context, &reply, &cq_, (void*)this);
            callStatus = PROCESS;
        }

        virtual void Proceed(bool ok = true) override
        {
            if (callStatus == PROCESS)
            {
                if (writing_mode_)
                {
                    static std::vector<std::string> greeting = {
                        "Hello, server!",
                        "Glad to see you!",
                        "Haven't seen you for thousand years!",
                        "I'm client now. Call me later."};
                    if (mcounter < greeting.size())
                    {
                        HelloRequest request;
                        request.set_name(greeting.at(mcounter));
                        std::cout << "[ProceedM1]: Writing" << std::endl;
                        responder->Write(request, (void*)this);
                        ++mcounter;
                    }
                    else
                    {
                        // Tell the server that there will be no more messages.
                        responder->WritesDone((void*)this);
                        std::cout << "[ProceedM1]: changing state to reading" << std::endl;
                        writing_mode_ = false;
                        return;
                    }
                }
                else // reading mode
                {
                    std::cout << "[ProceedM1]: trying finish" << std::endl;
                    responder->Finish(&status, (void*)this);
                    callStatus = FINISH;
                }
            }
            else if (callStatus == FINISH)
            {
                assert(!reply.message().empty());
                printReply("ProceedM1");
                std::cout << "[ProceedM1]: Good Bye" << std::endl;
                delete this;
            }
            return;
        }
    };

The AsyncClientCallM1 class inherits from CommonAsyncClientCall and has a responder of the corresponding type and a message counter. In addition to these two members, it has a mode flag, writing_mode_. In the PROCESS state, while in write mode (the if(writing_mode_) branch), Proceed() works on the same principle as Proceed() of the CallData1M class, with one difference: after all messages have been sent, the responder's WritesDone() function is called here, whereas CallData1M calls Finish(). After calling WritesDone(), the client switches to read mode (writing_mode_ = false). In read mode, the responder's Finish() function is called and the responder moves to the FINISH state.

Note that here, in addition to the call to the responder's WritesDone() function, there is also a call to its Finish() function. The documentation says the following on this subject:
It is appropriate to call this method when the client side has no more messages to send (this can be declared implicitly by calling this method, or explicitly through an earlier call to the WritesDone method of the class in use, e.g. ClientAsyncWriterInterface::WritesDone or ClientAsyncReaderWriterInterface::WritesDone).
In other words, Finish() may be called once the client has no more messages for the server, and that fact can be declared either implicitly by the Finish() call itself or explicitly by an earlier call to WritesDone(). Since the AsyncClientCallM1 class sends several messages to the server with Write(), it calls the responder's WritesDone() method explicitly to tell the server that there will be no more messages (unlike the AsyncClientCall1M class, which sends its single request to the server in the call to AsyncGladToSeeMe()). Therefore, after calling WritesDone(), the responder switches to read mode and calls Finish() there. When the object is received from the queue in the FINISH state, the server's response is printed and the object is deleted.
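The order of operations on the writing client (several writes, then WritesDone(), then Finish(), then deletion) can be written down as a small state machine. The sketch below is not gRPC code; Op, ToyClientWriter and run_client_writer are hypothetical names, and OnTag() only reports which single operation a client like AsyncClientCallM1 would start for each tag.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// The operation started in response to one tag.
enum class Op { Write, WritesDone, Finish, Delete };

class ToyClientWriter {
public:
    explicit ToyClientWriter(std::size_t total)
        : total_(total), sent_(0), writing_(true), finishing_(false) {}

    // Called once per tag; starts exactly one operation and reports which.
    Op OnTag() {
        if (finishing_) return Op::Delete;  // the tag of Finish() came back
        if (writing_) {
            if (sent_ < total_) {
                ++sent_;
                return Op::Write;
            }
            writing_ = false;               // switch to "reading" mode
            return Op::WritesDone;
        }
        finishing_ = true;
        return Op::Finish;
    }

private:
    std::size_t total_;   // number of messages to send
    std::size_t sent_;    // plays the role of mcounter
    bool writing_;        // plays the role of writing_mode_
    bool finishing_;      // plays the role of callStatus == FINISH
};

// Feeds tags to the state machine until the object would delete itself.
std::vector<Op> run_client_writer(std::size_t total) {
    ToyClientWriter call(total);
    std::vector<Op> ops;
    for (;;) {
        const Op op = call.OnTag();
        ops.push_back(op);
        if (op == Op::Delete) break;
    }
    return ops;
}
```

Each branch returns right after starting its operation, which keeps the state machine within the one-operation-per-tag rule; that is also why WritesDone() and Finish() end up in two separate passes through Proceed() rather than being called back to back.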

To create an AsyncClientCallM1 object, add the GladToSeeYou() function to the GreeterClient class:

 void GladToSeeYou() { new AsyncClientCallM1(cq_, stub_); } 

Console output of server and client in many to one mode


The console output of the server for many-to-one mode is as follows:

 [ProceedM1]: New responder for M-1 mode
 [ProceedM1]: New responder for M-1 mode
 [ProceedM1]: request message = Hello, server!
 [ProceedM1]: request message = Glad to see you!
 [ProceedM1]: request message = Haven't seen you for thousand years!
 [ProceedM1]: request message = I'm client now. Call me later.
 [ProceedM1]: Sending reply
 [ProceedM1]: Good Bye

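As the server output shows, when a request from a client arrives, a new responder is created to serve the next client. The server then reads the client's messages one by one by calling Read(). When an event arrives with the ok flag set to false, which means that the client has called WritesDone(), the server sends its reply by calling Finish(). In the FINISH state, the object is deleted.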

The console output of the client for many-to-one mode is as follows:

 [ProceedM1]: new client M-1
 [ProceedM1]: Writing
 [ProceedM1]: Writing
 [ProceedM1]: Writing
 [ProceedM1]: Writing
 [ProceedM1]: changing state to reading
 [ProceedM1]: trying finish
 [ProceedM1]: reply message = Hello, Client!
 [ProceedM1]: Good Bye

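As the client output shows, once all messages have been sent, WritesDone() is called and the client switches to read mode. On the next pass through the same state (PROCESS), Finish() is called and the object moves to the FINISH state. In the FINISH state, the server's reply is printed and the object is deleted.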



On the server side, many-to-many mode is implemented by the CallDataMM class, shown below:

 class CallDataMM : public CommonCallData
 {
     ServerAsyncReaderWriter<HelloReply, HelloRequest> responder_;
     unsigned mcounter;
     bool writing_mode_;
     bool new_responder_created;
 public:
     CallDataMM(Greeter::AsyncService* service, ServerCompletionQueue* cq):
         CommonCallData(service, cq), responder_(&ctx_), mcounter(0),
         writing_mode_(false), new_responder_created(false)
     {
         Proceed();
     }

     virtual void Proceed(bool ok = true) override
     {
         if(status_ == CREATE)
         {
             std::cout << "[ProceedMM]: New responder for MM mode" << std::endl;
             status_ = PROCESS;
             service_->RequestBothGladToSee(&ctx_, &responder_, cq_, cq_, this);
         }
         else if(status_ == PROCESS)
         {
             // Create a responder for the next client, once per call.
             if(!new_responder_created)
             {
                 new CallDataMM(service_, cq_);
                 new_responder_created = true;
             }
             if(!writing_mode_) // reading mode
             {
                 if(!ok)
                 {
                     // The client has no more messages: switch to writing.
                     writing_mode_ = true;
                     ok = true;
                     std::cout << "[ProceedMM]: changing state to writing" << std::endl;
                 }
                 else
                 {
                     responder_.Read(&request_, (void*)this);
                     if(!request_.name().empty())
                         std::cout << "[ProceedMM]: request message =" << request_.name() << std::endl;
                 }
             }
             if(writing_mode_) // writing mode
             {
                 static std::vector<std::string> greeting = {
                     std::string(prefix + "client" "!"),
                     "I'm very glad to see you!",
                     "Haven't seen you for thousand years.",
                     "How are you?",
                     "I'm server now. Call me later."};
                 if(!ok || mcounter >= greeting.size())
                 {
                     std::cout << "[ProceedMM]: Trying finish" << std::endl;
                     status_ = FINISH;
                     responder_.Finish(Status(), (void*)this);
                 }
                 else
                 {
                     reply_.set_message(greeting.at(mcounter));
                     responder_.Write(reply_, (void*)this);
                     ++mcounter;
                 }
             }
         }
         else // if(status_ == FINISH)
         {
             std::cout << "[ProceedMM]: Good Bye" << std::endl;
             delete this;
         }
     }
 };

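The CallDataMM class inherits from CommonCallData and has a responder, a message counter, a read/write mode flag, and a flag that records whether a responder for the next client has already been created.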


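As in the other modes, all the logic is in Proceed(). The CREATE state works as before: the responder is registered for incoming requests and the object moves to PROCESS. The PROCESS branch of Proceed() is the interesting part. In effect, it combines two modes: writing, as in CallData1M, and reading, as in CallDataM1. The if(!writing_mode_) branch handles reading. As in CallDataM1, messages are read until ok becomes false. When ok is false, the object switches to write mode (writing_mode_ = true;) and ok is reset to true, so that the write branch, which runs within the same call to Proceed(), sends the first reply instead of finishing the call.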

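In write mode, the server sends one message from greeting per call to Proceed(). Once the last message, with index (greeting.size()-1), has been sent, Finish() is called and the object moves to the FINISH state. In the FINISH state, the object is deleted.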
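The read-then-write behavior of the PROCESS state can be traced by feeding a sequence of ok flags into a gRPC-free model of the same branches. This is a minimal sketch under assumptions: MockServerStream, ServerCallMMSketch and RunServerMMSketch are hypothetical names introduced only for illustration, the CREATE state is omitted, and each element of `events` stands for the ok flag of one completion-queue event.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ServerAsyncReaderWriter (not a gRPC class):
// it only records the operations requested, in order.
struct MockServerStream {
    std::vector<std::string> ops;
    void Read() { ops.push_back("Read"); }
    void Write(const std::string& msg) { ops.push_back("Write:" + msg); }
    void Finish() { ops.push_back("Finish"); }
};

// Mirrors the PROCESS and FINISH branches of CallDataMM::Proceed().
class ServerCallMMSketch {
public:
    ServerCallMMSketch(MockServerStream& stream, std::vector<std::string> replies)
        : stream_(stream), replies_(std::move(replies)) {}

    // `ok` is the flag delivered with the event.
    // Returns false once the real object would delete itself.
    bool Proceed(bool ok) {
        if (finishing_) return false;
        if (!writing_mode_) {
            if (!ok) {
                writing_mode_ = true;  // the client has finished writing
                ok = true;             // let the write branch below send a reply
            } else {
                stream_.Read();
            }
        }
        if (writing_mode_) {
            if (!ok || counter_ >= replies_.size()) {
                finishing_ = true;
                stream_.Finish();
            } else {
                stream_.Write(replies_[counter_++]);
            }
        }
        return true;
    }

private:
    MockServerStream& stream_;
    std::vector<std::string> replies_;
    std::size_t counter_ = 0;
    bool writing_mode_ = false;
    bool finishing_ = false;
};

// Feeds the ok flags to the sketch one event at a time.
std::vector<std::string> RunServerMMSketch(const std::vector<bool>& events,
                                           const std::vector<std::string>& replies) {
    MockServerStream stream;
    ServerCallMMSketch call(stream, replies);
    for (bool ok : events) {
        if (!call.Proceed(ok)) break;
    }
    return stream.ops;
}
```

With the events {true, true, false, true, true, true} and two replies, the model requests two reads, switches to writing on the false flag, sends both replies, and then calls Finish. Resetting ok to true is what allows the first reply to be written in the same call that detects the end of the client's stream.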

To create an object of the CallDataMM class, add the following line to the Run() function:

 new CallDataMM(&service_, cq_.get()); 


On the client side, many-to-many mode is implemented by the AsyncClientCallMM class, shown below:

 class AsyncClientCallMM : public CommonAsyncClientCall
 {
     std::unique_ptr< ClientAsyncReaderWriter<HelloRequest,HelloReply> > responder;
     unsigned mcounter;
     bool writing_mode_;
 public:
     AsyncClientCallMM(CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_):
         CommonAsyncClientCall(), mcounter(0), writing_mode_(true)
     {
         std::cout << "[ProceedMM]: new client MM" << std::endl;
         responder = stub_->AsyncBothGladToSee(&context, &cq_, (void*)this);
         callStatus = PROCESS;
     }

     virtual void Proceed(bool ok = true) override
     {
         if(callStatus == PROCESS)
         {
             if(writing_mode_)
             {
                 static std::vector<std::string> greeting = {
                     "Hello, server!",
                     "Glad to see you!",
                     "Haven't seen you for thousand years!",
                     "I'm client now. Call me later."};
                 //std::cout << "[ProceedMM]: mcounter = " << mcounter << std::endl;
                 if(mcounter < greeting.size())
                 {
                     HelloRequest request;
                     request.set_name(greeting.at(mcounter));
                     responder->Write(request, (void*)this);
                     ++mcounter;
                 }
                 else
                 {
                     // All messages sent: tell the server and switch to reading.
                     responder->WritesDone((void*)this);
                     std::cout << "[ProceedMM]: changing state to reading" << std::endl;
                     writing_mode_ = false;
                 }
                 return;
             }
             else // reading mode
             {
                 if(!ok)
                 {
                     // The server has no more messages: request the final status.
                     std::cout << "[ProceedMM]: trying finish" << std::endl;
                     callStatus = FINISH;
                     responder->Finish(&status, (void*)this);
                     return;
                 }
                 responder->Read(&reply, (void*)this);
                 printReply("ProceedMM");
             }
             return;
         }
         else if(callStatus == FINISH)
         {
             std::cout << "[ProceedMM]: Good Bye" << std::endl;
             delete this;
         }
     }
 };

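The AsyncClientCallMM class inherits from CommonAsyncClientCall and has a responder, a message counter, and a read/write mode flag. As in CallDataMM, Proceed() in AsyncClientCallMM works in two modes while in the PROCESS state: writing and reading. In effect, it combines two classes: AsyncClientCallM1 and AsyncClientCall1M.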

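The if(writing_mode_) branch handles writing. Once the last message, with index (greeting.size() - 1), has been sent, WritesDone() is called and the client switches to read mode. On the following calls to Proceed(), the client reads the replies sent by CallDataMM until ok becomes false. When ok is false, Finish() is called and the object moves to the FINISH state. In the FINISH state, the object is deleted.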

To create an AsyncClientCallMM object, add the BothGladToSee() function to the GreeterClient class:

 void BothGladToSee() { new AsyncClientCallMM(cq_, stub_); } 


The console output of the server for many-to-many mode is as follows:

 [ProceedMM]: New responder for MM mode
 [ProceedMM]: New responder for MM mode
 [ProceedMM]: request message = Hello, server!
 [ProceedMM]: request message = Glad to see you!
 [ProceedMM]: request message = Haven't seen you for thousand years!
 [ProceedMM]: request message = I'm client now. Call me later.
 [ProceedMM]: changing state to writing
 [ProceedMM]: Trying finish
 [ProceedMM]: Good Bye

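As the server output shows, when a request from a client arrives, a new responder is created to serve the next client. The server then reads the client's messages one by one by calling Read(). When an event arrives with the ok flag set to false, which means that the client has called WritesDone(), the server switches to write mode and sends its replies. After all replies have been sent, Finish() is called and the object moves to the FINISH state. In the FINISH state, the object is deleted.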

The console output of the client for many-to-many mode is as follows:

 [ProceedMM]: new client MM
 [ProceedMM]: changing state to reading
 [ProceedMM]: reply message empty
 [ProceedMM]: reply message = Hello client!
 [ProceedMM]: reply message = I'm very glad to see you!
 [ProceedMM]: reply message = Haven't seen you for thousand years.
 [ProceedMM]: reply message = How are you?
 [ProceedMM]: reply message = I'm server now. Call me later.
 [ProceedMM]: trying finish
 [ProceedMM]: Good Bye

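As the client output shows, once all messages have been sent, WritesDone() is called and the client switches to read mode. The replies are then read until ok becomes false, at which point Finish() is called. After the call to Finish(), the object moves to the FINISH state. In the FINISH state, the object is deleted.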
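The "reply message empty" line at the top of the client output appears to follow from the order of calls in the read branch of AsyncClientCallMM: printReply() runs immediately after Read() is requested, but an asynchronous Read() fills its buffer only by the time the corresponding event is taken from the completion queue. The sketch below models just that timing. DeferredReader and SimulateClientReads are hypothetical names, not gRPC classes, and the "empty" marker only imitates what printReply() presumably prints for an empty reply.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of an asynchronous Read(): the target buffer is filled
// only when the completion is delivered, not when Read() is called.
class DeferredReader {
public:
    explicit DeferredReader(std::vector<std::string> incoming)
        : incoming_(std::move(incoming)) {}

    void Read(std::string* target) { pending_ = target; }

    // Delivers the completion of the pending Read() and returns its ok flag.
    bool Complete() {
        assert(pending_ != nullptr);
        std::string* target = pending_;
        pending_ = nullptr;
        if (next_ >= incoming_.size()) return false;  // the peer sent nothing more
        *target = incoming_[next_++];
        return true;
    }

private:
    std::vector<std::string> incoming_;
    std::size_t next_ = 0;
    std::string* pending_ = nullptr;
};

// Repeats the read branch: request a Read(), log the buffer right away,
// then wait for the completion and check its ok flag.
std::vector<std::string> SimulateClientReads(const std::vector<std::string>& replies) {
    DeferredReader reader(replies);
    std::string reply;
    std::vector<std::string> printed;
    bool ok = true;  // completion of WritesDone()
    while (ok) {
        reader.Read(&reply);
        printed.push_back(reply.empty() ? "empty" : reply);  // logged before the data arrives
        ok = reader.Complete();
    }
    return printed;
}
```

For N replies the model issues N + 1 reads: the first log entry shows the still-empty buffer, each later entry shows the reply delivered by the previous read, and the last read completes with ok == false, which is the point where the real client calls Finish().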

6. Conclusion and list of references




Source: https://habr.com/ru/post/340758/

