
Redis, hiredis, libev and multithreading. Part 1

A problem came up: a large flow of data from many clients has to be processed very quickly (preferably in real time), stored in a database and forwarded to another set of clients, and the data is not structured as tables or documents. The technologies chosen for the implementation were Redis + C++.


Choosing a library for Redis


If you look here, you will see four candidate libraries for implementing a Redis client for this task: hiredis, credis, libredis and C++ Client.
It would seem that the last one is exactly what is needed. But this library has "experimental" status, which immediately hints that you will need a file, enthusiasm and a great deal of patience to finish it by hand. And I am paid for the result, not for the amount of labor. Besides, the functionality of this client leaves much to be desired.
Libredis. This appears to be a plugin for the PHP engine. Not suitable.
Credis. A very simple library that can execute only basic Redis commands, with no way to build a composite command (this feature is currently in its TODO). But I have a data model, and I want to be able to execute several queries atomically.
So the choice fell on hiredis, the official client.

Choosing an event-driven library


Following the latest IT trends and given the huge number of clients, I decided to use a non-blocking connection to Redis, especially since this DBMS supports it. hiredis, in turn, ships adapters for three event-driven libraries: ae, libevent and libev.
It is important to emphasize here that the Redis client is only one part of the service. It has to run in a separate thread, and the other threads, which handle client connections, need some way to pass data to it.
Ae. I could not work out what this library is, and Google did not help. If anyone knows, please leave a comment with a link.
Libevent. The adapter supports only versions 1.xx, not 2.xx, which rules it out for a multithreaded application.
So the choice fell on libev. Details follow.

Hiredis + libev + multithread


As it turned out, there are no articles on this topic. The hiredis authors have published an example of single-threaded event-driven processing (here is the proof), and there is the libev documentation, which says plainly that multithreading is not supported out of the box because there is no single right way to make the implementation thread-safe. It does, however, let you create a separate watcher of type ev_async, which other threads can use to wake up the event loop.
One caveat up front: I tried several ways of writing the code, but either I lack the knowledge or I need to dig deeper into the sources. So far the best option has been simply to copy the bundled libev adapter into my project and add the necessary functionality.
So. First, add one more watcher to the redisLibevEvents structure:
    typedef struct redisLibevEvents {
        redisAsyncContext *context;
        struct ev_loop *loop;
        int reading, writing;
        ev_io rev, wev;
        ev_async aev; // Watcher for events sent from other threads.
    } redisLibevEvents;

Next, add a callback function that handles events coming from other threads:
    void redisLibevAsyncEvent(EV_P_ ev_async *watcher, int revents) {
        // Same boilerplate as in the callbacks for the ev_io watchers.
    #if EV_MULTIPLICITY
        ((void)loop);
    #endif
        ((void)revents);

        redisLibevEvents *e = (redisLibevEvents*)watcher->data;
        redisAsyncHandleRead(e->context); // Process the event.
        free((redisLibevEvents*)watcher->data); // Free the memory allocated when the event was sent.
    }

Next, at the end of redisLibevAttach, the function that binds the context to the event loop, add the following:
    ev_async_init(&e->aev, redisLibevAsyncEvent); // Initialize the watcher.
    ev_async_start(e->loop, &e->aev); // Add it to the event loop.

Now only one thing is missing: a pointer to the created watcher, which is needed to send events. To get it, we take the file to redisLibevAttach once more:
    int redisLibevAttach(EV_P_ redisAsyncContext *ac, ev_async **_pEvAsync) {
        ...
        *_pEvAsync = &e->aev;
        return REDIS_OK;
    }

After that, the adapter is ready to use.
Initialization of the event loop:
    m_pRedisAsyncContext = redisAsyncConnect(m_strIP, m_nDBPort); // Create the asynchronous context.
    m_pEventLoop = ev_loop_new(EVFLAG_AUTO); // Create a new event loop.
    redisLibevAttach(m_pEventLoop, m_pRedisAsyncContext, &m_pAEV); // Bind the context to the event loop.
    redisAsyncSetConnectCallback(m_pRedisAsyncContext, redisConnectCallbackFunction); // Set the callback for connecting to Redis.
    redisAsyncSetDisconnectCallback(m_pRedisAsyncContext, redisDisconnectCallbackFunction); // Set the callback for disconnecting.

The thread function:
    ev_loop(m_pEventLoop, 0); // Run the event loop.

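And the function that adds a new event:
    // If several threads can call this function, take a lock here.
    redisAsyncCommand(m_pRedisAsyncContext, redisAsyncCommandCallbackFunction, _pPrivData, _pcQuery); // Queue the query (_pcQuery); the private data (_pPrivData) is passed to the callback together with the reply.
    redisLibevEvents *pRedisLibevEvent = (redisLibevEvents*)malloc(sizeof(redisLibevEvents)); // Allocate memory for the event.
    pRedisLibevEvent->context = m_pRedisAsyncContext; // Set the context.
    m_pAEV->data = pRedisLibevEvent; // Attach the event to the watcher.
    ev_async_send(m_pEventLoop, m_pAEV); // Send the event to the loop.
    // Release the lock.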
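One thing to keep in mind with this function: according to the libev documentation, ev_async does not queue anything, and several ev_async_send calls can be merged into a single callback invocation. So a single data pointer on the watcher can be overwritten before the callback sees it. Below is a minimal sketch of one way around this, assuming a mutex-protected queue that the callback drains. PendingQueries and its methods are hypothetical names, not part of hiredis or libev.

```cpp
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper, not part of hiredis or libev: a mutex-protected queue
// of Redis queries shared between producer threads and the loop thread.
class PendingQueries {
public:
    // Called from any thread. The caller is expected to follow this with
    // ev_async_send() on the watcher to wake up the event loop.
    void push(std::string query) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queries.push_back(std::move(query));
    }

    // Called from the ev_async callback in the loop thread. Takes everything
    // queued so far, because one wakeup may stand for many sends.
    std::vector<std::string> drain() {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::vector<std::string> batch(m_queries.begin(), m_queries.end());
        m_queries.clear();
        return batch;
    }

private:
    std::mutex m_mutex;
    std::deque<std::string> m_queries;
};
```

With this in place, producer threads would call push() and then ev_async_send(), and the callback would call drain() and hand each query to redisAsyncCommand from the loop thread, so the hiredis context is only ever touched by one thread.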


Well, that's all. The resulting mix of C and C++ is quite viable, although it does not look very pretty.
I hope it will be useful to someone until official, working C++ implementations appear.

PS: The continuation is in the second part.

Source: https://habr.com/ru/post/131916/

