
I first heard about the MassTransit (MT) library about a year ago from a former colleague who came to our office to share his experience. The company he had joined used MT to reduce coupling between the modules of the service they were developing. Since tight coupling was starting to become a problem for us as well, their experience turned out to be very useful. Besides reducing coupling by switching to an event-based model of interaction between modules, MT also helped us distribute resource-intensive tasks among several processes.
What is MassTransit.
MassTransit is an implementation of the well-known DataBus pattern. The main goal of this pattern is to let several objects that are unaware of each other's existence interact by exchanging messages. The library was written by Dru Sellers and Chris Patterson as a free counterpart to the NServiceBus project and can use either RabbitMQ or MSMQ as its transport. We chose RabbitMQ for our project, so this article describes our experience and the pitfalls of configuring the bus on top of that queue server. MassTransit is an abstraction layer over the AMQP protocol, and its developers tried to hide the implementation details so that almost no knowledge of the queue server or of AMQP is required to use the library. Still, to follow this article and to avoid stepping on rakes while configuring the bus, some familiarity with RabbitMQ is desirable. That is the bad news; the good news is that very little is needed: reading the first four lessons from here is enough. The lessons are short and clear, so learning the basics of RabbitMQ does not take long and pays off well. By the way, the first two lessons have even been translated on Habr: lesson one and lesson two.
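To make the idea behind the pattern concrete, here is a minimal in-process sketch of a bus. It is an illustration only and has nothing to do with how MassTransit is implemented: the TinyBus and Greeting types are hypothetical names invented for this example, and there is no queue server involved.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-process bus, for illustration only; this is not MassTransit code.
public class TinyBus
{
    // Handlers are grouped by the message type they subscribed to.
    private readonly Dictionary<Type, List<Action<object>>> handlers =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<T>(Action<T> handler)
    {
        List<Action<object>> list;
        if (!handlers.TryGetValue(typeof(T), out list))
        {
            list = new List<Action<object>>();
            handlers[typeof(T)] = list;
        }
        list.Add(message => handler((T)message));
    }

    // Delivers the message to every handler registered for its type
    // and returns how many handlers received it.
    public int Publish<T>(T message)
    {
        List<Action<object>> list;
        if (!handlers.TryGetValue(typeof(T), out list))
            return 0;
        foreach (var handler in list)
            handler(message);
        return list.Count;
    }
}

public class Greeting
{
    public string Text { get; set; }
}

public static class TinyBusDemo
{
    public static void Main()
    {
        var bus = new TinyBus();
        // The subscriber knows only the bus and the message type, not the publisher.
        bus.Subscribe<Greeting>(g => Console.WriteLine("Received: " + g.Text));
        // The publisher likewise knows nothing about who is listening.
        bus.Publish(new Greeting { Text = "hello" });
    }
}
```

The point of the sketch is that the two sides share only a message type. A real bus such as MassTransit adds a transport between them, so the publisher and the subscriber can live in different processes.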
To the point.
Let's move from theory to practice and try using the MassTransit library to accomplish a task similar in functionality to the first example from the tutorial on RabbitMQ. We will write a simple console application in which two objects Publisher and Subscriber will interact. Publisher, when you press any key, will send a “KeyWasPressed” message and key code to the bus. Subscriber will capture this message from the bus and display it on the screen.
First we have to:
1) Install Erlang
2) Install RabbitMQ
3) Install MassTransit.RabbitMQ into the test application by running the command PM> Install-Package MassTransit.RabbitMQ.
Let's go straight to the code. Messages sent to the bus are ordinary DTO objects. First of all, we need a class for the message that the publisher sends to the subscriber. Let's call it KeyWasPressed.
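public class KeyWasPressed {
    // The key that was pressed, as returned by Console.ReadKey().Key.
    public ConsoleKey PressedKey { get; set; }
}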
We now turn to writing a simple publisher and subscriber. The key element of the library is the ServiceBus. In MassTransit, the ServiceBus is a messaging environment in which the RabbitMQ (or MSMQ) queue server is responsible for transporting messages. Both our subscriber and our publisher will be objects of this type, ServiceBus.
Subscriber.
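IServiceBus subscriber = ServiceBusFactory.New(sbc => {
    sbc.UseRabbitMq();
    // Body reconstructed to match the anotherSubscriber listing later in the article; the output text is an assumption.
    sbc.ReceiveFrom("rabbitmq://localhost/subscriber");
    sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg =>
        Console.WriteLine("{0}{1}{2}{3}", Environment.NewLine, "Key with code ", msg.PressedKey, " was pressed")));
});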
Publisher.
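IServiceBus publisher = ServiceBusFactory.New(sbc => {
    sbc.UseRabbitMq();
    // A receive queue is required even for a bus that only publishes; the queue name "publisher" is an assumption.
    sbc.ReceiveFrom("rabbitmq://localhost/publisher");
});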
MassTransit does not divide ServiceBus instances connected to the message server into publishers and subscribers; through each connected instance you can both publish and process messages. Therefore, we always need to specify a queue for receiving messages, although sometimes, as in the case of the publisher object, we are not going to receive anything.
Now we write an infinite loop in which each key pressed will be sent to the bus.
while (true)
{
    publisher.Publish(new KeyWasPressed() { PressedKey = Console.ReadKey().Key });
}
Everything is ready. Run the application and press some keys.

What happens on the queue server.
Let's see what happens on the queue server when launching our application. By default, the RabbitMQ installer registers RabbitMQ as a Windows service, so we can always see what is currently happening on the queue server through the command line utilities. But it is more convenient to use the web plugin, which is also included in the standard distribution package.
To install it, we have to follow a few steps.
1) In the command line, go to the sbin folder in the server installation directory (for example, %PROGRAMFILES%\RabbitMQ Server\rabbitmq_server_2.7.1\sbin\)
2) Next, run the following command.
rabbitmq-plugins.bat enable rabbitmq_management
3) Finally, to activate the management plugin, we must reinstall the RabbitMQ service. Run the following sequence of commands:
rabbitmq-service.bat stop
rabbitmq-service.bat install
rabbitmq-service.bat start
To make sure that the RabbitMQ management plugin is installed and running, launch a browser and go to the management page (before version 3.0 the default port is 55672; starting with version 3.0 it is 15672). If everything went well, a screen similar to the following will appear:

The default login / password is guest / guest. Log in and open the list of exchanges.
For each message type that has at least one subscriber, MT creates an exchange named after the Namespace:ClassName pattern and binds the subscribers' queues to it. Our application has only one exchange, MT:KeyWasPressed, and a single queue, subscriber, is bound to it.
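The naming pattern is easy to reproduce by hand, which helps when looking for an exchange in the management panel. The sketch below only mirrors the Namespace:ClassName pattern described above; the ExchangeNaming helper is a hypothetical name and is not part of MassTransit, which computes the names internally.

```csharp
using System;

namespace MT
{
    public class KeyWasPressed
    {
        public ConsoleKey PressedKey { get; set; }
    }
}

public static class ExchangeNaming
{
    // Mirrors the Namespace:ClassName pattern described in the article.
    // Illustration only: this is not MassTransit's own code.
    public static string ExchangeNameFor(Type messageType)
    {
        return messageType.Namespace + ":" + messageType.Name;
    }

    public static void Main()
    {
        Console.WriteLine(ExchangeNameFor(typeof(MT.KeyWasPressed))); // prints "MT:KeyWasPressed"
    }
}
```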

In recent versions of MT, queues and exchanges are durable by default. I never figured out whether this is a bug or a feature, but earlier you had to call the Permanent() method explicitly when creating each subscription to keep the corresponding exchange on the queue server. Durable exchanges and queues are convenient: if the subscriber is disconnected from the queue server when a message is published, the message is not lost to that subscriber but sits in its queue and quietly waits for it to reconnect.
Add new subscribers.
Let's change our application by adding another subscriber to the KeyWasPressed message. Unlike the first one, it will be subscribed to the queue called anothersubscriber and will display a numeric representation of each key pressed by the user.
IServiceBus anotherSubscriber = ServiceBusFactory.New(sbc => {
    sbc.UseRabbitMq();
    sbc.ReceiveFrom("rabbitmq://localhost/anothersubscriber");
    sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg =>
        Console.WriteLine("{0}{1}{2}{3}", Environment.NewLine, "Key with code ", (int) msg.PressedKey, " was pressed")));
});
Run the application and press some keys.

Now each key press produces two lines on the screen: one with the numeric code and one with the character code of the key. In the queue server's management panel you can see that two queues, subscriber and anothersubscriber, are now bound to the MT:KeyWasPressed exchange, and RabbitMQ delivers every message of type MT.KeyWasPressed to both queues.
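The delivery rule at work here can be modelled in a few lines. This is a hypothetical in-memory model, not RabbitMQ or MassTransit code; the FanoutExchange class is a name invented for the example. It shows only that every queue bound to the exchange gets its own copy of each published message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of an exchange with bound queues;
// not RabbitMQ or MassTransit code.
public class FanoutExchange
{
    private readonly Dictionary<string, Queue<string>> boundQueues =
        new Dictionary<string, Queue<string>>();

    public void Bind(string queueName)
    {
        if (!boundQueues.ContainsKey(queueName))
            boundQueues[queueName] = new Queue<string>();
    }

    // Every bound queue receives its own copy of the message.
    public void Publish(string message)
    {
        foreach (var queue in boundQueues.Values)
            queue.Enqueue(message);
    }

    // Number of messages currently waiting in the given queue.
    public int Depth(string queueName)
    {
        return boundQueues[queueName].Count;
    }
}

public static class FanoutDemo
{
    public static void Main()
    {
        var exchange = new FanoutExchange();
        exchange.Bind("subscriber");
        exchange.Bind("anothersubscriber");
        exchange.Publish("KeyWasPressed: A");
        Console.WriteLine(exchange.Depth("subscriber"));        // prints 1
        Console.WriteLine(exchange.Depth("anothersubscriber")); // prints 1
    }
}
```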
Distribution of resource-intensive tasks.
Now let's see how the MassTransit + RabbitMQ combination can be used to distribute resource-intensive tasks among several processes.
Imagine that our task is to build a service for converting video files, and we have two servers for it. We have established experimentally that the optimal load for server number one is three files converted in parallel, while server number two should not convert more than five files at once. The conversion itself will, of course, be emulated. Imagine a queue called filesToConvert that receives files for conversion. Each file is represented by an object of type VideoFile.
public class VideoFile {
    public int Num { get; set; }
    // Emulated conversion time in milliseconds.
    public int TimeToConvert { get; set; }
}
By the rules of the game, a subscriber that receives such a message must sleep for the number of milliseconds given in the TimeToConvert field of the incoming message.
The following code, in our scenario, runs on the first server.
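int firstServerFilesCount = 0;
IServiceBus firstServer = ServiceBusFactory.New(sbc => {
    sbc.UseRabbitMq();
    // Reconstructed from the description that follows; the handler body and log text are assumptions.
    sbc.SetConcurrentConsumerLimit(3);
    sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=3");
    sbc.Subscribe(subs => subs.Handler<VideoFile>(msg => {
        // Assumed use of the counter: how many files this server is converting right now.
        int inProgress = Interlocked.Increment(ref firstServerFilesCount);
        Console.WriteLine("First server: converting file {0}, files in progress: {1}", msg.Num, inProgress);
        Thread.Sleep(msg.TimeToConvert);
        Interlocked.Decrement(ref firstServerFilesCount);
    }));
});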
In our scenario we decided to limit the number of messages processed simultaneously on the first server to three, so we call the SetConcurrentConsumerLimit method with the argument 3. This means that once the firstServer object is connected to the message server, MassTransit will keep a pool of three threads ready to process messages. But we must remember that it is RabbitMQ that distributes the messages, and it has no way of knowing that firstServer is ready to process up to three messages at a time. We can pass this information to it by specifying the prefetch parameter in the URI through which firstServer connects to the message server.
The following code, in our scenario, runs on the second server.
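int secondServerFilesCount = 0;
IServiceBus secondServer = ServiceBusFactory.New(sbc => {
    sbc.UseRabbitMq();
    // Reconstructed from the surrounding description; the handler body and log text are assumptions.
    sbc.SetConcurrentConsumerLimit(5);
    sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=5");
    sbc.Subscribe(subs => subs.Handler<VideoFile>(msg => {
        // Assumed use of the counter: how many files this server is converting right now.
        int inProgress = Interlocked.Increment(ref secondServerFilesCount);
        Console.WriteLine("Second server: converting file {0}, files in progress: {1}", msg.Num, inProgress);
        Thread.Sleep(msg.TimeToConvert);
        Interlocked.Decrement(ref secondServerFilesCount);
    }));
});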
The only differences, as you might guess, are the number of threads in the pool that processes messages and the prefetch value in the URI. What matters far more is that we connected secondServer to the same queue as firstServer, which makes the two subscribers compete for the messages that appear in that queue. If firstServer and secondServer were connected to different queues, each file would be converted twice, once on each server.
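The effect of sharing one queue can be checked without a queue server. The sketch below is a hypothetical in-memory model, not MassTransit or RabbitMQ code: a BlockingCollection stands in for the filesToConvert queue, and two groups of worker threads (three and five) stand in for the two servers. The CompetingConsumers class and its method names are invented for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory model of competing consumers;
// not MassTransit or RabbitMQ code.
public static class CompetingConsumers
{
    // Starts `workers` tasks that all take files from the same queue
    // and record, per file number, how many times it was handled.
    private static Task[] StartServer(BlockingCollection<int> queue, int workers,
                                      ConcurrentDictionary<int, int> timesHandled)
    {
        return Enumerable.Range(0, workers).Select(_ => Task.Run(() =>
        {
            foreach (int fileNum in queue.GetConsumingEnumerable())
            {
                Thread.Sleep(1); // stands in for the conversion work
                timesHandled.AddOrUpdate(fileNum, 1, (key, count) => count + 1);
            }
        })).ToArray();
    }

    // Returns how many times each file was handled when both servers share one queue.
    public static IDictionary<int, int> RunSharedQueue(int fileCount)
    {
        var queue = new BlockingCollection<int>();
        var timesHandled = new ConcurrentDictionary<int, int>();
        var tasks = StartServer(queue, 3, timesHandled)   // "first server"
            .Concat(StartServer(queue, 5, timesHandled))  // "second server"
            .ToArray();
        for (int i = 1; i <= fileCount; i++) queue.Add(i);
        queue.CompleteAdding();
        Task.WaitAll(tasks);
        return timesHandled;
    }

    public static void Main()
    {
        var result = RunSharedQueue(100);
        Console.WriteLine("Files handled: " + result.Count);                             // prints "Files handled: 100"
        Console.WriteLine("Handled more than once: " + result.Values.Count(c => c > 1)); // prints "Handled more than once: 0"
    }
}
```

Because every worker takes items from the same collection, each file is taken exactly once, whichever server happens to be free. With one queue per server, every file would be copied into both queues and handled twice.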
Now let's write the code that fills the filesToConvert queue with a hundred “video files”, each with a random conversion time.
Random rnd = new Random();
for (int i = 1; i <= 100; i++)
{
    publisher.Publish(new VideoFile() { Num = i, TimeToConvert = rnd.Next(100, 5000) });
}
We start the application and make sure that our subscriber servers work in parallel, each using the number of threads we assigned to it.

What other features can MassTransit offer us?
A single article cannot cover everything the MassTransit library offers, but the features can at least be listed, which is what I will do.
- Sagas. A mechanism for coordinating distributed processes.
- Scheduling. Integration with the Quartz.NET library lets you send messages to a queue on a schedule.
- Encryption. Sent messages can be encrypted; the Rijndael block cipher is used for encryption.
- Unit testability. For testing purposes, MassTransit can use its built-in transport (Loopback), which does not require an external MQ server.
Code examples from the article can be found here.