
Java and Project Reactor. Episode 2


Hello! Surprisingly, some people even liked the first part of this article.
Special thanks for your feedback and comments. I have news for you that is both bad and good: we still have something to talk about! More specifically, some details of how Reactor works.


I renounce magic


Before going deeper into Reactor, it is worth describing some of the principles behind it. What exactly is hidden behind the outer layer of Flux and Mono?


Reminder

Flux and Mono implement the Publisher interface.


public void subscribe(Subscriber<? super T> s); 

The official documentation suggests comparing Reactor with an assembly line. A Publisher supplies some data (the raw material). The data travels along a chain of operators (the conveyor belt) and is processed there; the result is a finished product, which is handed to the target Consumer / Subscriber and used there.


How do Reactor operators work? There are many variations, so what follows is only a rough, generalized description.


Each operator has its logic implemented as an object. Calling an operator on a Flux / Mono returns an object that implements this operator. For example, a call to flatMap() returns an object of type FluxFlatMap (a subclass of Flux).


That is, an operator is a Publisher which, in addition to its own logic, holds a reference to the source Publisher it is applied to. Operator calls therefore build a chain of Publishers.


When subscribe() is called, the original Subscriber is created and passed back up through our chain of Publishers. Each Publisher can wrap the Subscriber in another Subscriber, which creates a second chain; that chain is handed to the original Publisher for execution.
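The wrapping described above can be sketched with the JDK's java.util.concurrent.Flow interfaces, which have the same shape as Publisher and Subscriber. This is a simplified model, not Reactor's actual code: the names OperatorChainSketch, JustPublisher and MapPublisher are made up for illustration, and backpressure is ignored for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Simplified model of an operator chain; NOT Reactor's actual implementation.
final class OperatorChainSketch {

    // Source Publisher: emits a fixed list of items on the first request
    // (assumption: backpressure is ignored to keep the sketch short).
    static final class JustPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        JustPublisher(List<T> items) { this.items = items; }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override public void request(long n) {
                    if (done) return;
                    done = true;
                    for (T item : items) subscriber.onNext(item);
                    subscriber.onComplete();
                }

                @Override public void cancel() { done = true; }
            });
        }
    }

    // "map" operator: a Publisher that keeps a reference to its source Publisher.
    static final class MapPublisher<T, R> implements Flow.Publisher<R> {
        private final Flow.Publisher<T> source;
        private final Function<? super T, ? extends R> mapper;

        MapPublisher(Flow.Publisher<T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super R> downstream) {
            // Wrap the downstream Subscriber and hand the wrapper to the source.
            source.subscribe(new Flow.Subscriber<T>() {
                @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
                @Override public void onNext(T item) { downstream.onNext(mapper.apply(item)); }
                @Override public void onError(Throwable t) { downstream.onError(t); }
                @Override public void onComplete() { downstream.onComplete(); }
            });
        }
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();

        // Assembly: each operator call builds one more link in the Publisher chain.
        Flow.Publisher<Integer> source = new JustPublisher<>(List.of(1, 2, 3));
        Flow.Publisher<Integer> timesTen = new MapPublisher<Integer, Integer>(source, i -> i * 10);
        Flow.Publisher<String> labeled = new MapPublisher<Integer, String>(timesTen, i -> "value " + i);

        // Subscription: the final Subscriber travels up the chain and gets wrapped twice.
        labeled.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { received.add("error: " + t); }
            @Override public void onComplete() { received.add("done"); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [value 10, value 20, value 30, done]
    }
}
```

Nothing is emitted while the chain is being assembled; data starts flowing only when the last Publisher receives the subscribe call and the wrapped Subscriber reaches the source.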


Naturally, all of this carries some overhead, so it is recommended not to write ordinary (synchronous) code with Flux or Mono.


Schedulers


Reactor does not care about the execution model of your program, but it kindly provides the tools needed to control execution. Like a samurai, the developer is free to choose their own execution model.


The execution model and its details are determined by the implementation of the Scheduler interface. The Schedulers class offers static methods that provide an execution context for a number of common cases, such as Schedulers.single(), Schedulers.parallel() and Schedulers.elastic().



It is worth noting that the out-of-the-box Schedulers.single() and Schedulers.parallel() throw an IllegalStateException when you try to run a blocking operator on them: block(), blockLast(), toIterable(), toStream(). This behavior appeared in release 3.1.6.


If you still want to do such things, use Schedulers.newSingle() and Schedulers.newParallel(). But the best practice for blocking operators is to use Schedulers.elastic() or Schedulers.newElastic() (since Reactor 3.4, elastic() is deprecated in favor of Schedulers.boundedElastic()).


Scheduler instances can also be created from an ExecutorService using Schedulers.fromExecutorService(). Creating one from a plain Executor is also possible, but not recommended.


Some Flux and Mono operators run on a specific Scheduler by default (though you can pass your own). For example, the familiar Flux.interval() runs on Schedulers.parallel() by default.


 Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test")) 

Execution context


How do we change the execution context? We need one of two operators already familiar to us: publishOn or subscribeOn.



They both take a Scheduler as an argument and allow you to change the execution context to the specified Scheduler.
But why are there two of them and what is the difference?


publishOn is applied like any other operator, in the middle of a call chain. Everything downstream of it is executed in the context of the specified Scheduler.


subscribeOn, in contrast, acts globally on the whole Subscriber chain as soon as subscribe() is called, regardless of where it appears in the chain. After the call to subscribe(), the execution context is the specified Scheduler. The context can then be changed with the publishOn operator. Subsequent calls to subscribeOn are ignored.


Thanks to Stack Overflow for the example. The code:


    Flux.just("a", "b", "c") // this is where subscription triggers data production
        // this is influenced by subscribeOn
        .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
        .publishOn(Schedulers.elastic())
        // the rest is influenced by publishOn
        .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.parallel())
        .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));
    Thread.sleep(5000);

will output the following result:


    before publishOn: parallel-1
    before publishOn: parallel-1
    after publishOn: elastic-2
    before publishOn: parallel-1
    received a on elastic-2
    after publishOn: elastic-2
    received b on elastic-2
    after publishOn: elastic-2
    received c on elastic-2

Error handling


In Reactor, an exception is treated as a terminal event.
If an exception occurs somewhere, something has gone wrong: our pipeline stops, and the error is propagated down to the final Subscriber and its onError method.




Why is that? Reactor does not know how serious the exception is and has no idea what to do with it. Such situations should be handled at the application level, and Subscriber has the onError() method for exactly this purpose. Reactor pushes us to implement it and react to the exception somehow; otherwise we get an UnsupportedOperationException when an error occurs.


Clarification

To be precise, it throws a subclass of UnsupportedOperationException, ErrorCallbackNotImplemented. To check that this is really the one, there is the static helper method Exceptions.isErrorCallbackNotImplemented(Throwable t).


Try / catch philosophy


What is usually done inside a catch block in Java? Well, apart from everyone's favorite empty catch blocks.


  1. Static Fallback Value. Return some static default value:

     try {
         return fromRemoteAndUnstableSource();
     } catch (Throwable e) {
         return DEFAULT_VALUE;
     }

  2. Fallback Method. Call an alternative method in case of an error:

     try {
         return fromRemoteAndUnstableSource();
     } catch (Throwable e) {
         return loadValueFromCache();
     }

  3. Dynamic Fallback Value. Return a dynamic value that depends on the exception:

     try {
         return fromRemoteAndUnstableSource();
     } catch (Throwable e) {
         if (e instanceof TimeoutException) {
             return loadValueFromCache();
         }
         return DEFAULT_VALUE;
     }

  4. Catch and Rethrow. Wrap the exception in a domain exception and throw it further:

     try {
         return fromRemoteAndUnstableSource();
     } catch (Throwable e) {
         throw new BusinessException(e);
     }

  5. Log or React on the Side. Log the error and rethrow the exception:

     try {
         return fromRemoteAndUnstableSource();
     } catch (Throwable e) {
         logger.error(e.getMessage(), e);
         throw e;
     }

  6. Using Resources and the Finally Block. Release resources in a finally block or use try-with-resources:

     try {
         return fromRemoteAndUnstableSource();
     } catch (Throwable e) {
         // do nothing
     } finally {
         cleanAllStuff();
     }

The good news: all this is in Reactor in the form of equivalent operators.


The less pleasant news: when an error occurs, your beautiful data sequence still ends (terminal event), even with an error-handling operator in place.
Such operators are used to create a new fallback sequence that replaces the terminated one.


Let's give an example:


    Flux<String> s = Flux.range(1, 10)
        .map(v -> doSomethingDangerous(v))
        .map(v -> doSecondTransform(v));
    s.subscribe(value -> System.out.println("RECEIVED " + value),
                error -> System.err.println("CAUGHT " + error));

You can compare this with a similar try / catch block:


    try {
        for (int i = 1; i < 11; i++) {
            String v1 = doSomethingDangerous(i);
            String v2 = doSecondTransform(v1);
            System.out.println("RECEIVED " + v2);
        }
    } catch (Throwable t) {
        System.err.println("CAUGHT " + t);
    }

Note: the for loop is interrupted!


Another example of the completion of a sequence in case of an error:


    Flux<String> flux = Flux.interval(Duration.ofMillis(250))
        .map(input -> {
            if (input < 3) return "tick " + input;
            throw new RuntimeException("boom");
        })
        .onErrorReturn("Uh oh");

    flux.subscribe(System.out::println);
    Thread.sleep(2100);

On the screen we get:


    tick 0
    tick 1
    tick 2
    Uh oh

Implementing try / catch


Static Fallback Value


Using the onErrorReturn operator:


    Flux.just(10)
        .map(this::doSomethingDangerous)
        .onErrorReturn("RECOVERED");

You can add a predicate so that the fallback is applied only to certain exceptions:


    Flux.just(10)
        .map(this::doSomethingDangerous)
        .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");

Fallback method


Using the onErrorResume operator:


    Flux.just("key1", "key2")
        .flatMap(k -> callExternalService(k)          // call the external service for each key
            .onErrorResume(e -> getFromCache(k)));    // if the call fails, fall back to the cache for that key

You can add a predicate so that the fallback is applied only to certain exceptions:


    Flux.just("timeout1", "unknown", "key2")
        .flatMap(k -> callExternalService(k)
            .onErrorResume(TimeoutException.class, e -> getFromCache(k))
            .onErrorResume((Predicate<Throwable>) error -> error instanceof UnknownKeyException,
                           e -> registerNewEntry(k, "DEFAULT")));

Similarly:


    Flux.just("timeout1", "unknown", "key2")
        .flatMap(k -> callExternalService(k)
            .onErrorResume(error -> {
                if (error instanceof TimeoutException)
                    return getFromCache(k);
                else if (error instanceof UnknownKeyException)
                    return registerNewEntry(k, "DEFAULT");
                else
                    return Flux.error(error);
            }));

Dynamic Fallback Value


The same onErrorResume again:


    erroringFlux.onErrorResume(error -> Mono.just(
        myWrapper.fromError(error) // compute the fallback value from the exception
    ));

Catch and Rethrow


You can do it in two ways. The first is with the onErrorResume operator:


    Flux.just("timeout1")
        .flatMap(k -> callExternalService(k))
        .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original)));

And more concisely, with the help of onErrorMap:


    Flux.just("timeout1")
        .flatMap(k -> callExternalService(k))
        .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

Log or React on the Side


You can add a side effect (metrics, logging) using the doOnError operator:


    LongAdder failureStat = new LongAdder();
    Flux<String> flux = Flux.just("unknown")
        .flatMap(k -> callExternalService(k)
            .doOnError(e -> {
                failureStat.increment();
                log("uh oh, falling back, service failed for key " + k);
            })
            .onErrorResume(e -> getFromCache(k)));

Using Resources


So, how do we get an analog of try-with-resources or a finally block? The Flux.using() operator comes to the rescue.


First, get familiar with the Disposable interface. It requires us to implement the dispose() method. Calling this method should cancel or complete a task or sequence. Calls must be idempotent, and any resources in use must be released.


    AtomicBoolean isDisposed = new AtomicBoolean();
    Disposable disposableInstance = new Disposable() {
        @Override
        public void dispose() {
            isDisposed.set(true);
        }

        @Override
        public String toString() {
            return "DISPOSABLE";
        }
    };

    Flux<String> flux = Flux.using(
        () -> disposableInstance,                        // supplies the resource
        disposable -> Flux.just(disposable.toString()),  // builds a Flux from the resource
        Disposable::dispose                              // releases the resource when the Flux terminates
    );
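The idempotency requirement for dispose() can be illustrated with a small stdlib-only sketch. The SimpleDisposable interface and the ConnectionHandle class are hypothetical stand-ins that only mirror the shape of Reactor's Disposable; the point is the compareAndSet guard, which makes sure the release logic runs once no matter how many times dispose() is called.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of an idempotent dispose(); the types here are illustrative, not Reactor's.
final class IdempotentDisposeSketch {

    // Hypothetical stand-in with the same shape as Reactor's Disposable.
    interface SimpleDisposable {
        void dispose();
        boolean isDisposed();
    }

    static final class ConnectionHandle implements SimpleDisposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        // Counts how many times the underlying resource was actually released.
        final AtomicInteger releaseCount = new AtomicInteger();

        @Override
        public void dispose() {
            // Only the first caller flips the flag and performs the release.
            if (disposed.compareAndSet(false, true)) {
                releaseCount.incrementAndGet();
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed.get();
        }
    }

    public static void main(String[] args) {
        ConnectionHandle handle = new ConnectionHandle();
        handle.dispose();
        handle.dispose(); // second call is a no-op
        System.out.println(handle.isDisposed() + " " + handle.releaseCount.get()); // prints true 1
    }
}
```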

Retrying


Retrying behaves similarly: the original sequence is terminated (terminal event), and we re-subscribe to the Flux.


Let us consider an example. The code


    Flux.interval(Duration.ofMillis(250))
        .map(input -> {
            if (input < 3) return "tick " + input;
            throw new RuntimeException("boom");
        })
        .elapsed()
        .retry(1)
        .subscribe(System.out::println, System.err::println);
    Thread.sleep(2100);

will print:


    259,tick 0
    249,tick 1
    251,tick 2
    506,tick 0
    248,tick 1
    253,tick 2
    java.lang.RuntimeException: boom
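Why the ticks start again from 0 becomes clearer if retry is modeled as a plain re-subscription. Below is a minimal sketch on top of the JDK's java.util.concurrent.Flow, not Reactor's implementation: RetrySketch, FailingSource and subscribeWithRetry are hypothetical names, timing is left out, and backpressure is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Simplified model of retry as re-subscription; NOT Reactor's actual implementation.
final class RetrySketch {

    // Emits "tick 0".."tick 2" and then fails, independently for every subscription.
    static final class FailingSource implements Flow.Publisher<String> {
        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean started;

                @Override public void request(long n) {
                    if (started) return;
                    started = true;
                    for (int i = 0; i < 3; i++) subscriber.onNext("tick " + i);
                    subscriber.onError(new RuntimeException("boom"));
                }

                @Override public void cancel() { started = true; }
            });
        }
    }

    // Forwards items to onNext; on error, subscribes to the source again
    // until the retry budget is used up, then reports the error.
    static <T> void subscribeWithRetry(Flow.Publisher<T> source, int retriesLeft,
                                       Consumer<? super T> onNext,
                                       Consumer<? super Throwable> onError) {
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { onNext.accept(item); }
            @Override public void onError(Throwable t) {
                if (retriesLeft > 0) {
                    // The failed sequence is over; a brand-new subscription starts from scratch.
                    subscribeWithRetry(source, retriesLeft - 1, onNext, onError);
                } else {
                    onError.accept(t);
                }
            }
            @Override public void onComplete() { }
        });
    }

    static List<String> run(int retries) {
        List<String> out = new ArrayList<>();
        subscribeWithRetry(new FailingSource(), retries,
                out::add, t -> out.add("error: " + t.getMessage()));
        return out;
    }

    public static void main(String[] args) {
        // With one retry: tick 0, tick 1, tick 2, tick 0, tick 1, tick 2, error: boom
        run(1).forEach(System.out::println);
    }
}
```

The source keeps no memory of the failed attempt, which is why the second subscription replays the same three ticks before the error finally reaches the subscriber.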

More complex retry logic is available using the retryWhen () operator.


Conclusion


I hope this short note has shed some light on a few of Reactor's features.


Let's sum up:



Thanks for your attention!


Based on Reactor documentation


It’s not a bad idea.

I am not there myself, but more worthy people are, including contributors and maintainers.



Source: https://habr.com/ru/post/359194/

