
Learning ActionBlock: or a short story about an insidious deadlock

I think almost every real project uses some form of the producer-consumer queue. The idea behind the pattern is quite simple: the application needs to decouple the production of data from its processing. Take, for example, the thread pool in the CLR: we add an item for processing by calling ThreadPool.QueueUserWorkItem, and the thread pool itself decides what number of worker threads is optimal and calls the processing methods with the right degree of parallelism.

But using the standard thread pool is not always possible or reasonable. Although it is possible to specify the minimum and maximum number of threads, this configuration is global and affects the application as a whole rather than just the parts that need it. There are many other ways to solve the producer-consumer problem. It can be a head-on solution, where the application logic is mixed with aspects of multithreading, queues, and synchronization. It can be a wrapper over BlockingCollection with manual control of the number of worker threads or tasks. Or it can be based on a completely off-the-shelf solution, such as ActionBlock<T> from TPL Dataflow.

Today we will look at the internal design of the ActionBlock class, discuss the design decisions made by its authors, and find out why we need to know all this to work around some of the problems that come with using it. Ready? Well then, let's go!

On my current project we have a number of cases where we need to solve the producer-consumer problem. One of them looks like this: we have a custom parser and interpreter for a language very similar to TypeScript. Without going deep into details, we can say that we need to parse a set of files and obtain the so-called "transitive closure" of all their dependencies. Then the files need to be converted into a representation suitable for execution, and executed.
The parsing logic looks like this:

  1. Parse the file.
  2. Analyze its contents and look for its dependencies (by analyzing all the 'import * from', 'require', and similar constructs).
  3. Resolve the dependencies (i.e., find the set of files that the current file requires in order to work).
  4. Add the resulting dependencies to the list of files to parse.

Pretty simple, isn't it? Indeed it is. Here is what a slightly simplified implementation based on TPL Dataflow and the ActionBlock<T> class looks like:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Simple DTO for parse results (not shown in the original listing).
internal class ParsedFile
{
    public string FileName { get; set; }
    public List<string> Dependencies { get; set; }
}

private static Task<ParsedFile> ParseFileAsync(string path)
{
    Console.WriteLine($"Parsing '{path}'. {{0}}",
        $"Thread Id - {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(10);
    return Task.FromResult(
        new ParsedFile()
        {
            FileName = path,
            Dependencies = GetFileDependencies(path),
        });
}

// Emulates dependency discovery: each file depends on a file with a shorter
// name ('foo.ts' -> 'fo.ts' -> 'f.ts'). The body is reconstructed from the
// description below; the original listing does not show it.
private static List<string> GetFileDependencies(string path)
{
    string name = path.Substring(0, path.Length - ".ts".Length);
    return name.Length > 1
        ? new List<string> { name.Substring(0, name.Length - 1) + ".ts" }
        : new List<string>();
}

static void Main(string[] args)
{
    long numberOfProcessedFiles = 0;
    ActionBlock<string> actionBlock = null;

    Func<string, Task> processFile = async path =>
    {
        Interlocked.Increment(ref numberOfProcessedFiles);
        ParsedFile parsedFile = await ParseFileAsync(path);

        foreach (var dependency in parsedFile.Dependencies)
        {
            Console.WriteLine($"Sending '{dependency}' to the queue... {{0}}",
                $"Thread Id - {Thread.CurrentThread.ManagedThreadId}");
            await actionBlock.SendAsync(dependency);
        }

        if (actionBlock.InputCount == 0)
        {
            // This is a marker that this is the last file and there
            // is nothing left to process
            actionBlock.Complete();
        }
    };

    actionBlock = new ActionBlock<string>(processFile);
    actionBlock.SendAsync("FooBar.ts").GetAwaiter().GetResult();

    Console.WriteLine("Waiting for the action block to finish...");
    actionBlock.Completion.GetAwaiter().GetResult();

    Console.WriteLine($"Done. Processed {numberOfProcessedFiles}");
    Console.ReadLine();
}

Let's see what happens here. For simplicity, all the main logic is in the Main method. The numberOfProcessedFiles variable is used to validate the logic and holds the total number of processed files. The main work is done in the processFile delegate, which is passed to the ActionBlock constructor. This delegate plays the roles of both "consumer" and "producer": it takes the path to a file through the path argument, parses the file, finds its dependencies, and sends the new files to the queue by calling actionBlock.SendAsync. Then it checks the number of elements in the processing queue, and if there are no new elements, the whole operation is finished by calling actionBlock.Complete() (*). Finally, the Main method creates an instance of ActionBlock, starts processing the first file, and waits for the whole process to finish.

The ParseFileAsync method emulates the file parsing process and computes dependencies using the following primitive logic: the file 'foo.ts' depends on 'fo.ts', which depends on 'f.ts'. I.e., each file depends on a file with a shorter name. The logic is unrealistic, but it is enough to demonstrate the basic idea of computing the transitive closure of a set of files.

The ActionBlock class manages concurrency for you. Keep in mind, however, that the default degree of parallelism is 1, and to change it you must pass an instance of ExecutionDataflowBlockOptions to the ActionBlock constructor. If the MaxDegreeOfParallelism property is greater than 1, the ActionBlock will invoke the callback delegate from different threads (actually, from different tasks) to process queue elements in parallel.
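For example, here is how our block could be configured to process several files at once (the value 4 is just an illustration):

// Allow up to 4 files to be processed concurrently.
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
actionBlock = new ActionBlock<string>(processFile, options);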

Post vs. SendAsync: what and when to use


Everyone who has ever tried to solve the producer-consumer problem on their own has faced this question: what to do when the flow of input data exceeds the consumers' processing capacity? How do you throttle the input stream? Just keep all the input elements in memory? Throw an exception? Return false from the add-item method? Use a circular buffer and discard old items? Or block the add method until space appears in the queue?

To solve this problem, the ActionBlock authors took the following generally accepted approach:

  1. The client can set the queue size when creating an ActionBlock object.
  2. If the queue is full, the Post method returns false, and the SendAsync extension method returns a task that completes when free space appears in the queue (see the sketch below).
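In code, the two options look like this (a small usage sketch; the string literal is just an example):

// Synchronous attempt: returns false right away if the queue is full
// (or the block is already completed; see the note (**) at the end).
if (!actionBlock.Post("FooBar.ts"))
{
    // Decide what to do: drop the item, retry later, log, etc.
}

// Asynchronous attempt: the task stays incomplete while the queue is
// full, and completes with 'false' if the block no longer accepts items.
bool accepted = await actionBlock.SendAsync("FooBar.ts");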

In our previous example we did not specify a queue size. This means that if new elements are added faster than they are processed, the application will sooner or later crash with an OutOfMemoryException. Let's try to fix that by setting a very small queue size, for example, 1 element.

actionBlock = new ActionBlock<string>(processFile,
    new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

Now, if we run this code, we get... a deadlock!


Deadlock


Let's think about the producer-consumer problem in terms of design. Suppose we are writing our own queue that takes a callback method for handling elements. We need to decide whether it should support limiting the number of elements. If we need a "bounded" queue, we will probably arrive at a design very similar to that of the ActionBlock class: a synchronous method for adding elements that returns false if the queue is full, and an asynchronous method that returns a task. With a full queue, the client of our class can then decide what to do: handle the "overflow" by calling the synchronous version, or await free space in the queue using the asynchronous version.

Then we need to decide when to invoke the callback method. We might end up with the following logic: if the queue is not empty, take the first element, invoke the callback, wait for the processing to complete, and only then remove the element from the queue. (The actual implementation will be significantly harder than it looks, simply because it has to account for all sorts of races.) The queue could instead remove the item before invoking the callback, but, as we will see shortly, this does not affect the possibility of a deadlock.
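Here is a minimal sketch of such a queue (the type and its members are hypothetical, just to make the design concrete; it uses a single consumer task, like the default ActionBlock, and frees a slot only after the callback completes):

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical bounded queue with a processing callback; not part of TPL Dataflow.
public sealed class BoundedWorkQueue<T>
{
    private readonly Func<T, Task> _callback;
    private readonly SemaphoreSlim _freeSlots;                        // free space in the queue
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0); // items ready to process
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public BoundedWorkQueue(Func<T, Task> callback, int boundedCapacity)
    {
        _callback = callback;
        _freeSlots = new SemaphoreSlim(boundedCapacity);
        Task.Run(ProcessLoopAsync); // single consumer, like the default ActionBlock
    }

    // Synchronous add: returns false when the queue is full.
    public bool TryAdd(T item)
    {
        if (!_freeSlots.Wait(0)) return false;
        _queue.Enqueue(item);
        _available.Release();
        return true;
    }

    // Asynchronous add: completes only when a slot frees up.
    public async Task SendAsync(T item)
    {
        await _freeSlots.WaitAsync();
        _queue.Enqueue(item);
        _available.Release();
    }

    private async Task ProcessLoopAsync()
    {
        while (true)
        {
            await _available.WaitAsync();
            _queue.TryDequeue(out T item);
            await _callback(item);  // run the handler first...
            _freeSlots.Release();   // ...and free the slot only afterwards
        }
    }
}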

We have arrived at a simple and elegant design, but it can easily lead to a problem. Suppose the queue is full and a callback is currently running to process one of the items. What if, instead of quickly returning control to the queue, the handler tries to add another element by calling await SendAsync:


The queue is full and cannot accept new items because the callback method has not yet completed. But the callback is stuck waiting for await SendAsync to finish and cannot move on until space frees up in the queue. A classic deadlock!

OK, we get a deadlock because ActionBlock removes an item from the queue *after* the callback method completes. But let's consider an alternative scenario: what happens if ActionBlock removes the element *before* invoking the callback? In fact, nothing changes. A deadlock is still possible.

Imagine that the queue size is one and the degree of parallelism is two. Both callbacks are busy processing items that have already been removed from the queue, the single slot is occupied by a freshly added element, and both handlers are blocked on await SendAsync trying to add one more. No worker ever becomes free to drain the queue: deadlock again.
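For reference, this is the configuration being discussed, expressed in code (with the real ActionBlock, which removes items after processing, it deadlocks as well):

actionBlock = new ActionBlock<string>(processFile,
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 2 });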




It turns out that removing an item from the queue before processing does not help. Moreover, it would only make the problem worse: the deadlock becomes significantly less likely (with a degree of parallelism N, all N callback methods must try to add new items at the same time), and therefore much harder to reproduce and debug.

Another drawback is less obvious. ActionBlock is not a standalone, special-purpose solution. The class implements the ITargetBlock<T> interface and can be used to process elements in complex dataflow scenarios. For example, we can have a BufferBlock with several "target" blocks for parallel processing of elements. In the current implementation, balancing between handlers works in a trivial way: as soon as a receiver (in our case, an ActionBlock) is full, it stops accepting new elements, which lets the other blocks in the chain process the element instead.

If an element were removed from the queue before being processed, the ActionBlock would become greedier and accept more elements than it can actually handle at the moment: the effective capacity of each block would become 'BoundedCapacity' + 'MaxDegreeOfParallelism'.
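The load-balancing scenario mentioned above could look something like this (a sketch; the block names and capacities are illustrative):

// A BufferBlock feeding two bounded workers. When one worker's queue is
// full, it stops accepting items, and the buffer offers them to the other.
var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
var worker1 = new ActionBlock<string>(async path => await ParseFileAsync(path), options);
var worker2 = new ActionBlock<string>(async path => await ParseFileAsync(path), options);

var buffer = new BufferBlock<string>();
buffer.LinkTo(worker1);
buffer.LinkTo(worker2);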

How to solve the problem with deadlock?


I'm afraid there is no good way. If you need to limit the number of elements in the queue and the callback method can add new elements, ActionBlock will have to be abandoned. An alternative is a solution based on BlockingCollection with "manual" control of the number of worker tasks, for example, using a pool of tasks or Parallel.Invoke.
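One possible shape of such a solution is sketched below (it reuses the GetFileDependencies helper from the listing above; the collection is unbounded, so Add never blocks, and a manual in-flight counter detects completion — the names are illustrative):

var queue = new BlockingCollection<string>();
int pending = 1;                 // items in the queue or being processed
queue.Add("FooBar.ts");

Action worker = () =>
{
    foreach (string path in queue.GetConsumingEnumerable())
    {
        foreach (string dependency in GetFileDependencies(path))
        {
            Interlocked.Increment(ref pending);
            queue.Add(dependency);   // never blocks: the collection is unbounded
        }

        if (Interlocked.Decrement(ref pending) == 0)
        {
            queue.CompleteAdding();  // nothing queued and nothing in flight
        }
    }
};

// "Manual" degree of parallelism: two worker tasks.
Parallel.Invoke(worker, worker);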

Degree of Parallelism


Unlike the primitives in the TPL, all blocks in TPL Dataflow are single-threaded by default. I.e., ActionBlock, TransformBlock, and the others invoke their callback methods one element at a time. The TPL Dataflow authors decided that simplicity is more important than a possible performance gain. Reasoning about dataflow graphs is hard enough already, and having every block process data in parallel would make it even harder.

To change the degree of parallelism, pass an ExecutionDataflowBlockOptions instance to the block and set the MaxDegreeOfParallelism property to a value greater than 1. By the way, if this property is set to -1 (DataflowBlockOptions.Unbounded), every incoming element will be processed by a new task, and the parallelism will be limited only by the capabilities of the task scheduler in use (a TaskScheduler object, which can also be passed via ExecutionDataflowBlockOptions).
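For example (the scheduler shown here is just the default one, for illustration):

var options = new ExecutionDataflowBlockOptions
{
    // Every incoming element gets its own task.
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, // == -1
    TaskScheduler = TaskScheduler.Default,
};
actionBlock = new ActionBlock<string>(processFile, options);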

Conclusion


Designing easy-to-use components is hard. Designing easy-to-use components that deal with concurrency is doubly so. To use such components correctly, you need to know how they are implemented and what restrictions their developers had in mind.

The ActionBlock<T> class is a great tool that greatly simplifies the implementation of the producer-consumer pattern. But even so, you should be aware of certain aspects of TPL Dataflow, such as the degree of parallelism and the behavior of blocks on overflow.

(*) This example is not thread-safe, and a complete implementation should not use actionBlock.InputCount. Do you see the problem?

(**) The Post method returns false in one of two cases: the queue is full, or it is already completed (the Complete method has been called). This can make the method difficult to use, since the two cases are indistinguishable. The SendAsync method behaves somewhat differently: it returns a Task<bool> that remains incomplete while the queue is full, and if the queue has already been completed and cannot accept new items, task.Result will be false.

Source: https://habr.com/ru/post/319208/

