📜 ⬆️ ⬇️

Initial rakes in working with Service Bus for Windows Server

Microsoft has such a not very well-known thing, as the Service Bus for Windows Server . And it so happened that in several projects we had a chance to work with it. As a result, it turned out to collect a small set of pitfalls that were encountered in projects more often than others. What and share.

A brief description of what Service Bus for Windows Server is.
This is a Microsoft implementation of the Service Bus, which is very close to the Windows Azure Service Bus on Windows, but does not require Azure itself. That is a sort of quite comfortable and advanced tire. It can provide both standard queues and their advanced version of topic (topic), which is able to give the same message to several different subscriptions (subscription). Since in practice I only managed to collide with topics / subscriptions, then we will only discuss them later.
image
That is, consumers publish their posts in the topic. The topic transmits them to all its subscriptions. Subscriptions, in turn, check messages for whether they need them or not by matching them with their list of rules (filter). All suitable messages are then transmitted to those customers who subscribe to these same subscriptions. And if several clients are subscribed to the same subscription, then only one of them will receive the message. Everything is pretty standard.

The first steps and the very first rake


How to start using this thing? From trying to send and receive a message, of course.
Attention , hereafter the non-production code is given, the attached code is only intended to serve as a functioning illustration of the text.
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); var listener = messageFactory.CreateSubscriptionClient(topicName, subscriptionName); listener.OnMessage(message => Console.WriteLine($"Message received: {message.GetBody<string>()}")); var brokeredMessage = new BrokeredMessage("some test message"); publisher.Send(brokeredMessage); 

It's simple, a message appears in the console. Let's try to post a lot of messages and very roughly estimate how much time it will take to send and receive:

 var stopwatch = new Stopwatch(); int messagesToSent = 200; int messagesProccessed = 0; listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } }); stopwatch.Start(); for (var i = 0; i < messagesToSent; i ++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); } 

If you run this code, it turns out that on my old veteran computer the process takes about six seconds.
')
But the next step often leads to the first rake. The fact is that the subscriber can receive messages in one of two modes:


By default, messageFactory.CreateSubscriptionClient creates a PeekLock option. But due to non-obviousness, I almost did not see the client being created without explicit indication of the mode of operation. And, if you believe the documentation, at the specified PeekLock, you must call .Complete () for each message. Let's try to do it:

 listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } message.Complete(); //  ,        }); 

And here the unexpected happens. In spite of the fact that no exceptions are being thrown, the lines with Message No. X run, everything happens VERY slowly. These 200 messages instead of six seconds required as much as four minutes and nine seconds! This does not justify the old iron. But after all, I once found this problem in the code of a live project; just for a small number of messages, the performance drawdown was not noticeable.

Why it happens? After all, if something would be wrong, one would expect an exception? In fact, there is an exception. Just for some not entirely clear reason, Microsoft made an extremely non-obvious way of obtaining information about these very exceptions.

The OnMessage message subscription method accepts the optional OnMessageOptions parameter, which allows you to subscribe to the ExceptionReceived event. That is, those "hidden exceptions".

 var onMessageOptions = new OnMessageOptions(); onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } message.Complete(); }, onMessageOptions); //    onMessageOptions 

By running this code, we will see that for each message, a Microsoft.ServiceBus.Messaging.MessageLockLostException is thrown:
The lock provided is invalid. She has expired, or the message has already been removed from the queue. TrackingId: 54630ae4-6e4f-4979-8fc8-b66e5314079c_GAPC_BAPC, TimeStamp: 08/24/2016 9:20:08 PM

Why it happens? Because onMessageOptions has one more parameter: AutoCommit . And it defaults to true. Thus, in order to work correctly in case you want to independently manage the life cycle of a message, this field must be set to false. Let's try this way:

 var stopwatch = new Stopwatch(); int messagesToSent = 200; int messagesProccessed = 0; var onMessageOptions = new OnMessageOptions { AutoComplete = false //   }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } message.Complete(); }, onMessageOptions); stopwatch.Start(); for (var i = 0; i < messagesToSent; i ++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); } 

And voila: no exceptions, message processing takes only two and a half seconds. It looks like a normal operation.

Summarizing :


Abstraction and rake two


The second, not less frequently encountered moment, also seen in the production code, is the improper creation of a wrapper for the subscriber. Creating a class that would hide the work with service bass within itself is generally a good thing. But there are nuances. Here is an illustration of how not to do it, but such code has been repeatedly noticed in reality.

This class is created like this:

 class Listener : IListener { private readonly MessagingFactory _messageFactory; private readonly SubscriptionClient _client; public event Func<string, Task> OnReceivedAsync; public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode) { _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode); var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); _client.OnMessageAsync(bm => OnReceivedAsync?.Invoke(bm.GetBody<string>()), onMessageOptions); } } 

Which is further used like this:

 var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); int messagesToSent = 20; for (var i = 0; i < messagesToSent; i++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); } var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); listener.OnReceivedAsync += x => { Console.WriteLine($"Message received: {x}"); return Task.FromResult(true); }; 

If you run this code, everything seems to work, but instead of the first message there will be an error " NullReferenceException: Object reference not set to an instance of an object. "

Moreover, the error will be caught only in the case of a subscription to onMessageOptions.ExceptionReceived, if this is not done (and not for some reason, they do not do this for some reason), then the problem can only be found by indirect and sometimes very subtle bugs in the code behavior.

