
Rayon: data parallelism in Rust

For the last couple of weeks I've been working on updating Rayon, my experimental data-parallelism library for Rust.

I am quite pleased with how the design is coming together, so I decided to write up what I have so far in a blog post.
Rayon's goal is to make it easy to add parallelism to sequential code, so that any for loop or iterator chain can be made to run across multiple threads. For example, if you have a chain of iterators like this:

```rust
let total_price = stores.iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```

then you can make it run in parallel simply by changing the usual "sequential iterator" into one of Rayon's "parallel iterators":

```rust
let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```


Of course, it is not enough to make parallelism easy; it must also be safe. Rayon guarantees that using its API will never introduce a data race.
This post explains how Rayon works. First, it covers Rayon's core primitive (join), and then how that primitive is implemented.
I want to draw particular attention to how join leverages the combination of Rust's features so that it can be implemented with very low runtime overhead while still giving strong safety guarantees. I then briefly describe how the parallel iterator abstraction is built on top of join.
I do want to emphasize, though, that Rayon is very much a work in progress. I expect the design of the parallel iterators to go through several more, shall we say, iterations (pun intended), since the current implementation is not as flexible as I would like. There are also several corner cases that are handled incorrectly, notably panic propagation and resource cleanup. Nonetheless, Rayon can already be useful for certain tasks right now. I'm pretty happy with it, and I hope you will be too!

Rayon's core primitive: join


At the beginning of the post, I showed an example of using a parallel iterator to perform a map-reduce operation:

```rust
let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```

In fact, though, parallel iterators are just a small library built on top of a more fundamental primitive: join. Using join is simple: you pass in two closures, as shown below, and join potentially runs them in parallel, returning once both have completed:

```rust
// `do_something` and `do_something_else` *may* run in parallel:
join(|| do_something(), || do_something_else());
```

The key point is that the two closures only potentially run in parallel: the decision of whether or not to use parallel threads is made dynamically, based on whether idle cores are available. The idea is that you use join to mark the places in your program where parallelism might be profitable, and then let the library decide at runtime whether to exploit it.
This potential-parallelism approach is the core idea that distinguishes Rayon from crossbeam's scoped threads. If you fork work onto two scoped threads in crossbeam, it will always execute in parallel on distinct threads. A call to join in Rayon, by contrast, does not necessarily imply parallel execution. The result is not only a simpler API but also more efficient use of resources, because it is very hard to predict in advance when parallelization will pay off. It always requires knowledge of global context: does the machine have idle cores, and what other parallel operations are running right now? In fact, one of the main goals of this post is to advocate for potential parallelism as the basis for data-parallelism libraries in Rust, in contrast to the guaranteed parallelism we have seen so far.
That is not to say there is no role for the guaranteed parallelism that crossbeam offers. The potential-parallelism semantics also impose some limits on what your parallelized closures can do. For example, if you try to use a channel to communicate between the two closures in a join, it will most likely deadlock. It is better to think of join as a hint that parallelism might help in an otherwise sequential algorithm. Sometimes that is not what you want: some algorithms are inherently parallel. (Note, however, that it is perfectly fine to use types like Mutex, AtomicU32, and so on from inside a join; you just don't want one closure to block waiting on the other.)

Join example: parallel quicksort


The join primitive is ideally suited to divide-and-conquer algorithms, which split the work into roughly equal parts and then recurse. For example, we can implement a parallel version of quicksort:

```rust
fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo), || quick_sort(hi));
    }
}

fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
    // See https://en.wikipedia.org/wiki/Quicksort#Lomuto_partition_scheme
}
```

In fact, the only difference between this version of quicksort and a sequential one is that at the end we call rayon::join!
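The post leaves `partition` to the linked Wikipedia scheme. As a concrete reference, here is a minimal sequential sketch (my own illustrative Lomuto implementation, not code from Rayon's repository), paired with a plain sequential quicksort of the same shape:

```rust
// Lomuto partition: take the last element as the pivot, move
// everything <= pivot to the front, and return the pivot's final index.
fn partition<T: PartialOrd>(v: &mut [T]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

// The sequential skeleton: identical to the parallel version above,
// except that the two recursive calls run one after the other.
fn quick_sort<T: PartialOrd>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        quick_sort(lo);
        quick_sort(&mut hi[1..]); // the pivot at hi[0] is already in place
    }
}
```

Swapping those two recursive calls for a single rayon::join call, as in the snippet above, is the entire parallelization step.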

How join is implemented: work stealing


Internally, join is implemented using a technique known as work stealing. As far as I know, work stealing was first introduced as part of the Cilk project, and it has since become a fairly standard technique (in fact, the name Rayon is a nod to Cilk: rayon is an artificial silk-like fiber, and "Cilk" alludes to silk).
The basic idea is that on each call to join(a, b) we identify two tasks, a and b, that could safely run in parallel, but we don't yet know whether idle threads are available. All the current thread does is add b to a queue of "pending work" and then immediately go and execute a itself. Meanwhile, there is a pool of other active threads (typically one per CPU core, or thereabouts). Whenever one of them is idle, it goes and rummages through the "pending work" queues of the other threads: if it finds a task there, the idle thread steals it and executes it itself. So, in this case, while the first thread is busy executing a, another thread might come along and start executing b.
Once the first thread finishes a, it checks: did somebody else start executing b? If not, it executes b itself. If so, it has to wait for the other thread to finish. But while it waits, it can go and steal work from the other threads, helping to drive the overall computation toward completion.
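The queue discipline itself can be sketched with std alone. This toy, single-threaded model (assumed names, not Rayon's actual implementation, which uses a lock-free deque shared between threads) captures the key asymmetry: the owning thread pushes and pops at one end, while thieves take from the other:

```rust
use std::collections::VecDeque;

// Toy model of a work-stealing deque. The owner operates on the back
// (LIFO, good for cache locality); idle threads steal from the front
// (FIFO, taking the oldest -- typically largest -- tasks).
struct WorkQueue<T> {
    deque: VecDeque<T>,
}

impl<T> WorkQueue<T> {
    fn new() -> Self {
        WorkQueue { deque: VecDeque::new() }
    }

    // Owner side: `join(a, b)` advertises `b` here before running `a`.
    fn push(&mut self, job: T) {
        self.deque.push_back(job);
    }

    // Owner side: try to take back the job we pushed most recently.
    fn pop(&mut self) -> Option<T> {
        self.deque.pop_back()
    }

    // Thief side: an idle worker steals the oldest pending job.
    fn steal(&mut self) -> Option<T> {
        self.deque.pop_front()
    }
}
```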
In Rust-like pseudocode, join looks something like this (the real code works somewhat differently; for example, it allows each operation to produce a result):

```rust
fn join<A,B>(oper_a: A, oper_b: B)
    where A: FnOnce() + Send,
          B: FnOnce() + Send,
{
    // Advertise `oper_b` to other threads as something
    // they might want to steal:
    let job = push_onto_local_queue(oper_b);

    // Execute `oper_a` ourselves:
    oper_a();

    // Check whether anybody stole `oper_b`:
    if pop_from_local_queue(oper_b) {
        // Not stolen. Execute it ourselves.
        oper_b();
    } else {
        // Stolen; it is running somewhere else.
        // Steal from others while we wait for it to finish:
        while not_yet_complete(job) {
            steal_from_others();
        }
        result_b = job.result();
    }
}
```

What makes work stealing so elegant is that it adapts naturally to the CPU load. That is, if all the workers are busy, join(a, b) degenerates into executing each closure sequentially (i.e., a(); b();), which is no worse than sequential code. And whenever idle threads are available, we get parallel execution.
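That "no worse than sequential" baseline is easy to state in code. Below is a sketch of the degenerate case only (a hypothetical join_sequential helper for this post, not Rayon's API): when no worker is idle, join(a, b) behaves exactly like this:

```rust
// The sequential degradation of `join`: run the first closure to
// completion, then the second, and hand back both results -- exactly
// the behavior `join(a, b)` falls back to when every worker is busy.
fn join_sequential<A, RA, B, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA,
    B: FnOnce() -> RB,
{
    let ra = oper_a(); // equivalent to plain `a();`
    let rb = oper_b(); // ...followed by `b();`
    (ra, rb)
}
```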

Performance measurement


Rayon is still quite young, so I don't have a lot of test programs to measure with (nor have I done much optimization yet). Nonetheless, you can already get pretty decent speedups, though it takes a bit more tuning than I would like. For example, with a tweaked version of quicksort, I see the following speedups from parallel execution on my 4-core Macbook Pro (so a four-fold speedup is the theoretical maximum):
| Array length | Speedup |
|--------------|---------|
| 1K           | 0.95x   |
| 32K          | 2.19x   |
| 64K          | 3.09x   |
| 128K         | 3.52x   |
| 512K         | 3.84x   |
| 1024K        | 4.01x   |

The tweak I made relative to the original version was to add a sequential fallback: if the input array is small enough (in my code, under 5000 elements), we switch to a sequential version of the algorithm and stop calling join. This can be done without duplicating any code, by using Rust's type system, as you can see in my example code. (If you're curious, I explain the idea in the appendix at the end of the article.)
Hopefully, after some optimization, the sequential fallback will be needed less often. It's also worth noting that higher-level APIs (such as the parallel iterators I mentioned above) can handle the sequential fallback for you, so you don't have to think about it constantly.
In any case, if you don't add the sequential fallback, the results are not as good, though they could be a lot worse:
| Array length | Speedup |
|--------------|---------|
| 1K           | 0.41x   |
| 32K          | 2.05x   |
| 64K          | 2.42x   |
| 128K         | 2.75x   |
| 512K         | 3.02x   |
| 1024K        | 3.10x   |

In particular, keep in mind that this version of the code offers every subarray, all the way down to length 1, as a candidate for parallel processing. For arrays of 512K or 1024K elements, that means a very large number of subarrays, and hence a very large number of tasks, yet we still see speedups of up to 3.10x. I think the reason the code does as well as it does is that it gets the fundamentals right: Rayon avoids memory allocation and virtual dispatch, as described in the next section. Still, I would like to do better than 0.41x for 1K arrays (and I think we can).

Using Rust features to minimize overhead


As you can see above, for this scheme to work, the overhead of pushing a task onto the local queue must be kept as low as possible. After all, the expectation is that most tasks will never be stolen, because there are far fewer processors than tasks. Rayon's API is designed to leverage several features of Rust to drive this overhead down.

As it stands, the overhead of pushing a task is fairly low, though not as low as I would like. There are several ways to reduce it further.


Freedom from data races


I mentioned earlier that Rayon guarantees freedom from data races. This means you can add parallelism to previously sequential code without worrying about introducing weird, hard-to-reproduce bugs.
There are two kinds of mistakes we have to worry about. First, the two closures might share some mutable state, so that changes made by one thread would affect the other. For example, if I modify the example above so that it (incorrectly) calls quick_sort on lo in both closures, I would hope that the code will not compile:

```rust
fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo),
                    || quick_sort(lo)); // <-- oops!
    }
}
```

And indeed, I see an error like the following:

```
test.rs:14:10: 14:27 error: closure requires unique access to `lo`
                            but it is already borrowed [E0500]
test.rs:14     || quick_sort(lo));
               ^~~~~~~~~~~~~~~~~
```

I would get similar errors if I tried to process lo (or hi) in one closure and v, which overlaps them both, in the other.
Note: this example may seem artificial, but it is actually a real bug I made (or, rather, would have made) while implementing the parallel iterators I discuss later. Mistakes like this are very easy to make with copy-and-paste, and it is very nice that Rust turns them into compile-time impossibilities rather than crashing programs.
The second kind of mistake we can catch is the use of non-thread-safe types from inside one of the join closures. For example, Rust offers a non-atomically reference-counted type called Rc. Since Rc uses non-atomic instructions to update its reference count, it is not safe to share an Rc between threads. If someone tried to do so, as in the following example, the reference count could easily become incorrect, which would lead to a double free or worse:

```rust
fn share_rc(rc: Rc<i32>) {
    // The calls to `clone` below increment the reference count.
    // These increments use non-atomic instructions and may run
    // simultaneously on two threads. This would be a data race!
    rayon::join(|| something(rc.clone()),
                || something(rc.clone()));
}
```

But, of course, if I try to compile this example, I get an error:

```
test.rs:14:5: 14:9 error: the trait `core::marker::Sync` is not
                          implemented for the type `alloc::rc::Rc<i32>` [E0277]
test.rs:14     rayon::join(|| something(rc.clone()),
               ^~~~~~~~~~~
test.rs:14:5: 14:9 help: run `rustc --explain E0277` to see a detailed explanation
test.rs:14:5: 14:9 note: `alloc::rc::Rc<i32>` cannot be shared between threads safely
```

As you can see, in the "note" at the end, the compiler tells us that an Rc cannot be shared between threads.
You might wonder what kind of dark magic lets the join function enforce both of these invariants. In fact, the answer is surprisingly simple. The first error, where I tried to hand the same &mut slice to two different closures, falls out of Rust's core type system: you cannot have two closures that are both live at the same time and both have access to the same &mut slice. That is because access to &mut data must be unique, and if two closures could each get unique access to the same &mut value, it wouldn't be so unique after all.
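A std-only illustration of that uniqueness rule (double_halves is a hypothetical helper for this post, with no Rayon involved): split_at_mut hands back two disjoint &mut slices, so two closures can each capture one and coexist, while capturing the same half in both closures would be rejected just like the broken quick_sort above.

```rust
// Two closures may coexist because each captures a *disjoint*
// `&mut` slice; swapping `hi` for `lo` in the second closure
// would be a compile-time borrow error.
fn double_halves(v: &mut [i32]) {
    let mid = v.len() / 2;
    let (lo, hi) = v.split_at_mut(mid);
    let mut f = || lo.iter_mut().for_each(|x| *x *= 2); // borrows only `lo`
    let mut g = || hi.iter_mut().for_each(|x| *x *= 2); // borrows only `hi`
    f();
    g();
}
```

In Rayon, the same reasoning is what lets both halves be handed to join's two closures safely.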
(This was actually one of my big "aha" moments with the Rust type system. Previously, I had thought of "dangling pointers" in sequential programs and "data races" as completely distinct classes of bugs, but now I see them as two heads of the same hydra. Fundamentally, both arise from unrestricted combinations of aliasing and mutation, and both can be solved with ownership and borrowing. Neat, right?)
So what about the second error, where I tried to send an Rc across threads? It arises because the join function requires both of its closure arguments to implement the Send trait. Send in Rust indicates that data can safely be transferred between threads, so when join declares that both closures must be Send, it is saying: "it must be safe for the data those closures can reach to move from one thread to another."
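To see the Send bound at work without Rayon, here is a hedged sketch using plain std::thread (spawn_both is a hypothetical stand-in for join, with simplified 'static bounds): an atomically reference-counted Arc is accepted where capturing an Rc would fail to compile, exactly as in the share_rc example above.

```rust
use std::thread;

// Because both closures must be `Send`, everything they capture
// must be safe to move to another thread. Unlike `join`, this toy
// always spawns real threads.
fn spawn_both<A, B>(oper_a: A, oper_b: B)
where
    A: FnOnce() + Send + 'static,
    B: FnOnce() + Send + 'static,
{
    let ha = thread::spawn(oper_a);
    let hb = thread::spawn(oper_b);
    ha.join().unwrap();
    hb.join().unwrap();
}
```

Passing closures that capture an `Arc<AtomicU32>` compiles and runs; replacing the Arc with an Rc produces a "cannot be sent between threads safely" error at compile time.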

Parallel Iterators


At the beginning of the post I gave this example with a parallel iterator:

```rust
let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```

But since then I have focused exclusively on join. As I said earlier, the parallel iterator API is really just a fairly simple wrapper around join. At the moment, it is more of a proof of concept than anything else. What is really elegant about it, though, is that it requires no unsafe code related to parallelism: the parallel iterator API is built purely on top of join, which encapsulates all the unsafety. (To be precise, there is still a bit of unsafe code for managing uninitialized memory when building up a vector. But that code has nothing to do with parallelism; similar code can be found in the implementation of Vec. It is also incorrect in some edge cases, because I haven't had time to do it properly.)
I don't want to go too deeply into the details of the parallel iterator implementation, because I expect it to change. But at a high level, the idea is that we have a ParallelIterator trait with the following core methods:

```rust
pub trait ParallelIterator {
    type Item;
    type Shared: Sync;
    type State: ParallelIteratorState<Shared = Self::Shared,
                                      Item = Self::Item> + Send;

    fn state(self) -> (Self::Shared, Self::State);

    ... // other methods, like `map`, elided
}
```

The idea is that the state method splits the iterator into two pieces: the shared state and the per-thread state. The shared state will (potentially) be accessed by all worker threads, so it must be Sync (shareable across threads). The per-thread state is what gets split at each call to join, so it only needs to be Send (transferable to another thread).
The ParallelIteratorState trait represents a chunk of the remaining work (for example, a subslice to be processed). It has three methods:

```rust
pub trait ParallelIteratorState: Sized {
    type Item;
    type Shared: Sync;

    fn len(&mut self) -> ParallelLen;

    fn split_at(self, index: usize) -> (Self, Self);

    fn for_each<OP>(self, shared: &Self::Shared, op: OP)
        where OP: FnMut(Self::Item);
}
```

The len method gives an idea of how much work is left. The split_at method divides the state into two other states, and for_each produces all the values in the iterator. So, for example, the parallel iterator state for a slice &[T] would:

- implement len by returning the length of the slice;
- implement split_at by splitting the slice into two subslices;
- implement for_each by iterating over the slice and invoking op on each element.

Given these two traits, we can process the data by following the same basic template we have already seen: check how much work there is; if it is too much, split it in two, and otherwise process it sequentially (note that this automatically incorporates the sequential fallback):

```rust
fn process(shared, state) {
    if state.len() is too big {
        // Too much data: split and recurse.
        let midpoint = state.len() / 2;
        let (state1, state2) = state.split_at(midpoint);
        rayon::join(|| process(shared, state1),
                    || process(shared, state2));
    } else {
        // Little enough data: process it sequentially.
        state.for_each(|item| {
            // process item
        })
    }
}
```
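This scheme can be made concrete with a self-contained miniature (simplified signatures and assumed names, not Rayon's real traits): a state trait implemented for slices, plus a sequential stand-in for the join-based recursion.

```rust
trait IterState: Sized {
    type Item;
    fn len(&self) -> usize;
    fn split_at(self, index: usize) -> (Self, Self);
    fn for_each<OP: FnMut(Self::Item)>(self, op: OP);
}

impl<'a, T> IterState for &'a [T] {
    type Item = &'a T;
    fn len(&self) -> usize {
        (**self).len() // length of the underlying slice
    }
    fn split_at(self, index: usize) -> (Self, Self) {
        <[T]>::split_at(self, index) // two disjoint subslices
    }
    fn for_each<OP: FnMut(Self::Item)>(self, mut op: OP) {
        for item in self {
            op(item)
        }
    }
}

// Sequential stand-in for `process`: where the template above calls
// `rayon::join`, we simply recurse on the two halves in order.
fn process<S: IterState, OP: FnMut(S::Item)>(state: S, op: &mut OP) {
    const THRESHOLD: usize = 4; // assumed "too big" cutoff
    if state.len() > THRESHOLD {
        let mid = state.len() / 2;
        let (s1, s2) = state.split_at(mid);
        process(s1, op);
        process(s2, op);
    } else {
        state.for_each(|item| op(item));
    }
}
```

Summing a slice through process then exercises both the splitting path and the sequential fallback.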







Appendix: implementing the sequential fallback

Earlier, when discussing quicksort, I mentioned a version with a fallback to sequential execution for small arrays, and I claimed that this can be done without duplicating the quicksort code. The trick relies on Rust generics. Here is how it works.
First, we define a Joiner trait, which abstracts over the join function:

```rust
trait Joiner {
    /// True if this is the parallel mode, false otherwise.
    fn is_parallel() -> bool;

    /// Either calls `rayon::join` or just runs `oper_a(); oper_b();`.
    fn join<A, R_A, B, R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send,
              B: FnOnce() -> R_B + Send;
}
```

This trait has two implementations, corresponding to the parallel and sequential modes:

```rust
struct Parallel;
impl Joiner for Parallel { .. }

struct Sequential;
impl Joiner for Sequential { .. }
```

Then we make quick_sort generic over a type J: Joiner, indicating whether we are in parallel or sequential mode, and have the parallel mode switch over to the sequential one for small arrays:

```rust
fn quick_sort<J: Joiner, T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        // In parallel mode, fall back to the sequential version
        // once the array is 5K elements or less:
        if J::is_parallel() && v.len() <= 5 * 1024 {
            return quick_sort::<Sequential, T>(v);
        }

        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        J::join(|| quick_sort::<J, T>(lo),
                || quick_sort::<J, T>(hi));
    }
}
```
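As a self-contained miniature of this trick (sequential side only; a Parallel impl would forward to rayon::join, which isn't available here, and the Send bounds are dropped for simplicity):

```rust
trait Joiner {
    /// True if this is the parallel mode, false otherwise.
    fn is_parallel() -> bool;

    /// Either forwards to a parallel join or just runs the two
    /// closures one after the other.
    fn join<A, RA, B, RB>(oper_a: A, oper_b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA,
        B: FnOnce() -> RB;
}

struct Sequential;

impl Joiner for Sequential {
    fn is_parallel() -> bool {
        false
    }

    // Sequential mode: no queues, no threads -- just run the two
    // closures in order and return both results.
    fn join<A, RA, B, RB>(oper_a: A, oper_b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA,
        B: FnOnce() -> RB,
    {
        let a = oper_a();
        let b = oper_b();
        (a, b)
    }
}
```

Because is_parallel and join are resolved statically per implementation, the compiler monomorphizes quick_sort into two specialized copies, so the mode check costs nothing at runtime.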

Source: https://habr.com/ru/post/274299/

