
Dispatcher of arbitrary messages based on Google Protocol Buffers

I had a free day and decided to play around with the google::protobuf library. This library encodes and decodes structured data. On top of it, I will build a simple dispatcher that can process arbitrary messages. What makes this dispatcher unusual is that it does not know the types of the messages passed through it; it processes messages only through registered handlers.

Brief description of the protobuf library


So, let's first take a brief look at the google::protobuf library. It ships as two components:
- the library itself, together with its header files;
- the *.proto file compiler (protoc), which generates a C++ class from a message description (it can also generate code for other programming languages: Java, Python, etc.).
The message description lives in a separate file, from which the class is generated. The syntax is very simple:
    package sample.proto;

    message ServerStatusAnswer {
        optional int32 threadCount = 1;
        repeated string listeners = 2;
    }
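Here we describe the ServerStatusAnswer message, which has two fields that may be left unset: threadCount, an optional integer, and listeners, a repeated string (a list that may be empty).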
For example, the following message satisfies this description:
    ServerStatusAnswer {
        threadCount = 3
        listeners = { "one", "two" }
    }
In fact, the protobuf format is binary; I show the message in a readable form here only for convenience.

The compiler automatically generates C++ code for serializing and deserializing such messages. The protobuf library also offers additional features: serialization to a file, to a stream, or to a buffer.
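
To make the "binary format" remark concrete, here is a small sketch that hand-encodes the sample ServerStatusAnswer message using the documented protobuf wire-format rules (a field key is the field number shifted left by 3 and combined with the wire type; integers are written as base-128 varints; strings are length-prefixed). The function names appendVarint, appendKey and encodeServerStatusAnswer are made up for this illustration and are not part of the protobuf API; in real code the generated class does this work. The sketch assumes a C++11 compiler.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Appends an unsigned integer as a base-128 varint:
// 7 payload bits per byte, least significant group first,
// high bit set on every byte except the last.
void appendVarint(std::string& out, std::uint64_t value) {
    while (value >= 0x80) {
        out.push_back(static_cast<char>((value & 0x7F) | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<char>(value));
}

// Appends a field key: (field number << 3) | wire type.
void appendKey(std::string& out, std::uint32_t fieldNumber, std::uint32_t wireType) {
    assert(fieldNumber >= 1);  // field numbers start at 1
    appendVarint(out, (static_cast<std::uint64_t>(fieldNumber) << 3) | wireType);
}

// Hand-encodes ServerStatusAnswer { threadCount = 1; listeners = 2 }.
// Hypothetical helper for illustration only.
std::string encodeServerStatusAnswer(std::int32_t threadCount,
                                     const std::vector<std::string>& listeners) {
    std::string out;

    appendKey(out, 1, 0);  // field 1, wire type 0 (varint)
    // int32 values are sign-extended to 64 bits before varint encoding.
    appendVarint(out, static_cast<std::uint64_t>(static_cast<std::int64_t>(threadCount)));

    for (const std::string& listener : listeners) {
        appendKey(out, 2, 2);  // field 2, wire type 2 (length-delimited)
        appendVarint(out, listener.size());
        out += listener;
    }
    return out;
}
```

For threadCount = 3 and listeners "one" and "two", this produces the 12 bytes 08 03 12 03 6f 6e 65 12 03 74 77 6f: one key byte plus one value byte for the integer, and a key byte, a length byte and the raw characters for each string.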

I am using CMake as a build system, and it already has protobuf support:
    cmake_minimum_required(VERSION 2.8)
    project(ProtobufTests)

    find_package(Protobuf REQUIRED)
    include_directories(${PROTOBUF_INCLUDE_DIRS})
    include_directories(${CMAKE_CURRENT_BINARY_DIR})
    #...
    set (ProtobufTestsProtoSources
        Message.proto
        ServerStatus.proto
        Echo.proto
    )
    #...
    PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${ProtobufTestsProtoSources})
    add_executable(ProtobufTests ${ProtobufTestsSources} ${PROTO_SRCS} ${PROTO_HDRS})
    target_link_libraries(ProtobufTests
        #...
        ${PROTOBUF_LIBRARY}
    )
PROTOBUF_GENERATE_CPP is a macro that calls the protoc compiler for each *.proto file and generates the corresponding .pb.cc and .pb.h files, which are then added to the build.
Everything happens automatically, with no extra hoops to jump through (on *nix you may additionally need the Threads package and the corresponding linker flag).

Dispatcher description


I decided to try writing a message dispatcher that accepts a message, calls the appropriate handler, and sends back a response to the received message. The dispatcher must not need to know the types of the messages passed to it. This matters if handlers are added to or removed from the dispatcher at run time (for example, by loading an extension module: *.dll, *.so).

To process arbitrary messages, we need a class that processes an abstract message. If we describe messages in a *.proto file, the compiler generates the corresponding classes for us, but all of them derive from google::protobuf::Message. Pulling all the data out of a message through this base class is awkward (it can be done in principle, but it means a lot of extra work), and besides, we would not know how to form the answer.
The following statement comes to the rescue: "Any problem can be solved by introducing an additional level of abstraction, except for the problem of too many levels of abstraction."
We need to separate the identification of the message type from the message itself. To do this, we pack our message inside another message:

    package sample.proto;

    message Message {
        required string id = 1;
        optional bytes data = 2;
    }

Thus, our dispatcher looks up the corresponding message handler by the id field, while the data field carries the serialized inner message:
    #ifndef MESSAGEDISPATCHER_H
    #define MESSAGEDISPATCHER_H

    #include <map>
    #include <stdexcept>
    #include <string>

    #include <boost/noncopyable.hpp>
    #include <boost/smart_ptr/shared_ptr.hpp>

    #include "Message.pb.h"  // generated from Message.proto

    // Thrown when a query cannot be parsed or an answer cannot be serialized.
    class MessageProcessingError: public std::runtime_error
    {
    public:
        MessageProcessingError(const std::string & e): std::runtime_error(e)
        {
        }
    };

    // Interface of a handler: it reports the id it serves and processes an envelope.
    class MessageProcessorBase: private boost::noncopyable
    {
    public:
        virtual ~MessageProcessorBase()
        {
        }

        virtual std::string id() const = 0;
        virtual sample::proto::Message process(const sample::proto::Message & query) = 0;
    };

    typedef boost::shared_ptr<MessageProcessorBase> MessageProcessorBasePtr;

    class MessageDispatcher
    {
    public:
        MessageDispatcher();

        void addProcessor(MessageProcessorBasePtr processor);
        sample::proto::Message dispatch(const sample::proto::Message & query);

        typedef std::map<std::string, MessageProcessorBasePtr> DispatcherImplType;
        const DispatcherImplType & impl() const;

    private:
        DispatcherImplType mImpl;
    };

    #endif // MESSAGEDISPATCHER_H
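
The article does not show the dispatcher's .cpp file, so here is a minimal sketch of how addProcessor and dispatch might look. To stay self-contained it replaces sample::proto::Message with a plain Envelope struct and boost::shared_ptr with std::shared_ptr (so it assumes C++11). Envelope, the dispatch_sketch namespace, PassThroughProcessor, and the decision to throw on an unknown id are assumptions made for this illustration, not the author's code.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

namespace dispatch_sketch {

// Stand-in for sample::proto::Message (assumption: only id and data matter here).
struct Envelope {
    std::string id;
    std::string data;
};

class ProcessorBase {
public:
    virtual ~ProcessorBase() {}
    virtual std::string id() const = 0;
    virtual Envelope process(const Envelope& query) = 0;
};

typedef std::shared_ptr<ProcessorBase> ProcessorBasePtr;

class Dispatcher {
public:
    // Registers a handler under the id it reports.
    // A later registration with the same id replaces the earlier one.
    void addProcessor(ProcessorBasePtr processor) {
        assert(processor);  // a null handler is a programming error
        mImpl[processor->id()] = processor;
    }

    // Finds the handler by the envelope id and forwards the query to it.
    Envelope dispatch(const Envelope& query) {
        std::map<std::string, ProcessorBasePtr>::const_iterator it = mImpl.find(query.id);
        if (it == mImpl.end()) {
            throw std::runtime_error("No processor registered for id: " + query.id);
        }
        return it->second->process(query);
    }

private:
    std::map<std::string, ProcessorBasePtr> mImpl;
};

// Hypothetical handler used only to exercise the sketch:
// it returns the envelope unchanged.
class PassThroughProcessor : public ProcessorBase {
public:
    virtual std::string id() const { return "PassThrough"; }
    virtual Envelope process(const Envelope& query) { return query; }
};

}  // namespace dispatch_sketch
```

The dispatcher only touches the id field; the data field stays opaque to it, which is what lets handlers be added or removed without the dispatcher knowing any concrete message type.
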
But now every handler has to unpack the sample::proto::Message envelope into its own message type, and this step would be duplicated in each processor. We want to avoid duplicating code, so let's use the Type Erasure pattern. This pattern hides the type of the processed entity behind a common interface, while each handler works with a specific type known only to it.

So, the implementation is very simple:
    template <typename ProtoQueryT, typename ProtoAnswerT>
    class ProtoMessageProcessor: public MessageProcessorBase
    {
    public:
        virtual sample::proto::Message process(const sample::proto::Message & query)
        {
            ProtoQueryT underlyingQuery;
            if (!underlyingQuery.ParseFromString(query.data()))
            {
                throw MessageProcessingError("Failed to parse query: " +
                    query.ShortDebugString());
            }

            ProtoAnswerT underlyingAnswer = doProcessing(underlyingQuery);

            sample::proto::Message a;
            a.set_id(query.id());

            if (!underlyingAnswer.SerializeToString(a.mutable_data()))
            {
                throw MessageProcessingError("Failed to prepare answer: " +
                    underlyingAnswer.ShortDebugString());
            }

            return a;
        }

    private:
        virtual ProtoAnswerT doProcessing(const ProtoQueryT & query) = 0;
    };
We define the process virtual function, but also add the doProcessing virtual function, which works with our specific message types. This technique relies on the template instantiation mechanism: types are substituted when the template is actually used, not when it is declared. And since this class derives from MessageProcessorBase, we can safely pass its descendants to our dispatcher. Note also that this class serializes and deserializes our specific messages and throws exceptions on errors.
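
The same idea can be sketched without protobuf, to show the moving parts in isolation. In the hedged example below, Envelope stands in for sample::proto::Message, and the toy EchoQuery and EchoAnswer structs stand in for generated classes; their parse and serialize members are hypothetical substitutes for ParseFromString and SerializeToString, and the whole erasure_sketch namespace is an assumption made for illustration.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

namespace erasure_sketch {

// Stand-in for sample::proto::Message.
struct Envelope {
    std::string id;
    std::string data;
};

// Untyped interface: the only thing a dispatcher would ever see.
class ProcessorBase {
public:
    virtual ~ProcessorBase() {}
    virtual std::string id() const = 0;
    virtual Envelope process(const Envelope& query) = 0;
};

// Typed adapter: unpacks QueryT, calls the typed hook, packs AnswerT.
// Assumes QueryT has bool parse(const std::string&) and
// AnswerT has std::string serialize() const.
template <typename QueryT, typename AnswerT>
class TypedProcessor : public ProcessorBase {
public:
    virtual Envelope process(const Envelope& query) {
        QueryT typedQuery;
        if (!typedQuery.parse(query.data)) {
            throw std::runtime_error("Failed to parse query for id: " + query.id);
        }
        AnswerT typedAnswer = doProcessing(typedQuery);
        Envelope answer;
        answer.id = query.id;
        answer.data = typedAnswer.serialize();
        return answer;
    }

private:
    virtual AnswerT doProcessing(const QueryT& query) = 0;
};

// Toy message types (hypothetical): the "wire format" is just the raw string.
struct EchoQuery {
    std::string msg;
    bool parse(const std::string& bytes) {
        if (bytes.empty()) return false;  // mimics a missing required field
        msg = bytes;
        return true;
    }
};

struct EchoAnswer {
    std::string echo;
    std::string serialize() const { return echo; }
};

// A concrete handler only ever deals with its own typed messages.
class EchoProcessor : public TypedProcessor<EchoQuery, EchoAnswer> {
public:
    virtual std::string id() const { return "Echo"; }

private:
    virtual EchoAnswer doProcessing(const EchoQuery& query) {
        assert(!query.msg.empty());  // parse() already rejected empty payloads
        EchoAnswer answer;
        answer.echo = query.msg;
        return answer;
    }
};

}  // namespace erasure_sketch
```

Calling process through a ProcessorBase reference with an envelope whose data is "hello" returns an envelope with the same id and data "hello", while an empty payload triggers the exception thrown by the adapter. The parsing and packing code lives in one place, the template, and each handler implements only doProcessing.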

Finally, here is an example of using this dispatcher. Let's say we have two kinds of messages:
    // ServerStatus.proto
    package sample.proto;

    message ServerStatusQuery {
    }

    message ServerStatusAnswer {
        optional int32 threadCount = 1;
        repeated string listeners = 2;
    }

    // Echo.proto
    package sample.proto;

    message EchoQuery {
        required string msg = 1;
    }

    message EchoAnswer {
        required string echo = 1;
    }
As the descriptions show, the first pair of messages asks the server for its internal state (ServerStatus), and the second simply returns the received request (Echo). The handlers themselves are trivial, so I will show only the ServerStatus one:
    #ifndef SERVERSTATUSMESSAGEPROCESSOR_H
    #define SERVERSTATUSMESSAGEPROCESSOR_H

    #include "MessageDispatcher.h"
    #include "ServerStatus.pb.h"

    class ServerStatusMessageProcessor:
        public ProtoMessageProcessor<sample::proto::ServerStatusQuery, sample::proto::ServerStatusAnswer>
    {
    public:
        typedef sample::proto::ServerStatusQuery query_type;
        typedef sample::proto::ServerStatusAnswer answer_type;

        ServerStatusMessageProcessor(MessageDispatcher * dispatcher);

        virtual std::string id() const;

    private:
        MessageDispatcher * mDispatcher;

        virtual answer_type doProcessing(const query_type & query);
    };

    #endif // SERVERSTATUSMESSAGEPROCESSOR_H
The implementation itself:
    #include "ServerStatusMessageProcessor.h"

    using namespace sample::proto;

    ServerStatusMessageProcessor::ServerStatusMessageProcessor(MessageDispatcher * dispatcher)
        : mDispatcher(dispatcher)
    {
    }

    std::string ServerStatusMessageProcessor::id() const
    {
        return "ServerStatus";
    }

    ServerStatusAnswer ServerStatusMessageProcessor::doProcessing(const ServerStatusQuery & query)
    {
        ServerStatusAnswer s;
        s.set_threadcount(10);

        typedef MessageDispatcher::DispatcherImplType::const_iterator md_iterator;
        const MessageDispatcher::DispatcherImplType & mdImpl = mDispatcher->impl();

        for (md_iterator it = mdImpl.begin(); it != mdImpl.end(); ++it)
        {
            s.add_listeners(it->first);
        }

        return s;
    }
Here's how it works:
    #include "MessageDispatcher.h"
    #include "ServerStatusMessageProcessor.h"
    #include "EchoMessageProcessor.h"

    #include <iostream>
    #include <boost/smart_ptr/make_shared.hpp>

    using namespace sample::proto;

    int main()
    {
        try
        {
            MessageDispatcher md;

            md.addProcessor(boost::make_shared<ServerStatusMessageProcessor>(&md));
            md.addProcessor(boost::make_shared<EchoMessageProcessor>());

            Message q;
            q.set_id("ServerStatus");

            Message ans = md.dispatch(q);

            std::cout << "query: " << q.DebugString() << std::endl;
            std::cout << "answer: " << ans.DebugString() << std::endl;
        }
        catch (const std::exception & e)
        {
            std::cerr << e.what() << std::endl;
        }

        return 0;
    }

P.S. The following were used in writing this article:

The example is posted on GitHub.

Source: https://habr.com/ru/post/138812/