What is wrong here? Well, the answer is pretty obvious, and if I hadn’t met so often, I probably wouldn’t have mentioned it. When the _client.OnMessageAsync is called in the Listener abstraction constructor, the subscriber is already starting to receive messages. Therefore, a number of them (depending on how far the designer and the subscription to the listener.OnReceivedAsync are spaced out will be skipped and go to the empty OnReceivedAsync? .Invoke , logically returning null. Hence, NullReferenceException .

What to do with it? The simplest thing is to postpone instance creation and subscription, like this:

 class Listener : IListener { private readonly MessagingFactory _messageFactory; private readonly SubscriptionClient _client; public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode) { _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode); } public void Subscribe(Func<string, Task> handleMessage) { var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); _client.OnMessageAsync(bm => handleMessage(bm.GetBody<string>()), onMessageOptions); } } 

And subscribe like this:

 var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); listener.Subscribe(x => { Console.WriteLine($"Message received: {x}"); return Task.FromResult(true); }); 


Now the loss of messages when creating a class does not occur.

Summarizing :


Rake number three


Subscriber has a wonderful method Close () . But his behavior is not entirely predictable speculative. Let's try to execute the following code here, which, after sending the first half of messages, calls this very Close () and receives the second half of the messages through another copy of the subscriber.

 var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); var listener1 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); var listener2 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); int messagesToSent = 10; int messagesProccessed = 0; var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener1.OnMessage(message => { Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}"); messagesProccessed++; }, onMessageOptions); for (var i = 0; i < messagesToSent; i++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); Thread.Sleep(50); if (i == 4) { Console.WriteLine("Closing listener1"); listener1.Close(); } } listener2.OnMessage(message => { Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}"); messagesProccessed++; }, onMessageOptions); 

But the result in the console will be like this:
listener1: message received: Message # 0, listener1 is closed: False
listener1: message received: Message # 1, listener1 is closed: False
listener1: message received: Message # 2, listener1 is closed: False
listener1: message received: Message # 3, listener1 is closed: False
listener1: message received: Message # 4, listener1 is closed: False
Closing listener1
listener1: message received: Message # 5, listener1 is closed: True
listener2: message received: Message # 6, listener2 is closed: False
listener2: message received: Message # 7, listener2 is closed: False
listener2: message received: Message # 8, listener2 is closed: False
listener2: message received: Message # 9, listener2 is closed: False

Not obvious, right? If you do the same, but for the PeekLock mode instead of ReceiveAndDelete , the result will be similar, except that .Complete () will throw out the System.OperationCanceledException: This messaging entity has already been closed, aborted, or disposed . And if you catch errors in the message handler in order to make Abandon () with your hands, then Abandon () itself will throw out the error. And both of these are ordinary, not hiding inside OnMessageOptions .

And the missed message itself, unlike ReceiveAndDelete , will still be processed later, when a re-send occurs.

Code with Complete and output to console
 var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var messageFactory1 = MessagingFactory.CreateFromConnectionString(connectionString); var messageFactory2 = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); var listener1 = messageFactory1.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock); var listener2 = messageFactory2.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock); int messagesToSent = 10; int messagesProccessed = 0; var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener1.OnMessage(message => { try { Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}"); messagesProccessed++; message.Complete(); } catch (Exception ex1) { Console.WriteLine($"listener1 Complete() exception: {ex1.Message}"); try { message.Abandon(); } catch (Exception ex2) { Console.WriteLine($"listener1 Abandon() exception: {ex2.Message}"); } } }, onMessageOptions); for (var i = 0; i < messagesToSent; i++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); Thread.Sleep(50); if (i == 4) { Console.WriteLine("Closing listener1"); listener1.Close(); } } listener2.OnMessage(message => { Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}"); messagesProccessed++; message.Complete(); }, onMessageOptions); 


listener1: message received: Message # 0, listener1 is closed: False
listener1: message received: Message # 1, listener1 is closed: False
listener1: message received: Message # 2, listener1 is closed: False
listener1: message received: Message # 3, listener1 is closed: False
listener1: message received: Message # 4, listener1 is closed: False
Closing listener1
listener1: message received: Message # 5, listener1 is closed: True
listener1 Complete () exception: This messaging entity has already been closed, aborted, or disposed.
Abandon listener exception: This message has been closed, aborted, or disposed.
listener2: message received: Message # 6, listener2 is closed: False
listener2: message received: Message # 7, listener2 is closed: False
listener2: message received: Message # 8, listener2 is closed: False
listener2: message received: Message # 9, listener2 is closed: False
listener2: message received: Message # 5, listener2 is closed: False


What to do with it and how to live with it? Well, you just need to remember about it and take it into account in the code. The all-knowing stackoverflow offers enough options to deal with this behavior. For example, where appropriate, you can call messageFactory.Close () together with the closure of the subscriber. Or check in the handler if the subscriber is not closed with something like if (listener.IsClosed) {/ *** /} , etc.

Summarizing :


Conclusion


In general, Service Bus for Windows Server is a pretty good thing and copes with its tasks, but some little things at the start can drink a little blood. I hope the points outlined in the article will be useful to someone and save them from stuffing their own cones.

Source: https://habr.com/ru/post/308464/


All Articles