Recently, two articles by the same author on C# async/await functionality were published on Medium.
Their main conclusion was that Go's channels and goroutines outperform C#'s async/await. But the core problem with those publications is a complete misunderstanding of the cooperative multitasking model in C#, which misleads readers. The benchmarks themselves are meaningless, as we will see below.
In the rest of the article I will try to examine the essence of the problem in more detail, with example solutions.
After a little editing of the source code, the .NET implementation of the benchmark turns out to be faster than the Go version. Along the way, we will solve the stack overflow problem for recursive asynchronous methods.
NB: The freshly released .NET Core 2.0 and Go 1.8.3 are used throughout.
Let's proceed straight to example #1:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace CSharpAsyncRecursion
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Counted to {0}.", CountRecursivelyAsync(10000).Result);
        }

        static async Task<int> CountRecursivelyAsync(int count)
        {
            if (count <= 0)
                return count;
            return 1 + await CountRecursivelyAsync(count - 1);
        }
    }
}
The program crashes with a StackOverflowException. Sorrow!
Implementing tail-call optimization is not an option here, since we are not going to patch the compiler, rewrite bytecode, etc. The solution should therefore work in the most general case.
Usually a recursive algorithm is replaced by an iterative one, but in this case that does not suit us either. Deferred execution comes to the rescue.
We implement a simple Defer method:
Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) =>
    Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();
To put a task on a queue, a scheduler must be specified. Both Task.Run and Task.Factory.StartNew allow this (by default TaskScheduler.Default, which suits this example), and the latter also lets you pass a state object into the delegate.

Currently Task.Factory.StartNew has no generic overloads for the state parameter, and it is unlikely to ever get them. If you need to pass state, you are limited to Action<object> or Func<object, TResult>.
Let's rewrite the example using the new Defer method:
static async Task Main(string[] args)
{
    Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) =>
        Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();

    Task<int> CountRecursivelyAsync(int count)
    {
        if (count <= 0)
            return Task.FromResult(count);
        return Defer(
            seed => CountRecursivelyAsync(seed - 1).ContinueWith(rec => rec.Result + 1),
            count);
    }

    Console.WriteLine($"Counted to {await CountRecursivelyAsync(100000)}.");
}
First, let's take a look at the benchmark code from the original article.
package main

import (
	"flag"
	"fmt"
	"time"
)

func measure(start time.Time, name string) {
	elapsed := time.Since(start)
	fmt.Printf("%s took %s", name, elapsed)
	fmt.Println()
}

var maxCount = flag.Int("n", 1000000, "how many")

func f(output, input chan int) {
	output <- 1 + <-input
}

func test() {
	fmt.Printf("Started, sending %d messages.", *maxCount)
	fmt.Println()
	flag.Parse()
	defer measure(time.Now(), fmt.Sprintf("Sending %d messages", *maxCount))
	finalOutput := make(chan int)
	var left, right chan int = nil, finalOutput
	for i := 0; i < *maxCount; i++ {
		left, right = right, make(chan int)
		go f(left, right)
	}
	right <- 0
	x := <-finalOutput
	fmt.Println(x)
}

func main() {
	test()
	test()
}
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;

namespace ChannelsTest
{
    class Program
    {
        public static void Measure(string title, Action<int, bool> test, int count, int warmupCount = 1)
        {
            test(warmupCount, true); // Warmup
            var sw = new Stopwatch();
            GC.Collect();
            sw.Start();
            test(count, false);
            sw.Stop();
            Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
        }

        static async void AddOne(WritableChannel<int> output, ReadableChannel<int> input)
        {
            await output.WriteAsync(1 + await input.ReadAsync());
        }

        static async Task<int> AddOne(Task<int> input)
        {
            var result = 1 + await input;
            await Task.Yield();
            return result;
        }

        static void Main(string[] args)
        {
            if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
                maxCount = 1000000;

            Measure($"Sending {maxCount} messages (channels)", (count, isWarmup) =>
            {
                var firstChannel = Channel.CreateUnbuffered<int>();
                var output = firstChannel;
                for (var i = 0; i < count; i++)
                {
                    var input = Channel.CreateUnbuffered<int>();
                    AddOne(output.Out, input.In);
                    output = input;
                }
                output.Out.WriteAsync(0);
                if (!isWarmup)
                    Console.WriteLine(firstChannel.In.ReadAsync().Result);
            }, maxCount);

            Measure($"Sending {maxCount} messages (Task<int>)", (count, isWarmup) =>
            {
                var tcs = new TaskCompletionSource<int>();
                var firstTask = AddOne(tcs.Task);
                var output = firstTask;
                for (var i = 0; i < count; i++)
                {
                    var input = AddOne(output);
                    output = input;
                }
                tcs.SetResult(-1);
                if (!isWarmup)
                    Console.WriteLine(output.Result);
            }, maxCount);
        }
    }
}
What immediately catches the eye:

