
Nonblocking message queue for two threads

Several years ago, while working on a small game project, I needed to pass messages from one thread to another. While looking for a solution, I came up with the idea of a queue that never blocks.


Problem statement.
There are two threads: a writer (hereafter the Writer) and a reader (the Reader). We need a queue that passes messages from the Writer to the Reader without using locks (mutexes or other blocking primitives).
After some thought, I arrived at this solution: so that the threads do not interfere with each other, the queue is split into two parts. One part is used by the Writer and the other by the Reader. The algorithm is as follows:
  1. The Writer appends a message to its part of the queue and checks the Reader's part. If the Reader's part is empty, the Writer's part is handed over to the Reader, and a new, empty part is started for the Writer.
  2. The Reader checks its part of the queue and, if it is not empty, takes values from it.

I posted the implementation on RSDN (the link is at the end of the article), but after some discussion the topic died down. Now I would like to share my work here.

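The code itself:

#include <cstddef> // NULL

#define LOCK_FREE_QUEUE2_USE_FLUSH 1

// TYPE must provide a public member "TYPE* next" (the queue is intrusive).
template <class TYPE>
class LockFreeQueue2
{
public:
    LockFreeQueue2();
    ~LockFreeQueue2();

    // Writer: append an item to the queue
    void Enqueue(TYPE* data);
    // Reader: take an item from the queue; returns false if nothing is available
    bool Dequeue(volatile TYPE*& data);
#if LOCK_FREE_QUEUE2_USE_FLUSH
    // Writer: try to hand the remaining items to the Reader.
    // Returns true once nothing is left in the Writer's part.
    bool Flush();
#else
    // Writer: signal that no more items will be added
    void SetWriterFinished();
#endif

private:
    volatile TYPE *readerTop;        // head of the Reader's part
    TYPE *writerTop, *writerBottom;  // head and tail of the Writer's part
#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
    volatile bool isWriterFinished;
#endif
};

template<class TYPE>
LockFreeQueue2<TYPE>::LockFreeQueue2()
{
    readerTop = writerTop = writerBottom = NULL;
#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
    isWriterFinished = false;
#endif
}

template<class TYPE>
LockFreeQueue2<TYPE>::~LockFreeQueue2()
{
}

template<class TYPE>
void LockFreeQueue2<TYPE>::Enqueue(TYPE* data)
{
    data->next = NULL;
    if (writerTop) // the Writer's part is not empty: append to its tail
    {
        writerBottom->next = data;
        writerBottom = data;
    }
    else // the Writer's part is empty: start a new one
    {
        writerTop = writerBottom = data;
    }
    if (!readerTop) // the Reader's part is empty: hand over the Writer's part
    {
        readerTop = writerTop;
        writerTop = NULL;
    }
}

template<class TYPE>
bool LockFreeQueue2<TYPE>::Dequeue(volatile TYPE*& data)
{
    if (!readerTop) // the Reader's part is empty
    {
#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
        if (isWriterFinished && writerTop) // the Writer has finished: take what is left in its part
        {
            readerTop = writerTop;
            writerTop = NULL;
        }
        else
#endif
        {
            return false; // nothing to read
        }
    }
    // the Reader's part is not empty: take the first item
    data = readerTop;
    readerTop = readerTop->next;
    return true;
}

#if LOCK_FREE_QUEUE2_USE_FLUSH
template<class TYPE>
bool LockFreeQueue2<TYPE>::Flush()
{
    if (!writerTop)
        return true; // nothing left to hand over
    if (!readerTop) // the Reader's part is empty: hand over the Writer's part
    {
        readerTop = writerTop;
        writerTop = NULL; // otherwise a repeated Flush would hand over the same items again
        return true;
    }
    return false;
}
#else
template<class TYPE>
void LockFreeQueue2<TYPE>::SetWriterFinished()
{
    isWriterFinished = true;
}
#endif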


The LOCK_FREE_QUEUE2_USE_FLUSH define selects between two behaviors of the queue when the Writer finishes. With a value of 1, the Writer must keep calling Flush until it returns true, that is, until its part of the queue has been handed to the Reader. With a value of 0, the Writer simply calls SetWriterFinished, which sets isWriterFinished to true, and stops; the Reader then picks up the remainder itself.
In the first mode a flag showing that the Reader is still running also seems to be needed, so that the Writer does not wait forever.
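To illustrate the Flush-based shutdown, here is a minimal sketch of the same two-part idea driven by two threads. It is not the original listing: the names Message, TwoPartQueue, TryHandOver and RunDemo are hypothetical, and the shared pointer is a std::atomic with acquire/release ordering, because volatile alone gives no inter-thread ordering guarantees in standard C++ (C++11 or later is assumed).

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical intrusive message type; the queue only needs the `next` pointer.
struct Message {
    int value = 0;
    Message* next = nullptr;
};

// Two-part queue for exactly one Writer thread and one Reader thread.
class TwoPartQueue {
public:
    // Writer thread only: append to the Writer's part, then try to hand it over.
    void Enqueue(Message* m) {
        assert(m != nullptr);
        m->next = nullptr;
        if (writerTop_) {
            writerBottom_->next = m;
            writerBottom_ = m;
        } else {
            writerTop_ = writerBottom_ = m;
        }
        TryHandOver();
    }

    // Writer thread only: returns true once the Writer's part is empty.
    bool Flush() { return writerTop_ == nullptr || TryHandOver(); }

    // Reader thread only: returns nullptr if the Reader's part is empty.
    Message* Dequeue() {
        Message* m = readerTop_.load(std::memory_order_acquire);
        if (!m) return nullptr;
        readerTop_.store(m->next, std::memory_order_release);
        return m;
    }

private:
    // Hands the Writer's part to the Reader if the Reader's part is empty.
    bool TryHandOver() {
        if (readerTop_.load(std::memory_order_acquire) != nullptr) return false;
        readerTop_.store(writerTop_, std::memory_order_release);
        writerTop_ = writerBottom_ = nullptr;
        return true;
    }

    std::atomic<Message*> readerTop_{nullptr};  // shared: head of the Reader's part
    Message* writerTop_ = nullptr;              // Writer only: head of its part
    Message* writerBottom_ = nullptr;           // Writer only: tail of its part
};

// Sends 1..count from a Writer thread and returns the sum the Reader received.
long long RunDemo(int count) {
    TwoPartQueue q;
    std::atomic<bool> writerDone{false};
    long long sum = 0;  // touched only by the Reader thread until join()

    std::thread writer([&] {
        for (int i = 1; i <= count; ++i) {
            Message* m = new Message;
            m->value = i;
            q.Enqueue(m);
        }
        // Keep flushing until the tail has been handed to the Reader.
        while (!q.Flush()) std::this_thread::yield();
        writerDone.store(true, std::memory_order_release);
    });

    std::thread reader([&] {
        for (;;) {
            // Read the flag before dequeuing, so an empty queue seen after
            // "done" really means that everything has been consumed.
            bool done = writerDone.load(std::memory_order_acquire);
            if (Message* m = q.Dequeue()) {
                sum += m->value;
                delete m;
            } else if (done) {
                break;
            } else {
                std::this_thread::yield();
            }
        }
    });

    writer.join();
    reader.join();
    return sum;
}
```

In this sketch the Writer spins on Flush and the Reader spins on Dequeue; if the Reader could exit early, the Writer's loop would need the extra "Reader is running" flag mentioned above.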

This implementation can be extended with further features. For example, you can add an item counter. It must also be kept separately for each part of the queue, so that the two threads never access the same counter variable at once. The sum of the two counters is the total number of items in the queue.
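One possible way to keep such counters free of write conflicts is sketched below. Note that it is a variation, not the per-part sum described above: each thread owns one monotonically increasing counter, and the size is their difference. The name QueueCounters and its methods are hypothetical, and the sketch assumes the Writer calls OnEnqueue just before enqueuing an item and the Reader calls OnDequeue just after a successful dequeue.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Each counter has exactly one writing thread, so updates never conflict.
struct QueueCounters {
    std::atomic<std::size_t> enqueued{0};  // incremented by the Writer only
    std::atomic<std::size_t> dequeued{0};  // incremented by the Reader only

    // Writer: call just before the item is enqueued.
    void OnEnqueue() { enqueued.fetch_add(1, std::memory_order_release); }

    // Reader: call just after an item has been dequeued.
    void OnDequeue() { dequeued.fetch_add(1, std::memory_order_release); }

    // Any thread: a snapshot of the item count; it may already be stale
    // by the time the caller looks at it.
    std::size_t Size() const {
        // Read `dequeued` first: `enqueued` can only have grown since then.
        std::size_t d = dequeued.load(std::memory_order_acquire);
        std::size_t e = enqueued.load(std::memory_order_acquire);
        assert(e >= d);  // holds under the calling convention assumed above
        return e - d;
    }
};
```

For example, after three OnEnqueue calls and one OnDequeue call, Size() returns 2.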
You can also add an event for the Reader that wakes it up when the Writer hands over its part. But this is not so simple, and there may be problems with simultaneous access.
This queue cannot be used with one Writer and many Readers, but you can use a router (dispatcher) with one queue per Reader. The router has to decide which Reader's queue receives each message. In the simplest case, the criterion can be the smallest number of items in the queue.
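The simplest routing criterion could be sketched as follows. The function name PickLeastLoaded is hypothetical, and the sketch assumes the router already has a per-Reader count of pending items (for example from item counters like those discussed earlier).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Returns the index of the Reader queue with the fewest pending items.
// On a tie, the queue with the lowest index wins.
std::size_t PickLeastLoaded(const std::vector<std::size_t>& pendingCounts) {
    assert(!pendingCounts.empty());  // the router needs at least one Reader
    auto it = std::min_element(pendingCounts.begin(), pendingCounts.end());
    return static_cast<std::size_t>(it - pendingCounts.begin());
}
```

The router would then enqueue the message into the queue at the returned index. Since the router is the only Writer of every queue, the one-Writer, one-Reader restriction still holds for each of them.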

Before publishing this article I searched a bit on the topic but found nothing similar. Maybe I did not look hard enough, so if I have reinvented the wheel, or if you find errors in the implementation, please write in the comments.

Link to the discussion on RSDN.

Source: https://habr.com/ru/post/127868/

