
State Library



On one project we needed to rework the way data is exchanged between processes. Historically, the scheme was rather unsightly. One process periodically dumped its current settings to an XML file. A second process read this file once a second and checked what had changed since the previous read. Changes were detected through numerous comparisons of the file's current and previous states, each triggering some chain of actions. The reading process in turn wrote another XML file, which was read by a third process, and so on. The saddest part was that this scheme required bulky, repetitive comparison code that piled up as new data was added.

The idea arose to replace this whole zoo of XML files with a messaging system that supports pub/sub. Three candidates were seriously considered: NATS, Redis and ZeroMQ. Since we planned to exchange not only metadata but also large volumes of binary data in real time, maximum throughput was the top priority. For this reason the first two candidates had to be dropped, despite their higher-level, convenient broker-based APIs (tests showed that NATS outperforms Redis but trails ZeroMQ by about 20%).

Next came the question of how to synchronize state between processes. The following scheme seemed logical:

  1. After connecting to the server, a client reads the server's full state.
  2. From then on, whenever the state changes, the server publishes patches (changes) to which the clients are subscribed.
  3. On receiving a patch, the client calls the handlers corresponding to the changes (events) in the patch and then applies the patch to its copy of the server's previous state.

This scheme was a perfect fit for JSON Patch, which spared us from reinventing the wheel for generating and applying patches. Thus nlohmann's JSON library, which has built-in support for JSON Patch, became the ideal foundation for our state synchronization library.
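To make the scheme concrete, here is a minimal sketch that uses only the standard library. It is not the library's code: a flat key/value map stands in for a JSON document, and the hypothetical `Diff` and `ApplyPatch` functions stand in for JSON Patch generation and application. All names here are assumptions made for illustration.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for a JSON document: a flat key -> value map.
using FlatState = std::map<std::string, std::string>;

enum class Op { Add, Remove, Replace };

struct Change {
    Op op;
    std::string key;
    std::string value;  // unused for Remove
};

using Patch = std::vector<Change>;

// Server side: compute the changes that turn `from` into `to`.
Patch Diff(const FlatState& from, const FlatState& to) {
    Patch patch;
    for (const auto& kv : from) {
        auto it = to.find(kv.first);
        if (it == to.end()) {
            patch.push_back({Op::Remove, kv.first, ""});
        } else if (it->second != kv.second) {
            patch.push_back({Op::Replace, kv.first, it->second});
        }
    }
    for (const auto& kv : to) {
        if (from.find(kv.first) == from.end()) {
            patch.push_back({Op::Add, kv.first, kv.second});
        }
    }
    return patch;
}

using Handler = std::function<void(const FlatState& old_state, const Change&)>;

// Client side: first call the handler for every change while the old
// state is still intact, then apply the patch to the local copy.
void ApplyPatch(FlatState& state, const Patch& patch, const Handler& on_change) {
    for (const Change& c : patch) {
        if (on_change) on_change(state, c);
    }
    for (const Change& c : patch) {
        // A patch only makes sense against the state it was computed from.
        assert((c.op == Op::Add) == (state.count(c.key) == 0));
        if (c.op == Op::Remove) state.erase(c.key);
        else state[c.key] = c.value;
    }
}
```

A client that starts from a copy of the server's state and receives every patch produced by `Diff` ends up with the same state as the server, and its handler sees the old values before they are overwritten.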


After a couple of weeks of work, a small library was written that includes the following communication primitives:


  1. Publisher is a simple wrapper over a PUB socket.
  2. Subscriber is a wrapper over a SUB socket that processes notifications asynchronously in a dedicated thread.
  3. Requester is a wrapper over a REQ socket that sends a request asynchronously and processes the response in a dedicated thread.
  4. Replier is a wrapper over a REP socket that processes incoming requests in a dedicated thread.
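The "dedicated thread" pattern shared by these wrappers can be sketched roughly as follows. This is an illustration under assumptions, not the library's implementation: the hypothetical `Worker` class uses an in-memory queue where the real wrappers would read from a ZeroMQ socket, and `Post` stands in for a message arriving on that socket.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

class Worker {
public:
    using Callback = std::function<void(const std::string&)>;

    explicit Worker(Callback callback)
        : callback_(std::move(callback)), thread_([this] { Run(); }) {
        assert(callback_ != nullptr);
    }

    // Processes any messages still queued, then stops the thread.
    ~Worker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_one();
        thread_.join();
    }

    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;

    // Stands in for a message arriving on the socket.
    void Post(std::string message) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(message));
        }
        ready_.notify_one();
    }

private:
    void Run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            ready_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
            if (queue_.empty()) return;  // stopping and nothing left to do
            std::string message = std::move(queue_.front());
            queue_.pop();
            lock.unlock();
            callback_(message);  // user code runs without holding the lock
            lock.lock();
        }
    }

    Callback callback_;
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::string> queue_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so it starts after the other members
};
```

The caller never blocks on message handling: `Post` returns immediately and the callback runs on the worker's own thread, which is the trade-off (one thread per socket) discussed later in the article.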

Client and Server are built on top of these primitives. They synchronize the state and let you assign callbacks to specific changes in it.


Sample code and its output
```cpp
#include <chrono>
#include <iostream>
#include <map>
#include <regex>
#include <string>
#include <thread>
#include <vector>

#include "syncer.h"

using namespace nlohmann;
using namespace std;
using namespace std::chrono;
using namespace syncer;

struct Site {
    int temperature;
    int pressure;
};

// JSON (de)serialization hooks for Site.
static inline void to_json(json& j, const Site& s) {
    j = json();
    j["temperature"] = s.temperature;
    j["pressure"] = s.pressure;
}

static inline void from_json(const json& j, Site& s) {
    s.temperature = j.at("temperature").get<int>();
    s.pressure = j.at("pressure").get<int>();
}

struct State {
    map<string, Site> sites;
    string forecast;
};

// JSON (de)serialization hooks for State.
static inline void to_json(json& j, const State& s) {
    j = json();
    j["sites"] = s.sites;
    j["forecast"] = s.forecast;
}

static inline void from_json(const json& j, State& s) {
    s.sites = j.at("sites").get<map<string, Site>>();
    s.forecast = j.at("forecast").get<string>();
}

// Maps patch paths (as regular expressions) to callbacks.
PatchOpRouter<State> CreateRouter() {
    PatchOpRouter<State> router;

    router.AddCallback<int>(R"(/sites/(\w+)/temperature)", PATCH_OP_ANY,
        [] (const State& old, const smatch& m, PatchOp op, int t) {
            cout << "Temperature in " << m[1].str() << " has changed: "
                 << old.sites.at(m[1].str()).temperature << " -> " << t << endl;
        });

    router.AddCallback<Site>(R"(/sites/(\w+)$)", PATCH_OP_ADD,
        [] (const State&, const smatch& m, PatchOp op, const Site& s) {
            cout << "Site added: " << m[1].str()
                 << " (temperature: " << s.temperature
                 << ", pressure: " << s.pressure << ")" << endl;
        });

    router.AddCallback<Site>(R"(/sites/(\w+)$)", PATCH_OP_REMOVE,
        [] (const State&, const smatch& m, PatchOp op, const Site&) {
            cout << "Site removed: " << m[1].str() << endl;
        });

    return router;
}

int main() {
    // Initial state published by the server.
    State state;
    state.sites["forest"] = { 51, 29 };
    state.sites["lake"] = { 49, 31 };
    state.forecast = "cloudy and rainy";

    Server<State> server("tcp://*:5000", "tcp://*:5001", state);
    Client<State> client("tcp://localhost:5000", "tcp://localhost:5001", CreateRouter());

    // Give the client time to connect and receive the full state.
    this_thread::sleep_for(milliseconds(100));
    cout << "Forecast: " << client.data().forecast << endl;

    // Change the state and publish the resulting patch.
    state.sites.erase("lake");
    state.sites["forest"] = { 50, 28 };
    state.sites["desert"] = { 55, 30 };
    state.forecast = "cloudy and rainy";
    server.Update(state);

    // Give the client time to receive and process the patch.
    this_thread::sleep_for(milliseconds(100));
    return 0;
}
```

The result of this code is the following output:


Site added: forest (temperature: 51, pressure: 29)
Site added: lake (temperature: 49, pressure: 31)
Forecast: cloudy and rainy
Temperature in forest has changed: 51 -> 50
Site removed: lake
Site added: desert (temperature: 55, pressure: 30)
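The way `CreateRouter` ties path patterns to callbacks can be approximated with `std::regex` alone. The sketch below is an assumption about how such routing might work, not the actual `PatchOpRouter`: the hypothetical `PathRouter` matches the whole path against each pattern, passes values as plain strings, and has no equivalent of `PATCH_OP_ANY`.

```cpp
#include <cassert>
#include <functional>
#include <regex>
#include <string>
#include <utility>
#include <vector>

enum class Op { Add, Remove, Replace };

class PathRouter {
public:
    using Callback =
        std::function<void(const std::smatch&, Op, const std::string& value)>;

    // Registers a callback for paths matching `pattern` with operation `op`.
    void AddCallback(const std::string& pattern, Op op, Callback callback) {
        assert(callback != nullptr);
        routes_.push_back({std::regex(pattern), op, std::move(callback)});
    }

    // Calls every matching callback and returns how many were called.
    int Dispatch(const std::string& path, Op op, const std::string& value) const {
        int called = 0;
        for (const Route& route : routes_) {
            std::smatch match;
            if (route.op == op && std::regex_match(path, match, route.pattern)) {
                route.callback(match, op, value);  // match[1] holds the first capture
                ++called;
            }
        }
        return called;
    }

private:
    struct Route {
        std::regex pattern;
        Op op;
        Callback callback;
    };
    std::vector<Route> routes_;
};
```

Capture groups in the pattern, such as `(\w+)` in `/sites/(\w+)/temperature`, are what give a callback access to the site name through `m[1]`, just as in the sample above.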

Of course, the chosen approach is far from optimal in terms of performance, since it generously allocates a thread per socket instead of using epoll. It is therefore poorly suited to systems that require a large number of simultaneous connections. Hopefully, in most cases this is not critical.
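For comparison, here is a rough sketch of the alternative: one thread waiting on many descriptors at once with epoll. It is Linux-only and deliberately generic. Plain file descriptors (pipes, for instance) stand in for sockets, and the hypothetical `ReadReady` helper is not part of the library.

```cpp
#include <cassert>
#include <string>
#include <vector>

#include <sys/epoll.h>
#include <unistd.h>

// Waits up to `timeout_ms` for any of `fds` to become readable and returns,
// per descriptor, the data read from it (empty if it was not ready).
std::vector<std::string> ReadReady(const std::vector<int>& fds, int timeout_ms) {
    std::vector<std::string> result(fds.size());

    int ep = epoll_create1(0);
    assert(ep >= 0);

    // Register every descriptor with a single epoll instance.
    for (std::size_t i = 0; i < fds.size(); ++i) {
        epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.u64 = i;  // remember which descriptor the event belongs to
        int rc = epoll_ctl(ep, EPOLL_CTL_ADD, fds[i], &ev);
        assert(rc == 0);
        (void)rc;
    }

    // One blocking call covers all descriptors; no thread per descriptor.
    std::vector<epoll_event> events(fds.size());
    int n = epoll_wait(ep, events.data(), static_cast<int>(events.size()), timeout_ms);

    for (int k = 0; k < n; ++k) {
        std::size_t i = static_cast<std::size_t>(events[k].data.u64);
        char buf[256];
        ssize_t len = read(fds[i], buf, sizeof buf);
        if (len > 0) result[i].assign(buf, static_cast<std::size_t>(len));
    }

    close(ep);
    return result;
}
```

With this style the number of threads no longer grows with the number of connections, at the cost of a more involved event loop than the thread-per-socket wrappers described above.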


So we now have a way to greatly simplify most of our interprocess communication. This will not be so easy for legacy code, where manual change checks are tightly interwoven with the rest of the functionality and have to be cut out "live". For new code, on the other hand, implementing synchronization has become a pleasure.

Source: https://habr.com/ru/post/340654/

