I think that almost every real project uses one or another form of implementation of the supplier
-consumer queue . The idea behind the problem is quite simple. The application needs to unleash the production of some data from their processing. Take, for example, the thread pool in the CLR: we add an item for processing by calling
ThreadPool.QueueUserWorkItem , and the thread pool itself decides what number of worker threads are most optimal and calls methods to process items with the right degree of parallelism.
But using a standard thread pool is not always possible and / or reasonable. Although it is possible to specify the minimum and maximum number of threads, this configuration is global and will affect the application as a whole, rather than the parts it needs. There are many other ways to solve the problem of the supplier of the consumer. It can be a head-on solution when the application logic is mixed with aspects of multithreading, queues, and synchronization. It can be a wrapper over the
BlockingCollection with manual control of the number of workflows or tasks. Or it could be a solution based on a completely turnkey solution, such as an
ActionBlock <T> from a TPL DataFlow.
Today we will look at the internal device of the
ActionBlock class, discuss the design decisions that were made by its authors and find out why we need to know all this in order to get around some of the problems when using it. Ready? Well then, let's go!
On my current project we have a number of cases when we need to solve the problem of the supplier-consumer. One of them looks like this: we have a custom parser and interpreter for a language very similar to TypeScript. Without going deep into details, we can say that we need to parse a set of files and get the so-called "transitive closure" of all dependencies. Then they need to be converted into a performance suitable for execution and execute.
')
The parsing logic looks like this:
- Parsim file.
- We analyze its contents and look for its dependencies (by analyzing all the 'import * from', 'require' and similar constructions).
- We calculate the dependencies (i.e., we find the set of files that are required by the current file for normal operation).
- Add the resulting file dependencies to the list for parsing.
Pretty simple, isn't it? And there is. Here is how a slightly simplified implementation based on TPL Dataflow and the
ActionBlock <T> class will look like:
private static Task<ParsedFile> ParseFileAsync(string path) { Console.WriteLine($"Parsing '{path}'. {{0}}", $"Thread Id - {Thread.CurrentThread.ManagedThreadId}"); Thread.Sleep(10); return Task.FromResult( new ParsedFile() { FileName = path, Dependencies = GetFileDependencies(path), }); } static void Main(string[] args) { long numberOfProcessedFiles = 0; ActionBlock<string> actionBlock = null; Func<string, Task> processFile = async path => { Interlocked.Increment(ref numberOfProcessedFiles); ParsedFile parsedFile = await ParseFileAsync(path); foreach (var dependency in parsedFile.Dependencies) { Console.WriteLine($"Sending '{dependency}' to the queue... {{0}}", $"Thread Id - {Thread.CurrentThread.ManagedThreadId}"); await actionBlock.SendAsync(dependency); } if (actionBlock.InputCount == 0) {
Let's see what happens here. For simplicity, all the main logic is in the
Main method. The variable
numberOfProcessedFiles is used to validate the logic and contains the total number of processed files. The main work is done in the
processFile delegate, which is then passed to the
ActionBlock constructor. This delegate simultaneously plays the role of "consumer" and "supplier": it takes the path to the file through the
path argument, parses the file, finds its dependencies, and sends new files to the queue by calling the
actionBlock.SendAsync method. Then there is a check of the number of elements in the processing queue, and if there are no new elements, the whole operation is completed by calling
actionBlock.Complete () (*). Then, the
Main method creates an instance of
ActionBlock , starts processing the first file, and waits until the end of the entire process.
The
ParseFileAsync method emulates the file parsing process and calculates dependencies using the following primitive logic: the file 'foo.ts' depends on 'fo.ts', which depends on 'f.ts'. Those. each file depends on a file with a shorter name. This is unrealistic logic, but it allows you to show the basic idea of ​​calculating the transitive closure of files.
The
ActionBlock class manages concurrency for you. The truth is that it should be taken into account that the default “degree of parallelism” (degree of parallelism) is 1 and to change this, you must pass an instance of the class
ExecutionDataflowBlockOptions in the
ActionBlock constructor. If the
MaxDegreeOfParallelism property is greater than 1, then the
ActionBlock will call a callback delegate from different threads (in fact, from different tasks) for parallel processing of queue elements.
Post vs. SendAsync: what and when to use
Everyone who at least once tried to independently solve the problem of the supplier-consumer faced the problem: what to do when the flow of input data exceeds the capabilities of consumers in processing? How do you throttle the input stream? Just keep all input elements in memory? Generate an exception? Return
false in the add item method? Use a circular buffer and discard old items? Or block the execution of this method until there is a place in the queue?
To solve this problem, the
ActionBlock authors decided to use the following generally accepted approach:
- The client can set the queue size when creating an ActionBlock object.
- If the queue is full, the Post method returns false , and the SendAsync extension method returns the task to be completed when free space appears in the queue.
In our previous example, we did not specify the queue size. This means that if new elements are added faster than they are processed, the application will sooner or later fall with an
OutOfMemoryException . But let's try to fix this situation. And set the queue is very small, for example, in 1 element.
actionBlock = new ActionBlock<string>(processFile, new ExecutionDataflowBlockOptions() {BoundedCapacity = 1});
Now, if we run this code, we get ... deadlock!
Deadlock
Let's think about the problem of the consumer-supplier in terms of design. We write our own queue, which accepts a callback method for handling elements. We need to decide whether it should support limiting the number of elements or not. If we need a “bounded” queue, then we will probably come up with a design that is very similar to the
ActionBlock class
design : we will add a synchronous method for adding elements, which will return
false if the queue is full, and an asynchronous method that will return the task. In the case of a full queue, the client of our class will have the opportunity to decide what to do: handle the “overflow” by calling the synchronous version of adding items or “waiting” (await) for free space in the queue using the asynchronous version.
Then you will need to decide when to call the callback method. As a result, you can come to the following logic: if the queue is not empty, then the first element is taken, the callback method is called, processing is expected to complete, after which the element is removed from the queue. (The actual implementation will be significantly more difficult than it seems, simply because it must take into account all sorts of races). The queue may decide to remove the item before calling the callback method, but, as we will see shortly, this will not affect the ability to receive deadlock.
We have come to a simple and elegant design, but it can easily lead to a problem. Suppose the queue is full and a callback call is in progress to process one of the items. But what if, instead of quickly “returning” the management queue, the handler tries to add another element by calling
await SendAsync :
The queue is full and cannot accept new items, because the callback method is not yet complete. But this method is also stuck waiting for
await SendAsync to complete and can not move on until the place
becomes free in the queue. Classic deadlock!
Ok, we get deadlock, because
ActionBlock removes an item from the queue * after * completing the callback method. But let's consider an alternative scenario: what happens if the
ActionBlock deletes the element * before * calling the callback method? In fact, nothing will change. Deadlock will still be possible.
Imagine that the queue size is one, and the degree of parallelism is two.
- The T1 thread adds an item to the queue. ActionBlock takes an item from the queue (reducing the number of items in the queue to 0) and calls the callback method.
- The T2 thread adds an item to the queue. ActionBlock takes an item from the queue (reducing the number of items in the queue to 0) and calls the callback method.
- The T1 thread adds an item to the queue. ActionBlock cannot call the handler of a new element, since the level of parallelism is 2, and we already have two handlers. The queue is full.
- The first handler during processing tries to add a new item in the queue, but sticks on the call ' await SendAsync ', because the queue is full.
- The second handler tries to add a new item in the queue during processing, but it sticks on the ' await SendAsync ' call because the queue is full.

It turns out that removing an item from the queue before processing does not help. Moreover, this will only exacerbate the problem, since the likelihood of the de-block is significantly reduced (it is necessary that with a degree of concurrency equal to N, all N callback methods try to add new items to the queue at the same time).
Another disadvantage is less obvious.
ActionBlock is not a general-purpose solution. This class implements the
ITargetSource interface and can be used to process elements in complex dataflow scenarios. For example, we can have a
BufferBlock with several “target” blocks for parallel processing of elements. In the current implementation, balancing handlers is implemented in a trivial way. As soon as the receiver (in our case
ActionBlock ) is full, it stops accepting new elements as input. This allows other blocks in the chain to process the element instead.
If an element is deleted only after it has been processed, the ActionBlock will become more greedy and will accept more elements than it can handle at the moment. In this case, the size (bounded capacity) of each block will be equal to 'BoundedCapaciy' + 'MaxDegreeOfParallelism'.
How to solve the problem with deadlock?
I'm afraid that way. If at the same time you need to limit the number of elements in the queue and the callback method can add new elements, then
ActionBlock will have to be abandoned. An alternative would be a solution based on the
BlockingCollection and “manual” control of the number of workflows, for example, using a task pool or Parallel.Invoke.
Degree of Parallelism
Unlike primitives from TPL, all blocks from TPL Dataflow, by default, are single-threaded. Those.
ActionBlock ,
TransformerBlock, and others call the callback method one at a time. The authors of TPL Dataflow decided that simplicity is more important than possible performance gains. Thinking about dataflow graphs is generally quite difficult, and parallel data processing by all blocks will make this process even harder.
To change the degree of parallelism, the block needs to pass
ExecutionDataflowBlockOptions and set the
MaxDegreeOfParallelism property
to a value greater than 1. By the way, if this property is set to -1, then all incoming elements will be processed by the new task and parallelism will be limited only by the capabilities of the used task scheduler (
TaskScheduler object), which can also be passed via
ExecutionDataflowBlockOptions .
Conclusion
Designing easy-to-use components is challenging. Designing easy-to-use components that solve concurrency issues is doubly more complicated. To use these components correctly, you need to know how they are implemented and what restrictions their developers have in mind.
The class
ActionBlock <T> is a great thing that greatly simplifies the implementation of the supplier-consumer pattern. But even in this case, you should be aware of some aspects of TPL Dataflow, like the degree of parallelism and the behavior of blocks in the event of overflow.
- (*) This example is not thread-safe and a full implementation should not use
actionBlock.InputCount . Do you see a problem?
(**) The
Post method returns
false in one of two cases: the queue is full or already completed (the
Complete method is called). This aspect may make it difficult to use this method, since it is impossible to distinguish between these two cases. The
SendAsync method, on the other hand, behaves somewhat differently: the method returns a
Task <bool> object, which will be in an incomplete state while the queue is full, and if the queue is already completed and not able to accept new items, the
task.Result will be
false .