
Parallel IEnumerable processing in .NET

This article describes a solution to the problem of processing a single IEnumerable in parallel and in lock step, and explains where the task came from.

As in many other cases, the solution grew out of a very specific need.

Preamble


One of our internal projects needs to build a sprawling report with 100+ slices over a data file that takes more than 12 hours to read. The data volumes themselves are also rather large. The combination of slow reading and huge amounts of data (~1.5M tuples, each weighing up to 50 MB) imposes two constraints on the code: reading the file more than once is extremely undesirable (unfortunately, nobody is ready to spend 50 days on a weekly report), and it is technically impossible to fit the entire sample into RAM. The development that preceded me had obviously been iterative: the method that aggregates the data for the slices was about 4000 lines long. My task was to turn this code into something maintainable.

Prerequisites


My thread of Ariadne in this situation was the realization that “licking” one and the same entity in turn (like a Chupa Chups) is ugly at the very least: even if I managed to refactor the code into a “one method per slice” state, I would end up with a sheet of more than a hundred consecutive calls hung with conditions. I also wanted to get away from this because the current implementation was not only limited by slow reading, but added a further delay on top by processing the data in a single thread. The server that runs the process has 8 cores and can parallelize the processing to the point where it takes less time than reading the next tuple from the database.

Idea


So, after weighing the situation, I formulated a straightforward idea: handle the “stream of objects” in parallel threads, where each slice has its own neat method (or even a lambda) that runs in a separate thread. Since the number of objects is not known in advance, the IEnumerable<T> interface was a perfect fit for representing the “stream of objects”. All that remained was to make sure that each thread has its own IEnumerable<T>, and yet that it is the same IEnumerable<T> (just like in the puzzles about coins and glasses). Naturally, the nickname “clones” immediately stuck to such objects.

It was necessary to ensure that on each step of the enumeration (each MoveNext() call) all threads read one and the same object instance. Such a solution has a peculiarity (rather a drawback, but not a terrible one): all threads wait for their slowest brother, that is, the reading is synchronous. To be honest, I really hoped that someone had already written this for me. Microsoft, for example, did something similar in Parallel LINQ. However, at the time I could not find anything of the kind: everything related to parallel processing of collections dealt with processing parts of one collection in parallel (for example, Parallel.For()), not with the problem at hand. Well, I love reinventing the wheel, so I grabbed a file and got to work!

Core solution


I had always assumed that behind any interface there is an object. So, to clone a collection, you would need to either find or invent an object with the desired interface. It turns out this is not necessary. C# has long and successfully offered its adherents the yield return and yield break statements. They formed the basis of the solution.
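For readers less familiar with iterators, here is a minimal sketch of what yield return gives you: a method whose body runs lazily, one step per MoveNext() call, with the compiler generating the IEnumerable<T> object for you. The names YieldDemo, ReadTuples and Run are hypothetical and exist only for this illustration; they are not part of the original project.

```csharp
using System;
using System.Collections.Generic;

public static class YieldDemo
{
    // Records side effects so that the lazy execution order is observable.
    private static readonly List<string> Log = new List<string>();

    // Hypothetical stand-in for a slow data source.
    // Nothing in this body runs until a consumer calls MoveNext().
    public static IEnumerable<int> ReadTuples(int count)
    {
        for (var i = 0; i < count; i++)
        {
            Log.Add("read " + i);
            yield return i;
        }
    }

    public static List<string> Run()
    {
        Log.Clear();
        var sequence = ReadTuples(3);   // no element has been read yet
        Log.Add("created");
        foreach (var item in sequence)
        {
            Log.Add("got " + item);
            if (item == 1) break;       // the third element is never read
        }
        return new List<string>(Log);
    }
}
```

Calling YieldDemo.Run() returns the entries "created", "read 0", "got 0", "read 1", "got 1", in that order: the source is touched only when the consumer asks for the next element.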

I don't think it makes sense to explain how each line of code was born, so I will explain the idea. To multiply your data, a factory object is created with a GetClone() method, which returns the result of calling a method that uses yield. To keep the reading synchronized so that nobody loses anything, the factory remembers its readers and makes sure that the next object is read from the source IEnumerable only after all readers have received their reference to the previous object. This is achieved by assigning each reader two WaitHandles, “I am ready to read” and “I have read”:

    private IEnumerable<T> _input;
    private IEnumerator<T> _inputEnumerator;
    private Dictionary<string, AutoResetEvent> _imReadyToRead;
    private Dictionary<string, AutoResetEvent> _iVeReadMyValue;
    private WaitHandle[] _ivAlreadyReadArray;
    private T _nextObject;
    private bool _hasCurrent;
    private bool _hasNext;
    ...
    private void GetNext()
    {
        if (!_hasCurrent) return;
        // Let every reader take the object that is already loaded.
        foreach (var ready in _imReadyToRead) ready.Value.Set();
        do
        {
            // Wait until all readers have taken the current object.
            WaitHandle.WaitAll(_ivAlreadyReadArray);
            // Read the next object from the source.
            _hasNext = _inputEnumerator.MoveNext();
            // Current is undefined once MoveNext() has returned false.
            _nextObject = _hasNext ? _inputEnumerator.Current : default(T);
            lock (_imReadyToRead)
            {
                if (!_hasNext) _hasCurrent = false;
                // Tell the readers that they may read again.
                foreach (var ready in _imReadyToRead) ready.Value.Set();
            }
        } while (_hasNext);
    }


The reading itself is done from the IEnumerable<T> returned by the GetCloneInstance method.

    private T GetCurrent(string subscriber)
    {
        T toReturn;
        // Block until the factory signals that the next object is available.
        _imReadyToRead[subscriber].WaitOne();
        toReturn = _nextObject;
        // Report that this reader has taken its reference.
        _iVeReadMyValue[subscriber].Set();
        return toReturn;
    }

    private IEnumerable<T> GetCloneInstance(string key)
    {
        T res;
        do
        {
            res = GetCurrent(key);
            yield return res;
        } while (true);
    }
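To show the two-handle handshake end to end, here is a self-contained sketch under simplifying assumptions: the number of readers is fixed up front, readers are addressed by index rather than by string key, and every reader enumerates its clone to the end (a reader that stops early would stall the others). The names LockstepCloner, Pump and LockstepDemo are hypothetical; this is not the original project's code, only one way the same idea could be written.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public sealed class LockstepCloner<T>
{
    private readonly IEnumerator<T> _source;
    private readonly AutoResetEvent[] _ready;   // "you may read" signal, one per reader
    private readonly AutoResetEvent[] _done;    // "I have read" signal, one per reader
    private T _current;
    private bool _hasCurrent;

    public LockstepCloner(IEnumerable<T> input, int readers)
    {
        _source = input.GetEnumerator();
        _ready = Enumerable.Range(0, readers).Select(_ => new AutoResetEvent(false)).ToArray();
        _done = Enumerable.Range(0, readers).Select(_ => new AutoResetEvent(false)).ToArray();
    }

    // Each reader enumerates its own clone on its own thread.
    public IEnumerable<T> GetClone(int index)
    {
        while (true)
        {
            _ready[index].WaitOne();            // wait for the next object
            if (!_hasCurrent)
            {
                _done[index].Set();             // acknowledge the end of the stream
                yield break;
            }
            var item = _current;
            _done[index].Set();                 // acknowledge before processing
            yield return item;
        }
    }

    // Reads the source once and hands every object to all readers.
    public void Pump()
    {
        do
        {
            _hasCurrent = _source.MoveNext();
            _current = _hasCurrent ? _source.Current : default(T);
            foreach (var r in _ready) r.Set();
            WaitHandle.WaitAll(_done);          // limited to 64 handles
        } while (_hasCurrent);
    }
}

public static class LockstepDemo
{
    // Three "slices" computed over a source that is read exactly once.
    public static long[] Run()
    {
        var cloner = new LockstepCloner<int>(Enumerable.Range(1, 100), 3);
        var results = new long[3];
        var workers = new[]
        {
            new Thread(() => results[0] = cloner.GetClone(0).Sum(x => (long)x)),
            new Thread(() => results[1] = cloner.GetClone(1).LongCount(x => x % 2 == 0)),
            new Thread(() => results[2] = cloner.GetClone(2).Max()),
        };
        foreach (var w in workers) w.Start();
        cloner.Pump();
        foreach (var w in workers) w.Join();
        return results;
    }
}
```

LockstepDemo.Run() yields the sum, the count of even numbers and the maximum of 1..100. Each reader acknowledges as soon as it has copied the reference, so the source can fetch the next object while the readers are still processing the previous one.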


So, it seemed, the parallelization problem was solved. But during pre-flight checks one peculiarity of the WaitAll() method came to light: it supports no more than 64 handles at a time. And I need 100+ readers! What to do? Quite simple, really: I started cloning clones. Out of every 64 “honest” clones I pick a victim, which is then cloned further. With a large number of readers I can have clones of the 2nd, 3rd, 4th and later generations! As the tests showed, they are quite viable creatures. It looks like this:

    private int _maxCloneOfOne = 64;                 // WaitAll() limit: clones per cloner
    private IEnumerable<T> _input;                   // the source sequence
    private Stack<ICloner<T>> cloners;               // cloners, each serving up to 64 clones
    private Dictionary<Guid, IEnumerable<T>> clones; // clones handed out to readers
    private Stack<IEnumerable<T>> missMe;            // clones used internally as input for the next cloner

    public IEnumerable<T> GetClone()
    {
        // Create the first cloner on demand.
        if (cloners.Count == 0) cloners.Push(new Cloner<T>(_input));
        // Is the current cloner down to its last free slot?
        var isLast = clones.Count > 0 && clones.Count % (_maxCloneOfOne - 1) == 0;
        ICloner<T> cloner;
        var g = Guid.NewGuid();
        IEnumerable<T> result;
        if (!isLast)
        {
            // There is still room: take a clone from the current cloner.
            cloner = cloners.Peek();
        }
        else
        {
            // Sacrifice the last clone: it becomes the input of a new cloner.
            var lastCloneForCloner = cloners.Peek().GetClone();
            missMe.Push(lastCloneForCloner);
            cloners.Push(cloner = new Cloner<T>(lastCloneForCloner));
            g = Guid.NewGuid();
        }
        result = cloner.GetClone();
        clones.Add(g, result);
        return result;
    }
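The sketch below illustrates the two numbers behind this scheme. WaitAllAccepts probes the WaitAll() limit directly, assuming it is called from a thread that is not in a single-threaded apartment. ClonersNeeded follows the arithmetic of the snippet above, where each cloner hands 63 clones to readers and keeps the 64th as input for the next cloner. Both method names and the CloneLimits class are hypothetical and are not part of the original project.

```csharp
using System;
using System.Threading;

public static class CloneLimits
{
    // Returns true if WaitHandle.WaitAll accepts an array of this many handles.
    public static bool WaitAllAccepts(int handleCount)
    {
        var handles = new WaitHandle[handleCount];
        for (var i = 0; i < handleCount; i++)
            handles[i] = new ManualResetEvent(true);    // already signaled
        try
        {
            // With all handles signaled this returns immediately.
            return WaitHandle.WaitAll(handles, 0);
        }
        catch (NotSupportedException)
        {
            // Thrown when the array holds more handles than the system permits.
            return false;
        }
        finally
        {
            foreach (var h in handles) h.Dispose();
        }
    }

    // Number of chained cloners for a given number of readers,
    // assuming 63 reader clones per cloner as in the snippet above.
    public static int ClonersNeeded(int readers)
    {
        if (readers < 0) throw new ArgumentOutOfRangeException(nameof(readers));
        const int readersPerCloner = 63;
        return (readers + readersPerCloner - 1) / readersPerCloner;
    }
}
```

Under these assumptions, 100 readers need two cloners, and the 127th reader is the first one served by a third-generation clone. Readers of later generations receive each object through one extra hand-off per generation.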


Now everything is in place. I added initialization code and checks for the various boundary cases, and diligently wrapped everything in try-catch. It works.


If my solution seems useful or at least interesting to anyone, you are welcome to visit the project page on Google Code. Those who want to help improve it and add new features will receive the keys to all the doors.

Source: https://habr.com/ru/post/165449/

