Background
While developing a B2B project, we needed to let external systems such as 1C, Oracle and MS SQL Server call into our application. The first "universal" option that came to mind was web services. Unfortunately, these systems each interpret that standard a little differently (there is no guarantee, for example, that Oracle and 1C will understand each other). We also did not want to bloat the project with heavy third-party libraries: at the time we used Boost and Intel TBB and did not want to pull in Qt or gSOAP as well.
So we decided to reinvent the wheel.
These systems can all extend their built-in languages with DLL plug-ins, the so-called external components. Using them to talk to the outside world is not a new idea; Habr already has articles on the topic:
habrahabr.ru/post/149956
habrahabr.ru/company/Centrobit/blog/165441
habrahabr.ru/post/163859
So we chose external components. Next we had to decide how to connect the component to our application, preferably in a cross-platform way (so that both could be built for another platform by simple recompilation). The easiest route would have been a ready-made IPC solution (QLocalSocket in Qt, named pipes in ACE), but, again to keep the number of libraries down, we decided to write our own small one.
boost::interprocess::message_queue would have been ideal, but it is a priority queue rather than a plain one, which costs some performance (and the wish for high performance was one of the reasons we had rejected web services). We also needed a client-server solution.
We ruled out sockets over loopback immediately because of their overhead on Windows (see
habrahabr.ru/post/81067 ). That left named pipes on Windows and domain sockets on Unix.
Part 1. Writing the requirements
In essence, all our external components had to do was take an XML message from the host system and send it down the channel. (The communication is of course meant to be two-way, but we did not always implement sending messages back into the system through the components.) So we decided not to venture into asynchronous channel I/O and to implement only ordinary blocking reads and writes, plus the handling of clients. The requirements for the read and write functions were:
- Each function takes two parameters: a void* pointer to the buffer to read into or write from, and the number of bytes to read or send.
- On any error (for example, the number of bytes received or sent differs from the requested size) the function throws std::runtime_error. (Yes, exception handling costs performance, and error codes could have been used instead.)
- A single class implements both server channels (accepting clients) and client channels. On a server channel the read and write functions must not work; they throw an exception instead.
The last point needs an explanation. A server channel exists only to call a blocking wait-for-client method in a loop, and that method returns a channel object with the client already connected. This is how accept works for POSIX sockets. In the Windows API, pipes work the other way round: in blocking mode the wait function simply suspends the thread until a client appears at the other end of that very pipe; the pipe is then used to serve the client, and a new listening pipe has to be created for the next one.
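The "exact size or exception" contract from the list above can be illustrated with a small POSIX sketch. The helper name readExact is hypothetical and not part of the library; it assumes a blocking file descriptor and loops until the requested number of bytes has arrived.

```cpp
#include <cerrno>
#include <cstddef>
#include <stdexcept>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper (not from the library): read exactly `size` bytes
// from a blocking descriptor, or throw std::runtime_error.
void readExact(int fd, void* buf, std::size_t size)
{
    char* out = static_cast<char*>(buf);
    std::size_t done = 0;
    while (done < size)
    {
        ssize_t n = ::read(fd, out + done, size - done);
        if (n > 0)
            done += static_cast<std::size_t>(n);
        else if (n == 0)               // end of stream before the buffer was filled
            throw std::runtime_error("peer closed the channel before all bytes arrived");
        else if (errno != EINTR)       // retry only when interrupted by a signal
            throw std::runtime_error("read failed");
    }
}
```

The caller never has to inspect a byte count: either the buffer is completely filled or an exception reports the failure.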
The result was the following abstract class INamedPipe, from which WinNamedPipe and PosixNamedPipe are derived.
#pragma once
#include <cstddef>
#include <stdexcept>
#include <string>

class INamedPipe
{
protected:
    std::string _name;   // full channel path: prefix + name
    bool _server;        // true for a listening (server) channel
    INamedPipe(){};
    // Platform-specific primitives implemented by the derived classes
    virtual void internalReadBytes(void* buf,size_t size)=0;
    virtual void internalWriteBytes(const void* buf,size_t size)=0;
    virtual void internalFlush()=0;
public:
    INamedPipe(const std::string prefix, const std::string& name, bool server):_name(prefix),_server(server)
    {
        _name.append(name);
    }
    // Server channels are opened, client channels connect
    void ConnectOrOpen()
    {
        if(_server)
            open();
        else
            connect();
    }
    // Reading is allowed on client channels only
    virtual void ReadBytes(void* buf,size_t size)
    {
        if(!_server)
        {
            if(size<1)
                throw std::out_of_range("Size is 0 or less");
            internalReadBytes(buf,size);
        }
        else
            throw std::runtime_error("This operation is not supported on server pipe");
    }
    // Writing is allowed on client channels only
    virtual void WriteBytes(const void* buf,size_t size)
    {
        if(!_server)
        {
            if(size<1)
                throw std::out_of_range("Size is 0 or less");
            internalWriteBytes(buf,size);
            // (the listing is cut off at this point in the source)
The INamedPipe constructor takes three parameters: the path prefix under which the channel lives in the system, the channel name, and a flag that selects the channel type, client or server (as noted, both types are combined in one class).
ConnectOrOpen() calls the method that matches the channel type: open() for a server, connect() for a client.
Part 2. Writing the Windows implementation, WinNamedPipe
Before implementing anything we searched for examples, starting with the MSDN tutorials (
msdn.microsoft.com/ru-ru/library/windows/desktop/aa365592 ,
msdn.microsoft.com/ru-ru/library/windows/desktop/aa365588 ) and documentation in Russian:
www.frolov-lib.ru/books/bsp/v27/ch2_3.htm . The first version of WinNamedPipe was written with the help of these articles. Later we found a good article on writing a named-pipe class, using an external component for MetaTrader as its example (
www.mql5.com/ru/articles/115 ), which we also recommend.
The WinNamedPipe constructor sets the prefix itself. According to MSDN, the full path to a named pipe has the form \\computer_name\pipe\channel_name, and for a pipe on the local machine it is \\.\pipe\pipe_name (yes, named pipes in Windows can also exchange data over the network).
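The path format can be captured in a small string helper. This is only a sketch: makePipePath is a hypothetical name, not a function from the library, and it assumes the documented rule that the pipe-name part may not contain a backslash.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper: builds "\\<host>\pipe\<name>".
// The default host "." refers to the local machine.
std::string makePipePath(const std::string& name, const std::string& host = ".")
{
    if (name.empty() || name.find('\\') != std::string::npos)
        throw std::invalid_argument("pipe name must be non-empty and contain no backslashes");
    return "\\\\" + host + "\\pipe\\" + name;
}
```

For example, makePipePath("NamedPipeTester") yields the local path, while makePipePath("NamedPipeTester", "server01") would address a pipe on a machine assumed to be called server01.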
The class also has a constructor that takes a HANDLE, a value that identifies some structure or object inside the operating system. With it we can create a new WinNamedPipe instance knowing only the HANDLE of an existing named pipe. This constructor is used in WaitForConnection() (see below).
The implementation of open() looks like this:
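void WinNamedPipe::open()
{
    _hPipe = CreateNamedPipe(
        (LPSTR)_name.data(),       // pipe name
        PIPE_ACCESS_DUPLEX,        // open mode: read and write access
        PIPE_TYPE_BYTE |           // pipe mode: byte stream,
        PIPE_WAIT,                 //   blocking operations
        PIPE_UNLIMITED_INSTANCES,  // maximum number of instances
        BUFFER_PIPE_SIZE,          // output buffer size
        BUFFER_PIPE_SIZE,          // input buffer size
        0,                         // default time-out (0 = system default)
        NULL);                     // default security attributes
    if (_hPipe == INVALID_HANDLE_VALUE)
    {
        THROW_LAST_ERROR("CreateNamedPipe failed");
    }
}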
Note that BUFFER_PIPE_SIZE is defined at the very top of the file. Its value does not limit the size of the data that can be transferred: in our project we successfully sent blocks several times larger than the buffer.
The first parameter, (LPSTR)_name.data(), deserves special attention.
Because the class stores the channel name in a std::string, the project must be built with "Use Multi-Byte Character Set" selected in the Visual Studio project properties, so that the WinAPI functions receive the name correctly (for details see
habrahabr.ru/post/164193 ).
The parameters of CreateNamedPipe are described in detail in MSDN (
msdn.microsoft.com/en-us/library/windows/desktop/aa365150 ) and at
www.frolov-lib.ru/books/bsp/v27/ch2_3.htm .
A client connects to the channel with connect(), implemented as follows:
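void WinNamedPipe::connect()
{
    for(;;)
    {
        // Block until a server instance of the pipe becomes available
        WaitNamedPipe((LPSTR)_name.data(), NMPWAIT_WAIT_FOREVER);
        _hPipe = CreateFile(
            (LPSTR)_name.data(),   // pipe name
            GENERIC_READ |         // read and write access
            GENERIC_WRITE,
            0,                     // no sharing
            NULL,                  // default security attributes
            OPEN_EXISTING,         // open the existing pipe
            0,                     // default attributes
            NULL);                 // no template file
        // Break if the pipe handle is valid or the error is not ERROR_PIPE_BUSY
        if (_hPipe != INVALID_HANDLE_VALUE||GetLastError() != ERROR_PIPE_BUSY)
            break;
    }
    if (_hPipe == INVALID_HANDLE_VALUE)
        THROW_LAST_ERROR("Could not open pipe");
    // The read mode of a client handle is set with the PIPE_READMODE_* flags
    DWORD dwMode = PIPE_READMODE_BYTE;
    BOOL fSuccess = SetNamedPipeHandleState(
        _hPipe,    // pipe handle
        &dwMode,   // new pipe mode
        NULL,      // do not change the maximum collection count
        NULL);     // do not change the collect-data time-out
    if ( ! fSuccess)
    {
        THROW_LAST_ERROR("SetNamedPipeHandleState failed");
    }
}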
First, WaitNamedPipe waits indefinitely for a free instance of the server pipe; then CreateFile connects to it. Next we check whether the connection succeeded. If the handle is valid, or an error other than ERROR_PIPE_BUSY occurred (ERROR_PIPE_BUSY means that no pipe instance is free), we leave the loop; on success we then set the pipe mode we need. Otherwise the loop repeats until the connection succeeds or an error other than ERROR_PIPE_BUSY occurs. If no server pipe with the given name exists, WaitNamedPipe returns immediately, GetLastError returns error code 2 (The system cannot find the file specified), and connect() throws an exception.
By design, WaitForConnection() should return a pointer to a new WinNamedPipe that already has a client connected and on which the read and write functions can be used. The standard Windows mechanism for accepting clients is different: ConnectNamedPipe, which takes the created pipe as its argument, simply blocks the thread until a client appears at the other end. To avoid losing other clients, a new listening pipe must then be created and passed to this function, and so on. To fit our design, WaitForConnection() therefore has to call open() again.
WaitForConnection() looks like this:
WinNamedPipe* WinNamedPipe::WaitForConnection()
{
    if(_server)
    {
        // ERROR_PIPE_CONNECTED means the client connected before this call
        if (ConnectNamedPipe(_hPipe, NULL)||GetLastError()==ERROR_PIPE_CONNECTED)
        {
            HANDLE client=_hPipe;   // hand the connected instance to the caller
            open();                 // create a fresh listening instance
            return new WinNamedPipe(client);
        }
        else
        {
            THROW_LAST_ERROR("WaitForConnection failed");
        }
    }
    else
    {
        throw std::runtime_error("WaitForConnection is not supported on client pipe\n");
    }
}
One important point: when many clients are being created, a client may connect to the pipe before ConnectNamedPipe is called. In that case ConnectNamedPipe returns zero and GetLastError returns 535 (ERROR_PIPE_CONNECTED), which is not a failure.
The read and write functions are simple and need no further comment:
void WinNamedPipe::internalReadBytes(void* buf,size_t size)
{
    DWORD cbBytesRead = 0;
    BOOL fSuccess = FALSE;
    fSuccess = ReadFile(
        _hPipe,                    // pipe handle
        buf,                       // buffer that receives the data
        static_cast<DWORD>(size),  // number of bytes to read
        &cbBytesRead,              // number of bytes actually read
        NULL);                     // not overlapped
    if (!fSuccess || cbBytesRead == 0 ||cbBytesRead!=size)
    {
        if (GetLastError() == ERROR_BROKEN_PIPE)
        {
            THROW_LAST_ERROR("pipe disconnected");
        }
        else
        {
            THROW_LAST_ERROR("read failed");
        }
    }
}
void WinNamedPipe::internalWriteBytes(const void* buf,size_t size)
{
    DWORD cbWritten;
    BOOL fSuccess = FALSE;
    fSuccess = WriteFile(
        _hPipe,                    // pipe handle
        buf,                       // data to send
        static_cast<DWORD>(size),  // number of bytes to write
        &cbWritten,                // number of bytes actually written
        NULL);                     // not overlapped
    if (!fSuccess || size != cbWritten)
    {
        THROW_LAST_ERROR("WriteFile failed");
    }
}
Close() deserves a closer look. DisconnectNamedPipe(HANDLE pipe) closes the server end of a pipe and lets the client at the other end know. After that the pipe instance can wait for new clients again, or CloseHandle(HANDLE pipe) can tell the operating system that it is no longer needed. CloseHandle can also be called on client pipes, but in our design the closing call is made only once, either on the server pipe with the connected client or on the client. We therefore decided to call DisconnectNamedPipe and CloseHandle only on server-side instances and to make Close() a no-op on the client.
void WinNamedPipe::Close()
{
    // Only server-side instances are disconnected and closed; on a client this is a no-op
    if(_server||_server_with_client)
    {
        DisconnectNamedPipe(_hPipe);
        CloseHandle(_hPipe); // May throw an exception if a debugger is attached to the process and handle is not valid
    }
}
Also remember that when large amounts of data are sent or received, the client must notify the other side that it has read all the data, and the server must not close the pipe before it receives this notification.
To cut down on copy-paste, a macro reads GetLastError and throws the exception:
// Requires <sstream> and <stdexcept>
#define THROW_LAST_ERROR(e){                 \
    DWORD error=GetLastError();              \
    std::stringstream err;                   \
    err<<e<<", GLE="<<error;                 \
    throw std::runtime_error(err.str());     \
}
Part 3. Writing the Unix implementation, PosixNamedPipe
For the POSIX-compatible part of the library we decided to use Unix domain sockets (for more about working with sockets in Unix, see
wiki.linuxformat.ru/index.php/LXF83:Unix_API ).
Unlike on Windows, the file that represents a local socket can be placed anywhere in the file system. We decided to pass /tmp/ as the prefix. Note that with an empty prefix the client cannot connect to the socket and gets a "file not found" error. In addition, if a file with the socket's name already exists before the socket is created, no client can connect to the server (Connection refused). So before creating the socket we check for this file and try to delete it. This is where the idea of a fixed path prefix came from: the potential vulnerability that would let a caller delete a file anywhere in the system has to be closed somehow (access rights and the basename function are also worth remembering here). It also follows that there must be at most one server instance of PosixNamedPipe per name: for correct operation, each name must correspond to exactly one instance of the class.
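The basename idea can be sketched as follows. The helper makeSocketPath is hypothetical (the library may handle this differently): it keeps only the last component of the supplied name and places it under a fixed directory, so that a name such as ../../etc/passwd cannot make the later unlink touch a file elsewhere.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper: strips any directory part from `name`
// and puts the remaining file name under `dir`.
std::string makeSocketPath(const std::string& name, const std::string& dir = "/tmp/")
{
    const std::string::size_type slash = name.find_last_of('/');
    const std::string base = (slash == std::string::npos) ? name : name.substr(slash + 1);
    if (base.empty() || base == "." || base == "..")
        throw std::invalid_argument("socket name has no usable file component");
    return dir + base;
}
```

This only narrows where the file can live; the permissions of the directory and of the socket file still decide who may connect or delete it.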
The code of open() looks like this:
void PosixNamedPipe::open()
{
    sock= socket(AF_UNIX, SOCK_STREAM, 0);
    if(sock == -1)
    {
        THROW_ERROR("Create_socket failed: ");
    }
    unlink(_name.c_str());   // remove a stale socket file left by a previous run
    desc.sun_family = AF_UNIX;
    // Bounded copy; desc is zeroed in the constructor, so the path stays null-terminated
    strncpy(desc.sun_path, _name.c_str(), sizeof(desc.sun_path) - 1);
    if (bind(sock, (sockaddr*)&desc, sizeof(struct sockaddr_un)) == -1)
    {
        THROW_ERROR("Connection failed(bind): ");
    }
    if (listen(sock,SOMAXCONN) == -1)
    {
        THROW_ERROR("Connection failed(listen): ");
    }
}
The connect() function is almost identical:
void PosixNamedPipe::connect()
{
    sock= socket(AF_UNIX, SOCK_STREAM, 0);
    if(sock == -1)
    {
        THROW_ERROR("Create_socket failed: ");
    }
    desc.sun_family = AF_UNIX;
    // Bounded copy; desc is zeroed in the constructor, so the path stays null-terminated
    strncpy(desc.sun_path, _name.c_str(), sizeof(desc.sun_path) - 1);
    if (::connect(sock, (sockaddr*)&desc, sizeof(struct sockaddr_un)) == -1)
    {
        THROW_ERROR("Connection failed(connect): ");
    }
}
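The THROW_ERROR macro used in these listings is not shown in the article. A plausible sketch, assuming it mirrors THROW_LAST_ERROR but reports errno, could look like this; the helper throwErrno and the message layout are guesses, not the library's actual definition.

```cpp
#include <cerrno>
#include <cstring>
#include <sstream>
#include <stdexcept>

// Hypothetical stand-in for the article's THROW_ERROR.
// Appends the textual and numeric errno to the caller's prefix.
[[noreturn]] inline void throwErrno(const char* what)
{
    const int saved = errno;   // capture before any other call can overwrite it
    std::ostringstream err;
    err << what << std::strerror(saved) << " (errno=" << saved << ")";
    throw std::runtime_error(err.str());
}

#define THROW_ERROR(what) throwErrno(what)
```

Marking the helper [[noreturn]] also tells the compiler that a branch ending in THROW_ERROR never falls through to the end of a value-returning function.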
Unlike the Windows version, WaitForConnection() could be written to our spec without any contortions.
PosixNamedPipe* PosixNamedPipe::WaitForConnection()
{
    // accept() blocks until a client connects and returns its descriptor
    int client=accept(sock,NULL,NULL);
    if(client!=-1)
        return new PosixNamedPipe(client);
    else
    {
        THROW_ERROR("Accept error: ");
    }
}
The constructor it calls is also trivial:
PosixNamedPipe::PosixNamedPipe(int pipe)
{
    sock=pipe;
    _server=false;   // an accepted connection behaves like a client channel
    memset(&desc, 0, sizeof(struct sockaddr_un));
}
Note that for bind and ::connect to work correctly it is recommended to zero the sockaddr_un structure before filling in its fields, which is why memset(&desc, 0, sizeof(struct sockaddr_un)) was added to the class constructors.
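Zeroing and filling the structure can be gathered in one place. The following is a minimal sketch with a hypothetical helper, makeUnixAddress, which also rejects paths that would not fit into sun_path together with the terminating null byte.

```cpp
#include <cstring>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <sys/un.h>

// Hypothetical helper: returns a fully initialised address for `path`.
sockaddr_un makeUnixAddress(const std::string& path)
{
    sockaddr_un addr;
    std::memset(&addr, 0, sizeof(addr));        // clears padding and sun_path
    if (path.size() >= sizeof(addr.sun_path))   // leave room for the terminating '\0'
        throw std::length_error("socket path is too long for sun_path");
    addr.sun_family = AF_UNIX;
    std::memcpy(addr.sun_path, path.data(), path.size());
    return addr;
}
```

Both bind and ::connect could then receive the address returned by this one function, so the two call sites cannot drift apart.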
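The read and write functions for sockets are also quite simple:
void PosixNamedPipe::internalReadBytes(void* buf,size_t size)
{
    // MSG_WAITALL blocks until the whole buffer is filled, unless the peer
    // disconnects, an error occurs or a signal arrives
    ssize_t ret = recv(sock, buf, size, MSG_WAITALL);
    if (ret == -1 || (size_t)ret != size)
    {
        THROW_ERROR("Error while reading: ");
    }
}
void PosixNamedPipe::internalWriteBytes(const void* buf,size_t size)
{
    // A signed type is needed to detect the -1 returned on failure
    ssize_t ret = send(sock, buf, size, 0);
    if (ret == -1 || (size_t)ret != size)
    {
        THROW_ERROR("Error while sending: ");
    }
}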
In the closing function, as a precaution, the socket file is deleted when a server instance is closed (the closing function is also called automatically from the destructor):
void PosixNamedPipe::Close()
{
    if(sock == -1)
        return;                  // already closed; the destructor calls Close() again
    if(_server)
        unlink(desc.sun_path);   // remove the socket file created by bind()
    close(sock);
    sock = -1;
}
PosixNamedPipe::~PosixNamedPipe()
{
    this->Close();
}
Part 4. Writing a multithreaded server for handling clients
So the classes for working with named pipes are ready. Now a small trick saves us from writing #ifdef _WIN32 and the like wherever these classes are instantiated in our applications:
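//NamedPipe.h
#pragma once
#ifdef _WIN32
    #include "WinNamedPipe.h"     // header name assumed; it is missing from the source listing
    typedef WinNamedPipe NamedPipe;
#else
    #include <unistd.h>
    #include "PosixNamedPipe.h"   // header name assumed; it is missing from the source listing
    typedef PosixNamedPipe NamedPipe;
#endif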
Now any project can simply #include "NamedPipe.h" and write code such as NamedPipe* client = new NamedPipe("NamedPipe.h", 0) without worrying about the operating system the application will be built for.
To process incoming requests from the external components, our project has a simple multithreaded server class built on the thread pool pattern with boost::thread and Intel TBB. The main idea: one thread calls WaitForConnection() in an infinite loop and pushes pointers to NamedPipe objects with connected clients into a tbb::concurrent_queue; worker threads take them from the queue and do the actual reading, writing and message processing.
The function that accepts incoming connections looks like this:
void NamedPipeServer::run()
{
    NamedPipe* client;
    _pipe=new NamedPipe(_name, 1);   // 1 = server channel
    try
    {
        _pipe->ConnectOrOpen();
    }
    catch(const std::exception& e)
    {
    }
    while(_active)
    {
        try
        {
            client=_pipe->WaitForConnection();
            _clients.push(client);   // hand the connected client to the workers
        }
        catch(const std::exception& e)
        {
        }
    }
    delete _pipe;
}
Note that the empty catch blocks originally contained calls to logging macros; they were removed from the code as rewritten for this article so as not to burden the library with dependencies.
By design, the work with a client is described in handleClient(NamedPipe* client), which is virtual in the server class and must be overridden in a derived class.
Each worker thread runs the following loop:
void NamedPipeServer::workerProc()
{
    NamedPipe* client;
    while(_active)
        if(_clients.try_pop(client))
        {
            handleClient(client);
        }
        else
            boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}
In the loop the thread tries to take a client from the queue and, on success, calls handleClient; otherwise it sleeps for a while so as not to burn CPU time while idle.
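Polling with a fixed sleep adds up to 100 ms of latency per client. A different technique, not the one used in the library, is a queue whose pop blocks on a condition variable. The sketch below uses the C++11 standard library instead of Boost and TBB; the class name WaitingQueue is hypothetical.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical queue: pop() sleeps until an item arrives or the timeout expires.
template <typename T>
class WaitingQueue
{
    std::mutex _mutex;
    std::condition_variable _ready;
    std::queue<T> _items;
public:
    void push(const T& item)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push(item);
        }
        _ready.notify_one();   // wake one waiting worker
    }

    // Returns false when no item arrived within `timeout`.
    bool pop(T& out, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if (!_ready.wait_for(lock, timeout, [this] { return !_items.empty(); }))
            return false;
        out = _items.front();
        _items.pop();
        return true;
    }
};
```

A worker would call pop with a modest timeout and re-check its stop flag whenever pop returns false, so it wakes as soon as a client is queued but can still shut down.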
All threads are started by the Start() method, which calls startWorkers() to create them:
void NamedPipeServer::Start()
{
    _active=true;
    startWorkers();
}
void NamedPipeServer::startWorkers()
{
    // Worker threads that serve connected clients
    for (size_t i = 0; i < _thread_pool_size; ++i)
    {
        boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&NamedPipeServer::workerProc, this)));
        _threads.push_back(thread);
    }
    // Dispatcher thread that accepts incoming connections
    boost::shared_ptr<boost::thread> dispatcher(new boost::thread(boost::bind(&NamedPipeServer::run,this)));
    _threads.push_back(dispatcher);
}
Note that Start() does not block the calling thread. To block it until all server threads have finished, use JoinWorkers():
void NamedPipeServer::JoinWorkers()
{
    size_t size=_threads.size();
    for (std::size_t i = 0; i < size; ++i)
        _threads[i]->join();
    for (std::size_t i = 0; i < size; ++i)
        _threads[i].reset();
    _threads.clear();
}
In practice, our application assumes that running servers work until the application exits, which spared us the trouble of stopping the named pipe server threads.
To stop the server threads one could write, for example, a Stop() method like this:
void NamedPipeServer::Stop()
{
    _active=false;
    this->JoinWorkers();
}
However, a call to this method will hang, because one of the threads is blocked in WaitForConnection().
It returns only after the next client connects (and that client is then dropped).
The simplest (but not the best) way around the blocking is to create a client named pipe inside Stop() and connect it to our own server, which releases the thread blocked in WaitForConnection().
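For the Unix domain socket variant, this self-connect trick could be sketched as below. The function wakeAcceptor is hypothetical; it only opens a connection to the listening socket at the given path and closes it again, which is enough to make a blocked accept return.

```cpp
#include <cstring>
#include <string>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

// Hypothetical helper: connects to the Unix domain socket at `path` and
// disconnects at once. Returns true when the connection succeeded.
bool wakeAcceptor(const std::string& path)
{
    sockaddr_un addr;
    std::memset(&addr, 0, sizeof(addr));
    if (path.size() >= sizeof(addr.sun_path))
        return false;
    addr.sun_family = AF_UNIX;
    std::memcpy(addr.sun_path, path.data(), path.size());

    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1)
        return false;
    const bool ok = ::connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0;
    close(fd);
    return ok;
}
```

A Stop() built on this idea would first clear the active flag and then call the helper, so that the dispatcher loop sees the flag as soon as its wait returns; the dummy connection it receives still has to be deleted.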
A more correct solution is to change the behavior of WaitForConnection() by adding a timeout.
The new function for the Windows version looks like this:
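WinNamedPipe* WinNamedPipe::WaitForConnection(unsigned int timeout)
{
    if(_server)
    {
        OVERLAPPED lpOverlapped = {0};
        lpOverlapped.hEvent = CreateEvent(0,1,1,0);   // manual-reset event, initially signaled
        if(ConnectNamedPipe(_hPipe, &lpOverlapped)==0)
        {
            // The client connected before the call: signal the event ourselves
            if(GetLastError()==ERROR_PIPE_CONNECTED)
                if (!SetEvent(lpOverlapped.hEvent))
                    THROW_LAST_ERROR("AsyncWaitForConnection failed");
            bool connected = (WaitForSingleObject(lpOverlapped.hEvent,timeout) == WAIT_OBJECT_0);
            if (!connected)
            {
                // Timed out: cancel the pending request and wait for it to finish,
                // because lpOverlapped lives on this stack frame
                DWORD ignored = 0;
                CancelIo(_hPipe);
                connected = GetOverlappedResult(_hPipe, &lpOverlapped, &ignored, TRUE) != 0;
            }
            CloseHandle(lpOverlapped.hEvent);         // the event is no longer needed
            if (connected)
            {
                HANDLE client=_hPipe;
                open();
                return new WinNamedPipe(client);
            }
            return NULL;                              // no client within the timeout
        }
        else
        {
            THROW_LAST_ERROR("AsyncWaitForConnection failed");
        }
    }
    else
    {
        throw std::runtime_error("WaitForConnection is not supported on client pipe\n");
    }
}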
For this method to work correctly, the CreateNamedPipe call in open() must be changed as follows:
_hPipe = CreateNamedPipe(
    (LPSTR)_name.data(),
    PIPE_ACCESS_DUPLEX |
    FILE_FLAG_OVERLAPPED,      // enable overlapped (asynchronous) mode
    PIPE_TYPE_BYTE |
    PIPE_WAIT,
    PIPE_UNLIMITED_INSTANCES,
    BUFFER_PIPE_SIZE,
    BUFFER_PIPE_SIZE,
    0,
    NULL);
The implementation for Linux looks like this:
PosixNamedPipe* PosixNamedPipe::WaitForConnection(unsigned int timeout)
{
    int nsock;
    int retour;
    fd_set readf;
    fd_set writef;
    struct timeval to;
    FD_ZERO(&readf);
    FD_ZERO(&writef);
    FD_SET(sock, &readf);
    FD_SET(sock, &writef);
    // timeout is in milliseconds; both fields of timeval must be set
    to.tv_sec = timeout/1000;
    to.tv_usec = (timeout%1000)*1000;
    retour = select(sock+1, &readf, &writef, 0, &to);
    if (retour == -1)
    {
        THROW_ERROR("Select error: ");
    }
    if (retour == 0)
    {
        return NULL;   // timed out, no client
    }
    if ( (FD_ISSET(sock, &readf)) || (FD_ISSET(sock,&writef)))
    {
        nsock = accept(sock, NULL, NULL);
        if (nsock == -1)
        {
            THROW_ERROR("Accept error: ");
        }
        return new PosixNamedPipe(nsock);
    }
    else
    {
        throw std::runtime_error("invalid socket descriptor!\n");
    }
}
Finally, note that for correct operation the destructor of the server class must call Stop() and then empty the _clients queue in case unprocessed clients remain in it. Because the queue holds raw pointers, the objects have to be deleted manually:
NamedPipeServer::~NamedPipeServer(void)
{
    this->Stop();
    while(!_clients.empty())
    {
        if(_clients.try_pop(_pipe))
            delete _pipe;
    }
}
Part 5. An example of using the library
To test the library we wrote a "spammer" that, in an infinite loop, starts two threads; each connects to the server and sends it 10 strings. When the threads finish, the loop repeats.
Each thread looks like this:
void spamming_thread()
{
    std::vector<std::string> words;
    words.push_back(std::string("one "));
    words.push_back(std::string("two "));
    words.push_back(std::string("three "));
    words.push_back(std::string("four "));
    words.push_back(std::string("five "));
    words.push_back(std::string("six "));
    words.push_back(std::string("seven "));
    words.push_back(std::string("eight "));
    words.push_back(std::string("nine "));
    words.push_back(std::string("ten "));
    NamedPipe client("NamedPipeTester",0);
    try
    {
        client.ConnectOrOpen();
        for(size_t i=0;i<words.size();++i)
        {
            std::cout<<"sending "<<words[i]<<"\n";
            size_t size=words[i].size();
            client.WriteBytes(&size,sizeof(size));     // length prefix
            client.WriteBytes(words[i].data(),size);   // payload
        }
        client.Close();
    }
    catch(const std::runtime_error &e)
    {
        std::cout<<"Exception: "<<e.what()<<"\n";
    }
}
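The example sends the length as a raw size_t, whose width differs between 32-bit and 64-bit builds, so both sides must be compiled for the same word size. If that cannot be guaranteed, a fixed-width prefix is one option. The sketch below assumes a 4-byte little-endian length; encodeFrame and decodeLength are hypothetical names, not part of the library.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical helper: prefixes the payload with its length as 4 little-endian bytes.
std::string encodeFrame(const std::string& payload)
{
    if (payload.size() > 0xFFFFFFFFu)
        throw std::length_error("payload too large for a 32-bit length prefix");
    const std::uint32_t n = static_cast<std::uint32_t>(payload.size());
    std::string frame;
    frame.reserve(4 + payload.size());
    for (int shift = 0; shift < 32; shift += 8)
        frame.push_back(static_cast<char>((n >> shift) & 0xFF));
    frame += payload;
    return frame;
}

// Hypothetical helper: reads the 4-byte little-endian length back from a header.
std::uint32_t decodeLength(const std::string& header)
{
    if (header.size() < 4)
        throw std::runtime_error("incomplete length prefix");
    std::uint32_t n = 0;
    for (int i = 0; i < 4; ++i)
        n |= static_cast<std::uint32_t>(static_cast<unsigned char>(header[i])) << (8 * i);
    return n;
}
```

The sender would write the whole frame with one WriteBytes call; the receiver would read 4 bytes, decode the length, and then read exactly that many payload bytes.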
Now let us write a server that accepts the connections:
#pragma once
#include "../NamedPipeServer.h"
class SimpleServer: public NamedPipeServer
{
protected:
    void handleClient(NamedPipe* client);
public:
    SimpleServer(const std::string &name):NamedPipeServer(name,1){};
    ~SimpleServer(){};
};
void SimpleServer::handleClient(NamedPipe* client)
{
    for(int i=0;i<10;++i)
    {
        try
        {
            size_t size;
            client->ReadBytes(&size,sizeof(size));   // length prefix
            if(size>0)
            {
                char* message=new char[size];
                client->ReadBytes(message,size);     // payload
                // (the listing is cut off at this point in the source)
We start it, wait half a minute and then try to stop it:
SimpleServer* s=new SimpleServer("NamedPipeTester");
s->Start();
boost::this_thread::sleep(boost::posix_time::milliseconds(30000));
delete s;
system("pause");
Conclusion
The library has been used successfully in our project for six months. Its code was corrected and extended for Habr. Still, the article was written by a novice programmer, so the code may contain errors. The author will be grateful for constructive criticism, comments, suggestions for improvement and reports of any errors found.
This article does not claim to be complete. It does not cover non-blocking reads and writes, setting access permissions on named pipes, or message mode as an alternative to byte mode.
All the source code for the article is available at
github.com/xpavlov/libNamedPipe