📜 ⬆️ ⬇️

Event model based on async and await

Back in 2012, when the price of oil was still three-digit, and the grass was greener, Microsoft released .NET 4.5, and with it the async / await design. There are already quite a few articles written about it ( Async in C # ), and most C # developers have studied it well. But have all the use cases been considered, is it possible to squeeze a bit more out of await?

The most obvious use of this construct is to wait for an asynchronous operation to complete. The first thing that comes to mind is waiting for I / O. For example, we sent a request to the client and wait for a response, then using await we will be able to continue executing the code after receiving the response, while the code itself will look synchronous. But what if during the wait it becomes necessary to interrupt the execution of this operation? Then we will have to use CancellationToken, and if there are several such operations, then the tokens will need to be linked or use one common token. In this case, the reason for the cancellation will be hidden from the code that uses this CancellationToken. In addition to cancellation, the code must support the handling of connection loss, timeout, returned errors, etc.

In the classic version, this will result in the use of CancellationToken for cancellation processing, try catch for processing disconnection and analysis code of the returned data to evaluate the result of the request. But can all this fit in a single paradigm? In this article, I propose to consider an alternative approach based on an event model using the async / await syntactic sugar.

Eventing library.


Everything needed for an event model on async / await was designed as an Eventing library and uploaded to GitHub under the MIT license.
')
The library has been tested and successfully used on the combat system for more than two years.

Using


The example described at the beginning using Eventing will look like this:

var @event = await this.EventManager.WaitFor<MessageReceived, CancelRequested>(TimeSpan.FromSeconds(50)); if (@event == null) Log.Info("timeout"); else if (@event is CancelRequested) Log.Info("Cancelled, reason: {0}", ((CancelRequested) @event).Reason); else Log.Info("Message received"); 

Here we use EventManager, an event manager that implements the IEventManager interface, to wait for MessageReceived and CancelRequested events with a timeout of 50 seconds. Using the WaitFor call, we create a subscription to the specified events, and the await call blocks further code execution (but not the flow). It will remain blocked until one of the specified events occurs or the timeout period expires, after which execution will continue in the current synchronization context. But what if the connection with the client is lost during the formation of the subscription? In this case, the code will hang for 50 seconds, since the client disconnect event will be missed. Fix this:

 //   var eventAwait = this.EventManager.WaitFor<MessageReceived, ClientDisconnected, CancelRequested>(TimeSpan.FromSeconds(50), e => !(e is ClientDisconnected) || ((ClientDisconnected)e).id == client.Id); //   if (!client.Connected || cancelRequested) { //            Log.Info("Client disconnected or cancel requested"); return; } //      var @event = await eventAwait; ... 

Here we added the ClientDisconnected event and divided the creation of the awaitable variable eventAwait and the immediate wait for the event. If we did not separate them, the client could disconnect after checking client.Connected and waiting for the event, which would lead to the loss of the event. An event filter has also been added that excludes ClientDisconnected events unrelated to the current client.

How to create an event?


To do this, create a class that implements IEvent:

 class CancelRequested : IEvent { public string Reason { get; set; } } 

And then call IEventManager.RaiseEvent, for example:

 this.EventManager.RaiseEvent(new CancelRequested()). 


Inheritance from IEvent separates events from other classes and prevents the use of inappropriate instances in the RaiseEvent method. Inheritance is also supported:

 class UserCancelRequested : CancelRequested { } class SystemCancelRequested : CancelRequested { } var @event = await this.EventManager.WaitFor<CancelRequested>(); if (@event is UserCancelRequested) ... 

If you have a complex system in which there are many expected events at the same time, using the CancelRequested event instead of cancellation tokens will allow you to avoid global and local CancellationToken linking. This is important because complex linking increases the chance of missing a memory leak due to the retention of tokens.

How to subscribe to an event?


Some events are periodic, such events can be obtained using the IEventManager.StartReceiving method:

 void StartReceiving<T>(Action<T> handler, object listener, Func<T, bool> filter = null, SynchronizationContext context = null) where T : IEvent; 

The handler handler will be called in the context of context synchronization for each T event that satisfies the filter filter, if specified. If the synchronization context is not set, then SynchronizationContext.Current will be used.

How it works?


It uses all the same mechanism of tasks on which async / await is based. When you call WaitFor, the event manager creates a task using TaskCompletionSource and creates a subscription for the selected types of events in the message bus.

 // EventManager.cs,   var taskCompletionSource = new TaskCompletionSource<IEvent>(); var subscription = new MessageSubscription( subscriptionId, message => { var @event = message as IEvent; if (filter != null && !filter(@event)) return; //     if (taskCompletionSource.TrySetResult(@event)) this.trace.TraceEvent(TraceEventType.Information, 0, "Wait ended: '{0}' - '{1}'", subscriptionId, message.GetType()); }, this, UnsubscribePolicy.Auto, this.defaultContext, eventTypes); this.messageBus.Subscribe(subscription); ... return taskCompletionSource.Task; 

When an event is generated, the RaiseEvent method is called, which sends the event to the bus, and according to the type of event it selects the subscriptions in which eventTypes includes this type. Next, the subscription handler is called, and if it satisfies the filter, the result of the task execution is set and unlocks the await call.

 // EventManager.cs,   public void RaiseEvent(IEvent @event) { this.trace.TraceEvent(TraceEventType.Information, 0, "Event: {0}", @event); this.messageBus.Send(@event); } // MessageBus.cs,   public void Send(object message) { var messageType = message.GetType(); IMessageSubscription[] subscriptionsForMessage; lock (this.subscriptions) { subscriptionsForMessage = this.subscriptions .Where(s => s.MessagesTypes.Any(type => messageType == type || type.IsAssignableFrom(messageType))) .ToArray(); } ... foreach (var subscription in subscriptionsForMessage) subscription.ProccessMessage(message); this.UnsubscribeAutoSubscriptions(subscriptionsForMessage); ... // MessageSubscription.cs public void ProccessMessage(object message) { var messageHandler = this.handler; this.SynchronizationContext.Post(o => messageHandler(message), null); } 

In MessageSubscription.ProccessMessage, a message is sent to a user-defined synchronization context, which avoids delays when sending a message.

Deliver my class from multithreading!


Everyone who has worked with async / await knows that after await completes, the code continues its execution not in the current thread, but in the current synchronization context. This can be a problem if you subscribe to an event using StartReceiving and then call WaitFor, which causes the class code to be executed simultaneously in different threads (the event handler from StartReceiving and the code after await // how scary to live!). This is easy to fix with a single-threaded synchronization context included in the library:

 this.serverSynchronizationContext = new SingleThreadSynchronizationContext("Server thread"); this.clientSynchronizationContext = new SingleThreadSynchronizationContext("Client thread"); this.serverSynchronizationContext.Post(async o => await this.RunServer(), null); this.clientSynchronizationContext.Post(async o => await this.RunClient(), null); 

Thus, our client will always be executed in the stream “Client thread”, and the server in “Server thread”. You can write multi-threaded code without thinking about race condition. As a bonus, this will allow for maximum utilization of a single stream.

What is the advantage?


The main advantage is the simplicity and testability of the code. If one can argue about the first, everyone understands simplicity in his own way, then everything is obvious with the second paragraph. A multi-threaded application can be tested in a single thread, emulating any sequence of events, and this does not require creating mock objects, any interaction can be reduced to events, and their checking to a RaiseEvent call. NUnit example:

 /// <summary> /// This test demonstrates how to test application that uses Eventing /// All code executes sequently in one thread /// </summary> [TestFixture] public class TestCase : TestBase { [Test] public async Task ClientDoesWork() { var client = new Client(this.EventManager); var doWorkAwaitable = client.DoWork(); this.EventManager.RaiseEvent(new Connected()); // We can debug and step into this.React(); await doWorkAwaitable; Assert.AreEqual(true, client.Connected); } } 

How can this be used?


In order not to overfill the article with listings, I will provide only a brief text description of one of the systems where Eventing is used. It is a horizontally scalable distributed system consisting of four types of nodes, one of which is a master. The master continuously communicates with all nodes and controls the execution of various operations on them. Each operation can be represented as a finite state machine, where the transition is the occurrence of an event (including a timeout or cancellation). Although for each operation it was possible to implement the automaton in its classical form (which we initially did), it was much easier to imagine using Eventing, where the current state was determined by the code execution point, rather than a separate variable. At that, at each step all the expected events were clearly listed, which simplified the testing of the white box.

Conclusion


The article discusses the key features and options for using the Eventing library. The library does not pretend to the universality and support of high-loaded systems, but calls for a little different look at familiar things, allows you to write safe and easy to test in terms of multithreading code.

Source: https://habr.com/ru/post/302522/


All Articles