
How ConcurrentBag works in .NET

Among concurrent collections, the ConcurrentDictionary is the most popular. ConcurrentQueue and ConcurrentStack are also often used.

The idea behind the thread-safe hash table, locking only parts of the collection, is simple and logical, which makes it all the more elegant.

The structure of ConcurrentDictionary has already been described on Habr in the article "Under the hood of a Dictionary and ConcurrentDictionary". ConcurrentBag is less popular, as it is used mainly where the Producer-Consumer pattern is implemented. Moreover, this structure works best when the same thread both adds data to the collection and removes it. Why this happens is discussed below.

Introduction


The basis of this structure is a simple idea: divide all the data between threads so that each thread works, as often as possible, with its own "virtual" part of the storage.

In .NET, as is well known, the [ThreadStatic] attribute makes a static field exist not as one instance shared by all threads but as a separate instance in each thread. .NET Framework 4.0 added the ThreadLocal<T> class, which is a convenient wrapper for working with such data.
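To make the difference concrete, here is a minimal sketch of both mechanisms. The class and method names (PerThreadDemo, RunOnNewThread) are hypothetical and exist only for this illustration; [ThreadStatic] and ThreadLocal<T> are the real .NET APIs.

```csharp
using System;
using System.Threading;

public static class PerThreadDemo
{
    // One copy of this field per thread. A field initializer would run only
    // on the first thread, so the field is left at its default value (0).
    [ThreadStatic]
    private static int t_counter;

    // ThreadLocal<T> runs the factory once per thread, on first access.
    private static readonly ThreadLocal<int> s_counter =
        new ThreadLocal<int>(() => 100);

    // Increments both per-thread counters 'times' times on a fresh thread
    // and returns the values that thread observed.
    public static (int threadStatic, int threadLocal) RunOnNewThread(int times)
    {
        (int, int) result = (0, 0);
        var thread = new Thread(() =>
        {
            for (int i = 0; i < times; i++)
            {
                t_counter++;
                s_counter.Value++;
            }
            result = (t_counter, s_counter.Value);
        });
        thread.Start();
        thread.Join();
        return result;
    }
}
```

Calling PerThreadDemo.RunOnNewThread(3) twice should return (3, 103) both times, because every new thread starts with its own fresh copies of both counters.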



In ConcurrentBag, data is stored in the field ThreadLocal<ThreadLocalList> m_locals. That is, each thread that works with this structure has its own ThreadLocalList instance. There are also volatile fields m_headList and m_tailList, which point to the first and last ThreadLocalList respectively. They are needed to build an IEnumerator when the entire collection has to be read.

The "head" and "tail" references exist because the per-thread lists are themselves stored as a singly linked list. That is, each thread has an instance of ThreadLocalList, and this class has a ThreadLocalList field m_nextList that points to the next ThreadLocalList instance, which belongs to another thread. This means that from a single thread you can reach the instances of all threads by "stepping" along m_nextList.

Next, the structure of the ThreadLocalList class. It is itself a linked list, this time a doubly linked one. An element is represented by an ordinary Node class. The pointers to the first and last elements are m_head and m_tail, respectively. It is also worth noting the Thread m_ownerThread field, which stores a reference to the current owner of the instance. Why the current owner and not the creator is discussed below. The result is a list of lists: m_headList and m_tailList mark the chain of per-thread ThreadLocalList instances linked through m_nextList, and inside each of them the nodes are linked between m_head and m_tail.
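The layout described above can be sketched roughly as follows. This is a simplified illustration, not the framework source: the field names m_head, m_tail, m_nextList and m_ownerThread come from the article, while m_value, m_next, m_prev, AddToHead, BagLayoutSketch and Snapshot are assumptions made for the example, and no synchronization is shown.

```csharp
using System.Collections.Generic;
using System.Threading;

// One element of a per-thread list (doubly linked).
public sealed class Node<T>
{
    public readonly T m_value;
    public Node<T> m_next;   // toward the tail
    public Node<T> m_prev;   // toward the head
    public Node(T value) { m_value = value; }
}

// The list owned by one thread; the lists themselves form a singly linked chain.
public sealed class ThreadLocalList<T>
{
    public Node<T> m_head;
    public Node<T> m_tail;
    public ThreadLocalList<T> m_nextList;   // next thread's list
    public Thread m_ownerThread;            // current owner, not necessarily the creator

    public ThreadLocalList(Thread owner) { m_ownerThread = owner; }

    // Pushes a value at the head (no synchronization in this sketch).
    public void AddToHead(T value)
    {
        var node = new Node<T>(value);
        if (m_head == null) { m_head = node; m_tail = node; }
        else { node.m_next = m_head; m_head.m_prev = node; m_head = node; }
    }
}

public static class BagLayoutSketch
{
    // Visits every per-thread list via m_nextList and every node via m_next.
    public static List<T> Snapshot<T>(ThreadLocalList<T> headList)
    {
        var result = new List<T>();
        for (var list = headList; list != null; list = list.m_nextList)
            for (var node = list.m_head; node != null; node = node.m_next)
                result.Add(node.m_value);
        return result;
    }
}
```

Starting from the head list, Snapshot reaches the data of every thread, which is the same kind of walk the collection needs when it is enumerated as a whole.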



Add item


Getting ThreadLocalList

First, the ThreadLocalList for the current thread is obtained. If it does not exist yet, it is created, and the m_headList and m_tailList pointers are updated accordingly. The creation takes place in synchronized code, where the lock is taken on GlobalListsLock (which is the same m_locals object). This is required to update m_tailList. As the name suggests, this lock is also used wherever the entire collection has to be locked, that is, in CopyTo, ToArray, GetEnumerator, Count and IsEmpty, via the FreezeBag and UnfreezeBag methods.

Also, before creating a new list, we first try to find a ThreadLocalList without an owner, that is, one whose thread used this collection and has since died a hero's death. If such a list exists, we take it and assign a reference to the current thread to its m_ownerThread field.

Searching for an unowned list

    private ThreadLocalList GetUnownedList()
    {
        // Walk the chain of per-thread lists, starting from the head.
        ThreadLocalList currentList = m_headList;
        while (currentList != null)
        {
            // A list whose owner thread has stopped can be adopted.
            if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
            {
                currentList.m_ownerThread = Thread.CurrentThread;
                return currentList;
            }
            currentList = currentList.m_nextList;
        }
        // No orphaned list was found.
        return null;
    }
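Put together, obtaining the list for the current thread might look like the sketch below. It is a simplified illustration under assumptions, not the framework source: ListSketch, ListRegistrySketch, GetThreadList and CountLists are hypothetical names, the element storage is left out entirely, and the lock is taken directly on m_locals as the article describes.

```csharp
using System.Threading;

// Stand-in for the per-thread list; only the fields needed here.
public sealed class ListSketch
{
    public Thread m_ownerThread;
    public ListSketch m_nextList;
    public ListSketch(Thread owner) { m_ownerThread = owner; }
}

public sealed class ListRegistrySketch
{
    private readonly ThreadLocal<ListSketch> m_locals = new ThreadLocal<ListSketch>();
    private volatile ListSketch m_headList;
    private volatile ListSketch m_tailList;

    // Returns the calling thread's list, adopting an orphaned one
    // or creating and linking a new one if necessary.
    public ListSketch GetThreadList()
    {
        ListSketch list = m_locals.Value;
        if (list != null) return list;      // fast path, no lock

        lock (m_locals)                     // global lock for the chain of lists
        {
            list = GetUnownedList();
            if (list == null)
            {
                list = new ListSketch(Thread.CurrentThread);
                if (m_headList == null) m_headList = list;
                else m_tailList.m_nextList = list;
                m_tailList = list;
            }
            m_locals.Value = list;
        }
        return list;
    }

    // Looks for a list whose owner thread has stopped and takes it over.
    private ListSketch GetUnownedList()
    {
        for (ListSketch current = m_headList; current != null; current = current.m_nextList)
        {
            if (current.m_ownerThread.ThreadState == ThreadState.Stopped)
            {
                current.m_ownerThread = Thread.CurrentThread;
                return current;
            }
        }
        return null;
    }

    // Counts the per-thread lists currently linked into the chain.
    public int CountLists()
    {
        lock (m_locals)
        {
            int count = 0;
            for (ListSketch current = m_headList; current != null; current = current.m_nextList) count++;
            return count;
        }
    }
}
```

If one worker thread finishes and another one starts afterwards, the second thread should receive the first thread's list instead of a new one, so the chain does not grow with every short-lived thread.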





Adding an item to ThreadLocalList

The second step is to add the item to the ThreadLocalList. Normally it is added at the "head" without any locks. However, if the number of elements in the ThreadLocalList is less than two, a lock is taken on the current list instance while the element is added, since data loss is possible in this case. The reason is that at the same moment another thread (a stealing thread) may be taking data from the current thread's ThreadLocalList.

Getting an item from the collection


When a thread wants to take an item from the collection, it first looks in its own ThreadLocalList and, if that list is not empty, takes an item from the head of the linked list. If the local list is empty, it walks along m_nextList through the lists of the other threads, looking for a non-empty one. If it finds one, it "steals" an element from there. It must steal that element without confusing the owner thread or preventing it from adding elements correctly. This is the crucial point: an item taken from another thread's linked list comes not from the "head" but from the "tail". So if there are more than two elements in the list, the owner can keep working at the head without locking the whole list while another thread steals from the tail. A race condition is impossible in this case, since there is at least one intermediate element between the element being added and the element being taken.

If there are fewer than two elements, you cannot add without locking, and likewise, if there are fewer than three elements, you cannot take an element without synchronization. The example below shows adding and removing items when these operations are performed on empty lists and when each list already holds two items.
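The rules above can be modelled with a small single-threaded sketch. It is deliberately not thread-safe and does not reproduce the real lock-free code; it only shows which end of the list each operation touches and when, by the thresholds stated in this article, a lock would be required. LocalListModel and its members are hypothetical names.

```csharp
using System.Collections.Generic;

// Single-threaded model of one per-thread list (NOT thread-safe).
public sealed class LocalListModel<T>
{
    private readonly LinkedList<T> _items = new LinkedList<T>();

    public int Count => _items.Count;

    // The owner adds at the head. Returns true when the rule
    // "fewer than two elements" would force a lock for this add.
    public bool Add(T item)
    {
        bool needsLock = _items.Count < 2;
        _items.AddFirst(item);
        return needsLock;
    }

    // The owner takes from the head. needsLock follows the rule
    // "fewer than three elements" for a take.
    public bool TryTake(out T item, out bool needsLock)
    {
        needsLock = _items.Count < 3;
        if (_items.Count == 0) { item = default(T); return false; }
        item = _items.First.Value;
        _items.RemoveFirst();
        return true;
    }

    // Another thread steals from the tail, the end the owner does not touch.
    public bool TrySteal(out T item)
    {
        if (_items.Count == 0) { item = default(T); return false; }
        item = _items.Last.Value;
        _items.RemoveLast();
        return true;
    }
}
```

After adding 1, 2 and 3, only the third add reports that no lock is needed. A steal then returns 1 (the oldest item, at the tail), while the owner's own take returns 3 (the newest item, at the head) and, with two items left at that moment, reports that a lock is needed.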

Testing the performance of threads working with their own ThreadLocalList

Code for testing (option 1)
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading.Tasks;

    ConcurrentBag<int> bagInt = new ConcurrentBag<int>();
    int inputSize = 100 * 1024 * 1024;
    int[] inputDataInt = new int[inputSize];
    for (var i = 0; i < inputSize; i++)
    {
        inputDataInt[i] = i;
    }

    // Each task adds an item and immediately takes one back, inputSize times.
    Action work = () =>
    {
        int outInt;
        for (var i = 0; i < inputSize; i++)
        {
            bagInt.Add(inputDataInt[i]);
            bagInt.TryTake(out outInt);
        }
    };

    Stopwatch sw = new Stopwatch();
    sw.Start();
    Task task1 = Task.Factory.StartNew(work);
    Task task2 = Task.Factory.StartNew(work);
    Task task3 = Task.Factory.StartNew(work);
    Task.WaitAll(task1, task2, task3);
    sw.Stop();
    Console.WriteLine(sw.Elapsed);


Code for testing (option 2)
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading.Tasks;

    ConcurrentBag<int> bagInt = new ConcurrentBag<int>();
    int inputSize = 100 * 1024 * 1024;
    int[] inputDataInt = new int[inputSize];
    for (var i = 0; i < inputSize; i++)
    {
        inputDataInt[i] = i;
    }

    // Each task first puts two items into its local list,
    // then adds an item and immediately takes one back, inputSize times.
    Action work = () =>
    {
        bagInt.Add(-2);
        bagInt.Add(-1);
        int outInt;
        for (var i = 0; i < inputSize; i++)
        {
            bagInt.Add(inputDataInt[i]);
            bagInt.TryTake(out outInt);
        }
    };

    Stopwatch sw = new Stopwatch();
    sw.Start();
    Task task1 = Task.Factory.StartNew(work);
    Task task2 = Task.Factory.StartNew(work);
    Task task3 = Task.Factory.StartNew(work);
    Task.WaitAll(task1, task2, task3);
    sw.Stop();
    Console.WriteLine(sw.Elapsed);


In the first option there are three threads, each of which adds an element and immediately takes one, n times. All threads work only with their own ThreadLocalList.
In the second option, before performing these operations, each thread first adds two elements to its local list. As a result, the size of every list changes from 2 to 3 and back.

Results for an int array of size 100 * 1024 * 1024:
On an empty collection (option 1): 16 seconds.
With two elements first added to each local list (option 2): 12 seconds.
In option 1 the size of each list alternates between 0 and 1, so by the rules above every add and take goes through a lock. In option 2 the size stays between 2 and 3, so the lock is avoided.



ThreadLocalList has an m_currentOp field that indicates the operation currently being performed on the list (None, Add, Take). However, during the operation it is reset to None if the number of elements is less than 2 for an add or less than 3 for a take, because in that case a lock on the list is taken instead.
When a thread wants to steal an item from another thread's list, it first waits until the current operation becomes None. This is done using SpinWait.

    SpinWait spinner = new SpinWait();
    // Spin until the owner has finished its current operation on the list.
    while (list.m_currentOp != (int)ListOperation.None)
    {
        spinner.SpinOnce();
    }

Locking on add and take happens not only when the number of elements is below 2 or 3, but also when the m_needSync field is true. This field indicates that the entire collection is being locked. When the entire collection is locked, a lock is also taken, one by one, on the ThreadLocalList of every thread.
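As a self-contained illustration of this waiting pattern, the sketch below spins on a shared flag until another thread clears it. SpinWaitDemo, WaitForNone and s_currentOp are hypothetical names, and the values 1 and 0 merely stand in for "Add" and "None"; SpinWait, Volatile.Read and Volatile.Write are the real .NET APIs.

```csharp
using System.Threading;

public static class SpinWaitDemo
{
    // 1 stands for "Add in progress", 0 for "None" in this sketch.
    private static int s_currentOp;

    // Starts an "owner" thread that finishes its operation after a short
    // delay, spins until the flag is cleared, and returns the spin count.
    public static int WaitForNone()
    {
        Volatile.Write(ref s_currentOp, 1);
        var owner = new Thread(() =>
        {
            Thread.Sleep(20);                    // pretend to do some work
            Volatile.Write(ref s_currentOp, 0);  // operation finished
        });
        owner.Start();

        var spinner = new SpinWait();
        while (Volatile.Read(ref s_currentOp) != 0)
        {
            // SpinOnce busy-waits at first and starts yielding the
            // processor if the wait drags on.
            spinner.SpinOnce();
        }
        owner.Join();
        return spinner.Count;
    }
}
```

SpinWait suits this situation because the owner's operation is expected to be very short, so a brief spin is usually cheaper than blocking the stealing thread.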

Conclusion


Summing up, I would like to note two basic principles:

1) Each thread tries to work only with its own part of the storage;
2) Even if a thread finds no data in its own list, it tries to avoid locking another thread's local list when it "steals" data from it.

In English, Simon Cooper has described all the basic principles briefly and well in his article "Inside the Concurrent Collections: ConcurrentBag".

Source: https://habr.com/ru/post/241706/

