```cpp
using namespace boost::asio;
struct client {
    ip::tcp::socket sock;
    char buff[1024];  // each msg is at maximum this size
    int already_read; // how much have we already read?
};
std::vector<client> clients;

void handle_clients() {
    while ( true)
        for ( int i = 0; i < clients.size(); ++i)
            if ( clients[i].sock.available() )
                on_read(clients[i]);
}
void on_read(client & c) {
    int to_read = std::min( 1024 - c.already_read, (int)c.sock.available());
    c.sock.read_some( buffer(c.buff + c.already_read, to_read));
    c.already_read += to_read;
    if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) {
        int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff;
        std::string msg(c.buff, c.buff + pos);
        // move what is left after the '\n' to the front of the buffer
        std::copy(c.buff + pos + 1, c.buff + 1024, c.buff);
        c.already_read -= pos + 1;
        on_read_msg(c, msg);
    }
}
void on_read_msg(client & c, const std::string & msg) {
    // analyze message, and write back
    if ( msg == "request_login")
        c.sock.write_some( buffer("request_ok\n"));
    else if ...
}
```
We want the handle_clients() function to block as little as possible. If the function blocks at any point, incoming messages from the other clients will have to wait for it to unblock before they get processed. That is why we call on_read only for a client that actually has data available: if ( clients[i].sock.available() ) on_read(clients[i]). In on_read, we read only as much as is available; calling read_until(c.sock, buffer(...), '\n') would not be a good idea, since it blocks until we have read the complete message from that specific client (and we never know when that will happen).

The bottleneck is the on_read_msg() function; all incoming messages stall while it executes. A well-written on_read_msg() will make sure this does not happen, but it still can (sometimes writing to a socket blocks, for example, when its buffer is full). To lessen the problem, we can process the clients in several threads:

```cpp
using namespace boost::asio;
struct client {
    // ... same as before
    bool set_reading() {
        boost::mutex::scoped_lock lk(cs_);
        if ( is_reading_) return false; // already reading
        else { is_reading_ = true; return true; }
    }
    void unset_reading() {
        boost::mutex::scoped_lock lk(cs_);
        is_reading_ = false;
    }
private:
    boost::mutex cs_;
    bool is_reading_;
};
std::vector<client> clients;

void handle_clients() {
    for ( int i = 0; i < 10; ++i)
        boost::thread( handle_clients_thread);
}
void handle_clients_thread() {
    while ( true)
        for ( int i = 0; i < clients.size(); ++i)
            if ( clients[i].sock.available() )
                if ( clients[i].set_reading()) {
                    on_read(clients[i]);
                    clients[i].unset_reading();
                }
}
void on_read(client & c) {
    // same as before
}
void on_read_msg(client & c, const std::string & msg) {
    // same as before
}
```
First, notice what the set_reading() and unset_reading() functions do. The set_reading() function is very important: you want "check whether a read is in progress, and if not, mark one as started" to happen in a single step. If you did it in two steps ("check whether we can read", then "start reading"), two threads could both pass the check for the same client and both call on_read for it, which can lead to data corruption and possibly even a crash.
The asynchronous server handles this more gracefully. When a read operation completes, on_read() is called; we write our response back, and then wait for the next request from the same client (by starting another asynchronous read operation):

```cpp
using namespace boost::asio;
io_service service;
struct client {
    ip::tcp::socket sock;
    streambuf buff; // reads the answer from the client
};
std::vector<client> clients;

void handle_clients() {
    for ( int i = 0; i < clients.size(); ++i)
        async_read_until(clients[i].sock, clients[i].buff, '\n',
            boost::bind(on_read, clients[i], _1, _2));
    for ( int i = 0; i < 10; ++i)
        boost::thread(handle_clients_thread);
}
void handle_clients_thread() {
    service.run();
}
void on_read(client & c, const error_code & err, size_t read_bytes) {
    std::istream in(&c.buff);
    std::string msg;
    std::getline(in, msg);
    if ( msg == "request_login")
        c.sock.async_write( "request_ok\n", on_write);
    else if ...
    ...
    // now, wait for the next read from the same client
    async_read_until(c.sock, c.buff, '\n',
        boost::bind(on_read, c, _1, _2));
}
```
This time the client structure has only two members; handle_clients() simply calls async_read_until for every client, and then creates ten threads, each of which calls service.run(). These threads will handle all asynchronous read operations from the clients, as well as any asynchronous writes to them. Another thing to note: the on_read() function keeps preparing the next asynchronous read operation (see its last line).
The io_service class provides four functions for running the event loop: run(), run_one(), poll(), and poll_one(). Most of the time you will simply use service.run(); here you will learn what can be achieved with the other functions.

