
"Boost.Asio C ++ Network Programming". Chapter 4: Client and Server

Hello!
I am continuing my translation of John Torjo's book "Boost.Asio C++ Network Programming".



In this chapter we are going to dive into creating non-trivial client/server applications with Boost.Asio. You can run and test them, and once you understand them, you can use them as a basis for your own applications.

In the following applications:

The client can make the following requests:

To make things more interesting, we add a few twists:


Synchronous client/server


First we implement a synchronous application. You will see that the code is simple and easy to follow. However, the networking part has to run in a separate thread, because all network calls are blocking.

Synchronous client

The synchronous client, as you would expect, does everything sequentially: it connects to the server, logs in, and then runs a communication loop: sleep, make a request, read the server's response, sleep again, and so on.



Since we are writing the synchronous version, we can keep things simple: first we connect to the server, and then run the loop, for example like this:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);

void run_client(const std::string & client_name) {
    talk_to_svr client(client_name);
    try {
        client.connect(ep);
        client.loop();
    } catch(boost::system::system_error & err) {
        std::cout << "client terminated " << std::endl;
    }
}
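The snippets in this chapter assume a couple of common declarations from the book's examples, most importantly a single global io_service called service and the usual using directives. Here is a minimal sketch, under those assumptions, of how several such clients could be launched, each on its own thread because every network call blocks (the client names are purely illustrative):

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <string>
using namespace boost::asio;
using namespace boost::posix_time;

io_service service;   // the single global io_service all the snippets rely on

void run_client(const std::string & client_name);   // defined as shown above

int main() {
    boost::thread_group threads;
    const char* names[] = { "john", "james", "lucy" };   // hypothetical names
    for ( int i = 0; i < 3; ++i) {
        // every client blocks inside its loop, so each one gets its own thread
        threads.create_thread(boost::bind(run_client, std::string(names[i])));
        boost::this_thread::sleep(millisec(100));
    }
    threads.join_all();
}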

Next comes the talk_to_svr class itself:

struct talk_to_svr {
    talk_to_svr(const std::string & username)
        : sock_(service), started_(true), username_(username) {}

    void connect(ip::tcp::endpoint ep) {
        sock_.connect(ep);
    }
    void loop() {
        write("login " + username_ + "\n");
        read_answer();
        while ( started_) {
            write_request();
            read_answer();
            boost::this_thread::sleep(millisec(rand() % 7000));
        }
    }
    std::string username() const { return username_; }
    ...
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    int already_read_;
    char buff_[max_msg];
    bool started_;
    std::string username_;
};

In the loop we simply ping the server, read its response, and go to sleep. We sleep for a random amount of time (sometimes more than 5 seconds), so at some point the server will disconnect us:

void write_request() {
    write("ping\n");
}
void read_answer() {
    already_read_ = 0;
    read(sock_, buffer(buff_),
         boost::bind(&talk_to_svr::read_complete, this, _1, _2));
    process_msg();
}
void process_msg() {
    std::string msg(buff_, already_read_);
    if ( msg.find("login ") == 0) on_login();
    else if ( msg.find("ping") == 0) on_ping(msg);
    else if ( msg.find("clients ") == 0) on_clients(msg);
    else std::cerr << "invalid msg " << msg << std::endl;
}

To read the response we use read_complete (discussed at length in the previous chapter) to make sure we read up to the '\n' character; a sketch of it follows the next snippet. The parsing logic lives in process_msg(), which looks at the server's response and dispatches it to the right handler:

void on_login() { do_ask_clients(); }
void on_ping(const std::string & msg) {
    std::istringstream in(msg);
    std::string answer;
    in >> answer >> answer;
    if ( answer == "client_list_changed") do_ask_clients();
}
void on_clients(const std::string & msg) {
    std::string clients = msg.substr(8);
    std::cout << username_ << ", new client list:" << clients;
}
void do_ask_clients() {
    write("ask_clients\n");
    read_answer();
}
void write(const std::string & msg) {
    sock_.write_some(buffer(msg));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
    // ... same as before
}
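read_complete() is elided above ("same as before"); based on the previous chapter, it most likely looks something like the following sketch: it keeps the blocking read() going, byte by byte, until a '\n' shows up, and remembers how many bytes have arrived:

size_t read_complete(const boost::system::error_code & err, size_t bytes) {
    if ( err) return 0;                 // stop reading on error
    already_read_ = bytes;              // remember how much we have so far
    bool found = std::find(buff_, buff_ + bytes, '\n') < buff_ + bytes;
    // return 0 to stop, or 1 to ask read() for one more byte
    return found ? 0 : 1;
}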

When we read the server's response to our ping, if we get client_list_changed, we request the client list again.
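For reference, here is the request/response protocol implied by the snippets so far (my own summary, not a listing from the book); every message is a single line terminated by '\n':

// client -> server          server -> client
// ----------------          -------------------------------------------
// "login <username>\n"      "login ok\n"
// "ping\n"                  "ping ok\n"  or  "ping client_list_changed\n"
// "ask_clients\n"           "clients <name1> <name2> ... \n"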

Synchronous server

The synchronous server is also quite simple. It needs two threads: one to listen for new clients and another to handle the existing ones. It cannot do this in a single thread, because waiting for a new client is a blocking operation, so we need a separate thread for the already connected clients.



As expected, the server is a little harder to write than the client. For one thing, it has to manage all the connected clients. Since we are writing the synchronous version, we need at least two threads: one that accepts new clients (because accept() is a blocking call) and one that serves the existing ones:

void accept_thread() {
    ip::tcp::acceptor acceptor(service,
        ip::tcp::endpoint(ip::tcp::v4(), 8001));
    while ( true) {
        client_ptr new_( new talk_to_client);
        acceptor.accept(new_->sock());
        boost::recursive_mutex::scoped_lock lk(cs);
        clients.push_back(new_);
    }
}
void handle_clients_thread() {
    while ( true) {
        boost::this_thread::sleep( millisec(1));
        boost::recursive_mutex::scoped_lock lk(cs);
        for( array::iterator b = clients.begin(), e = clients.end(); b != e; ++b)
            (*b)->answer_to_client();
        // erase clients that timed out
        clients.erase(std::remove_if(clients.begin(), clients.end(),
                      boost::bind(&talk_to_client::timed_out, _1)), clients.end());
    }
}
int main(int argc, char* argv[]) {
    boost::thread_group threads;
    threads.create_thread(accept_thread);
    threads.create_thread(handle_clients_thread);
    threads.join_all();
}

We need a list of clients in order to handle their incoming requests.
Each talk_to_client instance owns a socket, and sockets have no copy constructor, so you cannot simply put the instances into an std::vector; a shared pointer has to come in somewhere. There are two ways to do this: either talk_to_client keeps a shared pointer to its socket and you store an array of talk_to_client instances by value, or talk_to_client keeps the socket by value and you store an array of shared pointers to talk_to_client. I chose the latter, but you could go the other way (a sketch of the first alternative follows the next snippet):

typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;
boost::recursive_mutex cs; // thread-safe access to clients array
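For completeness, the first alternative mentioned above might look roughly like this (a hypothetical sketch, not the code used in this chapter): the socket sits behind a shared pointer, which makes the connection object itself copyable and storable by value:

struct talk_to_client_alt {
    talk_to_client_alt() : sock_(new ip::tcp::socket(service)) {}
    ip::tcp::socket & sock() { return *sock_; }
    // ... request handling just like in talk_to_client
private:
    boost::shared_ptr<ip::tcp::socket> sock_;   // copyable handle to the socket
};
typedef std::vector<talk_to_client_alt> array_alt;   // instances stored by value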

The main talk_to_client code looks like this:

struct talk_to_client : boost::enable_shared_from_this<talk_to_client> {
    talk_to_client() { ... }
    std::string username() const { return username_; }

    void answer_to_client() {
        try {
            read_request();
            process_request();
        } catch ( boost::system::system_error&) {
            stop();
        }
        if ( timed_out()) stop();
    }
    void set_clients_changed() { clients_changed_ = true; }
    ip::tcp::socket & sock() { return sock_; }
    bool timed_out() const {
        ptime now = microsec_clock::local_time();
        long long ms = (now - last_ping).total_milliseconds();
        return ms > 5000;
    }
    void stop() {
        boost::system::error_code err;
        sock_.close(err);
    }
    void read_request() {
        if ( sock_.available())
            already_read_ += sock_.read_some(
                buffer(buff_ + already_read_, max_msg - already_read_));
    }
    ...
private:
    // ... same as in the synchronous client
    bool clients_changed_;
    ptime last_ping;
};

The code above is fairly obvious. The most important function is read_request(): it reads only when data is actually available, so the server never blocks:

void process_request() {
    bool found_enter = std::find(buff_, buff_ + already_read_, '\n')
                       < buff_ + already_read_;
    if ( !found_enter)
        return; // message is not complete yet
    // process the msg
    last_ping = microsec_clock::local_time();
    size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
    std::string msg(buff_, pos);
    // keep whatever follows the '\n' for the next request
    std::copy(buff_ + pos + 1, buff_ + already_read_, buff_);
    already_read_ -= pos + 1;

    if ( msg.find("login ") == 0) on_login(msg);
    else if ( msg.find("ping") == 0) on_ping();
    else if ( msg.find("ask_clients") == 0) on_clients();
    else std::cerr << "invalid msg " << msg << std::endl;
}
void on_login(const std::string & msg) {
    std::istringstream in(msg);
    in >> username_ >> username_;
    write("login ok\n");
    update_clients_changed();
}
void on_ping() {
    write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
    clients_changed_ = false;
}
void on_clients() {
    std::string msg;
    {
        boost::recursive_mutex::scoped_lock lk(cs);
        for( array::const_iterator b = clients.begin(), e = clients.end(); b != e; ++b)
            msg += (*b)->username() + " ";
    }
    write("clients " + msg + "\n");
}
void write(const std::string & msg) {
    sock_.write_some(buffer(msg));
}

Take a look at process_request(). After reading whatever data was available, we check whether a complete message has arrived (found_enter is set to true if a '\n' was found). If so, we extract only the first complete message and keep whatever follows the '\n' in the buffer for the next call, which protects us in case more than one message arrived at once, and then we interpret the message we just read; a small standalone trace of this buffer handling follows. The rest of the code is pretty simple.
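To make that buffer handling concrete, here is a tiny standalone trace (hypothetical test code, not from the book) of how one complete message is extracted while a partially received message stays in the buffer for the next call:

#include <algorithm>
#include <cassert>
#include <string>

int main() {
    char buff[16] = "ping\nask_";   // one complete and one partial message
    int already_read = 9;           // bytes currently in the buffer

    size_t pos = std::find(buff, buff + already_read, '\n') - buff;
    std::string msg(buff, pos);                             // "ping"
    std::copy(buff + pos + 1, buff + already_read, buff);   // keep the leftover
    already_read -= pos + 1;                                // 4 bytes remain

    assert(msg == "ping");
    assert(std::string(buff, already_read) == "ask_");
    return 0;
}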

Asynchronous client/server


And now for the most interesting (and most difficult) part: going asynchronous.

Asynchronous client

Things now become a little more complicated, but they remain perfectly manageable, and we end up with an application that never blocks.



You should already understand the following code:

#define MEM_FN(x)      boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y)   boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z)

class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>,
                    boost::noncopyable {
    typedef talk_to_svr self_type;
    talk_to_svr(const std::string & username)
        : sock_(service), started_(true), username_(username), timer_(service) {}
    void start(ip::tcp::endpoint ep) {
        sock_.async_connect(ep, MEM_FN1(on_connect,_1));
    }
public:
    typedef boost::system::error_code error_code;
    typedef boost::shared_ptr<talk_to_svr> ptr;

    static ptr start(ip::tcp::endpoint ep, const std::string & username) {
        ptr new_(new talk_to_svr(username));
        new_->start(ep);
        return new_;
    }
    void stop() {
        if ( !started_) return;
        started_ = false;
        sock_.close();
    }
    bool started() { return started_; }
    ...
private:
    size_t read_complete(const boost::system::error_code & err, size_t bytes) {
        if ( err) return 0;
        bool found = std::find(read_buffer_, read_buffer_ + bytes, '\n')
                     < read_buffer_ + bytes;
        return found ? 0 : 1;
    }
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    char read_buffer_[max_msg];
    char write_buffer_[max_msg];
    bool started_;
    std::string username_;
    deadline_timer timer_;
};

Note the additional deadline_timer member, timer_, which is used for pinging the server; as before, pings happen at random points in time.
Now let's see what the main logic of the class looks like:

void on_connect(const error_code & err) {
    if ( !err) do_write("login " + username_ + "\n");
    else       stop();
}
void on_read(const error_code & err, size_t bytes) {
    if ( err) stop();
    if ( !started() ) return;
    // process the msg
    std::string msg(read_buffer_, bytes);
    if ( msg.find("login ") == 0) on_login();
    else if ( msg.find("ping") == 0) on_ping(msg);
    else if ( msg.find("clients ") == 0) on_clients(msg);
}
void on_login() { do_ask_clients(); }
void on_ping(const std::string & msg) {
    std::istringstream in(msg);
    std::string answer;
    in >> answer >> answer;
    if ( answer == "client_list_changed") do_ask_clients();
    else                                  postpone_ping();
}
void on_clients(const std::string & msg) {
    std::string clients = msg.substr(8);
    std::cout << username_ << ", new client list:" << clients;
    postpone_ping();
}

The first two lines of on_read() are a tidy idiom. The first checks for an error and, if there is one, stops. The second checks whether we have already stopped (earlier, or just now) and, if so, returns. Otherwise everything is fine and we process the incoming message.
Finally, the do_* functions look as follows:

void do_ping() { do_write("ping\n"); }
void postpone_ping() {
    timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
    timer_.async_wait( MEM_FN(do_ping));
}
void do_ask_clients() { do_write("ask_clients\n"); }

void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() {
    async_read(sock_, buffer(read_buffer_),
               MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void do_write(const std::string & msg) {
    if ( !started() ) return;
    std::copy(msg.begin(), msg.end(), write_buffer_);
    sock_.async_write_some( buffer(write_buffer_, msg.size()),
                            MEM_FN2(on_write,_1,_2));
}

Note that every read operation ends up triggering another ping: if the answer is ping ok, we postpone the next ping; if it is ping client_list_changed, we ask for the new client list, and once that list arrives we postpone the next ping.
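Starting such a client is not shown in this excerpt, but it boils down to calling the static start() factory and then running the io_service; a minimal sketch (the client names are illustrative):

int main() {
    ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
    // several clients can share the same io_service; one run() drives them all
    talk_to_svr::start(ep, "john");
    talk_to_svr::start(ep, "lucy");
    service.run();
}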


Asynchronous server

The scheme is quite complex: four arrows lead from Boost.Asio to on_accept, on_read, on_write, and on_check_ping. In essence, this means you never know which of these asynchronous operations will complete next, but you do know for certain that it will be one of them.



Since we are working asynchronously, we can do everything in a single thread. Accepting clients is the easiest part, as the following snippet shows:

ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));

void handle_accept(talk_to_client::ptr client, const error_code & err) {
    client->start();
    talk_to_client::ptr new_client = talk_to_client::new_();
    acceptor.async_accept(new_client->sock(),
                          boost::bind(handle_accept, new_client, _1));
}
int main(int argc, char* argv[]) {
    talk_to_client::ptr client = talk_to_client::new_();
    acceptor.async_accept(client->sock(), boost::bind(handle_accept, client, _1));
    service.run();
}

The code above always waits asynchronously for new clients (each accepted connection immediately schedules another asynchronous accept).
We have to track the "client list changed" event (a new client has connected, or an existing one has disconnected) and notify the other clients when it happens. For that we need to store an array of clients; if you did not care about knowing all connected clients at any given time, this array would be unnecessary:

class talk_to_client;
typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;

The skeleton of the connection class is as follows:

class talk_to_client : public boost::enable_shared_from_this<talk_to_client>,
                       boost::noncopyable {
    talk_to_client() { ... }
public:
    typedef boost::system::error_code error_code;
    typedef boost::shared_ptr<talk_to_client> ptr;

    void start() {
        started_ = true;
        clients.push_back( shared_from_this());
        last_ping = boost::posix_time::microsec_clock::local_time();
        do_read(); // first, we wait for client to login
    }
    static ptr new_() {
        ptr new_(new talk_to_client);
        return new_;
    }
    void stop() {
        if ( !started_) return;
        started_ = false;
        sock_.close();
        ptr self = shared_from_this();
        array::iterator it = std::find(clients.begin(), clients.end(), self);
        clients.erase(it);
        update_clients_changed();
    }
    bool started() const { return started_; }
    ip::tcp::socket & sock() { return sock_; }
    std::string username() const { return username_; }
    void set_clients_changed() { clients_changed_ = true; }
    ...
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    char read_buffer_[max_msg];
    char write_buffer_[max_msg];
    bool started_;
    std::string username_;
    deadline_timer timer_;
    boost::posix_time::ptime last_ping;
    bool clients_changed_;
};

I refer to talk_to_client (or talk_to_svr) as the connection class, to make it clearer what I am talking about.
The code we need now is similar to what we used in the client application. There is an additional stop() function, which removes a connection from the clients array.
The server continuously waits for asynchronous read operations:

void on_read(const error_code & err, size_t bytes) {
    if ( err) stop();
    if ( !started() ) return;
    std::string msg(read_buffer_, bytes);
    if ( msg.find("login ") == 0) on_login(msg);
    else if ( msg.find("ping") == 0) on_ping();
    else if ( msg.find("ask_clients") == 0) on_clients();
}
void on_login(const std::string & msg) {
    std::istringstream in(msg);
    in >> username_ >> username_;
    do_write("login ok\n");
    update_clients_changed();
}
void on_ping() {
    do_write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
    clients_changed_ = false;
}
void on_clients() {
    std::string msg;
    for( array::const_iterator b = clients.begin(), e = clients.end(); b != e; ++b)
        msg += (*b)->username() + " ";
    do_write("clients " + msg + "\n");
}

The code is pretty simple; the one thing to note is that when a new client logs in, we call update_clients_changed(), which sets clients_changed_ to true for all clients.
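update_clients_changed() itself is not listed in this excerpt; it presumably just walks the clients array and flags every connection, roughly like this:

void update_clients_changed() {
    // mark every connection so that its next ping answer reports the change
    for ( array::iterator b = clients.begin(), e = clients.end(); b != e; ++b)
        (*b)->set_clients_changed();
}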
As soon as the server receives a request, it immediately responds to it, as the following snippet shows:

void do_ping() { do_write("ping\n"); }
void do_ask_clients() { do_write("ask_clients\n"); }

void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() {
    async_read(sock_, buffer(read_buffer_),
               MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
    post_check_ping();
}
void do_write(const std::string & msg) {
    if ( !started() ) return;
    std::copy(msg.begin(), msg.end(), write_buffer_);
    sock_.async_write_some( buffer(write_buffer_, msg.size()),
                            MEM_FN2(on_write,_1,_2));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
    // ... as before
}

At the end of each write operation, on_write() is called, which starts another asynchronous read; the cycle of waiting for a request and answering it continues until the client disconnects or times out.
In addition, every time we start a read we also schedule a ping check 5 seconds later; when it fires, we check whether the client has timed out (no ping received within the last 5 seconds), and if so, we close the connection:

void on_check_ping() {
    ptime now = microsec_clock::local_time();
    if ( (now - last_ping).total_milliseconds() > 5000)
        stop();
    last_ping = boost::posix_time::microsec_clock::local_time();
}
void post_check_ping() {
    timer_.expires_from_now(boost::posix_time::millisec(5000));
    timer_.async_wait( MEM_FN(on_check_ping));
}

That's the whole server. You can build it, run it, and start experimenting with it!

Summary


In this chapter we looked at how to write several basic client/server applications, while avoiding traps such as memory leaks and deadlocks. All of the programs can serve as a skeleton for your own applications; they are easy to extend and adapt.
In the next chapter we will get a deeper understanding of synchronous versus asynchronous programming with Boost.Asio, and see how to plug in your own asynchronous operations.

Resources for this article: link

Thank you all for your attention, until we meet again!

Source: https://habr.com/ru/post/195794/

