Suggested Solutions in .NET 4
This is the second part of an article on parallelizing “ideal” loops. The first part covered the problems that arise and the general approaches to solving them. This part looks at the specific library components that .NET 4.0 provides for the task.
The following options are available for parallelizing “ideal” loops:
- the System.Threading.Tasks.Parallel class with its For() and ForEach() methods;
- Parallel LINQ with the AsParallel() extension method.
Parallel.For and Parallel.ForEach methods
We start with the Parallel class and its looping methods.
The first method has the following signature (this is the basic one of many overloads):
    public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);
In many respects it resembles the signatures of our experimental methods from the first part of the article, except for the return type.
The second method, in its basic form, looks like this:
    public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body);
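As a minimal sketch of how these two basic overloads might be called (the names BasicLoops, Squares and TotalLength are invented for this illustration and do not come from the article):

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BasicLoops
{
    // Parallel.For: every iteration writes only to its own array slot,
    // so the iterations are independent and need no synchronization.
    public static int[] Squares(int length)
    {
        var result = new int[length];
        Parallel.For(0, length, i => { result[i] = i * i; });
        return result;
    }

    // Parallel.ForEach: the shared counter is updated atomically,
    // because several iterations may run at the same time.
    public static int TotalLength(IEnumerable<string> words)
    {
        int total = 0;
        Parallel.ForEach(words, w => { Interlocked.Add(ref total, w.Length); });
        return total;
    }
}
```

Both calls block until all iterations have finished, so the results can be read immediately afterwards.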
Let's consider what the Parallel.For and Parallel.ForEach methods can do.
Exception handling. When an exception occurs in one of the iterations, no new iterations are started. Iterations that are already running are allowed to finish, and once they have completed, all exceptions (the initial one and any later ones thrown by the iterations that were finishing) are aggregated into a single AggregateException, which is then thrown.
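The behaviour described above can be observed with a small sketch like the following. The class LoopExceptions and the rule "odd iterations fail" are assumptions made up for the example.

```csharp
using System;
using System.Threading.Tasks;

public static class LoopExceptions
{
    // Runs a loop in which every odd iteration throws, and returns the number
    // of exceptions that were collected into the AggregateException.
    public static int CountFailures(int iterations)
    {
        try
        {
            Parallel.For(0, iterations, i =>
            {
                if (i % 2 == 1)
                    throw new InvalidOperationException("iteration " + i + " failed");
            });
            return 0; // no iteration failed
        }
        catch (AggregateException ex)
        {
            // Each exception thrown by an iteration is available here.
            foreach (Exception inner in ex.InnerExceptions)
                Console.WriteLine(inner.Message);
            return ex.InnerExceptions.Count;
        }
    }
}
```

How many inner exceptions are reported depends on how many iterations were already running when the first one failed, so the count varies from run to run.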
Early loop termination. The corresponding overloads of For and ForEach let you interrupt execution by calling the Stop or Break method on the context object (a ParallelLoopState passed to each iteration).
The difference between Stop and Break is that Stop signals that no new iterations should be started at all, whereas Break signals that no iterations after the current one (in order) should be started. That is, when Break is called in the 5th iteration, it is guaranteed, despite the parallelism, that iterations 1 through 4 will still be executed. When Stop is called in the 5th iteration and iteration 4 has not started yet, it will never start.
Iterations that are already running when Stop or Break is called can check the loop's interruption status and finish early if they find that the whole loop is being interrupted. To do this, they check the corresponding properties of the context object: IsStopped and LowestBreakIteration.
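A sketch of both calls, under the assumption that we are searching an array (the class EarlyExit and its methods are hypothetical helpers, not part of the library):

```csharp
using System.Threading.Tasks;

public static class EarlyExit
{
    // Break: all iterations with a lower index are still guaranteed to run,
    // so the lowest break iteration is the index of the first negative element.
    public static long? FirstNegativeIndex(int[] data)
    {
        ParallelLoopResult result = Parallel.For(0, data.Length, (i, state) =>
        {
            if (data[i] < 0)
                state.Break();
        });
        return result.LowestBreakIteration; // null if Break was never called
    }

    // Stop: order does not matter here, any match is enough.
    public static bool ContainsZero(int[] data)
    {
        ParallelLoopResult result = Parallel.For(0, data.Length, (i, state) =>
        {
            if (state.IsStopped) return;   // another iteration already found a match
            if (data[i] == 0) state.Stop();
        });
        return !result.IsCompleted;        // false means the loop was stopped early
    }
}
```

Break suits "find the first" searches, while Stop suits "find any" searches where the remaining iterations are simply wasted work.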
Thread-local data storage. Some overloads let you keep intermediate results in storage that is local to each thread. When aggregating a result, for example, this makes it possible to first aggregate the results within each thread and only then combine them across all the threads that took part. This eliminates unnecessary synchronization during the calculation.
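For illustration, here is one way the overload with local state could be used to sum squares. This is a sketch; LocalState and SumOfSquares are names chosen for the example.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class LocalState
{
    public static long SumOfSquares(int count)
    {
        long total = 0;
        Parallel.For(0, count,
            // localInit: creates the starting subtotal for each worker
            () => 0L,
            // body: updates only the worker's own subtotal, no synchronization
            (i, state, subtotal) => subtotal + (long)i * i,
            // localFinally: merges a worker's subtotal into the shared total,
            // the only place where synchronization is required
            subtotal => { Interlocked.Add(ref total, subtotal); });
        return total;
    }
}
```

The shared variable is touched once per worker instead of once per iteration, which is where the saving comes from.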
Configurable degree of parallelism. You can specify the maximum number of threads to use for execution (through the MaxDegreeOfParallelism property of ParallelOptions).
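A possible way to see the limit in action is sketched below. The helper PeakConcurrency and the 5 ms sleep that simulates work are assumptions made for the example.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class LimitedParallelism
{
    // Runs the loop with a capped degree of parallelism and returns the
    // highest number of iterations that were observed running at once.
    public static int PeakConcurrency(int iterations, int maxThreads)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };
        int running = 0, peak = 0;
        object gate = new object();

        Parallel.For(0, iterations, options, i =>
        {
            int now = Interlocked.Increment(ref running);
            lock (gate) { if (now > peak) peak = now; }
            Thread.Sleep(5);                      // simulated work
            Interlocked.Decrement(ref running);
        });
        return peak;
    }
}
```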
Support for nested calls. Because a thread pool is used, neither nested calls to For and ForEach nor their concurrent execution lead to an excessive number of threads.
Dynamic thread count. Parallel.For was designed with time-varying load in mind, and with the expectation that some work items may require more computation than others. The number of threads used can therefore change while the method is running.
Sophisticated load balancing. The methods implement load-balancing logic that takes a large number of factors into account. In addition, the chunk size (the “portion of data” handed out for processing) grows as the work proceeds. This gives better load balancing when there are few iterations (which are assumed to be “heavier” in that case) and lower synchronization costs for handing out work when there are many.
Support for cancelling a loop from the outside. This is done with the CancellationTokenSource class. When starting the loop, you pass it the Token property (through ParallelOptions.CancellationToken); to request cancellation from the outside, you simply call Cancel() on the CancellationTokenSource object. This prevents new iterations from starting, and once all running iterations have completed, the loop throws an OperationCanceledException. Running iterations can, incidentally, check the cancellation status in order to finish early “voluntarily” if they find that the whole loop has been cancelled.
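The cancellation flow might look roughly like this. It is a sketch: the "external" request is simulated by a helper task, and the names LoopCancellation and RunUntilCancelled are invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LoopCancellation
{
    // Returns true if the loop was cancelled before all iterations had run.
    public static bool RunUntilCancelled(int iterations, int cancelAfterMs)
    {
        var cts = new CancellationTokenSource();
        var options = new ParallelOptions { CancellationToken = cts.Token };

        // Simulates a request from outside the loop: a separate task
        // calls Cancel() after a short delay.
        Task canceller = Task.Factory.StartNew(() =>
        {
            Thread.Sleep(cancelAfterMs);
            cts.Cancel();
        }, TaskCreationOptions.LongRunning);

        try
        {
            Parallel.For(0, iterations, options, i =>
            {
                // A running iteration may poll the token and finish early.
                if (cts.Token.IsCancellationRequested) return;
                Thread.Sleep(10);                 // simulated work
            });
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
        finally
        {
            canceller.Wait();
            cts.Dispose();
        }
    }
}
```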
Parallel LINQ
Previously available for .NET 3.5 as an extension, Parallel LINQ ships with .NET 4.0 out of the box, in System.Core. The idea is very simple: we add an .AsParallel() call to a chain of LINQ calls, and all subsequent calls in the chain are executed in parallel. For example:
    var doubled = new[] { 1, 2, 3, 4 }.AsParallel().Select(i => i * 2);
What may not be obvious is that the order of the elements in the resulting collection can be arbitrary, because they are processed in parallel. To preserve the order, we can modify the fragment like this:
    var doubled = new[] { 1, 2, 3, 4 }.AsParallel().AsOrdered().Select(i => i * 2);
Processing still takes place in parallel, but the aggregation stage (assembling the results from each thread) can take longer: it has to wait for the threads that compute the “first” elements, take their results, and only then take the results from the threads that compute the “last” elements.
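The two variants can be compared side by side with a sketch like this one (PlinqOrdering and its method names are made up for the example):

```csharp
using System.Linq;

public static class PlinqOrdering
{
    // With AsOrdered() the results come back in the order of the source.
    public static int[] DoubledOrdered(int[] input)
    {
        return input.AsParallel().AsOrdered().Select(i => i * 2).ToArray();
    }

    // Without AsOrdered() the same values are produced,
    // but their order in the result is not guaranteed.
    public static int[] DoubledAnyOrder(int[] input)
    {
        return input.AsParallel().Select(i => i * 2).ToArray();
    }
}
```

If the order of results does not matter to the caller, the unordered form avoids the extra waiting during aggregation.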
When working with PLINQ, it is important to keep in mind that there is additional overhead for
- distributing the elements of the source enumeration among the threads;
- aggregating the computed elements into a common collection.
The distribution of work among the threads cannot be avoided, but you can try to postpone the aggregation, or even do without it altogether if the goal is not to obtain a new collection but simply to process the elements of an existing one.
The AsParallel() method returns not a plain IEnumerable<T> but a ParallelQuery<T>, and the PLINQ versions of the other LINQ methods (Select, Where, and so on), which are defined in the ParallelEnumerable class, return the same type. As long as a ParallelQuery<T> is passed along the chain, the results computed by each thread do not have to be aggregated, and a pipeline is built.
It is important to understand that when the pipeline is broken, parallelism ends and the data is aggregated into a single resulting collection. For example, this code does not work the way one might mistakenly assume:
    List<InputData> inputData = ...;
    foreach (var o in inputData.AsParallel().Select(i => new OutputData(i)))
    {
        ProcessOutput(o);
    }
Only the creation of the OutputData objects is performed in parallel here. Afterwards the objects are collected from all the threads into a resulting collection, and ProcessOutput() is then called sequentially for each element.
To avoid the unnecessary aggregation, you can use the ParallelEnumerable.ForAll() method:
    List<InputData> inputData = ...;
    inputData.AsParallel().Select(i => new OutputData(i)).ForAll(o =>
    {
        ProcessOutput(o);
    });
In this case, after the “new OutputData(i)” stage, the “ProcessOutput(o)” stage is also executed in parallel, with no aggregation stage between them.
It should be noted that calling Parallel.ForEach() on “inputData.AsParallel().Select(i => new OutputData(i))” has the same drawback as the first example with the ordinary foreach: Parallel.ForEach() receives the sequence as an IEnumerable<T>, not as a ParallelQuery<T>, so it is aggregated before being passed in. This is exactly the situation the ParallelEnumerable.ForAll() method is meant for.
Typical problems and errors
Let's consider the problems you may run into when working with these components.
Thread safety of your own code
First of all, you need to understand that using Parallel.ForEach will not by itself make your code thread-safe. You must make sure that the iterations are independent of each other or, if they are not, explicitly provide thread-safe access to the shared resources.
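As a sketch of what "explicitly thread-safe access" can mean in the simplest case, the following example protects a shared List<int>, which is not thread-safe, with a lock. SharedState and EvenNumbers are names invented for the illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SharedState
{
    // Collects the even numbers below 'count' into a shared list.
    public static List<int> EvenNumbers(int count)
    {
        var evens = new List<int>();
        var gate = new object();

        Parallel.For(0, count, i =>
        {
            if (i % 2 == 0)
            {
                // Without this lock, concurrent Add calls could corrupt the list.
                lock (gate) { evens.Add(i); }
            }
        });

        evens.Sort(); // iterations finish in arbitrary order, so sort for a stable result
        return evens;
    }
}
```

A lock taken on every iteration is itself a cost, so for hot loops it may be preferable to restructure the work so that iterations do not share state at all.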
Also, Parallel.For does not directly support descending loops or loops with a non-standard counter increment (other than one). If your original algorithms are written this way, analyze them carefully: non-standard loops are often written precisely because there are dependencies between iterations (for example, referring to the previous array element computed in the previous step).
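If the analysis shows that the iterations really are independent, one possible workaround is to run Parallel.For over an iteration number and compute the original counter from it. The helper below is a hypothetical sketch, not a library method, and it ignores integer overflow for very large ranges.

```csharp
using System;
using System.Threading.Tasks;

public static class SteppedLoops
{
    // Parallel counterpart of:
    //   for (int i = start; i < end; i += step) body(i);
    // valid only when the iterations do not depend on each other.
    public static void ForWithStep(int start, int end, int step, Action<int> body)
    {
        if (step <= 0) throw new ArgumentOutOfRangeException("step");
        if (end <= start) return;

        int count = (end - start + step - 1) / step;   // number of iterations
        Parallel.For(0, count, k => body(start + k * step));
    }
}
```

A descending loop can be handled the same way by mapping iteration number k to the index "last - k", although for independent iterations the direction no longer matters, since the execution order is arbitrary anyway.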
Loop body size
Using the Parallel class implies overhead for at least
- calling a delegate to execute the loop body, and
- synchronization between threads while work is being handed out.
If the loop body runs long enough, these additional costs play a minor role. However, if we parallelize something very simple, such as “i = i * i”, the overhead exceeds the useful work. To get rid of this drawback, the loop body has to be “enlarged”, which is easy to do by having it cover many iterations instead of one.
You can do this manually by explicitly splitting the input sequence into a set of blocks and running a parallel loop over that set, with each block processed sequentially inside the body of the parallel loop. However, you then have to decide how many blocks to use. You can leave that to the library and use the class created specifically for partitioning an input sequence: System.Collections.Concurrent.Partitioner.
With it, the loop
    for (int i = 0; i < length; i++)
        result[i] = i * i;
which would naively be parallelized as
    Parallel.For(0, length, i =>
    {
        result[i] = i * i;
    });
can be parallelized efficiently like this:
    Parallel.ForEach(Partitioner.Create(0, length), range =>
    {
        for (int i = range.Item1; i < range.Item2; i++)
        {
            result[i] = i * i;
        }
    });
Partitioner.Create(0, length) creates that set of blocks (as Tuple<int, int> ranges). We iterate over the blocks in parallel and process each block sequentially inside the loop body. This gives the parallel loop a sufficiently long body and spreads the overhead over more useful work.
Nested loop processing
When processing nested loops, the question is how deep the parallelization should go. Take the processing of a rectangular image as an example:
    for (int y = 0; y < screenHeight; y++)
    {
        int stride = y * screenWidth;
        for (int x = 0; x < screenWidth; x++)
        {
            rgb[x + stride] = calcColor(x, y); // calculate color
        }
    }
You can replace only the outer loop with a call to Parallel.For, or both loops. If both loops are made parallel, won't the body be too small? If the inner loop is left sequential, will all the cores actually be kept busy?
Only performance testing can answer these questions. If the color calculation is not too complicated, it is probably worth leaving the inner loop sequential. But if the image is wide and low, parallelizing the outer loop alone may well fail to load all the processor cores.
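The "outer loop only" variant could be sketched as follows. CalcColor here is a made-up stand-in for the article's calcColor, and ImageFill and Render are names chosen for the example.

```csharp
using System.Threading.Tasks;

public static class ImageFill
{
    // Hypothetical placeholder for the real color calculation.
    static int CalcColor(int x, int y)
    {
        return (x * 31 + y * 17) & 0xFFFFFF;
    }

    public static int[] Render(int screenWidth, int screenHeight)
    {
        var rgb = new int[screenWidth * screenHeight];

        // Rows are processed in parallel; each row writes to its own
        // slice of the array, so the iterations are independent.
        Parallel.For(0, screenHeight, y =>
        {
            int stride = y * screenWidth;
            for (int x = 0; x < screenWidth; x++)   // inner loop stays sequential
            {
                rgb[x + stride] = CalcColor(x, y);
            }
        });
        return rgb;
    }
}
```

Here each parallel iteration covers a whole row, so the loop body is reasonably large as long as the image is not very narrow.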
An alternative is to flatten the two nested loops into one and parallelize that:
    int totalPixels = screenHeight * screenWidth;
    Parallel.For(0, totalPixels, i =>
    {
        int y = i / screenWidth, x = i % screenWidth;
        rgb[i] = calcColor(x, y);
    });
If calcColor() is too simple, you can enlarge the loop body with the Partitioner class, as in the previous example:
    int totalPixels = screenHeight * screenWidth;
    Parallel.ForEach(Partitioner.Create(0, totalPixels), range =>
    {
        for (int i = range.Item1; i < range.Item2; i++)
        {
            int y = i / screenWidth, x = i % screenWidth;
            rgb[i] = calcColor(x, y);
        }
    });
Non-thread-safe IList implementations
Both mechanisms, Parallel.ForEach and PLINQ, accept an IEnumerable as input but try to find the fastest interface available on the collection they are given. In particular, the IList interface is better suited to parallelization than IEnumerable, because its indexer provides random access to any element. Therefore, if the collection passed in implements IList, it is accessed through that interface. This reduces synchronization costs, but the library code then relies on the indexer being thread-safe.
If the collection does not provide a thread-safe indexer, which can easily be the case when the elements are stored in a form that makes indexing complex or are loaded lazily, you need to tell the system explicitly not to use IList and to restrict itself to IEnumerable.
There are two ways to do this. The first is to create a System.Collections.Concurrent.Partitioner for the collection:
    IEnumerable<T> source = ...;
    // IList<T> may be used here if the collection supports it
    Parallel.ForEach(source, item => { /*...*/ });
    // And here IEnumerable<T> is guaranteed to be used
    Parallel.ForEach(Partitioner.Create(source), item => { /*...*/ });
The second way is obvious and familiar from ordinary LINQ: just call “.Select(item => item)” on the collection. It works for both Parallel.ForEach and PLINQ:
    // IEnumerable<T> will also be used here
    source.Select(i => i).AsParallel().Select(i => { /*...*/ });
Thread affinity of the source collection
When Parallel.ForEach and PLINQ are running, each worker thread itself calls MoveNext() on the source collection. If the collection can only be accessed from one specific thread (that is, it has thread affinity), as happens, for example, with UI components in Windows Forms or WPF, you cannot use these mechanisms on it directly.
To process such a collection in parallel, you need to use the Producer-Consumer pattern. Its main building block in .NET 4.0 is the BlockingCollection class, which I will write about separately later. You can read more about it in English in the “Producer-Consumer” section on page 53 of the original document.
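Until then, here is a rough sketch of the idea, not the approach from the book: the calling thread is the only one that enumerates the source, and it hands items to worker tasks through a BlockingCollection. The names ProducerConsumerSketch and Process, the capacity of 100 and the use of long-running tasks are all assumptions made for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Enumerates 'source' on the calling thread only and runs 'body'
    // for each item on one of 'consumerCount' worker tasks.
    public static void Process<T>(IEnumerable<T> source, Action<T> body, int consumerCount)
    {
        using (var queue = new BlockingCollection<T>(boundedCapacity: 100))
        {
            var consumers = new Task[consumerCount];
            for (int c = 0; c < consumerCount; c++)
            {
                consumers[c] = Task.Factory.StartNew(() =>
                {
                    // Blocks while the queue is empty and ends
                    // once CompleteAdding() has been called and the queue is drained.
                    foreach (T item in queue.GetConsumingEnumerable())
                        body(item);
                }, TaskCreationOptions.LongRunning);
            }

            try
            {
                // Producer: the only thread that touches the source collection.
                foreach (T item in source)
                    queue.Add(item);
            }
            finally
            {
                queue.CompleteAdding();
                Task.WaitAll(consumers);
            }
        }
    }
}
```

The sketch does not deal with consumers that throw: if every consumer fails while the queue is full, the producer would block, so production code needs additional error handling.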
P.S. This article was written under the influence of the book "PATTERNS OF PARALLEL PROGRAMMING: UNDERSTANDING AND APPLYING PARALLEL PATTERNS WITH THE .NET FRAMEWORK 4 AND VISUAL C#" and can be considered a loose, reworked translation of it.
Conclusion
I hope this overview helps you make fewer mistakes of your own when parallelizing your code. In any case, happy debugging, and happy holidays :)
For the future
I can continue the topic of parallelism in .NET Framework 4 if there is interest. Possible topics:
- the Task class and the infrastructure for parallel computations with dependencies;
- the new thread-safe collections;
- laziness and concurrency;
- shared data and synchronization.
Let me know which of these interest you :)