
Simple lock-free objects for two threads

There have been many articles about universal lock-free objects, but for some special cases they are unnecessarily cumbersome. Mine was one such case: I needed one-way transfer of information from one thread to another. The main thread starts a worker thread, after which it can only ask the worker to stop and has no further control over it. The worker thread, in turn, can notify the main thread of its current state (progress) and send intermediate results. So data only needs to flow from the worker thread to the main thread.

Of course, I may have reinvented the wheel or, worse, a wheel with bugs. So comments and criticism are very welcome!

State object


The state of the worker thread is represented as a class. The main thread is not obliged to pick up every state stored in the state object (for example, it does not matter if the main thread misses an intermediate progress value; what matters is that it gets the most recent one).

To implement a lock-free state transfer, we need three instances of it (different objects of the same class):

    var
      ReadItem: TLockFreeWorkState;
      CurrentItem: TLockFreeWorkState;
      WriteItem: TLockFreeWorkState;

The idea is this: the worker thread has free access to the WriteItem object. Once all the data has been stored, it performs InterlockedExchange to swap WriteItem with the object in CurrentItem, after which the main thread is notified in some way that a new state is ready (my example uses an ordinary PostMessage). In its notification handler, the main thread performs InterlockedExchange to swap CurrentItem with ReadItem, after which it can freely read data from ReadItem.

The result behaves like a bubble: the state data appears in WriteItem and then "floats up" through CurrentItem into ReadItem. Incidentally, I could not think of a good name for the base class of this structure, so I simply called it TLockFreeWorkState (perhaps someone has a better idea).

There is one caveat: the main thread may request the current state at any time. If it always performs InterlockedExchange, successive requests will alternately return the current and the previous state.

An ordinary Newest flag in the class helps prevent this. When writing a state, the worker thread always sets WriteItem.Newest := True, and after InterlockedExchange this flag ends up in CurrentItem. The main thread first checks CurrentItem.Newest and performs InterlockedExchange only if it is True, after which it immediately resets ReadItem.Newest to False. I assumed that reading CurrentItem.Newest from the main thread is safe, but correct me if I am wrong.
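To make the alternation that the Newest flag guards against more concrete, here is a minimal single-threaded sketch. The TDemoState class and the ReadAlways helper are hypothetical names invented for this illustration; ReadAlways swaps unconditionally, which is exactly what the real reader should not do.

```pascal
program AlternationDemo;
{$APPTYPE CONSOLE}

uses
  Windows;

type
  // Hypothetical state class, used only for this illustration
  TDemoState = class
  public
    Value: Integer;
  end;

// Swaps CurrentItem and ReadItem unconditionally (no Newest check)
// and returns the value that ended up in ReadItem.
function ReadAlways(var CurrentItem, ReadItem: TDemoState): Integer;
begin
  Pointer(ReadItem) := InterlockedExchangePointer(Pointer(CurrentItem), Pointer(ReadItem));
  Result := ReadItem.Value;
end;

var
  CurrentItem, ReadItem: TDemoState;
begin
  CurrentItem := TDemoState.Create;
  CurrentItem.Value := 2; // latest state published by the worker
  ReadItem := TDemoState.Create;
  ReadItem.Value := 1;    // state the main thread read earlier

  // The worker publishes nothing new between these calls
  Writeln(ReadAlways(CurrentItem, ReadItem)); // 2 (latest)
  Writeln(ReadAlways(CurrentItem, ReadItem)); // 1 (stale state swapped back in)
  Writeln(ReadAlways(CurrentItem, ReadItem)); // 2

  CurrentItem.Free;
  ReadItem.Free;
end.
```

Because the reader hands its old object back on every swap, an unconditional exchange with no intervening write simply trades the two objects back and forth.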

Now all of this as simplified code (type casts are omitted for clarity):

    type
      TLockFreeWorkState = class
      public
        Newest: Boolean;
      end;

    function Read(var CurrentItem, ReadItem: TLockFreeWorkState): Boolean;
    begin
      if CurrentItem.Newest then
      begin
        ReadItem := InterlockedExchangePointer(CurrentItem, ReadItem);
        ReadItem.Newest := False;
        Result := True;
      end
      else
        Result := False;
    end;

    procedure Write(var CurrentItem, WriteItem: TLockFreeWorkState);
    begin
      WriteItem.Newest := True;
      WriteItem := InterlockedExchangePointer(CurrentItem, WriteItem);
    end;
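As a usage illustration, the sketch below runs the writer and reader steps one after another in a single thread, so the outcome is deterministic. TProgressState, ReadState and WriteState are hypothetical names chosen for this example (the helpers are renamed to avoid clashing with the standard Read and Write procedures), and the type casts omitted above are written out.

```pascal
program StateDemo;
{$APPTYPE CONSOLE}

uses
  Windows;

type
  TLockFreeWorkState = class
  public
    Newest: Boolean;
  end;

  // Hypothetical descendant carrying the actual payload
  TProgressState = class(TLockFreeWorkState)
  public
    Progress: Integer;
  end;

// Main-thread side: swap only if the worker has published something new
function ReadState(var CurrentItem, ReadItem: TProgressState): Boolean;
begin
  Result := CurrentItem.Newest;
  if Result then
  begin
    Pointer(ReadItem) := InterlockedExchangePointer(Pointer(CurrentItem), Pointer(ReadItem));
    ReadItem.Newest := False;
  end;
end;

// Worker side: mark the filled object as new and publish it
procedure WriteState(var CurrentItem, WriteItem: TProgressState);
begin
  WriteItem.Newest := True;
  Pointer(WriteItem) := InterlockedExchangePointer(Pointer(CurrentItem), Pointer(WriteItem));
end;

var
  ReadItem, CurrentItem, WriteItem: TProgressState;
  Fresh: Boolean;
begin
  ReadItem := TProgressState.Create;
  CurrentItem := TProgressState.Create;
  WriteItem := TProgressState.Create;

  // Worker publishes twice before the main thread looks
  WriteItem.Progress := 10;
  WriteState(CurrentItem, WriteItem);
  WriteItem.Progress := 20;
  WriteState(CurrentItem, WriteItem);

  Fresh := ReadState(CurrentItem, ReadItem);
  Writeln(Fresh, ' ', ReadItem.Progress); // TRUE 20 (the value 10 was skipped)

  Fresh := ReadState(CurrentItem, ReadItem);
  Writeln(Fresh, ' ', ReadItem.Progress); // FALSE 20 (nothing new, no swap)

  ReadItem.Free;
  CurrentItem.Free;
  WriteItem.Free;
end.
```

Note that after each WriteState call the worker receives a recycled object in WriteItem, so it has to fill in every field again before publishing.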

Queue object


The approach is similar in some ways, but the implementation initially needs only one object with two references to it:

    var
      ReadQueue: TLockFreeWorkQueue;
      WriteQueue: TLockFreeWorkQueue;

Initially, a single TLockFreeWorkQueue instance is created and assigned to both the ReadQueue and WriteQueue variables. The class is a ring buffer declared as follows:

    TLockFreeWorkQueue = class
    public
      Head: Integer;
      Tail: Integer;
      Items: array[0..QueueCapacity - 1] of TObject;
    end;

where QueueCapacity is some constant (greater than zero) that defines the length of the ring buffer.

To add an item to the queue, the worker thread calls InterlockedCompareExchangePointer on the WriteQueue.Items[Tail] element: the element is compared with Nil and, if it is Nil, the new item is written into it. If the operation succeeds, Tail is incremented by 1 and reset to 0 when it reaches QueueCapacity. We are free to manipulate Tail because only the worker thread (the writer) accesses this variable. After that, the worker thread must also notify the main thread that the queue contains items. If the operation fails, the queue is full; more on that later.

On receiving the notification from the worker, the main thread starts a loop that reads items from the queue (in fact, reading can start at any time). To extract an item, it calls InterlockedExchangePointer on the ReadQueue.Items[Head] element, writing Nil into it. If the extracted value is not Nil, Head is incremented by 1 and reset to 0 when it reaches QueueCapacity.
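The two operations described so far, before any overflow handling, can be sketched as follows. TryAdd, TryExtract and TIntItem are hypothetical names used only here, and QueueCapacity is set to 2 so that the "queue full" case is easy to reach. The demo runs in a single thread purely to show the slot logic.

```pascal
program RingDemo;
{$APPTYPE CONSOLE}

uses
  Windows;

const
  QueueCapacity = 2; // tiny on purpose, so the queue fills up quickly

type
  TLockFreeWorkQueue = class
  public
    Head: Integer;
    Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
  end;

  // Hypothetical payload class
  TIntItem = class
  public
    Value: Integer;
    constructor Create(AValue: Integer);
  end;

constructor TIntItem.Create(AValue: Integer);
begin
  inherited Create;
  Value := AValue;
end;

// Writer side: returns False if the slot at Tail is still occupied (queue full)
function TryAdd(Queue: TLockFreeWorkQueue; Item: TObject): Boolean;
begin
  Result := InterlockedCompareExchangePointer(
    Pointer(Queue.Items[Queue.Tail]), Pointer(Item), nil) = nil;
  if Result then
    Queue.Tail := (Queue.Tail + 1) mod QueueCapacity;
end;

// Reader side: returns nil if the slot at Head is empty
function TryExtract(Queue: TLockFreeWorkQueue): TObject;
begin
  Result := TObject(InterlockedExchangePointer(Pointer(Queue.Items[Queue.Head]), nil));
  if Result <> nil then
    Queue.Head := (Queue.Head + 1) mod QueueCapacity;
end;

var
  Queue: TLockFreeWorkQueue;
  Extra, Item: TObject;
begin
  Queue := TLockFreeWorkQueue.Create;
  Writeln(TryAdd(Queue, TIntItem.Create(1))); // TRUE
  Writeln(TryAdd(Queue, TIntItem.Create(2))); // TRUE

  Extra := TIntItem.Create(3);
  Writeln(TryAdd(Queue, Extra));              // FALSE: slot at Tail is occupied

  Item := TryExtract(Queue);
  Writeln(TIntItem(Item).Value);              // 1
  Item.Free;
  Writeln(TryAdd(Queue, Extra));              // TRUE: the slot has been released

  Item := TryExtract(Queue);
  while Item <> nil do
  begin
    Writeln(TIntItem(Item).Value);            // 2, then 3
    Item.Free;
    Item := TryExtract(Queue);
  end;
  Queue.Free;
end.
```

The slot contents themselves act as the "full" and "empty" indicators, which is why Head and Tail never have to be shared between the threads.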

Now let's deal with buffer overflow. For new items we can simply create a new queue object and continue writing to it; for the reader thread to find this object, we must store a reference to it in the current queue object. To do this, add a NextQueue field to the class:

    TLockFreeWorkQueue = class
    public
      Head: Integer;
      Tail: Integer;
      Items: array[0..QueueCapacity - 1] of TObject;
      NextQueue: TLockFreeWorkQueue;
    end;

Now, if InterlockedCompareExchangePointer returns non-Nil when writing an item (the queue is full), we create a new queue object NewWriteQueue: TLockFreeWorkQueue, write the item into it, store it in WriteQueue.NextQueue with InterlockedExchangePointer, and finally assign NewWriteQueue to the WriteQueue variable. After this operation, ReadQueue and WriteQueue refer to different objects.

In the main thread, we need to add handling for an empty queue. If InterlockedExchangePointer on the ReadQueue.Items[Head] element returns Nil, we additionally check the NextQueue field, again with InterlockedExchangePointer(ReadQueue.NextQueue, Nil). If this returns non-Nil, we save the object in NewReadQueue, destroy the current ReadQueue object, and assign NewReadQueue to the ReadQueue variable.

Here is simplified code for adding an item to the queue:

    procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
    var
      NewWriteQueue: TLockFreeWorkQueue;
    begin
      if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail], Item, Nil) = Nil then
      begin
        // Added successfully
        Inc(WriteQueue.Tail);
        if WriteQueue.Tail = QueueCapacity then
          WriteQueue.Tail := 0;
      end
      else
      begin
        // WriteQueue full. Create new chained queue.
        NewWriteQueue := TLockFreeWorkQueue.Create;
        NewWriteQueue.Items[0] := Item;
        Inc(NewWriteQueue.Tail);
        if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
          NewWriteQueue.Tail := 0;
        InterlockedExchangePointer(WriteQueue.NextQueue, NewWriteQueue);
        WriteQueue := NewWriteQueue;
      end;
    end;

and for extracting an item from the queue:

    function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject;
    var
      NewReadQueue: TLockFreeWorkQueue;
    begin
      Result := Nil;
      repeat
        Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], Nil);
        if Result = Nil then
        begin
          // No new items in this queue. Check next queue is available
          NewReadQueue := InterlockedExchangePointer(ReadQueue.NextQueue, Nil);
          if Assigned(NewReadQueue) then
          begin
            ReadQueue.Free;
            ReadQueue := NewReadQueue;
          end
          else
            // No new item in queue
            Exit;
        end;
      until Result <> Nil;
      // Item extracted successfully
      Inc(ReadQueue.Head);
      if ReadQueue.Head = QueueCapacity then
        ReadQueue.Head := 0;
    end;

I was probably overly cautious in this code. I am not sure InterlockedExchangePointer is really needed for operations on the NextQueue field; plain reads and writes may be safe.
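To illustrate how the chained queues behave on overflow, here is a single-threaded sketch that pushes five items through a queue with QueueCapacity = 2 and reads them back. It repeats the two routines above with the casts written out; TIntItem is a hypothetical payload class, and the run only shows ordering and chaining, not behaviour under real concurrency.

```pascal
program ChainDemo;
{$APPTYPE CONSOLE}

uses
  Windows;

const
  QueueCapacity = 2; // deliberately tiny to force overflow

type
  TLockFreeWorkQueue = class
  public
    Head: Integer;
    Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
    NextQueue: TLockFreeWorkQueue;
  end;

  TIntItem = class // hypothetical payload
  public
    Value: Integer;
  end;

procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
  NewWriteQueue: TLockFreeWorkQueue;
begin
  if InterlockedCompareExchangePointer(
       Pointer(WriteQueue.Items[WriteQueue.Tail]), Pointer(Item), nil) = nil then
    WriteQueue.Tail := (WriteQueue.Tail + 1) mod QueueCapacity
  else
  begin
    // Full: chain a fresh queue and continue writing there
    NewWriteQueue := TLockFreeWorkQueue.Create;
    NewWriteQueue.Items[0] := Item;
    NewWriteQueue.Tail := 1 mod QueueCapacity;
    InterlockedExchangePointer(Pointer(WriteQueue.NextQueue), Pointer(NewWriteQueue));
    WriteQueue := NewWriteQueue;
  end;
end;

function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  NewReadQueue: TLockFreeWorkQueue;
begin
  repeat
    Result := TObject(InterlockedExchangePointer(
      Pointer(ReadQueue.Items[ReadQueue.Head]), nil));
    if Result = nil then
    begin
      NewReadQueue := TLockFreeWorkQueue(InterlockedExchangePointer(
        Pointer(ReadQueue.NextQueue), nil));
      if NewReadQueue = nil then
        Exit; // nothing to read
      ReadQueue.Free; // drained: move on to the chained queue
      ReadQueue := NewReadQueue;
    end;
  until Result <> nil;
  ReadQueue.Head := (ReadQueue.Head + 1) mod QueueCapacity;
end;

var
  ReadQueue, WriteQueue: TLockFreeWorkQueue;
  Item: TIntItem;
  I: Integer;
begin
  ReadQueue := TLockFreeWorkQueue.Create;
  WriteQueue := ReadQueue; // one object, two references

  for I := 1 to 5 do
  begin
    Item := TIntItem.Create;
    Item.Value := I;
    AddQueueItem(WriteQueue, Item);
  end;
  Writeln(ReadQueue <> WriteQueue); // TRUE: the writer moved to a chained queue

  Item := TIntItem(ExtractQueueItem(ReadQueue));
  while Item <> nil do
  begin
    Writeln(Item.Value); // 1, 2, 3, 4, 5 in insertion order
    Item.Free;
    Item := TIntItem(ExtractQueueItem(ReadQueue));
  end;
  Writeln(ReadQueue = WriteQueue); // TRUE: the reader caught up with the writer
  ReadQueue.Free;
end.
```

With five items and a capacity of two, the writer creates two extra queue objects along the way, and the reader frees each drained queue as it moves down the chain.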

Test case


The complete, cleaned-up code, together with a simple console example, is given below.

    program LockFreeTest;

    {$APPTYPE CONSOLE}

    {$R *.res}

    uses
      SysUtils, Classes, Windows, Messages;

    // Lock-free work thread state ////////////////////////////////////////////////

    type
      TLockFreeWorkState = class
      protected
        FNewest: Boolean;
      public
        class function Read(var CurrentItem, ReadItem): Boolean;
        class procedure Write(var CurrentItem, WriteItem);
        property Newest: Boolean read FNewest write FNewest;
      end;

    class function TLockFreeWorkState.Read(var CurrentItem, ReadItem): Boolean;
    begin
      if TLockFreeWorkState(CurrentItem).Newest then
      begin
        pointer(ReadItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(ReadItem));
        TLockFreeWorkState(ReadItem).Newest := False;
        Result := True;
      end
      else
        Result := False;
    end;

    class procedure TLockFreeWorkState.Write(var CurrentItem, WriteItem);
    begin
      TLockFreeWorkState(WriteItem).Newest := True;
      pointer(WriteItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(WriteItem));
    end;

    // Lock-free work thread queue ////////////////////////////////////////////////

    type
      TLockFreeWorkQueue = class
      public
        const
          QueueCapacity = 4; // Small value for test purposes
      public
        type
          TLockFreeWorkQueueItems = array[0..QueueCapacity - 1] of TObject;
      public
        Head: Integer; // Access from main thread only
        Tail: Integer; // Access from work thread only
        NextQueue: TLockFreeWorkQueue;
        Items: TLockFreeWorkQueueItems;
      public
        destructor Destroy; override;
        class procedure Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject); static;
        class function Extract(var ReadQueue: TLockFreeWorkQueue): TObject; static;
      end;

    destructor TLockFreeWorkQueue.Destroy;
    var
      i: Integer;
    begin
      // Free non-extracted items
      for i := 0 to QueueCapacity - 1 do
        Items[i].Free;
      // Free NextQueue if exists
      NextQueue.Free;
      inherited;
    end;

    class procedure TLockFreeWorkQueue.Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
    var
      NewWriteQueue: TLockFreeWorkQueue;
    begin
      // Check item assigned (can't add empty items)
      if not Assigned(Item) or not Assigned(WriteQueue) then
        Exit;
      if InterlockedCompareExchangePointer(pointer(WriteQueue.Items[WriteQueue.Tail]), pointer(Item), Nil) = Nil then
      begin
        // Added successfully
        Inc(WriteQueue.Tail);
        if WriteQueue.Tail = QueueCapacity then
          WriteQueue.Tail := 0;
      end
      else
      begin
        // WriteQueue full. Create new chained queue.
        NewWriteQueue := TLockFreeWorkQueue.Create;
        NewWriteQueue.Items[0] := Item;
        Inc(NewWriteQueue.Tail);
        if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
          NewWriteQueue.Tail := 0;
        InterlockedExchangePointer(pointer(WriteQueue.NextQueue), NewWriteQueue);
        WriteQueue := NewWriteQueue;
      end;
    end;

    class function TLockFreeWorkQueue.Extract(var ReadQueue: TLockFreeWorkQueue): TObject;
    var
      NewReadQueue: TLockFreeWorkQueue;
    begin
      Result := Nil;
      if not Assigned(ReadQueue) then
        Exit;
      repeat
        Result := InterlockedExchangePointer(pointer(ReadQueue.Items[ReadQueue.Head]), Nil);
        if Result = Nil then
        begin
          // No new items in this queue. Check next queue is available
          NewReadQueue := InterlockedExchangePointer(pointer(ReadQueue.NextQueue), Nil);
          if Assigned(NewReadQueue) then
          begin
            ReadQueue.Free;
            ReadQueue := NewReadQueue;
          end
          else
            // No new item in queue
            Exit;
        end;
      until Result <> Nil;
      // Item extracted successfully
      Inc(ReadQueue.Head);
      if ReadQueue.Head = QueueCapacity then
        ReadQueue.Head := 0;
    end;

    // Test work thread ///////////////////////////////////////////////////////////

    const
      WM_MAINNOTIFY = WM_USER + 1;

    type
      TWorkThreadState = class(TLockFreeWorkState)
      public
        Progress: Integer;
      end;

      TWorkThreadQueueItem = class
      public
        ItemData: Integer;
      end;

      TWorkThread = class(TThread)
      protected
        FMainHandle: THandle;
        FMainNotified: Integer;
        // State fields
        FStateRead: TWorkThreadState;
        FStateCurrent: TWorkThreadState;
        FStateWrite: TWorkThreadState;
        // Queue fields
        FQueueRead: TLockFreeWorkQueue;
        FQueueWrite: TLockFreeWorkQueue;
        // Debug (test) fields
        FDebugReadQueue: Boolean;
        procedure Execute; override;
        procedure SetState;
        procedure AddQueueItem(Item: TWorkThreadQueueItem);
        procedure NotifyMain;
      public
        constructor Create(CreateSuspended: Boolean);
        destructor Destroy; override;
        function GetState: TWorkThreadState;
        function ExtractQueueItem: TWorkThreadQueueItem;
        procedure NotificationProcessed;
        property MainHandle: THandle read FMainHandle;
      end;

    constructor TWorkThread.Create(CreateSuspended: Boolean);
    begin
      inherited Create(CreateSuspended);
      // State objects
      FStateRead := TWorkThreadState.Create;
      FStateCurrent := TWorkThreadState.Create;
      FStateWrite := TWorkThreadState.Create;
      // Queue objects
      FQueueRead := TLockFreeWorkQueue.Create;
      FQueueWrite := FQueueRead;
    end;

    destructor TWorkThread.Destroy;
    begin
      inherited;
      FStateRead.Free;
      FStateCurrent.Free;
      FStateWrite.Free;
      // Always destroy read queue only: only read queue may have NextQueue reference
      FQueueRead.Free;
    end;

    procedure TWorkThread.NotifyMain;
    begin
      if InterlockedExchange(FMainNotified, 1) = 0 then
        PostMessage(FMainHandle, WM_MAINNOTIFY, 0, 0);
    end;

    procedure TWorkThread.NotificationProcessed;
    begin
      InterlockedExchange(FMainNotified, 0);
    end;

    function TWorkThread.GetState: TWorkThreadState;
    begin
      TLockFreeWorkState.Read(FStateCurrent, FStateRead);
      Result := FStateRead;
    end;

    procedure TWorkThread.SetState;
    begin
      TLockFreeWorkState.Write(FStateCurrent, FStateWrite);
    end;

    procedure TWorkThread.AddQueueItem(Item: TWorkThreadQueueItem);
    begin
      TLockFreeWorkQueue.Add(FQueueWrite, Item);
    end;

    function TWorkThread.ExtractQueueItem: TWorkThreadQueueItem;
    begin
      Result := TWorkThreadQueueItem(TLockFreeWorkQueue.Extract(FQueueRead));
    end;

    procedure TWorkThread.Execute;
    const
      TestQueueCountToFlush = 10;
    var
      ProgressIndex: Integer;
      TestQueueCount: Integer;
      Item: TWorkThreadQueueItem;
    begin
      TestQueueCount := 0;
      ProgressIndex := 0;
      while not Terminated do
      begin
        // Send current progress
        if FStateWrite.Progress <> ProgressIndex then
        begin
          // All state object fields initialization required
          FStateWrite.Progress := ProgressIndex;
          SetState;
          NotifyMain;
        end;
        // Emulate calculation
        Sleep(500);
        Inc(ProgressIndex);
        // Put intermediate result in queue
        Item := TWorkThreadQueueItem.Create;
        Item.ItemData := ProgressIndex;
        AddQueueItem(Item);
        Inc(TestQueueCount);
        if TestQueueCount = TestQueueCountToFlush then
        begin
          TestQueueCount := 0;
          // Allow queue reading from main thread
          FDebugReadQueue := True;
          NotifyMain;
        end;
      end;
    end;

    // Test application ///////////////////////////////////////////////////////////

    type
      TMain = class
      protected
        FHandle: THandle;
        FThread: TWorkThread;
        procedure WndProc(var Message: TMessage);
      public
        constructor Create;
        destructor Destroy; override;
        function Run: Boolean;
        property Handle: THandle read FHandle;
      end;

    var
      Main: TMain;

    constructor TMain.Create;
    begin
      FHandle := AllocateHWnd(WndProc);
      FThread := TWorkThread.Create(True);
      FThread.FMainHandle := Handle;
      FThread.Start;
      writeln('Work thread started');
    end;

    destructor TMain.Destroy;
    begin
      writeln('Stopping work thread...');
      FThread.Free;
      writeln('Work thread stopped');
      DeallocateHWnd(FHandle);
      inherited;
    end;

    procedure TMain.WndProc(var Message: TMessage);
    var
      State: TWorkThreadState;
      Item: TWorkThreadQueueItem;
    begin
      if Message.Msg = WM_MAINNOTIFY then
      begin
        FThread.NotificationProcessed;
        State := FThread.GetState;
        // Show current progress
        writeln('Work progress ', State.Progress);
        // Check queue reading allowed
        if FThread.FDebugReadQueue then
        begin
          writeln('Read queue...');
          repeat
            Item := FThread.ExtractQueueItem;
            try
              if Assigned(Item) then
                writeln('Queue item: ', Item.ItemData);
            finally
              Item.Free;
            end;
          until not Assigned(Item);
          FThread.FDebugReadQueue := False;
        end;
      end
      else
        Message.Result := DefWindowProc(Handle, Message.Msg, Message.wParam, Message.lParam);
    end;

    function TMain.Run: Boolean;
    var
      Msg: TMsg;
    begin
      writeln('Start message loop (Ctrl+C to break)');
      Result := True;
      while Result do
        case Integer(GetMessage(Msg, Handle, 0, 0)) of
          0: Break;
          -1: Result := False;
        else
          begin
            TranslateMessage(Msg);
            DispatchMessage(Msg);
          end;
        end;
    end;

    // Console event handler //////////////////////////////////////////////////////

    function ConsoleEventProc(CtrlType: DWORD): BOOL; stdcall;
    begin
      Result := False;
      case CtrlType of
        CTRL_CLOSE_EVENT,
        CTRL_C_EVENT,
        CTRL_BREAK_EVENT:
          if Assigned(Main) then
          begin
            PostMessage(Main.Handle, WM_QUIT, 0, 0);
            Result := True;
          end;
      end;
    end;

    // Main procedure /////////////////////////////////////////////////////////////

    begin
      {$IFDEF DEBUG}
      ReportMemoryLeaksOnShutdown := True;
      {$ENDIF}
      try
        SetConsoleCtrlHandler(@ConsoleEventProc, True);
        Main := TMain.Create;
        try
          Main.Run;
        finally
          FreeAndNil(Main);
        end;
      except
        on E: Exception do
          Writeln(E.ClassName, ': ', E.Message);
      end;
    end.


Normally, when an item appears in the queue, the main thread should extract it as soon as possible. However, to test queue overflow I added the TWorkThread.FDebugReadQueue field which, while set to False, prevents the main thread from reading the queue (the TWorkThread.Execute method defines the constant TestQueueCountToFlush = 10, so the main thread is allowed to read only after 10 items have been added).

Unfortunately, the test case is too simple and does not produce read/write collisions between the threads, where a thread switch happens inside the read/write helper functions. I am not sure whether all the weak spots of the algorithm can be tested at all, or what the code would have to look like to do so.

Source: https://habr.com/ru/post/245889/

