
Comparing Java 8, RxJava, and Reactor

From the translator:
I have prepared an adapted translation with some additions and corrections. I have kept some of the promotional tone of the original article, but the information in it is interesting in itself, so I decided to translate it anyway.

People often ask me:
Why would I need to use RxJava or Reactor at all if the same can be done with Streams, CompletableFutures and Optionals?




The problem, in fact, is that most of the time you are solving simple problems and really do not need these libraries. But when things get complicated, you have to write some ugly piece of code, which then becomes more and more complex and difficult to maintain. RxJava and Reactor have many convenient features that will cover your needs for many years to come.


Let's define 8 criteria that will help us understand the difference between these libraries and standard Java features:


  1. Composable
  2. Lazy
  3. Reusable
  4. Asynchronous
  5. Cacheable
  6. Push or Pull
  7. Backpressure
  8. Operator fusion

And let's choose the classes that we will compare:


  1. CompletableFuture
  2. Stream
  3. Optional
  4. Observable (RxJava 1)
  5. Observable (RxJava 2)
  6. Flowable (RxJava 2)
  7. Flux (Reactor Core)

Ready? Let's go!


Composable


All these classes are composable and let you think functionally (author's typo corrected - translator's note). That is why we love them.


CompletableFuture - a set of .then*() methods that let you build a chain in which each stage passes on either nothing or a single value, possibly along with a Throwable.


Stream - many chainable operators that transform the input data. N values can pass from stage to stage.


Optional - a few intermediate operators: .map(), .flatMap(), .filter().


Observable, Flowable, Flux - similar to Stream.
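
To make the comparison concrete, here is a minimal sketch of the same "transform, then transform again" idea written with the three JDK classes. The class name ComposableDemo and the sample strings are made up for illustration; only standard JDK calls are used.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class; not part of the original article.
final class ComposableDemo {

    // CompletableFuture: each stage receives the single value of the previous one.
    static int viaFuture() {
        return CompletableFuture.supplyAsync(() -> "reactive")
                .thenApply(String::length)   // 8
                .thenApply(n -> n * 2)       // 16
                .join();                     // wait for the result
    }

    // Stream: N values flow through the chained operators.
    static List<Integer> viaStream() {
        return Arrays.asList("rx", "reactor", "stream").stream()
                .map(String::length)         // 2, 7, 6
                .filter(n -> n > 2)          // 7, 6
                .collect(Collectors.toList());
    }

    // Optional: at most one value, with similarly named operators.
    static Optional<Integer> viaOptional() {
        return Optional.of("reactor")
                .map(String::length)         // 7
                .filter(n -> n > 2);
    }

    public static void main(String[] args) {
        System.out.println(viaFuture());     // 16
        System.out.println(viaStream());     // [7, 6]
        System.out.println(viaOptional());   // Optional[7]
    }
}
```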


Lazy


CompletableFuture - not lazy, since it simply holds an asynchronous result. Such objects are created to represent work that has already started (wording corrected - translator's note). They know nothing about the work itself, only about its result. Thus there is no way to go upstream and run the chain from top to bottom. The next stage starts when the value is set in the CompletableFuture.
(The conclusion is correct, but the reasoning is debatable. In fact, CompletableFuture is not lazy because computing and setting its value begins before we ask it for the result - translator's note.)


Stream - all intermediate operations are lazy. Every terminal operation starts the computation.


Optional - not lazy, all operations take place immediately.


Observable, Flowable, Flux - nothing happens until there is a subscriber.
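
The difference between a lazy Stream and an eager Optional can be observed by recording when the mapping function actually runs. The following is a small sketch under that assumption; LazinessDemo and the event strings are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; not part of the original article.
final class LazinessDemo {

    // Returns the events in the order in which they happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stream: map() only records the operation; nothing runs yet.
        Stream<Integer> lengths = Stream.of("a", "bb")
                .map(s -> { events.add("stream map " + s); return s.length(); });
        events.add("stream built");
        lengths.collect(Collectors.toList()); // the terminal operation runs the chain

        // Optional: map() invokes the function immediately.
        Optional<Integer> length = Optional.of("ccc")
                .map(s -> { events.add("optional map " + s); return s.length(); });
        events.add("optional built");

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

For the Stream, "stream built" is recorded before any "stream map" event; for the Optional, the map event comes first.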


Reusable


CompletableFuture can be reused, since it is just a wrapper around a value. But use it with care, because this wrapper is mutable. If you are sure that nobody will call .obtrude*(), it is safe.


Stream is not reusable. As the JavaDoc states:


A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.

Optional is completely reusable because it is immutable and all work happens immediately.


Observable, Flowable, Flux are designed to be reused. All stages run from the starting point, and only once there is a subscriber.
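
A short sketch of what "not reusable" means in practice for the JDK classes: a second terminal operation on the same Stream object is rejected, while the same Optional can be mapped as often as needed. ReuseDemo is a made-up name.

```java
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical demo class; not part of the original article.
final class ReuseDemo {

    // Returns true if the second traversal of the same Stream was rejected.
    static boolean streamReuseFails() {
        Stream<String> names = Stream.of("rx", "reactor");
        names.count();                 // first terminal operation
        try {
            names.count();             // second one on the same object
            return false;
        } catch (IllegalStateException e) {
            return true;               // the stream has already been operated upon
        }
    }

    // The same Optional is used as the start of two independent chains.
    static int optionalReuse() {
        Optional<String> name = Optional.of("reactor");
        int length = name.map(String::length).get();                    // 7
        int upperLength = name.map(String::toUpperCase).get().length(); // 7
        return length + upperLength;
    }

    public static void main(String[] args) {
        System.out.println(streamReuseFails()); // true
        System.out.println(optionalReuse());    // 14
    }
}
```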


Asynchronous


CompletableFuture - the whole point of this class is to chain operations asynchronously. A CompletableFuture represents work associated with some Executor. If you do not specify an Executor explicitly when creating the task, the common ForkJoinPool is used. This pool can be obtained with ForkJoinPool.commonPool(), and by default its parallelism is one less than the number of hardware threads in your system (usually the number of cores, or twice that if the CPU supports Hyper-Threading). You can, however, set the number of threads in this pool with the JVM parameter
-Djava.util.concurrent.ForkJoinPool.common.parallelism=?
or pass your own Executor each time you create a stage.
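
As an illustration of passing your own Executor, the sketch below runs a stage on a dedicated single-thread executor and reports which thread executed it. The class name ExecutorDemo and the thread name "my-worker" are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of the original article.
final class ExecutorDemo {

    // Runs one asynchronous stage on a dedicated executor and returns
    // the name of the thread that executed it.
    static String threadOfSuppliedStage() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "my-worker"); // illustrative thread name
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfSuppliedStage()); // my-worker
    }
}
```

Without the second argument, supplyAsync() would fall back to the common pool described above.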


Stream - no support for asynchronous processing, but it can run computations in parallel by creating a parallel stream with stream.parallel().


Optional - no, it is just a container.


Observable, Flowable, Flux - although aimed at building asynchronous systems, they are synchronous by default. The subscribeOn and observeOn operators let you control where the subscription is made and where notifications are received (i.e. which thread will call onNext / onError / onCompleted on the observer).


With subscribeOn you decide on which Scheduler Observable.create runs. Even if you do not call create yourself, there is an internal equivalent. Example:


 // Assumes: import static java.lang.Thread.currentThread;
 // log and readFile(...) are helpers defined elsewhere.
 Observable
     .fromCallable(() -> {
         log.info("Reading file on thread: " + currentThread().getName());
         return readFile("input.txt");
     })
     .map(text -> {
         log.info("Map on thread: " + currentThread().getName());
         return text.length();
     })
     .subscribeOn(Schedulers.io()) // <-- setting scheduler
     .subscribe(value -> {
         log.info("Result on thread: " + currentThread().getName());
     });

Output:


 Reading file on thread: RxIoScheduler-2
 Map on thread: RxIoScheduler-2
 Result on thread: RxIoScheduler-2

observeOn(), on the other hand, controls which Scheduler is used to invoke the steps that follow observeOn(). Example:


 // Assumes: import static java.lang.Thread.currentThread;
 // log and readFile(...) are helpers defined elsewhere.
 Observable
     .fromCallable(() -> {
         log.info("Reading file on thread: " + currentThread().getName());
         return readFile("input.txt");
     })
     .observeOn(Schedulers.computation()) // <-- setting scheduler
     .map(text -> {
         log.info("Map on thread: " + currentThread().getName());
         return text.length();
     })
     .subscribeOn(Schedulers.io()) // <-- setting scheduler
     .subscribe(value -> {
         log.info("Result on thread: " + currentThread().getName());
     });

Output:


 Reading file on thread: RxIoScheduler-2
 Map on thread: RxComputationScheduler-1
 Result on thread: RxComputationScheduler-1

Cacheable


What is the difference between reusable and cacheable? Suppose we have a chain A, and we reuse it twice to create the chains B = A + X and C = A + Y, where X and Y are different additional stages.


If both B and C complete successfully, the class is reusable.
If both B and C complete successfully and each stage of chain A is executed only once, the class is cacheable. To be cacheable, a class must be reusable.


CompletableFuture - the same answer as for reusability.


Stream - an intermediate result cannot be cached without invoking a terminal operation.


Optional is “cacheable” because all the work happens right away.


Observable, Flowable, Flux - not cached by default. But you can make A cacheable by calling .cache() on it.


 Observable<Integer> work = Observable.fromCallable(() -> {
     System.out.println("Doing some work");
     return 10;
 });
 work.subscribe(System.out::println);
 work.map(i -> i * 2).subscribe(System.out::println);

Output:


 Doing some work
 10
 Doing some work
 20

With .cache() :


 Observable<Integer> work = Observable.fromCallable(() -> {
     System.out.println("Doing some work");
     return 10;
 }).cache(); // <- apply caching
 work.subscribe(System.out::println);
 work.map(i -> i * 2).subscribe(System.out::println);

Output:


 Doing some work
 10
 20
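
For comparison with the JDK, the same "B and C built on A" experiment can be sketched with CompletableFuture, which holds the result of A once it is available. CacheDemo and the counter are illustrative names; the two extra stages are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original article.
final class CacheDemo {

    // Counts how many times the work of chain A was executed.
    static final AtomicInteger runs = new AtomicInteger();

    // Returns { result of B, result of C, number of executions of A }.
    static int[] run() {
        runs.set(0);
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
            runs.incrementAndGet();
            return 10;
        });
        CompletableFuture<Integer> b = a.thenApply(i -> i + 1); // B = A + one stage
        CompletableFuture<Integer> c = a.thenApply(i -> i * 2); // C = A + another stage
        return new int[] { b.join(), c.join(), runs.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 11 20 1
    }
}
```

Both derived chains complete, and the work of A runs only once, which matches the definition of cacheable given above.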

Push or pull


Stream & Optional are pull-based. The result is pulled out of the chain by calling various methods (.get(), .collect(), etc.). Pull is often associated with blocking, synchronous execution, and rightly so: you call a method and the thread waits for the data to arrive. Until then, the thread is blocked.


CompletableFuture, Observable, Flowable, Flux are push-based. You subscribe to the chain and are notified when there is something to process. Push is often associated with non-blocking, asynchronous execution: you are free to do anything else while the chain runs on some thread. You have already described the code to execute, so the notification triggers that code as the next step.
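
The two styles can be contrasted in a few lines of JDK code. In this sketch (PushPullDemo and the event strings are made up), the Stream result is pulled by the caller, while the CompletableFuture callback is registered first and runs only when a value is pushed into the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

// Hypothetical demo class; not part of the original article.
final class PushPullDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Pull: the caller asks for the result and waits for it.
        int sum = Stream.of(1, 2, 3).mapToInt(Integer::intValue).sum();
        events.add("pulled " + sum);

        // Push: describe the reaction first; the value arrives later.
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<Void> done =
                source.thenAccept(v -> events.add("pushed " + v));
        events.add("callback registered");
        source.complete(42);   // the completing thread runs the callback
        done.join();

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```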


Backpressure


To support backpressure, the chain must be push-based.

Backpressure describes the situation in which some asynchronous stages of a chain cannot process values quickly enough and need a way to signal upstream, asking the producer to slow down. It is unacceptable for a stage to fail because there is too much data (the ambiguity of the author's wording is preserved).





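Observable (RxJava 1), Flowable, Flux solve this problem. The main strategies are buffering values until the downstream can consume them, dropping values the downstream cannot keep up with, keeping only the latest value, and signalling an error.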



Observable (RxJava 2) does not solve this problem. Many RxJava 1 users used Observable for events that cannot reasonably be backpressured, or did not apply any strategy, which caused unexpected exceptions. RxJava 2 therefore draws a clear line between the Flowable and Observable classes.
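
The request-based protocol used by Flowable and Flux is also available in the JDK (Java 9 and later) as java.util.concurrent.Flow. The sketch below uses it instead of RxJava or Reactor, purely to show the "ask upstream for more only when ready" idea; BackpressureDemo and OneAtATime are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; requires Java 9+. Not part of the original article.
final class BackpressureDemo {

    // A subscriber that asks for exactly one item at a time.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);               // demand the first item
        }
        @Override public void onNext(Integer item) {
            received.add(item);         // slow processing would happen here
            subscription.request(1);    // only now ask for the next item
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<Integer> run() {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);    // blocks if the subscriber's buffer is full
            }
        }                               // close() signals onComplete
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4, 5]
    }
}
```

Here the producer is held back by blocking in submit(); the strategies offered by RxJava and Reactor are alternatives for producers that cannot simply be blocked.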


Operator Fusion


The idea is to modify the chain at various points of its life cycle in order to reduce the overhead created by the library architecture. All these optimizations are done internally, so they are transparent to the end user.


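Only RxJava 2 and Reactor support operator fusion, each in a slightly different way. In general, there are two types of optimization: macro-fusion, which replaces two or more consecutive operators with a single one, and micro-fusion, in which adjacent operators share internal resources such as a queue.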





... the subscriber may request the value from the parent Observable:




More information can be found here: Part 1 & Part 2
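
As a purely conceptual illustration of replacing several operators with one (often called macro-fusion), the sketch below composes consecutive map functions into a single function instead of creating one stage object per call. It is not how RxJava 2 or Reactor implement fusion; FusionSketch and MapChain are hypothetical names invented for this example.

```java
import java.util.function.Function;

// Hypothetical sketch; not the implementation used by RxJava or Reactor.
final class FusionSketch {

    // A chain that consists only of map stages.
    static final class MapChain<T, R> {
        private final Function<T, R> fused;

        private MapChain(Function<T, R> fused) { this.fused = fused; }

        static <T> MapChain<T, T> start() {
            return new MapChain<T, T>(Function.identity());
        }

        // Instead of adding another stage, compose into the existing function.
        <V> MapChain<T, V> map(Function<? super R, ? extends V> next) {
            return new MapChain<T, V>(fused.andThen(next));
        }

        R apply(T value) { return fused.apply(value); }
    }

    // Two map() calls, but only one function is invoked per value.
    static int demo() {
        return MapChain.<String>start()
                .map(String::trim)
                .map(String::length)
                .apply("  rx ");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2
    }
}
```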


Conclusion


[Image: comparison]


Stream, CompletableFuture and Optional were created to solve specific problems, and they are really good at solving them. If they are enough for your needs, go ahead.


However, different problems have different complexity and some of them require new approaches. RxJava & Reactor are versatile tools to help you solve your tasks in a declarative style, rather than creating “hacks” using tools that were not designed to solve such problems.



Source: https://habr.com/ru/post/327460/

