For the last couple of weeks I have been working on updating Rayon, my experimental data parallelism library for Rust. I am quite pleased with how the development is going, so I decided to write up where I have arrived in this blog post.
The goal of Rayon is to make it simple to add parallelism to sequential code, so that any for loop or iterator chain can be made to run on multiple threads. For example, if you have a chain of iterators like this:

```rust
let total_price = stores.iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```

then you can make it run in parallel simply by switching from the usual "sequential iterator" to Rayon's "parallel iterator":

```rust
let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```
Of course, it is not enough to make parallelism simple; it must also be safe. Rayon guarantees that using its API will never lead to a data race. This post explains how Rayon works. It first covers Rayon's core primitive (join) and then shows how that primitive is implemented. I particularly want to draw attention to how the combination of Rust's features makes it possible to implement join with very low runtime overhead while still providing strong safety guarantees. After that, I will briefly describe how the parallel iterator abstraction is built on top of join.
However, I want to emphasize that Rayon is very much a work in progress. I expect the parallel iterator design to go through quite a few more, shall we say, iterations (pun intended), since the current implementation is not as flexible as I would like. There are also several corner cases that are not handled correctly, notably around propagating panics and cleaning up resources. Still, Rayon may already be useful for certain tasks. I am very excited about it, and I hope you will be too!
Rayon's core primitive: join
At the beginning of the post, I showed an example of using a parallel iterator for a map-reduce operation:

```rust
let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```
However, parallel iterators are really just a small library built on top of a more fundamental primitive: join. Using join is easy: you pass it two closures, and it potentially runs them in parallel. Once both of them have finished, it returns both results.
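To illustrate the shape of the API, here is a minimal stand-in for rayon::join that simply runs the two closures one after the other (always a legal schedule, since parallelism is only potential); it compiles without the rayon crate:

```rust
// A minimal stand-in for `rayon::join`, with the same shape:
// two closures in, both results out. The real implementation may
// run the closures on different worker threads; this sketch simply
// runs them in sequence, which is always a valid schedule.
fn join<A, RA, B, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    (oper_a(), oper_b())
}

fn main() {
    let v = vec![1, 2, 3, 4];
    // Both closures may borrow `v` immutably at the same time.
    let (sum, max) = join(|| v.iter().sum::<i32>(),
                          || *v.iter().max().unwrap());
    println!("{} {}", sum, max); // 10 4
}
```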
The key point is that the two closures can potentially run in parallel: the decision of whether or not to use parallel threads is made dynamically, based on whether idle cores are available. The idea is that you use join to mark the places in your program where parallelism might pay off, and then let the library decide at runtime whether to exploit it.
This potential parallelism is the key idea that distinguishes Rayon from crossbeam's scoped threads. If you distribute work across two scoped threads in crossbeam, it will always execute in parallel on two distinct threads. A call to join in Rayon, by contrast, does not necessarily imply parallel execution. The result is not only a simpler API but also more efficient resource usage, because it is very hard to predict in advance when parallelization will be profitable. That always requires knowledge of global context: does the machine have idle cores, and what other parallel operations are running right now?
In fact, one of the main goals of this post is to promote potential parallelism as the basis for data parallelism libraries in Rust, in contrast to the guaranteed parallelism that we have seen so far.
This is not to say that there is no role for the guaranteed parallelism that crossbeam offers. The semantics of potential parallelism also impose some limits on what your parallelized closures can do. For example, if you try to use a channel to communicate between the two closures of a join, it will most likely deadlock. It is better to think of join as a hint that parallelism can be exploited in a typically sequential algorithm. Sometimes that is not what you want; some algorithms are inherently parallel. (Note, however, that it is perfectly fine to use types like Mutex, AtomicU32, and so on from within join; you just don't want one closure to block waiting for the other.)
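For example, here is a small sketch of two closures safely sharing an atomic counter. It uses std's scoped threads in place of rayon::join (an assumption made so the snippet needs no dependencies); the point is the same: shared atomics are fine as long as neither side blocks waiting for the other:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

// Increment a shared atomic counter from two threads at once.
// (Illustration only: the post's point is that sharing a `Mutex`
// or `AtomicU32` across `join`'s closures is fine; std's scoped
// threads stand in for the rayon dependency here.)
fn count_in_parallel(n: u32) -> u32 {
    let counter = AtomicU32::new(0);
    thread::scope(|s| {
        // Both closures borrow `counter`; atomic operations make
        // the shared mutation safe, and neither closure ever
        // blocks waiting for the other.
        s.spawn(|| {
            for _ in 0..n {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        });
        s.spawn(|| {
            for _ in 0..n {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        });
    });
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("{}", count_in_parallel(1000)); // 2000
}
```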
Join example: parallel quicksort
The join primitive is ideal for divide-and-conquer algorithms. These algorithms divide the work into roughly equal parts and then recurse. For example, we can implement a parallel version of quicksort like this:
```rust
fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo),
                    || quick_sort(hi));
    }
}

fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
    // ...
}
```
In fact, the only difference between this version of quicksort and a sequential one is that we call rayon::join at the end!
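For reference, here is a self-contained sketch that fills in the elided partition with the standard Lomuto scheme; the rayon::join call is replaced by plain sequential calls so the snippet compiles on its own (the structure is otherwise unchanged):

```rust
fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        // With Rayon this would be:
        //     rayon::join(|| quick_sort(lo), || quick_sort(hi));
        quick_sort(lo);
        quick_sort(hi);
    }
}

// Standard Lomuto partition: picks the last element as the pivot
// and returns the pivot's final index.
fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

fn main() {
    let mut data = vec![9, 1, 8, 2, 7, 3];
    quick_sort(&mut data);
    println!("{:?}", data); // [1, 2, 3, 7, 8, 9]
}
```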
How join is implemented: work-stealing
Internally, join is implemented using a technique known as work-stealing. As far as I know, work-stealing was first introduced as part of the Cilk project, and it has since become a fairly standard technique (in fact, the name Rayon is an homage to Cilk: rayon is an artificial silk-like fiber, and "Cilk" is itself a play on "silk").
The basic idea is that on each call to join(a, b) we have identified two tasks a and b that could safely run in parallel, but we don't yet know whether idle threads are available. All the current thread does is push b onto a queue of "pending work", then take a and execute it immediately. Meanwhile, there is a pool of other active threads (typically one per CPU core, or something like that). Whenever one of them is idle, it goes digging through the "pending work" queues of the other threads: if it finds a task there, the idle thread steals it and executes it itself. So, in this case, while the first thread is busy executing a, another thread might come along and start executing b.
Once the first thread finishes a, it checks: did somebody else start executing b? If not, it executes b itself. If so, it has to wait for that other thread to finish b. But while it waits, it can steal work from the other threads, helping to drive the whole process to completion.
In Rust-like pseudocode, join looks something like this (the real code differs a bit; for example, it allows each operation to return a result):
```rust
fn join<A,B>(oper_a: A, oper_b: B)
    where A: FnOnce() + Send,
          B: FnOnce() + Send,
{
    // Advertise `oper_b` to other threads as something
    // they might want to steal:
    let job = push_onto_local_queue(oper_b);

    // Execute `oper_a` ourselves:
    oper_a();

    // Check whether anybody stole `oper_b`:
    if pop_from_local_queue(oper_b) {
        // Not stolen: execute it ourselves.
        oper_b();
    } else {
        // Stolen: wait until the thief finishes it,
        // stealing work from others in the meantime:
        while not_yet_complete(job) {
            steal_from_others();
        }
        result_b = job.result();
    }
}
```
What makes work-stealing so elegant is that it adapts naturally to the CPU load. That is, if all the worker threads are busy, then join(a, b) degenerates into executing both closures sequentially (i.e., a(); b();), which is no worse than the sequential code. But whenever idle threads exist, we get parallel execution.
Performance measurement
Rayon is still quite young, and I don't have a lot of benchmark programs to play with (nor have I spent much time optimizing it). Nonetheless, you can already get a noticeable speedup, though it takes a bit more tuning than I would like. For example, with a tweaked version of quicksort, I see the following speedups from parallel execution on my 4-core Macbook Pro (so a four-fold speedup is the best we could expect):
| Array length | Speedup |
|--------------|---------|
| 1K           | 0.95x   |
| 32K          | 2.19x   |
| 64K          | 3.09x   |
| 128K         | 3.52x   |
| 512K         | 3.84x   |
| 1024K        | 4.01x   |
The tweak I made relative to the original version was to add a sequential fallback. The idea is that if the input array is small enough (in my code, less than 5,000 elements), we switch to a sequential version of the algorithm that never calls join. This can be done without duplicating any code by using traits, as you can see from my example's source. (If you are curious, I explain the idea in the appendix at the end of the article.)
Hopefully, after some optimization, the sequential fallback will be needed less often; it is also worth noting that higher-level APIs (such as the parallel iterators I mentioned above) can apply the fallback for you, so that you don't usually have to think about it.
In any case, if you do not add the sequential fallback, the results are not as good, though they could be much worse:
| Array length | Speedup |
|--------------|---------|
| 1K           | 0.41x   |
| 32K          | 2.05x   |
| 64K          | 2.42x   |
| 128K         | 2.75x   |
| 512K         | 3.02x   |
| 1024K        | 3.10x   |
In particular, remember that this version of the code keeps subdividing the subarrays all the way down to length 1 before doing any sequential work. For a 512K or 1024K array, that creates a very large number of subarrays, and hence a very large number of tasks, and yet we still see speedups of up to 3.10x. I think the reason the code holds up so well is that the basic approach is sound: Rayon avoids memory allocation and virtual dispatch, as described in the next section. Still, I would like to do better than 0.41x on 1K arrays (and I think that is possible).
Using Rust features to minimize overhead
As you can see above, for this scheme to work, the overhead of pushing a task onto the local queue must be as low as possible. After all, we expect that most tasks will never be stolen, because there are many fewer processors than tasks. Rayon's API is designed to exploit several Rust features to drive this overhead down:

- join is generic over the types of its closure arguments. This means that monomorphization will create a separate copy of join specialized to each call site. As a consequence, when join invokes oper_a() and oper_b() (as opposed to the relatively rare case where they are stolen), those calls are statically dispatched, which means they can be inlined. It also means that creating the closures requires no memory allocation.
- Because join blocks until both closures have finished, we can make full use of stack allocation. This is good both for users of the API and for the implementation: for example, the quicksort example above relies on having access to the input &mut [T] slice, which is only possible because join blocks. Meanwhile, the implementation of join can avoid heap allocation entirely and use only the stack (for example, the closure objects that are pushed onto the local task queue live on the stack).
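The effect of the first point can be illustrated by comparing a generic closure parameter against a boxed trait object (a standalone illustration of mine, not Rayon's code):

```rust
// Generic version: monomorphized per call site, so `f()` is a
// static call that the compiler can inline, and the closure lives
// on the caller's stack -- no allocation, no vtable.
fn call_generic<F: FnOnce() -> i32>(f: F) -> i32 {
    f()
}

// Trait-object version: the closure must be boxed on the heap and
// `f()` goes through the vtable -- exactly the kind of overhead
// that `join`'s generic signature avoids on the common, un-stolen
// path.
fn call_boxed(f: Box<dyn FnOnce() -> i32>) -> i32 {
    f()
}

fn main() {
    let x = 20;
    println!("{}", call_generic(|| x + 1)); // 21
    println!("{}", call_boxed(Box::new(move || x + 2))); // 22
}
```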
As these points suggest, the overhead of pushing a task is already fairly low, though not as low as I would like. There are several ways to reduce it further:
- Many work-stealing implementations use heuristics to decide whether to skip pushing a task onto the queue at all. For example, the work on lazy scheduling by Tzannes et al. tries to avoid pushing a task onto the queue unless there are idle worker threads (they call them "hungry" threads) that could steal it.
- And, of course, good old-fashioned optimization may help. For example, I have not yet so much as glanced at the LLVM bitcode or assembly that join compiles down to, and it is quite likely that there is low-hanging fruit there.
Data-race freedom
I mentioned earlier that Rayon guarantees data-race freedom. This means you can add parallelism to previously sequential code without worrying about introducing weird, hard-to-reproduce bugs.
There are two kinds of mistakes we have to worry about. First, the two closures might share some mutable state, so that changes made by one thread affect the other. For example, if I modify the example above so that it (incorrectly) calls quick_sort on lo in both closures, then I would hope that the code does not compile:
```rust
fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo),
                    || quick_sort(lo)); // <-- `lo` passed twice!
    }
}
```
And indeed, I get an error like this:
```
test.rs:14:10: 14:27 error: closure requires unique access to `lo` but it is already borrowed [E0500]
test.rs:14                 || quick_sort(lo));
                              ^~~~~~~~~~~~~~~~~
```
Similar errors would arise if I tried to process lo (or hi) in one closure and v in the other, since v overlaps both of them.
Note: this example may seem artificial, but it is in fact a real bug that I made (or rather, would have made) while implementing the parallel iterators I discuss later. It is very easy to make this kind of mistake with copy-and-paste, and it is a very good thing that Rust turns it into a compile-time impossibility rather than a crashing bug.
The other kind of bug we can catch is the use of non-thread-safe types from inside one of the join closures. For example, Rust offers a type with non-atomic reference counting called Rc. Because Rc uses non-atomic instructions to update its reference count, it is not safe to share an Rc between threads. If one were to try, as in the following example, the reference count could easily become incorrect, leading to a double free or worse:
```rust
fn share_rc(rc: Rc<i32>) {
    rayon::join(|| something(rc.clone()),
                || something(rc.clone()));
}
```
But, of course, if I try to compile this example, I get an error:
```
test.rs:14:5: 14:9 error: the trait `core::marker::Sync` is not implemented for the type `alloc::rc::Rc<i32>` [E0277]
test.rs:14     rayon::join(|| something(rc.clone()),
               ^~~~~~~~~~~
test.rs:14:5: 14:9 help: run `rustc --explain E0277` to see a detailed explanation
test.rs:14:5: 14:9 note: `alloc::rc::Rc<i32>` cannot be shared between threads safely
```
As you can see, in the "note" at the end, the compiler tells us that an Rc cannot be shared between threads safely.
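By contrast, swapping Rc for its atomically reference-counted sibling Arc makes essentially the same pattern compile and run, because Arc is Send; here is a small sketch of mine using plain std::thread rather than Rayon:

```rust
use std::sync::Arc;
use std::thread;

// `Arc` updates its reference count with atomic instructions, so
// (unlike `Rc`) an `Arc<i32>` may safely cross thread boundaries.
fn share_arc(shared: Arc<i32>) -> i32 {
    let a = Arc::clone(&shared);
    let b = Arc::clone(&shared);
    let ta = thread::spawn(move || *a + 1);
    let tb = thread::spawn(move || *b + 2);
    ta.join().unwrap() + tb.join().unwrap()
}

fn main() {
    println!("{}", share_arc(Arc::new(10))); // (10+1) + (10+2) = 23
}
```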
You might be wondering what kind of dark magic lets the join function uphold both of these invariants. In fact, the answer is surprisingly simple. The first error, where I tried to pass the same &mut slice into two different closures, falls out of Rust's core type system: you cannot have two closures that are both live at the same time and both have access to the same &mut slice. This is because access to &mut data must be unique; if two closures could each get unique access to the same &mut value, it would not be so unique after all.
(This was actually one of my great epiphanies in working with Rust's type system. I used to think of "dangling pointers" in sequential programs and "data races" as completely different kinds of bugs, but now I see them as two heads of the same hydra. Fundamentally, both stem from unrestricted aliasing combined with mutation, and both can be solved with ownership and borrowing. Neat, right?)
So what about the second error, where I tried to send an Rc between threads? That one arises because the join function requires both of its closure arguments to implement the Send trait. Send is how Rust marks data that can safely be transferred between threads. So when join declares that both closures must be Send, it is in effect saying: "it must be safe for any data the closures can reach to move from one thread to another".

Parallel iterators
At the beginning of the post I gave this example with a parallel iterator:
```rust
let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum();
```
But since then I have focused exclusively on join. As I said earlier, the parallel iterator API is really just a fairly simple wrapper around join. At the moment it is more of a proof of concept than anything else. What is really neat about it, though, is that it requires no unsafe code related to parallelism: the parallel iterator API is built entirely on join, which encapsulates all of the unsafety. (To be totally accurate, there is a small amount of unsafe code related to managing uninitialized memory while building up a vector. But that code has nothing to do with parallelism; you would find similar code in the implementation of Vec. It is also incorrect in some corner cases, because I have not had time to do it properly.)
I don't want to go too deeply into the details of the parallel iterator implementation, because I expect it to change. But at a high level, the idea is that we have a ParallelIterator trait with the following core methods:
```rust
pub trait ParallelIterator {
    type Item;
    type Shared: Sync;
    type State: ParallelIteratorState<Shared=Self::Shared, Item=Self::Item> + Send;

    fn state(self) -> (Self::Shared, Self::State);

    ...
}
```
The idea is that the state method divides the iterator into two pieces: some shared state and some "per-thread" state. The shared state will (potentially) be accessed by all the worker threads, so it must be Sync (shareable across threads). The per-thread state is split ever smaller at each join, so it only needs to be Send (transferable across threads).
The ParallelIteratorState trait represents a chunk of the remaining work (for example, a subslice still to be processed). It has three methods:
```rust
pub trait ParallelIteratorState: Sized {
    type Item;
    type Shared: Sync;

    fn len(&mut self) -> ParallelLen;

    fn split_at(self, index: usize) -> (Self, Self);

    fn for_each<OP>(self, shared: &Self::Shared, op: OP)
        where OP: FnMut(Self::Item);
}
```
The len method gives an idea of how much work remains. The split_at method divides the state into two other states. The for_each method produces all of the values in the iterator. So, for example, the parallel iterator for a slice &[T] would:

- implement len by returning the length of the slice,
- implement split_at by splitting the slice into two subslices,
- and implement for_each by iterating over the array and invoking op on each element.
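Here is a simplified, standalone sketch of those three methods for a slice (my own illustration: it ignores ParallelLen and the shared state, and is not Rayon's actual trait machinery):

```rust
// A standalone, simplified "iterator state" for a slice, mirroring
// the three methods described above (not Rayon's real traits).
struct SliceState<'a, T> {
    slice: &'a [T],
}

impl<'a, T> SliceState<'a, T> {
    // How much work remains.
    fn len(&self) -> usize {
        self.slice.len()
    }

    // Divide the remaining work into two sub-states.
    fn split_at(self, index: usize) -> (Self, Self) {
        let (left, right) = self.slice.split_at(index);
        (SliceState { slice: left }, SliceState { slice: right })
    }

    // Sequentially process every remaining item.
    fn for_each<OP: FnMut(&'a T)>(self, mut op: OP) {
        for item in self.slice {
            op(item);
        }
    }
}

fn main() {
    let data = [1, 2, 3, 4];
    let state = SliceState { slice: &data };
    let mid = state.len() / 2;
    let (lo, hi) = state.split_at(mid);
    let mut sum = 0;
    lo.for_each(|x| sum += x);
    hi.for_each(|x| sum += x);
    println!("{}", sum); // 10
}
```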
Given these methods, the parallel operations themselves can follow the same template as quicksort: if there is a lot of work left, split the state in two and recurse via join; otherwise, process the items sequentially. In pseudocode (details elided):

```rust
fn process(shared, state) {
    if state.len() is too big {
        // Lots of work left: split it and potentially go parallel.
        let (left, right) = state.split_at(state.len() / 2);
        rayon::join(|| process(shared, left),
                    || process(shared, right));
    } else {
        // Small enough: just process each item sequentially.
        state.for_each(shared, |item| { /* ... */ });
    }
}
```
So that is Rayon. There is still plenty of work to do, but I am pretty excited about how it is shaping up.

I also think this says something interesting about Rust itself. It is often assumed that safe parallelism requires giving up shared mutable state altogether, the way Erlang does with its isolated, message-passing processes. But that model makes it awkward to write an algorithm like quicksort, which fundamentally wants to mutate an array in place; with ownership and borrowing, the parallel quicksort shown above is both safe and natural.
What makes a library like Rayon possible is not any single feature but the combination of several aspects of Rust's design. In particular:

- The distinction between & and &mut references. Unlike const in C or C++, which only prevents mutation through that particular pointer (other aliases may still mutate the data), Rust's &mut guarantees unique access. This is what lets join hand two closures mutable access to disjoint halves of the same slice while rejecting the overlapping version at compile time.
- The Send trait as revised by RFC 458. Send used to carry an implicit 'static bound, which meant that only fully owned data could cross thread boundaries, much like messages in Erlang, which are copied between processes. RFC 458 removed that bound, so borrowed references can be sent to another thread as long as the type system can prove that the borrow outlives the thread's use of it.

Without the removal of the 'static bound from Send, an API like join, which hands borrowed slices to other worker threads, simply could not exist!
Appendix: sequential fallback without code duplication

Earlier, I mentioned that the quicksort benchmark falls back to sequential execution once the input gets small. It would be a shame to write quicksort twice, once parallel and once sequential; happily, Rust's traits let the two modes share a single code base. Let me sketch how.

First, we define a trait Joiner that abstracts over the join function:

```rust
trait Joiner {
    /// True if this is the parallel mode, false otherwise.
    fn is_parallel() -> bool;

    /// Either calls `rayon::join` or just runs `oper_a()`
    /// and `oper_b()` in sequence.
    fn join<A, R_A, B, R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send,
              B: FnOnce() -> R_B + Send;
}
```

This trait has two implementations, corresponding to the parallel and sequential modes:

```rust
struct Parallel;
impl Joiner for Parallel { .. }

struct Sequential;
impl Joiner for Sequential { .. }
```

Finally, quick_sort takes an extra type parameter J: Joiner indicating which mode it is running in. In the parallel mode, it switches over to the sequential mode once the input becomes small enough:

```rust
fn quick_sort<J: Joiner, T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        // Fall back to the sequential mode for small inputs:
        if J::is_parallel() && v.len() <= 5 * 1024 {
            return quick_sort::<Sequential, T>(v);
        }

        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        J::join(|| quick_sort::<J, T>(lo),
                || quick_sort::<J, T>(hi));
    }
}
```