Red Architecture: a red help button for complex and intricate systems, part 3 (multithreading to the rescue)

The final part of the description of the Red Architecture is devoted to multithreading. In fairness, the initial version of class v cannot be considered optimal, since it does nothing to address one of the main problems that developers of real-world applications inevitably run into. To fully understand this article, you should first become familiar with the concept of the Red Architecture here:

Red architecture

Looking ahead: we will be able to solve all the multithreading problems without going beyond class v. The changes will be much smaller than they might seem, and as a result the code of class v, with the multithreading problems completely solved, will consist of a little more than 50 lines. Moreover, these 50-odd lines will be more efficient than the variant of class v described in the first part. The code that specifically deals with thread synchronization will take only 20 lines.

Throughout the text, we will examine individual lines from the listings of the finished classes v and Tests, which are given at the end of this article.

Where can I apply the Red Architecture?


I want to emphasize that the examples given here, as well as the entire Red Architecture concept, are intended for use in any language and on any platform. C# / Xamarin and the .NET platform were chosen to demonstrate the Red Architecture purely because of my personal preferences, nothing more.

Two options for class v


We will have two variants of class v. The second variant, identical to the first in functionality and usage, is somewhat more complex. However, it can be used not only in the “standard” C# .NET environment but also in the Xamarin PCL environment, which means mobile development for three platforms at once: iOS, Android, and Windows 10 Mobile. The point is that thread-safe collections are not available in the PCL environment of the Xamarin framework, so the class v variant for Xamarin / PCL contains more thread-synchronization code. That is the variant we will examine in this article, since the simplified version of class v (also included at the end of the article) is less valuable for understanding multithreading problems and how to solve them.

A little bit of optimization


First of all, we get rid of the base class and make class v self-sufficient. We no longer need the notification mechanism of the base class that we have used until now; the inherited mechanism does not allow multithreading problems to be solved in an optimal way. Therefore, we now dispatch events to the handler functions ourselves:

static Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>> handlersMap = new Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>>();
// ...
foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
    lock (handlr) try
    {
        handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add,
            new List<KeyValuePair<k, object>>() { new KeyValuePair<k, object>(key, o) }));
        // ...

In the foreach loop of the Add() method, we copy the elements of the HashSet into a List and iterate over the list rather than the hash set. This is necessary because handlersMap[key] refers to shared static state that can be changed by the public methods m() and h(). It is therefore possible that the HashSet returned by handlersMap[key] is modified by another thread while the Add() method is iterating over it. That would cause a runtime exception, because a collection must not be modified while a foreach over it is still in progress. That is why we iterate not over the shared HashSet itself but over a List into which its elements have been copied.
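The difference between iterating a collection directly and iterating a copy can be seen even in a single thread. The following is a minimal sketch, not part of class v; the class and method names are hypothetical, and strings stand in for handlers.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotIterationDemo
{
    // Returns true if modifying the set while enumerating it directly throws.
    public static bool DirectIterationThrows()
    {
        var handlers = new HashSet<string> { "a", "b" };
        try
        {
            foreach (var h in handlers)
                handlers.Add(h + "!"); // mutates the collection being enumerated
            return false;
        }
        catch (InvalidOperationException)
        {
            return true; // "Collection was modified" during enumeration
        }
    }

    // Iterates over a copy, so the original set can be modified freely.
    public static int SnapshotIterationCount()
    {
        var handlers = new HashSet<string> { "a", "b" };
        foreach (var h in new List<string>(handlers))
            handlers.Add(h + "!");
        return handlers.Count;
    }
}
```

With two initial elements, the second method ends with four elements in the set, while the first one fails on the step after the first Add().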

But this protection is not enough. In the expression

 new List<NotifyCollectionChangedEventHandler>(handlersMap[key]) 

the hash set handlersMap[key] is implicitly copied. This will definitely cause problems if, between the beginning and the end of the copy operation, some other thread tries to add or remove an item in the hash set being copied. Therefore, we lock this hash set (Monitor.Enter(handlersMap[key])) just before the start of the foreach

Monitor.Enter(handlersMap[key]);
foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
{
    // ...

and release the lock (Monitor.Exit(handlersMap[key])) right after entering the foreach loop

foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
{
    if (Monitor.IsEntered(handlersMap[key]))
    {
        Monitor.PulseAll(handlersMap[key]);
        Monitor.Exit(handlersMap[key]);
    }
    // ...

According to the rules of the Monitor class, the number of calls to Enter() must match the number of calls to Exit(), so we use the check if (Monitor.IsEntered(handlersMap[key])), which ensures that, if the lock has been taken, we release it only once, at the beginning of the first iteration of the foreach loop. Immediately after the Monitor.Exit(handlersMap[key]) line, the handlersMap[key] hash set is again available to other threads. Thus we keep the hash set locked for the minimum possible time; one could say it is locked for just a moment.

Immediately after the foreach loop, we see a repetition of the lock code.

// ...
if (Monitor.IsEntered(handlersMap[key]))
{
    Monitor.PulseAll(handlersMap[key]);
    Monitor.Exit(handlersMap[key]);
}
// ...

This code is needed in case the foreach did not perform a single iteration, which happens when the hash set for a given key contains no handlers.
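The same goal, holding the lock only while the copy is made, can also be expressed by taking the snapshot inside a lock block and iterating outside it. This is a different formulation from the Monitor.Enter / IsEntered code discussed above, not the author's code; it is a sketch in which the class name, the method names and the use of Action<string> as the handler type are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotUnderLock
{
    static readonly HashSet<Action<string>> handlers = new HashSet<Action<string>>();

    public static void Subscribe(Action<string> handler)
    {
        lock (handlers) handlers.Add(handler);
    }

    // Copies the handlers under the lock, then calls them outside the lock.
    public static int Publish(string message)
    {
        List<Action<string>> snapshot;
        lock (handlers) // held only for the duration of the copy
        {
            snapshot = new List<Action<string>>(handlers);
        }
        foreach (var handler in snapshot)
            handler(message); // runs after the lock has been released
        return snapshot.Count;
    }
}
```

Because the lock block always releases the lock on exit, no separate handling is needed for the case where the set is empty and the loop body never runs.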

The following code requires a detailed explanation:

  lock(handlr) try { // ... 

The point is that in the Red Architecture concept, the only objects that are created outside class v and require thread synchronization are the handler functions. If we could not control the code that calls our handler functions, we would have to guard every handler with something like

void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
{
    lock (handlerLock) // handlerLock: a hypothetical private lock object owned by the handler's class
    {
        // useful handler code
    }
}

Notice the lock block that wraps the useful code of the method. If the handler modifies data external to it, such a lock would have to be added, because threads entering this function simultaneously would otherwise change the values of external variables in an unpredictable order.

But instead, we added just one line to the whole program, lock (handlr), and did it inside class v without touching anything outside it! Now we can write as many handler functions as we like without worrying about their thread safety, since the implementation of class v guarantees that only one thread at a time can enter a particular handler; other threads will wait at lock (handlr) until the thread that entered the handler before them has finished its work.
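The effect of locking on the handler delegate can be illustrated in isolation. In the sketch below, which is not taken from class v, the handler increments a shared counter in a non-atomic way and contains no locking of its own; the caller takes the lock on the delegate instance, as Add() does. All names are hypothetical. Note that this works only because every caller uses the same delegate instance, which is the case when the delegate is stored once and reused.

```csharp
using System;
using System.Threading.Tasks;

public static class HandlerLockDemo
{
    static int counter; // shared state modified by the handler without its own locking

    static void Handler(int increments)
    {
        for (int i = 0; i < increments; i++)
            counter = counter + 1; // not atomic: unsafe if two threads run it at once
    }

    // Invokes the same delegate instance from several threads, locking on it first.
    public static int Run(int threads, int increments)
    {
        counter = 0;
        Action<int> handlr = Handler; // one delegate instance shared by all callers
        var tasks = new Task[threads];
        for (int t = 0; t < threads; t++)
            tasks[t] = Task.Run(() => { lock (handlr) handlr(increments); });
        Task.WaitAll(tasks);
        return counter;
    }
}
```

With the lock in place, the result equals threads multiplied by increments; without it, some increments could be lost.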

foreach, for (;;) and multithreading


In the Tests listing (at the end of the article), there is a foreachTest(string[] a) method that checks how the for (;;) loop behaves when two threads enter this method, and therefore the for (;;) loop, at the same time. Below is a possible fragment of this method's output:

// ...
~: string20
~: string21
~: string22
~: astring38
~: astring39
~: string23
~: string24
~: astring40
~: astring41
~: string25
~: astring42
~: string26
~: astring43
~: astring44
~: string27
~: astring45
~: string28
// ...

We see that although the “string” and “astring” lines are interleaved, the numeric suffix of each kind of line goes in order, i.e. the correct value of the local variable i is used for each output line. This output suggests that two threads entering for (;;) simultaneously is safe. Presumably, all variables declared within the for (;;) construct, such as int i, are created on the stack of the thread that entered the loop. That is why access to variables created inside for (;;) needs no “manual” synchronization: they are accessible only to the thread on whose stack they were created. This is the case in C# and on the .NET platform. In other languages, although it is unlikely, the behavior may differ, so such a test would not be superfluous.
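The same observation can be checked programmatically rather than by reading the output. The following sketch is independent of the Tests class, and its names are hypothetical: two tasks run the same loop at the same time, each recording the values of its own loop variable, and the recorded sequences are then verified.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LoopLocalsDemo
{
    // Each call has its own i and its own result list; nothing is shared.
    static List<int> Count(int n)
    {
        var seen = new List<int>();
        for (int i = 0; i < n; i++)
            seen.Add(i);
        return seen;
    }

    // Runs the loop on two threads at once and checks both sequences.
    public static bool BothInOrder(int n)
    {
        var t1 = Task.Run(() => Count(n));
        var t2 = Task.Run(() => Count(n));
        Task.WaitAll(t1, t2);
        return IsSequence(t1.Result, n) && IsSequence(t2.Result, n);
    }

    static bool IsSequence(List<int> values, int n)
    {
        if (values.Count != n) return false;
        for (int i = 0; i < n; i++)
            if (values[i] != i) return false;
        return true;
    }
}
```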

try ... catch is the norm, not an exception


try ... catch: at first glance this construct does not seem necessary, but it is important. It is designed to protect us from the situation where, at the time handlr.Invoke() is called, the object in which handlr is defined has been destroyed. The object can be destroyed by another thread or by the garbage collector at any moment between the line

 foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key])) 

and

 handlr.Invoke(); 

In the exception handler, the catch block, we check whether the handler refers to a null (deleted) object; if so, we simply remove it from the list of handlers.

lock (handlr) try
{
    handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add,
        new List<KeyValuePair<k, object>>() { new KeyValuePair<k, object>(key, o) }));
#if __tests__
    /* check modification of global collection of handlers for a key while iteration through its copy */
    handlersMap[key].Add((object sender, NotifyCollectionChangedEventArgs e) => { });
#endif
}
catch (Exception e)
{
    // an exception can be thrown inside handlr.Invoke() when the handler has been destroyed
    if (ReferenceEquals(null, handlr) && e is NullReferenceException)
        // handler invalid, remove it
        m(handlr);
    else
        // exception in the handler's body: rethrow, preserving the stack trace
        throw;
}

Class v initialization


The static constructor is one of the distinguishing features of C#. It cannot be called directly. It is called automatically, only once, before the first instance of the class is created or any of its static members is accessed. We use it to initialize handlersMap: for every key in k, we prepare an empty HashSet intended for storing the handler functions of that key. In languages that have no static constructor, any method that initializes the object will do.

static v()
{
    foreach (k e in Enum.GetValues(typeof(k)))
        handlersMap[e] = new HashSet<NotifyCollectionChangedEventHandler>();
    new Tests().run();
}
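The "runs once, before first use" behavior of a static constructor can be observed with a small counter. This is an illustrative sketch only: the Registry class, its members, and the use of the standard DayOfWeek enum in place of k are assumptions, not part of the article's code.

```csharp
using System;

public static class Registry
{
    public static int InitCount;            // incremented by the static constructor
    public static readonly string[] Slots;  // one slot per enum value, prepared up front

    static Registry()
    {
        InitCount++;
        Slots = new string[Enum.GetValues(typeof(DayOfWeek)).Length];
    }

    public static int SlotCount() => Slots.Length;
}
```

Calling Registry.SlotCount() any number of times leaves InitCount at 1, and no instance of the class is ever created: accessing a static member is enough to trigger the static constructor.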

How to deal with a thread-unsafe collection?


The C# HashSet class is not thread safe: it provides no synchronization when modified from multiple threads. We therefore need to synchronize modifications of this object, namely the removal and addition of elements. In our case, it is enough to add one lock (handlersMap[key]) line just before the remove / add operation in the methods m() and h() of class v. The object on which the thread blocks is the HashSet associated with that particular key, which ensures that only one thread at a time can modify that particular hash set.
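A minimal sketch of this technique, separate from class v and with hypothetical names: several threads add distinct integers to one HashSet, each Add() being wrapped in a lock on the set itself, in the same way h() and m() lock on handlersMap[key].

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LockedSetDemo
{
    // Adds threads * perThread distinct values from several threads and returns the final count.
    public static int AddConcurrently(int threads, int perThread)
    {
        var set = new HashSet<int>();
        var tasks = new Task[threads];
        for (int t = 0; t < threads; t++)
        {
            int offset = t * perThread; // per-iteration copy: each task gets its own range
            tasks[t] = Task.Run(() =>
            {
                for (int i = 0; i < perThread; i++)
                    lock (set) set.Add(offset + i); // one writer at a time
            });
        }
        Task.WaitAll(tasks);
        return set.Count;
    }
}
```

Since all values are distinct and every modification is serialized, the final count equals threads multiplied by perThread.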

“Side effects” of multithreading


Some “side effects” of multithreading are worth mentioning. In particular, the code of handler functions must be prepared for being called, in some cases, after the handler function has been “unsubscribed” from receiving events. That is, after m(handler) is called, the handler may still be invoked for some time (probably a fraction of a second). This is possible because at the moment handlersMap[key].Remove(handler) is executed in the m() method, the handler may already have been copied by another thread in the line foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key])), and it will then be called in the Add() method of class v after its removal in m().
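One possible way for a handler to tolerate such late calls is to keep its own "active" flag and ignore events once it has been cleared. This is only a sketch of that idea, not something the article prescribes; the Subscriber class and its members are hypothetical, and a string stands in for the event arguments.

```csharp
using System.Threading;

public class Subscriber
{
    int active = 1;      // 1 while subscribed, 0 after Unsubscribe()
    public int Handled;  // number of events actually processed

    public void OnEvent(string payload)
    {
        if (Volatile.Read(ref active) == 0)
            return;      // late delivery after unsubscribing: ignore it
        Interlocked.Increment(ref Handled);
    }

    public void Unsubscribe()
    {
        Volatile.Write(ref active, 0);
        // in the article's design, v.m(handler) would be called here
    }
}
```

An event delivered before Unsubscribe() is counted, and one delivered afterwards is silently dropped.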

Simple rules for solving complex problems.


Finally, I want to point out that we, being diligent developers, do not violate the guidelines on the use of locks. These guidelines are listed in the Remarks section of this page: docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement. They apply to all languages, not just C#. In essence, they advise locking only on objects that code outside your control cannot access.

We use two types of objects for locks, and both are private. The first type is the HashSet object, which is private to class v. The second type is the handler function object. Handler functions are declared private in all objects that declare them and use them to receive events. In the Red Architecture, only class v should call the handler functions directly, and nothing else.

Listings


Below is the complete code of the classes v and Tests. In C#, you can use them directly by copying them from here. “Translating” this code into other languages should be a small and entertaining task.

Below is the code of the “universal” class v, which can also be used in mobile application projects on the Xamarin / C # platform.

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;

namespace Common
{
    public enum k { OnMessageEdit, MessageEdit, MessageReply, Unused, MessageSendProgress, OnMessageSendProgress, OnIsTyping, IsTyping, MessageSend, JoinRoom, OnMessageReceived, OnlineStatus, OnUpdateUserOnlineStatus }

    public class v
    {
        static Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>> handlersMap = new Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>>();

        // subscribe the handler to each of the given keys
        public static void h(k[] keys, NotifyCollectionChangedEventHandler handler)
        {
            foreach (var key in keys)
                lock (handlersMap[key])
                    handlersMap[key].Add(handler);
        }

        // unsubscribe the handler from all keys
        public static void m(NotifyCollectionChangedEventHandler handler)
        {
            foreach (k key in Enum.GetValues(typeof(k)))
                lock (handlersMap[key])
                    handlersMap[key].Remove(handler);
        }

        public static void Add(k key, object o)
        {
            // lock the hash set while it is being copied into the List
            Monitor.Enter(handlersMap[key]);
            foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
            {
                // release the lock once, at the beginning of the first iteration
                if (Monitor.IsEntered(handlersMap[key]))
                {
                    Monitor.PulseAll(handlersMap[key]);
                    Monitor.Exit(handlersMap[key]);
                }
                lock (handlr) try
                {
                    handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add,
                        new List<KeyValuePair<k, object>>() { new KeyValuePair<k, object>(key, o) }));
#if __tests__
                    /* check modification of global collection of handlers for a key while iteration through its copy */
                    handlersMap[key].Add((object sender, NotifyCollectionChangedEventArgs e) => { });
#endif
                }
                catch (Exception e)
                {
                    // an exception can be thrown inside handlr.Invoke() when the handler has been destroyed
                    if (ReferenceEquals(null, handlr) && e is NullReferenceException)
                        // handler invalid, remove it
                        m(handlr);
                    else
                        // exception in the handler's body: rethrow, preserving the stack trace
                        throw;
                }
            }
            // release the lock if the loop body never ran (no handlers for this key)
            if (Monitor.IsEntered(handlersMap[key]))
            {
                Monitor.PulseAll(handlersMap[key]);
                Monitor.Exit(handlersMap[key]);
            }
        }

        static v()
        {
            foreach (k e in Enum.GetValues(typeof(k)))
                handlersMap[e] = new HashSet<NotifyCollectionChangedEventHandler>();
            new Tests().run();
        }
    }
}

Below is the code of the “simplified” class v, which can be used on the “standard” C# .NET platform. Its only difference from the “universal” counterpart is the use of the ConcurrentBag collection instead of HashSet; ConcurrentBag provides out-of-the-box thread synchronization for access to itself. Using ConcurrentBag instead of HashSet made it possible to remove most of the thread-synchronization code from class v.

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Collections.Concurrent;
using System.Threading;

namespace Common
{
    public enum k { OnMessageEdit, MessageEdit, MessageReply, Unused, MessageSendProgress, OnMessageSendProgress, OnIsTyping, IsTyping, MessageSend, JoinRoom, OnMessageReceived, OnlineStatus, OnUpdateUserOnlineStatus }

    public class v
    {
        static Dictionary<k, ConcurrentBag<NotifyCollectionChangedEventHandler>> handlersMap = new Dictionary<k, ConcurrentBag<NotifyCollectionChangedEventHandler>>();

        public static void h(k[] keys, NotifyCollectionChangedEventHandler handler)
        {
            foreach (var key in keys)
                handlersMap[key].Add(handler);
        }

        public static void m(NotifyCollectionChangedEventHandler handler)
        {
            foreach (k key in Enum.GetValues(typeof(k)))
                // NOTE: ConcurrentBag<T> has no Remove method, so this line does not compile as written;
                // removing a specific handler requires a different thread-safe collection
                handlersMap[key].Remove(handler);
        }

        public static void Add(k key, object o)
        {
            foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
            {
                lock (handlr) try
                {
                    handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add,
                        new List<KeyValuePair<k, object>>() { new KeyValuePair<k, object>(key, o) }));
#if __tests__
                    /* check modification of global collection of handlers for a key while iteration through its copy */
                    handlersMap[key].Add((object sender, NotifyCollectionChangedEventArgs e) => { });
#endif
                }
                catch (Exception e)
                {
                    // an exception can be thrown inside handlr.Invoke() when the handler has been destroyed
                    if (ReferenceEquals(null, handlr) && e is NullReferenceException)
                        // handler invalid, remove it
                        m(handlr);
                    else
                        // exception in the handler's body: rethrow, preserving the stack trace
                        throw;
                }
            }
        }

        static v()
        {
            foreach (k e in Enum.GetValues(typeof(k)))
                handlersMap[e] = new ConcurrentBag<NotifyCollectionChangedEventHandler>();
            new Tests().run();
        }
    }
}
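ConcurrentBag<T> offers Add() and TryTake() but no way to remove a specific element, which the m() method needs. One commonly used substitute, shown here as an assumption and not as the author's design, is a ConcurrentDictionary used as a set, with the handler as the key and a dummy value. The class name, the method names and the Action<string> handler type below are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

public static class ConcurrentHandlerSet
{
    // ConcurrentDictionary used as a set: the byte value is only a placeholder.
    static readonly ConcurrentDictionary<Action<string>, byte> handlers =
        new ConcurrentDictionary<Action<string>, byte>();

    public static bool Subscribe(Action<string> handler)
    {
        return handlers.TryAdd(handler, 0);
    }

    public static bool Unsubscribe(Action<string> handler)
    {
        byte ignored;
        return handlers.TryRemove(handler, out ignored);
    }

    // Calls every subscribed handler and returns how many were called.
    public static int Publish(string message)
    {
        int called = 0;
        foreach (var handler in handlers.Keys) // Keys returns a snapshot copy
        {
            handler(message);
            called++;
        }
        return called;
    }
}
```

Because the Keys property returns a snapshot, handlers can be added or removed by other threads while Publish() is iterating, without explicit locks around the collection.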

Below is the code of the Tests class, which tests the multithreaded use of class v as well as of the handler functions. Pay attention to the comments: they contain a lot of useful information about how the testing and the test code work.

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;

// same namespace as class v and enum k, so that all three types resolve each other
namespace Common
{
    class DeadObject
    {
        void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ OnEvent() of dead object: key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
        }

        public DeadObject()
        {
            v.h(new k[] { k.OnlineStatus }, OnEvent);
        }

        ~DeadObject()
        {
            // Accidentally we forgot to call v.m(OnEvent) here, and now v.handlersMap contains a reference to the "dead" handler
        }
    }

    public class Tests
    {
        void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ OnEvent(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
            if (newItem.Key == k.Unused)
            {
                // v.Add(k.Unused, "stack overflow crash");
                // A reentrant call in the current thread causes a stack overflow crash.
                // Deadlock doesn't happen, because the lock mechanism allows reentrancy
                // for a thread that already holds the lock on a particular object.

                // Task.Run(() => v.Add(k.Unused, "deadlock"));
                // The same call in a separate thread doesn't overflow, but causes an infinite recursive loop.
            }
        }

        void OnEvent2(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ OnEvent2(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
        }

        void foreachTest(string[] a)
        {
            for (int i = 0; i < a.Length; i++)
            {
                Debug.WriteLine(String.Format("~ : {0}{1}", a[i], i));
            }
        }

        async void HandlersLockTester1(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ HandlersLockTester1(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
            await Task.Delay(300);
        }

        async void HandlersLockTester2(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ HandlersLockTester2(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
        }

        public async void run()
        {
            // Direct call to the garbage collector: for testing purposes only, not recommended in an application's business logic
            GC.Collect();

            /*
             * == test v.Add()::foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
             * for two threads entering the foreach loop at the same time and iterating handlers only of its key
             */
            Task t1 = Task.Run(() => { v.Add(k.OnMessageReceived, "this key"); });
            Task t2 = Task.Run(() => { v.Add(k.MessageEdit, "that key"); });

            // wait for both threads to complete before executing next test
            await Task.WhenAll(new Task[] { t1, t2 });

            /* By now DeadObject may already be destroyed, so we can test the catch block in class v */
            v.Add(k.OnlineStatus, "for dead object");

            /* test reentrant calls - causes stack overflow or infinite loop,
               depending on code at OnEvent::if(newItem.Key == k.Unused) clause */
            v.Add(k.Unused, 'a');

            /* testing foreach loop entering multiple threads */
            var s = Enumerable.Repeat("string", 200).ToArray();
            var n = Enumerable.Repeat("astring", 200).ToArray();
            t1 = Task.Run(() => { foreachTest(s); });
            t2 = Task.Run(() => { foreachTest(n); });

            // wait for both threads to complete before executing next test
            await Task.WhenAll(new Task[] { t1, t2 });

            /* testing lock(handlr) in Add() method of class v */
            v.h(new k[] { k.IsTyping }, HandlersLockTester1);
            v.h(new k[] { k.JoinRoom }, HandlersLockTester2);

            // line 1
            Task.Run(() => { v.Add(k.IsTyping, "first thread for the same handler"); });
            // line 2
            Task.Run(() => { v.Add(k.IsTyping, "second thread for the same handler"); });
            // The line below will MOST OF THE TIME complete before line 2 above, because line 2 waits for line 1 to complete:
            // lines 1 and 2 call the same handler, access to which is synchronized by lock(handlr) in the Add() method of class v
            Task.Run(() => { v.Add(k.JoinRoom, "third thread for other handler"); });
        }

        public Tests()
        {
            // add OnEvent for each key
            v.h(new k[] { k.OnMessageReceived, k.MessageEdit, k.Unused }, OnEvent);
            // add OnEvent2 for each key
            v.h(new k[] { k.Unused, k.OnMessageReceived, k.MessageEdit }, OnEvent2);

            /* == test try catch blocks in v class, when handler is destroyed before handlr.Invoke() called */
            var ddo = new DeadObject();
            // Then try to delete the object by setting its reference to null.
            // We are in a managed environment, so we can't directly manage the life cycle of an object.
            ddo = null;
        }
    }
}

The code that registers a handler function, as well as the handler function itself, could look like this for such a class v:

handler function registration code

// add OnEvent for each key
v.h(new k[] { k.OnMessageReceived, k.MessageEdit, k.Unused }, OnEvent);

handler function code

void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
{
    var newItem = (KeyValuePair<k, object>)e.NewItems[0];
    // Debug.WriteLine has an overload that takes a format string and arguments
    Debug.WriteLine("~ OnEvent(): key {0} value {1}", newItem.Key.ToString(), newItem.Value);
}

A general description of the Red Architecture is here .

Source: https://habr.com/ru/post/334840/