
Concurrent structures in .NET: ConcurrentQueue from the inside

ConcurrentQueue belongs to the lock-free concurrent data structures: its implementation contains no locks (lock, Mutex, ...) and is built on:
- the classic CompareExchange function (Interlocked.CompareExchange);
- SpinWait;
- volatile fields (used as memory barriers).
ConcurrentQueue is based on a ring buffer.
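
Before looking inside, a minimal usage sketch (not from the original article; the class and variable names here are illustrative) shows the surface this structure exposes: Enqueue and TryDequeue can be called from many threads with no external locking.

 using System;
 using System.Collections.Concurrent;
 using System.Threading.Tasks;

 class Demo
 {
     static void Main()
     {
         var queue = new ConcurrentQueue<int>();

         // Two producers enqueue concurrently.
         var producers = Task.WhenAll(
             Task.Run(() => { for (int i = 0; i < 1000; i++) queue.Enqueue(i); }),
             Task.Run(() => { for (int i = 1000; i < 2000; i++) queue.Enqueue(i); }));
         producers.Wait();

         // A consumer drains the queue; TryDequeue never blocks.
         int count = 0;
         while (queue.TryDequeue(out int item))
             count++;

         Console.WriteLine(count); // 2000
     }
 }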


Ring buffer


The ring buffer is ideally suited for implementing a queue (FIFO).

It is based on an array of data and two indices: the start and the end.

There are two main operations (a minimal sketch follows the list):
  1. Push - add to the end. When a new element is added to the buffer, the end index is incremented by 1 and the new element is written at that position. If the end index runs into the upper boundary of the array, it wraps around to zero and elements start being written at the beginning of the array. Writing is possible until the end index catches up with the start index.
  2. Pop - take from the front. Elements are taken starting at the start index, which is incremented after each take, until it reaches the end index. Reading is possible until the start index catches up with the end index.
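
For orientation, here is a minimal single-threaded ring buffer sketch (an illustration of the scheme above, not the BCL code; the RingBuffer name and its members are hypothetical). One slot is kept unused so that a full buffer can be told apart from an empty one.

 class RingBuffer<T>
 {
     private readonly T[] _buffer;
     private int _start; // index of the oldest element
     private int _end;   // index of the next free slot

     public RingBuffer(int capacity) => _buffer = new T[capacity + 1];

     public bool Push(T value)
     {
         int next = (_end + 1) % _buffer.Length;
         if (next == _start) return false;   // full: end has caught up with start
         _buffer[_end] = value;
         _end = next;
         return true;
     }

     public bool Pop(out T value)
     {
         if (_start == _end) { value = default(T); return false; } // empty
         value = _buffer[_start];
         _start = (_start + 1) % _buffer.Length;
         return true;
     }
 }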


Block ring buffer


The internal structure of ConcurrentQueue is slightly more complicated than the classic ring buffer: its implementation uses the concept of a segment (Segment). A ConcurrentQueue consists of a singly linked list of segments. The segment size is 32.
 private class Segment
 {
     volatile VolatileBool[] m_state;
     volatile T[] m_array;
     volatile int m_low;
     volatile int m_high;
     volatile Segment m_next;
 }

Initially, one segment is created in the ConcurrentQueue.

As required, new segments are added to the right.


The result is a singly linked list of segments. The head of the list is m_head, the tail is m_tail. Limitations: new items are added only to the last segment (m_tail), and items are taken only from the first segment (m_head).

Add item (Enqueue)


Below is an approximate algorithm for adding items to a segment.

 index = Interlocked.Increment(ref this.m_high);
 if (index <= 31)
 {
     m_array[index] = value;
     m_state[index].m_value = true;
 }

m_state is an array of cell states: true means the element has already been written into the cell, false means it has not been yet. In effect, this is a kind of "commit" flag for the write. It is needed so that another thread does not read the element between the index increment (Interlocked.Increment) and the actual write of the value (m_array[index] = value). A reader therefore takes the data only after executing:
 while (!this.m_state[index].m_value)
     spinWait2.SpinOnce();
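
To isolate this "write, then commit" idea, here is a simplified, self-contained sketch (the Slot, Publish and Consume names are illustrative, not the BCL source): the writer stores the payload and only then sets the volatile flag, and the reader spins on the flag before touching the payload, so it can never observe a slot whose value has not been written yet.

 using System.Threading;

 class Slot<T>
 {
     internal T Value;
     internal volatile bool Committed;
 }

 static class CommitPattern
 {
     public static void Publish<T>(Slot<T> slot, T value)
     {
         slot.Value = value;     // 1. write the element
         slot.Committed = true;  // 2. volatile write: the "commit"
     }

     public static T Consume<T>(Slot<T> slot)
     {
         var spin = new SpinWait();
         while (!slot.Committed) // volatile read: wait for the commit
             spin.SpinOnce();
         return slot.Value;
     }
 }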


Adding a new segment (Segment.Grow)


As soon as m_high of the current segment reaches 31, writing to that segment stops and a new segment is created (the existing segments continue to live their own lives).
 m_next = new ConcurrentQueue<T>.Segment(this.m_index + 1L, this.m_source);
 m_source.m_tail = this.m_next;

m_next is a reference to the next segment;
m_source.m_tail is a reference to the last segment in the list of segments.
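
Putting the enqueue pieces together, the sketch below condenses how the public Enqueue, TryAppend and Grow are assumed to interact, based on the fragments above (MiniQueue and its members are illustrative simplifications, not the real ConcurrentQueue source): a full tail segment makes TryAppend return false, the caller spins and retries, and the thread that took slot 31 links the new segment.

 using System.Threading;

 class MiniQueue<T>
 {
     private class Segment
     {
         internal readonly T[] m_array = new T[32];
         internal readonly bool[] m_state = new bool[32]; // "committed" flags
         internal int m_high = -1;                        // last reserved index
         internal volatile Segment m_next;
         private readonly MiniQueue<T> m_source;

         internal Segment(MiniQueue<T> source) { m_source = source; }

         internal bool TryAppend(T value)
         {
             int index = Interlocked.Increment(ref m_high);
             if (index <= 31)
             {
                 m_array[index] = value;
                 Volatile.Write(ref m_state[index], true); // commit the slot
             }
             if (index == 31)
                 Grow(); // the thread that took the last slot links a new segment
             return index <= 31; // false => the caller retries on the new tail
         }

         private void Grow()
         {
             m_next = new Segment(m_source);
             m_source.m_tail = m_next;
         }
     }

     private volatile Segment m_tail;

     public MiniQueue() { m_tail = new Segment(this); }

     public void Enqueue(T item)
     {
         var spin = new SpinWait();
         while (!m_tail.TryAppend(item)) // m_tail is replaced by Grow()
             spin.SpinOnce();
     }
 }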

Item selection (TryDequeue)


Taking an element from the queue rests on two building blocks: claiming an index with CompareExchange and waiting for the element's commit with SpinWait.

Approximate algorithm for taking an element:
  1. Read m_low.
  2. Increment m_low by 1 using CompareExchange.
  3. If m_low is greater than 31, move on to the next segment.
  4. Wait for the commit (m_state[low].m_value) of the element at index m_low.

 SpinWait spinWait1 = new SpinWait();
 int low = this.Low;
 if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low)
 {
     SpinWait spinWait2 = new SpinWait();
     while (!this.m_state[low].m_value)
         spinWait2.SpinOnce();
     result = this.m_array[low];
 }
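
Step 2 above is the interesting race. The following self-contained sketch (Claimer and TryClaim are illustrative names, not part of ConcurrentQueue) shows the claim-by-CompareExchange pattern in isolation: each consumer either wins an index outright or loses the race and retries with the next value of m_low.

 using System.Threading;

 class Claimer
 {
     private int m_low;

     // Returns the claimed index, or -1 if "high" has already been passed.
     public int TryClaim(int high)
     {
         while (true)
         {
             int low = Volatile.Read(ref m_low);
             if (low > high) return -1;
             if (Interlocked.CompareExchange(ref m_low, low + 1, low) == low)
                 return low; // this thread owns slot "low"
             // lost the race: another consumer claimed "low"; retry
         }
     }
 }

Only the thread whose CompareExchange succeeds goes on to wait for the commit flag and read m_array[low], which is exactly what the fragment above does.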


Count vs IsEmpty


IsEmpty code:
 ConcurrentQueue<T>.Segment segment = this.m_head;
 if (!segment.IsEmpty)
     return false;
 if (segment.Next == null)
     return true;
 SpinWait spinWait = new SpinWait();
 for (; segment.IsEmpty; segment = this.m_head)
 {
     if (segment.Next == null)
         return true;
     spinWait.SpinOnce();
 }
 return false;

In essence, it looks for the first non-empty segment; if one is found, the queue is not empty.

Count code:
 ConcurrentQueue<T>.Segment head;
 ConcurrentQueue<T>.Segment tail;
 int headLow;
 int tailHigh;
 this.GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
 if (head == tail)
     return tailHigh - headLow + 1;
 return 32 - headLow + 32 * (int) (tail.m_index - head.m_index - 1L) + (tailHigh + 1);

In essence, it finds the first and the last segment and, from their indices and positions, computes the number of elements.
Conclusion: the Count operation takes more CPU time than IsEmpty.
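
The practical takeaway: when all you need is an emptiness check, prefer IsEmpty over comparing Count with zero. A small sketch (the helper names are illustrative):

 using System.Collections.Concurrent;

 static class QueueChecks
 {
     public static bool HasWork<T>(ConcurrentQueue<T> queue)
     {
         // cheap: looks at the head segment (and spins past empty ones)
         return !queue.IsEmpty;
     }

     public static bool HasWorkSlow<T>(ConcurrentQueue<T> queue)
     {
         // more expensive: computes the exact element count just to compare with 0
         return queue.Count > 0;
     }
 }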

Snapshot & GetEnumerator


ConcurrentQueue supports a snapshot mechanism for obtaining a consistent set of elements.
Consistent data is returned by the operations that read the whole queue, such as GetEnumerator(), ToArray() and CopyTo().

The operations above also work without locks; consistency is achieved by introducing a counter
 volatile int m_numSnapshotTakers 
maintained for the whole queue: the number of snapshot operations in progress at the current moment. That is, each operation that wants a consistent picture executes the following pattern:
 Interlocked.Increment(ref this.m_numSnapshotTakers);
 try
 {
     // ...
 }
 finally
 {
     Interlocked.Decrement(ref this.m_numSnapshotTakers);
 }

In addition, only the dequeue operation "writes" (removes elements), so only it checks whether the reference to the dequeued element may be cleared, keeping the reference alive while snapshots are in progress:
 if (this.m_source.m_numSnapshotTakers <= 0)
     this.m_array[low] = default (T);
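
A small illustrative scenario (the class and variable names are hypothetical) of what the snapshot gives you: ToArray() returns a view that is fixed at the moment it is taken, even while a producer keeps enqueuing.

 using System;
 using System.Collections.Concurrent;
 using System.Threading.Tasks;

 class SnapshotDemo
 {
     static void Main()
     {
         var queue = new ConcurrentQueue<int>();
         for (int i = 0; i < 100; i++) queue.Enqueue(i);

         // A producer keeps enqueuing in the background.
         var producer = Task.Run(() =>
         {
             for (int i = 100; i < 100000; i++) queue.Enqueue(i);
         });

         int[] snapshot = queue.ToArray();   // consistent view at one point in time
         Console.WriteLine(snapshot.Length); // at least 100; fixed once taken

         producer.Wait();
         Console.WriteLine(queue.Count);     // reflects everything enqueued: 100000
     }
 }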

Source: https://habr.com/ru/post/245837/

