
Multithreading in C++ and SObjectizer with CSP channels, but without any actors...

Previously, we talked about SObjectizer as an actor framework for C++, although in reality that is not quite accurate. For example, SObjectizer has long had such a cool thing as mchains (that is, channels from the CSP model). Mchains let you organize data exchange between worker threads easily and naturally, without creating agents, which are not always needed. Just the other day I once again got to take advantage of this feature and simplify my life by passing data between threads via channels (i.e., SObjectizer mchains). So Go is not the only place where you can enjoy CSP; it is possible in C++ too. If you are curious about the details, read on.



The task was as follows: there is a third-party system to which synchronous requests have to be made. We needed to see how this system behaves when requests arrive not in one stream but in several. To do that, the existing single-threaded client had to be made multi-threaded, with each worker thread issuing its own stream of requests to the third-party system.



The complete list of requests to execute was in a separate file. So the file had to be read sequentially: get the next request and hand it to one of the free worker threads. Each thread counted the number of requests it executed. We needed to determine how long it took to read and process all the requests, and to count how many requests were completed.



An obvious, simple solution suggested itself. There is a main thread that reads the request file. Each request is placed in a shared request queue, from which the worker threads take requests. That is, a worker thread takes the first request from the queue, executes it, then takes the next request from the head of the queue, and so on. If the queue is empty, the worker thread should sleep until something appears in the queue. If the queue is too full, the main thread should be suspended until there is free space in it.



Mchains from SObjectizer let you do exactly that without writing your own thread-safe queues.
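
For comparison, here is a rough sketch of the kind of hand-written queue that mchains make unnecessary. It is not SObjectizer code: bounded_queue_t and its methods are hypothetical names built only on the standard library, and the sketch covers just the behavior described above (blocking on a full or empty queue, plus closing the queue while keeping the items already in it). It assumes C++17 for std::optional.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical hand-written stand-in for a size-limited channel.
// None of these names come from SObjectizer.
template <typename T>
class bounded_queue_t {
public:
    explicit bounded_queue_t(std::size_t capacity) : m_capacity{capacity} {
        assert(capacity > 0);
    }

    // Blocks while the queue is full.
    // Returns false if the queue has been closed.
    bool push(T value) {
        std::unique_lock<std::mutex> lock{m_lock};
        m_not_full.wait(lock, [this] {
            return m_closed || m_items.size() < m_capacity;
        });
        if (m_closed)
            return false;
        m_items.push(std::move(value));
        m_not_empty.notify_one();
        return true;
    }

    // Blocks while the queue is empty and still open.
    // Returns std::nullopt once the queue is closed and drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock{m_lock};
        m_not_empty.wait(lock, [this] {
            return m_closed || !m_items.empty();
        });
        if (m_items.empty())
            return std::nullopt;
        T value = std::move(m_items.front());
        m_items.pop();
        m_not_full.notify_one();
        return value;
    }

    // Closes the queue but keeps queued items so readers can drain them.
    void close() {
        {
            std::lock_guard<std::mutex> lock{m_lock};
            m_closed = true;
        }
        m_not_empty.notify_all();
        m_not_full.notify_all();
    }

private:
    const std::size_t m_capacity;
    std::mutex m_lock;
    std::condition_variable m_not_full;
    std::condition_variable m_not_empty;
    std::queue<T> m_items;
    bool m_closed{false};
};
```

A worker would call pop() in a loop until it returns std::nullopt, and the reader thread would call push() for each request and close() at end of file. Even this small version needs two condition variables and careful handling of the closed state, which is the boilerplate a ready-made channel saves.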



To solve this problem, two mchains were needed. The first mchain is used to pass the requests that have been read to the worker threads. The main thread writes requests to it, and the worker threads read requests from it. When the request file has been read completely, the main thread simply closes this mchain. Accordingly, as soon as the worker threads see that the mchain is empty and closed, they finish their work.



The second mchain was needed so that the worker threads could tell the main thread that they had finished their work and how many requests they had processed. Each worker thread writes exactly one message to this mchain, and the main thread only reads from it.



Now let's see how it all looks in code. The code has no comments because it was a throwaway program, so the necessary explanations are given after each piece of code.



Let's start with the run_app function, which is called from main() right after the program parses the command-line parameters:



    void run_app( const app_args_t & args )
    {
        so_5::wrapped_env_t sobj(
            []( so_5::environment_t & ) {},
            []( so_5::environment_params_t & params ) {
                params.infrastructure_factory(
                    so_5::env_infrastructures::simple_mtsafe::factory() );
            } );

        auto tasks_ch = create_mchain( sobj,
            std::chrono::seconds(5),
            50,
            so_5::mchain_props::memory_usage_t::preallocated,
            so_5::mchain_props::overflow_reaction_t::abort_app );
        auto finish_ack_ch = create_mchain( sobj );

        std::vector< std::thread > workers;
        const auto cleanup = cpp_util_3::at_scope_exit( [&] {
            so_5::close_drop_content( finish_ack_ch );
            so_5::close_drop_content( tasks_ch );

            for( auto & t : workers )
                t.join();
        } );

        cpp_util_3::n_times( args.m_threads_count, [&] {
            workers.emplace_back( [&] {
                worker_thread( args, tasks_ch, finish_ack_ch );
            } );
        } );

        do_main_work( args, tasks_ch, finish_ack_ch );
    }


Here we first create an instance of SObjectizer Environment (SOEnvironment below), which will own the mchains. An mchain cannot be created without an SOEnvironment, so we have to create one.



But we do not need a full-fledged SOEnvironment, the kind designed for applications with a crowd of agents, for whose efficient operation SOEnvironment has to create several auxiliary threads of its own. So in the SOEnvironment parameters we ask for a special single-threaded version of SObjectizer. In this case wrapped_env_t creates one auxiliary thread on which so_5::launch() is called, and that is all; SObjectizer does nothing more. That auxiliary thread sleeps inside so_5::launch() until run_app returns.



Next, we need an mchain for distributing requests to the worker threads. This is tasks_ch. But it is not a simple mchain. First, it is an mchain of limited capacity. An attempt to add another message to a full mchain blocks the current thread. The blocking is not forever, though, only for 5 seconds. If there is still no free space in the mchain after 5 seconds, the whole application is terminated by a call to std::abort(). That is justified here, since under normal conditions no worker thread should stall for more than a few milliseconds, let alone 5 seconds. So if no space frees up in tasks_ch within 5 seconds, something has definitely gone wrong and std::abort() is the right call. In addition, since the size of tasks_ch is known in advance, we ask for the memory for the whole message queue to be allocated up front.
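
The "wait a bounded time for free space, then abort" policy can be illustrated with plain standard-library code. This is only a sketch of the idea, not how SObjectizer implements it: slot_counter_t, acquire_for, release and acquire_or_abort are hypothetical names, and the counter merely stands in for the free space of a size-limited channel.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <mutex>

// Hypothetical counter of free slots in a size-limited channel.
class slot_counter_t {
public:
    explicit slot_counter_t(std::size_t capacity) : m_free{capacity} {}

    // Waits up to `timeout` for a free slot.
    // Returns false if no slot became free in time.
    template <typename Rep, typename Period>
    bool acquire_for(std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock{m_lock};
        if (!m_has_free.wait_for(lock, timeout, [this] { return m_free > 0; }))
            return false;
        --m_free;
        return true;
    }

    // Called by a reader after it has taken an item out.
    void release() {
        {
            std::lock_guard<std::mutex> lock{m_lock};
            ++m_free;
        }
        m_has_free.notify_one();
    }

private:
    std::mutex m_lock;
    std::condition_variable m_has_free;
    std::size_t m_free;
};

// Mirrors the policy described above: a writer that cannot get a slot
// within the timeout treats that as a fatal error.
inline void acquire_or_abort(slot_counter_t & slots,
                             std::chrono::milliseconds timeout) {
    if (!slots.acquire_for(timeout))
        std::abort();
}
```

The design choice is the same as in the article: a timeout turns a silent hang of the readers into a loud, immediate failure of the writer.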



With the second mchain, to which the worker threads will send finish_ack messages, everything is much simpler. finish_ack_ch is therefore created by a plain call to create_mchain with default parameters (an unlimited-size mchain that never blocks on send).



Next, we need to start N worker threads and store them in the workers vector. But it is not that simple: creating the next worker thread may throw an exception. In that case we should properly shut down the threads that have already been created.



To simplify rolling back previously performed operations, we use an analog of D's scope(exit) (or of BOOST_SCOPE_EXIT, or of Go's defer, whichever is more familiar to you). The cleanup variable is essentially an object with a lambda inside; the lambda is called when the destructor of cleanup runs. cleanup is created with the help of the small cpp_util utility library. One more note about cleanup: the first thing to do during cleanup is to close the mchains. If one of the worker threads has already started and is sleeping in a receive call on tasks_ch, closing tasks_ch in cleanup immediately wakes that thread up and lets it finish.
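
For readers unfamiliar with the idiom, a scope-exit guard can be sketched in a few lines. This is not the cpp_util implementation, whose details are not shown in this article; scope_exit_t, make_scope_exit and demo_rollback are hypothetical names used only for illustration.

```cpp
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical minimal scope guard: runs the stored action in its destructor.
template <typename F>
class scope_exit_t {
public:
    explicit scope_exit_t(F action) : m_action{std::move(action)} {}

    // Moving transfers responsibility for running the action.
    scope_exit_t(scope_exit_t && other)
        : m_action{std::move(other.m_action)}, m_active{other.m_active} {
        other.m_active = false;
    }

    scope_exit_t(const scope_exit_t &) = delete;
    scope_exit_t & operator=(const scope_exit_t &) = delete;

    ~scope_exit_t() {
        if (m_active)
            m_action();
    }

private:
    F m_action;
    bool m_active{true};
};

template <typename F>
scope_exit_t<F> make_scope_exit(F action) {
    return scope_exit_t<F>{std::move(action)};
}

// Shows that the action runs during stack unwinding,
// before the exception handler gets control.
inline std::string demo_rollback() {
    std::string log;
    try {
        const auto cleanup = make_scope_exit([&log] { log += "cleanup;"; });
        log += "start;";
        throw std::runtime_error{"thread creation failed"};
    } catch (const std::exception &) {
        log += "caught;";
    }
    return log;
}
```

Because the action runs on both the normal and the exceptional path, the same guard can close channels and join threads regardless of how the enclosing function exits.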



Then we create the worker threads and call do_main_work. do_main_work does the main work of the application's main thread: reading the request file, sending requests to the worker threads, and collecting the results. Here is a simplified version of do_main_work with minor details removed:



    void do_main_work(
        const app_args_t & args,
        so_5::mchain_t tasks_ch,
        so_5::mchain_t finish_ack_ch )
    {
        data_file_handler_t file{ args.m_data_file, args.m_force_keep_alive };

        const auto started_at = hires_clock::now();

        while( !file.is_eof() )
        {
            auto request = file.get_next_request();
            if( !request )
                break;

            so_5::send< std::string >( tasks_ch, *request );
        }

        so_5::close_retain_content( tasks_ch );

        unsigned long long total_requests{};
        so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ),
            [&]( const finish_ack_t & what ) {
                total_requests += what.m_requests;
            } );

        const auto total_time = hires_clock::now() - started_at;

        if( total_requests )
        {
            ... // Print the results...
        }
    }


The most interesting parts here are in two places.



First, inside the while loop. There, requests are read from the file one by one and passed to the worker threads by calling send. If send is called when tasks_ch is completely full, the main thread is suspended (but for no longer than 5 seconds).



Second, once the whole request file has been read, we need to wait for replies from all the worker threads. To do this, we first close tasks_ch so that the worker threads understand it is time to finish. But it must be closed in a way that does not lose requests that are already in the queue but have not been processed yet. That is why close_retain_content is called (whereas the cleanup actions in run_app use close_drop_content, because there is nothing worth keeping in the closed channel at that point).



After tasks_ch is closed, we need to wait for replies from the N worker threads. Waiting for exactly N replies is written in one magic line:



 so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ), 


It says literally the following: read from the finish_ack_ch channel until exactly m_threads_count messages have been read and handled.



Finally, to complete the picture, here is the code of the worker thread. It is quite simple:



    void worker_thread(
        const app_args_t & args,
        so_5::mchain_t tasks_ch,
        so_5::mchain_t finish_ack_ch )
    {
        io_performer_t io_performer{ args.m_srv, args.m_port };

        unsigned long long total_requests{};
        so_5::receive( from(tasks_ch),
            [&]( const std::string & request ) {
                io_performer.request_response( request );
                ++total_requests;
            } );

        so_5::send< finish_ack_t >( finish_ack_ch, total_requests );
    }


The thread just sits inside receive on the tasks_ch channel. receive returns when tasks_ch is closed. If tasks_ch is empty, receive sleeps until something arrives in the channel (or until the channel is closed). When receive returns, the worker thread simply sends a finish_ack message to finish_ack_ch and finishes.



That's all.



It must be said that there were no problems with the multithreading itself or with the exchange of information between threads. It worked literally on the first run. Problems arose inside the implementation of io_performer_t::request_response, where errors in the client-server interaction caused the current thread to hang. That is where the 5-second limit on waiting for a write into tasks_ch helped: when the threads started to hang, the timeout fired and the multi-threaded client crashed. It was immediately clear that there was a bug and that it was in request_response, since only a hang there could stop normal reading from tasks_ch.



In conclusion, I would like to say that the Actor Model and the Communicating Sequential Processes model (CSP) are both excellent things. In some places one works well, in others the other. SObjectizer lets you use both, and even both at once, which is sometimes necessary.




Source: https://habr.com/ru/post/336854/


