Note: the code below is not production code; it is intended only as a working illustration of the text.
// Minimal round trip: a single factory yields both the topic sender and the subscription receiver.
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener = messageFactory.CreateSubscriptionClient(topicName, subscriptionName);

// Print the body of every message delivered to the subscription.
listener.OnMessage(message =>
{
    var body = message.GetBody<string>();
    Console.WriteLine($"Message received: {body}");
});

// Publish one test message to the topic.
publisher.Send(new BrokeredMessage("some test message"));
// Timing probe: measures how long it takes until a fixed number of messages has been received.
var stopwatch = new Stopwatch();
int messagesToSent = 200;
int messagesProccessed = 0;

listener.OnMessage(message =>
{
    var body = message.GetBody<string>();
    Console.WriteLine($"Message received: {body}");

    messagesProccessed += 1;
    if (messagesProccessed != messagesToSent)
    {
        return;
    }

    // The last expected message has arrived: stop timing and report.
    stopwatch.Stop();
    Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
});

// Timing starts immediately before the first send.
stopwatch.Start();
for (var index = 0; index < messagesToSent; index++)
{
    publisher.Send(new BrokeredMessage($"Message №{index}"));
}
// Same timing handler as before, now with an explicit Complete() call per message.
// NOTE(review): no OnMessageOptions is passed here, so the pump's default options apply.
// If auto-completion is on by default, this explicit call is a second completion
// attempt for the same message — confirm against the OnMessageOptions documentation.
listener.OnMessage(message =>
{
    Console.WriteLine($"Message received: {message.GetBody<string>()}");
    messagesProccessed++;

    if (messagesProccessed == messagesToSent)
    {
        stopwatch.Stop();
        Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
    }

    // Explicitly mark the message as processed.
    message.Complete();
});
// Attach an ExceptionReceived handler so that exceptions reported through the options are printed.
var onMessageOptions = new OnMessageOptions();
onMessageOptions.ExceptionReceived += (sender, args) =>
{
    Console.WriteLine($"Exception received: {args.Exception}");
};

listener.OnMessage(
    message =>
    {
        var body = message.GetBody<string>();
        Console.WriteLine($"Message received: {body}");

        messagesProccessed += 1;
        var allReceived = messagesProccessed == messagesToSent;
        if (allReceived)
        {
            stopwatch.Stop();
            Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
        }

        message.Complete();
    },
    onMessageOptions); // hand the options (with the exception handler) to the receiver
The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. TrackingId: 54630ae4-6e4f-4979-8fc8-b66e5314079c_GAPC_BAPC, TimeStamp: 08/24/2016 9:20:08 PM
// Timed receive with manual completion: auto-completion is switched off so that the
// handler's own Complete() call is the only completion attempt for each message.
var stopwatch = new Stopwatch();
int messagesToSent = 200;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions
{
    // Disable auto-completion; the handler below completes each message itself.
    AutoComplete = false
};
onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}");

listener.OnMessage(message =>
{
    Console.WriteLine($"Message received: {message.GetBody<string>()}");
    messagesProccessed++;

    if (messagesProccessed == messagesToSent)
    {
        stopwatch.Stop();
        Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
    }

    message.Complete();
}, onMessageOptions);

// Timing starts immediately before the first send.
stopwatch.Start();
for (var i = 0; i < messagesToSent; i++)
{
    var brokeredMessage = new BrokeredMessage($"Message №{i}");
    publisher.Send(brokeredMessage);
}
/// <summary>
/// Receives messages from a topic subscription and forwards each message body, read as a
/// string, to the handlers attached to <see cref="OnReceivedAsync"/>.
/// </summary>
/// <remarks>
/// The receive pump is started when the first handler is attached, not in the constructor.
/// Starting it in the constructor would allow messages to be delivered before any handler
/// could subscribe, and the callback would then have nothing to invoke.
/// </remarks>
class Listener : IListener
{
    private readonly MessagingFactory _messageFactory;
    private readonly SubscriptionClient _client;
    private readonly ReceiveMode _receiveMode;

    // Guards _handlers and _pumpStarted; handlers may be attached while the pump is running.
    private readonly object _gate = new object();
    private Func<string, Task> _handlers;
    private bool _pumpStarted;

    /// <summary>
    /// Raised for every received message with the message body as a string.
    /// Attaching the first handler starts the receive pump.
    /// </summary>
    public event Func<string, Task> OnReceivedAsync
    {
        add
        {
            lock (_gate)
            {
                _handlers += value;

                if (!_pumpStarted && _handlers != null)
                {
                    StartPump();
                    _pumpStarted = true;
                }
            }
        }
        remove
        {
            lock (_gate)
            {
                _handlers -= value;
            }
        }
    }

