📜 ⬆️ ⬇️

Writing our SynchronizationContext

This story began a long time ago when I first tried to work with a UI not from a UI thread. And when I started to catch various “glitches”, I realized that this should be done carefully. Later, I ran into this in the world of dots and it was at that moment that I first became acquainted with SynchronizationContext. But then, after reading about the device of this object, I considered that this knowledge was enough for me. You can do this, for example, here: SynchronizationContext - when MSDN fails .

I remembered about SynchronizationContext only with the output of c # 5 and its async / await, since This mechanism interacts with this very synchronization context. This is done so that after an asynchronous operation, the code can be executed in the thread calling the asynchronous operation, which is very convenient when working with the UI. But by running this small code in the UI thread and any other:

Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); await Task.Run(() => Debug.WriteLine(Thread.CurrentThread.ManagedThreadId)); Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); 

We will see that the code is returned to the original stream only when it is launched in the UI stream. The thing is that the synchronization context is specified only in the UI stream (except for wcf, etc.). The thought immediately comes to mind, you just need to set the synchronization context to the desired stream. But here we have a problem; the standard implementation of SynchronizationContext does not give us the necessary capabilities. It allows you to continue executing code in the current thread or in the thread from the pool. After I did not find an implementation that you can just copy, run and see the desired result, I decided to try to implement my own and imagine how it could look like in practice. This will be discussed below.

To execute the code, SynchronizationContext provides two virtual methods, Send (synchronous execution) and Post (asynchronous). Therefore, we inherit from SynchronizationContext and override the necessary methods.
')
CustomSynchronizationContext
 class CustomSynchronizationContext : SynchronizationContext, IDisposable { private readonly AutoResetEvent _eventReset; private readonly Queue<KeyValuePair<SendOrPostCallback, object>> _workItems; private readonly Thread _thread; public CustomSynchronizationContext() { _eventReset = new AutoResetEvent(false); _workItems = new Queue<KeyValuePair<SendOrPostCallback, object>>(); _thread = new Thread(DoWork); _thread.Start(this); } private void DoWork(object obj) { SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext); while (true) { while (_workItems.Count > 0) { var item = _workItems.Dequeue(); item.Key(item.Value); } _eventReset.Reset(); _eventReset.WaitOne(); } } public override void Post(SendOrPostCallback d, object state) { _workItems.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state)); _eventReset.Set(); } public void Dispose() { _eventReset.Dispose(); _thread.Abort(); } } 


We start.

 static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); Console.WriteLine(Thread.CurrentThread.ManagedThreadId); syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null); } 

And we expect to see different streams. What's going on here? First, for convenience, we create and assign a stream inside the context, not a context to the stream. So we will be sure that no one but us can influence this flow. Secondly, we start a queue in which we will store delegates for execution in the created thread. Thirdly, we “tap” the AutoResetEvent so that the thread does not terminate and not go in cycles. Well, IDisposable. Note that if you delete the context, there will also be an attempt to eliminate the stream. Those. such code:

 static void Main(string[] args) { using (var syncContext = new CustomSynchronizationContext()) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null); } } 

most likely will display information only about the original stream. Perhaps this is not what I would like, but for a demo, I think it will do. In addition, it is easy to fix.

What about exception handling? Check it out.

 static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); try { syncContext.Post(o => { throw new Exception("TestException"); }, null); } catch (Exception ex) { Console.WriteLine(ex.Message); } } 

It is expected to fall in our “special” stream. It's time to remember that we also have the Send method, which is responsible for synchronous execution. This should allow you to wait for the delegate to complete and get an exception. Let's try.

CustomSynchronizationContext (final demo)
 class CustomSynchronizationContext : SynchronizationContext, IDisposable { private readonly AutoResetEvent _workerResetEvent; private readonly ConcurrentQueue<WorkItem> _workItems; private readonly Thread _thread; public CustomSynchronizationContext() { _workerResetEvent = new AutoResetEvent(false); _workItems = new ConcurrentQueue<WorkItem>(); _thread = new Thread(DoWork); _thread.Start(this); } private void DoWork(object obj) { SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext); while (true) { WorkItem workItem; while (_workItems.TryDequeue(out workItem)) workItem.Execute(); //Note: race condition here _workerResetEvent.Reset(); _workerResetEvent.WaitOne(); } } public override void Send(SendOrPostCallback d, object state) { if (Thread.CurrentThread == _thread) d(state); else { using (var resetEvent = new AutoResetEvent(false)) { var wiExecutionInfo = new WorkItemExecutionInfo(); _workItems.Enqueue(new SynchronousWorkItem(d, state, resetEvent, ref wiExecutionInfo)); _workerResetEvent.Set(); resetEvent.WaitOne(); if (wiExecutionInfo.HasException) throw wiExecutionInfo.Exception; } } } public override void Post(SendOrPostCallback d, object state) { _workItems.Enqueue(new AsynchronousWorkItem(d, state)); _workerResetEvent.Set(); } public void Dispose() { _workerResetEvent.Dispose(); _thread.Abort(); } private class WorkItemExecutionInfo { public bool HasException => Exception != null; public Exception Exception { get; set; } } private abstract class WorkItem { protected readonly SendOrPostCallback SendOrPostCallback; protected readonly object State; protected WorkItem(SendOrPostCallback sendOrPostCallback, object state) { SendOrPostCallback = sendOrPostCallback; State = state; } public abstract void Execute(); } private class SynchronousWorkItem : WorkItem { private readonly AutoResetEvent _syncObject; private readonly WorkItemExecutionInfo _workItemExecutionInfo; public SynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state, AutoResetEvent resetEvent, ref WorkItemExecutionInfo workItemExecutionInfo) : base(sendOrPostCallback, state) { if (workItemExecutionInfo == null) throw new NullReferenceException(nameof(workItemExecutionInfo)); _syncObject = resetEvent; _workItemExecutionInfo = workItemExecutionInfo; } public override void Execute() { try { SendOrPostCallback(State); } catch (Exception ex) { _workItemExecutionInfo.Exception = ex; } _syncObject.Set(); } } private class AsynchronousWorkItem : WorkItem { public AsynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state) : base(sendOrPostCallback, state) { } public override void Execute() { SendOrPostCallback(State); } } } 


Here, for convenience, we introduce the WorkItem class, which will execute the code (delegate) in the way we need. From it we inherit two more SynchronousWorkItem and AsynchronousWorkItem, by names it is clear what is the difference between them. In implementations, the only difference is that the synchronous version implements the wait (AutoResetEvent) and the absorption of the exception, which will then be thrown in the original stream. Now KeyValuePair <SendOrPostCallback, object> can be changed to WorkItem, well, we change the simple queue to competitive. Also, in the Send method, we add a check of the current thread and if it suddenly turns out to be “our”, then simply run the delegate here.

Check again.

 static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); try { syncContext.Send(o => { throw new Exception("TestException"); }, null); } catch (Exception ex) { Console.WriteLine(ex.Message); } } 

The exception has now been successfully processed. Well, it's time to run the very first code example, which was mentioned in the article, on a stream with a newly created synchronization context.

 static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); syncContext.Post(TestAsyncMethod, null); } async static void TestAsyncMethod(object obj) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); await Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId)); Console.WriteLine(Thread.CurrentThread.ManagedThreadId); } 

My conclusion:

9
ten
9

Source: https://habr.com/ru/post/269985/


All Articles