📜 ⬆️ ⬇️

Stream API: universal intermediate operation

I am developing the free library StreamEx , which extends the standard Java 8 Stream API, adding new operations, collectors and sources of streams there. Usually I do not add everything, but comprehensively consider every potential feature. For example, when adding a new intermediate ( intermediate ) operation, these questions arise:

  1. Will it really be intermediate, that is, it will not touch the source until the terminal operation is performed?
  2. Will it be lazy and pull no more data from the source than is required?
  3. Does it work on an endless stream? Does it require a limited amount of memory?
  4. Will it parallel well?

Minusik on any of these points makes you seriously think about whether to add such an operation. Minusik on the first - it is not immediately. For example, competitors in jOOλ have the operation shuffle () , which looks like an intermediate one, but in fact directly consumes the entire stream into the list right away, mixes it up and creates a new stream. I don't respect that.

Minusiki on the remaining items does not immediately mean no, because there are standard operations that violate them. The second item breaks flatMap() , the third one - sorted() , the fourth - all sorts of limit() and takeWhile() (in JDK-9). But still, I try to avoid it. However, the other day I discovered an operation that is badly parallelized and, depending on its use, may not work on an endless stream, but is still too good. Through it, it is possible to literally express in a few lines both practically any existing intermediate operation, and a bunch of nonexistent ones. I called the operation headTail () .

The operation method takes two function parameters (everywhere below I omit PECS for short):
')
 <R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper, Supplier<Stream<R>> supplier) 

The first function takes two arguments: the first element of the source stream and a stream containing all the other elements. The function can do anything with this and return a new stream, which will be passed to the following operations. The second argument is a function that takes no arguments, which returns a stream of the same type as the first function. It is called if the source stream is empty. In fact, only one of the functions is called and only once during the execution of the terminal operation of the entire stream.

Often the second function should return just an empty stream (if the source stream is empty, then the result is empty), so it can be omitted:

 <R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper) 

Let's see what can be done about it. A simple use case might look like this:

 StreamEx.of("name", "John", "Mary", "Lucy") .headTail((head, tail) -> tail.map(str -> head+": "+str)) .forEach(System.out::println); 

Conclusion:

 name: John name: Mary name: Lucy 

Here we just bit off the first element of the stream and used it for concatenation with subsequent elements. So you can parse a text file that has headers in the first line. But it is pretty boring. Playing with this method, I discovered that it is much more powerful. Let's try to express through it the main intermediate operations from the JDK.

Stream.map


The map operation applies the specified function to all elements of the original stream. This is how it will look through headTail ():
 public static <T, R> StreamEx<R> map(StreamEx<T> input, Function<T, R> mapper) { return input.headTail((head, tail) -> map(tail, mapper).prepend(mapper.apply(head))); } 

Here we use another simple operation prepend , without which nothing would have happened. This is a variation on the concatenation of two streams (in the standard API there is Stream.concat). Here we call ourselves for the tail, and then add to the beginning of the stream the result of applying the function to the head element.

This is similar to recursion, and everyone knows that recursion is eating a stack. In functional programming languages, tailing recursion optimization sometimes saves, but in Java it is not and is not expected. However, in this case, this is not exactly recursion: we do not call the map method within ourselves, but only create a function that will be called later. It turned out that in this case it is possible to control the depth of calls if changes in each individual headTail() affect only the beginning of the stream, leaving the tail unchanged. I was not very creative in calling this feature “tail stream optimization”. It is compatible with intermediate prepend operations (add something to the beginning of the stream), mapFirst (change the first element of the stream, without touching the rest) and the headTail itself. In principle, it could be extended to standard skip and dropWhile (from JDK-9), but my library promises that standard operations are fully compatible with the original Stream API, and there would be subtle differences.

Anyway, the above map operation does not eat the stack or memory is larger than a constant size and is quite applicable for streams of any length. Let's look at other operations.

Stream.limit


Restrict the stream to a given length. If we restrict to one element, then we simply create a stream from the head, otherwise we call ourselves for the tail with a reduced restriction (process n <= 0 - an exercise for the reader):

 public static <T> StreamEx<T> limit(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 1 ? limit(tail, n - 1).prepend(head) : Stream.of(head)); } 

