
Asynchronous data exchange with a remote application via SSH

Good day, friends and colleagues. My name is still Dmitry Smirnov, and, to my great pleasure, I am still a developer at ISPsystem. Some time ago I started working on a completely new project that inspired me a lot, because "new" in our case means no legacy code and no need to support old compilers. Hello, Boost, C++17 and all the other joys of modern development.

It so happened that all my previous projects were multithreaded, so I had very little experience with asynchronous solutions. Besides the modern, powerful tools, this was the most enjoyable part of the project for me.

One of the recent related tasks was to write a wrapper over the libssh2 library for an asynchronous application that uses Boost.Asio and may spawn no more than two threads. That is what I will talk about.


Note: the author assumes that the reader is familiar with the basics of asynchronous development and boost::asio.

Task


In general terms, the task was as follows: connect to a remote server using an RSA key or a username and password; upload a script to the remote machine and run it; read its output and send it commands over the same connection. All of this, of course, without blocking the thread (which is half of the entire available pool).

Disclaimer: I know that Poco implements SSH support, but I did not find a way to marry it with Asio, and writing something of my own was more interesting :-).

Initialization


To initialize and deinitialize the library, I decided to use an ordinary singleton:

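Init()
    #include <libssh2.h>

    #include <iostream>
    #include <stdexcept>

    class LibSSH2 {
    public:
        // The function-local static is constructed on the first call only.
        static void Init() { static LibSSH2 instance; }

    private:
        explicit LibSSH2() {
            if (libssh2_init(0) != 0) {
                throw std::runtime_error("libssh2 initialization failed");
            }
        }

        // Runs during static destruction, at program exit.
        ~LibSSH2() {
            std::cout << "shutdown libssh2" << std::endl;
            libssh2_exit();
        }
    };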



This solution has its pitfalls, of course, according to my favorite reference book, "A Thousand and One Ways to Shoot Yourself in the Foot in C++". If someone spawns a thread and forgets to join it, and the main thread finishes first, interesting special effects may well arise. But in this case I will not take that possibility into account.
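The "initialize once" behaviour of this pattern can be sketched without libssh2 at all. In the sketch below, fake_library_init(), fake_library_exit(), the two counters and the LibraryGuard name are hypothetical stand-ins introduced only for illustration; they are not part of libssh2 or of the author's code.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-ins for libssh2_init()/libssh2_exit(), so the sketch
// builds without libssh2. They only count how often they are called.
int g_init_calls = 0;
int g_exit_calls = 0;

int fake_library_init() { ++g_init_calls; return 0; }
void fake_library_exit() { ++g_exit_calls; }

class LibraryGuard {
public:
    // Safe to call from many places: the function-local static is
    // constructed exactly once (and thread-safely since C++11).
    static void Init() { static LibraryGuard instance; }

    LibraryGuard(const LibraryGuard &) = delete;
    LibraryGuard &operator=(const LibraryGuard &) = delete;

private:
    LibraryGuard() {
        if (fake_library_init() != 0) {
            throw std::runtime_error("library initialization failed");
        }
    }

    // Runs during static destruction, after main() returns. A thread that
    // is still running at that point may use the library after shutdown.
    ~LibraryGuard() { fake_library_exit(); }
};
```

Calling LibraryGuard::Init() any number of times runs the initializer once. The destructor comment marks the pitfall mentioned above: shutdown happens at static destruction, regardless of threads that were never joined.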

Main entities


After studying the example, it becomes clear that our small library needs three simple entities: a socket, a session, and a channel. Since it does no harm to have synchronous tools as well, we will leave Asio aside for now.

Let's start with a simple socket:

Socket
    class Socket {
    public:
        explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) {
            if (m_sock == -1) {
                throw std::runtime_error("failed to create socket");
            }
        }

        ~Socket() { close(m_sock); }

    private:
        int m_sock = -1;
    };


Now the session:

Session
    class Session {
    public:
        explicit Session(const bool enable_compression)
            : m_session(libssh2_session_init()) {
            if (m_session == nullptr) {
                throw std::runtime_error("failed to create libssh2 session");
            }
            // Switch the session to non-blocking mode.
            libssh2_session_set_blocking(m_session, 0);
            if (enable_compression) {
                libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1);
            }
        }

        ~Session() {
            const std::string desc = "Shutting down libssh2 session";
            libssh2_session_disconnect(m_session, desc.c_str());
            libssh2_session_free(m_session);
        }

    private:
        LIBSSH2_SESSION *m_session;
    };


Since we now have a socket and a session, it would be nice to write a function that waits on the socket in the way libssh2 expects:

Waiting for socket
    int WaitSocket() const {
        pollfd fds{};
        fds.fd = sock;
        fds.events = 0;

        if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) {
            fds.events |= POLLIN;
        }
        if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) {
            fds.events |= POLLOUT;
        }

        return poll(&fds, 1, 10);
    }


In fact, this is practically identical to the example mentioned above, except that it uses poll where the example uses select.
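The mapping from "blocked direction" to poll events can be tried out on any file descriptor. The following is a minimal sketch that assumes only POSIX; kBlockInbound, kBlockOutbound, DirectionsToPollEvents and WaitFd are hypothetical names that play the role of the libssh2 direction flags and of WaitSocket(), not actual libssh2 definitions.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Hypothetical flags that play the role of LIBSSH2_SESSION_BLOCK_INBOUND and
// LIBSSH2_SESSION_BLOCK_OUTBOUND; the values are chosen for this sketch only.
constexpr int kBlockInbound = 0x1;
constexpr int kBlockOutbound = 0x2;

// Translate "which direction is the session blocked on" into poll() events.
short DirectionsToPollEvents(int directions) {
    short events = 0;
    if ((directions & kBlockInbound) != 0) {
        events |= POLLIN;
    }
    if ((directions & kBlockOutbound) != 0) {
        events |= POLLOUT;
    }
    return events;
}

// Wait up to timeout_ms for fd to become ready in the requested directions.
// Returns poll()'s result: 1 if ready, 0 on timeout, -1 on error.
int WaitFd(int fd, int directions, int timeout_ms) {
    pollfd fds{};
    fds.fd = fd;
    fds.events = DirectionsToPollEvents(directions);
    return poll(&fds, 1, timeout_ms);
}
```

With a pipe, for instance, WaitFd on the read end times out until something is written to the write end, while the write end of an empty pipe is reported as ready immediately.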

What remains is the channel. libssh2 has several kinds of channels: simple, SCP, direct TCP. We are interested in the simplest, basic channel:

Channel
    class SimpleChannel {
    public:
        explicit SimpleChannel(LIBSSH2_SESSION *session) {
            // Retry while the non-blocking session reports EAGAIN.
            while ((m_channel = libssh2_channel_open_session(session)) == nullptr &&
                   GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) {
                WaitSocket();
            }
            if (m_channel == nullptr) {
                throw std::runtime_error("Critical error while opening simple channel");
            }
        }

        void SendEof() {
            while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
                WaitSocket();
            }
            while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
                WaitSocket();
            }
        }

        ~SimpleChannel() { CloseChannel(); }

    private:
        void CloseChannel() {
            int rc;
            while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) {
                WaitSocket();
            }
            libssh2_channel_free(m_channel);
        }

        LIBSSH2_CHANNEL *m_channel;
    };


Now that all the basic tools are ready, it remains to establish a connection with the host and perform the manipulations we need. Asynchronous and synchronous writing to the channel will, of course, differ greatly, but the process of establishing a connection will not.

Therefore, we write a base class:

Base connection
    class BaseConnectionImpl {
    protected:
        explicit BaseConnectionImpl(const SshConnectData &connect_data)
            : m_session(connect_data.enable_compression)
            , m_connect_data(connect_data) {
            LibSSH2::Init();
            ConnectSocket();
            HandShake();
            ProcessKnownHosts();
            Auth();
        }

        /// Checks, without waiting, whether the socket is ready for the given poll event.
        bool CheckSocket(int type) const {
            pollfd fds{};
            fds.fd = m_sock;
            fds.events = type;
            return poll(&fds, 1, 0) == 1;
        }

        bool WantRead() const { return CheckSocket(POLLIN); }

        bool WantWrite() const { return CheckSocket(POLLOUT); }

        /* The bodies below are elided in the article. */
        void ConnectSocket() {...}
        void HandShake() {...}
        void Auth() {...}

        class Socket m_sock;
        class Session m_session;
        class SimpleChannel;
        SshConnectData m_connect_data;
    };


Now we are ready to write the simplest class for connecting to a remote host and executing some command on it:

Synchronous connection
    class Connection::Impl : public BaseConnectionImpl {
    public:
        explicit Impl(const SshConnectData &connect_data)
            : BaseConnectionImpl(connect_data) {}

        template <typename Begin>
        void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) {
            do {
                int rc;
                while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) {
                    WaitSocket();
                }
                if (rc < 0) {
                    break;
                }
                size -= rc;
                ptr += rc;
            } while (size != 0);
        }

        void ExecuteCommand(const std::string &command, const std::string &in = "") {
            SimpleChannel channel(*this);

            // In non-blocking mode EAGAIN means "call again", so retry until
            // the request is actually sent.
            int return_code;
            while ((return_code = libssh2_channel_exec(channel, command.c_str())) == LIBSSH2_ERROR_EAGAIN) {
                WaitSocket();
            }
            if (return_code != 0) {
                throw std::runtime_error("Critical error while executing ssh command");
            }

            if (!in.empty()) {
                WriteToChannel(channel, in.c_str(), in.size());
                channel.SendEof();
            }

            std::string response;
            for (;;) {
                int rc;
                do {
                    std::array<char, 4096> buffer{};
                    rc = libssh2_channel_read(channel, buffer.data(), buffer.size());
                    if (rc > 0) {
                        boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response));
                    } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) {
                        throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")");
                    }
                } while (rc > 0);

                if (rc == LIBSSH2_ERROR_EAGAIN) {
                    WaitSocket();
                } else {
                    break;
                }
            }
        }
    };
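The retry-on-EAGAIN write loop used in WriteToChannel can be illustrated in isolation. The sketch below uses only the standard library; kAgain, FakeChannel and WriteAll are hypothetical names invented for the illustration, with FakeChannel imitating a non-blocking channel that accepts a few bytes at a time and periodically reports "would block".

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical "would block" code, standing in for LIBSSH2_ERROR_EAGAIN.
constexpr long kAgain = -37;

// Fake channel for illustration: accepts at most `chunk` bytes per call and
// reports "would block" on every other call.
struct FakeChannel {
    std::string received;
    std::size_t chunk = 3;
    bool block_next = true;

    long Write(const char *data, std::size_t size) {
        if (block_next) {
            block_next = false;
            return kAgain;
        }
        block_next = true;
        const std::size_t n = size < chunk ? size : chunk;
        received.append(data, n);
        return static_cast<long>(n);
    }
};

// Write all bytes, retrying on kAgain. `wait` is invoked before each retry;
// in the article this role is played by WaitSocket().
// Returns true if everything was written, false on a hard error.
template <typename Channel, typename WaitFn>
bool WriteAll(Channel &channel, const char *ptr, std::size_t size, WaitFn wait) {
    while (size != 0) {
        const long rc = channel.Write(ptr, size);
        if (rc == kAgain) {
            wait();
            continue;
        }
        if (rc < 0) {
            return false;
        }
        ptr += rc;
        size -= static_cast<std::size_t>(rc);
    }
    return true;
}
```

Unlike WriteToChannel above, this variant reports a hard error to the caller instead of silently leaving the loop, which may be worth considering for the real wrapper as well.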


Until now, everything we have written merely brought the libssh2 examples into a more civilized form. But now, with all the simple tools for synchronously writing data to the channel in hand, we can move on to Asio.

Having a standard socket is good, but not very practical if you need to wait asynchronously for it to become ready for reading or writing while doing other work. This is where boost::asio::ip::tcp::socket comes in, with its wonderful method:

 async_wait(wait_type, WaitHandler) 

It can easily be constructed from an ordinary socket on which we have already established a connection, plus a boost::asio::io_context, the execution context of our application.

Asynchronous connection constructor
    class AsyncConnection::Impl : public BaseConnectionImpl,
                                  public std::enable_shared_from_this<AsyncConnection::Impl> {
    public:
        Impl(boost::asio::io_context &context, const SshConnectData &connect_data)
            : BaseConnectionImpl(connect_data)
            , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) {
            m_tcp_socket.non_blocking(true);
        }
    };



Now we need to start executing a command on the remote host and, as data arrives from it, pass that data to a callback.

    void AsyncRun(const std::string &command, CallbackType &&callback) {
        m_read_callback = std::move(callback);
        auto ec = libssh2_channel_exec(*m_channel, command.c_str());
        TryRead();
    }

Thus, having started the command, we hand control to the TryRead() method.

    void TryRead() {
        if (m_read_in_progress) {
            return;
        }
        m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) {
            if (WantRead()) {
                ReadHandler(ec);
            }
            if (m_complete) {
                return;
            }
            TryRead();
        });
    }

First of all, we check whether reading has already been started by a previous call. If not, we start waiting for the socket to become ready for reading. The wait handler is an ordinary lambda that captures shared_from_this().

Pay attention to the WantRead() call. async_wait, as it turned out, has its flaws too and can simply return on a timeout. To avoid unnecessary work in that case, I decided to check the socket via poll with a zero timeout to see whether it really has data to read right now. If not, we simply call TryRead() again and wait. Otherwise, we immediately proceed to reading and passing the data to the callback.

    void ReadHandler(const boost::system::error_code &error) {
        if (error != boost::system::errc::success) {
            return;
        }
        m_read_in_progress = true;
        int ec = LIBSSH2_ERROR_EAGAIN;
        std::array<char, 4096> buffer {};
        while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) {
            std::string tmp;
            boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp));
            if (m_read_callback != nullptr) {
                m_read_callback(tmp);
            }
        }
        m_read_in_progress = false;
    }

This starts an endless asynchronous loop that reads output from the running application. The next step is to send instructions to the application:

    void AsyncWrite(const std::string &data, WriteCallbackType &&callback) {
        m_input += data;
        m_write_callback = std::move(callback);
        TryWrite();
    }

The data passed for asynchronous writing and the callback are stored inside the connection. Then we start another loop, this time for writing:

Write cycle
    void TryWrite() {
        if (m_input.empty() || m_write_in_progress) {
            return;
        }
        m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) {
            if (WantWrite()) {
                WriteHandler(ec);
            }
            if (m_complete) {
                return;
            }
            TryWrite();
        });
    }

    void WriteHandler(const boost::system::error_code &error) {
        if (error != boost::system::errc::success) {
            return;
        }
        m_write_in_progress = true;
        int ec = LIBSSH2_ERROR_EAGAIN;
        while (!m_input.empty()) {
            auto ptr = m_input.c_str();
            auto read_size = m_input.size();
            while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) {
                read_size -= ec;
                ptr += ec;
            }
            AssertResult(ec);
            // Drop the bytes that were written; keep the unsent tail.
            m_input.erase(0, m_input.size() - read_size);
            if (ec == LIBSSH2_ERROR_EAGAIN) {
                break;
            }
        }
        if (m_input.empty() && m_write_callback != nullptr) {
            m_write_callback();
        }
        m_write_in_progress = false;
    }


Thus, we write data to the channel until all of it has been transmitted. Then we return control to the caller so that a new portion of data can be passed in. This way you can not only send instructions to an application on the host but also, for example, upload files of any size in small portions without blocking the thread, which is important.
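The bookkeeping behind this write loop (keep the unsent tail, fire the callback only once the buffer is empty) can be sketched without Asio or libssh2. OutputQueue, its methods and the WriteFn signature below are hypothetical, introduced for illustration only; the write function is assumed to return the number of bytes accepted, or a non-positive value when it would block.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

class OutputQueue {
public:
    // Returns the number of bytes accepted, or <= 0 if it would block.
    using WriteFn = std::function<long(const char *, std::size_t)>;

    // Append data and remember the completion callback.
    void Push(const std::string &data, std::function<void()> on_done) {
        m_input += data;
        m_on_done = std::move(on_done);
    }

    // Call this whenever the socket is reported writable.
    void OnWritable(const WriteFn &write) {
        std::size_t written = 0;
        while (written < m_input.size()) {
            const long rc = write(m_input.data() + written, m_input.size() - written);
            if (rc <= 0) {
                break;  // would block: keep the tail for the next event
            }
            written += static_cast<std::size_t>(rc);
        }
        m_input.erase(0, written);

        if (m_input.empty() && m_on_done) {
            auto callback = std::move(m_on_done);
            m_on_done = nullptr;  // fire the callback only once
            callback();
        }
    }

    std::size_t Pending() const { return m_input.size(); }

private:
    std::string m_input;
    std::function<void()> m_on_done;
};
```

Each writable event drains as much as the channel accepts, and the caller learns about completion through the callback, at which point it can push the next portion, for example the next block of a large file.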

Using this library, I was able to run a script on a remote server that monitors file system changes, while simultaneously reading its output and sending it various commands. All in all, it was a very valuable experience of adapting a C-style library for a modern C++ project that uses Boost.

I would be happy to hear advice from more experienced Boost.Asio users, to learn more and improve my solution :-).

Source: https://habr.com/ru/post/430488/

