Introduction
Usually, when optimizing a program for multi-core computers, the first step is to ascertain the possibility of dividing the algorithm into parts running in parallel. If, to solve the problem, it is necessary to parallelly process individual elements from a large data set, then the first candidates will be new parallelism possibilities in the .NET Framework 4:
Parallel.ForEach and Parallel LINQ (
PLINQ )
Parallel.ForEach
The Parallel class contains the
ForEach method, which is a multi-threaded version of the usual foreach loop in C #. Like regular foreach, Parallel.ForEach iterates over enumerable data, but using multiple threads. One of the more frequently used
Parallel.ForEach overloads is as follows:
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource> body)
Ienumerable indicates the sequence in which to iterate, and the Action body specifies the delegate that is called for each element. A full list of Parallel.ForEach overloads can be found
here .
PLINQ
Related to Parallel.ForEach
PLINQ is a programming model for parallel data operations. The user defines an operation from a standard set of operators, including projections, filters, aggregation, etc. Like Parallel.ForEach
PLINQ achieves parallelism by breaking the input sequence into parts and processing elements in different streams.
The article highlights the differences between these two approaches to parallelism. Understand the use cases in which Parallel.ForEach is best used instead of PLINQ and vice versa.
')
Perform independent operations
If you need to perform lengthy calculations on the elements of the sequence and the results are independent, then it is preferable to use Parallel.ForEach. PLinq in turn will be too heavy for such operations. In addition, the maximum number of streams is specified for
Parallel.ForEach , that is, if
ThreadPool has few resources and there are fewer streams available than specified in
ParallelOptions.MaxDegreeOfParallelism , the optimal number of streams will be used, which can be increased as it runs. For
PLINQ, the number of threads executed is strictly specified.
Parallel operations with data ordering
PLINQ to keep order
If your transformations require you to preserve the order of the input data, then you will most likely find that it is simpler to use
PLINQ than
Parallel.ForEach . For example, if we want to convert color RGB-frames of a video into black and white, on the output, the frame order, naturally, should be preserved. In this case, it is better to use
PLINQ and the
AsOrdered () function, which plinq splits the input sequence, performs transformations, and then puts the result in the correct order.
public static void GrayscaleTransformation(IEnumerable<Frame> Movie) { var ProcessedMovie = Movie .AsParallel() .AsOrdered() .Select(frame => ConvertToGrayscale(frame)); foreach (var grayscaleFrame in ProcessedMovie) {
Why not use Parallel.ForEach here?
With the exception of trivial cases, the implementation of parallel operations on serial data using Parallel.ForEach requires a significant amount of code. In our case, we can use the overload of the Foreach function to repeat the effect of the AsOrdered () operator:
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState,Int64>body)
In the overloaded version of Foreach, the index parameter of the current element was added to the delegate of action on data. Now we can write the result to the output collection using the same index, perform costly computations in parallel, and finally get the output sequence in the correct order. The following example illustrates one way to preserve order using Parallel.ForEach :
public static double [] PairwiseMultiply( double[] v1, double[] v2) { var length = Math.Min(v1.Length, v2.Lenth); double[] result = new double[length]; Parallel.ForEach(v1, (element, loopstate, elementIndex) => result[elementIndex] = element * v2[elementIndex]); return result; }
However, the shortcomings of this approach are immediately detected. If the input sequence is an IEnumerable type, not an array, then there are 4 ways to implement order preservation:
- The first option is to call IEnumerable.Count (), which will cost O (n). If the number of elements is known, you can create an output array to store the results at a given index
- The second option is to materialize the collection, (turning it, for example, into an array). If there is a lot of data, then this method is not very suitable.
- The third option is to think carefully about the output collection. The output collection can be a hash, then the amount of memory needed for storing the output value will be at least 2 times the input memory in order to avoid collisions during hashing; if there is a lot of data, the data structure for the hash will be prohibitively large, besides, you can get a performance drop due to false sharing and the garbage collector.
- And the last option is to save the results with their original indexes, and then apply your own algorithm for sorting the output collection.
In PLINQ, the user simply requests order conservation, and the query engine manages all the routine details to ensure the correct order of results. The PLINQ infrastructure allows AsOrdered () to handle streaming data, in other words, PLINQ supports lazy materialization. In PLINQ, the materialization of the entire sequence is the worst solution, you can easily avoid the above problems and perform parallel operations on data simply by using the AsOrdered () operator.
Parallel streaming processing
Using PLINQ for stream processing
PLINQ offers the ability to process a query as a query over a stream. This feature is extremely valuable for the following reasons:
- 1. The results do not materialize in the array, so there is no redundancy in storing data in memory.
- 2. You can get (enumerate) the results in a single stream of calculations as you receive new data.
Continuing with an example of analyzing securities, let us imagine that you want to calculate the risk of each paper from a portfolio of papers, issuing only papers that meet the criterion of risk analysis, and then perform some calculations on the filtered results. In PLINQ, the code will look something like this:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { var StockRiskPortfolio = Stocks .AsParallel() .AsOrdered() .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)}) .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk)); foreach (var stockRisk in StockRiskPortfolio) { SomeStockComputation(stockRisk.Risk);
In this example, the elements are divided into parts ( partitions ), processed by several threads, then reordered; This is important for understanding that these steps are performed in parallel, as filtering results appear, a single-threaded consumer in a foreach loop can perform calculations. PLINQ is optimized for performance, not latency, and uses buffers internally; it may happen that although a partial result has already been obtained, it will be in the output buffer as long as the output buffer is completely saturated and does not allow further processing. The situation can be corrected using the PLINQ WithMergeOptions extension method, which allows you to specify output buffering. The WithMergeOptions method accepts the ParallelMergeOptions enumeration as a parameter; you can specify how the query returns the final result that will be used by a single stream. The following options are offered:
- ParallelMergeOptions.NotBuffered - indicates that each processed element is returned from each stream as soon as it is processed
- ParallelMergeOptions.AutoBuffered - indicates that the elements are going to the buffer, the buffer is periodically returned to the thread-consumer
- ParallelMergeOptions.FullyBuffered - indicates that the output sequence is completely buffered, it allows you to get results faster than using other options, but then the consumer will have to wait for a long time to receive the first element for processing.
An example of using WithMergeOptions is available on MSDN.
Why not Parallel.ForEach?
Let's throw aside the shortcomings of Parallel.ForEach to preserve the order of the sequence. For unordered calculations over a stream using Parallel.ForEach, the code will look like this:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { Parallel.ForEach(Stocks, stock => { var risk = ComputeRisk(stock); if(ExpensiveRiskAnalysis(risk) {
This code is almost identical to the PLINQ example, with the exception of explicit blocking and less elegant code. Note that in this situation, Parallel.ForeEach implies saving the results in a stream-safe style, while PLINQ does it for you.
To save the results, we have 3 ways: the first is to save the values in the thread-unsafe collection and require blocking with each record. The second is to save to a thread-safe collection, since the .NET Framework 4 provides a set of such collections in the System.Collections.Concurrent namespace and you don’t have to implement it yourself. The third way is to use Parallel.ForEach with a thread-local storage, which will be discussed later. Each of these methods requires the explicit management of third-party write effects to the collection, while PLINQ allows us to abstract away from these operations.
Operations on two collections
Using PLINQ for operations on two collections
The PLINQ ZIP operator performs parallel computations on two different collections in a special way. Since it can be combined with other requests, you can simultaneously perform complex operations on each collection before combining the two collections. For example:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { return a .AsParallel() .AsOrdered() .Select(element => ExpensiveComputation(element)) .Zip( b .AsParallel() .AsOrdered() .Select(element => DifferentExpensiveComputation(element)), (a_element, b_element) => Combine(a_element,b_element)); }
The example above demonstrates how each data source is processed in parallel by different operations, then the results from both sources are combined by the Zip operator.
Why not Parallel.ForEach?
A similar operation can be performed with a Parallel.ForEach overload using indexes, for example:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { var numElements = Math.Min(a.Count(), b.Count()); var result = new T[numElements]; Parallel.ForEach(a, (element, loopstate, index) => { var a_element = ExpensiveComputation(element); var b_element = DifferentExpensiveComputation(b.ElementAt(index)); result[index] = Combine(a_element, b_element); }); return result; }
However, there are potential pitfalls and flaws, described in the application of Parallel.ForEach while preserving the data order, one of the drawbacks includes viewing the entire collection to the end and explicitly managing the indexes.
Local Thread State ( Thread-Local State )
Using Parallel.ForEach to access local stream state
Although PLINQ provides more concise means for parallel operations on data, some processing scenarios are better suited for using Parallel.ForEach , for example, operations that support the local stream state. The signature of the corresponding Parallel.ForEach method looks like this:
public static ParallelLoopResult ForEach<TSource,TLocal>( IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
It should be noted that there is an overload of the Aggregate operator, which allows access to the local stream state and can be used if the data processing template can be described as a reduction in dimension. The following example illustrates how to exclude numbers that are not simple from the sequence:
public static List<R> Filtering<T,R>(IEnumerable<T> source) { var results = new List<R>(); using (SemaphoreSlim sem = new SemaphoreSlim(1)) { Parallel.ForEach(source, () => new List<R>(), (element, loopstate, localStorage) => { bool filter = filterFunction(element); if (filter) localStorage.Add(element); return localStorage; }, (finalStorage) => { lock(myLock) { results.AddRange(finalStorage) }; }); } return results; }
This functionality could be achieved much more easily with PLINQ, the purpose of the example is to show that using Parallel.ForEach and the local state of a stream can greatly reduce the cost of synchronization. However, in other scenarios, local stream states become absolutely necessary, the following example demonstrates this scenario.
Imagine that you, as a brilliant computer scientist and mathematician, have developed a statistical model for analyzing the risks of securities; This model, in your opinion, will break all other risk models to pieces. In order to prove this, you need data from sites with information on stock markets. But the data loading sequence will be very long and is a bottleneck for an eight-core computer. Although using Parallel.ForEach is an easy way to parallelly load data using a WebClient , each stream will be blocked at each download, which can be improved using asynchronous I / O; more information is available here . For performance reasons, you decided to use Parallel.ForEach to iterate through the collection of URLs and upload data in parallel. The code looks like this:
public static void UnsafeDownloadUrls () { WebClient webclient = new WebClient(); Parallel.ForEach(urls, (url,loopstate,index) => { webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
Surprisingly, we will get an exception at runtime: “System.NotSupportedException -> WebClient does not support concurrent I / O operations.” Realizing that multiple threads cannot access one WebClient at the same time, you decide to create a WebClient for each download.
public static void BAD_DownloadUrls () { Parallel.ForEach(urls, (url,loopstate,index) => { WebClient webclient = new WebClient(); webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
This code allows the program to create more than a hundred web clients, the program will throw an exception about the timeout in WebClient. You will understand that the computer is not running a server operating system, so the maximum number of connections is limited. Then you can guess that using Parallel.ForEach with the local state of the stream will solve the problem:
public static void downloadUrlsSafe() { Parallel.ForEach(urls, () => new WebClient(), (url, loopstate, index, webclient) => { webclient.DownloadFile(url, filenames[index]+".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); return webclient; }, (webclient) => { }); } }
In this implementation, each data access operation is independent of the other. At the same time, the access point is neither independent nor thread-safe. Using the local storage of the stream allows us to be sure that the number of created WebClient instances is as long as required, and each instance of the WebClient belongs to the stream that created it.
What is bad PLINQ here?
If we implement the previous example using ThreadLocal and PLINQ objects, the code will be as follows:
public static void downloadUrl() { var webclient = new ThreadLocal<WebClient>(()=> new WebClient ()); var res = urls .AsParallel() .ForAll( url => { webclient.Value.DownloadFile(url, host[url] +".dat")); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
While the implementation achieves the same goals, it is important to understand that in any scenario, using ThreadLocal <> is significantly more expensive than the corresponding Parallel.ForEach overload. Note that in this scenario, the cost of instantiating ThreadLocal <> is insignificant compared to the time it takes to download a file from the Internet.
Exit operations
Using Parallel.ForEach to quit operations
In a situation where control over the execution of operations is essential, it is important to understand that exiting the Parallel.ForEach cycle allows achieving the same effect as checking the condition for the need to continue calculations inside the loop body. One of the Parallel.ForEach overloads that allow you to monitor ParallelLoopState looks like this:
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
ParallelLoopState provides support for interrupting loop execution with two different methods, described below.
ParallelLoopState.Stop ()
Stop () informs the loop about the need to stop executing iterations; The ParallelLoopState.IsStopped property allows each iteration to determine if any other iteration caused the Stop () method. The Stop () method is usually useful if the loop performs an unordered search and should be exited as soon as the search element is found. For example, if we want to find out if an object is in the collection, the code would be something like this:
public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var matchFound = false; Parallel.ForEach(TSpace, (curValue, loopstate) => { if (curValue.Equals(match) ) { matchFound = true; loopstate.Stop(); } }); return matchFound; }
Functionality can also be achieved using PLINQ, this example demonstrates how to use ParallelLoopState.Stop () to control the flow of execution.
ParallelLoopState.Break ()
Break () informs the loop that the elements preceding the current element should be processed, but for subsequent elements, the iteration should be stopped. The value of the lower iteration can be obtained from the ParallelLoopState.LowestBreakIteration property. Break () is usually useful if you are searching through ordered data. In other words, there is a certain criterion of the need for data processing. For example, for a sequence containing non-unique elements in which you need to find the subscript of the matching object, the code would look like this:
public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var loopResult = Parallel.ForEach(source, (curValue, loopState, curIndex) => { if (curValue.Equals(match)) { loopState.Break(); } }); var matchedIndex = loopResult.LowestBreakIteration; return matchedIndex.HasValue ? matchedIndex : -1; }
In this example, the loop is executed until an object is found, the Break () signal means that only elements with a smaller index than the object found should be processed; if another matching instance is found, the Break () signal will be received again, this is repeated as long as there are elements, if the object was found, the LowestBreakIteration field points to the first index of the matching object.
Why not PLINQ?
Although PLINQ provides support for quitting the query, the differences in PLINQ and Parallel.ForEach exit mechanisms are significant. PLINQ, (cancellation token), . C Parallel.ForEach . PLINQ , .
Conclusion
Parallel.ForEach PLINQ — . , .
Useful links:
Threading in C#
RSDN: C#.
Microsoft Samples for Parallel Programming with the .NET Framework