We tell the kernel to start the operation and notify us when the entire operation (including copying data from the kernel to our buffer) is complete. Consider a simple example of a server that handles a client connection in three steps. The first stage is the processing of the connection. At this stage, there are no operations with the file system, I/O, or the database. The second stage is the synchronous reading of data from the socket. The third stage is the processing of the received data; the conditions are the same as in the first stage. For simplicity, we assume that the first and third stages take 1 ms each and the second stage takes 2 ms.
syntax = "proto3";

package helloworld;

// The greeting service definition.
service Greeter {
  // 1-1: a single request answered by a single reply.
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  // 1-M: a single request answered by a stream of replies.
  rpc GladToSeeMe (HelloRequest) returns (stream HelloReply) {}
  // M-1: a stream of requests answered by a single reply.
  rpc GladToSeeYou (stream HelloRequest) returns (HelloReply) {}
  // M-M: a stream of requests answered by a stream of replies.
  rpc BothGladToSee (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
/// Common base for the server-side call handlers ("responders").
///
/// Bundles the async service, the completion queue, the per-call context,
/// the request/reply messages, the handler's current stage and the greeting
/// prefix. Concrete handlers supply the state machine in Proceed().
class CommonCallData {
public:
    /// Stage of a handler's life cycle.
    enum CallStatus { CREATE, PROCESS, FINISH };

    Greeter::AsyncService* service_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    HelloRequest request_;
    HelloReply reply_;
    CallStatus status_;
    std::string prefix;

    /// Stores the service and queue pointers; the handler starts in CREATE
    /// with the prefix "Hello ".
    explicit CommonCallData(Greeter::AsyncService* service,
                            ServerCompletionQueue* cq)
        : service_(service),
          cq_(cq),
          status_(CREATE),
          prefix("Hello ")
    {
    }

    virtual ~CommonCallData() = default;

    /// Advances the handler by one step. The flag is the `ok` value that the
    /// polling loop obtained from the completion queue for this tag.
    virtual void Proceed(bool = true) = 0;
};
/// Common base for the client-side asynchronous calls.
///
/// Holds the client context, the reply message, the call's stage and the
/// final RPC status. Concrete calls supply the state machine in Proceed().
class CommonAsyncClientCall {
public:
    /// Stage of a call's life cycle.
    enum CallStatus { CREATE, PROCESS, FINISH };

    ClientContext context;
    HelloReply reply;
    CallStatus callStatus;
    Status status;

    /// A new call starts in the CREATE stage.
    explicit CommonAsyncClientCall() : callStatus(CREATE) {}

    virtual ~CommonAsyncClientCall() = default;

    /// Prints the current contents of `reply`, tagged with `from`.
    /// An empty message is reported as such instead of being printed.
    void printReply(const char* from)
    {
        const bool nothing_to_show = reply.message().empty();
        if (nothing_to_show) {
            std::cout << "[" << from << "]: reply message empty" << std::endl;
            return;
        }
        std::cout << "[" << from << "]: reply message = " << reply.message() << std::endl;
    }

    /// Advances the call by one step. The flag is the `ok` value that the
    /// polling loop obtained from the completion queue for this tag.
    virtual void Proceed(bool = true) = 0;
};
class CallData: public CommonCallData { ServerAsyncResponseWriter<HelloReply> responder_; public: CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq): CommonCallData(service, cq), responder_(&ctx_){Proceed();} virtual void Proceed(bool = true) override { if (status_ == CREATE) { std::cout << "[Proceed11]: New responder for 1-1 mode" << std::endl; status_ = PROCESS; service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this); } else if (status_ == PROCESS) { new CallData(service_, cq_); std::cout << "[Proceed11]: request message = " << request_.name() << std::endl; reply_.set_message(prefix + request_.name()); status_ = FINISH; responder_.Finish(reply_, Status::OK, this); } else { GPR_ASSERT(status_ == FINISH); std::cout << "[Proceed11]: Good Bye" << std::endl; delete this; } } };
// Server polling loop: block on the completion queue, then hand each tag
// back to the handler object it points to.
while (true) {
    GPR_ASSERT(cq_->Next(&tag, &ok));
    // `ok` is intentionally not asserted here; it is forwarded to the
    // handler, which decides what a false value means for its own state.
    // GPR_ASSERT(ok);
    static_cast<CommonCallData*>(tag)->Proceed(ok);
}
GPR_ASSERT(ok)
check is commented out. This will be discussed below.

/// Client side of the 1-1 (unary) SayHello call.
///
/// The constructor starts the call and asks for the final reply/status with
/// `this` as the tag; Proceed() then runs once, prints the reply and deletes
/// the object.
class AsyncClientCall : public CommonAsyncClientCall {
    std::unique_ptr<ClientAsyncResponseReader<HelloReply>> responder;

public:
    AsyncClientCall(const HelloRequest& request, CompletionQueue& cq_,
                    std::unique_ptr<Greeter::Stub>& stub_)
        : CommonAsyncClientCall()
    {
        std::cout << "[Proceed11]: new client 1-1" << std::endl;
        responder = stub_->AsyncSayHello(&context, request, &cq_);
        responder->Finish(&reply, &status, static_cast<void*>(this));
        // callStatus = PROCESS;
    }

    /// Called once, when the Finish tag comes back from the queue.
    virtual void Proceed(bool ok = true) override
    {
        GPR_ASSERT(ok);
        if (status.ok()) {
            printReply("Proceed11");
        }
        std::cout << "[Proceed11]: Good Bye" << std::endl;
        delete this;
    }
};
void SayHello(const std::string& user) { HelloRequest request; request.set_name(user); new AsyncClientCall(request, cq_, stub_); }
// Client polling loop: takes tags off the completion queue until Next()
// returns false, forwarding each one (with its `ok` flag) to the call
// object it points to.
void AsyncCompleteRpc()
{
    void* tag;
    bool ok = false;
    for (;;) {
        if (!cq_.Next(&tag, &ok)) {
            break;
        }
        static_cast<CommonAsyncClientCall*>(tag)->Proceed(ok);
    }
    std::cout << "Completion queue is shutting down." << std::endl;
}
[Proceed11]: New responder for 1-1 mode [Proceed11]: New responder for 1-1 mode [Proceed11]: request message = world [Proceed11]: Good Bye
[Proceed11]: new client 1-1 [Proceed11]: reply message = Hello world [Proceed11]: Good Bye
class CallData1M: public CommonCallData { ServerAsyncWriter<HelloReply> responder_;unsigned mcounter; bool new_responder_created; public: CallData1M(Greeter::AsyncService* service, ServerCompletionQueue* cq): CommonCallData(service, cq), responder_(&ctx_), mcounter, new_responder_created(false){ Proceed() ;} virtual void Proceed(bool = true) override { if(status_ == CREATE) { std::cout << "[Proceed1M]: New responder for 1-M mode" << std::endl; service_->RequestGladToSeeMe(&ctx_, &request_, &responder_, cq_, cq_, this); status_ = PROCESS ; } else if(status_ == PROCESS) { if(!new_responder_created) { new CallData1M(service_, cq_); new_responder_created = true ; std::cout << "[Proceed1M]: request message = " << request_.name() << std::endl; } static std::vector<std::string> greeting = {std::string(prefix + request_.name() + "!"), "I'm very glad to see you!", "Haven't seen you for thousand years.", "I'm server now. Call me later."}; if(mcounter >= greeting.size()) { std::cout << "[Proceed1M]: Trying finish" << std::endl; status_ = FINISH; responder_.Finish(Status(), (void*)this); } else { reply_.set_message(greeting.at(mcounter)); std::cout << "[Proceed1M]: Writing" << std::endl; responder_.Write(reply_, (void*)this); ++mcounter; } } else // if(status_ == FINISH) { std::cout << "[Proceed1M]: Good Bye" << std::endl; delete this; } } };
The CallData1M class holds a responder of type ServerAsyncWriter<HelloReply>; a counter of sent messages, unsigned mcounter; and a flag, bool new_responder_created.
When the object is created, we fall into the if(status_ == CREATE) condition, in which we indicate the current responder as a tag for a new client connection event. After that, the responder goes to the PROCESS state. When the next client request is received, the tag associated with this event will be returned from the queue. The tag received from the queue is a pointer to the responder object with the status PROCESS. Therefore, we will fall into the else if(status_ == PROCESS) condition. When this condition is first hit, a new responder object will be created to handle the following client requests. After that, a static vector is created with server responses. Initially, the mcounter variable is zero, so we are in the else condition. In the else condition, the response text is set to the message from the response vector under the mcounter number, after which the response is sent to the client and the mcounter variable is incremented. The documentation for the Write function says: "Only one write may be outstanding at any given time." This means something like the following: only one write can be in progress at a time, i.e. after calling the Write function, you need to wait for the tag from the queue BEFORE calling the Write function again. That is why the objects of class CallData1M store information about the number of sent messages. After all messages have been sent, the Finish() function is called and the responder goes to the FINISH state, so the next tag from the queue leads to the else if(status_ == FINISH) condition, in which the object will be deleted. The first responder is created with: new CallData1M(&service_, cq_.get());
class AsyncClientCall1M : public CommonAsyncClientCall { std::unique_ptr< ClientAsyncReader<HelloReply> > responder; public: AsyncClientCall1M(const HelloRequest& request, CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_) :CommonAsyncClientCall() { std::cout << "[Proceed1M]: new client 1-M" << std::endl; responder = stub_->AsyncGladToSeeMe(&context, request, &cq_, (void*)this); callStatus = PROCESS ; } virtual void Proceed(bool ok = true) override { if(callStatus == PROCESS) { if(!ok) { std::cout << "[Proceed1M]: Trying finish" << std::endl; responder->Finish(&status, (void*)this); callStatus = FINISH; return ; } responder->Read(&reply, (void*)this); printReply("Proceed1M"); } else if(callStatus == FINISH) { std::cout << "[Proceed1M]: Good Bye" << std::endl; delete this; } return ; } };
The documentation for the Finish() function says: "It is appropriate to call this method when all messages from the server have been received (either known implicitly, or explicitly because a previous AsyncReaderInterface::Read operation returned a non-ok result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false')." This means something like the following: you need to call this method [Finish()] when there are no more messages from the server (this can be known implicitly from the code, or explicitly when a read request (AsyncReaderInterface::Read) returns a bad result, i.e. ok, returned from the queue by cq->Next(&read_tag, &ok), will be false). The documentation for the Read() function says: "Read a message of type R into msg. Completion will be notified by tag on the associated completion queue." This means: [the Read() function] reads a message of type R into the variable msg. A notification of the completion of the read operation will be made by the tag from the queue.
void GladToSeeMe(const std::string& user) { HelloRequest request; request.set_name(user); new AsyncClientCall1M(request, cq_, stub_); }
[Proceed1M]: New responder for 1-M mode [Proceed1M]: New responder for 1-M mode [Proceed1M]: request message = client [Proceed1M]: Writing [Proceed1M]: Writing [Proceed1M]: Writing [Proceed1M]: Writing [Proceed1M]: Trying finish [Proceed1M]: Good Bye
[Proceed1M]: new client 1-M [Proceed1M]: reply message empty [Proceed1M]: reply message = Hello client! [Proceed1M]: reply message = I'm very glad to see you! [Proceed1M]: reply message = Haven't seen you for thousand years. [Proceed1M]: reply message = I'm server now. Call me later. [Proceed1M]: Trying finish [Proceed1M]: Good Bye
class CallDataM1: public CommonCallData { ServerAsyncReader<HelloReply, HelloRequest> responder_; bool new_responder_created; public: CallDataM1(Greeter::AsyncService* service, ServerCompletionQueue* cq): CommonCallData(service, cq), responder_(&ctx_), new_responder_created(false){Proceed();} virtual void Proceed(bool ok = true) override { if(status_ == CREATE) { std::cout << "[ProceedM1]: New responder for M-1 mode" << std::endl; status_ = PROCESS ; service_->RequestGladToSeeYou(&ctx_, &responder_, cq_, cq_, this); } else if(status_ == PROCESS) { if(!new_responder_created) { new CallDataM1(service_, cq_); new_responder_created = true ; } //It's time to send reply if(!ok) { std::string greeting("Hello, Client!"); reply_.set_message(greeting); std::cout << "[ProceedM1]: Sending reply" << std::endl; status_ = FINISH; responder_.Finish(reply_, Status(), (void*)this); return ; } responder_.Read(&request_, (void*)this); if(!request_.name().empty()) std::cout << "[ProceedM1]: request message =" << request_.name() << std::endl; } else // if(status_ == FINISH) { std::cout << "[ProceedM1]: Good Bye" << std::endl; delete this; } } };
The CallDataM1 class holds a responder of type ServerAsyncReader<HelloReply, HelloRequest> and a flag, bool new_responder_created. The else if(status_ == PROCESS) condition of this class is very similar to that of the client class AsyncClientCall1M. The first responder is created with: new CallDataM1(&service_, cq_.get());
class AsyncClientCallM1 : public CommonAsyncClientCall { std::unique_ptr< ClientAsyncWriter<HelloRequest> > responder; unsigned mcounter; bool writing_mode_; public: AsyncClientCallM1(CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_): CommonAsyncClientCall(), mcounter,writing_mode_(true) { std::cout << "[ProceedM1]: new client M-1" << std::endl; responder = stub_->AsyncGladToSeeYou(&context, &reply, &cq_, (void*)this); callStatus = PROCESS ; } virtual void Proceed(bool ok = true) override { if(callStatus == PROCESS) { if(writing_mode_) { static std::vector<std::string> greeting = {"Hello, server!", "Glad to see you!", "Haven't seen you for thousand years!", "I'm client now. Call me later."}; if(mcounter < greeting.size()) { HelloRequest request; request.set_name(greeting.at(mcounter)); std::cout << "[ProceedM1]: Writing" << std::endl; responder->Write(request, (void*)this); ++mcounter ; } else { responder->WritesDone((void*)this); std::cout << "[ProceedM1]: changing state to reading" << std::endl; writing_mode_ = false; return; } } else//reading mode { std::cout << "[ProceedM1]: trying finish" << std::endl; responder->Finish(&status, (void*)this); callStatus = FINISH ; } } else if(callStatus == FINISH) { assert(!reply.message().empty()); printReply("ProceedM1"); std::cout << "[ProceedM1]: Good Bye" << std::endl; delete this; } return ; } };
The principle of the writing state (the if(writing_mode_) condition) coincides with the principle of the Proceed() function of the CallData1M class, with the difference that here, after sending all the messages, the WritesDone() responder function is called, whereas the CallData1M class calls the Finish() function. After calling the responder function WritesDone(), the client enters the reading state (writing_mode_ = false). In the read state, the Finish() responder function is called and the responder enters the FINISH state. The documentation for the Finish() function says: "It is appropriate to call this method when the client side has no more messages to send (this can be declared implicitly by calling this method, or explicitly through an earlier call to the WritesDone method, e.g. ClientAsyncWriterInterface::WritesDone or ClientAsyncReaderWriterInterface::WritesDone)." This means something like the following: you need to call this method (Finish()) when the client does not have more messages to send to the server (which can be declared implicitly simply by calling this method, or explicitly by calling the WritesDone() method). But since the AsyncClientCallM1 class sends several messages to the server by calling the Write() method, the responder's WritesDone() method must be called explicitly to inform the server that there will be no more messages (unlike the AsyncClientCall1M class, which sends only one request to the server when calling the AsyncGladToSeeMe() function). Therefore, after calling the WritesDone() method, the responder enters the read state and the Finish() function is called in it. When an object is received from the queue in the FINISH state, the response from the server is displayed on the screen and the object is deleted.
// Starts an asynchronous M-1 GladToSeeYou call. The AsyncClientCallM1
// object is heap-allocated and not tracked here.
void GladToSeeYou()
{
    new AsyncClientCallM1(cq_, stub_);
}
[ProceedM1]: New responder for M-1 mode [ProceedM1]: New responder for M-1 mode [ProceedM1]: request message = Hello, server! [ProceedM1]: request message = Glad to see you! [ProceedM1]: request message = Haven't seen you for thousand years! [ProceedM1]: request message = I'm client now. Call me later. [ProceedM1]: Sending reply [ProceedM1]: Good Bye
[ProceedM1]: new client M-1 [ProceedM1]: Writing [ProceedM1]: Writing [ProceedM1]: Writing [ProceedM1]: Writing [ProceedM1]: changing state to reading [ProceedM1]: trying finish [ProceedM1]: reply message = Hello, Client! [ProceedM1]: Good Bye
class CallDataMM: public CommonCallData { ServerAsyncReaderWriter<HelloReply, HelloRequest> responder_; unsigned mcounter; bool writing_mode_; bool new_responder_created; public: CallDataMM(Greeter::AsyncService* service, ServerCompletionQueue* cq): CommonCallData(service, cq), responder_(&ctx_), mcounter, writing_mode_(false), new_responder_created(false){Proceed();} virtual void Proceed(bool ok = true) override { if(status_ == CREATE) { std::cout << "[ProceedMM]: New responder for MM mode" << std::endl; status_ = PROCESS ; service_->RequestBothGladToSee(&ctx_, &responder_, cq_, cq_, this); } else if(status_ == PROCESS) { if(!new_responder_created) { new CallDataMM(service_, cq_); new_responder_created = true ; } if(!writing_mode_)//reading mode { if(!ok) { writing_mode_ = true; ok = true; std::cout << "[ProceedMM]: changing state to writing" << std::endl; } else { responder_.Read(&request_, (void*)this); if(!request_.name().empty()) std::cout << "[ProceedMM]: request message =" << request_.name() << std::endl; } } if(writing_mode_)//writing mode { static std::vector<std::string> greeting = {std::string(prefix + "client" "!"), "I'm very glad to see you!", "Haven't seen you for thousand years.", "How are you?", "I'm server now. Call me later."}; if(!ok || mcounter >= greeting.size()) { std::cout << "[ProceedMM]: Trying finish" << std::endl; status_ = FINISH; responder_.Finish(Status(), (void*)this); } else { reply_.set_message(greeting.at(mcounter)); responder_.Write(reply_, (void*)this); ++mcounter; } } } else // if(status_ == FINISH) { std::cout << "[ProceedMM]: Good Bye" << std::endl; delete this; } } };
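The CallDataMM class holds a responder of type ServerAsyncReaderWriter<HelloReply, HelloRequest>; a counter of sent messages, unsigned mcounter; a flag, bool writing_mode_; and a flag, bool new_responder_created. The responder starts in the reading state (the if(!writing_mode_) condition). As in the CallDataM1 class, requests are read until ok is false. When ok is false, the responder switches to the writing state (writing_mode_ = true;). In the writing state, the responses are sent one at a time; when all of them have been sent, the Finish() function is called and the responder enters the FINISH state, in which the object is deleted. The first responder is created with: new CallDataMM(&service_, cq_.get());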
class AsyncClientCallMM : public CommonAsyncClientCall { std::unique_ptr< ClientAsyncReaderWriter<HelloRequest,HelloReply> > responder; unsigned mcounter; bool writing_mode_; public: AsyncClientCallMM(CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_): CommonAsyncClientCall(), mcounter, writing_mode_(true) { std::cout << "[ProceedMM]: new client MM" << std::endl; responder = stub_->AsyncBothGladToSee(&context, &cq_, (void*)this); callStatus = PROCESS ; } virtual void Proceed(bool ok = true) override { if(callStatus == PROCESS) { if(writing_mode_) { static std::vector<std::string> greeting = {"Hello, server!", "Glad to see you!", "Haven't seen you for thousand years!", "I'm client now. Call me later."}; //std::cout << "[ProceedMM]: mcounter = " << mcounter << std::endl; if(mcounter < greeting.size()) { HelloRequest request; request.set_name(greeting.at(mcounter)); responder->Write(request, (void*)this); ++mcounter; } else { responder->WritesDone((void*)this); std::cout << "[ProceedMM]: changing state to reading" << std::endl; writing_mode_ = false; } return ; } else //reading mode { if(!ok) { std::cout << "[ProceedMM]: trying finish" << std::endl; callStatus = FINISH; responder->Finish(&status, (void*)this); return; } responder->Read(&reply, (void*)this); printReply("ProceedMM"); } return; } else if(callStatus == FINISH) { std::cout << "[ProceedMM]: Good Bye" << std::endl; delete this; } } };
if(writing_mode_)
. (greeting.size() — 1) , WritesDone() . Proceed(), CallDataMM , . , , ok false. ok false , Finish() FINISH. FINISH, .

// Starts an asynchronous M-M BothGladToSee call. The AsyncClientCallMM
// object is heap-allocated and not tracked here.
void BothGladToSee()
{
    new AsyncClientCallMM(cq_, stub_);
}
[ProceedMM]: New responder for MM mode [ProceedMM]: New responder for MM mode [ProceedMM]: request message = Hello, server! [ProceedMM]: request message = Glad to see you! [ProceedMM]: request message = Haven't seen you for thousand years! [ProceedMM]: request message = I'm client now. Call me later. [ProceedMM]: changing state to writing [ProceedMM]: Trying finish [ProceedMM]: Good Bye
[ProceedMM]: new client MM [ProceedMM]: changing state to reading [ProceedMM]: reply message empty [ProceedMM]: reply message = Hello client! [ProceedMM]: reply message = I'm very glad to see you! [ProceedMM]: reply message = Haven't seen you for thousand years. [ProceedMM]: reply message = How are you? [ProceedMM]: reply message = I'm server now. Call me later. [ProceedMM]: trying finish [ProceedMM]: Good Bye
Source: https://habr.com/ru/post/340758/
All Articles