    /// <summary>Creates the messaging factory and the subscription client; receiving starts on first subscription.</summary>
    /// <param name="connectionString">Connection string passed to the messaging factory.</param>
    /// <param name="topicName">Topic the subscription belongs to.</param>
    /// <param name="subscriptionName">Subscription to receive from.</param>
    /// <param name="receiveMode">Receive mode used to create the subscription client.</param>
    public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode)
    {
        _receiveMode = receiveMode;
        _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
        _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);
    }

    // Registers the receive callback on the client; called once, under _gate.
    private void StartPump()
    {
        var onMessageOptions = new OnMessageOptions { AutoComplete = false };
        onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}");
        _client.OnMessageAsync(DispatchAsync, onMessageOptions);
    }

    // Invokes every attached handler with the message body and, in PeekLock mode,
    // completes the message once all handlers have finished successfully.
    private async Task DispatchAsync(BrokeredMessage bm)
    {
        Func<string, Task> handlers;
        lock (_gate)
        {
            handlers = _handlers;
        }

        if (handlers == null)
        {
            // Every handler was detached after the pump started: never return a null Task,
            // and leave the message uncompleted because nobody processed it.
            return;
        }

        var body = bm.GetBody<string>();

        // Invoking a multicast delegate directly would expose only the last handler's Task.
        var pending = Array.ConvertAll(handlers.GetInvocationList(), h => ((Func<string, Task>)h)(body));
        await Task.WhenAll(pending);

        // AutoComplete is off, so a peek-locked message must be completed explicitly.
        if (_receiveMode == ReceiveMode.PeekLock)
        {
            await bm.CompleteAsync();
        }
    }
}
// Publish a batch first, then create the listener and attach a handler to its event.
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);

int messagesToSent = 20;
for (var index = 0; index < messagesToSent; index++)
{
    publisher.Send(new BrokeredMessage($"Message №{index}"));
}

var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);

// The handler is attached only after the Listener constructor has returned.
listener.OnReceivedAsync += body =>
{
    Console.WriteLine($"Message received: {body}");
    return Task.FromResult(true);
};
/// <summary>
/// Receives messages from a topic subscription and passes each message body, read as a
/// string, to a handler supplied through <see cref="Subscribe"/>.
/// </summary>
class Listener : IListener
{
    private readonly MessagingFactory _messageFactory;
    private readonly SubscriptionClient _client;
    private readonly ReceiveMode _receiveMode;

    /// <summary>Creates the messaging factory and the subscription client; nothing is received until <see cref="Subscribe"/> is called.</summary>
    /// <param name="connectionString">Connection string passed to the messaging factory.</param>
    /// <param name="topicName">Topic the subscription belongs to.</param>
    /// <param name="subscriptionName">Subscription to receive from.</param>
    /// <param name="receiveMode">Receive mode used to create the subscription client.</param>
    public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode)
    {
        _receiveMode = receiveMode;
        _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
        _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);
    }

    /// <summary>Registers <paramref name="handleMessage"/> as the receive callback and starts receiving.</summary>
    /// <param name="handleMessage">Asynchronous handler invoked with the body of each received message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleMessage"/> is null.</exception>
    public void Subscribe(Func<string, Task> handleMessage)
    {
        // Fail at the call site instead of with a NullReferenceException inside the receive callback.
        if (handleMessage == null)
        {
            throw new ArgumentNullException(nameof(handleMessage));
        }

        var onMessageOptions = new OnMessageOptions { AutoComplete = false };
        onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}");

        _client.OnMessageAsync(async bm =>
        {
            await handleMessage(bm.GetBody<string>());

            // AutoComplete is off and the handler only sees the body, so a peek-locked
            // message must be completed here once the handler has succeeded.
            if (_receiveMode == ReceiveMode.PeekLock)
            {
                await bm.CompleteAsync();
            }
        }, onMessageOptions);
    }
}
// Create the listener, then register the handler through Subscribe.
var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);

listener.Subscribe(body =>
{
    Console.WriteLine($"Message received: {body}");
    return Task.FromResult(true);
});
// Experiment: two ReceiveAndDelete receivers on the same subscription, created from one factory.
// listener1 is closed part-way through the send loop; listener2 starts receiving after the loop.
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener1 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
var listener2 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);