- The author inserted Task.Yield(), justifying it by the fact that the example would otherwise crash with a StackOverflowException. Task.Delay would have served just as well; why do things by halves? As we saw earlier, this all stems from the 'unfortunate' experience with recursive calls to asynchronous methods.
- Go uses the concept of goroutines, lightweight threads, and each goroutine has its own stack, currently 2KB in size. So be careful when starting the benchmarks: more than 4GB of memory is needed!
Running the test twice can be useful for the CLR JIT on the one hand, and on the other it lets Go reuse already-created goroutines, which excludes the system's memory-allocation costs from the measurements.
Testing Environment:
Well, I got the following results:
| | Warmup (s) | Benchmark (s) |
|---|---|---|
| Go | 9.3531 | 1.0249 |
| C# | - | 1.3568 |
NB: Since the example is just a call chain, neither GOMAXPROCS nor the channel size affects the result (verified experimentally). We take the best time; fluctuations hardly matter given the large difference.
Yes, indeed: Go is ahead of C# by ~30%.
Challenge accepted!
If we don't use something like Task.Yield(), we will again get a StackOverflowException.
This time we will do without Defer!
The idea behind the implementation is simple: start an additional thread that listens to a queue and processes tasks one by one. In my opinion, implementing your own scheduler is easier than implementing your own synchronization context.
The abstract TaskScheduler class looks like this:
// Represents an object that handles the low-level work of queuing tasks onto threads.
public abstract class TaskScheduler
{
    /* */
    public virtual int MaximumConcurrencyLevel { get; }
    public static TaskScheduler FromCurrentSynchronizationContext();
    protected abstract IEnumerable<Task> GetScheduledTasks();
    protected bool TryExecuteTask(Task task);
    protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
    protected internal abstract void QueueTask(Task task);
    protected internal virtual bool TryDequeue(Task task);
}
As we can see, TaskScheduler already provides a semblance of a queue: QueueTask and TryDequeue.
In order not to reinvent the wheel, we will use the ready-made schedulers from the .NET team (the ParallelExtensionsExtras samples).
Let's rewrite this case in C# 7, staying as close to the Go version as possible:
static async Task Main(string[] args)
{
    void Measure(string title, Action<int, bool> test, int count, int warmupCount = 1)
    {
        var sw = new Stopwatch();
        sw.Start();
        test(count, false);
        sw.Stop();
        Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
    }

    async Task<int> f(Task<int> input)
    {
        return 1 + await input; // return output
    }

    await Task.Factory.StartNew(() =>
    {
        if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
            maxCount = 1000000;

        Measure($"Sending {maxCount} messages (Task<int>)", (count, isWarmup) =>
        {
            var tcs = new TaskCompletionSource<int>();
            (var left, var right) = ((Task<int>)null, f(tcs.Task));
            for (var i = 0; i < count; i++)
            {
                left = f(right);
                right = left;
            }
            tcs.SetResult(-1);
            if (!isWarmup)
                Console.WriteLine(right.Result);
        }, maxCount);
    }, CancellationToken.None, TaskCreationOptions.None, new StaTaskScheduler(2));
}
Here you need to make a couple of remarks:
- We remove GC.Collect(), as mentioned above.
- We use StaTaskScheduler with two auxiliary threads to avoid blocking: one waits for the result of the main/last task, and the other processes the task chain itself.
- The recursive-call problem disappears automatically, so we boldly remove the Task.Yield() call from f(input). Even if we didn't, we could expect a slightly better result than the original, since the default scheduler uses the ThreadPool.
Now we publish a release build:

dotnet publish -c release -r win10-x64
And run ...
Suddenly we get about 600 ms instead of the previous 1300 ms. Not bad!
Go, let me remind you, finished at around 1000 ms. Still, I cannot shake the feeling that channels are simply the wrong tool for cooperative multitasking in the original examples.
P.S. I deliberately did not perform a huge number of test runs with full statistics on the distribution of the measurements.
The purpose of this article was to highlight a specific async/await use case and, at the same time, to dispel the myth that recursive calls of asynchronous methods are impossible in C#.
P.P.S. The cause of C#'s initial lag was the use of Task.Yield(): constant context switching is no good!
Source: https://habr.com/ru/post/336000/