
Exchanging information between worker threads without pain? CSP channels to the rescue

Developing multithreaded code is hard. Really hard. Fortunately, high-level abstractions such as task-based parallelism, map-reduce/fork-join, CSP, actors, etc., were invented long ago to make developers' lives easier.

But when you visit specialized forums where C++ developers hang out, you get the feeling that many of them simply don't know there is something simpler and more convenient than std::thread combined with std::mutex and std::condition_variable. Questions like this come up regularly: "I need to run several worker threads: one does this, the second does that, and the third does something else. I start them like this and exchange information between the threads like this. Am I doing it right?"

Obviously, such questions come from newcomers. But, first, there have always been many inexperienced young people in software development, and as the IT industry becomes more attractive, their number only grows. It's sad that newcomers know about std::thread and std::mutex but don't know about ready-made tools that could make their lives easier (such as Intel TBB, HPX, QP/C++, Boost.Fiber, FastFlow, CAF, SObjectizer, etc.).
And, second, answers to such questions rarely contain advice like "take this ready-made tool; with it your task is solved in just a few lines." More often, people discuss the low-level details of home-grown implementations of thread-safe message queues.

All this suggests that it makes sense to show, with simple examples, how a particular framework can help with even small and seemingly simple multithreading tasks. We develop SObjectizer as a tool that simplifies the development of multithreaded applications in C++. So today we will try to show how the CSP channels implemented in SObjectizer can spare a developer some of the headaches of writing multithreaded code.

Simple demo


In this article we will look at a simple demo: a small test application whose main thread carries on a "dialogue" with the user. When the user enters the word "exit", the application terminates.

The application has two additional worker threads. One imitates periodic "polling" of a sensor. The other "writes" the information "taken" from the sensor to a file.

Naturally, no real work with a sensor or data files is performed. Instead, the program introduces delays that block the worker thread for some time, imitating synchronous work with external devices and files.

We hope such a crude imitation won't bother the reader. The purpose of the article is to show how worker threads interact via CSP channels (called mchains in SObjectizer), not to fill the worker threads with real content.

How the example works, in a nutshell


So, in addition to the main thread, our example has two worker threads.

The first worker thread, which we will call meter_reader_thread, "polls" the sensor. This thread needs two mchains. The first mchain is used to send commands to meter_reader_thread itself. In particular, a timer places acquisition_turn messages into this channel. Whenever meter_reader_thread receives one, it performs a "poll".

meter_reader_thread needs the second mchain to pass the information "taken" from the sensor to the second worker thread. The second worker thread, which we will call file_writer_thread, is responsible for "writing" information to a file. It reads write commands from its mchain and "executes" them. While there are no commands in the mchain, file_writer_thread sleeps, waiting for a new command.

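The result is a simple scheme. A timer puts acquisition_turn signals into the first channel. meter_reader_thread takes them from there, performs a "poll" and sends a write command into the second channel. file_writer_thread takes commands from that second channel and "writes" the data.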



Both worker threads finish as soon as the main thread closes the mchains.

Walking through the code of the simple example


The full source code of the simple example can be found in the repository created for these illustrations. We will go from simple to complex. First we analyze the functions that do the work of the file_writer_thread and meter_reader_thread threads. Then we look at the implementation of main(), where several multithreading subtleties have to be taken into account.

The file_writer_thread() function


The file_writer_thread() function is the simplest one in this example. Here is its full text:

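```cpp
// Thread function that "writes" data to a file.
void file_writer_thread(
   // Channel from which write commands are read.
   so_5::mchain_t file_write_ch)
{
   // Read messages until the channel is closed.
   // Waiting for and handling messages happens inside receive().
   receive(from(file_write_ch),
      // This handler is called only for
      // write_data messages.
      [&](so_5::mhood_t<write_data> cmd) {
         // Imitate the write operation.
         std::cout << cmd->file_name_ << ": write started" << std::endl;
         std::this_thread::sleep_for(350ms);
         std::cout << cmd->file_name_ << ": write finished" << std::endl;
      });
}
```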

All file_writer_thread() does is sit inside the receive() call. The receive() function waits for a message to arrive in the channel. When one arrives, it looks for a handler for that message among the handlers passed to receive().

Here only one handler is passed: the one for messages of type write_data. When a message of this type arrives in the channel, this handler is called. In essence, this handler contains all the "business logic", i.e. the imitation of writing the read data to a file.

The receive() function in SObjectizer comes in two versions. The first, which we don't use in this example, waits for and extracts only one message from the channel. The second, shown above, extracts all messages from the channel and returns control only when the channel is closed. So file_writer_thread() returns only when the receive() call completes, and that happens when someone closes the file_write_ch channel.
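Here is a minimal sketch in standard C++ that makes the "receive until closed" semantics concrete. The toy_chain class and its receive_all() method are hypothetical illustrations, not SObjectizer's mchain or its API. Unlike a real mchain, this toy silently ignores sends to a closed channel and always lets the receiver drain whatever was already queued.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical toy channel (NOT SObjectizer's mchain): an unbounded,
// thread-safe queue that can be closed.
template <typename T>
class toy_chain {
public:
    void send(T value) {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            if (closed_) return;  // This toy ignores sends after close().
            queue_.push_back(std::move(value));
        }
        cv_.notify_one();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Sleeps while the channel is empty, calls the handler for every
    // message and returns only after the channel is closed and drained.
    // Returns the number of handled messages.
    std::size_t receive_all(const std::function<void(const T&)>& handler) {
        std::size_t handled = 0;
        for (;;) {
            std::unique_lock<std::mutex> lock{mutex_};
            cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });
            if (queue_.empty()) return handled;  // Closed and nothing left.
            T value = std::move(queue_.front());
            queue_.pop_front();
            lock.unlock();  // Run the handler without holding the lock.
            handler(value);
            ++handled;
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<T> queue_;
    bool closed_ = false;
};
```

A writer thread built on this toy behaves like file_writer_thread(): it stays inside receive_all() until another thread calls close().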

The meter_reader_thread() function


The meter_reader_thread function is somewhat more complicated:

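```cpp
// Thread function that "polls" the sensor.
void meter_reader_thread(
   // Channel for timer signals.
   so_5::mchain_t timer_ch,
   // Channel to which write commands are sent.
   so_5::mchain_t file_write_ch)
{
   // Signal type for the periodic "poll".
   struct acquisition_turn : public so_5::signal_t {};

   // Ordinal number of the "poll". Used to build file names.
   int ordinal = 0;

   // Start the periodic timer signal.
   auto timer = so_5::send_periodic<acquisition_turn>(timer_ch, 0ms, 750ms);

   // Read messages until the channel is closed.
   // Waiting for and handling messages happens inside receive().
   receive(from(timer_ch),
      // This handler is called only for
      // acquisition_turn signals.
      [&](so_5::mhood_t<acquisition_turn>) {
         // Imitate polling the sensor.
         std::cout << "meter read started" << std::endl;
         std::this_thread::sleep_for(50ms);
         std::cout << "meter read finished" << std::endl;

         // Send a write command to file_writer_thread.
         so_5::send<write_data>(file_write_ch,
               "data_" + std::to_string(ordinal) + ".dat");
         ++ordinal;
      });
}
```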

Here we first define the acquisition_turn signal type. This signal will arrive periodically, telling us to imitate a "poll" of the sensor.

Second, we start the periodic acquisition_turn signal by calling send_periodic(). Thanks to this, SObjectizer will send acquisition_turn to timer_ch every 750ms, starting immediately, since the initial delay is 0ms.

After that comes the already familiar receive() call, which returns only when the timer_ch channel is closed. Inside receive() we implement a handler for the acquisition_turn signal. This handler imitates a "poll" of the sensor. It then tells the file_writer_thread thread to write the "collected" data by sending a write_data message to the file_write_ch channel.

So meter_reader_thread spends all its time sleeping inside receive() and wakes up periodically when acquisition_turn arrives. It then sends a write_data message to file_write_ch (i.e. to the file_writer_thread thread) and goes back to sleep until the next acquisition_turn, or until timer_ch is closed.

The main() function


Before looking at the code of main(), we need to describe a few small subtleties; without them, parts of this code may be unclear.

The main problem to solve when working with threads and CSP channels is terminating the worker threads correctly and on time. If we create a std::thread instance and start a worker thread with it, we then have to call std::thread::join() to wait for that worker thread to finish (detached threads are not used here). The easiest way is to call std::thread::join() manually at the end of main(). Something like:

```cpp
int main() {
   ...
   std::thread file_writer{file_writer_thread};
   ...
   file_writer.join();
}
```

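The trouble is that this naive approach does not protect us from exceptions or other forms of early exit from the scope (for example, an ordinary return). In such cases join() is never reached, and destroying a std::thread that is still joinable calls std::terminate().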

A small helper class that calls std::thread::join() in its destructor could help here. For example, we could do something like this:

```cpp
class auto_joiner {
   std::thread & t_;
   ... // Copying/moving is prohibited.
public:
   auto_joiner(std::thread & t) : t_{t} {}
   ~auto_joiner() { t_.join(); }
};

int main() {
   ...
   std::thread file_writer{file_writer_thread};
   auto_joiner file_writer_joiner{file_writer};
   ...
}
```

With SObjectizer there is no need to write such an auto_joiner yourself, because SObjectizer already has a similar tool. We will see it used in main(). It differs from the one shown above in that it can call join() for several std::thread objects instead of just one.
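Here is a rough sketch of what a joiner for several threads could look like. multi_joiner is a hypothetical class written with the standard library only, and it is not SObjectizer's implementation. It keeps references to std::thread objects and joins every joinable one in its destructor, so the threads themselves may be started after the joiner is created.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical multi_joiner (not SObjectizer's auto_join): joins every
// referenced std::thread that is joinable when the joiner is destroyed.
class multi_joiner {
public:
    template <typename... Threads>
    explicit multi_joiner(Threads&... threads) : threads_{std::ref(threads)...} {}

    multi_joiner(const multi_joiner&) = delete;
    multi_joiner& operator=(const multi_joiner&) = delete;

    ~multi_joiner() {
        for (std::thread& t : threads_)
            if (t.joinable())  // Skip threads that were never started.
                t.join();
    }

private:
    std::vector<std::reference_wrapper<std::thread>> threads_;
};
```

Because the joiner stores references rather than thread objects, it can be declared right after the empty std::thread objects, before any thread is actually started.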

But calling std::thread::join() alone is not enough to stop the worker threads correctly in our example. One more thing must be taken into account: for a thread that calls receive() to finish, the mchain must be closed. Otherwise, receive() never returns, and we block forever in std::thread::join().

This means we should make sure the mchains are closed automatically when main() exits. Here we use the same approach as for std::thread::join(): a helper object that calls close() on the mchain in its destructor. That is, something like:

```cpp
int main() {
   ...
   auto ch = so_5::create_mchain(...);
   auto_closer ch_closer{ch};
   ...
}
```

Again, we don't need our own implementation of this auto_closer helper class, because SObjectizer already has one.

We have now figured out how to avoid manually calling join() for worker threads and closing mchains. But there is one more very important point: the order in which these operations are performed. Suppose we write this simple and obvious sequence:

```cpp
int main() {
   ...
   auto ch = so_5::create_mchain(...);
   auto_closer ch_closer{ch};
   ...
   std::thread work_thread{[ch]{ receive(from(ch), ...); }};
   auto_joiner work_thread_joiner{work_thread};
   ...
}
```

Then we get a classic deadlock: the program hangs in the auto_joiner destructor.

The problem is that local objects are destroyed in the reverse order of their construction. So the auto_joiner destructor runs before the auto_closer destructor. That is, we try to join a worker thread that is blocked in receive() on an mchain that has not been closed yet.
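The rule at work here is plain C++: automatic objects are destroyed in the reverse order of their construction. Here is a minimal sketch that makes both orders visible. It uses a hypothetical tracer type that records its own destruction, in place of the real joiner and closer objects.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: appends its name to a log when destroyed.
struct tracer {
    std::vector<std::string>& log;
    std::string name;
    ~tracer() { log.push_back(name); }
};

// Same order as the problematic snippet: closer first, joiner second.
void broken_order(std::vector<std::string>& log) {
    tracer closer{log, "closer"};
    tracer joiner{log, "joiner"};
}  // joiner is destroyed first: join() would run while the mchain is open.

// Fixed order: joiner first, closer second.
void fixed_order(std::vector<std::string>& log) {
    tracer joiner{log, "joiner"};
    tracer closer{log, "closer"};
}  // closer is destroyed first: channels are closed before join().
```

In the broken order "joiner" is logged first, which in the real program means join() runs while the worker thread is still waiting in receive().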

Therefore, for the mchains to be closed automatically before join() is called for a worker thread, we have to change the order in which the entities are created:

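```cpp
int main() {
   ...
   // The thread object is created first. The thread itself is not started yet.
   std::thread work_thread;
   auto_joiner work_thread_joiner{work_thread};
   ...
   // The channel and its closer are created after the joiner.
   auto ch = so_5::create_mchain(...);
   auto_closer ch_closer{ch};
   ...
   // Only now is the worker thread actually started.
   work_thread = std::thread{[ch]{ receive(from(ch), ...); }};
   ...
}
```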

Now that the main nuances are explained, we can look at the code of main() itself:

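```cpp
int main() {
   // Start SObjectizer.
   so_5::wrapped_env_t sobj;

   // Thread objects for the worker threads...
   std::thread meter_reader, file_writer;
   // ...and a joiner for them.
   // join() will be called automatically on exit from main,
   // whatever the reason for the exit: a normal return or an exception.
   auto joiner = so_5::auto_join(meter_reader, file_writer);

   // Channels that the worker threads need.
   auto timer_ch = so_5::create_mchain(sobj);
   auto writer_ch = so_5::create_mchain(sobj);
   // The channels are closed automatically on exit from main.
   // Because the closer is created after the joiner, the threads
   // leave receive() before join() is called for them.
   auto closer = so_5::auto_close_drop_content(timer_ch, writer_ch);

   // Start the worker threads.
   meter_reader = std::thread(meter_reader_thread, timer_ch, writer_ch);
   file_writer = std::thread(file_writer_thread, writer_ch);

   // Loop until the user enters exit
   // or the input stream ends.
   std::cout << "Type 'exit' to quit:" << std::endl;
   std::string cmd;
   while(std::getline(std::cin, cmd)) {
      if("exit" == cmd)
         break;
      else
         std::cout << "Type 'exit' to quit" << std::endl;
   }

   // Exit from main. The channels are closed automatically
   // (via closer), then the worker threads are joined
   // (via joiner).
   return 0;
}
```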

We hope this code is mostly clear. Only two small points may need explanation.

First, the creation of the so_5::wrapped_env_t instance at the beginning of main(). A SObjectizer Environment is hidden behind this instance. We need the SObjectizer Environment both to create mchains and to service timers (the send_periodic() call in meter_reader_thread relies on SObjectizer's timer).

Second, the auto_close_drop_content() call. On the one hand, it is clear enough: this function returns an auto_closer object that closes the mchains in its destructor. But on the other hand, what does drop_content mean in the function's name?

The thing is, an mchain in SObjectizer can be closed in two modes. In the first mode, the mchain is closed and all messages still in it that have not yet been processed by receive() are discarded. For example, if there are 100500 messages in the mchain when close() is called, all of them are destroyed and never reach their recipients. This mode is called drop_content, and auto_close_drop_content() creates an auto_closer that closes mchains in drop_content mode.

The second mode, on the contrary, keeps all messages in the mchain. This lets receive() finish processing the mchain's contents. However, no new messages can be added to the mchain, because it is already closed (for writing). This mode is, accordingly, called retain_content.

Both modes, drop_content and retain_content, are useful in different situations. This example needs drop_content, which is why auto_close_drop_content() is used.
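The difference between the two modes can be sketched with a hypothetical toy channel in standard C++. The close_mode names mirror SObjectizer's terminology, but the code only illustrates the semantics described above. It is not SObjectizer's implementation or API.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Names mirror SObjectizer's close modes; the class below is a toy.
enum class close_mode { drop_content, retain_content };

class toy_chain {
public:
    // Returns false if the channel is closed: writes are refused.
    bool send(int value) {
        std::lock_guard<std::mutex> lock{mutex_};
        if (closed_) return false;
        queue_.push_back(value);
        cv_.notify_one();
        return true;
    }

    void close(close_mode mode) {
        std::lock_guard<std::mutex> lock{mutex_};
        closed_ = true;
        if (mode == close_mode::drop_content)
            queue_.clear();  // Unprocessed messages never reach the receiver.
        cv_.notify_all();
    }

    // Handles messages until the channel is closed and empty;
    // returns the sum of the handled values.
    int receive_all() {
        int sum = 0;
        std::unique_lock<std::mutex> lock{mutex_};
        for (;;) {
            cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });
            if (queue_.empty()) return sum;
            sum += queue_.front();
            queue_.pop_front();
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<int> queue_;
    bool closed_ = false;
};
```

With retain_content the receiver still sees the messages that were queued before close(). With drop_content it returns immediately with nothing handled.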

The result of the first example


If we run the first example, we see the expected picture: each "poll" is followed by a "write" of its results, one after another.

Complicating the simple example: controlling the load on file_writer_thread


The full source code of the second example can be found here.

The first version of our example is very idealized: it assumes that writing the data "taken" from the sensor always finishes before the next "poll". In real life, however, the duration of operations with external devices can vary quite widely. So it makes sense to handle the situation where "writing" to a file takes longer and write_data messages pile up in the mchain.

To simulate this situation, we slightly modify the meter_reader_thread() and file_writer_thread() functions shown above. In meter_reader_thread() we simply make acquisition_turn arrive more often:

```cpp
auto timer = so_5::send_periodic<acquisition_turn>(timer_ch, 0ms, 300ms);
```

In file_writer_thread(), the duration of a "write" is now chosen randomly from the range [295ms, 1s]. So a "write" will sometimes fit into the interval between "polls", but in most cases it will not, and sometimes it will overrun that interval considerably. Here is the modified file_writer_thread():

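```cpp
// Thread function that "writes" data to a file.
void file_writer_thread(
   // Channel from which write commands are read.
   so_5::mchain_t file_write_ch)
{
   // Random number generator for the write duration.
   std::mt19937 rd_gen{std::random_device{}()};
   // The duration of a "write" is chosen randomly
   // from the range [295ms, 1s].
   std::uniform_int_distribution<int> rd_dist{295, 1000};

   // Read messages until the channel is closed.
   // Waiting for and handling messages happens inside receive().
   receive(from(file_write_ch),
      // This handler is called only for
      // write_data messages.
      [&](so_5::mhood_t<write_data> cmd) {
         // Pick a random duration for this "write".
         const auto pause = rd_dist(rd_gen);
         // Imitate the write operation.
         std::cout << cmd->file_name_ << ": write started (pause:"
               << pause << "ms)" << std::endl;
         std::this_thread::sleep_for(std::chrono::milliseconds{pause});
         std::cout << cmd->file_name_ << ": write finished" << std::endl;
      });
}
```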

As a result, unprocessed write_data messages can now accumulate in file_write_ch. This is the overload problem, well known in certain circles: a data producer generates new data faster than the consumer can handle it. The problem is unpleasant and has to be dealt with.

One option is a back pressure mechanism: when the producer starts to overload the consumer, the consumer in one way or another lets the producer know. With CSP channels, there is a completely natural way to implement back pressure. The producer is blocked on a write to the channel until the consumer is free enough to accept the next piece of data.
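Here is a minimal sketch of back pressure. It assumes a hypothetical bounded_chain class built on std::condition_variable, which is not SObjectizer code. Its send() puts the producer to sleep while the channel is full, and receive() wakes the producer up after freeing a slot.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical bounded channel illustrating back pressure.
class bounded_chain {
public:
    explicit bounded_chain(std::size_t capacity) : capacity_{capacity} {}

    // Blocks the producer while the channel is full.
    void send(int value) {
        std::unique_lock<std::mutex> lock{mutex_};
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push_back(value);
        not_empty_.notify_one();
    }

    // Blocks the consumer while the channel is empty.
    int receive() {
        std::unique_lock<std::mutex> lock{mutex_};
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        const int value = queue_.front();
        queue_.pop_front();
        not_full_.notify_one();  // Wake a producer sleeping in send().
        return value;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock{mutex_};
        return queue_.size();
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<int> queue_;
    std::size_t capacity_;
};
```

However fast the producer is, no more than `capacity` messages ever wait in the channel. The producer simply runs at the consumer's pace.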

By the way, in this respect the CSP model is much more convenient than the Actor model in some data processing scenarios. In the Actor model, the producer and the consumer exchange data only via asynchronous messages. When the producer sends the next message, it does not know how loaded the consumer is or whether that message will overload it. If it will, the producer also doesn't know how long to wait before sending the next one. In the CSP model, by contrast, the producer can be "put to sleep" on a write to the channel and "woken up" once the consumer has coped with its load.

So we would like the producer, in our case meter_reader_thread, to fall asleep if file_writer_thread cannot keep up with the messages previously sent to file_write_ch. Can SObjectizer's mchains give us this?

Yes.

To do this, additional properties must be set for the mchain when it is created. In the first version of our example, we created the mchain in the simplest way, like this:

```cpp
auto writer_ch = so_5::create_mchain(sobj);
```

This creates an "unbounded" channel: you can push as many messages into it as free RAM allows.

Since we want back pressure, an unbounded channel doesn't suit us. So we need to limit the number of messages that can wait in the channel for processing.

We also want the producer to fall asleep when it tries to write to a full channel. That is no problem, but SObjectizer requires an upper bound on such waiting. For example, sleep while trying to write to the channel, but for no longer than five seconds (or five hours, depending on the task).

SObjectizer requires the developer to limit the maximum waiting time for a write to a full channel, because without such a limit it is easy to get a deadlock. Say thread T1 tries to write a message for thread T2 into the full channel C1. Meanwhile, T2 is trying to write a message for thread T3 into the full channel C2, and T3 is trying to write a message for T1 into the full channel C0. With a limit on the maximum waiting time, such a deadlock is eventually broken automatically.
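The effect of a bounded wait can be sketched with a hypothetical bounded_chain whose send_for() gives up after a timeout. It uses standard C++ only and is not SObjectizer's API. In the T1/T2/T3 cycle above, each blocked writer would eventually get false back instead of sleeping forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical bounded channel whose send waits only for a limited time.
class bounded_chain {
public:
    explicit bounded_chain(std::size_t capacity) : capacity_{capacity} {}

    // Waits at most `timeout` for a free slot. Returns false instead of
    // blocking forever if the channel is still full when time runs out.
    bool send_for(int value, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock{mutex_};
        const bool has_room = not_full_.wait_for(
            lock, timeout, [this] { return queue_.size() < capacity_; });
        if (!has_room) return false;
        queue_.push_back(value);
        return true;
    }

    // Takes the oldest message (the channel must not be empty)
    // and wakes up a sender that may be waiting for room.
    int receive() {
        std::lock_guard<std::mutex> lock{mutex_};
        assert(!queue_.empty());
        const int value = queue_.front();
        queue_.pop_front();
        not_full_.notify_one();
        return value;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock{mutex_};
        return queue_.size();
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::deque<int> queue_;
    std::size_t capacity_;
};
```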

So we set the channel size and the maximum waiting time. But a question remains: what should happen to a write into a full channel if the channel still has no room even after waiting?

SObjectizer lets you choose what to do in that case. For example, you can throw away the oldest message in the channel. Or you can ignore the new message that we tried to put into the channel. Or you can make send() throw an exception.

In our example we use the following reaction: discard the oldest message. This is quite logical here. Since we already have "new data" from the sensor, writing it is more important than keeping the old data. Therefore, in the updated example we create the channel for write_data messages as follows:

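```cpp
// Channel for write_data messages. It is bounded: if it is full,
// the sender waits, and if there is still no room after the
// timeout, the oldest message in the channel is removed.
auto writer_ch = so_5::create_mchain(sobj,
   // Wait on a full channel for at most 300ms.
   300ms,
   // The channel holds no more than 2 messages.
   2,
   // Memory for the channel is preallocated.
   so_5::mchain_props::memory_usage_t::preallocated,
   // If the channel is still full after the wait,
   // the oldest message is removed from it.
   so_5::mchain_props::overflow_reaction_t::remove_oldest);
```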

Perhaps only the so_5::mchain_props::memory_usage_t::preallocated argument needs explaining. It determines how memory for the message queue inside the channel is allocated. Since our channel has a small fixed size, it makes sense to allocate space for the message queue right away, and that is what we do here.
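Combining a bounded wait with the remove_oldest reaction can be sketched like this. The remove_oldest_chain class is hypothetical and uses only the standard library. It mimics the behaviour described above (wait up to a timeout, then discard the oldest message) rather than reproducing SObjectizer's implementation.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical channel imitating the remove_oldest overflow reaction.
class remove_oldest_chain {
public:
    remove_oldest_chain(std::chrono::milliseconds wait_timeout, std::size_t capacity)
        : wait_timeout_{wait_timeout}, capacity_{capacity} {}

    // Waits up to wait_timeout_ for room; if the channel is still full,
    // the oldest (least relevant) message is discarded.
    void send(int value) {
        std::unique_lock<std::mutex> lock{mutex_};
        const bool has_room = not_full_.wait_for(
            lock, wait_timeout_, [this] { return queue_.size() < capacity_; });
        if (!has_room)
            queue_.pop_front();
        queue_.push_back(value);
    }

    // Takes the oldest message (the channel must not be empty).
    int receive() {
        std::lock_guard<std::mutex> lock{mutex_};
        assert(!queue_.empty());
        const int value = queue_.front();
        queue_.pop_front();
        not_full_.notify_one();
        return value;
    }

    std::vector<int> contents() {
        std::lock_guard<std::mutex> lock{mutex_};
        return std::vector<int>(queue_.begin(), queue_.end());
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::deque<int> queue_;
    std::chrono::milliseconds wait_timeout_;
    std::size_t capacity_;
};
```

With a capacity of 2 and nobody reading, the third send() waits for the timeout and then pushes out message 1. This matches the "missing" data_25.dat that shows up in the second example's output.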

Limiting the channel of the meter_reader_thread thread


In the second example, we limited the channel size for write_data messages. But we also have a channel for acquisition_turn signals. Maybe it makes sense to limit it as well?

It does. A capacity of one message is enough for acquisition_turn: if an acquisition_turn signal is already in the channel, there is no point in adding another one.

Therefore, we modify the code snippet in which we create this channel:

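```cpp
// Channel for acquisition_turn signals. It holds at most one
// signal; there is no waiting on overflow, and a new signal
// is simply discarded if the channel is already full.
auto timer_ch = so_5::create_mchain(sobj,
   // Only one message can wait in the channel.
   1,
   // Memory for the channel is preallocated.
   so_5::mchain_props::memory_usage_t::preallocated,
   // If the channel is full, the new message is discarded.
   so_5::mchain_props::overflow_reaction_t::drop_newest);
```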

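There are two important differences from the channel for write_data messages. First, no waiting timeout is specified, so a send to a full channel does not block the sender. Second, the overflow reaction is drop_newest: if the channel already holds a signal, the new one is simply discarded.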
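Because acquisition_turn carries no data, the behaviour of such a channel can be sketched with a simple counter. The signal_chain class below is a hypothetical standard-C++ illustration, not SObjectizer code. It models a capacity-1 channel with the drop_newest reaction and no waiting.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical channel for data-less signals with the drop_newest
// reaction. Signals have no payload, so a counter models the queue.
class signal_chain {
public:
    explicit signal_chain(std::size_t capacity) : capacity_{capacity} {}

    // Never blocks: returns false if the new signal was dropped
    // because the channel was already full.
    bool send() {
        std::lock_guard<std::mutex> lock{mutex_};
        if (pending_ >= capacity_) return false;
        ++pending_;
        return true;
    }

    // Non-blocking receive: returns true if a signal was taken.
    bool try_receive() {
        std::lock_guard<std::mutex> lock{mutex_};
        if (pending_ == 0) return false;
        --pending_;
        return true;
    }

private:
    std::mutex mutex_;
    std::size_t pending_ = 0;
    std::size_t capacity_;
};
```

However many extra signals arrive while the reader is busy, at most one "poll" is pending when it comes back.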


The result of the second example


When we run the second example, the picture is already different.



Some numbers have disappeared from the debug output of the file_writer_thread thread. For example, the data_24.dat write is followed by data_26.dat, and there is no data_25.dat. This is because the write_data message for data_25.dat was thrown out of the channel when the channel was full.

We can also see that while file_writer_thread is busy with a long "write", the meter_reader_thread thread manages to perform several "polls".

Complicating the example again: controlling meter_reader_thread


The full source code of the third example can be found here.

We can't resist the temptation to complicate the example once more, this time by adding a way to control the meter_reader_thread thread. Why not make it possible to increase or decrease the sensor "polling" period? Let's do it.

The main thread's dialogue with the user will now understand three commands. Besides 'exit' (terminate the application), it accepts 'inc' (lengthen the polling period by a factor of 1.5) and 'dec' (shorten the polling period by a factor of 1.5).
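The period arithmetic for these commands could look like the following sketch. The helper names lengthen_period() and shorten_period() are hypothetical, since the article does not show how the repository implements inc and dec. Integer milliseconds are assumed, so results are rounded down.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper for 'inc': lengthens the period by a factor of 1.5,
// using integer arithmetic (the result is rounded down to whole ms).
std::chrono::milliseconds lengthen_period(std::chrono::milliseconds period) {
    return std::chrono::milliseconds{period.count() * 3 / 2};
}

// Hypothetical helper for 'dec': shortens the period by a factor of 1.5.
std::chrono::milliseconds shorten_period(std::chrono::milliseconds period) {
    return std::chrono::milliseconds{period.count() * 2 / 3};
}
```

Starting from the 750ms period of the first example, one 'inc' gives 1125ms and one 'dec' gives 500ms.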

The main question here is how to deliver the inc and dec commands from the application's main thread to the meter_reader_thread thread. But in fact, this is no problem at all. We simply define two new signals:

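```cpp
// Signal telling meter_reader_thread to shorten
// the polling period.
struct dec_read_period : public so_5::signal_t {};
// Signal telling meter_reader_thread to lengthen
// the polling period.
struct inc_read_period : public so_5::signal_t {};
```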

The main thread sends these signals to the appropriate channel when the user enters the corresponding command:

```cpp
// Loop until the user enters exit
// or the input stream ends.
bool stop_execution = false;
while(!stop_execution) {
   std::cout << "Type 'exit' to quit, 'inc' or 'dec':" << std::endl;
   std::string cmd;
   if(std::getline(std::cin, cmd)) {
      if("exit" == cmd)
         stop_execution = true;
      else if("inc" == cmd)
         so_5::send<inc_read_period>(control_ch);
      else if("dec" == cmd)
         so_5::send<dec_read_period>(control_ch);
   }
   else
      stop_execution = true;
}
```

But which channel exactly will we send these signals to? This question is much more interesting.

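In principle, we could use the same channel both for the periodic acquisition_turn and for inc_/dec_read_period. But to show more of what SObjectizer can do with mchains, we use two different channels for meter_reader_thread. The first is control_ch, through which the main thread sends the inc_read_period and dec_read_period commands. The second is timer_ch, which receives the acquisition_turn signals.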


To keep the third example simple, both channels are created and closed in main() and passed to meter_reader_thread() as parameters:

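```cpp
// Channel for control commands sent to meter_reader_thread.
// This channel is unbounded.
auto control_ch = so_5::create_mchain(sobj);
// Channel for acquisition_turn signals.
auto timer_ch = so_5::create_mchain(control_ch->environment(),
   // Only one message can wait in the channel.
   1,
   // Memory for the channel is preallocated.
   so_5::mchain_props::memory_usage_t::preallocated,
   // If the channel is full, the new message is discarded.
   so_5::mchain_props::overflow_reaction_t::drop_newest);
...
// The channels are closed automatically on exit from main.
// Because the closer is created after the joiner, the threads
// leave receive()/select() before join() is called for them.
auto closer = so_5::auto_close_drop_content(control_ch, timer_ch, writer_ch);

// Start the worker threads.
meter_reader = std::thread(meter_reader_thread, control_ch, timer_ch, writer_ch);
...
```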


The modified meter_reader_thread() function


The meter_reader_thread() function grows significantly, because it now has more to do. I don't really like functions whose text doesn't fit on one screen. But here I had to write such a long function so as not to scatter pieces of business logic across helper functions.

Compared with the first and second examples, meter_reader_thread in the third example has two fundamentally important changes.

First, the sensor "polling" period can now change. Because of this, it is inconvenient to run acquisition_turn as a periodic message: the periodic timer would have to be restarted every time the period changes. So we take another approach. While processing each acquisition_turn, we measure the time spent on the "poll" and on sending write_data. If this took the whole period or longer, we immediately send ourselves acquisition_turn without any delay. Otherwise, we send a delayed acquisition_turn, with the delay equal to the difference between the current polling period and the actual elapsed time.
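The delay calculation described above boils down to a small pure function. next_delay() is a hypothetical helper written for illustration. It assumes the period is kept in std::chrono::milliseconds and the elapsed time comes from std::chrono::steady_clock. A zero result means "send acquisition_turn immediately".

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper: how long to wait before the next "poll".
// Returns zero if the work already took the whole period (or longer),
// otherwise the remaining part of the period, truncated to whole ms.
std::chrono::milliseconds next_delay(
    std::chrono::milliseconds period,
    std::chrono::steady_clock::duration elapsed) {
    if (elapsed >= period)
        return std::chrono::milliseconds::zero();
    return std::chrono::duration_cast<std::chrono::milliseconds>(period - elapsed);
}
```

For a 750ms period and a 50ms "poll", the next acquisition_turn would be delayed by 700ms.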

Here is the resulting fragment that processes acquisition_turn:

```cpp
// This handler is called only for
// acquisition_turn signals.
[&](so_5::mhood_t<acquisition_turn>) {
   // Remember when the "poll" started so that we can
   // compute how long the work took.
   const auto started_at = std::chrono::steady_clock::now();

   // Imitate polling the sensor.
   std::cout << "meter read started" << std::endl;
   std::this_thread::sleep_for(50ms);
   std::cout << "meter read finished" << std::endl;

   // Send a write command to file_writer_thread.
   so_5::send<write_data>(file_write_ch,
         "data_" + std::to_string(ordinal) + ".dat");
   ++ordinal;

   // How much time the "poll" and sending
   // the command took.
   const auto duration = std::chrono::steady_clock::now() - started_at;
   // If the whole period is already used up,
   // start the next "poll" immediately.
   if(duration >= current_period) {
      std::cout << "period=" << current_period.count()
            << "ms, no sleep" << std::endl;
      so_5::send<acquisition_turn>(timer_ch);
   }
   else {
      // Otherwise, wait for the rest of the period.
      const auto sleep_time = to_ms(current_period - duration);
      std::cout << "period=" << current_period.count() << "ms, sleep="
            << sleep_time.count() << "ms" << std::endl;
      so_5::send_delayed<acquisition_turn>(timer_ch,
            current_period - duration);
   }
}
```

Second, we now have to wait for messages from two channels at once instead of one. Whichever channel receives a message first, we must take the message from that channel and process it.

For this we use the so_5::select() function, which is similar to so_5::receive() shown earlier. But unlike receive(), select() can wait for incoming messages on several channels.

As a result, meter_reader_thread contains the following select() call (shown schematically, without the handler implementations):

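```cpp
// Read messages until all channels are closed.
so_5::select(so_5::from_all(),
   // Handlers for messages from timer_ch.
   case_(timer_ch,
      // This handler is called only for
      // acquisition_turn signals.
      [&](so_5::mhood_t<acquisition_turn>) {
         ... // Handler body.
      }),
   // Handlers for messages from control_ch.
   case_(control_ch,
      // Lengthen the polling period.
      [&](so_5::mhood_t<inc_read_period>) {
         ... // Handler body.
      },
      // Shorten the polling period.
      [&](so_5::mhood_t<dec_read_period>) {
         ... // Handler body.
      })
);
```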

That is, we tell select() to wait for messages from all of the channels listed after it until all of them are closed. Then the case_ sections enumerate the channels (one channel per section) and list the message handlers for each channel.

So from the timer_ch channel we process only the acquisition_turn signal, and from the control_ch channel the inc_read_period and dec_read_period signals.

So in the third example, meter_reader_thread() returns only after select() returns. And select() finishes when both timer_ch and control_ch are closed, which happens in main() when the application shuts down.
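The idea of waiting on several channels at once, and finishing only when all of them are closed, can be sketched in standard C++. The two_chains class is a hypothetical simplification: both queues share one mutex and one condition variable, so a single thread can wait on either of them. SObjectizer's select() is implemented differently; this only illustrates the semantics.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical pair of channels that one thread can wait on together.
class two_chains {
public:
    void send(int chain, std::string msg) {
        assert(chain == 0 || chain == 1);
        {
            std::lock_guard<std::mutex> lock{mutex_};
            if (closed_[chain]) return;
            queues_[chain].push_back(std::move(msg));
        }
        cv_.notify_all();
    }

    void close(int chain) {
        assert(chain == 0 || chain == 1);
        {
            std::lock_guard<std::mutex> lock{mutex_};
            closed_[chain] = true;
        }
        cv_.notify_all();
    }

    // Handles whichever message is available on either chain and
    // returns only when both chains are closed and drained.
    void select_all(const std::function<void(const std::string&)>& on_first,
                    const std::function<void(const std::string&)>& on_second) {
        std::unique_lock<std::mutex> lock{mutex_};
        for (;;) {
            cv_.wait(lock, [this] {
                return !queues_[0].empty() || !queues_[1].empty()
                    || (closed_[0] && closed_[1]);
            });
            const int idx = !queues_[0].empty() ? 0 : (!queues_[1].empty() ? 1 : -1);
            if (idx < 0) return;  // Both chains closed and empty.
            std::string msg = std::move(queues_[idx].front());
            queues_[idx].pop_front();
            lock.unlock();  // Run the handler without holding the lock.
            (idx == 0 ? on_first : on_second)(msg);
            lock.lock();
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::string> queues_[2];
    bool closed_[2] = {false, false};
};
```

Closing only one chain is not enough: the reader keeps waiting on the other one, just as select() keeps running until both timer_ch and control_ch are closed.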

The result of the third example


When we run the third example and enter several inc commands, the debug output reports each new polling period together with the computed sleep time before the next "poll".



Conclusion


We develop SObjectizer as a tool that simplifies the development of multithreaded applications, not as an implementation of one particular approach to concurrent computing. That's why SObjectizer contains traces of the Actor model, Publish/Subscribe, and CSP. Earlier we talked mostly about the parts of SObjectizer related to the Actor and Pub/Sub models. Today we have tried to briefly introduce the reader to CSP channels. This is our second attempt; the first one was last year.

If anything in our story is unclear, we will be happy to answer questions in the comments. If you would like us to illustrate something from this story with a picture or diagram, tell us what it is, and we will try to make the illustration and add it to the text.

Some readers may find the example uninteresting and far removed from real life. Still, our goal was clarity. But if any reader can suggest another, more realistic example whose solution could be illustrated with CSP channels, we will try to solve it and describe the solution in future articles.

Finally, we invite everyone to try SObjectizer and share their impressions. Feedback is very important to us: your wishes and comments help us develop SObjectizer and make it more powerful and more convenient.

Source: https://habr.com/ru/post/358120/

