io_service service; ip::tcp::socket sock(service); ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); void on_write(boost::system::error_code err, size_t bytes) { char read_buff[512]; read(sock, buffer(read_buff)); } async_write(sock, buffer("echo"), on_write);
char buff_[512]; // synchronous read read(sock_, buffer(buff_), boost::bind(&read_complete, this, _1, _2)); // asynchronous read async_read(sock_ buffer(buff_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2)); size_t read_complete(const boost::system::error_code & err, size_t bytes) { if ( err) return 0; already_read_ = bytes; bool found = std::find(buff_, buff_ + bytes, '\n') < buff_ + bytes; // we read one-by-one until we get to enter, no buffering return found ? 0 : 1; }
void loop() { // read answer to our login write("login " + username_ + "\n"); read_answer(); while ( started_) { write_request(); read_answer(); ... } }
void loop() { while ( started_) { read_notification(); write_answer(); } } void read_notification() { already_read_ = 0; read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2)); process_notification(); } void process_notification() { // ... see what the notification is, and prepare answer }
void read_request() { if ( sock_.available()) already_read_ += sock_.read_some(buffer(buff_ + already_read_, max_msg –already_read_)); }
void process_request() { bool found_enter = std::find(buff_, buff_ + already_read_, '\n') < buff_ + already_read_; if ( !found_enter) return; // message is not full size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_; std::string msg(buff_, pos); ... if ( msg.find("login ") == 0) on_login(msg); else if ( msg.find("ping") == 0) on_ping(); else ... }
typedef std::vector<client_ptr> array; array clients; array notify; std::string notify_msg; void on_new_client() { // on a new client, we notify all clients of this event notify = clients; std::ostringstream msg; msg << "client count " << clients.size(); notify_msg = msg.str(); notify_clients(); } void notify_clients() { for ( array::const_iterator b = notify.begin(), e = notify.end(); b != e; ++b) { (*b)->sock_.write_some(notify_msg); } }
on_new_client()
function is a function of a single event, where we must notify all clients about it. notify_clients
is a function that will notify clients that are subscribed to this event. The server sends a message, but does not wait for a response from each client, as this can lead to blocking. When a response comes from the client, the client can tell us that this is exactly the answer to our notification (and we can process it correctly). void accept_thread() { ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001)); while ( true) { client_ptr new_( new talk_to_client); acceptor.accept(new_->sock()); boost::recursive_mutex::scoped_lock lk(cs); clients.push_back(new_); } }
talk_to_client
class talk_to_client
become thread safe. Then you will need a special mechanism to know which threads handle which clients. To do this, you have two options:answer_to_client
function, shows how the last script can be implemented: struct talk_to_client : boost::enable_shared_from_this<talk_to_client> { ... void answer_to_client() { try { read_request(); process_request(); } catch ( boost::system::system_error&) { stop(); } } };
struct talk_to_client : boost::enable_shared_from_this<talk_to_client> { boost::recursive_mutex cs; boost::recursive_mutex cs_ask; bool in_process; void answer_to_client() { { boost::recursive_mutex::scoped_lock lk(cs_ask); if ( in_process) return; in_process = true; } { boost::recursive_mutex::scoped_lock lk(cs); try { read_request(); process_request(); } catch ( boost::system::system_error&) { stop(); } } { boost::recursive_mutex::scoped_lock lk(cs_ask); in_process = false; } } };
in_process
instance will be set to true
, and other threads will ignore this client. An added bonus is that the handle_clients_thread()
function cannot be modified; you can simply create as many handle_clients_thread()
functions as you like.async_read
and async_write
.service.run()
function does not complete its activity. void on_connect() { do_read(); } void do_read() { async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2)); } void on_read(const error_code & err, size_t bytes) { if ( err) stop(); if ( !started() ) return; std::string msg(read_buffer_, bytes); if ( msg.find("clients") == 0) on_clients(msg); else ... } void on_clients(const std::string & msg) { std::string clients = msg.substr(8); std::cout << username_ << ", new client list:" << clients ; do_write("clients ok\n"); }
on_[event]
function ends it and writes a response to the server.service.run()
does not stop its operation.talk_to_client
class: void start() { ... do_read(); // first, we wait for client to login } void on_read(const error_code & err, size_t bytes) { std::string msg(read_buffer_, bytes); if ( msg.find("login ") == 0) on_login(msg); else if ( msg.find("ping") == 0) on_ping(); else ... } void on_login(const std::string & msg) { std::istringstream in(msg); in >> username_ >> username_; do_write("login ok\n"); } void do_write(const std::string & msg) { std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2)); } void on_write(const error_code & err, size_t bytes) { do_read(); }
void start() { ... on_new_client_event(); } void on_new_client_event() { std::ostringstream msg; msg << "client count " << clients.size(); for ( array::const_iterator b = clients.begin(), e = clients.end();b != e; ++b) (*b)->do_write(msg.str()); } void on_read(const error_code & err, size_t bytes) { std::string msg(read_buffer_, bytes); // basically here, we only acknowledge // that our clients received our notifications } void do_write(const std::string & msg) { std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2)); } void on_write(const error_code & err, size_t bytes) { do_read(); }
on_new_client_event
, messages will be sent to all clients that need to be informed about this event. When they answer, we will understand that they have processed the received event. Please note that we will never finish waiting for events asynchronously (therefore service.run()
will not finish working), since we are always waiting for new customers.main()
function: int main() { talk_to_client::ptr client = talk_to_client::new_(); acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1)); service.run(); }
boost::thread_group threads; void listen_thread() { service.run(); } void start_listen(int thread_count) { for ( int i = 0; i < thread_count; ++i) threads.create_thread( listen_thread); } int main(int argc, char* argv[]) { talk_to_client::ptr client = talk_to_client::new_(); acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1)); start_listen(100); threads.join_all(); }
async_*
in thread A, the procedure for its completion can be called in thread B (as long as thread B calls service.run()
). This in itself is not a problem. As long as you follow the logical sequence, that is, from async_read()
to on_read()
, from on_read()
to process_reques
t, from process_request
to async_write()
, from async_write()
to on_write()
, from on_write()
to async_read()
and there are no public
functions that would call your talk_to_client
class, although different functions can be called in different threads, they will still be called sequentially. Thus, mutexes are not needed. void do_read() { async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2)); post_check_ping(); } void post_check_ping() { timer_.expires_from_now(boost::posix_time::millisec(5000)); timer_.async_wait( MEM_FN(on_check_ping)); }
talk_to_client
using mutexes. In addition, we have an array of clients that we refer to several times in the code, which also needs its own mutex.update_clients_changed()
function: void update_clients_changed() { array copy; { boost::recursive_mutex::scoped_lock lk(clients_cs); copy = clients; } for( array::iterator b = copy.begin(), e = copy.end(); b != e; ++b) (*b)->set_clients_changed(); }
clients_cs
and client cs_
mutex to be locked at the same time. void my_func() { ... } service.post(my_func);
my_func
is called on one of the threads that call service.run()
. You can also run an asynchronous function and make a terminating handler that will tell you when the function ends. The pseudocode will look like this: void on_complete() { ... } void my_func() { ... service.post(on_complete); } async_call(my_func);
async_call
function async_call
, you have to create your own. Fortunately, this is not so difficult. See the following code snippet: struct async_op : boost::enable_shared_from_this<async_op>, ... { typedef boost::function<void(boost::system::error_code)> completion_func; typedef boost::function<boost::system::error_code ()> op_func; struct operation { ... }; void start() { { boost::recursive_mutex::scoped_lock lk(cs_); if ( started_) return; started_ = true; } boost::thread t( boost::bind(&async_op::run,this)); } void add(op_func op, completion_func completion, io_service &service) { self_ = shared_from_this(); boost::recursive_mutex::scoped_lock lk(cs_); ops_.push_back( operation(service, op, completion)); if ( !started_) start(); } void stop() { boost::recursive_mutex::scoped_lock lk(cs_); started_ = false; ops_.clear(); } private: boost::recursive_mutex cs_; std::vector<operation> ops_; bool started_; ptr self_; };
async_op
, ( run()
) , ( add()
) . - , :completion
io_service
, completion
. , . : struct async_op : boost::enable_shared_from_this<async_op> , private boost::noncopyable { struct operation { operation(io_service & service, op_func op, completion_func completion): service(&service), op(op)completion(completion), work(new o_service::work(service)){} operation() : service(0) {} io_service * service; op_func op; completion_func completion; typedef boost::shared_ptr<io_service::work> work_ptr; work_ptr work; }; ... };
io_service::work
, service.run()
, ( , io_service::work, service.run()
, ). : struct async_op : ... { typedef boost::shared_ptr<async_op> ptr; static ptr new_() { return ptr(new async_op); } ... void run() { while ( true) { { boost::recursive_mutex::scoped_lock lk(cs_); if ( !started_) break; } boost::this_thread::sleep( boost::posix_time::millisec(10)); operation cur; { boost::recursive_mutex::scoped_lock lk(cs_); if ( !ops_.empty()) { cur = ops_[0]; ops_.erase( ops_.begin()); } } if ( cur.service) cur.service->post(boost::bind(cur.completion, cur.op() )); } self_.reset(); } };
run()
, , ; , . .compute_file_checksum
, : size_t checksum = 0; boost::system::error_code compute_file_checksum(std::string file_name) { HANDLE file = ::CreateFile(file_name.c_str(), GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0); windows::random_access_handle h(service, file); long buff[1024]; checksum = 0; size_t bytes = 0, at = 0; boost::system::error_code ec; while ( (bytes = read_at(h, at, buffer(buff), ec)) > 0) { at += bytes; bytes /= sizeof(long); for ( size_t i = 0; i < bytes; ++i) checksum += buff[i]; } return boost::system::error_code(0, boost::system::generic_category()); } void on_checksum(std::string file_name, boost::system::error_code) { std::cout << "checksum for " << file_name << "=" << checksum << std::endl; } int main(int argc, char* argv[]) { std::string fn = "readme.txt"; async_op::new_()->add( service, boost::bind(compute_file_checksum,fn), boost::bind(on_checksum,fn,_1)); service.run(); }
io_service
, ( post()
) . . class proxy : public boost::enable_shared_from_this<proxy> { proxy(ip::tcp::endpoint ep_client, ip::tcp::endpoint ep_server) : ... {} public: static ptr start(ip::tcp::endpoint ep_client, ip::tcp::endpoint ep_svr) { ptr new_(new proxy(ep_client, ep_svr)); // ... connect to both endpoints return new_; } void stop() { // ... stop both connections } bool started() { return started_ == 2; } private: void on_connect(const error_code & err) { if ( !err) { if ( ++started_ == 2) on_start(); } else stop(); } void on_start() { do_read(client_, buff_client_); do_read(server_, buff_server_); } ... private: ip::tcp::socket client_, server_; enum { max_msg = 1024 }; char buff_client_[max_msg], buff_server_[max_msg]; int started_; };
on_start()
): class proxy : public boost::enable_shared_from_this<proxy> { ... void on_read(ip::tcp::socket & sock, const error_code& err, size_t bytes) { char * buff = &sock == &client_ ? buff_client_ : buff_server_; do_write(&sock == &client_ ? server_ : client_, buff, bytes); } void on_write(ip::tcp::socket & sock, const error_code &err, size_t bytes) { if ( &sock == &client_) do_read(server_, buff_server_); else do_read(client_, buff_client_); } void do_read(ip::tcp::socket & sock, char* buff) { async_read(sock, buffer(buff, max_msg), MEM_FN3(read_complete,ref(sock),_1,_2), MEM_FN3(on_read,ref(sock),_1,_2)); } void do_write(ip::tcp::socket & sock, char * buff, size_t size) { sock.async_write_some(buffer(buff,size), MEM_FN3(on_write,ref(sock),_1,_2)); } size_t read_complete(ip::tcp::socket & sock, const error_code & err, size_t bytes) { if ( sock.available() > 0) return sock.available(); return bytes > 0 ? 0 : 1; } };
int main(int argc, char* argv[]) { ip::tcp::endpoint ep_c( ip::address::from_string("127.0.0.1"), 8001); ip::tcp::endpoint ep_s( ip::address::from_string("127.0.0.1"), 8002); proxy::start(ep_c, ep_s); service.run(); }
buff_client_
buff_server_
) . , , . , . , , ( ). , , :co-routines
, .Source: https://habr.com/ru/post/196354/
All Articles