
Signals in C#

Good day, Habr. Inspired by the thread synchronization model in Go and by signals in Qt, I had the idea of implementing something similar in C#.


If you are interested, read on.

At the moment, thread synchronization in C# causes some difficulties, in particular when synchronization primitives have to be passed between the objects of your application and that code has to be maintained later.

The current model with Task and IAsyncResult, and the TPL as a whole, solves all of these problems given proper design, but I wanted to create a simple class through which you can send and receive signals while blocking the receiving thread.

Eventually the following interface took shape in my head:

    public interface ISignal<T> : IDisposable
    {
        void Send(T signal);
        T Receive();
        T Receive(int timeOut);
    }

where T is the type of the entity that must be passed to the receiver.

Call example:

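    [TestMethod]
    public void ExampleTest()
    {
        var signal = SignalFactory.GetInstanse<string>();

        // Sender: runs on another thread and sends a message after a delay.
        var task1 = Task.Factory.StartNew(() =>
        {
            Thread.Sleep(1000);
            signal.Send("Some message");
        });

        // Receiver: blocks the current thread until the message arrives.
        string message = signal.Receive();
        Debug.WriteLine(message);
    }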

To obtain a signal object, we create a factory.

    public static class SignalFactory
    {
        public static ISignal<T> GetInstanse<T>()
        {
            return new Signal<T>();
        }

        public static ISignal<T> GetInstanse<T>(string name)
        {
            return new CrossProcessSignal<T>(name);
        }
    }

Signal is an internal class for synchronization within a single process. Synchronizing through it requires a reference to the signal object.

CrossProcessSignal is an internal class that can synchronize threads in separate processes (but more on that later).

Now about the implementation of Signal


The first thing that comes to mind is to block the thread in Receive with a Semaphore and, in the Send method, call Release() on that semaphore with the number of blocked threads.
After the threads are unblocked, each returns the result from the class's buffer field of type T. But we do not know how many threads will be waiting in Receive, and there is no guarantee that a couple more threads will not arrive just as Release is being called.
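The counting problem can be illustrated with a small sketch. It is not part of the library: the SemaphoreSketch class and the LeftoverPermits method are hypothetical names, and SemaphoreSlim is used instead of Semaphore only because it exposes CurrentCount. If Send guesses the number of waiting threads too high, the extra permits stay in the semaphore and a later Receive would pass without any new signal having been sent.

```csharp
using System;
using System.Threading;

// Hypothetical illustration of the semaphore-based approach and its flaw.
public static class SemaphoreSketch
{
    // Starts `waiters` threads that each take one permit, releases
    // `released` permits (released must be >= waiters), and returns
    // how many permits are left in the semaphore afterwards.
    public static int LeftoverPermits(int waiters, int released)
    {
        using (var semaphore = new SemaphoreSlim(0))
        {
            var threads = new Thread[waiters];
            for (int i = 0; i < waiters; i++)
            {
                threads[i] = new Thread(() => semaphore.Wait());
                threads[i].Start();
            }

            // "Send": we guess how many threads are waiting.
            semaphore.Release(released);

            foreach (var thread in threads)
            {
                thread.Join();
            }

            // Any leftover permit would let a future waiter through
            // without a new signal.
            return semaphore.CurrentCount;
        }
    }

    public static void Main()
    {
        // One waiter, two permits released: one stale permit remains.
        Console.WriteLine(LeftoverPermits(1, 2)); // prints 1
    }
}
```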

AutoResetEvent was chosen as the synchronization primitive instead. An AutoResetEvent is created for each new thread, and all of them are stored in a Dictionary<int, AutoResetEvent> where the key is the thread ID.

Actually the class fields look like this:

    private T buffer;
    Dictionary<int, AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();
    private volatile object sync = new object();
    private bool isDisposabled = false;

We need the sync object when Send is called, so that several threads do not overwrite the buffer at the same time.

The isDisposabled flag records whether Dispose() has been called; if it has not, the destructor calls it.

    public void Dispose()
    {
        foreach (var resetEvent in events.Values)
        {
            resetEvent.Dispose();
        }
        isDisposabled = true;
    }

    ~Signal()
    {
        if (!isDisposabled)
        {
            Dispose();
        }
    }
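For comparison, here is a minimal sketch of an alternative cleanup scheme. It is not the code from the repository, and the DisposableEvents class with its Create method is a hypothetical name used only for illustration. It assumes the class owns nothing but managed AutoResetEvent objects, in which case a finalizer can be omitted (each event's underlying handle is already protected by its own safe handle), and it makes Dispose safe to call more than once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration: an owner of AutoResetEvent objects with an
// idempotent Dispose and no finalizer.
public sealed class DisposableEvents : IDisposable
{
    private readonly List<AutoResetEvent> events = new List<AutoResetEvent>();
    private bool isDisposed;

    public bool IsDisposed => isDisposed;

    // Creates an event that this object will dispose later.
    public AutoResetEvent Create()
    {
        if (isDisposed)
        {
            throw new ObjectDisposedException(nameof(DisposableEvents));
        }
        var resetEvent = new AutoResetEvent(false);
        events.Add(resetEvent);
        return resetEvent;
    }

    public void Dispose()
    {
        if (isDisposed)
        {
            return; // a second call does nothing
        }
        foreach (var resetEvent in events)
        {
            resetEvent.Dispose();
        }
        events.Clear();
        isDisposed = true;
    }

    public static void Main()
    {
        var owner = new DisposableEvents();
        owner.Create();
        owner.Dispose();
        owner.Dispose(); // safe to repeat
        Console.WriteLine(owner.IsDisposed); // prints True
    }
}
```

Whether dropping the finalizer suits the real Signal class depends on what else it ends up owning, so treat this as one option rather than a recommendation.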

Now about the Receive method.

    public T Receive()
    {
        var waiter = GetEvents();
        waiter.WaitOne();
        waiter.Reset();
        return buffer;
    }

GetEvents() fetches the current thread's AutoResetEvent from the dictionary if one exists; otherwise it creates a new one and adds it to the dictionary.
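The get-or-create lookup can also be sketched with a ConcurrentDictionary, which avoids modifying a plain Dictionary from several threads at once. This is only an illustration under my own assumptions: the EventRegistry class and its GetEvent and Count members are hypothetical names, not part of the published code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration: one AutoResetEvent per thread, looked up
// or created in a thread-safe way.
public sealed class EventRegistry : IDisposable
{
    private readonly ConcurrentDictionary<int, AutoResetEvent> events =
        new ConcurrentDictionary<int, AutoResetEvent>();

    public int Count => events.Count;

    // Returns the calling thread's event, creating it on first use.
    public AutoResetEvent GetEvent()
    {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        return events.GetOrAdd(threadId, _ => new AutoResetEvent(false));
    }

    public void Dispose()
    {
        foreach (var resetEvent in events.Values)
        {
            resetEvent.Dispose();
        }
    }

    public static void Main()
    {
        using (var registry = new EventRegistry())
        {
            // The same thread always gets the same event back.
            var first = registry.GetEvent();
            var second = registry.GetEvent();
            Console.WriteLine(ReferenceEquals(first, second)); // prints True

            // A different thread gets an event of its own.
            AutoResetEvent other = null;
            var thread = new Thread(() => other = registry.GetEvent());
            thread.Start();
            thread.Join();
            Console.WriteLine(ReferenceEquals(first, other)); // prints False
        }
    }
}
```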

waiter.WaitOne() blocks the thread until a signal arrives.

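waiter.Reset() resets the state of the AutoResetEvent, so the next WaitOne call will block the thread again. Note that an AutoResetEvent also returns to the non-signaled state on its own when it releases a waiting thread.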
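The reset behavior of AutoResetEvent is easy to check in isolation. The sketch below is a standalone illustration with hypothetical names (AutoResetSketch, IsStillSignaledAfterWait), not code from the library: it signals an event, consumes the signal with WaitOne, and then polls the event with a zero timeout to see whether it is still signaled.

```csharp
using System;
using System.Threading;

// Hypothetical illustration of how AutoResetEvent behaves after a wait.
public static class AutoResetSketch
{
    // Returns true if the event is still signaled after one WaitOne call.
    public static bool IsStillSignaledAfterWait()
    {
        using (var waiter = new AutoResetEvent(false))
        {
            waiter.Set();     // signal the event
            waiter.WaitOne(); // returns at once and consumes the signal

            // Poll without blocking: the event went back to the
            // non-signaled state when WaitOne returned.
            return waiter.WaitOne(0);
        }
    }

    public static void Main()
    {
        Console.WriteLine(IsStillSignaledAfterWait()); // prints False
    }
}
```

So the explicit Reset() call in Receive appears to be redundant for this primitive; it would matter for a ManualResetEvent, which stays signaled until it is reset.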

All that remains is for Send to call the Set method on each AutoResetEvent.

    public void Send(T signal)
    {
        lock (sync)
        {
            buffer = signal;
            foreach (var autoResetEvent in events.Values)
            {
                autoResetEvent.Set();
            }
        }
    }

You can check this model with a test:

Test
    private void SendTest(string name = "")
    {
        ISignal<string> signal;
        if (string.IsNullOrEmpty(name))
        {
            // No name given: signal within the current process.
            signal = SignalFactory.GetInstanse<string>();
        }
        else
        {
            // Named signal: the cross-process implementation.
            signal = SignalFactory.GetInstanse<string>(name);
        }

        // First receiver thread.
        var task1 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                // Block until the next message arrives.
                var message = signal.Receive();
                Debug.WriteLine($"Thread 1 {message}");
            }
        });

        // Second receiver thread.
        var task2 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                // Block until the next message arrives.
                var message = signal.Receive();
                Debug.WriteLine($"Thread 2 {message}");
            }
        });

        for (int i = 0; i < 10; i++)
        {
            // Send a message to all waiting threads.
            signal.Send($"Ping {i}");
            Thread.Sleep(50);
        }
    }

Signal class listing
    using System.Collections.Generic;
    using System.Threading;

    namespace Signal
    {
        internal class Signal<T> : ISignal<T>
        {
            private T buffer;
            Dictionary<int, AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();
            private volatile object sync = new object();
            private bool isDisposabled = false;

            ~Signal()
            {
                if (!isDisposabled)
                {
                    Dispose();
                }
            }

            public T Receive()
            {
                var waiter = GetEvents();
                waiter.WaitOne();
                waiter.Reset();
                return buffer;
            }

            // Note: returns the current buffer even if the wait timed out.
            public T Receive(int timeOut)
            {
                var waiter = GetEvents();
                waiter.WaitOne(timeOut);
                waiter.Reset();
                return buffer;
            }

            public void Send(T signal)
            {
                lock (sync)
                {
                    buffer = signal;
                    foreach (var autoResetEvent in events.Values)
                    {
                        autoResetEvent.Set();
                    }
                }
            }

            private AutoResetEvent GetEvents()
            {
                var threadId = Thread.CurrentThread.ManagedThreadId;

                // Take the same lock as Send so the dictionary is never
                // modified while Send is enumerating it.
                lock (sync)
                {
                    AutoResetEvent autoResetEvent;
                    if (!events.TryGetValue(threadId, out autoResetEvent))
                    {
                        autoResetEvent = new AutoResetEvent(false);
                        events.Add(threadId, autoResetEvent);
                    }
                    return autoResetEvent;
                }
            }

            public void Dispose()
            {
                foreach (var resetEvent in events.Values)
                {
                    resetEvent.Dispose();
                }
                isDisposabled = true;
            }
        }
    }

This implementation has room to grow in terms of reliability. The source code also contains an interprocess implementation of this idea that passes signals through shared memory; if there is interest, I can write a separate article about it.

Source code on GitHub

Source: https://habr.com/ru/post/336742/

