template <typename T> struct tagged_ptr {
    T * ptr ;
    unsigned int tag ;

    tagged_ptr(): ptr(nullptr), tag(0) {}
    tagged_ptr( T * p ): ptr(p), tag(0) {}
    tagged_ptr( T * p, unsigned int n ): ptr(p), tag(n) {}

    T * operator->() const { return ptr; }
};
A similar tagged pointer is used in the boost.lockfree library.

Here is the classic MSQueue algorithm [MS98] implemented with tagged pointers. Yes, lock-free algorithms are very verbose! The pseudocode uses volatile where real C++11 code would use std::atomic.
template <typename T> struct node {
    tagged_ptr<node> next;
    T data;
};

template <typename T> class MSQueue {
    tagged_ptr<node> volatile m_Head;
    tagged_ptr<node> volatile m_Tail;
    FreeList m_FreeList;
public:
    MSQueue() {
        // Head and Tail point to the same dummy node
        m_Head.ptr = m_Tail.ptr = new node();
    }

    void enqueue( T const& value )
    {
    E1:  node * pNode = m_FreeList.newNode();
    E2:  pNode->data = value;
    E3:  pNode->next.ptr = nullptr;
    E4:  for (;;) {
    E5:     tagged_ptr<node> tail = m_Tail;
    E6:     tagged_ptr<node> next = tail.ptr->next;
    E7:     if ( tail == m_Tail ) {        // is tail still consistent?
                // is tail pointing to the last node?
    E8:         if ( next.ptr == nullptr ) {
                    // try to link the new node after the last one
    E9:             if ( CAS( &tail.ptr->next, next, tagged_ptr<node>( pNode, next.tag + 1 )))
                        // success, leave the loop
    E10:                break;
    E11:        } else {
                    // Tail is lagging behind
                    // try to advance tail to the next node
    E12:            CAS( &m_Tail, tail, tagged_ptr<node>( next.ptr, tail.tag + 1 ));
                }
            }
         } // end loop
         // try to swing tail to the inserted node
    E13: CAS( &m_Tail, tail, tagged_ptr<node>( pNode, tail.tag + 1 ));
    }

    bool dequeue( T& dest )
    {
    D1:  for (;;) {
    D2:     tagged_ptr<node> head = m_Head;
    D3:     tagged_ptr<node> tail = m_Tail;
    D4:     tagged_ptr<node> next = head.ptr->next;
            // are head, tail and next consistent?
    D5:     if ( head == m_Head ) {
                // is the queue empty, or is tail falling behind?
    D6:         if ( head.ptr == tail.ptr ) {
                    // the queue is empty
    D7:             if ( next.ptr == nullptr )
    D8:                 return false;
                    // Tail is lagging behind
                    // try to advance tail to the next node
    D9:             CAS( &m_Tail, tail, tagged_ptr<node>( next.ptr, tail.tag + 1 ));
    D10:        } else {
                    // read the value before the CAS, otherwise another
                    // dequeue might reuse the next node
    D11:            dest = next.ptr->data;
                    // try to swing head to the next node
    D12:            if ( CAS( &m_Head, head, tagged_ptr<node>( next.ptr, head.tag + 1 )))
    D13:                break;  // success, leave the loop
                }
            }
         } // end of loop
         // the old dummy node goes back to the free-list
    D14: m_FreeList.add( head.ptr );
    D15: return true;   // the result is in dest
    }
};
Note that both operations spin in a loop until they succeed (or, in the case of dequeue, until the queue turns out to be empty). Such "pushing through" with the help of a loop is a typical technique of lock-free programming.
The first element of the queue (the one m_Head points to) is a dummy node. The presence of a dummy element ensures that the pointers to the beginning and the end of the queue are never NULL. The sign of an empty queue is the condition m_Head == m_Tail && m_Tail->next == NULL (lines D6-D8). The last condition (m_Tail->next == NULL) is significant because enqueue does not change m_Tail: line E9 changes only m_Tail->next. Thus, at first glance, the enqueue method violates the queue structure. In fact, m_Tail is advanced in another method and/or another thread: before adding an element, the enqueue operation checks (line E8) that m_Tail points to the last element (that is, that m_Tail->next == NULL), and if that is not so, tries to move the pointer to the end (line E12); similarly, the dequeue operation advances m_Tail (line D9) before performing its "immediate duties" if m_Tail does not point to the actual end of the queue. This shows us a common approach in lock-free programming, mutual assistance between threads (helping): the algorithm of one operation is "smeared" across all operations of the container, and one operation relies heavily on the fact that its work will be completed by a subsequent call of a (possibly different) operation in a (possibly) different thread.
In C++11, m_Head and m_Tail must be declared as atomic (the pseudocode uses volatile). In addition, recall that the CAS primitive compares the value at the target address with the expected value and, if they are equal, writes a new value to the target address. Therefore, a local copy of the current value must always be passed to CAS: a call like CAS(&val, val, newVal) would almost always succeed, which is an error for us.
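A minimal C++11 sketch of this point: compare_exchange must receive a stable local copy of the expected value. The snippet is purely illustrative and not part of the queue code above.

#include <atomic>

std::atomic<int> val(0);

void increment_with_cas()
{
    int expected = val.load();   // local copy of the current value
    int newVal = expected + 1;
    // succeeds only if val is still equal to the local copy
    bool ok = val.compare_exchange_strong( expected, newVal );
    // Wrong: re-reading the target as the "expected" argument, as in
    // CAS(&val, val, newVal), makes the comparison almost always succeed
    // even if another thread has changed val in between.
    (void) ok;
}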
Note that the element's data is copied in the dequeue method (line D11) before the element is excluded from the queue (line D12). Since the exclusion of the element (advancing m_Head in line D12) may fail, the copying in D11 may be executed repeatedly. From the C++ point of view this means that the data stored in the queue should not be too complex, otherwise the overhead of the assignment operator in line D11 becomes too large and, under high load, the probability of failure of the CAS primitive grows. Attempting to "optimize" the algorithm by moving line D11 outside the loop would be an error: the next element could be removed by another thread. Since the queue implementation in question is based on the tagged pointer scheme, in which elements are never physically deleted, such an "optimization" would make it possible to return incorrect data (not the data that was in the queue at the moment line D12 succeeded).
The MSQueue algorithm is also interesting in that m_Head always points to a dummy node, and the first element of a non-empty queue is the element following m_Head. On dequeue from a non-empty queue, the value of the first element, that is, m_Head.next, is read; the dummy element is excluded; and the next element, the one whose value we return, becomes the new dummy node (and the new head). It turns out that the physical removal of an element happens only on the following dequeue operation.
This algorithm is implemented in libcds by the class cds::intrusive::MSQueue.
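For completeness, a hedged usage sketch of the non-intrusive counterpart, cds::container::MSQueue, parameterized by a garbage collector (the header names follow the libcds sources; the scheme initialization shown later in this article is assumed to have been done):

#include <cds/container/msqueue.h>
#include <cds/gc/hp.h>

// a queue of int protected by the Hazard Pointer GC
typedef cds::container::MSQueue< cds::gc::HP, int > int_queue;

void example()
{
    int_queue q;
    q.enqueue( 42 );
    int x;
    if ( q.dequeue( x )) {
        // x == 42
    }
}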
The scheme maintains a global epoch nGlobalEpoch, and each thread operates in its own local epoch nThreadEpoch. On entering epoch-protected code, the local epoch of the thread is advanced to nGlobalEpoch + 1 if it does not exceed the global epoch. Once all threads have reached the global epoch, nGlobalEpoch is incremented.
// global epoch
static atomic<unsigned int> m_nGlobalEpoch := 1 ;
const EPOCH_COUNT = 3 ;

// TLS data
struct ThreadEpoch {
    // local epoch of the thread
    unsigned int m_nThreadEpoch ;
    // lists of retired (deferred for deletion) elements, one per epoch
    List<void *> m_arrRetired[ EPOCH_COUNT ] ;

    ThreadEpoch(): m_nThreadEpoch(1) {}

    void enter() {
        if ( m_nThreadEpoch <= m_nGlobalEpoch )
            m_nThreadEpoch = m_nGlobalEpoch + 1 ;
    }

    void exit() {
        if ( the local epochs of all threads are greater than m_nGlobalEpoch ) {
            ++m_nGlobalEpoch ;
            // free (delete) the elements of all threads' lists
            free the elements of m_arrRetired[ (m_nGlobalEpoch - 1) % EPOCH_COUNT ] ;
        }
    }
} ;
An element excluded from a lock-free container is placed into the current thread's list m_arrRetired[m_nThreadEpoch % EPOCH_COUNT]. As soon as all threads have passed through the global epoch m_nGlobalEpoch, all the threads' lists of epoch m_nGlobalEpoch - 1 can be freed, and the global epoch itself can be incremented.
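A minimal sketch of retiring a pointer under this scheme, in the pseudocode style above; the TLS accessor name is an assumption:

void retire_ptr( void * p )
{
    ThreadEpoch * t = get_current_thread()->threadEpoch; // hypothetical TLS accessor
    // the element is deferred in the list of the thread's current local epoch
    t->m_arrRetired[ t->m_nThreadEpoch % EPOCH_COUNT ].push( p );
}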
Erratum: in the exit() function, m_arrRetired[(m_nGlobalEpoch - 2) % EPOCH_COUNT] should read m_arrRetired[(m_nGlobalEpoch - 1) % EPOCH_COUNT]; the code above already incorporates the fix. At this moment the local epoch of a thread can be either m_nGlobalEpoch (for threads that have already passed enter()) or m_nGlobalEpoch + 1 (for threads entering the critical section again), so the generation m_nGlobalEpoch - 1 can be safely released.
Each operation on a lock-free container must be enclosed between calls to ThreadEpoch::enter() and ThreadEpoch::exit(), which is very similar to a critical section:
lock_free_op( ... )
{
    get_current_thread()->ThreadEpoch.enter() ;
    . . .
    // the body of the lock-free operation.
    // Inside the epoch-based "critical section" we may safely read
    // the container's elements: while we are here, no element
    // will be physically deleted.
    . . .
    get_current_thread()->ThreadEpoch.exit() ;
}
Every pointer that a lock-free operation is about to dereference must be declared as hazard, that is, placed into the HP hazard-pointer array of the current thread. The HP array is private to the thread: only the owner thread writes to it, while all threads may read it (in the Scan procedure). If you carefully analyze the operations of different lock-free containers, you will notice that the size of the HP array (the number of hazard pointers per thread) does not exceed 3 or 4, so the overhead of supporting the scheme is small.
An exception is the skip list (cds::container::SkipListMap): it is a probabilistic data structure, in fact a list of lists, with a variable height of each element. Such containers are not very suitable for the Hazard Pointer scheme, although libcds does have a skip-list implementation for this scheme.
// Constants
// P : the number of threads
// K : the number of hazard pointers per thread
// N : the total number of hazard pointers = K*P
// R : batch size; R - N = Omega(N), for example R = 2*N

// Per-thread variables:
// the hazard pointer array of the current thread
void * HP[K];
// the current length of dlist (0 .. R)
unsigned dcount = 0;
// the list of retired pointers of the current thread
void * dlist[R];

// places a retired pointer into dlist
void RetireNode( void * node ) {
    dlist[dcount++] = node;
    // when dlist is full - call Scan
    if (dcount == R)
        Scan();
}

// The main function: scans dlist and frees every element
// that is not declared as a Hazard Pointer
void Scan() {
    unsigned i;
    unsigned p = 0;
    unsigned new_dcount = 0; // 0 .. N
    void * hptr, plist[N], new_dlist[N];

    // Stage 1 - walk the HP arrays of all threads,
    // collecting the non-null pointers into plist
    for (unsigned t = 0; t < P; ++t) {
        void ** pHPThread = get_thread_data(t)->HP ;
        for (i = 0; i < K; ++i) {
            hptr = pHPThread[i];
            if ( hptr != nullptr )
                plist[p++] = hptr;
        }
    }

    // Stage 2 - sort the hazard pointers
    // to speed up the subsequent search
    sort(plist);

    // Stage 3 - free the elements that are not hazard
    for ( i = 0; i < R; ++i ) {
        // if dlist[i] is in plist, some thread declared it
        // as a Hazard Pointer; otherwise dlist[i] may be freed
        if ( binary_search(dlist[i], plist))
            new_dlist[new_dcount++] = dlist[i];
        else
            free(dlist[i]);
    }

    // Stage 4 - copy the survivors back to dlist
    for (i = 0; i < new_dcount; ++i )
        dlist[i] = new_dlist[i];
    dcount = new_dcount;
}
When a thread j excludes an element pNode from a lock-free container, it calls RetireNode(pNode): the element is placed into the local (for thread j) list dlist of deferred (to-be-deleted) elements. As soon as the size of dlist reaches R (R is comparable to N = P*K, but greater than N; for example, R = 2N), the Scan() procedure is called, which performs the deletion of the deferred elements. The condition R > P*K is essential: only when it holds is it guaranteed that Scan() will be able to remove something from the array of deferred data. If this condition is violated, Scan() may remove nothing from the array, and we get an algorithm error: the array is completely full, but it cannot be shrunk.
Scan() consists of four stages.

1. The first stage builds the plist array of current hazard pointers, which includes all non-null hazard pointers of all threads. Only the first stage reads shared data (the HP arrays of the threads); the remaining stages work only with local data.
2. The second stage sorts the plist array to optimize the subsequent search; duplicates can also be removed from plist here.
3. The third stage walks through the dlist array of the current thread. If the element dlist[i] is found in plist (that is, some thread has declared this pointer as hazard and is working with it), it cannot be deleted and remains in dlist (it is transferred to new_dlist). Otherwise the element dlist[i] can be deleted: not a single thread is working with it.
4. The fourth stage copies new_dlist back to dlist.

Since R > N, the Scan() procedure necessarily reduces the size of the dlist array, that is, some elements are necessarily removed.

Declaring a pointer as hazard in application code looks like this:

std::atomic<T *> atomicPtr ;
. . .
T * localPtr ;
do {
    localPtr = atomicPtr.load(std::memory_order_relaxed);
    HP[i] = localPtr ;
} while ( localPtr != atomicPtr.load(std::memory_order_acquire));
Here we read the pointer atomicPtr into the local variable localPtr and declare it in the slot HP[i] of the HP hazard-pointer array of the current thread. After that we must check whether atomicPtr has changed: if it has, another thread could have excluded the element from the container before our hazard pointer was published, and localPtr may already be invalid, so the new value must be published and checked again, hence the loop. When the loop finishes, it is guaranteed that the pointer stored in HP[i] (declared as hazard) equals atomicPtr. Any thread running Scan() sees this pointer among the hazard ones and will not free it, so dereferencing localPtr is safe until we clear the hazard pointer.

Here is the Michael-Scott queue implemented in C++11 with Hazard Pointers (access to the thread-private HP array is simplified to a plain array):
template <typename T> class MSQueue {
    struct node {
        std::atomic<node *> next ;
        T data;

        node(): next(nullptr) {}
        node( T const& v): next(nullptr), data(v) {}
    };

    std::atomic<node *> m_Head;
    std::atomic<node *> m_Tail;

public:
    MSQueue()
    {
        node * p = new node;
        m_Head.store( p, std::memory_order_release );
        m_Tail.store( p, std::memory_order_release );
    }

    void enqueue( T const& data )
    {
        node * pNew = new node( data );
        node * t;
        while (true) {
            t = m_Tail.load(std::memory_order_relaxed);
            // declare that t is hazard. The HP array is thread-private
            HP[0] = t;
            // it is necessary to check that m_Tail has not changed!
            if (t != m_Tail.load(std::memory_order_acquire))
                continue;
            node * next = t->next.load(std::memory_order_acquire);
            if (t != m_Tail.load(std::memory_order_acquire))
                continue;
            if (next != nullptr) {
                // m_Tail does not point to the last node -
                // help to advance m_Tail
                m_Tail.compare_exchange_weak( t, next, std::memory_order_release);
                continue;
            }
            node * tmp = nullptr;
            if ( t->next.compare_exchange_strong( tmp, pNew, std::memory_order_release))
                break;
        }
        m_Tail.compare_exchange_strong( t, pNew, std::memory_order_acq_rel );
        HP[0] = nullptr; // zero out the hazard pointer
    }

    bool dequeue( T& dest )
    {
        node * h;
        while (true) {
            h = m_Head.load(std::memory_order_relaxed);
            // declare h as a Hazard Pointer
            HP[0] = h;
            // check that m_Head has not changed
            if (h != m_Head.load(std::memory_order_acquire))
                continue;
            node * t = m_Tail.load(std::memory_order_relaxed);
            node * next = h->next.load(std::memory_order_acquire);
            // head->next is also a Hazard Pointer
            HP[1] = next;
            // m_Head has changed - start over
            if (h != m_Head.load(std::memory_order_relaxed))
                continue;
            if (next == nullptr) {
                // the queue is empty
                HP[0] = nullptr;
                return false;
            }
            if (h == t) {
                // help the concurrent enqueue to advance m_Tail
                m_Tail.compare_exchange_strong( t, next, std::memory_order_release);
                continue;
            }
            dest = next->data;
            if ( m_Head.compare_exchange_strong(h, next, std::memory_order_release))
                break;
        }
        // zero out the Hazard Pointers
        HP[0] = nullptr;
        HP[1] = nullptr;
        // retire the old dummy node: it will be freed
        // once no thread declares it as hazard
        RetireNode(h);
        return true;
    }
};
Even with K = 8 hazard pointers per thread and P = 100 threads we get a modest R = 2 * K * P = 1600 retired pointers.

In libcds the Hazard Pointer scheme is implemented in the namespace cds::gc::hzp. The scheme itself does not depend on the type T of the deleted data, so making its implementation a template would only bloat the code; instead, type erasure is applied: the scheme operates on void pointers, and each retired pointer carries a disposer, the function that knows how to destroy the object of the concrete type. The facade of the scheme is the class cds::gc::HP: every lock-free container in libcds is parameterized by a GC class, and for the Hazard Pointer scheme that class is cds::gc::HP. A retired pointer together with its disposer looks like this:

struct retired_ptr {
    typedef void (* fnDisposer )( void * );
    void * ptr ;            // the retired pointer
    fnDisposer pDisposer;   // the disposer that will free it
    retired_ptr( void * p, fnDisposer d): ptr(p), pDisposer(d) {}
};
During Scan(), the HP scheme calls pDisposer(ptr) for every retired pointer that is no longer declared as hazard; this call performs the actual "deletion" of the data. The disposer is bound to the pointer at the moment of retirement. A typical disposer for a type T and a retire function look like this:

template <typename T>
struct make_disposer {
    static void dispose( void * p ) { delete reinterpret_cast<T *>(p); }
};

template <typename T>
void retire_ptr( T * p )
{
    // place p into the thread-local array arrRetired of retired
    // pointers together with its disposer
    arrRetired.push( retired_ptr( p, make_disposer<T>::dispose ));
    // when the array is full - call scan
    if ( arrRetired.full() )
        scan();
}
To use the scheme, a cds::gc::HP object must be created, usually at the beginning of main(): it initializes the internal infrastructure of the HP scheme, and each thread working with HP-based containers must be attached to the scheme. Apart from that, the cds::gc::HP class is the facade providing the entire API of the HP scheme; an initialization sketch follows the parameter list below. The constructor of cds::gc::HP accepts the scheme parameters:

HP( size_t nHazardPtrCount = 0,
    size_t nMaxThreadCount = 0,
    size_t nMaxRetiredPtrCount = 0,
    cds::gc::hzp::scan_type nScanType = cds::gc::hzp::inplace );
- nHazardPtrCount - the maximum number of hazard pointers per thread (the constant K)
- nMaxThreadCount - the maximum number of threads (the constant P)
- nMaxRetiredPtrCount - the maximum size of the retired-pointer array (the constant R = 2KP)
- nScanType - the variant of the Scan algorithm: cds::gc::hzp::classic is the classic Scan described above; cds::gc::hzp::inplace is a variant of Scan() without the auxiliary new_dlist array, compacting dlist in place (it saves memory)
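A hedged initialization sketch, assuming the usual libcds workflow (the header names and the threading-manager calls follow the libcds documentation; details may differ between library versions):

#include <cds/init.h>
#include <cds/gc/hp.h>

int main()
{
    cds::Initialize();      // initialize the library infrastructure
    {
        cds::gc::HP hpGC;   // initialize the Hazard Pointer scheme
        // every thread working with HP-based containers
        // must be attached to libcds:
        cds::threading::Manager::attachThread();

        // ... work with HP-based containers ...

        cds::threading::Manager::detachThread();
    }
    cds::Terminate();       // free the library infrastructure
}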
Besides the constructor, cds::gc::HP gives access to the retirement API of the Hazard Pointer scheme, that is, to deferred deletion of pointers:

template <class Disposer, typename T>
static void retire( T * p ) ;

template <typename T>
static void retire( T * p, void (* pFunc)(T *) )
The pointer p will be freed by the Disposer functor (or the pFunc function) when it is no longer declared as hazard, that is, during some subsequent Scan(). Example:

struct Foo { ... };
struct fooDisposer {
    void operator()( Foo * p ) const { delete p; }
};

// delete the Foo object via the fooDisposer
Foo * p = new Foo ;
cds::gc::HP::retire<fooDisposer>( p );
static void force_dispose();

This static method forces the Hazard Pointer scheme to run Scan(), disposing of everything that can be disposed of; libcds itself uses it internally.
The cds::gc::HP facade also declares the following helper classes:

- thread_gc - the per-thread data of the Hazard Pointer scheme; it attaches the thread to the HP infrastructure. You will rarely need it directly: the threading support of libcds manages it.
- Guard - a guard for a single hazard pointer.
- GuardArray<Count> - an array of hazard pointer guards. On construction, a Guard (a GuardArray<N>) occupies one slot (N slots) in the HP array of hazard pointers of the current thread and frees it (them) on destruction.

The Guard class provides the following API for declaring hazard pointers:
template <typename T>
T protect( CDS_ATOMIC::atomic<T> const& toGuard );

template <typename T, class Func>
T protect( CDS_ATOMIC::atomic<T> const& toGuard, Func f );
These methods read the value of type T (a pointer) from toGuard and declare it as hazard, performing in a loop the check that toGuard has not changed, as shown earlier; on return, the hazard pointer is guaranteed to be valid. The variant with the Func functor allows declaring as hazard not the value read itself but a pointer derived from it: for example, when the container stores nodes that differ from the user data, the functor converts the node pointer into the pointer that must actually be guarded. The Func functor has the signature:

struct functor {
    value_type * operator()( T * p ) ;
};
template <typename T>
T * assign( T * p );

template <typename T, int Bitmask>
T * assign( cds::details::marked_ptr<T, Bitmask> p );
Unlike protect, assign performs no checking loop: it simply writes the pointer p into the guard slot, declaring it as hazard. The class cds::details::marked_ptr represents a marked pointer: 2-3 low-order bits of the pointer (which are 0 for aligned data) are used to store flags, a popular technique in lock-free data structures. When such a pointer is declared as hazard, the mark is cleared according to the Bitmask, so the guard stores the pure pointer.
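A small illustrative sketch of marked pointers; the ptr()/bits() accessors are assumed from the libcds sources, and node/pNode are hypothetical names:

// a pointer to node with one mark bit, e.g. a logical-deletion flag
cds::details::marked_ptr<node, 1> mp( pNode, 1 );
node * p = mp.ptr();    // the pointer with the mark stripped
int mark = mp.bits();   // the mark bits (here: 1)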
template <typename T>
T * get() const;

Returns the pointer currently stored in the guard.

void copy( Guard const& src );

Copies the hazard pointer from the src guard into this guard; as a result, the pointer is declared as hazard twice.

void clear();

Clears the guard; the pointer it held is no longer declared as hazard.
The GuardArray class provides a similar API in which the guard slot is specified by an index:

template <typename T>
T protect( size_t nIndex, CDS_ATOMIC::atomic<T> const& toGuard );

template <typename T, class Func>
T protect( size_t nIndex, CDS_ATOMIC::atomic<T> const& toGuard, Func f )

template <typename T>
T * assign( size_t nIndex, T * p );

template <typename T, int Bitmask>
T * assign( size_t nIndex, cds::details::marked_ptr<T, Bitmask> p );

void copy( size_t nDestIndex, size_t nSrcIndex );
void copy( size_t nIndex, Guard const& src );

template <typename T>
T * get( size_t nIndex ) const;

void clear( size_t nIndex );
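A hedged usage sketch of a Guard protecting the head of some HP-based container (m_Head and the node type are assumptions for illustration, not libcds API):

cds::gc::HP::Guard guard;
// read m_Head and declare it hazard; the loop inside protect
// guarantees the returned pointer is safe to dereference
node * pHead = guard.protect( m_Head );
// ... work with pHead ...
guard.clear();   // pHead is no longer protected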
What is CDS_ATOMIC? It is a macro that expands to the namespace containing the atomic implementation used by libcds. For a compiler that supports C++11 atomic, CDS_ATOMIC is std. For older compilers it is cds::cxx11_atomics, the libcds implementation of atomic. If libcds is built with boost.atomic, CDS_ATOMIC is boost.
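A sketch of how such a macro switch might look; the configuration macro names here are assumptions, not the actual libcds ones:

#if defined(CDS_USE_BOOST_ATOMIC)        // hypothetical config macro
#   include <boost/atomic.hpp>
#   define CDS_ATOMIC boost
#elif defined(CDS_CXX11_ATOMIC_SUPPORT)  // hypothetical config macro
#   include <atomic>
#   define CDS_ATOMIC std
#else
#   include <cds/cxx11_atomic.h>
#   define CDS_ATOMIC cds::cxx11_atomics
#endif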
void CleanUpNode( Node * pNode );
void TerminateNode( Node * pNode );
TerminateNode zeroes all reference-counted links of the node pNode. CleanUpNode replaces the links of pNode that point to "stale" (deleted) nodes with links to live (undeleted) ones; in a reference-counting scheme such a replacement of links is safe. A typical CleanUpNode looks like this:
void CleanUpNode( Node * pNode )
{
    for (all x where pNode->link[x] of node is reference-counted) {
retry:
        node1 = DeRefLink(&pNode->link[x]);     // sets an HP hazard pointer
        if (node1 != NULL and is_deleted( node1 )) {
            node2 = DeRefLink(&node1->link[x]); // sets an HP hazard pointer
            // node1 is deleted - swing the link from node1 to node2
            CompareAndSwapRef(&pNode->link[x], node1, node2);
            ReleaseRef(node2);   // releases the HP hazard pointer
            ReleaseRef(node1);   // releases the HP hazard pointer
            goto retry;          // recheck: node2 may be deleted too
        }
        ReleaseRef(node1);       // releases the HP hazard pointer
    }
}
CleanUpNode guarantees that the links of a retired node lead only to live nodes, which is what allows the reference counters to drop to zero. Not every lock-free algorithm allows such a procedure to be written; for some complex structures, for example those based on MultiCAS, it cannot be implemented.

The Scan procedure of HRC is similar to Scan of Hazard Pointers (with added calls to CleanUpNode). There is an important difference: in the Hazard Pointer scheme (where R > N = P * K) Scan is guaranteed to free some retired elements (those not declared as hazard), whereas in HRC Scan may free nothing (the reference counters of the nodes may be nonzero). In that case CleanUpAll is invoked: it runs CleanUpNode over all retired nodes, after which Scan is called again.
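A pseudocode sketch of CleanUpAll as described above; this is an assumption based on the description, not the verbatim algorithm of the HRC paper:

void CleanUpAll()
{
    // break the links of retired nodes to deleted nodes,
    // allowing their reference counters to reach zero
    for ( each node in the retired lists of all threads )
        CleanUpNode( node );
    // now retry the reclamation
    Scan();
}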
In libcds the HRC scheme is implemented by the class cds::gc::HRC; its API repeats the API of cds::gc::HP. The implementation details are located in the namespace cds::gc::hrc. When the retired list overflows, Scan is called and, if it fails to free anything, CleanUpAll is run over the retired data.
The Liberate procedure is the analogue of Scan in the HP scheme: it looks through the retired data and frees everything not protected by guards. Unlike the HP scheme, where the retired data is accumulated per thread, the PTB scheme accumulates retired data in a common buffer.

When Liberate finds a retired pointer protected by some guard, it does not free it but hands it off to that guard (stores it in the guard's hand-off field); the handed-off data is freed later, when the guard stops protecting it.

The original work gives two variants of Liberate: wait-free and lock-free. The wait-free variant requires dwCAS (CAS over a double word), and dwCAS is not available on all architectures. The lock-free variant has no such requirement, but on failure (contention between guards and retired data) it may lead to unbounded accumulation of retired data that Liberate cannot free. Therefore, in the libcds implementation of the PTB scheme the Liberate algorithm was reworked, making it similar to the HP scheme. As a result, the implementation of PTB in libcds became quite similar to a variant of the HP scheme with an arbitrary number of hazard pointers and a single array of retired data. This affected performance only slightly: a "pure" HP scheme is still slightly faster than PTB, but PTB may be preferable due to the lack of restrictions on the number of guards.
In newer versions of libcds this scheme is exposed as cds::gc::DHP (Dynamic Hazard Pointer): in essence a dynamic variant of HP built on the pass-the-buck idea, a Hazard Pointer scheme without a fixed limit on the number of hazard pointers.
The scheme is implemented by the class cds::gc::PTB, with the implementation details in the namespace cds::gc::ptb. The API of cds::gc::PTB repeats that of cds::gc::HP and differs only in the constructor arguments:
PTB( size_t nLiberateThreshold = 1024, size_t nInitialThreadGuardCount = 8 );
- nLiberateThreshold - the threshold for invoking Liberate: when the common list of retired data reaches this size, Liberate is called
- nInitialThreadGuardCount - the initial number of guards created for a thread (when the thread is created and attached to libcds); if more guards are required, they are allocated on demand
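A short usage sketch of the constructor above, with arbitrary illustrative values:

// call Liberate at 2048 retired pointers; 16 guards per thread initially
cds::gc::PTB ptbGC( 2048, 16 );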
These GC facade classes are what the lock-free containers of libcds are parameterized by. Note that both internal schemes amortize the cost of deferred deletion: Scan() / Liberate() is invoked comparatively rarely, when the retired buffers fill up. To be continued.
Source: https://habr.com/ru/post/202190/