At the beginning I wrote a little differently (like the flatMap argument, the headTail argument can return null instead of an empty stream):

 public static <T> StreamEx<T> limit(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 0 ? limit(tail, n - 1).prepend(head) : null); } 

But this implementation has a flaw: it considers one element more from the source than necessary (for n = 0, the head argument is read but not used). Sometimes it can be critical. For example, such code should work:

 limit(StreamEx.of(new Random().ints(0, 1000).boxed().distinct()), 1000).forEach(System.out::println); 

An endless stream of random numbers from 0 to 999, from which we select unique ones. There are 1000 unique ones, but there is no 1001, so if you try to pull out the 1001th number from the source, everything will hang.

Stream.skip


Drop n first elements. If n = 0, we simply return the tail with the head stuck, otherwise we call ourselves with a reduced argument:

 static <T> StreamEx<T> skip(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 0 ? skip(tail, n - 1) : tail.prepend(head)); } 


Stream.flatMap


Display each element on the stream and make one common stream. In our case, the implementation is the same as that of the map:

 public static <T, R> StreamEx<R> flatMap(StreamEx<T> input, Function<T, Stream<R>> mapper) { return input.headTail((head, tail) -> flatMap(tail, mapper).prepend(mapper.apply(head))); } 

Here, the only difference is that another prepend is used , which receives the stream (actually, the first prepend is a special case of this).

Stream.peek


Perform an additional action for each element of the stream and return the stream as is. Perform the action and glue the head to the tail:
 public static <T> StreamEx<T> peek(StreamEx<T> input, Consumer<T> consumer) { return input.headTail((head, tail) -> { consumer.accept(head); return peek(tail, consumer).prepend(head); }); } 


Stream.filter


Leave elements satisfying the predicate. We glue the head only if the predicate is fulfilled:
 public static <T> StreamEx<T> filter(StreamEx<T> input, Predicate<T> predicate) { return input.<T> headTail((head, tail) -> predicate.test(head) ? filter(tail, predicate).prepend(head) : filter(tail, predicate)); } 

Stream.distinct


Leave unique items. There will obviously need additional memory. A naive implementation will use filter (standard or declared above can be):

 public static <T> StreamEx<T> distinct(StreamEx<T> input) { return input.headTail((head, tail) -> distinct(tail.filter(n -> !Objects.equals(head, n))).prepend(head)); } 

But such code still eats the stack, there is no optimization of tail streams. In addition, each element is checked linearly by a chain of filters, and I would like to optimize it. To do this, we will keep in the parameters HashSet:

 private static <T> StreamEx<T> distinct(StreamEx<T> input, Set<T> observed) { return input.headTail((head, tail) -> observed.add(head) ? distinct(tail, observed).prepend(head) : distinct(tail, observed)); } 

Do not forget that Set.add returns false if the element was already in the set. In this case, do not glue the head. Such an implementation stack no longer eats and is not inferior in memory from the standard. Here it is worth adding a method for running (with recursive functions it often happens that you need a separate public method for running):

 public static <T> StreamEx<T> distinct(StreamEx<T> input) { return distinct(input, new HashSet<>()); } 

Stream.sorted


Sort stream. The operation is special: it is impossible to give anything as a result here until the source is fully read. We'll have to buffer everything (for example, in an ArrayList ) and here, for the first time, we will need the second headTail argument:

 public static <T> StreamEx<T> sorted(StreamEx<T> input) { return sorted(input, new ArrayList<>()); } private static <T> StreamEx<T> sorted(StreamEx<T> input, List<T> buf) { return input.headTail((head, tail) -> { buf.add(head); return sorted(tail, buf); }, () -> { buf.sort(null); return buf.stream(); }); } 

When the entire source stream is over, we sort the buffer and return the stream from it. I note that this sorted works similar to the standard one and it is still better than the shuffle given above. For example, if you concatenate two sorted streams, the second will not be sorted until you have completely read the first. By the way, replacing buf.sort(null) with Collections.shuffle(buf) you and shuffle can do more or less normally. And with Collections.reverse(buf) you can flip the stream.

