ping_ok
or ping client_list_chaned
(in the latter case, the client re-requests a list of connected clients). ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); void run_client(const std::string & client_name) { talk_to_svr client(client_name); try { client.connect(ep); client.loop(); } catch(boost::system::system_error & err) { std::cout << "client terminated " << std::endl; } }
talk_to_svr
: struct talk_to_svr { talk_to_svr(const std::string & username): sock_(service), started_(true), username_(username) {} void connect(ip::tcp::endpoint ep) { sock_.connect(ep); } void loop() { write("login " + username_ + "\n"); read_answer(); while ( started_) { write_request(); read_answer(); boost::this_thread::sleep(millisec(rand() % 7000)); } } std::string username() const { return username_; } ... private: ip::tcp::socket sock_; enum { max_msg = 1024 }; int already_read_; char buff_[max_msg]; bool started_; std::string username_; };
void write_request() { write("ping\n"); } void read_answer() { already_read_ = 0; read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2)); process_msg(); } void process_msg() { std::string msg(buff_, already_read_); if ( msg.find("login ") == 0) on_login(); else if ( msg.find("ping") == 0) on_ping(msg); else if ( msg.find("clients ") == 0) on_clients(msg); else std::cerr << "invalid msg " << msg << std::endl; }
read_complete
(which was discussed a lot in the last chapter) to make sure that we read the symbol '\ n'. The logic is contained in the process_msg()
function, where we read the client's response and direct it to the correct function: void on_login() { do_ask_clients(); } void on_ping(const std::string & msg) { std::istringstream in(msg); std::string answer; in >> answer >> answer; if ( answer == "client_list_changed") do_ask_clients(); } void on_clients(const std::string & msg) { std::string clients = msg.substr(8); std::cout << username_ << ", new client list:" << clients; } void do_ask_clients() { write("ask_clients\n"); read_answer(); } void write(const std::string & msg) { sock_.write_some(buffer(msg)); } size_t read_complete(const boost::system::error_code & err, size_t bytes) { // ... same as before }
client_list_changed
, then we again make a request for a sheet of clients.accept()
blocking operation), and the other is responsible for existing ones: void accept_thread() { ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001)); while ( true) { client_ptr new_( new talk_to_client); acceptor.accept(new_->sock()); boost::recursive_mutex::scoped_lock lk(cs); clients.push_back(new_); } } void handle_clients_thread() { while ( true) { boost::this_thread::sleep( millisec(1)); boost::recursive_mutex::scoped_lock lk(cs); for(array::iterator b = clients.begin(),e = clients.end(); b != e; ++b) (*b)->answer_to_client(); // erase clients that timed out clients.erase(std::remove_if(clients.begin(), clients.end(), boost::bind(&talk_to_client::timed_out,_1)), clients.end()); } } int main(int argc, char* argv[]) { boost::thread_group threads; threads.create_thread(accept_thread); threads.create_thread(handle_clients_thread); threads.join_all(); }
talk_to_client
has a socket. It does not have a copy constructor, so if you want to stuff it into std::vector
, then you need to get a shared pointer on it. There are two ways to do this: either inside the talk_to_client
get a shared pointer to the socket, and then make an array of instances of talk_to_client
or when there is an instance of talk_to_client with a socket by value and get an array of shared pointer on talk_to_client. I chose the latter, but you can go another way: typedef boost::shared_ptr<talk_to_client> client_ptr; typedef std::vector<client_ptr> array; array clients; boost::recursive_mutex cs; // thread-safe access to clients array
talk_to_client
code looks like this: struct talk_to_client : boost::enable_shared_from_this<talk_to_client> { talk_to_client() { ... } std::string username() const { return username_; } void answer_to_client() { try { read_request(); process_request(); } catch ( boost::system::system_error&) { stop(); } if ( timed_out()) stop(); } void set_clients_changed() { clients_changed_ = true; } ip::tcp::socket & sock() { return sock_; } bool timed_out() const { ptime now = microsec_clock::local_time(); long long ms = (now - last_ping).total_milliseconds(); return ms > 5000 ; } void stop() { boost::system::error_code err; sock_.close(err); } void read_request() { if ( sock_.available()) already_read_ += sock_.read_some( buffer(buff_ + already_read_, max_msg - already_read_)); } ... private: // ... same as in Synchronous Client bool clients_changed_; ptime last_ping; };
read_request()
. Reading will occur only if there is data, so the server will never be blocked: void process_request() { bool found_enter = std::find(buff_, buff_ + already_read_, '\n') < buff_ + already_read_; if ( !found_enter) return; // message is not full // process the msg last_ping = microsec_clock::local_time(); size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_; std::string msg(buff_, pos); std::copy(buff_ + already_read_, buff_ + max_msg, buff_); already_read_ -= pos + 1; if ( msg.find("login ") == 0) on_login(msg); else if ( msg.find("ping") == 0) on_ping(); else if ( msg.find("ask_clients") == 0) on_clients(); else std::cerr << "invalid msg " << msg << std::endl; } void on_login(const std::string & msg) { std::istringstream in(msg); in >> username_ >> username_; write("login ok\n"); update_clients_changed(); } void on_ping() { write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n"); clients_changed_ = false; } void on_clients() { std::string msg; { boost::recursive_mutex::scoped_lock lk(cs); for( array::const_iterator b = clients.begin(), e = clients.end() ;b != e; ++b) msg += (*b)->username() + " "; } write("clients " + msg + "\n"); } void write(const std::string & msg) { sock_.write_some(buffer(msg)); }
process_request()
. After we considered the data that was available, we should check whether we found_enteris
message to the end (if yes, the found_enteris
set to true). If this is the case, then we protect ourselves from reading, maybe more than one message (after the '\ n' character, nothing will be saved to the buffer), and then we interpret the completely read message. The rest of the code is pretty simple. #define MEM_FN(x) boost::bind(&self_type::x, shared_from_this()) #define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y) #define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z) class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>, boost::noncopyable { typedef talk_to_svr self_type; talk_to_svr(const std::string & username) : sock_(service), started_(true), username_(username), timer_(service) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, MEM_FN1(on_connect,_1)); } public: typedef boost::system::error_code error_code; typedef boost::shared_ptr<talk_to_svr> ptr; static ptr start(ip::tcp::endpoint ep, const std::string & username) { ptr new_(new talk_to_svr(username)); new_->start(ep); return new_; } void stop() { if ( !started_) return; started_ = false; sock_.close(); } bool started() { return started_; } ... private: size_t read_complete(const boost::system::error_code & err, size_t bytes) { if ( err) return 0; bool found = std::find(read_buffer_, read_buffer_ + bytes, '\n') < read_buffer_ + bytes; return found ? 0 : 1; } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; std::string username_; deadline_timer timer_; };
deadline_timer
timer function for pinging the server; and again, we will check the connection to the server at a random point in time. void on_connect(const error_code & err) { if ( !err) do_write("login " + username_ + "\n"); else stop(); } void on_read(const error_code & err, size_t bytes) { if ( err) stop(); if ( !started() ) return; // process the msg std::string msg(read_buffer_, bytes); if ( msg.find("login ") == 0) on_login(); else if ( msg.find("ping") == 0) on_ping(msg); else if ( msg.find("clients ") == 0) on_clients(msg); } void on_login() { do_ask_clients(); } void on_ping(const std::string & msg) { std::istringstream in(msg); std::string answer; in >> answer >> answer; if ( answer == "client_list_changed") do_ask_clients(); else postpone_ping(); } void on_clients(const std::string & msg) { std::string clients = msg.substr(8); std::cout << username_ << ", new client list:" << clients ; postpone_ping(); }
on_read()
in the first two lines of the code everything is done very beautifully. In the first line, we check if there is an error, if so, we stop. In the second line, we check whether we stopped (before or just that), if so, we return. Otherwise, if all is well, we process the incoming message.do_*
functions do_*
as follows: void do_ping() { do_write("ping\n"); } void postpone_ping() { timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000)); timer_.async_wait( MEM_FN(do_ping)); } void do_ask_clients() { do_write("ask_clients\n"); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2)); } void do_write(const std::string & msg) { if ( !started() ) return; std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2)); }
read
operation causes a ping:read
operation completes, on_read()
on_read()
redirected to on_login(), on_ping()
, or on_clients()
on_accept, on_read, on_write
and on_check_ping
. Basically, this means that you will never know by calling which of these asynchronous operations will all end, but you know for sure that this will be one of them. ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001)); void handle_accept(talk_to_client::ptr client, const error_code & err) { client->start(); talk_to_client::ptr new_client = talk_to_client::new_(); acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1)); } int main(int argc, char* argv[]) { talk_to_client::ptr client = talk_to_client::new_(); acceptor.async_accept(client->sock(), boost::bind(handle_accept,client,_1)); service.run(); }
client list changed
event (a new client has connected or one of the clients received the list and disconnected) and notify other clients when this happens. Thus, we need to store an array of clients, otherwise there would be no need for this array if you would not like to know all the connected clients at a given time: class talk_to_client; typedef boost::shared_ptr<talk_to_client> client_ptr; typedef std::vector<client_ptr> array; array clients;
connection
class is as follows: class talk_to_client : public boost::enable_shared_from_this<talk_to_ client>, boost::noncopyable { talk_to_client() { ... } public: typedef boost::system::error_code error_code; typedef boost::shared_ptr<talk_to_client> ptr; void start() { started_ = true; clients.push_back( shared_from_this()); last_ping = boost::posix_time::microsec_clock::local_time(); do_read(); // first, we wait for client to login } static ptr new_() { ptr new_(new talk_to_client); return new_; } void stop() { if ( !started_) return; started_ = false; sock_.close(); ptr self = shared_from_this(); array::iterator it = std::find(clients.begin(), clients.end(), self); clients.erase(it); update_clients_changed(); } bool started() const { return started_; } ip::tcp::socket & sock() { return sock_;} std::string username() const { return username_; } void set_clients_changed() { clients_changed_ = true; } ... private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; std::string username_; deadline_timer timer_; boost::posix_time::ptime last_ping; bool clients_changed_; };
talk_to_client
or talk_to_server
from the connection
class to clarify what I'm saying.stop()
function that removes a connected client from an array of clients. void on_read(const error_code & err, size_t bytes) { if ( err) stop(); if ( !started() ) return; std::string msg(read_buffer_, bytes); if ( msg.find("login ") == 0) on_login(msg); else if ( msg.find("ping") == 0) on_ping(); else if ( msg.find("ask_clients") == 0) on_clients(); } void on_login(const std::string & msg) { std::istringstream in(msg); in >> username_ >> username_; do_write("login ok\n"); update_clients_changed(); } void on_ping() { do_write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n"); clients_changed_ = false; } void on_clients() { std::string msg; for(array::const_iterator b =clients.begin(),e =clients.end(); b != e; ++b) msg += (*b)->username() + " "; do_write("clients " + msg + "\n"); }
update_clients_changed()
, which sets clients_changed_
to true
for all clients. void do_ping() { do_write("ping\n"); } void do_ask_clients() { do_write("ask_clients\n"); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2)); post_check_ping(); } void do_write(const std::string & msg) { if ( !started() ) return; std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2)); } size_t read_complete(const boost::system::error_code & err, size_t bytes) { // ... as before }
on_write()
is called, which calls another asynchronous read, and thus waits for the request — responds to it, the cycle continues until the client turns off or the timer runs. void on_check_ping() { ptime now = microsec_clock::local_time(); if ( (now - last_ping).total_milliseconds() > 5000) stop(); last_ping = boost::posix_time::microsec_clock::local_time(); } void post_check_ping() { timer_.expires_from_now(boost::posix_time::millisec(5000)); timer_.async_wait( MEM_FN(on_check_ping)); }
Source: https://habr.com/ru/post/195794/
All Articles