
Scheduling server tasks with boost.task

Recently, on a specialized forum, a programmer asked: "What should an MMO use for working with threads?" He was leaning toward Intel TBB, and not just its basic primitives but custom task scheduling. Fine, TBB it is. A little later I saw the source code of an MMO server by another programmer who had recently started rewriting it from scratch to improve the architecture. It was full of wheels reinvented by the author instead of using third-party components such as boost (for example, wrapper classes over pthread, and this in 2010, when boost.thread is practically standard). Among other things, it implemented a thread pool with a task scheduler. This topic interests me a lot, so I started digging for ready-made task scheduling solutions (like the one in TBB) and found boost.task, which I decided to write about.





Definition



A task is a logically unified set of actions. The task scheduler executes tasks asynchronously, using certain strategies to decide which task should run at a given moment and in which thread.

Tasks let you abstract away from raw threads and operate at a higher level.



Why do I need a task scheduler?



How does a spherical server in a vacuum work? Very simply:

  1. A request comes from a client
  2. It is processed!
  3. The response is sent


Well, in addition, the server may run processes that are not triggered by a client request: sending notifications to the entire user base, purging the database of stale data (old keys), processing daily statistics, and so on.

The snag is in exactly how a request gets processed; that is what we need to figure out.

Take, for example, a memcached-like server: we have a hash_map with data, and there are read requests and write requests; a read does a simple lookup in the hash map and returns the data, a write puts data into it. So far everything happens in a single thread, but what if we need to use all the processors in the system?

We create as many threads as there are cores. Connections are distributed across the threads round-robin as they are created, and each thread handles its own users. Access to the container is protected with an rwlock (boost::shared_mutex). Fine. And how do we deal with removing elements from the container? We create a thread that wakes up every N seconds and cleans the container.
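For illustration, a minimal sketch of such a shared container (the class and member names here are mine, not taken from the server in question):

#include <string>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/unordered_map.hpp>
#include <boost/optional.hpp>

// Hash map shared by all worker threads: many concurrent readers,
// exclusive access for writers (boost::shared_mutex used as an rwlock).
class SharedCache
{
public:
    boost::optional<std::string> Get(const std::string & key) const
    {
        boost::shared_lock<boost::shared_mutex> lock(mutex_);   // shared (read) lock
        Map::const_iterator it = data_.find(key);
        if (it == data_.end())
            return boost::none;
        return it->second;
    }

    void Set(const std::string & key, const std::string & value)
    {
        boost::unique_lock<boost::shared_mutex> lock(mutex_);   // exclusive (write) lock
        data_[key] = value;
    }

    void Erase(const std::string & key)
    {
        boost::unique_lock<boost::shared_mutex> lock(mutex_);   // exclusive (write) lock
        data_.erase(key);
    }

private:
    typedef boost::unordered_map<std::string, std::string> Map;
    mutable boost::shared_mutex mutex_;
    Map data_;
};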

That was a simple example; now a more complex one: a service that, depending on the user's request, may query the database, send an email, or make an HTTP request to some site. What happens if the server is built on the previous model, with all requests to other components executed synchronously? Well, if the database sits on the same host as the server, the answer comes within a couple of milliseconds. Sending email is not a problem either: we put sendmail on the same machine, hand it the data, and it figures out on its own how to deliver the letter.

Fine. Or not quite. What do we do with the HTTP request? It can take a very long time: everything depends on a site that is somewhere far away, and nobody knows how long it will take to respond. Meanwhile the thread sits idle, even though there are plenty of requests in the queue that could be executed; they have to wait until this thread is freed.

Such a request must be performed asynchronously. You can implement it like this:

class LongRequestHandler
{
public:
    void Handle()
    {
        // read client request parameters
        // mysql request 1
        // mysql request 2
        HttpRequestExecutor::GetInstance()->Execute(
            "example.com?x=1",
            boost::bind(&LongRequestHandler::HandleStage2, this, _1));
    }

    void HandleStage2(const std::string & http_request_result)
    {
        // mysql request 3
        // write response to client
    }
};






HttpRequestExecutor accepts the request URL and a callback (of type boost::function) that must be invoked when the request completes.
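The post does not show HttpRequestExecutor itself; a rough sketch of what such an interface could look like (the names and signatures below are assumptions, declaration only):

#include <string>
#include <boost/function.hpp>

// Hypothetical interface of the asynchronous HTTP client used above:
// Execute() queues the request and returns immediately; the callback is
// invoked later, on some worker thread, with the response body.
class HttpRequestExecutor
{
public:
    typedef boost::function<void (const std::string &)> Callback;

    static HttpRequestExecutor * GetInstance();  // singleton, as in the example above

    void Execute(const std::string & url, const Callback & on_complete);
};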

And this approach works, although it does not look particularly pretty.

The Thinking Asynchronously in C++ blog shows an interesting way of implementing asynchronous operations. The result looks like this:

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler,
    // coroutine state:
    coroutine coro = coroutine(),
    error_code ec = error_code(),
    size_t length = 0)
{
    reenter (coro)
    {
    entry:
        while (!ec)
        {
            yield socket.async_read_some(
                buffer(working_buffer),
                bind(&async_echo<Handler>,
                     ref(socket), working_buffer,
                     box(handler), coro, _1, _2));
            if (ec) break;
            yield async_write(socket,
                buffer(working_buffer, length),
                bind(&async_echo<Handler>,
                     ref(socket), working_buffer,
                     box(handler), coro, _1, _2));
        }
        handler(ec);
    }
}




Coroutines and yield look unusual in C++ ;) This is implemented with #define macros; you can read on the blog how the author did it.
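Roughly, the trick is a switch statement whose case labels are generated from __LINE__, so the function can jump back to the last yield point when it is re-entered. A simplified illustration of the idea (not the blog author's actual macros, which handle more cases):

#include <cstdio>

// A "stackless coroutine": the coroutine object remembers the line number of
// the last yield point, and reenter/yield expand into a switch that jumps
// back to that point on the next call.
struct coroutine
{
    int state;
    coroutine() : state(0) {}
};

// For brevity these macros assume the coroutine variable is named `coro`;
// the real implementation avoids hard-coding the name.
#define reenter(c) switch ((c).state)
#define yield                                   \
    case __LINE__:                              \
        if (coro.state != __LINE__)             \
        {                                       \
            coro.state = __LINE__;              \
            return;                             \
        }                                       \
        else

void step(coroutine & coro)
{
    reenter (coro)
    {
    case 0:
        std::printf("first call\n");
        yield;   // remember this point and return to the caller
        std::printf("second call\n");
        yield;   // remember this point and return to the caller
        std::printf("third call\n");
    }
}

int main()
{
    coroutine coro;
    step(coro);  // prints "first call"
    step(coro);  // prints "second call", resuming after the first yield
    step(coro);  // prints "third call", resuming after the second yield
}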

Gradually the logic gets more complicated, new components appear that have to be handled asynchronously, and the implementation grows along with it. Take, for example, the following task:

  mysql request 1
 mysql request 2
 http request 1
 mysql request 3
 http request 2
 mysql request 4
 mysql request 5


If we execute it sequentially, stalling on each HTTP request, we notice that the groups

  mysql request 2
 http request 1


and

  mysql request 3
 http request 2
 mysql request 4


could run in parallel, but if we want that, the logic has to become even more convoluted. What we would really like is to write simple code, for example:

  mysql request 1
 x = run (func1)
 y = run (func2)
 wait (x, y)
 mysql request 5

 func1:
   mysql request 2
   http request 1

 func2:
   mysql request 3
   http request 2
   mysql request 4


This is where the task scheduler comes in handy.


Implementations



You can read about the task scheduling support proposed for the new C++0x standard here.



I liked boost.task the most, so the rest of this post looks at it in detail.



Description of boost.task



boost.task is an implementation of a proposal for the C++0x standard. It supports configurable task execution strategies, creation of sub-tasks, and task interruption.

The library depends on boost.fiber, boost.atomic and boost.move.

boost.task and boost.fiber are compiled libraries (boost.atomic and boost.move are header-only), so you will have to build them. To make experimenting easier, I collected all the dependencies in one place, spiced it up with cmake and pushed the project to github. It works on Linux; for building under Windows you would need to add 2-3 lines to the cmake files.



Usage example



The library API is fairly simple, and implementing the request handler described above is not difficult. Here it is again:

  mysql request 1

   mysql request 2
   http request 1

   mysql request 3
   http request 2
   mysql request 4

 mysql request 5


A mysql request will be emulated with an ordinary sleep for a random amount of time:

boost::this_thread::sleep(boost::posix_time::milliseconds(rand() % 100 + 10));



An asynchronous timer from boost::asio will play the role of the external HTTP request.

So:

Request is the request class:

class Request
{
public:
    Request(const std::string & data);
    const std::string & Read() const;
    void Write(const std::string & answer);
};






And RequestHandler is the request handler class.

class RequestHandler
{
public:
    RequestHandler(boost::asio::io_service & io_service, const RequestPtr & request);
    void Process() const;
};






io_service is passed in so that the handler can make an external call (using a boost::asio::deadline_timer). So let's begin. First we define a pool of threads to handle our tasks:

boost::tasks::static_pool<boost::tasks::unbounded_fifo> pool(boost::tasks::poolsize(5));





boost.task supports two basic kinds of task queues:

  unbounded - the queue may grow without limit
  bounded - the queue size is limited, and producers block when it is full

It is also possible to choose how tasks inside the queue are ordered:

  fifo - tasks are executed in the order they were submitted
  priority-based - tasks with a higher priority are executed first

Accordingly, the line of code above creates a pool of 5 threads with an unbounded FIFO queue.

Now we need to create io_service and a pool of 3 threads to handle external requests.

boost::asio::io_service io_service;





If you call io_service::run when there is no pending work in it, the method returns immediately, and for normal operation we need the threads to keep running. Usually this is solved by adding an accept socket to the io_service that clients connect to; here we can simply keep the io_service busy waiting on a timer:

boost::asio::deadline_timer dummy_timer(io_service);
dummy_timer.expires_from_now(boost::posix_time::seconds(10));
// void dummy_handler(const boost::system::error_code&) {}
dummy_timer.async_wait(&dummy_handler);






After that you can create a thread pool:

boost::thread_group io_service_thread_pool;
for (int i = 0; i < 3; ++i)
    io_service_thread_pool.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service));




Next, create a request:

RequestPtr request(new Request("some data"));
RequestHandlerPtr handler(new RequestHandler(io_service, request));





Everything is ready; now we can run the task:

boost::tasks::handle<void> request_processing(
    boost::tasks::async(
        boost::tasks::make_task(&RequestHandler::Process, handler),
        pool));




boost::tasks::make_task(&RequestHandler::Process, handler) creates a task that, when executed, will invoke Process on the handler object. boost::tasks::async initiates asynchronous execution of the task. boost::tasks::handle is an object through which you can track the task's completion status and retrieve its result, if it has one.
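Since handle is also how you get a task's return value, a small sketch of what that might look like with a pool created as above (the ComputeAnswer function is hypothetical, and handle<>::get() blocking until the result is ready comes from the library's documented interface, not from anything shown in this post):

#include <boost/task.hpp>  // umbrella header of the standalone boost.task library (assumed name)

// Hypothetical task returning a value.
int ComputeAnswer()
{
    return 42;
}

int Example(boost::tasks::static_pool<boost::tasks::unbounded_fifo> & pool)
{
    boost::tasks::handle<int> h(
        boost::tasks::async(
            boost::tasks::make_task(&ComputeAnswer),
            pool));

    return h.get();  // waits for the task to finish, then returns its result (42)
}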

boost::tasks::async supports four task execution policies:

  own_thread - the task is executed synchronously, in the calling thread
  new_thread - a new thread is started for the task and ends with it
  pool - the task is submitted to the given thread pool (as in the code above)
  as_sub_task - the default; the task is executed as a sub-task, in the same pool as its parent task

Next, wait until the task is completed:

request_processing.wait();





And stop io_service:

io_service.stop();

io_service_thread_pool.join_all();






The Process function itself turns out to be surprisingly simple:

void Subtask1() const
{
    Request("query2");
    ExternalRequest("extquery1");
}

void Subtask2() const
{
    Request("query3");
    ExternalRequest("extquery2");
    Request("query4");
}

void Process() const
{
    std::string data = request_->Read();

    Request("query1");

    boost::tasks::handle<void> subtask1(
        boost::tasks::async(
            boost::tasks::make_task(&RequestHandler::Subtask1, this)));
    boost::tasks::handle<void> subtask2(
        boost::tasks::async(
            boost::tasks::make_task(&RequestHandler::Subtask2, this)));

    boost::tasks::waitfor_all(subtask1, subtask2);

    Request("query5");

    request_->Write("some answer");
}






The subtasks are started with boost::tasks::async without specifying a launch policy, so the as_sub_task algorithm is selected automatically; it executes tasks in the same thread pool as the parent task. The implementations of the subtask functions are also trivial.

RequestHandler::Request simply calls boost::this_thread::sleep, while ExternalRequest is a bit more involved:

void ExternalRequest(const std::string & what) const
{
    ExternalRequestHandler external_handler(io_service_);
    boost::tasks::spin::auto_reset_event ev;
    external_handler.PerformExternalReqeust(what, &ev);
    ev.wait();
}



A handler is created, along with an auto-reset event, boost::tasks::spin::auto_reset_event. The event is passed to the external request handler, which calls ev.set() when the request completes; until then, ev.wait() keeps the task blocked.

Unlike ordinary threads with their synchronization primitives (boost::condition), ev.wait() does not block the thread; it blocks the task (internally it calls this_task::yield() in a loop), which means the processor time goes to other tasks.
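ExternalRequestHandler itself is not shown in this post (the full file is linked below); a minimal sketch of how it could look with the boost::asio deadline_timer, matching the call above, might be (the class layout and member names are my assumptions):

#include <cstdlib>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/task.hpp>  // umbrella header of the standalone boost.task library (assumed name)

// Emulates an external HTTP request with an asynchronous timer: async_wait()
// returns immediately, and when the timer fires on one of the io_service
// threads, the completion handler wakes the waiting task through the event.
class ExternalRequestHandler
{
public:
    explicit ExternalRequestHandler(boost::asio::io_service & io_service)
        : timer_(io_service)
    {}

    void PerformExternalReqeust(const std::string & what,
                                boost::tasks::spin::auto_reset_event * ev)
    {
        (void) what;  // a real implementation would actually send the request here
        timer_.expires_from_now(boost::posix_time::milliseconds(rand() % 100 + 10));
        timer_.async_wait(boost::bind(&ExternalRequestHandler::HandleTimer, this,
                                      boost::asio::placeholders::error, ev));
    }

private:
    void HandleTimer(const boost::system::error_code & /*ec*/,
                     boost::tasks::spin::auto_reset_event * ev)
    {
        ev->set();  // unblocks the task waiting in ev.wait()
    }

    boost::asio::deadline_timer timer_;
};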

The entire file can be found here .



Conclusions



boost.task is quite a convenient task scheduling library. It gives an idea of what support for asynchronous code execution will look like in the new C++0x standard, and you can use it right now, without waiting for the standard to be released.

Code written with boost.task comes out shorter and much clearer than the same logic written with plain threads.

There are, of course, drawbacks: the code is not yet optimized, which can cause problems in some cases, and the library (along with its dependencies) has not yet been accepted into boost.






Source: https://habr.com/ru/post/88288/


