
"Boost.Asio C ++ Network Programming". Chapter 5: Synchronous vs. Asynchronous

Hello!
I continue translating John Torjo's book "Boost.Asio C++ Network Programming".

The authors of Boost.Asio did a wonderful job, giving us the opportunity to choose between the synchronous and the asynchronous path, whichever suits our application better.
In the previous chapter we saw skeletons for all types of applications: a synchronous client, a synchronous server, and their asynchronous variants. You can use any of them as a basis for your application. If you need to dig into the details of each type of application, read on.


Mixing synchronous and asynchronous programming


The Boost.Asio library allows you to mix synchronous and asynchronous programming. Personally, I think this is a bad idea, but Boost.Asio, like C++ as a whole, lets you shoot yourself in the foot if you want to.
It is easy to fall into a trap, especially if your application is asynchronous. For example, in response to an asynchronous write operation you perform, say, a synchronous read operation:

 io_service service;
 ip::tcp::socket sock(service);
 ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
 void on_write(boost::system::error_code err, size_t bytes) {
     char read_buff[512];
     // synchronous read inside an asynchronous completion handler: blocks this thread
     read(sock, buffer(read_buff));
 }
 async_write(sock, buffer("echo"), on_write);

The synchronous read operation will, of course, block the current thread, so any other pending asynchronous operations are put on hold (for this thread). This is bad code and can make the application sluggish or even deadlock it (the whole point of the asynchronous approach is to avoid blocking, and synchronous operations negate that). If your application is synchronous, you are unlikely to use asynchronous reads or writes, since thinking synchronously means thinking linearly (do A, then B, then C, and so on).
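To stay out of this trap, keep the whole chain asynchronous. A minimal sketch of the corrected handler (on_read is a hypothetical completion handler, not code from the book):

 char read_buff[512];
 void on_read(const boost::system::error_code & err, size_t bytes) { /* handle the reply */ }
 void on_write(boost::system::error_code err, size_t bytes) {
     // asynchronous read: returns immediately, no thread is blocked
     async_read(sock, buffer(read_buff), on_read);
 }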
The only case, in my opinion, where synchronous and asynchronous operations can work together is when they are completely isolated from each other, for example, synchronous network I/O and asynchronous database I/O.

Delivery of messages from the client to the server and vice versa


A very important part of a good client/server application is passing messages back and forth (from server to client and from client to server). You must specify what delimits a message. In other words, when reading an incoming message, how do we know that it has been read completely?
Determining the end of a message (the beginning is easy, it is the first byte after the end of the previous message) turns out to be not so simple.
You can:

- use fixed-size messages;
- end each message with a special character;
- prefix each message with its length.

Throughout the book I decided to use the character '\n' as the end of each message. Reading a message then looks like the following code snippet:

 char buff_[512];
 // synchronous read
 read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2));
 // asynchronous read
 async_read(sock_, buffer(buff_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
 size_t read_complete(const boost::system::error_code & err, size_t bytes) {
     if ( err) return 0;
     already_read_ = bytes;
     bool found = std::find(buff_, buff_ + bytes, '\n') < buff_ + bytes;
     // we read one byte at a time until we reach '\n'; no buffering
     return found ? 0 : 1;
 }

Using a message length prefix instead is left as an exercise for the reader; it is quite easy.
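For instance, a possible read_complete for length-prefixed messages could look like this (a sketch of my own, assuming the first byte of each message holds the payload length; this is not the book's code):

 size_t read_complete(const boost::system::error_code & err, size_t bytes) {
     if ( err) return 0;
     if ( bytes < 1) return 1;                // we need the length byte first
     size_t msg_len = (unsigned char)buff_[0];
     // keep asking for more bytes until length byte + payload have arrived
     return bytes < msg_len + 1 ? (msg_len + 1) - bytes : 0;
 }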

Synchronous I/O in client applications


A synchronous client usually comes in one of two flavors:

- it requests something from the server, reads the answer, and processes it (pull-like);
- it waits for messages from the server and responds to them (push-like).

Both scenarios use the same "make a request, read the answer" strategy: one side makes a request, and the other side answers it. This is the easy way to implement a client/server application, and it is what I recommend.
You can always build a Mambo Jambo client/server where each side writes whenever it wants, but that path very likely leads to disaster (how will you know what happened when the client or the server deadlocks?).
The two scenarios may look alike, but they are very different:

- in the pull-like case, the client initiates every exchange;
- in the push-like case, the server initiates, and the client reacts.

Most of the time you will deal with pull-like client/server applications; they are easier to develop and are the norm.
You can mix the two approaches, pulling on demand (client to server) and pushing notifications (server to client), but it is difficult and better avoided. If you mix them with the "make a request, read the answer" strategy, the following can happen:

- the client writes a request at the same moment the server pushes a notification;
- the client then reads the notification and misinterprets it as the answer to its request.

In a pull-like client/server application the previous scenario is easily avoided. You can simulate push-like behavior with a ping process, where the client checks in with the server, say, every 5 seconds. The server answers with something like ping_ok if there is nothing to report, or ping_[event_name] if there is an event to announce. The client can then initiate a new request to handle that event, as the sketch below shows.
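A minimal sketch of such a ping loop on the client side (handle_event is a hypothetical helper; write, read_answer, buff_, and already_read_ are the members used elsewhere in this chapter):

 void ping_loop() {
     while ( started_) {
         write("ping\n");                      // ask the server what's new
         read_answer();                        // blocks until '\n' arrives
         std::string answer(buff_, already_read_);
         if ( answer.find("ping_ok") != 0)
             handle_event(answer);             // got "ping_[event_name]"
         boost::this_thread::sleep(boost::posix_time::seconds(5));
     }
 }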
Again, the first (pull-like) scenario is the synchronous client we built in the previous chapter. Its main loop is:

 void loop() {
     // read the answer to our login
     write("login " + username_ + "\n");
     read_answer();
     while ( started_) {
         write_request();
         read_answer();
         ...
     }
 }

Let's change it to fit the second (push-like) scenario:

 void loop() {
     while ( started_) {
         read_notification();
         write_answer();
     }
 }
 void read_notification() {
     already_read_ = 0;
     read(sock_, buffer(buff_),
          boost::bind(&talk_to_svr::read_complete, this, _1, _2));
     process_notification();
 }
 void process_notification() {
     // ... see what the notification is, and prepare the answer
 }


Synchronous I/O in server applications


Servers, like clients, come in two flavors, matching the two scenarios from the previous section. Again, both use the "make a request, read the answer" strategy:

- a pull-like server waits for requests from clients and answers them;
- a push-like server sends notifications to clients and waits for their acknowledgments.

The first scenario is the synchronous server we implemented in the previous chapter. Reading a complete request is not easy when you work synchronously, since you want to avoid blocking (you always read only as much as is available):

 void read_request() {
     if ( sock_.available())
         already_read_ += sock_.read_some(
             buffer(buff_ + already_read_, max_msg - already_read_));
 }

After the message has been completely read, simply process it and respond to the client:

 void process_request() {
     bool found_enter = std::find(buff_, buff_ + already_read_, '\n')
                        < buff_ + already_read_;
     if ( !found_enter)
         return; // message is not complete yet
     size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
     std::string msg(buff_, pos);
     ...
     if ( msg.find("login ") == 0) on_login(msg);
     else if ( msg.find("ping") == 0) on_ping();
     else ...
 }

If we wanted our server to become a push-like server, we would change it as follows:

 typedef std::vector<client_ptr> array;
 array clients;
 array notify;
 std::string notify_msg;
 void on_new_client() {
     // a new client connected; we notify all clients of this event
     notify = clients;
     std::ostringstream msg;
     msg << "client count " << clients.size();
     notify_msg = msg.str();
     notify_clients();
 }
 void notify_clients() {
     for ( array::const_iterator b = notify.begin(), e = notify.end(); b != e; ++b)
         (*b)->sock_.write_some(buffer(notify_msg));
 }

The on_new_client() function handles one such event, about which we must notify all clients. notify_clients is the function that notifies every client subscribed to the event. The server sends the message but does not wait for an answer from each client, since that could lead to blocking. When an answer comes back from a client, the client tells us which notification it is answering (so we can process it correctly).

Threads in the synchronous server


This is a very important consideration: how many threads do we allocate to handle clients?
For a synchronous server we need at least one thread that accepts new connections:

 void accept_thread() {
     ip::tcp::acceptor acceptor(service,
         ip::tcp::endpoint(ip::tcp::v4(), 8001));
     while ( true) {
         client_ptr new_( new talk_to_client);
         acceptor.accept(new_->sock());
         boost::recursive_mutex::scoped_lock lk(cs);
         clients.push_back(new_);
     }
 }

For existing clients, you have several options:

- a single thread that iterates over all clients (this does not scale);
- one thread per client (wasteful when there are many clients);
- a fixed number of threads that share all existing clients.

The third option is the most difficult to implement in a synchronous server: the entire talk_to_client class must become thread-safe, and you need a mechanism to decide which threads handle which clients. For this you have two options:

- assign specific clients to specific threads;
- let each thread pick the first client that needs processing and is not already being handled by another thread.

The following code, similar to the original answer_to_client function, shows how the latter approach can be implemented:

 struct talk_to_client : boost::enable_shared_from_this<talk_to_client> {
     ...
     void answer_to_client() {
         try {
             read_request();
             process_request();
         } catch ( boost::system::system_error&) {
             stop();
         }
     }
 };

We will modify it as shown below:

 struct talk_to_client : boost::enable_shared_from_this<talk_to_client> {
     boost::recursive_mutex cs;
     boost::recursive_mutex cs_ask;
     bool in_process;
     void answer_to_client() {
         {   boost::recursive_mutex::scoped_lock lk(cs_ask);
             if ( in_process)
                 return;
             in_process = true;
         }
         {   boost::recursive_mutex::scoped_lock lk(cs);
             try {
                 read_request();
                 process_request();
             } catch ( boost::system::system_error&) {
                 stop();
             }
         }
         {   boost::recursive_mutex::scoped_lock lk(cs_ask);
             in_process = false;
         }
     }
 };

While a client is being processed, its in_process flag is set to true, and other threads skip that client. The added bonus is that handle_clients_thread() needs no modification; you can simply spawn as many handle_clients_thread() threads as you want.
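For reference, such a thread function could look roughly like this (a sketch in the spirit of the previous chapter's server; the 1 ms sleep is an assumption):

 void handle_clients_thread() {
     while ( true) {
         boost::this_thread::sleep(boost::posix_time::millisec(1));
         boost::recursive_mutex::scoped_lock lk(cs);
         for ( array::iterator b = clients.begin(), e = clients.end(); b != e; ++b)
             (*b)->answer_to_client(); // returns immediately if already in process
     }
 }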

Asynchronous I/O in client applications


The main workflow is somewhat similar to the synchronous client, with the difference that Boost.Asio sits between every async_read and async_write request. The two scenarios are the same as before:

- pull-like: the client requests something from the server and processes the answer;
- push-like: the client waits for notifications from the server and reacts to them.

The first scenario is the asynchronous client from Chapter 4. Remember that at the end of every asynchronous operation you must start another asynchronous operation, so that service.run() keeps running.
To turn the first scenario into the second, we use the following code snippet:

 void on_connect() {
     do_read();
 }
 void do_read() {
     async_read(sock_, buffer(read_buffer_),
                MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
 }
 void on_read(const error_code & err, size_t bytes) {
     if ( err) stop();
     if ( !started() ) return;
     std::string msg(read_buffer_, bytes);
     if ( msg.find("clients") == 0) on_clients(msg);
     else ...
 }
 void on_clients(const std::string & msg) {
     std::string clients = msg.substr(8);
     std::cout << username_ << ", new client list:" << clients;
     do_write("clients ok\n");
 }

Note that as soon as you successfully connect, you start reading from the server. Each on_[event] function ends by writing a message back to the server.
The beauty of the asynchronous approach is that you can mix network I/O with any other asynchronous operations and let Boost.Asio orchestrate it all. Even though the flow of execution is not as obvious as a synchronous one, you can still reason about it almost as if it were synchronous.
Say you read a file from a web server and save it to a database (asynchronously). You can reason about it as the following chain of steps:



Asynchronous I/O in server applications


Again, there are the same two ubiquitous cases, the first scenario (pull) and the second scenario (push):

- pull-like: the server waits for client requests and answers them;
- push-like: the server pushes notifications to clients and waits for their acknowledgments.

The first (pull-like) asynchronous server was implemented in the previous chapter. At the end of each asynchronous operation you must start another one, so that service.run() keeps running.
Here is the trimmed-down code frame; all of the following are members of the talk_to_client class:

 void start() {
     ...
     do_read(); // first, we wait for the client to login
 }
 void on_read(const error_code & err, size_t bytes) {
     std::string msg(read_buffer_, bytes);
     if ( msg.find("login ") == 0) on_login(msg);
     else if ( msg.find("ping") == 0) on_ping();
     else ...
 }
 void on_login(const std::string & msg) {
     std::istringstream in(msg);
     in >> username_ >> username_; // skip the word "login", then read the name
     do_write("login ok\n");
 }
 void do_write(const std::string & msg) {
     std::copy(msg.begin(), msg.end(), write_buffer_);
     sock_.async_write_some( buffer(write_buffer_, msg.size()),
                             MEM_FN2(on_write,_1,_2));
 }
 void on_write(const error_code & err, size_t bytes) {
     do_read();
 }

In a nutshell: we always wait for a read; as soon as it completes, we process the message and reply to the client.
Let's convert the previous code into a push server:

 void start() {
     ...
     on_new_client_event();
 }
 void on_new_client_event() {
     std::ostringstream msg;
     msg << "client count " << clients.size();
     for ( array::const_iterator b = clients.begin(), e = clients.end(); b != e; ++b)
         (*b)->do_write(msg.str());
 }
 void on_read(const error_code & err, size_t bytes) {
     std::string msg(read_buffer_, bytes);
     // here we only acknowledge that the client received our notification
 }
 void do_write(const std::string & msg) {
     std::copy(msg.begin(), msg.end(), write_buffer_);
     sock_.async_write_some( buffer(write_buffer_, msg.size()),
                             MEM_FN2(on_write,_1,_2));
 }
 void on_write(const error_code & err, size_t bytes) {
     do_read();
 }

When an event occurs, say on_new_client_event, a message is sent to all clients that need to know about it. When they answer, we know they have processed the event. Note that we never run out of pending asynchronous operations (so service.run() never returns), since we are always waiting for new clients.

Threads in the asynchronous server


The asynchronous server shown in Chapter 4 is single-threaded; everything happens in main():

 int main() {
     talk_to_client::ptr client = talk_to_client::new_();
     acc.async_accept(client->sock(), boost::bind(handle_accept, client, _1));
     service.run();
 }

The beauty of the asynchronous approach is how simple it is to move from single-threaded to multi-threaded. You can usually stay single-threaded, at least until you have more than about 200 concurrent clients. Then, going from one thread to, say, 100 threads takes only the following code snippet:

 boost::thread_group threads;
 void listen_thread() {
     service.run();
 }
 void start_listen(int thread_count) {
     for ( int i = 0; i < thread_count; ++i)
         threads.create_thread(listen_thread);
 }
 int main(int argc, char* argv[]) {
     talk_to_client::ptr client = talk_to_client::new_();
     acc.async_accept(client->sock(), boost::bind(handle_accept, client, _1));
     start_listen(100);
     threads.join_all();
 }

Of course, once you use multiple threads, you must think about thread safety. Even if you call async_* from thread A, its completion handler can be invoked in thread B (any thread that calls service.run()). That by itself is not a problem. As long as you follow the logical sequence, that is, from async_read() to on_read(), from on_read() to process_request(), from process_request() to async_write(), from async_write() to on_write(), from on_write() to async_read(), and there are no public functions calling into your talk_to_client class, the handlers, although possibly running on different threads, are still invoked sequentially. So no mutexes are needed.
That, however, holds only while there is at most one pending asynchronous operation per client. If at some point a client has two pending asynchronous operations, their handlers could complete at about the same time and run simultaneously in two different threads. Hence the need for thread safety, that is, for mutexes.
In fact, our asynchronous server actually has two pending operations at the same time:

 void do_read() {
     async_read(sock_, buffer(read_buffer_),
                MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
     post_check_ping();
 }
 void post_check_ping() {
     timer_.expires_from_now(boost::posix_time::millisec(5000));
     timer_.async_wait( MEM_FN(on_check_ping));
 }

While a read operation is pending, we also wait asynchronously for the ping timeout; hence the need for thread safety. My advice: if you plan to go multi-threaded, make your class thread-safe from the very beginning. This usually does not hurt performance (but do measure it). Also, if you plan to go multi-threaded, actually run multi-threaded from the start; that way you hit the inevitable problems early. Once you find a problem, the first thing to check is whether it also happens with a single thread. If it does, it is easy, just debug it. Otherwise, you probably forgot to protect some function with a mutex.
Since our example needs thread safety, we have changed talk_to_client to use mutexes. We also have an array of clients that is referenced in several places in the code and therefore needs a mutex of its own.
Avoiding deadlocks and memory corruption is not that easy. Here is how the update_clients_changed() function had to change:

 void update_clients_changed() {
     array copy;
     {   boost::recursive_mutex::scoped_lock lk(clients_cs);
         copy = clients;
     }
     for ( array::iterator b = copy.begin(), e = copy.end(); b != e; ++b)
         (*b)->set_clients_changed();
 }

What we want to avoid is holding two mutexes at the same time (which can lead to deadlock). In our case, we never want clients_cs and a client's own cs mutex locked at the same time.

Asynchronous operations


Boost.Asio also allows you to perform any of your functions asynchronously. Just use the following code snippet:

 void my_func() {
     ...
 }
 service.post(my_func);
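A minimal, self-contained check of what this does (the printed text is made up):

 #include <boost/asio.hpp>
 #include <iostream>

 boost::asio::io_service service;

 void my_func() {
     std::cout << "my_func runs inside service.run()\n";
 }

 int main() {
     service.post(my_func); // only schedules my_func...
     service.run();         // ...it actually runs here
 }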

This guarantees that my_func is invoked on one of the threads that call service.run(). You can also run a function asynchronously and have a completion handler tell you when it finishes. The pseudocode looks like this:

 void on_complete() {
     ...
 }
 void my_func() {
     ...
     service.post(on_complete);
 }
 async_call(my_func);

There is no async_call function in Boost.Asio, so you have to create your own. Fortunately, it is not that hard. See the following code snippet:

 struct async_op : boost::enable_shared_from_this<async_op>, ... {
     typedef boost::function<void(boost::system::error_code)> completion_func;
     typedef boost::function<boost::system::error_code ()> op_func;
     struct operation { ... };
     void start() {
         {   boost::recursive_mutex::scoped_lock lk(cs_);
             if ( started_) return;
             started_ = true;
         }
         boost::thread t( boost::bind(&async_op::run, this));
     }
     void add(op_func op, completion_func completion, io_service & service) {
         self_ = shared_from_this();
         boost::recursive_mutex::scoped_lock lk(cs_);
         ops_.push_back( operation(service, op, completion));
         if ( !started_) start();
     }
     void stop() {
         boost::recursive_mutex::scoped_lock lk(cs_);
         started_ = false;
         ops_.clear();
     }
 private:
     boost::recursive_mutex cs_;
     std::vector<operation> ops_;
     bool started_;
     ptr self_;
 };

async_op creates a background thread (started via start(), whose body is run()) that executes the asynchronous operations you add to it (add()). Each operation consists of:

- a function to call asynchronously;
- a completion handler to call when that function finishes;
- the io_service instance on which the completion handler is posted.

Note also the self_ member: add() stores a shared pointer to the instance itself, keeping it alive while there are pending operations. This mirrors io_service::work and service.run(): as long as an io_service::work instance exists, service.run() does not return (without an io_service::work instance, service.run() ends as soon as there are no more pending operations). Here is the run() function:

 struct async_op : ... {
     typedef boost::shared_ptr<async_op> ptr;
     static ptr new_() { return ptr(new async_op); }
     ...
     void run() {
         while ( true) {
             {   boost::recursive_mutex::scoped_lock lk(cs_);
                 if ( !started_) break;
             }
             boost::this_thread::sleep( boost::posix_time::millisec(10));
             operation cur;
             {   boost::recursive_mutex::scoped_lock lk(cs_);
                 if ( !ops_.empty()) {
                     cur = ops_[0];
                     ops_.erase( ops_.begin());
                 }
             }
             if ( cur.service)
                 cur.service->post(boost::bind(cur.completion, cur.op()));
         }
         self_.reset();
     }
 };

The run() loop wakes every 10 milliseconds and checks whether it has been stopped and whether any operation is queued; if one is, it takes the first operation, executes it, and posts its completion handler (with the operation's result) to the associated io_service instance. When stopped, it resets self_ so the object can be destroyed.
For example, here is how to call a function such as compute_file_checksum asynchronously:

 size_t checksum = 0;
 boost::system::error_code compute_file_checksum(std::string file_name) {
     HANDLE file = ::CreateFile(file_name.c_str(), GENERIC_READ, 0, 0,
         OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
     windows::random_access_handle h(service, file);
     long buff[1024];
     checksum = 0;
     size_t bytes = 0, at = 0;
     boost::system::error_code ec;
     while ( (bytes = read_at(h, at, buffer(buff), ec)) > 0) {
         at += bytes;
         bytes /= sizeof(long);
         for ( size_t i = 0; i < bytes; ++i)
             checksum += buff[i];
     }
     return boost::system::error_code(0, boost::system::generic_category());
 }
 void on_checksum(std::string file_name, boost::system::error_code) {
     std::cout << "checksum for " << file_name << "=" << checksum << std::endl;
 }
 int main(int argc, char* argv[]) {
     std::string fn = "readme.txt";
     async_op::new_()->add( boost::bind(compute_file_checksum, fn),
                            boost::bind(on_checksum, fn, _1), service);
     service.run();
 }

Notice how simple it all is: you hand over the function and its completion handler, and once the function finishes, the io_service instance is notified (via post()) and runs the completion handler on a thread calling service.run(). Also note that compute_file_checksum itself reads the file synchronously (read_at with an error_code), even though the handle is opened for overlapped I/O; it is the call as a whole that is made asynchronous. Keep in mind that this helper dedicates a whole thread and polls every 10 milliseconds, so treat it as a starting point rather than production code.
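Since the example above is Windows-specific (CreateFile, windows::random_access_handle), here is a portable sketch of the same function using plain iostreams (my own variant, not the book's; it keeps the sum-of-longs definition and the checksum global from the snippet above):

 #include <fstream>
 boost::system::error_code compute_file_checksum(std::string file_name) {
     std::ifstream in(file_name.c_str(), std::ios::binary);
     long buff[1024];
     checksum = 0;
     // read the file block by block, summing whole longs
     while ( in.read(reinterpret_cast<char*>(buff), sizeof(buff)), in.gcount() > 0) {
         size_t longs = static_cast<size_t>(in.gcount()) / sizeof(long);
         for ( size_t i = 0; i < longs; ++i)
             checksum += buff[i];
     }
     return boost::system::error_code();
 }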


Proxy implementation


A proxy usually sits between a client and a server. It takes a request from the client, possibly modifies it, and forwards it to the server; it then takes the answer from the server, possibly modifies it, and forwards it back to the client.



What is special about a proxy? For the purposes of this chapter, the fact that each connection involves two sockets, one to the client and one to the server. This alone makes a proxy noticeably harder to implement.

The proxy implemented here is deliberately simple: whatever it reads from one side it writes, unmodified, to the other side, and a message read from one socket is written in full before the next read on that side.

As soon as a connection is established, we start reading from both sockets (this is done in on_start()), and the rest of the class looks like this:

 class proxy : public boost::enable_shared_from_this<proxy> {
     ...
     void on_read(ip::tcp::socket & sock, const error_code & err, size_t bytes) {
         char * buff = &sock == &client_ ? buff_client_ : buff_server_;
         do_write(&sock == &client_ ? server_ : client_, buff, bytes);
     }
     void on_write(ip::tcp::socket & sock, const error_code & err, size_t bytes) {
         if ( &sock == &client_) do_read(server_, buff_server_);
         else                    do_read(client_, buff_client_);
     }
     void do_read(ip::tcp::socket & sock, char * buff) {
         async_read(sock, buffer(buff, max_msg),
                    MEM_FN3(read_complete, ref(sock), _1, _2),
                    MEM_FN3(on_read, ref(sock), _1, _2));
     }
     void do_write(ip::tcp::socket & sock, char * buff, size_t size) {
         sock.async_write_some(buffer(buff, size),
                               MEM_FN3(on_write, ref(sock), _1, _2));
     }
     size_t read_complete(ip::tcp::socket & sock, const error_code & err,
                          size_t bytes) {
         if ( sock.available() > 0)
             return sock.available();
         return bytes > 0 ? 0 : 1;
     }
 };

When a read from one side completes (on_read), the data is written to the other side. When that write completes (on_write), we start a new read on the side the data originally came from.
Starting the proxy is simple; main() just supplies the two endpoints:

 int main(int argc, char* argv[]) {
     ip::tcp::endpoint ep_c( ip::address::from_string("127.0.0.1"), 8001);
     ip::tcp::endpoint ep_s( ip::address::from_string("127.0.0.1"), 8002);
     proxy::start(ep_c, ep_s);
     service.run();
 }

Note that each direction has its own buffer (buff_client_ and buff_server_). While a message read from one side is being written to the other, we do not read from that side again, so its buffer is never overwritten. This also means only one message per direction is in flight at a time, which caps throughput: a slow reader on one side stalls the other side as well. A more robust proxy would queue pending messages per direction.

Summary


We have gone into the details of a question you will face when writing any network application: should it be synchronous or asynchronous?
Along the way we saw how synchronous and asynchronous clients and servers are structured, how to delimit messages, why mixing the two styles is dangerous, how many threads each design needs, and how to run your own functions asynchronously.
Before digging deeper into Boost.Asio, the next chapter looks at some of its less known features, among them Boost.Asio co-routines, which let you write asynchronous code that reads almost like synchronous code.

Resources for this article: link

Thank you all for your attention, until we meet again!

Source: https://habr.com/ru/post/196354/

