    size_t read_complete(boost::system::error_code, size_t bytes) { ... }
    char buff[1024];
    read(sock, buffer(buff), read_complete);
    write(sock, buffer("echo\n"));
Boost.Asio can also read into and write from a streambuf. Here is the simplest, and worst, thing you can do with a streambuf object:
    streambuf buf;
    read(sock, buf);
This reads until the streambuf object is full, and since a streambuf object can reallocate itself to make more room, it will basically read until the connection is closed. You can use the read_until function to read up to a specific character instead:
    streambuf buf;
    read_until(sock, buf, "\n");
To write something through a streambuf object, you will do something similar to the following:
    streambuf buf;
    std::ostream out(&buf);
    out << "echo" << std::endl;
    write(sock, buf);
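You build what you want to send in the streambuf object and then use the write function to send the contents of the buffer. This also works for your own types. Suppose you have a structure that overloads operator<< and operator>>:
    struct person {
        std::string first_name, last_name;
        int age;
    };
    std::ostream& operator<<(std::ostream & out, const person & p) {
        return out << p.first_name << " " << p.last_name << " " << p.age;
    }
    std::istream& operator>>(std::istream & in, person & p) {
        return in >> p.first_name >> p.last_name >> p.age;
    }
Sending such a structure over the network then looks like this:
    streambuf buf;
    std::ostream out(&buf);
    person p;
    // ... initialize p
    out << p << std::endl;
    write(sock, buf);
The other side reads it back like this:
    read_until(sock, buf, "\n");
    std::istream in(&buf);
    person p;
    in >> p;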
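The person round trip can be tried without a network. The following is a minimal sketch that assumes a std::ostringstream and a std::istringstream standing in for the socket and the Boost.Asio streambuf. The helpers encode_person and decode_person are hypothetical names introduced for illustration only.

```cpp
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

struct person {
    std::string first_name, last_name;
    int age;
};

// Same stream operators as in the text.
std::ostream& operator<<(std::ostream& out, const person& p) {
    return out << p.first_name << " " << p.last_name << " " << p.age;
}
std::istream& operator>>(std::istream& in, person& p) {
    return in >> p.first_name >> p.last_name >> p.age;
}

// Hypothetical helper: builds the line that would be handed to write().
std::string encode_person(const person& p) {
    std::ostringstream out;
    out << p << '\n';
    return out.str();
}

// Hypothetical helper: parses one line, as a read up to "\n" would deliver it.
person decode_person(const std::string& line) {
    std::istringstream in(line);
    person p{};
    in >> p;
    return p;
}
```

Because operator>> splits on whitespace, this format only works while neither name contains a space.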
The nice part of using streambuf objects and, of course, the corresponding std::ostream for writing or std::istream for reading, is that you end up writing code that feels natural. To dump the contents of a streambuf object to the console, use the following code:
    streambuf buf;
    ...
    std::cout << &buf << std::endl; // dumps all content to the console
To convert a streambuf object to a string, you can use a helper like this:
    std::string to_string(streambuf &buf) {
        std::ostringstream out;
        out << &buf;
        return out.str();
    }
The streambuf class is derived from std::streambuf. Like std::streambuf, it does not have a copy constructor. It offers the following functions:
streambuf([max_size,] [allocator]): constructs a streambuf object. Optionally, you can set the maximum buffer size and the allocator used to allocate and free memory.
prepare(n): returns a sub-buffer that can hold a contiguous sequence of n characters. It can be used for reading or writing. The result can be passed to any Boost.Asio free read/write function, not only to those that work with streambuf objects.
data(): returns the entire buffer as a contiguous sequence of characters; it is used for writing. The result can be passed to any Boost.Asio free write function, not only to those that work with streambuf objects.
consume(n): removes n characters from the input sequence (after a read operation).
commit(n): removes n characters from the output sequence (after a write operation) and appends them to the input sequence (for a read operation).
size(): returns the size, in characters, of the entire streambuf object.
max_size(): returns the maximum number of characters that the streambuf object can hold.
Most of the time you will simply pass the streambuf instance as an argument to a free read/write function, as shown below:
    read_until(sock, buf, "\n"); // reads into buf
    write(sock, buf); // writes from buf
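When you pass the streambuf itself, the free function commits what it reads for you:
    read_until(sock, buf, '\n');
    std::cout << &buf << std::endl;
If you read into a sub-buffer returned by prepare() instead, the bytes are not added to the input sequence, so the following reads 16 bytes but dumps nothing:
    read(sock, buf.prepare(16), transfer_exactly(16) );
    std::cout << &buf << std::endl;
To see the data, commit it first:
    read(sock, buf.prepare(16), transfer_exactly(16) );
    buf.commit(16);
    std::cout << &buf << std::endl;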
Similarly, to write from a streambuf object with the free write function, use the following code snippet:
    streambuf buf;
    std::ostream out(&buf);
    out << "hi there" << std::endl;
    write(sock, buf);
The following sends hi there three times, because writing from buf.data() never consumes the buffer:
    streambuf buf;
    std::ostream out(&buf);
    out << "hi there" << std::endl;
    for ( int i = 0; i < 3; ++i)
        write(sock, buf.data());
To consume the data after writing it, do this:
    streambuf buf;
    std::ostream out(&buf);
    out << "hi there" << std::endl;
    write(sock, buf.data());
    buf.consume(9);
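The relationship between prepare(), commit(), data() and consume() can be pictured with a toy model. The class toy_streambuf below is hypothetical and is not how Boost.Asio implements streambuf; it only mimics the bookkeeping of the output sequence (prepared bytes) and the input sequence (committed bytes) with two strings.

```cpp
#include <cstddef>
#include <string>

// Hypothetical toy model of the two sequences of a stream buffer.
// It is NOT Boost.Asio's implementation; it only mirrors the bookkeeping.
class toy_streambuf {
public:
    // Reserve n characters of output space and return a pointer to it.
    char* prepare(std::size_t n) {
        output_.assign(n, '\0');
        return &output_[0];
    }
    // Move the first n prepared characters to the end of the input sequence.
    void commit(std::size_t n) {
        if (n > output_.size()) n = output_.size();
        input_.append(output_, 0, n);
        output_.clear();
    }
    // The readable characters; calling this does not remove anything.
    const std::string& data() const { return input_; }
    // Drop the first n characters of the input sequence.
    void consume(std::size_t n) {
        if (n > input_.size()) n = input_.size();
        input_.erase(0, n);
    }
    // Number of committed, not yet consumed characters.
    std::size_t size() const { return input_.size(); }

private:
    std::string input_;   // committed, readable data
    std::string output_;  // prepared, not yet committed data
};
```

In this model, bytes copied into the pointer returned by prepare() stay invisible to data() and size() until commit() is called, and repeated calls to data() keep returning the same characters until consume() removes them.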
In general, prefer to work with the whole streambuf instance at once. Use the functions above only if you need fine-tuning.
Free functions that work with streambuf objects
The following Boost.Asio free functions work with streambuf objects:
read(sock, buf [, completion_function]): reads from the socket into the streambuf object. The completion function is optional. If supplied, it is called after each successful read operation and tells Boost.Asio whether the operation is complete (if not, reading continues). Its signature is: size_t completion(const boost::system::error_code & err, size_t bytes_transferred). The completion function returns 0 when the read operation is complete; a non-zero value is the maximum number of bytes to read on the next call to the stream's read_some function.
read_at(random_stream, offset, buf [, completion_function]): reads from a random-access stream. Note that this does not apply to sockets (they do not model the random-access stream concept).
read_until(sock, buf, char | string | regex | match_condition): reads until a given condition is met. Either a certain character must be read, or a string or regular expression must match part of the data read, or the match_condition function tells us to stop. The match_condition function signature is: pair<iterator,bool> match(iterator begin, iterator end); where iterator is buffers_iterator<streambuf::const_buffers_type>. If a match is found, return the pair (one past the end of the match, true); if no match is found, return the pair (begin, false).
write(sock, buf [, completion_function]): writes the entire contents of the streambuf object to the socket. The completion function is optional and behaves like the one for read(): it returns 0 when the write operation is complete, or a non-zero value giving the maximum number of bytes to write on the next call to the stream's write_some function.
write_at(random_stream, offset, buf [, completion_function]): writes to a random-access stream. Again, this does not apply to sockets.
async_read(sock, buf [, completion_function], handler): the asynchronous counterpart of the read() function. The handler signature is: void handler(const boost::system::error_code, size_t bytes).
async_read_at(random_stream, offset, buf [, completion_function], handler): the asynchronous counterpart of the read_at() function.
async_read_until(sock, buf, char | string | regex | match_condition, handler): the asynchronous counterpart of the read_until() function.
async_write(sock, buf [, completion_function], handler): the asynchronous counterpart of the write() function.
async_write_at(random_stream, offset, buf [, completion_function], handler): the asynchronous counterpart of the write_at() function.
For example, to read until a vowel with a completion function:
    streambuf buf;
    bool is_vowel(char c) {
        return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u';
    }
    size_t read_complete(boost::system::error_code, size_t bytes) {
        const char * begin = buffer_cast<const char*>( buf.data());
        if ( bytes == 0) return 1;
        while ( bytes > 0) {
            if ( is_vowel(*begin++)) return 0;
            else --bytes;
        }
        return 1;
    }
    ...
    read(sock, buf, read_complete);
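A completion function returns 0 to stop, or the maximum number of bytes to request from the next read_some call. That contract can be sketched with a simulated stream. Everything here is an assumption made for illustration: fake_stream, read_with_condition and until_vowel are hypothetical names, the completion function receives the buffer instead of an error code, and the loop only approximates what the free read() function does.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a socket: hands out its contents piece by piece.
struct fake_stream {
    std::string pending;
    // Appends at most max_bytes characters to out; returns how many were read.
    std::size_t read_some(std::string& out, std::size_t max_bytes) {
        std::size_t n = std::min(max_bytes, pending.size());
        out.append(pending, 0, n);
        pending.erase(0, n);
        return n;
    }
};

// Sketch of a read loop driven by a completion function.
// The function returns 0 when reading should stop, or the maximum number
// of bytes to request on the next read_some call.
template <typename Completion>
std::size_t read_with_condition(fake_stream& s, std::string& buf, Completion done) {
    std::size_t total = 0;
    std::size_t max_bytes = done(buf, total);  // asked once before any read
    while (max_bytes > 0) {
        std::size_t n = s.read_some(buf, max_bytes);
        if (n == 0) break;  // the simulated stream is exhausted
        total += n;
        max_bytes = done(buf, total);
    }
    return total;
}

// Completion function: stop as soon as the buffer contains a vowel,
// otherwise ask for one more byte.
std::size_t until_vowel(const std::string& buf, std::size_t /*bytes*/) {
    return buf.find_first_of("aeiou") != std::string::npos ? 0 : 1;
}
```

Asking for one byte at a time keeps the sketch from reading past the first vowel, at the cost of one read_some call per character.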
Using a regular expression is simple:
    read_until(sock, buf, boost::regex("^[aeiou]+") );
Or you can use a match_condition function to do the work:
    streambuf buf;
    bool is_vowel(char c) {
        return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u';
    }
    typedef buffers_iterator<streambuf::const_buffers_type> iterator;
    std::pair<iterator,bool> match_vowel(iterator b, iterator e) {
        while ( b != e) {
            if ( is_vowel(*b++)) return std::make_pair(b, true);
        }
        return std::make_pair(e, false);
    }
    ...
    size_t bytes = read_until(sock, buf, match_vowel);
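The match function itself can be exercised on a plain std::string. This sketch assumes std::string::const_iterator in place of buffers_iterator<streambuf::const_buffers_type>; the typedef name str_iterator is hypothetical.

```cpp
#include <string>
#include <utility>

typedef std::string::const_iterator str_iterator;  // stand-in iterator type

bool is_vowel(char c) {
    return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u';
}

// Returns (one past the matched vowel, true) on a match,
// or (end of range, false) when no vowel was found.
std::pair<str_iterator, bool> match_vowel(str_iterator b, str_iterator e) {
    while (b != e) {
        if (is_vowel(*b++)) return std::make_pair(b, true);
    }
    return std::make_pair(e, false);
}
```

The returned iterator tells the caller how much of the range the match covers, and the bool says whether reading can stop.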
To use coroutines, include the two header files found in boost/libs/asio/example/http/server4: yield.hpp and coroutine.hpp. They define two macros and a class:
coroutine: this class is derived from or used by your connection class in order to implement coroutines.
reenter(entry): this is the coroutine body. The entry argument is a pointer to the coroutine instance, and the macro is used as a block inside a function.
yield code: executes a statement as part of the coroutine. The next time the function is entered, execution resumes after that statement.
For example:
    class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>
                      , public coroutine, boost::noncopyable {
        ...
        void step(const error_code & err = error_code(), size_t bytes = 0) {
            reenter(this) {
                for (;;) {
                    yield async_write(sock_, write_buffer_, MEM_FN2(step,_1,_2) );
                    yield async_read_until( sock_, read_buffer_, "\n", MEM_FN2(step,_1,_2));
                    yield service.post( MEM_FN(on_answer_from_server));
                }
            }
        }
    };
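How a reenter block can pick up where the previous call stopped can be pictured with a hand-written state machine. This is a simplified sketch, not the actual macros from yield.hpp and coroutine.hpp. The struct toy_steps and its members are hypothetical, and each simulated yield only records an action name instead of starting an asynchronous operation.

```cpp
#include <string>
#include <vector>

// Hypothetical illustration: remembers which step to run on the next call.
struct toy_steps {
    int state = 0;                 // where the previous call stopped
    std::vector<std::string> log;  // records what each call did

    void step() {
        switch (state) {
        case 0:
            for (;;) {
                log.push_back("async_write");
                state = 1; return;  // simulated yield: resume at case 1
        case 1:
                log.push_back("async_read_until");
                state = 2; return;  // simulated yield: resume at case 2
        case 2:
                log.push_back("post");
                state = 3; return;  // simulated yield: resume at case 3
        case 3:;                    // fall to the end of the loop body
            }
        }
    }
};
```

Each call to step() jumps to the saved position, performs one action and returns. In this toy model, removing the for (;;) would make the fourth and every later call do nothing, because case 3 would simply fall out of the switch.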
Instead of many member functions such as connect(), on_connect(), on_read(), do_read(), on_write(), do_write() and so on, we now have a single function, step(). The body of the function is inside reenter(this) { for (;;) { }}. You can think of reenter(this) as remembering the code we executed last, so that the next call continues with the code that follows it.
Inside the reenter block you can see several yield calls. The first time the function is entered, async_write is executed; the second time, async_read_until; the third time, service.post; the fourth time, async_write again, and so on.
Never leave out the for(;;) {} loop.
Look at the following code:
    void step(const error_code & err = error_code(), size_t bytes = 0) {
        reenter(this) {
            yield async_write(sock_, write_buffer_, MEM_FN2(step,_1,_2) );
            yield async_read_until( sock_, read_buffer_, "\n", MEM_FN2(step,_1,_2));
            yield service.post( MEM_FN(on_answer_from_server));
        }
    }
With this code, the third time we enter the function we execute service.post. The fourth time we go past service.post and do nothing. The same happens the fifth time and every time after that. Here is the complete class:
    class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>
                      , public coroutine, boost::noncopyable {
        talk_to_svr(const std::string & username) : ... {}
        void start(ip::tcp::endpoint ep) {
            sock_.async_connect(ep, MEM_FN2(step,_1,0) );
        }
        static ptr start(ip::tcp::endpoint ep, const std::string & username) {
            ptr new_(new talk_to_svr(username));
            new_->start(ep);
            return new_;
        }
        void step(const error_code & err = error_code(), size_t bytes = 0) {
            reenter(this) {
                for (;;) {
                    if ( !started_) {
                        started_ = true;
                        std::ostream out(&write_buf_);
                        out << "login " << username_ << "\n";
                    }
                    yield async_write(sock_, write_buf_, MEM_FN2(step,_1,_2) );
                    yield async_read_until( sock_, read_buf_, "\n", MEM_FN2(step,_1,_2));
                    yield service.post( MEM_FN(on_answer_from_server));
                }
            }
        }
        void on_answer_from_server() {
            std::istream in(&read_buf_);
            std::string word;
            in >> word;
            if ( word == "login") on_login();
            else if ( word == "ping") on_ping();
            else if ( word == "clients") on_clients();
            read_buf_.consume( read_buf_.size());
            if (write_buf_.size() > 0) service.post( MEM_FN2(step,error_code(),0));
        }
        ...
    private:
        ip::tcp::socket sock_;
        streambuf read_buf_, write_buf_;
        bool started_;
        std::string username_;
        deadline_timer timer_;
    };
First, the start() function is called, which asynchronously connects to the server. When the connection is established, we enter step() for the first time. This is when we send our login message. After that, we do an async_write, then an async_read_until, and then process the message (on_answer_from_server).
In on_answer_from_server we process incoming messages: we read the first word and dispatch to the corresponding function, and we ignore the rest of the message (if any):
    class talk_to_svr : ... {
        ...
        void on_login() { do_ask_clients(); }
        void on_ping() {
            std::istream in(&read_buf_);
            std::string answer;
            in >> answer;
            if ( answer == "client_list_changed") do_ask_clients();
            else postpone_ping();
        }
        void on_clients() {
            std::ostringstream clients;
            clients << &read_buf_;
            std::cout << username_ << ", new client list:" << clients.str();
            postpone_ping();
        }
        void do_ping() {
            std::ostream out(&write_buf_);
            out << "ping\n";
            service.post( MEM_FN2(step,error_code(),0));
        }
        void postpone_ping() {
            timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
            timer_.async_wait( MEM_FN(do_ping));
        }
        void do_ask_clients() {
            std::ostream out(&write_buf_);
            out << "ask_clients\n";
        }
    };
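The dispatch on the first word can be tried in isolation. This sketch assumes a std::istringstream in place of the std::istream attached to read_buf_; the function classify_message and the strings it returns are hypothetical and only name the handler that would be called.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: reads the first word of a server message and
// returns the name of the handler that would be called for it.
std::string classify_message(const std::string& message) {
    std::istringstream in(message);
    std::string word;
    in >> word;  // the rest of the message is left unread
    if (word == "login") return "on_login";
    if (word == "ping") return "on_ping";
    if (word == "clients") return "on_clients";
    return "ignored";
}
```

For instance, classify_message("ping client_list_changed\n") selects on_ping, and the second word stays in the stream for the handler to read, as on_ping() does in the class above.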
    int main(int argc, char* argv[]) {
        ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
        talk_to_svr::start(ep, "John");
        service.run();
    }
Source: https://habr.com/ru/post/196888/