
Writing Your Spliterator

Many of you have already had a taste of the Stream API introduced in Java 8. Some of you probably want not only to use the ready-made streams from collections, arrays and random-number generators, but also to create a fundamentally new kind of stream. To do that you need to write your own spliterator. A spliterator is the filling of a stream, the public part of its internal logic. In this article I will tell you how and why I wrote one.

What is a spliterator


Spliterator is an interface with eight methods, four of which already have default implementations. The remaining four are tryAdvance , trySplit , estimateSize and characteristics . There are also specialized spliterators for the primitive types int , long and double : they add a few extra methods to avoid boxing. A spliterator is similar to an ordinary iterator. The key difference is the ability to split itself into two parts; this is what parallel stream execution is built on. In addition, as an optimization, a spliterator carries a set of characteristic flags and can report its size exactly or approximately. Finally, a spliterator never modifies its data source: unlike an iterator, it has no remove method. Let's look at the main methods in a bit more detail.
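For orientation, here is a trimmed-down view of java.util.Spliterator showing the four methods you have to implement; the comments are paraphrases of the JDK contract, not its exact wording:

 public interface Spliterator<T> {
     boolean tryAdvance(Consumer<? super T> action); // feed the next element to action; return false when exhausted
     Spliterator<T> trySplit();                      // split off a prefix for parallel processing, or return null
     long estimateSize();                            // estimated number of remaining elements (Long.MAX_VALUE if unknown)
     int characteristics();                          // flags such as ORDERED, SIZED, SORTED, IMMUTABLE
     // forEachRemaining, getComparator, getExactSizeIfKnown and hasCharacteristics have default implementations
 }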


Creating a stream over an existing spliterator is very easy: just call StreamSupport.stream() .
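For instance, here is a minimal sketch of wrapping one of the ready-made array spliterators from the JDK (the data is purely illustrative):

 String[] data = {"a", "b", "c"};
 // Spliterators.spliterator() builds a SIZED spliterator over the array
 Spliterator<String> spliterator =
         Spliterators.spliterator(data, Spliterator.ORDERED | Spliterator.IMMUTABLE);
 Stream<String> stream = StreamSupport.stream(spliterator, false); // false = sequential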

When you do not need to write a spliterator


The main thing to understand is that you do not need a spliterator for its own sake; you need a stream. If you can create the stream using existing functionality, that is exactly what you should do. For example, suppose you want to use streams with the XML DOM and create a stream from a NodeList . There is no such method in the standard library, but it is easy to write one without any extra spliterators:
 public class XmlStream {
     static Stream<Node> of(NodeList list) {
         return IntStream.range(0, list.getLength()).mapToObj(list::item);
     }
 }

Similarly, you can add streams to any non-standard collection (another example is org.json.JSONArray ) that can quickly return its length and an element by index.
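As a sketch, under the assumption that org.json is on the classpath, the same IntStream trick applies ( opt(i) is used here because it simply returns null for a missing index instead of throwing):

 public class JsonStream {
     static Stream<Object> of(JSONArray array) {
         // JSONArray exposes length() and indexed access, just like NodeList
         return IntStream.range(0, array.length()).mapToObj(array::opt);
     }
 }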

If it is difficult, or you are too lazy, to write trySplit , you are better off not writing a spliterator at all. For instance, the author of the protonpack library completely ignored the existence of parallel streams and wrote a pile of spliterators that cannot split at all. A spliterator that cannot split is a bad, worthless spliterator. Do not do this. In that case it is better to write an ordinary iterator and create a spliterator over it using Spliterators.spliterator or, if you do not know the size of the collection, Spliterators.spliteratorUnknownSize . These methods at least have some splitting heuristics: they traverse part of the iterator, read it into an array, and create a new spliterator over that array. If the stream continues with a lengthy operation, parallelization will still speed things up.
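A minimal sketch of that fallback; loadLines() here is a hypothetical iterator source of unknown size:

 Iterator<String> iterator = loadLines(); // hypothetical: some source of unknown size
 Stream<String> stream = StreamSupport.stream(
         Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.NONNULL),
         true); // parallel: the array-batching heuristic still allows some splitting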

If you implement the standard Iterable or Collection interface, you get a default spliterator() for free. It is still worth checking whether it can be improved. One way or another, you rarely need to write your own spliterator. It can be useful if you are developing your own data structure (for example, a collection of primitives, as leventov does).

Writing one anyway


We will write a new spliterator to solve the following problem: for a given stream, create a stream of pairs of neighboring values of the original stream. Since there is no generally accepted type for representing a pair of values in Java, and there are too many options (an array of two elements, a list of two elements, Map.Entry with the same key and value type, and so on), we will leave this to the user: let them decide how to combine the two values. That is, we want to create a method with the following signature:

 public static <T, R> Stream<R> pairMap(Stream<T> stream, BiFunction<T, T, R> mapper) {...} 

With it, you can use any type to represent a pair. For example, if we want Map.Entry :

 public static <T> Stream<Map.Entry<T, T>> pairs(Stream<T> stream) { return pairMap(stream, AbstractMap.SimpleImmutableEntry<T, T>::new); } 

Or you can compute something interesting right away, without packing the pair into an intermediate container:

 public static Stream<Integer> diff(Stream<Integer> stream) { return pairMap(stream, (a, b) -> b - a); } 

This method, applied to a stream of integers, returns a stream of differences between neighboring elements. As you might guess, the resulting stream has one element fewer than the original.
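A quick usage check of diff under these definitions (the expected output is my own annotation):

 List<Integer> deltas = diff(Stream.of(1, 3, 6, 10)).collect(Collectors.toList());
 // deltas -> [2, 3, 4]: one element fewer than the input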

We want pairMap to behave like a normal intermediate operation, that is, no computation should be performed until a terminal operation is reached. To do this, we take the spliterator of the input stream but do nothing with it until we are asked. Another small but important detail: when the new stream is closed via close() , the original stream must be closed as well. As a result, our method may look like this:

 public static <T, R> Stream<R> pairMap(Stream<T> stream, BiFunction<T, T, R> mapper) {
     return StreamSupport.stream(new PairSpliterator<>(mapper, stream.spliterator()), stream.isParallel())
             .onClose(stream::close);
 }

After the spliterator() method is called, the original stream becomes "used up": you cannot do anything else with it. But this is fine: the same thing happens with any intermediate stream when you add a new operation to it. The Stream.concat() method, which merges two streams, looks about the same. All that remains is to write PairSpliterator itself.

Getting to the point


The simplest part is the characteristics() method. Most of the characteristics are inherited from the source spliterator, but NONNULL, DISTINCT and SORTED must be cleared: we cannot guarantee them after applying an arbitrary mapper function:

 public int characteristics() { return source.characteristics() & (SIZED | SUBSIZED | CONCURRENT | IMMUTABLE | ORDERED); } 

The implementation of tryAdvance should be fairly simple. We just need to read data from the source spliterator, remembering the previous element in an intermediate buffer, and call the mapper on the latest pair. We only have to track whether we are at the very beginning of the stream. For this, a boolean hasPrev field indicates whether we have a previous value.

The trySplit method is best implemented by delegating to trySplit of the source spliterator. The main difficulty is handling the pair that sits at the junction of the two separated pieces of the original stream. That pair should be processed by the spliterator that traverses the first half. Accordingly, it must keep the first value of the second half and, when it reaches its own end, run once more, feeding that saved value to the mapper together with its last element.

Having dealt with this, we will write the constructors:

 class PairSpliterator<T, R> implements Spliterator<R> {
     Spliterator<T> source;
     boolean hasLast, hasPrev;
     private T cur;
     private final T last;
     private final BiFunction<T, T, R> mapper;

     public PairSpliterator(BiFunction<T, T, R> mapper, Spliterator<T> source) {
         this(mapper, source, null, false, null, false);
     }

     public PairSpliterator(BiFunction<T, T, R> mapper, Spliterator<T> source,
                            T prev, boolean hasPrev, T last, boolean hasLast) {
         this.source = source;   // the source spliterator
         this.hasLast = hasLast; // is there a stashed last element (the one at the junction after a split)
         this.hasPrev = hasPrev; // is there a previous element
         this.cur = prev;        // the current element
         this.last = last;       // the stashed last element
         this.mapper = mapper;
     }
     // ...
 }

The tryAdvance method (instead of a lambda, we pass a method reference to a setter into the source spliterator's tryAdvance ):

 void setCur(T t) {
     cur = t;
 }

 @Override
 public boolean tryAdvance(Consumer<? super R> action) {
     if (!hasPrev) { // the very beginning: read the first element
         if (!source.tryAdvance(this::setCur)) {
             return false; // the source is empty - we are done
         }
         hasPrev = true;
     }
     T prev = cur; // remember the previous element
     if (!source.tryAdvance(this::setCur)) { // try to read the next one
         if (!hasLast)
             return false; // no stashed last element - we are done
         hasLast = false; // use the stashed element (left over from trySplit) exactly once
         cur = last;
     }
     action.accept(mapper.apply(prev, cur)); // feed the mapper result to the action
     return true;
 }

Here is the trySplit() method:

 public Spliterator<R> trySplit() {
     Spliterator<T> prefixSource = source.trySplit(); // split the source
     if (prefixSource == null)
         return null; // the source cannot split - neither can we
     T prev = cur; // remember the current element: it goes to the new prefix spliterator
     if (!source.tryAdvance(this::setCur)) { // the suffix turned out to be empty
         source = prefixSource; // keep working over the prefix - the split effectively did not happen
         return null;
     }
     boolean oldHasPrev = hasPrev;
     hasPrev = true; // we have already read one element of the suffix, so it now has a previous value
     return new PairSpliterator<>(mapper, prefixSource, prev, oldHasPrev, cur, true);
 }

estimateSize() is easy to write: if the source spliterator can estimate its size, we just need to check the flags and adjust the estimate up or down:

 public long estimateSize() {
     long size = source.estimateSize();
     if (size == Long.MAX_VALUE) // the source cannot estimate its size - neither can we
         return size;
     if (hasLast) // the stashed junction element adds one more pair
         size++;
     if (!hasPrev && size > 0) // at the very beginning there is one pair fewer than there are elements
         size--;
     return size;
 }

In this form the spliterator made it into my StreamEx library. The only differences are that I had to add versions for the primitive types, and that pairMap is not a static method there.

Surely all of this is terribly slow?


Performance is actually not that bad. Take, for example, this problem from StackOverflow: given a list of Integer values, keep only those that are smaller than the value following them, and collect the result into a new list. The task itself is very simple, so a significant portion of the time goes to overhead. There are two naive implementations: via an iterator (works with any collection) and via access by index (works only with a random-access list). Here is the variant with an iterator (naiveIterator):

 List<Integer> result = new ArrayList<>();
 Integer last = null;
 for (Integer cur : input) {
     if (last != null && last < cur)
         result.add(last);
     last = cur;
 }

And the one with random access (naiveGet):

 List<Integer> result = new ArrayList<>();
 for (int i = 0; i < input.size() - 1; i++) {
     Integer cur = input.get(i), next = input.get(i + 1);
     if (cur < next)
         result.add(cur);
 }

The solution using the StreamEx library is very compact and works with any data source (streamEx):

 List<Integer> result = StreamEx.of(input).pairMap((a, b) -> a < b ? a : null).nonNull().toList(); 
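The parallel variant measured below as streamExParallel presumably differs only in a parallel() call; this line is my illustration, not the original benchmark code:

 List<Integer> result = StreamEx.of(input).parallel().pairMap((a, b) -> a < b ? a : null).nonNull().toList();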

Commentators suggested three more working solutions. The most upvoted one is more or less traditional and requires a random-access list as input (let's call this solution stream):

 List<Integer> result = IntStream.range(0, input.size() - 1)
         .filter(i -> input.get(i) < input.get(i + 1))
         .mapToObj(input::get)
         .collect(Collectors.toList());

The next one is a reduce with a side effect, which does not parallelize (reduce):

 List<Integer> result = new ArrayList<>();
 input.stream().reduce((a, b) -> {
     if (a < b)
         result.add(a);
     return b;
 });

And the last one is a custom collector, which also does not parallelize (collector):

 public static Collector<Integer, ?, List<Integer>> collectPrecedingValues() {
     int[] holder = { Integer.MAX_VALUE };
     return Collector.of(ArrayList::new, (l, elem) -> {
         if (holder[0] < elem)
             l.add(holder[0]);
         holder[0] = elem;
     }, (l1, l2) -> {
         throw new UnsupportedOperationException("Don't run in parallel");
     });
 }

 List<Integer> result = input.stream().collect(collectPrecedingValues());

Parallel versions of stream and streamEx are also compared. The experiment uses lists of random integers of length n = 10,000, 100,000 and 1,000,000 elements (roughly half of the elements end up in the result). The complete JMH benchmark code is here. It was verified that all algorithms produce the same result.
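For readers who have not used JMH, here is a minimal sketch of how such a benchmark could be set up; it is an illustration under my own assumptions (class name, seed, measured variants), not the author's original benchmark, which is available at the link above:

 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MICROSECONDS)
 @State(Scope.Benchmark)
 public class PairMapBenchmark {
     @Param({"10000", "100000", "1000000"})
     int n;

     List<Integer> input;

     @Setup
     public void setup() {
         // a fixed seed keeps the data identical across the measured variants
         input = new Random(42).ints(n).boxed().collect(Collectors.toList());
     }

     @Benchmark
     public List<Integer> naiveIterator() {
         List<Integer> result = new ArrayList<>();
         Integer last = null;
         for (Integer cur : input) {
             if (last != null && last < cur)
                 result.add(last);
             last = cur;
         }
         return result;
     }

     @Benchmark
     public List<Integer> streamEx() {
         return StreamEx.of(input).pairMap((a, b) -> a < b ? a : null).nonNull().toList();
     }
 }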

The measurements were carried out on a quad-core Core i5. The results look like this (all times are in microseconds per operation; less is better):
 Algorithm            n = 10,000    n = 100,000    n = 1,000,000
 naiveIterator              97.7          904.0          10592.7
 naiveGet                   99.8         1084.4          11424.2
 collector                 112.5         1404.9          14387.2
 reduce                    112.1         1139.5          12001.5
 stream                    146.4         1624.1          16600.9
 streamEx                  115.2         1247.1          12967.0
 streamParallel             56.9          582.3           6120.5
 streamExParallel           53.4          516.7           5353.4
You can see that the pairMap version (streamEx) outperforms both the traditional stream version (stream) and the collector version, second only to the incorrect reduce. At the same time, the parallel version of streamEx is faster than the parallel version of stream and significantly outruns all sequential versions even on a small data set. This is consistent with the rule of thumb from the Stream Parallel Guidance: it makes sense to parallelize a task if it runs for at least 100 microseconds.

If you want to create your own streams, remember that how well your task parallelizes depends on a good spliterator. If you do not want to bother with splitting, do not write a spliterator at all; use the utility methods instead. Also, do not write a new spliterator if the stream can be created with existing JDK functionality. And if you do have a good spliterator, even a fairly simple task can be sped up with parallel processing.

Source: https://habr.com/ru/post/256905/