JDK-9 adds two new intermediate operations for now. We also implement them:

Stream.takeWhile


Trim the stream as soon as the predicate returns false. It looks like limit:

 public static <T> StreamEx<T> takeWhile(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? takeWhile(tail, predicate).prepend(head) : null); } 

Stream.dropwhile


Throw out elements from the stream until the predicate returns false . Looks like a skip :

 public static <T> StreamEx<T> dropWhile(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? dropWhile(tail, predicate) : tail.prepend(head)); } 

Well, reinventing the wheel is boring. Let's try to implement new operations that are not in the Stream API.

mirror


Add to the end of the stream its contents in the reverse order (so that the stream from 1, 2, 3 turns into 1, 2, 3, 3, 2, 1). You can do it easily, but without tail optimization:

 public static <T> StreamEx<T> mirror(StreamEx<T> input) { return input.headTail((head, tail) -> mirror(tail).append(head).prepend(head)); } 

With a tail buffer is required:

 public static <T> StreamEx<T> mirror(StreamEx<T> input) { return mirror(input, new ArrayDeque<>()); } private static <T> StreamEx<T> mirror(StreamEx<T> input, Deque<T> buf) { return input.headTail((head, tail) -> { buf.addFirst(head); return mirror(tail, buf).prepend(head); }, buf::stream); } 

Both implementations do not take more than necessary: mirror(StreamEx.of(1,2,3,4,5)).limit(3) does not reach the reflection point and subtracts only three elements from the source.

scanLeft


We consistently modify the stream by performing the specified operation. For example, scanLeft(StreamEx.of(1,2,3,4,5), Integer::sum) should sequentially summarize the elements and create streams 1, 3, 6, 10, 15 .

 public static <T> StreamEx<T> scanLeft(StreamEx<T> input, BinaryOperator<T> operator) { return input.headTail((head, tail) -> scanLeft(tail.mapFirst(cur -> operator.apply(head, cur)), operator).prepend(head)); } 

Here we used the mapFirst method, which already exists in StreamEx. But if it were not, we would easily write it even without any recursion:

 public static <T> StreamEx<T> mapFirst(StreamEx<T> input, UnaryOperator<T> operator) { return input.headTail((head, tail) -> tail.prepend(operator.apply(head))); } 

In any case, tails are optimized, both with our mapFirst, and with the existing one.

takeWhileClosed


The name may not be very good. Sometimes you want to modify takeWhile , so that not only elements that satisfy the predicate but also the first element that violates it are included in the stream. Through existing operations and through the usual takeWhile this is not normally expressed. And through headTail , it's easy:

 public static <T> StreamEx<T> takeWhileClosed(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? takeWhileClosed(tail, predicate).prepend(head) : Stream.of(head)); } 

every


Take the elements from the stream at a specified interval (for example, every tenth), starting with the first. Here it is convenient to combine with the skip operation, but the standard skip does not optimize tails, so we will use our redefined skip :

 public static <T> StreamEx<T> every(StreamEx<T> input, int n) { return input.headTail((head, tail) -> every(skip(tail, n - 1), n).prepend(head)); } 

couples


Break the stream into disjoint pairs of elements, applying the specified function to them (if there are an odd number of elements, throw out the last one). Here it is convenient to call headTail twice:

 public static <T, R> StreamEx<R> couples(StreamEx<T> input, BiFunction<T, T, R> mapper) { return input.headTail((left, tail1) -> tail1.headTail((right, tail2) -> couples(tail2, mapper).prepend(mapper.apply(left, right)))); } 

pairMap


And if we want the same pair of intersecting pairs? Easy, you just need to return the right element to the stream when recursively calling:

 public static <T, R> StreamEx<R> pairMap(StreamEx<T> input, BiFunction<T, T, R> mapper) { return input.headTail((left, tail1) -> tail1.headTail((right, tail2) -> pairMap(tail2.prepend(right), mapper).prepend(mapper.apply(left, right)))); } 

