
AsyncCollections: the story of one bike

I have been a big fan of System.Collections.Concurrent, and of BlockingCollection in particular, since ancient times. It is impossible to count how many times this engineering marvel has come to the rescue in the most diverse situations.

Since slightly less ancient times, async / await has been a part of everyday life as well. Life would seem to be beautiful, but there is one "but": you really don't want to mix asynchronous code with blocking code. And BlockingCollection, as is easy to guess (at least from the name), blocks the thread in some cases.
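
To make the problem tangible, here is a small illustrative sketch (the consumer class and its "processing" are made up for the sake of the example): the blocking Take() call parks a thread even though the surrounding method is async, which is exactly what we would like to avoid.

//  An illustrative sketch; requires System.Collections.Concurrent and System.Threading.Tasks.
class BlockingConsumer
{
    private BlockingCollection<string> _messages = new BlockingCollection<string>();

    public async Task ProcessMessagesAsync()
    {
        while ( !_messages.IsCompleted )
        {
            //  What we'd like to write: string message = await _messages.TakeAsync();
            //  What we have to write instead: Take() parks the current thread until an item shows up,
            //  even though the surrounding method is asynchronous.
            string message = _messages.Take();

            await Task.Delay( 100 );    //  a stand-in for real asynchronous processing of the message
        }
    }
}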

False trail: Nito.AsyncEx


One day I came across a reference to the Nito.AsyncEx library written by Stephen Cleary, which contains a class with the intriguing name AsyncCollection. However, after looking at what's under its hood, I was left somewhat bewildered: there was an AsyncLock (from the same library) hung on every operation over the wrapped IProducerConsumerCollection. AsyncLock, in turn, actively uses the most ordinary locks plus a thin layer of magic that I suddenly had no desire to unravel. Even if this implementation does what it claims, it looks somewhat convoluted, monstrous, and probably not very optimal. Is it really impossible to solve this problem more elegantly?

We all know where such thoughts lead. Visual Studio, New project...

AsyncQueue


To begin with, let's define what we actually want from our asynchronous collection. As a starting point, the following interface will do:

public interface IAsyncCollection<T> : IEnumerable<T>
{
    int Count { get; }
    void Add( T item );
    Task<T> TakeAsync();
}

In addition, for simplicity we'll settle on our collection being a queue. Why a queue? For roughly the same reason a queue is used by default in BlockingCollection.
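
To show how this interface is meant to be used, here is a minimal usage sketch (the producer and consumer methods are made up for illustration): the producer calls Add() synchronously, while the consumer awaits TakeAsync() and never blocks a thread while the collection is empty.

//  A usage sketch; requires System, System.Threading.Tasks and the IAsyncCollection<T> defined above.
static void Produce( IAsyncCollection<int> collection )
{
    for ( int i = 0; i < 100; i++ )
        collection.Add( i );
}

static async Task ConsumeAsync( IAsyncCollection<int> collection )
{
    for ( int i = 0; i < 100; i++ )
    {
        //  If the collection is currently empty, the await simply completes later,
        //  when a producer adds an item; no thread is blocked in the meantime.
        int item = await collection.TakeAsync();
        Console.WriteLine( item );
    }
}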

Then comes the hard thinking: what are the possible states of our collection? At first glance there are three of them:

1. There are no items in the collection, but there have been calls to TakeAsync() whose Tasks need to be completed once items appear (for simplicity and brevity I'll call them awaiters from now on).

2. There are no awaiters, but there have been calls to Add(). The situation is completely symmetrical to the previous one.

3. Both queues — the awaiter queue and the item queue — are empty. Depending on the next call, we move either to state 1 or to state 2.

I don't want to hang locks everywhere; otherwise we'd be no better than the lock-stuffed Nito.AsyncEx implementation. What does ConcurrentQueue do in similar situations? When it knows that another thread is right now in the middle of an operation that is about to finish, after which something useful can be done, it creates a SpinWait and spins for a while. Let's try to reproduce this idea. We need to:

1. Atomically change the shared state of the collection (roughly: whether items or awaiters are currently in surplus).
2. From that same atomic operation, learn whether a matching counterpart (an item or an awaiter) already exists or is just about to appear in its queue.
3. If the counterpart hasn't landed in its queue yet, spin briefly until it does.

The first two requirements strongly resemble the job of the Interlocked class; the state can be stored as something like a queue balance: TakeAsync() atomically decrements it by one, Add() atomically increments it. The value returned by Interlocked.Increment / Interlocked.Decrement then tells us that a new item / awaiter is on its way, even before it shows up in the corresponding queue. Enough chatter; let's try to turn all of the above into code:

public class AsyncQueue<T> : IAsyncCollection<T>
{
    private ConcurrentQueue<T> _itemQueue = new ConcurrentQueue<T>();
    private ConcurrentQueue<TaskCompletionSource<T>> _awaiterQueue = new ConcurrentQueue<TaskCompletionSource<T>>();

    //  _queueBalance < 0 means there are free awaiters and not enough items.
    //  _queueBalance > 0 means the opposite is true.
    private long _queueBalance = 0;

    public void Add( T item )
    {
        long balanceAfterCurrentItem = Interlocked.Increment( ref _queueBalance );

        if ( balanceAfterCurrentItem > 0 )
        {
            //  Items are dominating, so we can safely add a new item to the queue.
            _itemQueue.Enqueue( item );
        }
        else
        {
            //  There's at least one awaiter available or being added as we're speaking, so we're giving the item to it.
            TaskCompletionSource<T> awaiter;
            SpinWait spin = new SpinWait();

            while ( !_awaiterQueue.TryDequeue( out awaiter ) )
                spin.SpinOnce();

            awaiter.SetResult( item );
        }
    }

    public Task<T> TakeAsync()
    {
        long balanceAfterCurrentAwaiter = Interlocked.Decrement( ref _queueBalance );

        if ( balanceAfterCurrentAwaiter < 0 )
        {
            //  Awaiters are dominating, so we can safely add a new awaiter to the queue.
            var taskSource = new TaskCompletionSource<T>();
            _awaiterQueue.Enqueue( taskSource );
            return taskSource.Task;
        }
        else
        {
            //  There's at least one item available or being added, so we're returning it directly.
            T item;
            SpinWait spin = new SpinWait();

            while ( !_itemQueue.TryDequeue( out item ) )
                spin.SpinOnce();

            return Task.FromResult( item );
        }
    }
}

We test it and are surprised to find that it actually seems to work. Victory? On the one hand, yes; on the other, a creative impulse this overwhelming is not so easily stopped...

Useful (and not so useful) extras


Let's take a closer look at what we've got. A synchronous Add(), an asynchronous TakeAsync()... Wait, an asynchronous method with no way to cancel it? Unacceptable. Let's fix that.

First, when the CancellationToken is cancelled, the corresponding task must be cancelled immediately:

public Task<T> TakeAsync( CancellationToken cancellationToken )
{
    // ...
    if ( balanceAfterCurrentAwaiter < 0 )
    {
        var taskSource = new TaskCompletionSource<T>();
        _awaiterQueue.Enqueue( taskSource );

        cancellationToken.Register(
            state =>
            {
                TaskCompletionSource<T> awaiter = state as TaskCompletionSource<T>;
                awaiter.TrySetCanceled();
            },
            taskSource,
            useSynchronizationContext: false );

        return taskSource.Task;
    }
    else
    {
        // ...
    }
}

Secondly, we obviously can't pluck a cancelled awaiter out of the middle of the queue, so we need to teach Add() to skip such awaiters. The balance, conveniently, keeps itself correct automatically:

private bool TryAdd( T item )
{
    long balanceAfterCurrentItem = Interlocked.Increment( ref _queueBalance );

    if ( balanceAfterCurrentItem > 0 )
    {
        _itemQueue.Enqueue( item );
        return true;
    }
    else
    {
        TaskCompletionSource<T> awaiter;
        SpinWait spin = new SpinWait();

        while ( !_awaiterQueue.TryDequeue( out awaiter ) )
            spin.SpinOnce();

        //  Returns false if the cancellation occurred earlier.
        return awaiter.TrySetResult( item );
    }
}

public void Add( T item )
{
    while ( !TryAdd( item ) ) ;
}

Thirdly, the old TakeAsync() overload (the one without a CancellationToken) doesn't have to live in the IAsyncCollection interface at all; it can be provided as an extension method:

public interface IAsyncCollection<T> : IEnumerable<T>
{
    int Count { get; }
    void Add( T item );
    Task<T> TakeAsync( CancellationToken cancellationToken );
}

public static class AsyncCollectionExtensions
{
    public static Task<T> TakeAsync<T>( this IAsyncCollection<T> collection )
    {
        return collection.TakeAsync( CancellationToken.None );
    }
}

Speaking of IAsyncCollection: if you look closely, our AsyncQueue implementation isn't really nailed to ConcurrentQueue; any thread-safe IProducerConsumerCollection will do for storing items — for example, ConcurrentStack. So we can do this:

public class AsyncCollection<TItem, TItemQueue> : IAsyncCollection<TItem>
    where TItemQueue : IProducerConsumerCollection<TItem>, new()
{
    private TItemQueue _itemQueue = new TItemQueue();
    private ConcurrentQueue<TaskCompletionSource<TItem>> _awaiterQueue = new ConcurrentQueue<TaskCompletionSource<TItem>>();

    // ...
}

public class AsyncQueue<T> : AsyncCollection<T, ConcurrentQueue<T>>
{
}

public class AsyncStack<T> : AsyncCollection<T, ConcurrentStack<T>>
{
}

On the one hand, I'd rather not multiply type parameters and would simply take an IProducerConsumerCollection in the constructor, but here's the trouble: someone could pass in a collection that is still referenced from the outside and then add or remove items behind our back, destroying the synchronization between the collection's real state and the stored balance. A factory method has the same problem, so the collection has to be created internally.
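
A tiny usage sketch of the resulting types (the values are arbitrary): since each collection creates its own underlying storage, no outside code can touch it and desynchronize the balance.

//  Somewhere inside an async method:
var queue = new AsyncQueue<int>();
var stack = new AsyncStack<int>();

queue.Add( 1 );
queue.Add( 2 );
stack.Add( 1 );
stack.Add( 2 );

int fromQueue = await queue.TakeAsync();    //  1: FIFO order
int fromStack = await stack.TakeAsync();    //  2: LIFO order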

Benchmarks!


It is time to measure the speed of our bike. For running benchmarks there is the BenchmarkDotNet package, which takes care of the multitude of small details that have to be considered when benchmarking, so that's what we'll use. The general idea of the benchmark is as follows:

class AsyncQueueBenchmark
{
    private const int _consumerThreadCount = 3;
    private const int _producerThreadCount = 3;
    private const int _itemsAddedPerThread = 10000;
    private const int _itemsAddedTotal = _producerThreadCount * _itemsAddedPerThread;

    private IAsyncCollection<int> _currentQueue;
    private CancellationTokenSource _cancelSource;
    private Task[] _consumerTasks;
    private Task[] _producerTasks;
    private int _itemsTaken;

    //  Spins up the producer and consumer tasks and waits for all of them to finish.
    private void DdosCurrentQueue()
    {
        _consumerTasks = Enumerable.Range( 0, _consumerThreadCount )
            .Select( _ => Task.Run( () => RunConsumerAsync() ) )
            .ToArray();

        _producerTasks = Enumerable.Range( 0, _producerThreadCount )
            .Select( _ => Task.Run( () => RunProducer() ) )
            .ToArray();

        Task.WaitAll( _producerTasks );
        Task.WaitAll( _consumerTasks );
    }

    private async Task RunConsumerAsync()
    {
        try
        {
            CancellationToken cancelToken = _cancelSource.Token;

            while ( _itemsTaken < _itemsAddedTotal && !cancelToken.IsCancellationRequested )
            {
                int item = await _currentQueue.TakeAsync( cancelToken );
                int itemsTakenLocal = Interlocked.Increment( ref _itemsTaken );

                if ( itemsTakenLocal >= _itemsAddedTotal )
                {
                    _cancelSource.Cancel();
                    break;
                }
            }
        }
        catch ( OperationCanceledException )
        {
        }
    }

    private void RunProducer()
    {
        for ( int i = 0; i < _itemsAddedPerThread; i++ )
        {
            int item = 42;
            _currentQueue.Add( item );
        }
    }
}

In other words, we take a fixed batch of items, shove them into the queue from several threads, in parallel drain the queue from several other threads, and note how long the whole thing takes. We then plug in different IAsyncCollection implementations and compare. The contestants are:

1. The freshly written AsyncQueue
2. Nito.AsyncEx.AsyncCollection, wrapped in the following adapter:

class NitoAsyncCollectionAdapter<T> : IAsyncCollection<T>
{
    private Nito.AsyncEx.AsyncCollection<T> _collection;

    public NitoAsyncCollectionAdapter()
    {
        _collection = new Nito.AsyncEx.AsyncCollection<T>();
    }

    #region IAsyncCollection<T> Members

    public void Add( T item )
    {
        _collection.Add( item );
    }

    public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
    {
        return _collection.TakeAsync( cancellationToken );
    }

    #endregion
}

3. BlockingCollection (how could we not compare against it), in the form of:

class BlockingCollectionAdapter<T> : IAsyncCollection<T>
{
    private BlockingCollection<T> _collection;

    public BlockingCollectionAdapter()
    {
        _collection = new BlockingCollection<T>();
    }

    #region IAsyncCollection<T> Members

    public void Add( T item )
    {
        _collection.Add( item );
    }

    public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
    {
        T item = _collection.Take( cancellationToken );
        return Task.FromResult( item );
    }

    #endregion
}
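
The snippets above only cover the measured work and the three contestants; the actual BenchmarkDotNet wiring is omitted, and the package API has changed quite a bit since the version used here. As a rough sketch of how the competition could be wired up in a current BenchmarkDotNet version — the RunCompetition() helper is hypothetical and stands for "seed _currentQueue, reset the counters and call DdosCurrentQueue()":

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

public class AsyncCollectionsCompetition
{
    //  RunCompetition() is a hypothetical helper, not part of the code above.
    private readonly AsyncQueueBenchmark _harness = new AsyncQueueBenchmark();

    [Benchmark]
    public void HellBrickAsyncQueue()
    {
        _harness.RunCompetition( new AsyncQueue<int>() );
    }

    [Benchmark]
    public void NitoAsyncCollection()
    {
        _harness.RunCompetition( new NitoAsyncCollectionAdapter<int>() );
    }

    [Benchmark]
    public void BlockingCollection()
    {
        _harness.RunCompetition( new BlockingCollectionAdapter<int>() );
    }
}

//  Entry point: BenchmarkRunner.Run<AsyncCollectionsCompetition>();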

Results:
HellBrick.AsyncCollections.AsyncQueue: 1ms | Stats: MedianTicks = 3368, MedianMs = 1, Error = 06.34%
Nito.AsyncEx.AsyncCollection: 12ms | Stats: MedianTicks = 40503, MedianMs = 12, Error = 31.36%
System.Concurrent.BlockingCollection: 2ms | Stats: MedianTicks = 7222, MedianMs = 2, Error = 38.82%

The intuitive assessment of Nito.AsyncEx.AsyncCollection turned out to be right: it really is monstrously slow. But here's the most interesting part: we managed to beat BlockingCollection on performance while doing without blocking threads at all. Win! Break out the cake (or any other celebratory snack) and move on.

AsyncBatchQueue


Every now and then I've had to use a small wrapper over BlockingCollection that accepted individual items on input and handed them out in batches of a given size. If the required number of items failed to accumulate within a certain time, a timer fired and force-flushed whatever had been collected so far. Who wants an asynchronous version of this thing? I do.

To begin with, let's do without the timer and the manual flush. For storing and handing out the assembled batches, it's only logical to use our brand-new AsyncQueue:

public class AsyncBatchQueue<T>
{
    private int _batchSize;
    private Batch _currentBatch;
    private AsyncQueue<IReadOnlyList<T>> _batchQueue = new AsyncQueue<IReadOnlyList<T>>();

    public AsyncBatchQueue( int batchSize )
    {
        _batchSize = batchSize;
        _currentBatch = new Batch( this );
    }

    public void Add( T item )
    {
        SpinWait spin = new SpinWait();

        while ( !_currentBatch.TryAdd( item ) )
            spin.SpinOnce();
    }

    public Task<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken )
    {
        return _batchQueue.TakeAsync( cancellationToken );
    }

    private class Batch : IReadOnlyList<T>
    {
        private AsyncBatchQueue<T> _queue;

        // ?

        public Batch( AsyncBatchQueue<T> queue )
        {
            _queue = queue;
        }

        public bool TryAdd( T item )
        {
            // ?
        }
    }
}

Here's what's going on: in the Add() method we try to add the item to the current batch and, if we've just filled it, toss the batch into _batchQueue. It is entirely possible that another thread got there ahead of us and is currently busy adding its item / flushing the batch, but hasn't yet managed to write the reference to the new (empty) batch into _currentBatch. Hence the good old SpinWait.

The main magic lives in the nested Batch class, whose idea is shamelessly borrowed from the ConcurrentQueue implementation (by the way, if you haven't read its source, I recommend it: there's a lot of interesting stuff in there). The idea boils down to this: the items live in a fixed-size array; each caller reserves a slot by atomically incrementing a counter and then writes its value into that slot; and the thread that grabs the last slot is the one responsible for flushing the batch and replacing it with a fresh one.


It looks something like this (beware: the code is not viable yet! I'll explain why a bit later).

private class Batch : IReadOnlyList<T>
{
    private AsyncBatchQueue<T> _queue;
    private T[] _items;
    private int _lastReservationIndex = -1;
    private int _count = -1;

    public Batch( AsyncBatchQueue<T> queue )
    {
        _queue = queue;
        _items = new T[ _queue._batchSize ];
    }

    public bool TryAdd( T item )
    {
        int index = Interlocked.Increment( ref _lastReservationIndex );

        //  The following is true if someone has beaten us to the last slot and we have to wait until the next batch comes along.
        if ( index >= _queue._batchSize )
            return false;

        //  The following is true if we've taken the last slot, which means we're obligated to flush the current batch and create a new one.
        if ( index == _queue._batchSize - 1 )
            FlushInternal( _queue._batchSize );

        _items[ index ] = item;
        return true;
    }

    private void FlushInternal( int count )
    {
        _count = count;
        _queue._currentBatch = new Batch( _queue );
        _queue._batchQueue.Add( this );
    }
}

Next, it would be good to actually implement IReadOnlyList. Here a subtlety surfaces: nobody guarantees that by the time the batch is flushed, every array slot is already filled with real data — the thread that grabbed the last slot may simply have been faster than the others. This suggests the solution: for each slot in the array, keep a flag indicating whether the corresponding value can be read yet.

private class Batch : IReadOnlyList<T>
{
    // ...
    private bool[] _finalizationFlags;

    public Batch( AsyncBatchQueue<T> queue )
    {
        // ...
        _finalizationFlags = new bool[ _queue._batchSize ];
    }

    public bool TryAdd( T item )
    {
        // ...
        _items[ index ] = item;
        _finalizationFlags[ index ] = true;
        return true;
    }

    public T this[ int index ]
    {
        get
        {
            if ( index >= _count )
                throw new IndexOutOfRangeException();

            return GetItemWithoutValidation( index );
        }
    }

    private T GetItemWithoutValidation( int index )
    {
        SpinWait spin = new SpinWait();

        while ( !_finalizationFlags[ index ] )
            spin.SpinOnce();

        return _items[ index ];
    }

    //  The rest of the IReadOnlyList members are implemented on top of GetItemWithoutValidation().
}


And now the real magic begins. The problem is that this code has plenty of places where the compiler and the CPU can ruin everything by reordering instructions or caching things that absolutely must not be cached.

1. In AsyncBatchQueue.Add(), the _currentBatch value can be read once and cached, so if the batch happens to be full, the thread will spin forever. volatile to the rescue:

public class AsyncBatchQueue<T>
{
    // ...
    private volatile Batch _currentBatch;
    // ...
}

2. In FlushInternal(), the batch could be added to the output queue before the _count field has been written. We insert a full fence:

private void FlushInternal( int count )
{
    _count = count;
    _queue._currentBatch = new Batch( _queue );

    //  The full fence ensures that the current batch will never be added to the queue before _count is set.
    Thread.MemoryBarrier();

    _queue._batchQueue.Add( this );
}

3. In TryAdd(), the writes to _items[index] and _finalizationFlags[index] can be reordered. Once again, a full fence:

public bool TryAdd( T item )
{
    // ...

    _items[ index ] = item;

    //  The full fence prevents setting the finalization flag before the actual item value is written.
    Thread.MemoryBarrier();

    _finalizationFlags[ index ] = true;
    return true;
}

4. The opposite problem (reading the item before the flag) can happen in GetItemWithoutValidation(). We insert, you guessed it, another full fence:

private T GetItemWithoutValidation( int index )
{
    SpinWait spin = new SpinWait();

    while ( !_finalizationFlags[ index ] )
        spin.SpinOnce();

    //  The full fence prevents reading the item value before the finalization flag is set.
    Thread.MemoryBarrier();

    return _items[ index ];
}

5. In that same method, the value of _finalizationFlags[index] can be cached, making the thread spin forever. Normally this is solved by marking the field volatile, but that can't be done with an array element, so... you know what's coming:

private T GetItemWithoutValidation( int index )
{
    SpinWait spin = new SpinWait();

    while ( !_finalizationFlags[ index ] )
    {
        spin.SpinOnce();

        //  The full fence prevents caching any part of the _finalizationFlags[ index ] expression.
        Thread.MemoryBarrier();
    }

    // ...
}

Here, by the way, a small digression is in order.
In ConcurrentQueue, a similar problem is solved in a rather unusual way:
internal volatile VolatileBool[] m_state;

struct VolatileBool
{
    public VolatileBool( bool value )
    {
        m_value = value;
    }

    public volatile bool m_value;
}

If VolatileBool were a class rather than a struct, everything would be perfectly simple: even if a reference to a VolatileBool instance is cached somewhere, a volatile read of m_value is guaranteed to return the field's actual value. Why this trick also works with a struct, which in theory should be copied at the moment m_state[index] is evaluated, I never figured out.
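
Just to make the point concrete, this is what the hypothetical class-based variant would look like (my own sketch, not the actual BCL code):

//  A hypothetical class-based variant, not the actual BCL code.
//  The array element is a reference, so even if that reference gets cached somewhere,
//  the volatile read of m_value still observes the latest write.
internal class VolatileBoolRef
{
    public volatile bool m_value;

    public VolatileBoolRef( bool value )
    {
        m_value = value;
    }
}

//  internal VolatileBoolRef[] m_state;
//  ...
//  while ( !m_state[ index ].m_value ) spin.SpinOnce();   //  always sees the freshest value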


It seems the dangerous spots are covered and the basic functionality should work (at least, I would sincerely like to believe so).

Now for the timer


Everything seems great, but there is one caveat (no longer related to multithreading): if the number of items added to the collection isn't a multiple of batchSize, we will never see the remainder. We need the ability to flush manually, and preferably by timer as well. The easiest approach is to have the Flush() method try to immediately grab the last slot of the array, thereby marking the batch as full. At the same time we must remember the previous real value of _lastReservationIndex, otherwise we won't know how many slots are actually occupied (spoiler: Interlocked.CompareExchange() to the rescue). In total, there are 5 possible scenarios:

1. _lastReservationIndex < 0. There is nothing to flush.
2. _lastReservationIndex >= _queue._batchSize. The thread that grabbed the last slot will call FlushInternal() itself, so we do nothing.
3. _lastReservationIndex is valid and we managed to atomically set it to _queue._batchSize. We know the real number of items in the array and can call FlushInternal().
4. Between reading the old _lastReservationIndex value and writing the new one, another thread slipped in and grabbed the last slot. Essentially the same as scenario 2: do nothing.
5. Same as #4, but the batch isn't full yet. Spin and try again.

public class AsyncBatchQueue<T> : IEnumerable<IReadOnlyList<T>>
{
    // ...

    public void Flush()
    {
        SpinWait spin = new SpinWait();

        while ( !_currentBatch.TryFlush() )
            spin.SpinOnce();
    }

    // ...

    private class Batch : IReadOnlyList<T>
    {
        // ...

        public bool TryFlush()
        {
            int expectedPreviousReservation = Volatile.Read( ref _lastReservationIndex );

            //  We don't flush if the batch doesn't have any items or if another thread is about to flush it.
            //  However, we report success to avoid unnecessary spinning.
            if ( expectedPreviousReservation < 0 || expectedPreviousReservation >= _queue._batchSize )
                return true;

            int previousReservation = Interlocked.CompareExchange( ref _lastReservationIndex, _queue._batchSize, expectedPreviousReservation );

            //  The flush reservation has succeeded.
            if ( expectedPreviousReservation == previousReservation )
            {
                FlushInternal( previousReservation + 1 );
                return true;
            }

            //  The following is true if someone has completed the batch by the time we tried to flush it.
            //  Therefore the batch will be flushed anyway even if we don't do anything.
            //  The opposite means someone has slipped in an update and we have to spin.
            return previousReservation >= _queue._batchSize;
        }

        // ...
    }
}

Done! All that remains is to hang a timer on top — a process so devoid of magic that I'll spare you the copy-paste. There won't be benchmarks either, since I have no idea what to compare the performance against.
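
For the record, here is a minimal sketch of what such a timer could look like (the wrapper class and its name are my own invention, not necessarily what the library ships):

//  A hypothetical timer wrapper over AsyncBatchQueue<T>; requires System and System.Threading.
public class TimedAsyncBatchQueueWrapper<T> : IDisposable
{
    private readonly AsyncBatchQueue<T> _queue;
    private readonly Timer _flushTimer;

    public TimedAsyncBatchQueueWrapper( AsyncBatchQueue<T> queue, TimeSpan flushPeriod )
    {
        _queue = queue;

        //  Periodically flush whatever has accumulated, so that incomplete batches don't get stuck forever.
        _flushTimer = new Timer( _ => _queue.Flush(), null, flushPeriod, flushPeriod );
    }

    public void Dispose()
    {
        _flushTimer.Dispose();
    }
}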

What's next?


First, both collections discussed above suffer from one subtle flaw. If someone calls Thread.Abort(), a ThreadAbortException can strike at the most unexpected moment and destroy the carefully maintained consistency of the collections' state. In the aforementioned ConcurrentQueue (and in plenty of other places), this problem is solved in a rather extravagant way:

try { }
finally
{
    // Thread.Abort()-safe code goes here.
}

The case is quite rare, but it would still be nice to defend against it, just in case. Maybe someday I'll get around to it.

Secondly, for complete happiness we're still missing at least one asynchronous collection: a priority queue. And, unlike with BlockingCollection, no trivial implementation via TakeFromAny() is on the horizon. To be continued?..

PS


For those who heroically read to the end:

Nuget package: www.nuget.org/packages/AsyncCollections
Source code: github.com/HellBrick/AsyncCollections

If you have criticism, bug reports, suggestions, or just sensible thoughts, write to me; I'll be glad to discuss them.

Source: https://habr.com/ru/post/240891/

