Guide to background work in Android. Part 4: RxJava



Event handling is a loop.



In the previous part, we looked at using thread pool executors for background work in Android. The problem with that approach turned out to be that the code sending an event has to know how its result should be processed. Now let's see what RxJava has to offer.



Disclaimer: this is not an article about how to use RxJava in Android. The Internet already has plenty of those. This one is about the details of the library's implementation.



Generally speaking, RxJava is not even a tool specifically for background work; it is a tool for handling event streams.



Background work is only one aspect of such processing. The general idea of the approach is to use a Scheduler. Let's look directly at the code of this class:



    public abstract class Scheduler {

        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run) { ... }

        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... }

        @NonNull
        public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { ... }

        @NonNull
        public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) { ... }
    }


Looks pretty complicated, right? The good news is that you don't have to implement it yourself! The library already ships with a number of schedulers: Schedulers.io(), Schedulers.computation(), and so on. All you have to do is pass a scheduler instance to the subscribeOn() / observeOn() method of your Rx chain:



    apiClient.login(auth)
        // some code omitted
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())


RxJava then does the rest for you: it takes the lambdas you pass to the operators and executes them on the desired scheduler.



For example, if you want your observers to update the user interface, all you have to do is pass AndroidSchedulers.mainThread() to observeOn(). And that's it: no unnecessary coupling, no platform-specific code, pure bliss. Of course, AndroidSchedulers is not part of the original RxJava library; it comes from a separate one, but that is just one more line in your build.gradle.
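Stripped of the details, a scheduler is something that accepts a Runnable and runs it on a thread it owns. The following is a minimal sketch of that idea using only the JDK; ToyScheduler, runOn, and the thread names are hypothetical and are not part of RxJava, whose real Scheduler returns a Disposable rather than a Future.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ToySchedulerDemo {

    /** Hypothetical stand-in for a scheduler: it only knows how to run a task on its own thread. */
    static final class ToyScheduler {
        private final ExecutorService executor;

        ToyScheduler(String threadName) {
            // One named daemon thread plays the role of "the io thread" or "the main thread".
            this.executor = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, threadName);
                t.setDaemon(true);
                return t;
            });
        }

        /** Rough analogue of scheduleDirect: hand the task over, get back a handle that can cancel it. */
        Future<?> scheduleDirect(Runnable task) {
            return executor.submit(task);
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Schedules a task and returns the name of the thread that actually executed it. */
    static String runOn(String threadName) {
        ToyScheduler scheduler = new ToyScheduler(threadName);
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            scheduler.scheduleDirect(() -> seen.set(Thread.currentThread().getName()))
                    .get(1, TimeUnit.SECONDS);
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOn("toy-io"));
        System.out.println(runOn("toy-main"));
    }
}
```

The caller never touches a thread directly; it only chooses which scheduler receives the task, which is the same decision you make when picking an argument for subscribeOn() or observeOn().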



So what is so complicated about threads here? The catch is that you cannot simply put subscribeOn() / observeOn() anywhere in your Rx chain (which would be convenient, right?). Instead, you have to consider how these operators get their schedulers. First, let's understand that every time you call map, flatMap, filter, or any other operator, you get a new object.



For example:



    private fun attemptLoginRx() {
        showProgress(true)
        apiClient.login(auth)
            .flatMap { user -> apiClient.getRepositories(user.repos_url, auth) }
            .map { list -> list.map { it.full_name } }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally { showProgress(false) }
            .subscribe(
                { list -> showRepositories(this, list) },
                { error -> Log.e("TAG", "Failed to show repos", error) }
            )
    }


So almost every line here creates a new object:



    // new SingleFlatMap
    val flatMap = apiClient.login(auth)
        .flatMap { apiClient.getRepositories(it.repos_url, auth) }

    // new SingleMap
    val map = flatMap
        .map { list -> list.map { it.full_name } }

    // new SingleSubscribeOn
    val subscribeOn = map
        .subscribeOn(Schedulers.io())

    // new SingleObserveOn
    val observeOn = subscribeOn
        .observeOn(AndroidSchedulers.mainThread())

    // new SingleDoFinally
    val doFinally = observeOn
        .doFinally { showProgress(false) }

    // new ConsumerSingleObserver
    val subscribe = doFinally
        .subscribe(
            { list -> showRepositories(this@LoginActivity, list) },
            { error -> Log.e("TAG", "Failed to show repos", error) }
        )


And SingleMap, for example, gets its scheduler through a chain of calls that starts with the .subscribe() call at the end of our chain:



    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");

        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);

        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }


subscribeActual is implemented by every Single operator roughly as follows:



 source.subscribe() 


where source is the operator that precedes the current one. This forms the chain we work with, and walking it takes us to the very first Single that was created. In our case, this is Single.fromCallable:



    override fun login(auth: Authorization): Single<GithubUser> = Single.fromCallable {
        val response = get("https://api.github.com/user", auth = auth)
        if (response.statusCode != 200) {
            throw RuntimeException("Incorrect login or password")
        }

        val jsonObject = response.jsonObject
        with(jsonObject) {
            return@with GithubUser(getString("login"), getInt("id"),
                getString("repos_url"), getString("name"))
        }
    }


Inside this lambda we make our network calls.
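The way a subscribe() call travels up through the operators can be modelled in a few lines of plain Java. This is a deliberately simplified sketch, not RxJava's code: MiniSingle, MiniFromCallable, and MiniMap are hypothetical names, the observer is reduced to a Consumer, and error handling and disposal are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

class MiniChainDemo {
    /** Records the order in which operators get subscribed. */
    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical, heavily simplified stand-in for Single (success path only). */
    abstract static class MiniSingle<T> {
        /** Each operator implements this, usually by subscribing to its source. */
        abstract void subscribeActual(Consumer<? super T> observer);

        final void subscribe(Consumer<? super T> observer) {
            LOG.add("subscribe -> " + getClass().getSimpleName());
            subscribeActual(observer);
        }

        static <T> MiniSingle<T> fromCallable(Callable<T> callable) {
            return new MiniFromCallable<T>(callable);
        }

        /** Calling an operator does no work yet; it only wraps this object in a new one. */
        final <R> MiniSingle<R> map(Function<? super T, ? extends R> mapper) {
            return new MiniMap<T, R>(this, mapper);
        }
    }

    /** The head of the chain: the only place where the user's callable runs. */
    static final class MiniFromCallable<T> extends MiniSingle<T> {
        private final Callable<T> callable;

        MiniFromCallable(Callable<T> callable) {
            this.callable = callable;
        }

        @Override
        void subscribeActual(Consumer<? super T> observer) {
            try {
                observer.accept(callable.call());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** An intermediate operator: subscribes to its source with a wrapping observer. */
    static final class MiniMap<T, R> extends MiniSingle<R> {
        private final MiniSingle<T> source;
        private final Function<? super T, ? extends R> mapper;

        MiniMap(MiniSingle<T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        void subscribeActual(Consumer<? super R> observer) {
            source.subscribe(value -> observer.accept(mapper.apply(value)));
        }
    }

    /** Builds a three-link chain, subscribes to it, and returns the recorded log. */
    static List<String> run() {
        LOG.clear();
        MiniSingle<Integer> chain = MiniSingle.fromCallable(() -> "rx")
                .map(String::length)
                .map(n -> n * 10);
        chain.subscribe(value -> LOG.add("result = " + value));
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Nothing runs until the final subscribe() call; the log then shows the two map operators being subscribed first and the callable at the head of the chain last, after which the value flows back down to the observer.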



But where is our scheduler? Here, inside SingleSubscribeOn:



    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        s.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);
    }


Here, scheduler is the one we passed to the subscribeOn() method.



All this code shows how the scheduler we passed to the chain ends up running the code we passed in the operators' lambdas.
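The effect of subscribeOn() can be reproduced with a small JDK-only model: the act of subscribing to the source is itself wrapped in a task and handed to the scheduler, so everything upstream runs on that scheduler's thread. This is a sketch under stated assumptions: MiniSingle and the helper methods are hypothetical, and a plain Executor stands in for RxJava's Scheduler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscribeOnDemo {

    /** Hypothetical one-method stand-in for Single (success path only). */
    interface MiniSingle<T> {
        void subscribe(Consumer<? super T> observer);
    }

    /** Head of the chain: runs the callable on whatever thread calls subscribe(). */
    static <T> MiniSingle<T> fromCallable(Callable<T> callable) {
        return observer -> {
            try {
                observer.accept(callable.call());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
    }

    /** Model of subscribeOn: the upstream subscription itself is scheduled as a task. */
    static <T> MiniSingle<T> subscribeOn(MiniSingle<T> source, Executor scheduler) {
        return observer -> scheduler.execute(() -> source.subscribe(observer));
    }

    /** Returns the name of the thread on which the callable was executed. */
    static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-io");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<String> result = new CompletableFuture<>();
        MiniSingle<String> source = fromCallable(() -> Thread.currentThread().getName());
        try {
            // subscribe() is called on the current thread, yet the callable runs on "toy-io".
            subscribeOn(source, io).subscribe(result::complete);
            return result.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "toy-io"
    }
}
```

Because the scheduled task is the subscription itself, the position of subscribeOn() relative to the other operators does not change where the head of the chain runs in this model.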



Also pay attention to the observeOn() method. It creates an instance of a class (SingleObserveOn in our case), whose subscribeActual by now looks trivial to us:



    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
    }


But ObserveOnSingleObserver is much more interesting here:



    ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
        this.actual = actual;
        this.scheduler = scheduler;
    }

    @Override
    public void onSuccess(T value) {
        this.value = value;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }


With observeOn, the observer is invoked on the scheduler's thread, which in turn makes it possible to switch threads right inside the Rx chain: you can fetch data from the server on Schedulers.io(), then run resource-intensive computations on Schedulers.computation(), update the UI, compute something else, and then simply move on to the code in subscribe.
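Switching threads in the middle of a chain can be modelled the same way: an observeOn-like wrapper subscribes upstream as usual but re-delivers the value through its scheduler, so everything below it runs on that scheduler's thread. Again, this is only a sketch: MiniSingle, the helper methods, and the thread names are hypothetical, and a plain Executor stands in for RxJava's Scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

class ObserveOnDemo {

    /** Hypothetical one-method stand-in for Single (success path only). */
    interface MiniSingle<T> {
        void subscribe(Consumer<? super T> observer);
    }

    /** Model of observeOn: subscribe upstream as usual, re-deliver the value on the scheduler. */
    static <T> MiniSingle<T> observeOn(MiniSingle<T> source, Executor scheduler) {
        return observer -> source.subscribe(
                value -> scheduler.execute(() -> observer.accept(value)));
    }

    /** Model of map: the mapper runs on whatever thread delivers the upstream value. */
    static <T, R> MiniSingle<R> map(MiniSingle<T> source, Function<? super T, ? extends R> mapper) {
        return observer -> source.subscribe(value -> observer.accept(mapper.apply(value)));
    }

    static ExecutorService namedThread(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Runs source -> observeOn(computation) -> map -> observeOn(ui) -> subscribe. */
    static String run() {
        ExecutorService computation = namedThread("toy-computation");
        ExecutorService ui = namedThread("toy-ui");
        CompletableFuture<String> result = new CompletableFuture<>();

        MiniSingle<String> source = observer -> observer.accept("data");
        MiniSingle<String> heavy = map(observeOn(source, computation),
                value -> value + " mapped on " + Thread.currentThread().getName());
        try {
            observeOn(heavy, ui).subscribe(value ->
                    result.complete(value + ", observed on " + Thread.currentThread().getName()));
            return result.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            computation.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "data mapped on toy-computation, observed on toy-ui"
    }
}
```

Unlike the subscription, the value travels downstream, so each observeOn-like wrapper affects only the operators placed after it; that is why its position in the chain matters.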



RxJava is rather complicated under the hood, but it is a very flexible and powerful tool for handling events (and, consequently, for managing background work). Still, in my opinion, this approach has its drawbacks:



  1. Learning RxJava takes a lot of time.
  2. The number of operators you need to learn is large, and the differences between them are not obvious.
  3. Stack traces from RxJava have almost nothing to do with the code you write.


What's next? Kotlin coroutines, of course!








From the author: tomorrow and the day after tomorrow the Mobius conference takes place, where I will be talking about coroutines in Kotlin. If you are interested in this series of articles and want more, it is not too late to decide to attend!

Source: https://habr.com/ru/post/353852/


