
Experience using MassTransit 3.0

MassTransit is an open source library written in C# for the .NET platform. It simplifies working with a message bus when building distributed applications and implementing SOA (service-oriented architecture).

RabbitMQ, Azure Service Bus, or the in-memory transport can act as the message broker (with the in-memory transport, the scope is limited to the process in which the bus instance is created).



Commands and Events


The library contains 2 main types of messages: commands and events.

Commands


Commands signal the need to perform some action. To give a command a meaningful name, use the verb + noun structure:
EstimateConnection, SendSms, NotifyCustomerOrderProcessed.

Commands are sent with the Send method (ISendEndpoint interface), specifying the receiving endpoint (queue):

Sending a command
    private static async Task SendSmsCommand(IBus busControl)
    {
        var command = new SendSmsCommand
        {
            CommandId = Guid.NewGuid(),
            PhoneNumber = "89031112233",
            Message = "Thank you for your reply"
        };

        // Resolve the send endpoint for the command queue, then send the command to it
        var endpoint = await busControl.GetSendEndpoint(AppSettings.CommandEndpoint);
        await endpoint.Send(command);
    }


Events


Events signal that something has happened that may be of interest to a set of subscribers (the Observer pattern), which react to it. Examples: ConnectionEstimated, CallTerminated, SmsSent, CustomerNotified.

Events are sent with the Publish method (IPublishEndpoint interface).

The terminology itself reflects the main difference between these message types: a command is delivered to a single consumer (to avoid duplicate execution):


Image from article MassTransit Send vs. Publish

An event, by contrast, notifies n subscribers, each of which reacts to it in its own way.


Image from article MassTransit Send vs. Publish

In other words, when n consumers (message handlers) that process a command are running, only one of them receives the command after it is sent, whereas every one of them receives an event.
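The difference in delivery semantics can be illustrated with a small toy model. It does not use MassTransit at all; the class name DeliveryModel and the round-robin choice of consumer are assumptions made purely for illustration, and the Publish half assumes that every subscriber listens on its own queue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of the two delivery modes (not MassTransit code).
public static class DeliveryModel
{
    // Command semantics: consumers compete on one queue, so each message
    // reaches exactly one of them (round-robin here, as a simplification).
    public static Dictionary<string, List<string>> Send(
        IReadOnlyList<string> consumers, IEnumerable<string> commands)
    {
        var received = consumers.ToDictionary(c => c, _ => new List<string>());
        var next = 0;
        foreach (var command in commands)
        {
            received[consumers[next]].Add(command);
            next = (next + 1) % consumers.Count;
        }
        return received;
    }

    // Event semantics: every subscriber has its own queue and gets its own copy.
    public static Dictionary<string, List<string>> Publish(
        IReadOnlyList<string> subscribers, IEnumerable<string> events)
    {
        var eventList = events.ToList();
        return subscribers.ToDictionary(s => s, _ => new List<string>(eventList));
    }
}
```

With two consumers and three messages, Send distributes the three messages between the consumers, while Publish hands all three to each of them.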

Message Contracts


According to the MassTransit documentation, when declaring message contracts, it is recommended to use interfaces:

Contract: command to send SMS
    public interface ISendSms
    {
        Guid CommandId { get; }
        string PhoneNumber { get; }
        string Message { get; }
    }


As mentioned earlier, commands should be sent only with the Send method (on an ISendEndpoint obtained from the bus via GetSendEndpoint), specifying the recipient endpoint.

Contract: event raised when an SMS message has been sent successfully
    public interface ISmsSent
    {
        Guid EventId { get; }
        DateTime SentAtUtc { get; }
    }


Events are sent using the Publish method.
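Since the contracts are interfaces, one way to send them without writing a concrete class is to use the Send<T> and Publish<T> overloads that accept an anonymous object with matching property names. The sketch below assumes the ISendSms and ISmsSent contracts shown above; the ContractUsage class and the commandQueue parameter are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public interface ISendSms
{
    Guid CommandId { get; }
    string PhoneNumber { get; }
    string Message { get; }
}

public interface ISmsSent
{
    Guid EventId { get; }
    DateTime SentAtUtc { get; }
}

// Hypothetical helper class, not part of MassTransit or of the sample project.
public static class ContractUsage
{
    // A command goes to one specific endpoint (queue).
    public static async Task SendSmsAsync(IBus bus, Uri commandQueue)
    {
        ISendEndpoint endpoint = await bus.GetSendEndpoint(commandQueue);

        // The anonymous object is mapped onto the ISendSms contract by property name.
        await endpoint.Send<ISendSms>(new
        {
            CommandId = Guid.NewGuid(),
            PhoneNumber = "89031112233",
            Message = "Thank you for your reply"
        });
    }

    // An event has no addressee; it is delivered to whoever subscribed to ISmsSent.
    public static Task PublishSmsSentAsync(IPublishEndpoint publishEndpoint)
    {
        return publishEndpoint.Publish<ISmsSent>(new
        {
            EventId = Guid.NewGuid(),
            SentAtUtc = DateTime.UtcNow
        });
    }
}
```

Because IBus also implements IPublishEndpoint, the bus itself can be passed to PublishSmsSentAsync.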

Routing


Both the distribution of messages across exchanges and the selection of consumers (discussed later in this article) are based on the runtime types of the messages. The name is built from the namespace and the type name; for generic types, from the name of the generic type and its list of type arguments.

Exchange


When a receive endpoint is configured (that is, when previously registered consumers are connected) with RabbitMQ as the transport, the names of the required exchanges are derived from the message types those consumers handle. The messages are then placed into these exchanges.

The same happens for commands when a send endpoint is configured, since sending them requires exchanges of their own.

The image shows the exchanges created in my example:



If the queue name is specified when configuring the receive endpoint:

    cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));

then the exchange bindings look like this:



The full path of a message whose type implements ISmsEvent looks like this:



If the endpoint is configured without a queue name:

    cfg.ReceiveEndpoint(host, e => e.LoadFrom(container));

the names of the last exchange and of the queue are generated automatically, and both are deleted when the application shuts down:



Message format


On the subject of the message format, the message name (messageType) deserves a closer look. It is built (with the urn:message: prefix) by the function GetUrnForType(Type type). To demonstrate, I made the ISendSms command generic and derived it from a generic ICommand interface:

Contract: command for sending SMS messages, derived from ICommand<string>
    public interface ICommand<T>
    {
    }

    public interface ISendSms<T> : ICommand<T>
    {
        Guid CommandId { get; }
        string PhoneNumber { get; }
        string Message { get; }
    }

    class SendSmsCommand : ISendSms<string>
    {
        public Guid CommandId { get; set; }
        public string PhoneNumber { get; set; }
        public string Message { get; set; }
    }


The generated message will then contain the following value in the messageType field (after the message is received, this field is used to select the consumer responsible for processing it):

    "messageType": [
        "urn:message:PlayWithMassTransit30.Extension:SendSmsCommand",
        "urn:message:PlayWithMassTransit30.Contract.Command:ISendSms[[System:String]]",
        "urn:message:PlayWithMassTransit30.Contract.Command:ICommand[[System:String]]"
    ]
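The naming scheme visible in these strings can be reproduced with a short sketch. This is not the library's implementation, only a simplified reconstruction that matches the format shown above; the MessageUrnSketch class is a hypothetical name, and nested types are deliberately not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Simplified reconstruction of the "urn:message:" naming scheme (not MassTransit code).
public static class MessageUrnSketch
{
    // Builds "urn:message:<Namespace>:<TypeName>" for the given type.
    public static string GetUrn(Type type)
    {
        var sb = new StringBuilder("urn:message:");
        AppendName(sb, type);
        return sb.ToString();
    }

    private static void AppendName(StringBuilder sb, Type type)
    {
        if (type.Namespace != null)
        {
            sb.Append(type.Namespace).Append(':');
        }

        if (!type.IsGenericType)
        {
            sb.Append(type.Name);
            return;
        }

        // Generic type names look like "ISendSms`1"; drop the arity suffix.
        var name = type.Name;
        var tick = name.IndexOf('`');
        sb.Append(tick > 0 ? name.Substring(0, tick) : name);

        // Each type argument is wrapped in its own pair of brackets.
        sb.Append('[');
        var arguments = type.GetGenericArguments();
        for (var i = 0; i < arguments.Length; i++)
        {
            if (i > 0) sb.Append(',');
            sb.Append('[');
            AppendName(sb, arguments[i]);
            sb.Append(']');
        }
        sb.Append(']');
    }
}
```

For example, GetUrn(typeof(List<string>)) yields urn:message:System.Collections.Generic:List[[System:String]], which has the same shape as the ISendSms[[System:String]] entry above.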

In addition to messageType, the message contains information about the host from which it was sent:

    "host": {
        "machineName": "DESKTOP-SI9OHUR",
        "processName": "PlayWithMassTransit30.vshost",
        "processId": 1092,
        "assembly": "PlayWithMassTransit30",
        "assemblyVersion": "1.0.0.0",
        "frameworkVersion": "4.0.30319.42000",
        "massTransitVersion": "3.4.1.808",
        "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
    }

The payload itself:

    "message": {
        "commandId": "7388f663-82dc-403a-8bf9-8952f2ff262e",
        "phoneNumber": "89031112233",
        "message": "Thank you for your reply"
    }

as well as other service fields and headers.

Consumer


A consumer is a class that processes one or more message types. Each type is declared by implementing the IConsumer<T> interface, where T is the type of message the consumer handles.

Example of a consumer that processes the ISendSms command and publishes the ISmsSent event:

SendSmsConsumer: command to send a message
    public class SendSmsConsumer : IConsumer<ISendSms<string>>
    {
        private readonly IBus _busControl;

        public SendSmsConsumer(IBusControl busControl)
        {
            _busControl = busControl;
        }

        public async Task Consume(ConsumeContext<ISendSms<string>> context)
        {
            var message = context.Message;

            Console.WriteLine($"[IConsumer<ISendSms>] Send sms command consumed");
            Console.WriteLine($"[IConsumer<ISendSms>] CommandId: {message.CommandId}");
            Console.WriteLine($"[IConsumer<ISendSms>] Phone number: {message.PhoneNumber}");
            Console.WriteLine($"[IConsumer<ISendSms>] Message: {message.Message}");
            Console.Write(Environment.NewLine);
            Console.WriteLine(" :   ");

            // Publish the ISmsSent event through the BusExtensions.SmsSent extension method
            await _busControl.SmsSent(DateTime.UtcNow);
        }
    }


After receiving the command to send an SMS message and performing the required actions, we create and publish an event saying that the SMS has been sent.

I moved the message-sending code into a separate class of extension methods over IBus; the message implementations live there as well:

Extension methods over IBus that encapsulate the messaging logic
    public static class BusExtensions
    {
        /// <summary>
        /// Sends the command to send an SMS message to the given endpoint address.
        /// </summary>
        public static async Task SendSms(this IBus bus, Uri host, string phoneNumber, string message)
        {
            var command = new SendSmsCommand
            {
                CommandId = Guid.NewGuid(),
                PhoneNumber = phoneNumber,
                Message = message
            };

            await bus.SendCommand(host, command);
        }

        /// <summary>
        /// Publishes the event saying that an SMS message has been sent.
        /// </summary>
        public static async Task SmsSent(this IBus bus, DateTime sentAtUtc)
        {
            var @event = new SmsSentEvent
            {
                EventId = Guid.NewGuid(),
                SentAtUtc = sentAtUtc
            };

            await bus.PublishEvent(@event);
        }

        /// <summary>
        /// Sends a command to the endpoint with the given address.
        /// </summary>
        private static async Task SendCommand<T>(this IBus bus, Uri address, T command) where T : class
        {
            var endpoint = await bus.GetSendEndpoint(address);
            await endpoint.Send(command);
        }

        /// <summary>
        /// Publishes an event to all subscribers.
        /// </summary>
        private static async Task PublishEvent<T>(this IBus bus, T message) where T : class
        {
            await bus.Publish(message);
        }
    }

    class SendSmsCommand : ISendSms<string>
    {
        public Guid CommandId { get; set; }
        public string PhoneNumber { get; set; }
        public string Message { get; set; }
    }

    class SmsSentEvent : ISmsSent
    {
        public Guid EventId { get; set; }
        public DateTime SentAtUtc { get; set; }
    }


In my opinion, this approach cleanly separates the business logic from the implementation details of communication between systems (components) and keeps those details in one place.

DI Container Configuration


At the moment, MassTransit supports the following popular containers:

  1. Autofac;
  2. Ninject;
  3. StructureMap;
  4. Unity;
  5. Castle Windsor.

For UnityContainer, you need to install the NuGet package MassTransit.Unity, after which the LoadFrom extension method becomes available:

    public static class UnityExtensions
    {
        public static void LoadFrom(this IReceiveEndpointConfigurator configurator, IUnityContainer container);
    }

The usage example is as follows:

IBusControl configuration using UnityContainer
    public static IBusControl GetConfiguredFactory(IUnityContainer container)
    {
        if (container == null)
        {
            throw new ArgumentNullException(nameof(container));
        }

        var control = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(AppSettings.Host, h => { });

            // cfg.ReceiveEndpoint(host, e => e.LoadFrom(container));
            cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));
        });

        // Connect observers before the bus is started
        control.ConnectConsumeObserver(new ConsumeObserver());
        control.ConnectReceiveObserver(new ReceiveObserver());
        control.ConnectConsumeMessageObserver(new ConsumeObserverSendSmsCommand());
        control.ConnectSendObserver(new SendObserver());
        control.ConnectPublishObserver(new PublishObserver());

        return control;
    }


For the consumer's lifetime in the container, the documentation suggests using ContainerControlledLifetimeManager().
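A possible registration sequence is sketched below. It assumes Unity 4.x or earlier (the Microsoft.Practices.Unity namespace; newer Unity versions use different namespaces). The ContainerBootstrap class and its createBus parameter are hypothetical: in this article TConsumer would be SendSmsConsumer and createBus would be GetConfiguredFactory. The order of the steps is my assumption about how the pieces fit together, not something prescribed by the documentation.

```csharp
using System;
using MassTransit;
using Microsoft.Practices.Unity; // assumption: Unity 4.x or earlier

// Hypothetical helper, not part of MassTransit or of the sample project.
public static class ContainerBootstrap
{
    public static IBusControl Configure<TConsumer>(
        IUnityContainer container,
        Func<IUnityContainer, IBusControl> createBus)
        where TConsumer : class, IConsumer
    {
        // Register the consumer first, so that LoadFrom(container)
        // can find it while the receive endpoint is being configured.
        container.RegisterType<TConsumer>(new ContainerControlledLifetimeManager());

        var bus = createBus(container);

        // A consumer such as SendSmsConsumer takes IBusControl in its constructor,
        // so the configured bus is registered in the container as well.
        container.RegisterInstance<IBusControl>(bus);

        return bus;
    }
}
```

The returned bus still has to be started by the caller.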

Observers


To monitor message processing, you can connect observers. MassTransit provides the following observer interfaces:

  1. IReceiveObserver - triggered immediately after a message is received and a ReceiveContext is created;
  2. IConsumeObserver - triggered after a ConsumeContext is created;
  3. IConsumeMessageObserver<T> - monitors messages of type T; its methods have access to the strongly typed message content;
  4. ISendObserver - monitors sent commands;
  5. IPublishObserver - monitors published events.

For each of them, the IBusControl interface provides its own connection method, which should be called just before IBusControl.Start().

As an example, here is the implementation of ConsumeObserver:

IConsumeObserver implementation
    public class ConsumeObserver : IConsumeObserver
    {
        // Called before the consumer processes the message
        public Task PreConsume<T>(ConsumeContext<T> context) where T : class
        {
            Console.WriteLine($"[ConsumeObserver] PreConsume {context.MessageId}");
            return Task.CompletedTask;
        }

        // Called after the consumer has processed the message successfully
        public Task PostConsume<T>(ConsumeContext<T> context) where T : class
        {
            Console.WriteLine($"[ConsumeObserver] PostConsume {context.MessageId}");
            return Task.CompletedTask;
        }

        // Called when the consumer throws an exception
        public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
        {
            Console.WriteLine($"[ConsumeObserver] ConsumeFault {context.MessageId}");
            return Task.CompletedTask;
        }
    }


I will not list the code of every observer, since they are all similar in structure and in how they work. Their implementations can be found in the documentation or in the source code on GitHub.

The final pipeline for receiving a command to send an SMS message, processing it and publishing an event of its successful execution is as follows:



New in MassTransit 3.0


The changes in the new version of the library are covered in two overview articles by its author, Chris Patterson, on his blog: MassTransit 3 API Changes and MassTransit v3 Update.

Conclusion


I originally planned to include a comparison of the most popular message queue libraries here, but decided to leave it for a separate article.

I hope this gave you a first acquaintance with the MassTransit library. Beyond the scope of this article remain such interesting topics as transactions, persistence (integration with NHibernate, MongoDB, Entity Framework), message scheduling (integration with Quartz), state machines (Automatonymous and sagas), logging (Log4Net, NLog), multithreading, and more.

The sample source code is available on GitHub.

Materials used:
  1. MassTransit documentation.

Source: https://habr.com/ru/post/314080/

