They threw the task of making microservice, which receives data from RabbitMQ, processes it, and sends the data further down the stage to RabbitMQ. After sending the job, I looked at what I learned. It turned out that this set of components can be used for rapid prototyping of the pipeline architecture.
Components Used:
For example, I will do microservice to issue a rating of players. The following messages come from the kernel of the system to microservice:
The service should send a message with the contents of the rating once a minute. The rating is sorted by points for the calendar week.
REACT-CPP is a wrapper over libev in C ++ 11. This library is needed to organize the event loop. Since in addition to working with a socket, timers and unix signal handlers are required.
class Application { public: Application(); ~Application(); using IntervalWatcherPtr = std::shared_ptr<React::IntervalWatcher>; void run(); void shutdown(); //... private: bool onMinute(); //... private: React::MainLoop m_loop; IntervalWatcherPtr m_minuteTimer; //... };
void Application::run() { m_minuteTimer = m_loop.onInterval(5.0, 60.0, std::bind(&Application::onMinute, this)); m_loop.onSignal(SIGTERM, [this]() -> bool { shutdown(); return false; }); m_loop.onSignal(SIGUSR1, [this]()->bool{ cleanRating(); return true; }); //... m_loop.run(); } bool Application::onMinute() { calculateRating(); sendRating(); return true; }
Here I create a timer that starts in 5 seconds and which the handler will call every 60 seconds. Any decent daemon / service must have a SIGTERM handler to ask it to exit correctly from the outside. As for the SIGUSR1 handler, you can independently calculate the beginning / end of the week via Boost.Date_Time , but I am too lazy when there is a cron + pkill in GNU / Linux.
Since I published RabbitMQ tutorials on C ++, AMQP-CPP has acquired the implementation of a handler on libev and libuv.
Connection and processing of the message:
void Application::createChannel(AMQP::TcpConnection &connection) { m_channel = std::make_unique<AMQP::TcpChannel>(&connection); m_channel->declareQueue(m_cfg.source().name, AMQP::durable) .onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount) { LOG(INFO) << "Declared queue " << name << ", message count: " << messagecount; m_channel->consume(m_cfg.source().name) .onReceived([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { onMessage(message, deliveryTag, redelivered); }) .onError([](const char *message) { LOG(ERROR) << "Error consume:" << message; APP->shutdown(); }); }) .onError([&](const char *message) { LOG(ERROR) << "Error declare queue:" << message; shutdown(); }); } void Application::onMessage(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { parseMessage(message); m_channel->ack(deliveryTag); }
Post publication:
AMQP::Envelope env(s.GetString()); m_channel->publish("", m_cfg.destination().name, env);
May require local data storage. I took LelevDB, I wrote about it in Using LevelDB . Made only a small RAII wrapper:
class DataBase { public: DataBase(); bool open(const std::string &path2base, bool compression = true); bool put(const std::string &key, const ByteArray &value, bool sync = false); ByteArray get(const std::string &key); Snapshot snapshot(); Iterator iterator(); private: std::shared_ptr<leveldb::DB> m_backend; }; class Snapshot { public: Snapshot(); ~Snapshot(); ByteArray get(const std::string &key); Iterator iterator(); private: Snapshot(const std::weak_ptr<leveldb::DB> &backend, const leveldb::Snapshot *snapshot); private: friend class DataBase; std::weak_ptr<leveldb::DB> m_backend; const leveldb::Snapshot *m_shapshot; }; class Iterator { public: Iterator(std::unique_ptr<leveldb::Iterator> rawIterator); Iterator(Iterator &&iter); /*! * Create empty iterator */ Iterator() = default; ~Iterator(); bool isValid() const noexcept; void next(); void prev(); std::string key(); ByteArray value(); /*! * Seek to first */ void toFirst(); /*! * Seek to last */ void toLast(); Iterator(const Iterator &) = delete; Iterator &operator=(const Iterator &) = delete; private: std::unique_ptr<leveldb::Iterator> m_iterator; };
LevelDB is used to save / restore state.
void Application::loadFromLocalStorage() { auto snapshot = m_localStorage->snapshot(); auto iter = snapshot.iterator(); iter.toFirst(); while (iter.isValid()) { auto player = new Player(iter.value()); m_id2player[player->id] = player; m_players.push_back(player); iter.next(); } } void Application::updatePlayerInBD(const Player *player) { if (!m_localStorage->put(std::to_string(player->id), player->serialize())) { LOG(ERROR) << "[" << player->id << ", " << player->name << "] is not updated in the database"; } }
Data comes in JSON format. Parses json using RapidJSON , looking for a suitable method, calling the appropriate handler:
void Application::parseMessage(const AMQP::Message &message) { /* * * { * "method":"player_registered", * "params":{ * ... * } * } */ rapidjson::Document doc; doc.Parse(message.body(), message.bodySize()); const std::string method = doc["method"].GetString(); auto iter = m_handlers.find(method); if (iter != m_handlers.end()) { iter->second(*this, doc["params"]); } else { LOG(WARNING) << "Unknown method:" << method; } }
The methods themselves are simple:
void Application::onPlayerRegistered(const JValue ¶ms) { auto obj = params.GetObject(); const uint64_t playerId = obj["id"].GetUint64(); if (!isRegistred(playerId)) { auto player = new Player; player->id = playerId; player->name = obj["name"].GetString(); m_players.push_back(player); m_id2player[playerId] = player; updatePlayerInBD(player); } } void Application::onPlayerRenamed(const JValue ¶ms) { auto obj = params.GetObject(); const uint64_t playerId = obj["id"].GetUint64(); if (isRegistred(playerId)) { auto player = m_id2player[playerId]; player->name = obj["name"].GetString(); updatePlayerInBD(player); } else { LOG(WARNING) << "Renaming an unknown user[" << playerId << "]"; } } void Application::onPlayerWon(const JValue ¶ms) { auto obj = params.GetObject(); const uint64_t playerId = obj["id"].GetUint64(); if (isRegistred(playerId)) { auto player = m_id2player[playerId]; player->points += obj["points"].GetInt64(); updatePlayerInBD(player); } else { LOG(WARNING) << "Unknown player[" << playerId << "]"; } }
Once a minute we sort the players and send the rating:
bool Application::onMinute() { calculateRating(); sendRating(); return true; } void Application::calculateRating() { std::sort(m_players.begin(), m_players.end(), [](const Player *a, const Player *b) { return a->points > b->points; }); } void Application::sendRating() { using namespace rapidjson; StringBuffer s; Writer<StringBuffer> writer(s); writer.StartArray(); const size_t count = std::min(m_players.size(), size_t(10)); for (size_t i = 0; i < count; ++i) { writer.StartObject(); writer.Key("id"); writer.Uint64(m_players[i]->id); writer.Key("name"); writer.String(m_players[i]->name.c_str()); writer.Key("points"); writer.Int64(m_players[i]->points); writer.EndObject(); } writer.EndArray(); AMQP::Envelope env(s.GetString()); m_channel->publish("", m_cfg.destination().name, env); }
All code is available on GitHub . Library sources are shipped with the service and automatically compiled to GNU / Linux with gcc.
Let's sum up what we have:
Source: https://habr.com/ru/post/315268/
All Articles