Rx: understanding retryWhen and repeatWhen through examples from Android development

There are many articles, in both Russian and English, about the Rx operators retryWhen and repeatWhen.
Despite this, I often encounter a reluctance to use them, because of their complex syntax and hard-to-follow diagrams.

I will give a few examples of how to use them to restart sections of a chain and to delegate the handling of restarts after errors and stream completions.

The examples use Java with lambdas (Retrolambda), but they are easy to rewrite in Kotlin or plain Java.

Imperative way to restart the chain


Suppose we use Retrofit and start loading in the load() method. repository.getSomething() returns a Single<Something>.

    @NonNull
    private Subscription loadingSubscription = Subscriptions.unsubscribed();

    private void load() {
        // Cancel the previous request before starting a new one.
        loadingSubscription.unsubscribe();
        loadingSubscription = repository
                .getSomething()
                .subscribe(result -> {}, err -> {});
    }

    private void update() {
        load();
    }

From some update trigger (e.g. a PullToRefreshView) we call update(), which in turn calls load(), where the subscription is created from scratch.

I would like to suggest what is, in my opinion, a more reactive approach, using the repeatWhen() operator mentioned above.

Reactive way to restart the chain - repeatWhen


Create a PublishSubject named updateSubject and pass the following lambda to the operator:

    repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable())

The whole load() method then looks like this:

    @NonNull
    private final PublishSubject<Void> updateSubject = PublishSubject.create();

    private void load() {
        repository
                .getSomething()
                // Re-subscribe to the source each time updateSubject emits.
                .repeatWhen(repeatHandler ->
                        repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
                .subscribe(result -> {}, err -> {});
    }

Now, to reload the data, emit null into updateSubject.

    private void update() {
        updateSubject.onNext(null);
    }

Keep in mind that this reactive approach works as described only with a Single, which terminates immediately after emitting its one item. It also works with an Observable, but re-subscription happens only after the stream has completed.

Reactive error handling - retryWhen


Errors can be handled in a similar way. Suppose the user loses network connectivity, which causes an error and an onError() call inside the Single returned by getSomething().

At this point you can show the user a dialog saying “Check connection” and, when the OK button is pressed, call the retry() method.

    @NonNull
    private final PublishSubject<Void> retrySubject = PublishSubject.create();

    private void load() {
        repository
                .getSomething()
                // Must come before retryWhen(), which swallows the error.
                .doOnError(err -> showConnectionDialog())
                .retryWhen(retryHandler ->
                        retryHandler.flatMap(err -> retrySubject.asObservable()))
                .subscribe(result -> {}, err -> {});
    }

    private void retry() {
        retrySubject.onNext(null);
    }

Calling retrySubject.onNext(null) makes the entire chain above retryWhen() re-subscribe to the getSomething() source and repeat the request.

With this approach, it is important to remember that doOnError() must sit higher in the chain than retryWhen(), since the latter “absorbs” errors and hands them to the retryHandler instead of passing them downstream.

In this particular case there is no performance gain, and there is even slightly more code, but these examples help you start thinking in reactive patterns.

In the next, shamelessly far-fetched, example, the load() method combines two sources with the combineLatest operator.

The first source, repository.getSomething(), loads something from the network; the second, localStorage.fetchSomethingReallyHuge(), loads something heavy from local storage.

    public void load() {
        Observable.combineLatest(
                repository.getSomething(),
                localStorage.fetchSomethingReallyHuge(),
                (something, hugeObject) -> new Stuff(something, hugeObject))
                .subscribe(stuff -> {}, err -> {});
    }

When handling an error imperatively, by calling load() on every error, we re-subscribe to both sources, which in this example is entirely unnecessary. With a network error, the second source returns its data successfully and only the first one fails, so the imperative approach is also slower.

Let's see what the reactive method will look like.

    public void load() {
        Observable.combineLatest(
                repository.getSomething()
                        .retryWhen(retryHandler -> retryHandler.flatMap(
                                err -> retrySubject.asObservable())),
                localStorage.fetchSomethingReallyHuge()
                        .retryWhen(retryHandler -> retryHandler.flatMap(
                                err -> retrySubject.asObservable())),
                (something, hugeObject) -> new Stuff(something, hugeObject))
                .subscribe(stuff -> {}, err -> {});
    }

The beauty of this approach is that the flatMap lambda inside a retryWhen() handler runs only after an error occurs in that particular source. So if only one of the sources fails, only that source is re-subscribed, and the rest of the chain below waits for the retry.

If both sources fail, the same retry handling kicks in in both places.
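The difference in the amount of work can be illustrated without RxJava at all. The following is a plain-Java sketch, not the Rx code from this article: FlakySource, reloadEverything and retryPerSource are hypothetical names, and the user's "retry" signal is replaced by an immediate loop. It only counts how many times each source is invoked under the two strategies.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class SelectiveRetrySketch {

    /** Hypothetical source: counts its invocations and fails a fixed number of times first. */
    static final class FlakySource implements Callable<String> {
        int calls = 0;
        private final int failuresBeforeSuccess;
        private final String value;

        FlakySource(int failuresBeforeSuccess, String value) {
            this.failuresBeforeSuccess = failuresBeforeSuccess;
            this.value = value;
        }

        @Override
        public String call() throws Exception {
            calls++;
            if (calls <= failuresBeforeSuccess) {
                throw new IOException("simulated failure #" + calls);
            }
            return value;
        }
    }

    /** Imperative style: any failure restarts the whole load, so both sources run again. */
    static String reloadEverything(Callable<String> network, Callable<String> local) {
        while (true) {
            try {
                String hugeObject = local.call();
                String something = network.call();
                return something + "+" + hugeObject;
            } catch (Exception e) {
                // Stand-in for "user pressed retry": start the whole load again.
            }
        }
    }

    /** Per-source style: each source is retried on its own and a successful result is kept. */
    static String retryPerSource(Callable<String> network, Callable<String> local) {
        String hugeObject = retryUntilSuccess(local);
        String something = retryUntilSuccess(network);
        return something + "+" + hugeObject;
    }

    static String retryUntilSuccess(Callable<String> source) {
        while (true) {
            try {
                return source.call();
            } catch (Exception e) {
                // Stand-in for "user pressed retry": repeat only this source.
            }
        }
    }

    public static void main(String[] args) {
        FlakySource network = new FlakySource(2, "something");
        FlakySource local = new FlakySource(0, "huge");
        reloadEverything(network, local);
        System.out.println("reload everything: network=" + network.calls + ", local=" + local.calls);

        network = new FlakySource(2, "something");
        local = new FlakySource(0, "huge");
        retryPerSource(network, local);
        System.out.println("retry per source:  network=" + network.calls + ", local=" + local.calls);
    }
}
```

With two simulated network failures, the first strategy invokes the heavy local source three times and the second only once, which is the saving that the per-source retryWhen() arrangement is after.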

Error Handling Delegation


The next step is to delegate retry handling to a RetryManager. Before that, we can prepare a little for the move to Rx2 and remove null values from our streams, since they are prohibited in Rx2. To do this, create a class:

 public class RetryEvent { } 

It is empty for now. Flags can be added to it later, but that is another story. The RetryManager interface might look something like this:

    interface RetryManager {
        Observable<RetryEvent> observeRetries(@NonNull Throwable error);
    }

An implementation can inspect the error, show dialogs or snackbars, apply a silent timeout, whatever your heart desires, and listen to callbacks from all of these UI components in order to then emit a RetryEvent into our retryHandler.
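As an illustration of the "inspect the error" part only, here is a minimal plain-Java sketch with no RxJava or Android dependencies. RetryAction, RetryDecisionSketch and the particular mapping from exception types to actions are all assumptions made for this example, not part of the original RetryManager. A real observeRetries() could use such a decision to choose between a dialog, a delayed silent retry, or propagating the error.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;

class RetryDecisionSketch {

    /** Hypothetical set of reactions a RetryManager implementation might choose from. */
    enum RetryAction {
        ASK_USER,                   // e.g. show a "Check connection" dialog and wait for OK
        RETRY_SILENTLY_AFTER_DELAY, // e.g. wait for a timeout, then retry without UI
        GIVE_UP                     // do not retry, let the error propagate
    }

    /** Maps an error to an action; the mapping itself is an assumption for illustration. */
    static RetryAction decide(Throwable error) {
        // SocketTimeoutException is a subclass of IOException, so it is checked first.
        if (error instanceof SocketTimeoutException) {
            return RetryAction.RETRY_SILENTLY_AFTER_DELAY;
        }
        if (error instanceof IOException) {
            return RetryAction.ASK_USER;
        }
        return RetryAction.GIVE_UP;
    }

    public static void main(String[] args) {
        System.out.println(decide(new SocketTimeoutException("timed out")));
        System.out.println(decide(new IOException("no network")));
        System.out.println(decide(new IllegalStateException("bug")));
    }
}
```

Keeping this decision in a pure function makes it easy to unit-test separately from the UI callbacks that eventually emit the RetryEvent.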

The combineLatest example, rewritten to use this RetryManager, looks like this:

    // Pass this in through the constructor or DI, or use a singleton (but please don't).
    private final RetryManager retryManager;

    public void load() {
        Observable.combineLatest(
                repository.getSomething()
                        .retryWhen(retryHandler -> retryHandler.flatMap(
                                err -> retryManager.observeRetries(err))),
                localStorage.fetchSomethingReallyHuge()
                        .retryWhen(retryHandler -> retryHandler.flatMap(
                                err -> retryManager.observeRetries(err))),
                (something, hugeObject) -> new Stuff(something, hugeObject))
                .subscribe(stuff -> {}, err -> {});
    }

In this simple way, retry handling for errors is delegated to an external entity that can be passed in as a dependency.

I hope these examples prove useful and tempt you to try repeatWhen() and retryWhen() in your projects.

Source: https://habr.com/ru/post/326890/