int messagesToSent = 10;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions { AutoComplete = false };
onMessageOptions.ExceptionReceived += (sender, args) =>
{
    Console.WriteLine($"Exception received: {args.Exception}");
};

listener1.OnMessage(
    message =>
    {
        var body = message.GetBody<string>();
        Console.WriteLine($"listener1: message received: {body}, listener1 is closed: {listener1.IsClosed}");
        messagesProccessed += 1;
    },
    onMessageOptions);

for (var index = 0; index < messagesToSent; index++)
{
    publisher.Send(new BrokeredMessage($"Message №{index}"));
    Thread.Sleep(50);

    // Close the first receiver right after the fifth message has been sent.
    if (index != 4)
    {
        continue;
    }

    Console.WriteLine("Closing listener1");
    listener1.Close();
}

listener2.OnMessage(
    message =>
    {
        var body = message.GetBody<string>();
        Console.WriteLine($"listener2: message received : {body}, listener2 is closed: {listener2.IsClosed}");
        messagesProccessed += 1;
    },
    onMessageOptions);
listener1: message received: Message №0, listener1 is closed: False
listener1: message received: Message №1, listener1 is closed: False
listener1: message received: Message №2, listener1 is closed: False
listener1: message received: Message №3, listener1 is closed: False
listener1: message received: Message №4, listener1 is closed: False
Closing listener1
listener1: message received: Message №5, listener1 is closed: True
listener2: message received : Message №6, listener2 is closed: False
listener2: message received : Message №7, listener2 is closed: False
listener2: message received : Message №8, listener2 is closed: False
listener2: message received : Message №9, listener2 is closed: False
// Experiment: two PeekLock receivers on the same subscription, each created from its own factory.
// listener1 is closed part-way through the send loop; listener2 starts receiving after the loop.
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var messageFactory1 = MessagingFactory.CreateFromConnectionString(connectionString);
var messageFactory2 = MessagingFactory.CreateFromConnectionString(connectionString);

var publisher = messageFactory.CreateTopicClient(topicName);
var listener1 = messageFactory1.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock);
var listener2 = messageFactory2.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock);

int messagesToSent = 10;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions { AutoComplete = false };
onMessageOptions.ExceptionReceived += (sender, args) =>
{
    Console.WriteLine($"Exception received: {args.Exception}");
};

listener1.OnMessage(
    message =>
    {
        try
        {
            var body = message.GetBody<string>();
            Console.WriteLine($"listener1: message received: {body}, listener1 is closed: {listener1.IsClosed}");
            messagesProccessed += 1;
            message.Complete();
            return;
        }
        catch (Exception completeError)
        {
            Console.WriteLine($"listener1 Complete() exception: {completeError.Message}");
        }

        // Reached only when the block above failed: try to hand the message back.
        try
        {
            message.Abandon();
        }
        catch (Exception abandonError)
        {
            Console.WriteLine($"listener1 Abandon() exception: {abandonError.Message}");
        }
    },
    onMessageOptions);

for (var index = 0; index < messagesToSent; index++)
{
    publisher.Send(new BrokeredMessage($"Message №{index}"));
    Thread.Sleep(50);

    // Close the first receiver right after the fifth message has been sent.
    if (index != 4)
    {
        continue;
    }

    Console.WriteLine("Closing listener1");
    listener1.Close();
}

listener2.OnMessage(
    message =>
    {
        var body = message.GetBody<string>();
        Console.WriteLine($"listener2: message received : {body}, listener2 is closed: {listener2.IsClosed}");
        messagesProccessed += 1;
        message.Complete();
    },
    onMessageOptions);
listener1: message received: Message №0, listener1 is closed: False
listener1: message received: Message №1, listener1 is closed: False
listener1: message received: Message №2, listener1 is closed: False
listener1: message received: Message №3, listener1 is closed: False
listener1: message received: Message №4, listener1 is closed: False
Closing listener1
listener1: message received: Message №5, listener1 is closed: True
listener1 Complete() exception: This messaging entity has already been closed, aborted, or disposed.
listener1 Abandon() exception: This messaging entity has already been closed, aborted, or disposed.
listener2: message received : Message №6, listener2 is closed: False
listener2: message received : Message №7, listener2 is closed: False
listener2: message received : Message №8, listener2 is closed: False
listener2: message received : Message №9, listener2 is closed: False
listener2: message received : Message №5, listener2 is closed: False
Source: https://habr.com/ru/post/308464/
All Articles