Such an operation is already in StreamEx, and I wrote about it. headTail() course, it is normally parallelized, in contrast to the implementation via headTail() .

batches


Okay, with couples understandable. And if we want to break the stream into pieces of a fixed length (in the form of lists) and not lose a whole piece at the end? For example, batches(StreamEx(1,2,3,4,5,6,7), 3) should make a stream from the lists [1,2,3], [4,5,6], [7] . An argument containing the intermediate buffer will help here:

 public static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size) { return batches(input, size, Collections.emptyList()); } private static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size, List<T> cur) { return input.headTail((head, tail) -> cur.size() >= size ? batches(tail, size, Collections.singletonList(head)).prepend(cur) //         : batches(tail, size, StreamEx.of(cur).append(head).toList()), //     () -> Stream.of(cur)); } 

When the source is exhausted, we return the last accumulated buffer as a result with () -> Stream.of(cur) , so as not to lose the tail. Here, for the beauty of the implementation, I create a new list every time through StreamEx.of(cur).append(head).toList() , rather than changing the existing one. But easy and changeable lists to insert, if performance is important.

withIndices


Did you need to know the indexes of the elements in the stream? You can and this. In order not to have a special type, like an index-element pair, BiFunction<Integer, T, R> take an abstract function of the type BiFunction<Integer, T, R> , which can do everything it wants with the index and the element:

 public static <T, R> StreamEx<R> withIndices(StreamEx<T> input, BiFunction<Integer, T, R> mapper) { return withIndices(input, 0, mapper); } private static <T, R> StreamEx<R> withIndices(StreamEx<T> input, int idx, BiFunction<Integer, T, R> mapper) { return input.headTail((head, tail) -> withIndices(tail, idx + 1, mapper).prepend(mapper.apply(idx, head))); } 


dominators


A more exotic task: we will throw out the elements following this one, over which the given one “dominates”. Dominance defines a predicate from two elements. For example, dominators(numbers, (a, b) -> a >= b) will leave an increasing subset of the original numbers. The implementation is similar to every, but instead of skip our dropWhile is used:

 public static <T> StreamEx<T> dominators(StreamEx<T> input, BiPredicate<T, T> isDominator) { return input.headTail((head, tail) -> dominators(dropWhile(tail, e -> isDominator.test(head, e)), isDominator) .prepend(head)); } 


appendReduction


Add one more element to the end of the stream - the result of its reduction with a given operation. For example, appendReduction(numbers, 0, Integer::sum) add to the stream of numbers the sum of its elements.

 public static <T> StreamEx<T> appendReduction(StreamEx<T> input, T identity, BinaryOperator<T> op) { return input.headTail((head, tail) -> appendReduction(tail, op.apply(identity, head), op).prepend(head), () -> Stream.of(identity)); } 

As usual, everything is lazy and tails are optimized.

primes


Rather a learning task. Make a sieve of Eratosthenes: a lazy stream of primes that throws out those that are divided into those already seen before:

 public static StreamEx<Integer> sieve(StreamEx<Integer> input) { return sieve(StreamEx.iterate(2, x -> x+1)); } private static StreamEx<Integer> sieve(StreamEx<Integer> input) { return input.headTail((head, tail) -> sieve(tail.filter(n -> n % head != 0)).prepend(head)); } 

Here tail optimization does not work, although a similar thing in functional languages ​​is also, of course, not optimized. But it looks simple. With the default settings, the JVM has time to produce prime numbers up to 200,000 or more, until it drops with StackOverflowError.

You can think of other useful operations. For example, repeat the contents of a stream in a loop a specified number of times. Or duplicate the stream by filtering it with two different filters (without storing in memory what the second filter did not pass). You can make a running window (by analogy with batches, but overlap). In fact, whatever I came up with, I managed to implement it with the help of headTail very briefly (my tests are here ). Anyway, for me, headTail is definitely shorter and clearer than writing an Iterator or Spliterator . As I understand it, in the world of functional programming, such things are common. It's nice that this is possible in Java.

Program with pleasure!

Source: https://habr.com/ru/post/262139/


All Articles