
Non-blocking queues: message exchange between threads

The idea for this module came from the PLM of our corporate product. While reviewing our design documentation, we were told that under no circumstances should the code block the task that calls it, and that in general it should take as little of that task's time as possible; that is simply how the system is built, with watchdogs and so on. The job was to pass certain messages from one task to another, something like logs or extended diagnostics. A separate receiving task writes the results to a file, since writing to a file from the original task is obviously out of the question. And although there is only one producer and one consumer, and even mutexes or semaphores would not have hurt the original task, it was decided to drop them entirely. The task might later be extended to several more tasks, and then a situation where one task waits on another, while marginally acceptable (the original code for exchanging its informational messages does use semaphores), is highly undesirable.

The original plan was a static circular buffer where each element carries a bit indicating whether it currently belongs to the producer or the consumer. The algorithm is extremely simple: the producer writes its data into a cell whose bit is zero, then "publishes" the change by setting the bit to one, and moves on to the next element. The consumer reads the message from an element whose bit is one, then clears the bit. No race conditions, everything seems fine. But the very first traffic test revealed that within one time slice the producer could, in theory, generate about 30-40 thousand elements. In practice it will be less, of course, since the task does other things besides producing these records, but there is no way to pick a buffer size that is guaranteed to be sufficient. One reason for this is the unstable write speed to the file: some systems have CF cards instead of hard drives. And I would not like to lose messages.
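The ring-buffer scheme described above can be sketched as follows. This is my reconstruction from the description, not the actual source code, and all names (ringSlot, ringBuffer, and so on) are invented. For a true single-producer/single-consumer pair, head is touched only by the producer and tail only by the consumer, and the volatile flag is the hand-off point between them.

```csharp
// Reconstruction of the described ring buffer; names are invented.
// Each slot carries a "published" flag: the producer writes only slots where
// the flag is false, the consumer reads only slots where it is true.
class ringSlot
{
  public volatile bool published; // false: belongs to producer, true: to consumer
  public object message;
}

class ringBuffer
{
  ringSlot [] slots;
  int head; // producer position, touched only by the producer thread
  int tail; // consumer position, touched only by the consumer thread

  public ringBuffer (int size)
  {
    slots = new ringSlot [size];
    for (int i = 0; i < size; i++)
      slots [i] = new ringSlot ();
  }

  public bool produce (object message)
  {
    ringSlot s = slots [head];
    if (s.published)              // full: the consumer has not freed this slot yet
      return false;
    s.message = message;
    s.published = true;           // "publish" the element
    head = (head + 1) % slots.Length;
    return true;
  }

  public bool consume (out object message)
  {
    ringSlot s = slots [tail];
    if (! s.published)            // nothing to read
    {
      message = null;
      return false;
    }
    message = s.message;
    s.published = false;          // hand the slot back to the producer
    tail = (tail + 1) % slots.Length;
    return true;
  }
}
```

The weakness the author ran into is visible here: when the buffer is full, produce has nowhere to put the message, so messages are either dropped or the producer must wait.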

After some digging around the Internet, I came across the following solution, which I implemented in my task: drdobbs.com/architecture-and-design/210604448
The algorithm is described in sufficient detail; I will not repeat it here.
Two changes I made:
1) I do not see why the producer, rather than the consumer, should free the elements. Having the consumer free them does not create a race condition either. This takes some load off the producer and reduces memory usage, since consumed items are freed immediately.
2) A traffic test, or rather the profiler, also revealed that malloc is a relatively expensive operation. Since the maximum size of the original messages is known, it was decided to batch memory allocation, allocating 8 elements in a single operation. This more than doubled the throughput; in particular, it halved the processor load we add to the original task.
A caveat is in order right away: the original code was in plain C, and due to a non-disclosure agreement I cannot publish it. There is no malloc in C#, so the second point no longer applies. I study C# in my free time and write for myself. I was once offered a job and failed to get it purely for lack of experience with this magic language, and I have been at it ever since. Well then, on to the point.

Implementing a non-blocking queue in C#

The first step is to describe the queue element.
         class queItem
         {
           public object message;
           public queItem next;

           public queItem (object message = null)
           {
             this.message = message;
             next = null;
           }
         }

And actually the queue itself:
       class locklessQueue // thread-to-thread lockless queue
       {
         queItem first;
         queItem last;
       }

Here, first belongs to the consumer, and the next field of the last element belongs to the producer. Neither first nor last may ever be null, so the constructor creates an empty element in the "already consumed" state.
         public locklessQueue ()
         {
           first = new queItem ();
           last = first;
         }

Further methods of adding to the queue and, accordingly, retrieving from it.
         public void produce (object message)
         {
           last.next = new queItem (message);
           last = last.next;
         }

         public bool consume (out object message)
         {
           if (first == last || first.next == null)
           {
             message = null;
             return false;
           }
           message = first.next.message;
           first = first.next;
           return true;
         }
       }
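A minimal usage sketch of the class above, with one producer thread and one consumer thread. The spin-and-sleep loop is my illustration of the intended pattern, not part of the original; it assumes the locklessQueue class above is in scope.

```csharp
using System;
using System.Threading;

class demo
{
  static void Main ()
  {
    locklessQueue q = new locklessQueue ();

    Thread producer = new Thread (() =>
    {
      for (int i = 0; i < 5; i++)
        q.produce ("message " + i);  // the only thread touching 'last'
    });

    Thread consumer = new Thread (() =>
    {
      int received = 0;
      object msg;
      while (received < 5)
      {
        if (q.consume (out msg))     // the only thread touching 'first'
        {
          Console.WriteLine (msg);
          received++;
        }
        else
          Thread.Sleep (1);          // queue empty: give up the time slice
      }
    });

    producer.Start ();
    consumer.Start ();
    producer.Join ();
    consumer.Join ();
  }
}
```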

The resulting class is of little practical use by itself, since .NET 4.0 already includes ConcurrentQueue, which is not only fully thread-safe but, unlike my class, also lets several threads enqueue and dequeue at once. And it works 1.5-3 times faster than the locking alternative. [proof link]
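For comparison, here is the same pattern with the built-in ConcurrentQueue&lt;T&gt; (a minimal sketch of my own, not from the original post):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class concurrentDemo
{
  static void Main ()
  {
    ConcurrentQueue <string> queue = new ConcurrentQueue <string> ();

    // Unlike the class above, any number of threads may enqueue at once.
    Parallel.For (0, 4, i => queue.Enqueue ("message " + i));

    string msg;
    while (queue.TryDequeue (out msg)) // non-blocking: returns false when empty
      Console.WriteLine (msg);
  }
}
```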
For a log collector, the ConcurrentQueue class is more than enough. However, I extended the task for my own purposes, and ConcurrentQueue did not suit me: it has no addressing.

Messaging between threads


Each thread should be able to send a message to another thread by name. In my case these are TCP socket handlers (client or server) and the actual processing threads. How a thread finds out which handler to send to, I leave outside the scope of this note.

Unfortunately, I could not solve one of the subtasks: adding a new thread as an exchange participant without locking. I would like to see the source code of ConcurrentQueue; perhaps its solution would help find the answer. In C#, admittedly, this can be worked around for initiating messages, or for messages from asynchronous methods, but for now I will give a blocking solution, almost the same as it would be in classic C.
Looking ahead, I will say that this decision has an obvious drawback: a separate thread is needed to pump the queues; that is the price of avoiding locks. A separate thread obviously adds processor load, but the absence of locking speeds up the processing of each message. It is hard to estimate how performance will improve or degrade and how it will scale; perhaps I will investigate this in the future.

So, two queues must be created for each participating thread: it will send messages into one and read from the other. A "proxying" container for these two queues:
     class threadNode
     {
       public string tName;
       public int tid;
       locklessQueue outgoing = new locklessQueue ();  // from messenger to node 
       locklessQueue incoming = new locklessQueue ();  // from node to messenger

       public threadNode (string tName, int tid)
       {
         this.tid = tid;
         this.tName = tName;
       }

       public void enqueue (messengerItem message) // called by Node
       {
         incoming.produce (message);
       }

       public bool dequeue (out messengerItem message) // called by Node
       {
         object msg;
         bool result = outgoing.consume (out msg);
         message = msg as messengerItem;
         return result;
       }

       public void transmit (messengerItem message) // called by Messenger
       {
         outgoing.produce (message);
       }

       public bool retrieve (out messengerItem message) // called by Messenger
       {
         object msg;
         bool result = incoming.consume (out msg);
         message = msg as messengerItem;
         return result;
       }
     }

As you can see, an object of the messengerItem type, represented by the following class, is put in the queue:
     class messengerItem
     {
       public string from;
       public string to;
       public object message;

       public messengerItem (string from, string to, object message)
       {
         this.from = from;
         this.to = to;
         this.message = message;
       }
     }

I made the main class static so that a message can be sent from anywhere in the code by writing Messenger.send (...);
   public static class Messenger
   {
      static Dictionary <int, threadNode> byTID = new Dictionary <int, threadNode> ();
      static Dictionary <string, threadNode> byRegName = new Dictionary <string, threadNode> ();
     static Mutex regMutex = new Mutex ();  // only one task at a time

To find the sending thread's node I use a Dictionary keyed by ManagedThreadId; to find the destination thread, another Dictionary keyed by the name supplied at registration. To carry messages from one node to another I start a dedicated thread, whose wrapper I do not quote here. Briefly: in an infinite loop it calls the messengerFunction described below, and calls Thread.Sleep when the return value is false, to give up the time slice.
      static bool messengerFunction ()
      {
        bool acted = false;
        messengerItem item;
        threadNode dst;
        Dictionary <string, threadNode> tmp = byRegName;
        foreach (threadNode node in tmp.Values)
        {
          if (tmp != byRegName) // a registration replaced the dictionary, start over
            break;
          if (node.retrieve (out item))
            if (item != null)
            {
              acted = true;
              if (tmp.TryGetValue (item.to, out dst))
                dst.transmit (item);
              // else discard
            }
        }
        return acted;
      }
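The wrapper thread itself is not quoted by the author. A minimal sketch of what such a pump loop might look like; the stub messengerFunction, the running flag, and the pumpDemo name are my assumptions, not the original code:

```csharp
using System;
using System.Threading;

class pumpDemo
{
  // Stub standing in for the real messengerFunction (): returns true while
  // there is still traffic to move, false when a pass found nothing.
  static int pending = 3;
  static bool messengerFunction () { return Interlocked.Decrement (ref pending) >= 0; }

  static volatile bool running = true;

  public static void run ()
  {
    Thread pump = new Thread (() =>
    {
      while (running)
      {
        if (! messengerFunction ())
          Thread.Sleep (1); // no traffic: give up the time slice
      }
    });
    pump.IsBackground = true;
    pump.Start ();

    Thread.Sleep (50);      // let the pump work for a while
    running = false;
    pump.Join ();
    Console.WriteLine ("pump stopped");
  }

  static void Main () { run (); }
}
```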

To register a thread with the messenger, the following function is used, which for now is blocking:
     
     static public void register (string tName)
     {
       if (tName == null || tName == "")
         return;
       int tid = Thread.CurrentThread.ManagedThreadId;
        Dictionary <int, threadNode> newbyTID = new Dictionary <int, threadNode> ();
        Dictionary <string, threadNode> newbyRegName = new Dictionary <string, threadNode> ();
       threadNode newnode = new threadNode (tName, tid);
       newbyTID.Add (tid, newnode);
       newbyRegName.Add (tName, newnode);
       regMutex.WaitOne ();
       foreach (threadNode node in byTID.Values)
       {
         newbyTID.Add (node.tid, node);
         newbyRegName.Add (node.tName, node);
       }
       byTID = newbyTID;
        byRegName = newbyRegName;
       regMutex.ReleaseMutex ();
     }

A similar function is used to unregister a thread when it terminates; I omit its code. All that remains are the functions threads use to send and receive messages.
     static public void send (string destination, object message)
     {
       int tid = Thread.CurrentThread.ManagedThreadId;
       threadNode node;
       if (byTID.TryGetValue (tid, out node))
         node.enqueue (new messengerItem (node.tName, destination, message));
     }

     static public bool receive (out object message, out string sender)
     {
       int tid = Thread.CurrentThread.ManagedThreadId;
       threadNode node;
       if (! byTID.TryGetValue (tid, out node))
       {
         sender = null;
         message = null;
         return false;
       }
        else
        {
          messengerItem item;
          bool result = node.dequeue (out item);
          if (! result || item == null)
          {
            sender = null;
            message = null;
          }
          else
          {
            message = item.message;
            sender = item.from;
          }
          return result;
        }
     }

Handler threads use the last function in a loop, processing a message or, if none was received, calling Thread.Sleep ().
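Putting it together, a handler thread's body might look like this sketch. It assumes the Messenger class above is in scope; the class name logNode and the registration name "logger" are invented for illustration:

```csharp
using System;
using System.Threading;

class logNode
{
  // Body of one handler thread participating in the exchange.
  public static void handlerLoop ()
  {
    Messenger.register ("logger"); // make this thread addressable by name
    while (true)
    {
      object message;
      string sender;
      if (Messenger.receive (out message, out sender))
        Console.WriteLine ("from " + sender + ": " + message);
      else
        Thread.Sleep (1);          // nothing received: give up the time slice
    }
  }
}
```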

Source: https://habr.com/ru/post/130101/

