ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); size_t read_complete(char * buf, const error_code & err, size_t bytes) { if ( err) return 0; bool found = std::find(buf, buf + bytes, '\n') < buf + bytes; // we read one-by-one until we get to enter, no buffering return found ? 0 : 1; } void sync_echo(std::string msg) { msg += "\n"; ip::tcp::socket sock(service); sock.connect(ep); sock.write_some(buffer(msg)); char buf[1024]; int bytes = read(sock, buffer(buf), boost::bind(read_complete,buf,_1,_2)); std::string copy(buf, bytes - 1); msg = msg.substr(0, msg.size() - 1); std::cout << "server echoed our " << msg << ": "<< (copy == msg ? "OK" : "FAIL") << std::endl; sock.close(); } int main(int argc, char* argv[]) { char* messages[] = { "John says hi", "so does James", "Lucy just got home", "Boost.Asio is Fun!", 0 }; boost::thread_group threads; for ( char ** message = messages; *message; ++message) { threads.create_thread( boost::bind(sync_echo, *message)); boost::this_thread::sleep( boost::posix_time::millisec(100)); } threads.join_all(); }
sync_echo
function. It contains all the logic to connect to the server, sends a message to it and waits for a return response.read()
, because we want to receive the entire message before the '\ n' character. The sock.read_some()
function will not be enough, since it will read only what is available, but not necessarily the entire message.read
completed). In our case, it will always return 1, because we don’t want to mistakenly read more than we need.main()
we create several threads; one thread for each message that the client sends, and wait until they complete. If you run the program, you will see the following output: server echoed our John says hi: OK server echoed our so does James: OK server echoed our Lucy just got home: OK server echoed our Boost.Asio is Fun!: OK
service.run()
. io_service service; size_t read_complete(char * buff, const error_code & err, size_t bytes) { if ( err) return 0; bool found = std::find(buff, buff + bytes, '\n') < buff + bytes; // we read one-by-one until we get to enter, no buffering return found ? 0 : 1; } void handle_connections() { ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001)); char buff[1024]; while ( true) { ip::tcp::socket sock(service); acceptor.accept(sock); int bytes = read(sock, buffer(buff), boost::bind(read_complete,buff,_1,_2)); std::string msg(buff, bytes); sock.write_some(buffer(msg)); sock.close(); } } int main(int argc, char* argv[]) { handle_connections(); }
handle_connections()
. Since it is single-threaded, we accept the new client, read the message that it sent, send it back, and then wait for the next client. For example, if you connect two clients at once, then the second will have to wait while the server serves the first client.service.run()
.connection
class, as shown in the second chapter . #define MEM_FN(x) boost::bind(&self_type::x, shared_from_this()) #define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y) #define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z) class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>,boost::noncopyable { typedef talk_to_svr self_type; talk_to_svr(const std::string & message) : sock_(service), started_(true),message_(message) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, MEM_FN1(on_connect,_1)); } public: typedef boost::system::error_code error_code; typedef boost::shared_ptr<talk_to_svr> ptr; static ptr start(ip::tcp::endpoint ep, const std::string & message) { ptr new_(new talk_to_svr(message)); new_->start(ep); return new_; } void stop() { if ( !started_) return; started_ = false; sock_.close(); } bool started() { return started_; } ... private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; std::string message_; };
talk_to_svr
, so that while there are asynchronous operations in the talk_to_svr
instance, this instance remains alive. In order to avoid such errors as creating instances of talk_to_svr
in the stack, we made the constructor private and forbade the copy constructor (inherited from boost::noncopyable
).start(), stop()
, and started()
, which do only what their names say. To create a connection, simply call talk_to_svr::start(endpoint, message)
. We also have buffers for reading and writing ( read_buffer_
and write_buffer_
). // equivalent to "sock_.async_connect(ep, MEM_FN1(on_connect,_1));" sock_.async_connect(ep, boost::bind(&talk_to_svr::on_connect,shared_ptr_from_this(),_1)); sock_.async_connect(ep, boost::bind(&talk_to_svr::on_connect,this,_1));
async_connect
handler, it will save the shared pointer to the talk_to_server
instance until it calls the final handler, thereby making sure that we are still alive when this happens.talk_to_server
instance is talk_to_server
, it may already have been deleted! void do_read() { async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2)); } void do_write(const std::string & msg) { if ( !started() ) return; std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2)); } size_t read_complete(const boost::system::error_code & err, size_t bytes) { // similar to the one shown in TCP Synchronous Client }
do_read()
function first makes sure that we read the message from the server, after which on_read()
called. The do_write()
function first copies the message to the buffer (it is likely that msg may go out of scope and will eventually collapse), and then makes sure that the on_write()
call occurs after the actual write. void on_connect(const error_code & err) { if ( !err) do_write(message_ + "\n"); else stop(); } void on_read(const error_code & err, size_t bytes) { if ( !err) { std::string copy(read_buffer_, bytes - 1); std::cout << "server echoed our " << message_ << ": "<< (copy == message_ ? "OK" : "FAIL") << std::endl; } stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); }
do_write()
. When the write operation is completed, on_write()
is called, which initiates the do_read()
function. When do_read()
completed, do_read()
is called, here we simply check that the message from the server is the same as what we sent to it and exit it. int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 }; for ( char ** message = messages; *message; ++message) { talk_to_svr::start( ep, *message); boost::this_thread::sleep( boost::posix_time::millisec(100)); } service.run(); }
server echoed our John says hi: OK server echoed our so does James: OK server echoed our Lucy just got home: OK
class talk_to_client : public boost::enable_shared_from_this<talk_to_client>, boost::noncopyable { typedef talk_to_client self_type; talk_to_client() : sock_(service), started_(false) {} public: typedef boost::system::error_code error_code; typedef boost::shared_ptr<talk_to_client> ptr; void start() { started_ = true; do_read(); } static ptr new_() { ptr new_(new talk_to_client); return new_; } void stop() { if ( !started_) return; started_ = false; vsock_.close(); } ip::tcp::socket & sock() { return sock_;} ... private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; };
is_started()
function. For each client, we simply read the message that he sent, send the same message back and close the connection.do_read(), do_write()
and read_complete()
exactly the same as in the asynchronous TCP client.on_read()
and on_write()
functions: void on_read(const error_code & err, size_t bytes) { if ( !err) { std::string msg(read_buffer_, bytes); do_write(msg + "\n"); } stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); }
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001)); void handle_accept(talk_to_client::ptr client, const error_code & err) { client->start(); talk_to_client::ptr new_client = talk_to_client::new_(); acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1)); } int main(int argc, char* argv[]) { talk_to_client::ptr client = talk_to_client::new_(); acceptor.async_accept(client->sock(), boost::bind(handle_accept,client,_1)); service.run(); }
handle_accep
t is handle_accep
, which starts to read asynchronously from this client, and also asynchronously waits for a new client. ip::udp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); void sync_echo(std::string msg) { ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 0) ); sock.send_to(buffer(msg), ep); char buff[1024]; ip::udp::endpoint sender_ep; int bytes = sock.receive_from(buffer(buff), sender_ep); std::string copy(buff, bytes); std::cout << "server echoed our " << msg << ": "<< (copy == msg ? "OK" : "FAIL") << std::endl; sock.close(); } int main(int argc, char* argv[]) { char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 }; boost::thread_group threads; for ( char ** message = messages; *message; ++message) { threads.create_thread( boost::bind(sync_echo, *message)); boost::this_thread::sleep( boost::posix_time::millisec(100)); } threads.join_all(); }
synch_echo()
function; connecting to the server, sending a message, receiving a response message from the server and closing the connection. io_service service; void handle_connections() { char buff[1024]; ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 8001)); while ( true) { ip::udp::endpoint sender_ep; int bytes = sock.receive_from(buffer(buff), sender_ep); std::string msg(buff, bytes); sock.send_to(buffer(msg), sender_ep); } } int main(int argc, char* argv[]) { handle_connections(); }
Source: https://habr.com/ru/post/195386/
All Articles