📜 ⬆️ ⬇️

"Boost.Asio C ++ Network Programming". Chapter 2: Boost.Asio Basics Part 2

Hello!
I continue to translate the book of John Torjo "Boost.Asio C ++ Network Programming". In this part of the second chapter we will talk about asynchronous programming.

Content:


This section deeply examines some of the issues that you will encounter when working with asynchronous programming. Having read it once, I suggest that you return to it as you go through the book in order to strengthen your understanding of these concepts.
')

Need to work asynchronously


As I said, as a rule, synchronous programming is much simpler than asynchronous. Because it is much easier to think linearly (we call function A, after it ends we call its handler, we call function B, after we finish it we call its handler, and so on, so we can think of a handler event in a manner). In the latter case, you can have, say, five events and you can never know the order in which they are performed, and you won’t even know if they all will be executed!
But even though asynchronous programming is more difficult, you will most likely prefer it, say, in writing servers that have to deal with a large number of clients at the same time. The more clients you have, the easier the asynchronous programming compared to synchronous.
Let's say you have an application that simultaneously deals with 1000 clients, each message from the client to the server and from the server to the client ends with a '\ n'.
Synchronous code, 1 thread:

using namespace boost::asio; struct client { ip::tcp::socket sock; char buff[1024]; // each msg is at maximum this size int already_read; // how much have we already read? }; std::vector<client> clients; void handle_clients() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) on_read(clients[i]); } void on_read(client & c) { int to_read = std::min( 1024 - c.already_read, c.sock. available()); c.sock.read_some( buffer(c.buff + c.already_read, to_read)); c.already_read += to_read; if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) { int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff; std::string msg(c.buff, c.buff + pos); std::copy(c.buff + pos, c.buff + 1024, c.buff); c.already_read -= pos; on_read_msg(c, msg); } } void on_read_msg(client & c, const std::string & msg) { // analyze message, and write back if ( msg == "request_login") c.sock.write( "request_ok\n"); else if ... } 

One thing that you want to avoid when writing servers (and basically any network application) is for the code to stop responding. In our case, we want the handle_clients() function to block as little as possible. If the function is blocked at any point, then all incoming messages from the client will wait for the function to unlock and begin processing them.
In order to remain responsive, we will read from the socket only when it has data, that is, if ( clients[i].sock.available() ) on_read(clients[i]) . In on_read, we will only read as much as is available; calling read_until(c.sock, buffer(...),'\n') would not be a very good idea, since it is blocked until we read the message from a specific client to the end (we will never know when this will happen) .
The bottleneck here is the on_read_msg() function; all incoming messages will be suspended until this function is executed. A well-written on_read_msg() function will ensure that this does not happen, but it can still happen (sometimes writing to a socket can be blocked, for example, if its buffer is full).
Synchronous code, 10 threads:

 using namespace boost::asio; struct client { // ... same as before bool set_reading() { boost::mutex::scoped_lock lk(cs_); if ( is_reading_) return false; // already reading else { is_reading_ = true; return true; } } void unset_reading() { boost::mutex::scoped_lock lk(cs_); is_reading_ = false; } private: boost::mutex cs_; bool is_reading_; }; std::vector<client> clients; void handle_clients() { for ( int i = 0; i < 10; ++i) boost::thread( handle_clients_thread); } void handle_clients_thread() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) if ( clients[i].set_reading()) { on_read(clients[i]); clients[i].unset_reading(); } } void on_read(client & c) { // same as before } void on_read_msg(client & c, const std::string & msg) { // same as before } 

In order to use multiple threads, we need to synchronize them, which is what the set_reading () and set_unreading() functions do. The set_reading() function is very important. You want to "check whether you can read and start reading" was performed in one step. If you do this in two steps (“check whether you can read” and “start reading”), then you can start two streams: one to check for reading for a client, the other to call the on_read function for the same client, in the end, this can lead to data corruption and possibly even a system crash.
You will notice that the code is becoming more complex.
A third option is also possible for synchronous code, namely, to have one thread per client. But as the number of simultaneous customers grows, this largely becomes an unacceptable operation.
Now consider the asynchronous options. We are constantly doing asynchronous read operation. When the client makes a request, the on_read operation is on_read , we respond in response, and then we wait for the next request to arrive (we start another asynchronous read operation).
Asynchronous code, 10 threads:

 using namespace boost::asio; io_service service; struct client { ip::tcp::socket sock; streambuf buff; // reads the answer from the client } std::vector<client> clients; void handle_clients() { for ( int i = 0; i < clients.size(); ++i) async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2)); for ( int i = 0; i < 10; ++i) boost::thread(handle_clients_thread); } void handle_clients_thread() { service.run(); } void on_read(client & c, const error_code & err, size_t read_bytes) { std::istream in(&c.buff); std::string msg; std::getline(in, msg); if ( msg == "request_login") c.sock.async_write( "request_ok\n", on_write); else if ... ... // now, wait for the next read from the same client async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2)); } 

Notice how much simpler the code has become. The client structure has only two members, handle_clients() simply calls async_read_until , and then creates ten threads, each of which calls service.run() . These threads will handle all asynchronous read or write operations to the client. Another thing to note is that the on_read() function will constantly prepare for the next asynchronous read operation (see the last line).

Asynchronous functions run (), run_one (), poll (), poll_one ()


To implement a listen loop, the io_service class provides four functions, such as run(), run_one(), poll() , and poll_one() . Although most of the time you will be working with service.run() . Here you will learn what can be achieved with the help of other functions.

Constantly working

Once again, run() will work until the pending operations are completed or until you call io_service::stop() yourself. To keep an io_service instance working, you typically add one or more asynchronous operations, and when they end, you continue to add, as shown in the following code:

 using namespace boost::asio; io_service service; ip::tcp::socket sock(service); char buff_read[1024], buff_write[1024] = "ok"; void on_read(const boost::system::error_code &err, std::size_t bytes) ; void on_write(const boost::system::error_code &err, std::size_t bytes) { sock.async_read_some(buffer(buff_read), on_read); } void on_read(const boost::system::error_code &err, std::size_t bytes) { // ... process the read ... sock.async_write_some(buffer(buff_write,3), on_write); } void on_connect(const boost::system::error_code &err) { sock.async_read_some(buffer(buff_read), on_read); } int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001); sock.async_connect(ep, on_connect); service.run(); } 

When service.run() is called, at least one asynchronous operation is pending. When the socket connects to the server, on_connect is on_connect , which adds another asynchronous operation. After the end of work on_connect , we have one scheduled operation ( read ). When the on_read operation is on_read , we write the answer, another scheduled operation ( write ) is added. When the on_write function is on_write , we read the following message from the server, which will add another scheduled operation. When the on_write function on_write , we have one scheduled operation ( read ). And so the cycle continues until we decide to close the application.

Functions run_one (), poll (), poll_one ()

It was noted earlier that handlers of asynchronous functions are called on the same thread in which io_service::run was called. This is noted for simplicity, because at least 90 to 95 percent of the time is the only function you'll use. The same is true for run_one(), poll() , or poll_one() in the stream.
The run_one() function will perform and send more than one asynchronous operation:

You may consider the following equivalent code:

 io_service service; service.run(); // OR while ( !service.stopped()) service.run_once(); 

You can use run_once() to start an asynchronous operation, and then wait for it to complete:

 io_service service; bool write_complete = false; void on_write(const boost::system::error_code & err, size_t bytes) { write_complete = true; } ... std::string data = "login ok"; write_complete = false; async_write(sock, buffer(data), on_write); do service.run_once() while (!write_complete); 

There are also some examples that use run_one() along with Boost.Asio, for example blocking_tcp_client.cpp and blocking_udp_client.cpp . The poll_one function runs no more than one deferred operation that is ready to start without blocking:

A delayed operation that is ready to be started without blocking is usually one of the following:

You can use poll_one to make sure that all the I / O handlers are running and proceed to the following tasks:

 io_service service; while ( true) { // run all handlers of completed IO operations while ( service.poll_one()) ; // ... do other work here ... } 

The poll() function will perform all operations that are pending, and can be run without blocking. The following code is equivalent:

 io_service service; service.poll(); // OR while ( service.poll_one()) ; 

All previous functions will throw exceptions boost::system::system_error in case of failure. But this should never happen; an error thrown here usually results in a crash, maybe an error in the resources or one of your handlers threw an exception. In any case, each of the functions has an overload, which does not throw exceptions, but takes boost::system::error_code as an argument and sets it as a return value.

 io_service service; boost::system::error_code err = 0; service.run(err); if ( err) std::cout << "Error " << err << std::endl; 


Asynchronous operation


Asynchronous operation is not only asynchronous processing of clients connecting to the server, asynchronous reading from and writing to the socket. This covers any operations that can be performed asynchronously.
By default, you do not know the order in which the handlers of all asynchronous functions are called. In addition, usually the following calls are asynchronous (outgoing from an asynchronous socket read / write / receive). You can use service.post() to add a custom function that will be called asynchronously, for example:

 #include <boost/thread.hpp> #include <boost/bind.hpp> #include <boost/asio.hpp> #include <iostream> using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << std::endl; } void worker_thread() { service.run(); } int main(int argc, char* argv[]) { for ( int i = 0; i < 10; ++i) service.post(boost::bind(func, i)); boost::thread_group threads; for ( int i = 0; i < 3; ++i) threads.create_thread(worker_thread); // wait for all threads to be created boost::this_thread::sleep( boost::posix_time::millisec(500)); threads.join_all(); } 

In the previous example, service.post(some_function) adds an asynchronous function call. This function is immediately terminated, after an io_service instance is io_service to call this some_function in one of the threads, which calls service.run() . In our case, we created one of the three streams in advance. You cannot be sure of the order in which the asynchronous functions will be called. You should not expect them to be called in the order of their addition ( post() ). A possible result of the previous example is as follows:

 func called, i= 0 func called, i= 2 func called, i= 1 func called, i= 4 func called, i= 3 func called, i= 6 func called, i= 7 func called, i= 8 func called, i= 5 func called, i= 9 

There may be a time when you want to assign a handler for some asynchronous function. Let's say you have to go to a restaurant ( go_to_restaurant ), place an order ( order ) and eat ( eat ). You want to first come to the restaurant, make an order, and only then eat. To do this, you will use io_service::strand , which will assign which asynchronous handler to call. Consider the following example:

 using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl; } void worker_thread() { service.run(); } int main(int argc, char* argv[]) { io_service::strand strand_one(service), strand_two(service); for ( int i = 0; i < 5; ++i) service.post( strand_one.wrap( boost::bind(func, i))); for ( int i = 5; i < 10; ++i) service.post( strand_two.wrap( boost::bind(func, i))); boost::thread_group threads; for ( int i = 0; i < 3; ++i) threads.create_thread(worker_thread); // wait for all threads to be created boost::this_thread::sleep( boost::posix_time::millisec(500)); threads.join_all(); } 

In the above code, we see that the first five and the last five thread IDs are output sequentially, namely, func called, i = 0 will be output before func called, i = 1 , which will be displayed before func called, i = 2 and so on . The same for func called, i = 5 , which will be displayed before func called, i = 6 and func called, i = 6 will be displayed before func called, i = 7 and so on. It should be noted that even if functions are called sequentially, this does not mean that they will all be called in one thread. A possible implementation of this program may be as follows:

 func called, i= 0/002A60C8 func called, i= 5/002A6138 func called, i= 6/002A6530 func called, i= 1/002A6138 func called, i= 7/002A6530 func called, i= 2/002A6138 func called, i= 8/002A6530 func called, i= 3/002A6138 func called, i= 9/002A6530 func called, i= 4/002A6138 


Asynchronous post () vs. dispatch () vs. wrap ()

Boost.Asio provides three ways to add a function handler for an asynchronous call:

You saw an example of using service.post() in the previous section, as well as a possible result of the program execution. Change it and see how service.dispatch affects the result:

 using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << std::endl; } void run_dispatch_and_post() { for ( int i = 0; i < 10; i += 2) { service.dispatch(boost::bind(func, i)); service.post(boost::bind(func, i + 1)); } } int main(int argc, char* argv[]) { service.post(run_dispatch_and_post); service.run(); } 

Before explaining what is happening here, let's look at the result by running the program:

 func called, i= 0 func called, i= 2 func called, i= 4 func called, i= 6 func called, i= 8 func called, i= 1 func called, i= 3 func called, i= 5 func called, i= 7 func called, i= 9 

First, even numbers are written, and then odd numbers. This is because we use dispatch() to write even numbers and post() to write odd numbers. dispatch() will call the handler before it completes, because the current thread called service.run() , while post () terminates right away.
Now let's talk about service.wrap(handler) . wrap () returns a functor that can be used as an argument to another function:

 using namespace boost::asio; io_service service; void dispatched_func_1() { std::cout << "dispatched 1" << std::endl; } void dispatched_func_2() { std::cout << "dispatched 2" << std::endl; } void test(boost::function<void()> func) { std::cout << "test" << std::endl; service.dispatch(dispatched_func_1); func(); } void service_run() { service.run(); } int main(int argc, char* argv[]) { test( service.wrap(dispatched_func_2)); boost::thread th(service_run); boost::this_thread::sleep( boost::posix_time::millisec(500)); th.join(); } 

String test(service.wrap(dispatched_func_2)); will wrap dispatched_func_2 and create a functor that will be passed to test as an argument. When test() called, it redirects the call to dispatched_func_1() and calls func() . At this point, you will see that calling func() equivalent to service.dispatch(dispatched_func_2) , since they are called sequentially. The output of the program confirms this:

 test dispatched 1 dispatched 2 

The io_service::strand class (used to serialize asynchronous actions) also contains the poll(), dispatch() and wrap() functions. Their meaning is the same as the poll(), dispatch() and wrap() io_service from io_service . However, most of the time, you will only use the io_service::strand::wrap() function as an argument for io_service::poll() or io_service::dispatch() .

Stay alive


You say, performing the following operation:

 io_service service; ip::tcp::socket sock(service); char buff[512]; ... read(sock, buffer(buff)); 

In this case, sock and buff must both survive the read() call. In other words, they must be valid after the read() call is completed. This is exactly what you expect, all the arguments you pass to the function must be valid inside it. Everything becomes more complicated when we go asynchronously:

 io_service service; ip::tcp::socket sock(service); char buff[512]; void on_read(const boost::system::error_code &, size_t) {} ... async_read(sock, buffer(buff), on_read); 

In this case, sock and buff must survive the read operation itself, but we do not know when this will happen, since it is asynchronous.
When using socket buffers, you can have a buffer instance that survived the asynchronous call (using boost::shared_array<> ). Here we can use the same principle by creating a class that internally contains a socket and buffers for reading / writing. Then for all asynchronous calls we pass the boost::bind shared pointer functor:

 using namespace boost::asio; io_service service; struct connection : boost::enable_shared_from_this<connection> { typedef boost::system::error_code error_code; typedef boost::shared_ptr<connection> ptr; connection() : sock_(service), started_(true) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1)); } void stop() { if ( !started_) return; started_ = false; sock_.close(); } bool started() { return started_; } private: void on_connect(const error_code & err) { // here you decide what to do with the connection: read or write if ( !err) do_read(); else stop(); } void on_read(const error_code & err, size_t bytes) { if ( !started() ) return; std::string msg(read_buffer_, bytes); if ( msg == "can_login") do_write("access_data"); else if ( msg.find("data ") == 0) process_data(msg); else if ( msg == "login_fail") stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2)); } void do_write(const std::string & msg) { if ( !started() ) return; // note: in case you want to send several messages before // doing another async_read, you'll need several write buffers! std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2)); } void process_data(const std::string & msg) { // process what comes from server, and then perform another write } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; }; int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); connection::ptr(new connection)->start(ep); } 

In all asynchronous calls, the boost::bind functor is sent as an argument. This functor internally stores the shared pointer on the connection instance. While an asynchronous operation is pending, Boost.Asio will store a copy of the boost::bind functor, which in turn stores the shared pointer on the connection . Problem solved!
Of course, the connection class is only a skeleton class; You will need to adapt it to your needs (in the case of a server, it will look very different). Notice how easy you are creating a new connection::ptr(new connection)->start(ep) . This begins a (asynchronous) connection to the server. If you want to close the connection, you call stop() .
As soon as the instance began to work ( start() ), it will wait for connections. When a connection occurs, on_connect() called. If there are no errors, then a read operation ( do_read() ) is called. Once the read operation is complete, you can interpret the message; most likely in your on_read() application will look different. When you send a message, you have to copy it to a buffer, and then send it, as done in do_write() , because, again, the buffer must survive the asynchronous write operation. And the last note - when recording, remember that you must specify how much to write, otherwise the entire buffer will be sent.

Summary


Network API is very extensive. This chapter has been implemented as a link to which you should return at the time when you implement your own network application.
Boost.Asio introduces the concept of endpoints that you can think of as an IP address and port. If you do not know the exact IP address, then you can use a resolver object to include a host name, such as www.yahoo.com, instead of one or more IP addresses.
We also looked at the socket classes that are in the core of the API. Boost.Asio provides implementations for TCP, UDP, and ICMP, but you can extend it for your own protocols, although this is not a job for the faint of heart.
Asynchronous programming is a necessary evil. You have seen why sometimes this is needed, especially when writing servers. Usually you only need to call service.run() to create an asynchronous loop, but sometimes you need to go further and then you can use run_one(), poll() , or poll_one() .
When using the asynchronous approach, you can have your own asynchronous functions, just use service.post() or service.dispatch() .
Finally, in order for both the socket and the buffer (for reading or writing) to remain valid for the entire period of the asynchronous operation (until completion), we must take special precautions. Your connection class should enabled_shared_from_this from enabled_shared_from_this , contain all the necessary buffers within it and, at each asynchronous call, pass the shared pointer to this operation.
The next chapter will have a lot of practical work; many application coding when implementing such applications as client / server echo.

Thank you all, until we meet again!

Source: https://habr.com/ru/post/195006/


All Articles