
Grokking RxJava, Part 3: Reactive with Benefits

In part one, I went over the basics of RxJava. In part two, I showed you the power of operators. But perhaps what I have shown so far is still not enough to convince you. If so, here are a few more RxJava features that should seal the deal.

Error handling


Until now, we have completely ignored the callbacks onCompleted() and onError(). They are called when the Observable stops emitting items, either because it has nothing more to emit or because an error occurred.
Our very first Subscriber already had hooks for onCompleted() and onError(). Let's do something useful with them:

    Observable.just("Hello, world!")
        .map(s -> potentialException(s))
        .map(s -> anotherPotentialException(s))
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) { System.out.println(s); }

            @Override
            public void onCompleted() { System.out.println("Completed!"); }

            @Override
            public void onError(Throwable e) { System.out.println("Ouch!"); }
        });


Suppose that potentialException() and anotherPotentialException() can both throw exceptions. Every Observable ends with exactly one call to either onCompleted() or onError(). So the program prints either the string followed by "Completed!", or just "Ouch!" (because an exception was thrown).

A few takeaways from this pattern:

  1. onError() is called if an exception is thrown at any point.
    This makes error handling very simple: you can handle every error in a single function at the very end.
  2. Operators do not have to handle exceptions.
    Handling errors that occur anywhere in the Observable chain is left to the Subscriber, because every exception skips ahead to onError().
  3. You know when the Subscriber has finished receiving items.
    Knowing when a task is done helps the flow of your code (although an Observable may also never complete).
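
The first two takeaways can be illustrated without RxJava at all. Below is a minimal sketch in plain Java, assuming a toy MiniSubscriber interface and a run() helper (both hypothetical names, not RxJava classes). It only models the behavior described above, where an exception in any stage skips the remaining stages and lands in a single onError(); it is not how RxJava is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model only: MiniSubscriber and run() are hypothetical stand-ins, not RxJava classes.
class ErrorFunnelSketch {

    /** Mirrors the three callbacks of an RxJava 1.x Subscriber. */
    interface MiniSubscriber<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Pushes one item through the stages; the first exception skips the rest and goes to onError(). */
    static <T> void run(T item, List<Function<T, T>> stages, MiniSubscriber<T> subscriber) {
        T current = item;
        try {
            for (Function<T, T> stage : stages) {
                current = stage.apply(current);
            }
        } catch (RuntimeException e) {
            subscriber.onError(e); // terminal: no onNext(), no onCompleted()
            return;
        }
        subscriber.onNext(current);
        subscriber.onCompleted();
    }

    /** Runs a two-stage chain and returns everything that was recorded along the way. */
    static List<String> demo(boolean failInFirstStage) {
        List<String> log = new ArrayList<>();
        Function<String, String> first = s -> {
            if (failInFirstStage) {
                throw new IllegalStateException("boom");
            }
            return s.toUpperCase();
        };
        Function<String, String> second = s -> {
            log.add("second stage ran");
            return s + "!";
        };
        run("Hello, world", Arrays.asList(first, second), new MiniSubscriber<String>() {
            @Override public void onNext(String s) { log.add(s); }
            @Override public void onCompleted() { log.add("Completed!"); }
            @Override public void onError(Throwable e) { log.add("Ouch!"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

When the first stage fails, the log contains only "Ouch!": the second stage never runs, and neither onNext() nor onCompleted() is called.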

I find this approach to error handling much simpler than the traditional one. With callbacks, you have to handle errors in each callback. Not only does that lead to repetitive code, it also means every callback must know how to handle errors, which couples it tightly to its caller.
With RxJava, the Observable does not even need to know what to do with errors! The same goes for operators: they are simply skipped once an earlier stage has failed with a critical error. All error handling lives in the Subscriber.
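
For contrast, here is a minimal sketch of the callback style described above, in plain Java. The Callback interface and the fetchUser() and fetchAvatar() methods are hypothetical and exist only for illustration. Note how the same error handling has to be written once per callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback-style API, shown only for contrast; none of these names come from a real library.
class CallbackContrastSketch {

    interface Callback<T> {
        void onSuccess(T value);
        void onFailure(Exception e);
    }

    /** First step; fails on demand so that both paths can be exercised. */
    static void fetchUser(boolean fail, Callback<String> cb) {
        if (fail) {
            cb.onFailure(new Exception("user lookup failed"));
        } else {
            cb.onSuccess("alice");
        }
    }

    /** Second step, which depends on the result of the first. */
    static void fetchAvatar(String user, boolean fail, Callback<String> cb) {
        if (fail) {
            cb.onFailure(new Exception("avatar lookup failed"));
        } else {
            cb.onSuccess(user + ".png");
        }
    }

    /** Chains the two steps and returns what the callbacks recorded. */
    static List<String> demo(boolean failUser, boolean failAvatar) {
        List<String> log = new ArrayList<>();
        fetchUser(failUser, new Callback<String>() {
            @Override public void onSuccess(String user) {
                fetchAvatar(user, failAvatar, new Callback<String>() {
                    @Override public void onSuccess(String avatar) {
                        log.add("show " + avatar);
                    }
                    // Error handling, second copy.
                    @Override public void onFailure(Exception e) {
                        log.add("Ouch! " + e.getMessage());
                    }
                });
            }
            // Error handling, first copy.
            @Override public void onFailure(Exception e) {
                log.add("Ouch! " + e.getMessage());
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(false, false));
        System.out.println(demo(true, false));
        System.out.println(demo(false, true));
    }
}
```

Every additional step in the chain would need yet another copy of the failure branch, which is exactly the repetition a single onError() avoids.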

Schedulers


You have an Android app that makes a network request. The request can take a long time, so you move it to another thread. Before you know it, you have problems.
Multi-threaded Android applications are hard to write because you must make sure the right code runs on the right thread; get it wrong and the app crashes. The classic example is the exception thrown when you try to modify a View from a thread other than the main thread.
In RxJava, you can easily specify which thread your Observable code runs on with subscribeOn(), and which thread your Subscriber runs on with observeOn():

    myObservableServices.retrieveImage(url)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

Simple, right? Everything that runs before the Subscriber runs on a separate I/O thread, and the View manipulation at the end happens on the main thread. 1
The best part is that subscribeOn() and observeOn() can be attached to any Observable, since they are just operators. There is no need to worry about what the Observable or the operators before them are doing; you can simply add subscribeOn() and observeOn() at the very end to distribute the work across the right threads.
With AsyncTask or something similar, you have to design your code around which parts should run concurrently. With RxJava, you just write the code, and then say where it should run. 2
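
To see the thread hop in isolation, here is a JDK-only analogy that uses CompletableFuture with two single-thread executors instead of RxJava schedulers. It is a rough sketch, not how RxJava works internally: the loadImage() method and the thread names "io" and "main-stand-in" are hypothetical, and the second executor merely stands in for the Android main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy: the executors, thread names and loadImage() are hypothetical.
class ThreadHopSketch {

    /** Stand-in for slow work such as a network request; reports the thread it ran on. */
    static String loadImage() {
        return "loaded on " + Thread.currentThread().getName();
    }

    /** Does the work on the "io" thread and handles the result on the "main-stand-in" thread. */
    static String[] demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        try {
            return CompletableFuture
                    // Roughly the role subscribeOn() plays: where the work itself runs.
                    .supplyAsync(ThreadHopSketch::loadImage, io)
                    // Roughly the role observeOn() plays: where the result is consumed.
                    .thenApplyAsync(result -> new String[] {
                            result,
                            "delivered on " + Thread.currentThread().getName()
                    }, main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

As in the RxJava snippet, the code that does the work never mentions threads; the two executor arguments alone decide where each part runs.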

Subscriptions


There is one thing I have been hiding from you. When you call Observable.subscribe(), it returns a Subscription, which represents the link between your Observable and your Subscriber:

    Subscription subscription = Observable.just("Hello, World!")
        .subscribe(s -> System.out.println(s));

Later, you can use this Subscription to sever the link:

    subscription.unsubscribe();
    System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
    // Prints "Unsubscribed=true"

When you unsubscribe, RxJava stops the whole chain. In other words, even if you have built an elaborate chain of transformations out of many operators, unsubscribe() terminates it wherever it is currently executing code. 3 Nothing more is required.
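
The idea of checking the subscription before each emission can be modeled in a few lines of plain Java. The sketch below assumes a toy MiniSubscription class and an emit() helper (both hypothetical, not RxJava classes); it only mirrors the unsubscribe() and isUnsubscribed() pair shown above and is not RxJava's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Toy model only: MiniSubscription and emit() are hypothetical, not RxJava classes.
class UnsubscribeSketch {

    /** Mirrors the unsubscribe()/isUnsubscribed() pair of an RxJava 1.x Subscription. */
    static final class MiniSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void unsubscribe() { unsubscribed.set(true); }

        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Emits items one by one, checking the subscription before each delivery. */
    static void emit(List<Integer> items, MiniSubscription subscription, Consumer<Integer> onNext) {
        for (Integer item : items) {
            if (subscription.isUnsubscribed()) {
                return; // nothing further is delivered once the link is severed
            }
            onNext.accept(item);
        }
    }

    /** Unsubscribes after the second item and returns what was actually received. */
    static List<Integer> demo() {
        MiniSubscription subscription = new MiniSubscription();
        List<Integer> received = new ArrayList<>();
        emit(Arrays.asList(1, 2, 3, 4, 5), subscription, item -> {
            received.add(item);
            if (item == 2) {
                subscription.unsubscribe();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the first two items reach the consumer; the remaining three are never emitted because the check fails before the third delivery.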

Conclusion


Keep in mind that this series is only an introduction to RxJava. There is a lot more to learn, some of it much harder than anything covered here (if you don't believe me, read up on backpressure). Nor would I write reactive code everywhere; I reserve it for the more complex parts of the code that I want to break down into simpler, clearer logic.
Originally I planned for three articles to be enough, but many people asked for practical examples of using RxJava on Android, so I wrote one more. I hope this introduction has been enough to convince you to try this wonderful framework. If you want to dig deeper, I recommend the official wiki. And remember: nothing is impossible.

Many thanks to everyone who helped proofread this article for errors and inaccuracies: Matthias Käppler, Matthew Wear, Ulysses Popple, Hamid Palo, and Joel Drotos (who is worth a look for his beard alone).



1 This is one of the reasons to keep the Subscriber as lightweight as possible: so that it blocks the main thread no more than necessary.
2 Sometimes, though, you can do without observeOn() and subscribeOn(). For example, even if the Observable takes a long time, there is no reason to move the Subscriber to a new thread when it is already running on an I/O thread.
3 In the first part, I noted that Observable.just() is not quite the same as a hand-written Observable that simply calls onNext() and onCompleted(). The difference lies in subscriptions: before calling onNext(), Observable.just() checks whether the Subscriber is still subscribed.

Source: https://habr.com/ru/post/265727/