run() keeps working as long as there are pending operations, or until you call io_service::stop() yourself. To keep an io_service instance working, you typically add one or more asynchronous operations, and whenever they complete, you keep adding more, as in the following code:

```cpp
using namespace boost::asio;
io_service service;
ip::tcp::socket sock(service);
char buff_read[1024], buff_write[1024] = "ok";

void on_read(const boost::system::error_code &err, std::size_t bytes);
void on_write(const boost::system::error_code &err, std::size_t bytes) {
    sock.async_read_some(buffer(buff_read), on_read);
}
void on_read(const boost::system::error_code &err, std::size_t bytes) {
    // ... process the read ...
    sock.async_write_some(buffer(buff_write,3), on_write);
}
void on_connect(const boost::system::error_code &err) {
    sock.async_read_some(buffer(buff_read), on_read);
}
int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
    sock.async_connect(ep, on_connect);
    service.run();
}
```
By the time service.run() is called, at least one asynchronous operation is already pending. When the socket connects to the server, on_connect is called, which adds another asynchronous operation. When on_connect returns, we are left with one pending operation (the read). When the read completes, on_read is invoked; we write a reply, which adds another pending operation (the write). When the write completes, on_write is invoked; we read the next message from the server, which again adds a pending operation. When on_write returns, we have one pending operation (the read). And so the cycle continues, until we decide to close the application.
So far, the examples always assumed a thread in which io_service::run() was called. This was done for simplicity, since at least 90 to 95 percent of the time run() is the only one of these functions you will use. The same reasoning applies to a thread that calls run_one(), poll(), or poll_one().

The run_one() function executes and dispatches at most one asynchronous operation, while run() loops. In that sense, the following two snippets are equivalent:

```cpp
io_service service;
service.run();
// OR
while ( !service.stopped())
    service.run_one();
```
You can use run_one() to start an asynchronous operation and then wait for it to complete:

```cpp
io_service service;
bool write_complete = false;
void on_write(const boost::system::error_code & err, size_t bytes) {
    write_complete = true;
}
...
std::string data = "login ok";
write_complete = false;
async_write(sock, buffer(data), on_write);
do
    service.run_one();
while ( !write_complete);
```
There are more examples of using run_one() shipped with Boost.Asio, such as blocking_tcp_client.cpp and blocking_udp_client.cpp.

The poll_one() function runs at most one pending operation that is ready to run, without blocking; if there is such an operation, poll_one() runs it and returns 1. An operation is ready to run when, for example:

- A timer has expired, and its async_wait handler needs to be called
- An I/O operation has completed (such as async_read), and its handler needs to be called
- A custom handler was previously added to the io_service instance's queue (this is explained in detail in the next section)

You can use poll_one() to make sure all handlers of completed I/O operations have run, and then continue with other work:

```cpp
io_service service;
while ( true) {
    // run all handlers of completed IO operations
    while ( service.poll_one()) ;
    // ... do other work here ...
}
```
The poll() function runs all pending operations that can execute without blocking. The following two snippets are equivalent:

```cpp
io_service service;
service.poll();
// OR
while ( service.poll_one()) ;
```
All of the preceding functions throw boost::system::system_error on failure. That should never happen; an error reported here is usually fatal, perhaps a resource problem, or one of your handlers threw an exception. In any case, each function also has an overload that does not throw, but instead takes a boost::system::error_code and sets it before returning:

```cpp
io_service service;
boost::system::error_code err;
service.run(err);
if ( err)
    std::cout << "Error " << err << std::endl;
```
You can use service.post() to add a custom function that will be invoked asynchronously, for example:

```cpp
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
io_service service;

void func(int i) {
    std::cout << "func called, i= " << i << std::endl;
}
void worker_thread() {
    service.run();
}
int main(int argc, char* argv[]) {
    for ( int i = 0; i < 10; ++i)
        service.post(boost::bind(func, i));
    boost::thread_group threads;
    for ( int i = 0; i < 3; ++i)
        threads.create_thread(worker_thread);
    // wait for all threads to be created
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    threads.join_all();
}
```
service.post(some_function) adds an asynchronous function call. It returns immediately, after asking the io_service instance to invoke the given some_function in one of the threads that called service.run(); in our case, one of the three threads created earlier. You cannot be sure of the order in which the posted functions will be called; do not expect them to run in the order they were added with post(). A possible output of the previous example is:

```
func called, i= 0
func called, i= 2
func called, i= 1
func called, i= 4
func called, i= 3
func called, i= 6
func called, i= 7
func called, i= 8
func called, i= 5
func called, i= 9
```
(go_to_restaurant), place an order (order), and eat (eat). You want to first arrive at the restaurant, then place your order, and only then eat. To achieve this kind of ordering, you use io_service::strand, which serializes the invocation of your asynchronous handlers. Consider the following example:

```cpp
using namespace boost::asio;
io_service service;
void func(int i) {
    std::cout << "func called, i= " << i << "/"
              << boost::this_thread::get_id() << std::endl;
}
void worker_thread() {
    service.run();
}
int main(int argc, char* argv[]) {
    io_service::strand strand_one(service), strand_two(service);
    for ( int i = 0; i < 5; ++i)
        service.post( strand_one.wrap( boost::bind(func, i)));
    for ( int i = 5; i < 10; ++i)
        service.post( strand_two.wrap( boost::bind(func, i)));
    boost::thread_group threads;
    for ( int i = 0; i < 3; ++i)
        threads.create_thread(worker_thread);
    // wait for all threads to be created
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    threads.join_all();
}
```
Here, "func called, i= 0" is guaranteed to be printed before "func called, i= 1", which in turn is printed before "func called, i= 2", and so on. Likewise, "func called, i= 5" is printed before "func called, i= 6", and "func called, i= 6" before "func called, i= 7", and so on. Note that even though the functions are called in sequence, this does not mean they all run in the same thread. A possible run of this program might look like this:

```
func called, i= 0/002A60C8
func called, i= 5/002A6138
func called, i= 6/002A6530
func called, i= 1/002A6138
func called, i= 7/002A6530
func called, i= 2/002A6138
func called, i= 8/002A6530
func called, i= 3/002A6138
func called, i= 9/002A6530
func called, i= 4/002A6138
```
The following three functions are closely related:

- service.post(handler): this function guarantees that it returns immediately after asking the io_service instance to invoke the given handler. The handler will be invoked later, in one of the threads that called service.run().
- service.dispatch(handler): this also asks the io_service instance to invoke the given handler but, in addition, it may execute the handler right inside the call if the current thread has called service.run().
- service.wrap(handler): this function creates a wrapper function that, when called, will call service.dispatch(handler). This is a bit confusing; I will explain shortly what it means.

You saw an example of service.post() in the previous section, along with a possible output of the program. Let us modify it and see how service.dispatch affects the output:

```cpp
using namespace boost::asio;
io_service service;
void func(int i) {
    std::cout << "func called, i= " << i << std::endl;
}
void run_dispatch_and_post() {
    for ( int i = 0; i < 10; i += 2) {
        service.dispatch(boost::bind(func, i));
        service.post(boost::bind(func, i + 1));
    }
}
int main(int argc, char* argv[]) {
    service.post(run_dispatch_and_post);
    service.run();
}
```
The output is:

```
func called, i= 0
func called, i= 2
func called, i= 4
func called, i= 6
func called, i= 8
func called, i= 1
func called, i= 3
func called, i= 5
func called, i= 7
func called, i= 9
```
We use dispatch() to print the even numbers and post() to print the odd ones. dispatch() invokes its handler before returning, because the current thread has called service.run(), while post() returns right away.

Finally, let us talk about service.wrap(handler). wrap() returns a functor that can be used as an argument to another function:

```cpp
using namespace boost::asio;
io_service service;
void dispatched_func_1() {
    std::cout << "dispatched 1" << std::endl;
}
void dispatched_func_2() {
    std::cout << "dispatched 2" << std::endl;
}
void test(boost::function<void()> func) {
    std::cout << "test" << std::endl;
    service.dispatch(dispatched_func_1);
    func();
}
void service_run() {
    service.run();
}
int main(int argc, char* argv[]) {
    test( service.wrap(dispatched_func_2));
    boost::thread th(service_run);
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    th.join();
}
```
The line test(service.wrap(dispatched_func_2)); wraps dispatched_func_2, creating a functor that is passed to test as an argument. When test() is called, it dispatches a call to dispatched_func_1() and then calls func(). At this point, calling func() is equivalent to calling service.dispatch(dispatched_func_2), and the two handlers run in that order. The program's output confirms this:

```
test
dispatched 1
dispatched 2
```
The io_service::strand class (used to serialize asynchronous calls) also contains the member functions post(), dispatch(), and wrap(). Their meaning is the same as that of io_service's own post(), dispatch(), and wrap(). However, most of the time you will only use io_service::strand::wrap() as an argument to io_service::post() or io_service::dispatch().

Now, say you run the following synchronous snippet:

```cpp
io_service service;
ip::tcp::socket sock(service);
char buff[512];
...
read(sock, buffer(buff));
```
Here, sock and buff must both outlive the read() call; in other words, they must stay valid until the call returns. That is exactly what you would expect: all the arguments you pass to a function must be valid inside it. Things get more complicated when we go asynchronous:

```cpp
io_service service;
ip::tcp::socket sock(service);
char buff[512];
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), on_read);
```
This time, sock and buff must outlive the read operation itself, but we do not know when it will complete, since it is asynchronous. Earlier we used a buffer instance that outlived the asynchronous call (with boost::shared_array<>). Here we can apply the same principle: create a class that internally holds the socket and the read/write buffers, and, for every asynchronous call, pass a boost::bind functor holding a shared pointer to that class:

```cpp
using namespace boost::asio;
io_service service;
struct connection : boost::enable_shared_from_this<connection> {
    typedef boost::system::error_code error_code;
    typedef boost::shared_ptr<connection> ptr;

    connection() : sock_(service), started_(true) {}
    void start(ip::tcp::endpoint ep) {
        sock_.async_connect(ep,
            boost::bind(&connection::on_connect, shared_from_this(), _1));
    }
    void stop() {
        if ( !started_) return;
        started_ = false;
        sock_.close();
    }
    bool started() { return started_; }
private:
    void on_connect(const error_code & err) {
        // here you decide what to do with the connection: read or write
        if ( !err) do_read();
        else stop();
    }
    void on_read(const error_code & err, size_t bytes) {
        if ( !started() ) return;
        std::string msg(read_buffer_, bytes);
        if ( msg == "can_login") do_write("access_data");
        else if ( msg.find("data ") == 0) process_data(msg);
        else if ( msg == "login_fail") stop();
    }
    void on_write(const error_code & err, size_t bytes) {
        do_read();
    }
    void do_read() {
        sock_.async_read_some(buffer(read_buffer_),
            boost::bind(&connection::on_read, shared_from_this(), _1, _2));
    }
    void do_write(const std::string & msg) {
        if ( !started() ) return;
        // note: in case you want to send several messages before
        // doing another async_read, you'll need several write buffers!
        std::copy(msg.begin(), msg.end(), write_buffer_);
        sock_.async_write_some(buffer(write_buffer_, msg.size()),
            boost::bind(&connection::on_write, shared_from_this(), _1, _2));
    }
    void process_data(const std::string & msg) {
        // process what comes from server, and then perform another write
    }
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    char read_buffer_[max_msg];
    char write_buffer_[max_msg];
    bool started_;
};
int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
    connection::ptr(new connection)->start(ep);
    service.run(); // run the asynchronous loop
}
```
Whenever an asynchronous call is made, a boost::bind functor is passed as the completion handler. That functor internally stores a shared pointer to the connection instance. As long as an asynchronous operation is pending, Boost.Asio keeps a copy of the boost::bind functor, which in turn keeps the shared pointer to the connection alive. Problem solved!
Of course, the connection class is only a skeleton; you will need to adapt it to your needs (in the case of a server, it will look quite different). Notice how easily you create a new connection: connection::ptr(new connection)->start(ep). This starts the (asynchronous) connect to the server. When you want to close the connection, you call stop().

Once the instance has started (start()), it waits to be connected. When the connection is established, on_connect() is called. If there is no error, a read operation is started (do_read()). Once the read completes, you can interpret the message; in your application, on_read() will most likely look different. When you send a message, you have to copy it into a buffer first and then send it, as done in do_write(), because, again, the buffer must outlive the asynchronous write operation. One last note: when writing, remember to specify how much to write, otherwise the entire buffer will be sent.
To sum up:

- You can use a resolver object to resolve a host name, such as www.yahoo.com, into one or more IP addresses.
- Most of the time, service.run() is enough to build an asynchronous loop, but sometimes you need to go further, and then you can use run_one(), poll(), or poll_one().
- You can have your own functions invoked asynchronously with service.post() or service.dispatch().
- To keep its buffers alive across asynchronous operations, your connection class should derive from enable_shared_from_this, contain all the buffers it needs as members, and pass a shared pointer to itself into each asynchronous call.

Source: https://habr.com/ru/post/195006/