
Understanding multithreading in RxJava


When people describe the benefits of RxJava, they always mention how convenient it makes organizing a multithreaded application. Almost every introductory article on RxJava explains how to use the subscribeOn and observeOn operators; for example, it is well documented when to use subscribeOn and when to use observeOn. In practice, however, one often runs into problems that require a deeper understanding of what these operators actually do. In this article I would like to look at a number of issues that sometimes arise when using them.

You can study the nuances of RxJava in several ways: from the documentation (which is very detailed), from the source code, or in practice. I chose the last option and sketched a couple of tests that helped me understand asynchronous reactive programming better.

First, to check how thread switching works, I used the following code:
private void testSchedulersTemplate(Observable.Transformer<String, String> transformer) {
    // The explicit <String> type witness keeps the chained calls typed as Observable<String>.
    Observable<String> obs = Observable
            .<String>create(subscriber -> {
                logThread("Inside observable");
                subscriber.onNext("Hello from observable");
                subscriber.onCompleted();
            })
            .doOnNext(s -> logThread("Before transform"))
            .compose(transformer)
            .doOnNext(s -> logThread("After transform"));

    TestSubscriber<String> subscriber = new TestSubscriber<>(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            logThread("In onComplete");
        }

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(String o) {
            logThread("In onNext");
        }
    });
    obs.subscribe(subscriber);
    // Block the calling thread until onCompleted or onError arrives.
    subscriber.awaitTerminalEvent();
}
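The template relies on a logThread helper that the listing does not show. Below is a minimal sketch of what such a helper might look like. The class name ThreadLog and the "label: thread name" format are assumptions inferred from the log lines in this article, not the author's actual code.

```java
final class ThreadLog {

    private ThreadLog() {}

    /** Builds a line such as "Inside observable: main" for the calling thread. */
    static String format(String label) {
        return label + ": " + Thread.currentThread().getName();
    }

    /** Prints the label together with the name of the thread that called this method. */
    static void logThread(String label) {
        System.out.println(format(label));
    }

    public static void main(String[] args) {
        // Printed from whichever thread runs main.
        logThread("Inside observable");
    }
}
```

Because the helper reads Thread.currentThread() at the moment it is called, each log line shows the thread on which that particular step of the chain ran.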


Let's check how this code works without any transformations:
  testSchedulersTemplate(stringObservable -> stringObservable); 

Result:
Inside observable: main
Before transform: main
After transform: main
Inside doOnNext: main
In onNext: main
In onComplete: main

As expected, the thread never changes.

1. ObserveOn and SubscribeOn


SubscribeOn
According to the documentation (reactivex.io/documentation/operators/subscribeon.html), this operator lets you specify the Scheduler on which the Observable will operate.
Let's check:
 testSchedulersTemplate(stringObservable -> stringObservable.subscribeOn(Schedulers.io())); 

Result:
Inside observable: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

From the execution of the Observable's body through to delivery of the result, every method ran on a thread provided by Schedulers.io().

ObserveOn
The documentation for this operator says that it causes subsequent operations on the emitted items to be performed on the Scheduler passed to it.

Let's check:
 testSchedulersTemplate(stringObservable -> stringObservable.observeOn(Schedulers.io())); 

Result:
Inside observable: main
Before transform: main
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

As expected, from the point where observeOn is applied, the thread that processes the data changes to one supplied by the specified Scheduler.

Let's combine the use of subscribeOn and observeOn:
testSchedulersTemplate(stringObservable -> stringObservable
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io()));

Result:
Inside observable: RxComputationThreadPool-3
Before transform: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

Everything upstream of observeOn ran on the Scheduler specified in subscribeOn, and everything downstream ran on the Scheduler specified in observeOn.

By combining these two operators, you can load data from the network asynchronously and then display it on the screen from the application's main thread.
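The same "produce on one thread, consume on another" pattern can be modelled with the plain JDK, which may help separate the idea from the RxJava API. The sketch below is only an analogy: CompletableFuture and the executor names "producer" and "consumer" are stand-ins chosen for illustration, and nothing here is RxJava code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {

    /** Runs a two-stage pipeline and returns the thread name observed in each stage. */
    static List<String> run() {
        // Single-thread executors with fixed names stand in for two Schedulers.
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // Roughly like subscribeOn: the source itself runs on the producer executor.
                    .supplyAsync(() -> {
                        seen.add("source: " + Thread.currentThread().getName());
                        return "Hello from source";
                    }, producer)
                    // Roughly like observeOn: this stage runs on the consumer executor.
                    .thenApplyAsync(value -> {
                        seen.add("downstream: " + Thread.currentThread().getName());
                        return value.length();
                    }, consumer)
                    .join(); // wait for the pipeline to finish
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In an Android application the "consumer" executor would correspond to the main thread, which is where the results are displayed.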

But what happens if you apply these methods several times?
To begin, call observeOn several times:
testSchedulersTemplate(stringObservable -> stringObservable
        .observeOn(Schedulers.computation())
        .doOnNext(str -> logThread("Between two observeOn"))
        .observeOn(Schedulers.io()));

Inside observable: main
Before transform: main
Between two observeOn: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1

No surprises. After each observeOn, items are processed on the Scheduler passed to it.

Now call subscribeOn several times.
testSchedulersTemplate(stringObservable -> stringObservable
        .subscribeOn(Schedulers.computation())
        .doOnNext(str -> logThread("Between two observeOn"))
        .subscribeOn(Schedulers.io()));

Result:
Inside observable: RxComputationThreadPool-1
Before transform: RxComputationThreadPool-1
Between two observeOn: RxComputationThreadPool-1
After transform: RxComputationThreadPool-1
Inside doOnNext: RxComputationThreadPool-1
In onNext: RxComputationThreadPool-1
In onComplete: RxComputationThreadPool-1

As you can see, the second subscribeOn changed nothing in this log. But is it completely useless?

Add an operator between the two subscribeOn calls:
.lift((Observable.Operator<String, String>) subscriber -> {
    logThread("Inside lift");
    return subscriber;
})

The first message in the log is now:
Inside lift: RxCachedThreadScheduler-1

RxCachedThreadScheduler-1 is the thread obtained from Schedulers.io(), which was specified in the second call to subscribeOn.

lift() is an operator that lets you transform the subscriber.
The subscription process can be described schematically as follows:
The user subscribes to an observable by passing in a subscriber.
The subscriber is delivered up the chain to the root observable, and operators can transform it along the way.
The root observable receives the subscriber and calls its onNext, onCompleted, and onError methods.
Transformations are applied to the emitted items.
The transformed items arrive in onNext of the original subscriber.
Thus, while the subscriber is being delivered to the observable, you can change the thread with subscribeOn. While items are being delivered from the observable to the subscriber, the thread is controlled by observeOn.
To illustrate this, consider the following code:
Observable.create(subscriber -> { ... })
        .map(val -> val * 2)
        .subscribe(val -> Log.d(TAG, "onNext " + val));

The subscriber created in the last line is passed to the Observable created with Observable.create(). Inside the map operator, lift is called with an Operator that decorates the Subscriber during subscription. When the Observable emits data, it arrives at the decorated Subscriber, which modifies the data and passes it on to the original Subscriber.
Without a Scheduler change, the entire process runs on the thread in which subscribe was called. While the Subscriber is being decorated, subscribeOn can change the thread on which the next decoration is performed. The call() method of the OnSubscribe interface runs on the Scheduler from the last subscribeOn reached on the way up the chain, that is, the one closest to the source. After the data has been emitted, the Scheduler is changed with observeOn.
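The subscription flow described above can be modelled in a few lines of plain Java. The sketch below is not RxJava: Source, subscribeOn, and logOnSubscribe are hypothetical stand-ins for Observable, the subscribeOn operator, and a lift-style operator, and the executor names "inner" and "outer" are chosen only for illustration. It reproduces the observation from the experiment: code that runs at subscription time between two subscribeOn calls sees the outer scheduler, while the source sees the inner one.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SubscriptionFlowSketch {

    /** Hypothetical stand-in for Observable: subscribing hands a consumer upstream. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Model of subscribeOn: the upstream subscribe call is moved onto the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    /** Model of a lift-style operator: logs at subscription time, then subscribes upstream. */
    static <T> Source<T> logOnSubscribe(Source<T> upstream, String label, List<String> log) {
        return downstream -> {
            log.add(label + ": " + Thread.currentThread().getName());
            upstream.subscribe(downstream);
        };
    }

    static List<String> run() {
        ExecutorService inner = Executors.newSingleThreadExecutor(r -> new Thread(r, "inner"));
        ExecutorService outer = Executors.newSingleThreadExecutor(r -> new Thread(r, "outer"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Source<String> root = downstream -> {
                log.add("Inside source: " + Thread.currentThread().getName());
                downstream.accept("Hello");
            };
            // Equivalent chain order: root, subscribeOn(inner), lift, subscribeOn(outer).
            Source<String> chain =
                    subscribeOn(logOnSubscribe(subscribeOn(root, inner), "Inside lift", log), outer);
            chain.subscribe(value -> {
                log.add("In onNext: " + Thread.currentThread().getName());
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inner.shutdown();
            outer.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Since there is no observeOn in this model, the item is delivered on the same thread on which the source ran.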

2. Perform tasks in parallel.


Consider the following case:
Suppose we need to download several pieces of information from the server, combine them, and display the result on the screen. To speed things up, the data should be loaded in parallel where possible. Without RxJava this task would take considerable effort, but with reactive programming it is trivial.

We will run three tasks, each of which waits for 1 second and then sends a message to the subscriber. The combineLatest operator then merges the messages and passes the result to the subscriber.

For verification, we will use the following code:
private void template(Observable.Transformer<String, String> transformer,
                      Observable.Transformer<String, String> firstObsTransformer,
                      Observable.Transformer<String, String> secondObsTransformer,
                      Observable.Transformer<String, String> thirdObsTransformer) {
    Observable<String> obs = Observable.combineLatest(
                    createObservable("Observable1", firstObsTransformer),
                    createObservable("Observable2", secondObsTransformer),
                    createObservable("Observable3", thirdObsTransformer),
                    (s, s2, s3) -> {
                        logThread("Inside combining result");
                        return s + s2 + s3;
                    })
            .doOnNext(s -> logThread("Before transform"))
            .compose(transformer)
            .doOnNext(s -> logThread("After tranform"));

    TestSubscriber<String> subscriber = new TestSubscriber<>(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            logThread("In onComplete");
        }

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(String o) {
            logThread("In onNext");
        }
    });
    obs.subscribe(subscriber);
    subscriber.awaitTerminalEvent();
}

private Observable<String> createObservable(final String name,
                                            Observable.Transformer<String, String> transformer) {
    Observable<String> result = Observable.create(subscriber -> {
        logThread("Inside " + name);
        sleep(1000); // emulate a long-running task
        subscriber.onNext(name);
        subscriber.onCompleted();
    });
    if (transformer != null) {
        return result.compose(transformer);
    }
    return result;
}


To begin, run the test without any transformations:
 template(stringObservable -> stringObservable, null, null, null); 

Result:
Inside Observable1: main
Inside Observable2: main
Inside Observable3: main
Inside combining result: main
Before transform: main
After tranform: main
In onNext: main
In onComplete: main

As you can see, everything runs on one thread, and the three tasks are executed sequentially.

Add subscribeOn and observeOn to the observable produced by combineLatest:
template(stringObservable -> stringObservable
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread()),
        null, null, null);

Result:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-1
Inside Observable3: RxCachedThreadScheduler-1
Inside combining result: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After tranform: RxNewThreadScheduler-1
In onNext: RxNewThreadScheduler-1
In onComplete: RxNewThreadScheduler-1

Everything behaves as described in the previous section on subscribeOn and observeOn.

Now let's run each task on its own thread. For this it is enough to specify Schedulers.io(), since it is backed by a thread pool that is well suited to loading data.
Observable.Transformer<String, String> ioTransformer =
        stringObservable -> stringObservable.subscribeOn(Schedulers.io());

template(stringObservable -> stringObservable
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation()),
        ioTransformer, ioTransformer, ioTransformer);

Result:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-2
Inside Observable3: RxCachedThreadScheduler-3
Inside combining result: RxCachedThreadScheduler-3
Before transform: RxCachedThreadScheduler-3
After tranform: RxComputationThreadPool-3
In onNext: RxComputationThreadPool-3
In onComplete: RxComputationThreadPool-3

We achieved what we wanted: the three tasks ran in parallel.
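To see what giving each source its own thread amounts to, here is a plain-JDK model of the same fan-out and combine step. It is an analogy only: CompletableFuture stands in for the three observables and combineLatest, a three-thread pool stands in for Schedulers.io(), and the delay is shortened to 100 ms. The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelCombineSketch {

    /** Combined text plus the number of distinct threads the tasks ran on. */
    static final class Result {
        final String combined;
        final int threadCount;

        Result(String combined, int threadCount) {
            this.combined = combined;
            this.threadCount = threadCount;
        }
    }

    /** Simulates a slow task: records its thread, sleeps, then returns its name. */
    private static String slowTask(String name, Set<String> threads) {
        threads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(100); // stand-in for a network call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    /** Starts three tasks on a three-thread pool and concatenates their results. */
    static Result run() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> first =
                    CompletableFuture.supplyAsync(() -> slowTask("Observable1", threads), pool);
            CompletableFuture<String> second =
                    CompletableFuture.supplyAsync(() -> slowTask("Observable2", threads), pool);
            CompletableFuture<String> third =
                    CompletableFuture.supplyAsync(() -> slowTask("Observable3", threads), pool);
            // Combine once all three results are available.
            String combined = first
                    .thenCombine(second, (a, b) -> a + b)
                    .thenCombine(third, (ab, c) -> ab + c)
                    .join();
            return new Result(combined, threads.size());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.combined + " computed on " + r.threadCount + " threads");
    }
}
```

Each task gets its own worker thread because the pool starts a new thread for every submission until it reaches its core size of three.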

3. Operators with Schedulers.


In the previous section, the delay() operator would have been a natural way to emulate long-running tasks, but this operator is not as simple as it seems at first glance.
A number of operators need a Scheduler to do their work. Moreover, they have overloads that use computation() as the default Scheduler. delay() is one such operator:
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
Observable.just(1)
        .delay(1, TimeUnit.SECONDS)
        .subscribe(subscriber);
subscriber.awaitTerminalEvent();
Logger.d("LastSeenThread: " + subscriber.getLastSeenThread().getName());

Although we did not specify any Scheduler, the result will be as follows:
LastSeenThread: RxComputationThreadPool-1

To avoid the computation scheduler, it is enough to pass the required scheduler as the third argument:
.delay(1, TimeUnit.SECONDS, Schedulers.immediate())

Note: Schedulers.immediate() executes the task immediately on the current thread, that is, the thread on which the previous step ran.

Result:
LastSeenThread: main

Besides delay(), other operators can change the Scheduler on their own: interval(), timer(), some overloads of buffer(), debounce(), skip(), take(), timeout(), and several others.
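Why a time-based operator moves delivery to another thread can be seen in a plain-JDK model: the caller only schedules the work, and whatever thread owns the timer performs the delivery. The sketch below is an analogy, not RxJava. The ScheduledExecutorService and its thread name "timer" are assumptions playing the role of the default computation scheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class DelayThreadSketch {

    /** Delivers a value after a delay and returns the name of the thread that delivered it. */
    static String deliveryThreadAfterDelay(long delayMillis) {
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timer"));
        AtomicReference<String> lastSeenThread = new AtomicReference<>();
        CountDownLatch delivered = new CountDownLatch(1);
        try {
            // The calling thread only schedules the work; the timer thread performs the delivery.
            timer.schedule(() -> {
                lastSeenThread.set(Thread.currentThread().getName());
                delivered.countDown();
            }, delayMillis, TimeUnit.MILLISECONDS);
            delivered.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdown();
        }
        return lastSeenThread.get();
    }

    public static void main(String[] args) {
        System.out.println("LastSeenThread: " + deliveryThreadAfterDelay(100));
    }
}
```

The caller never asked for a thread change, yet the value arrives on the timer's thread, which is the same effect the delay() experiment shows.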

4. Subjects.


When using Subjects, keep in mind that by default the chain of transformations applied to data passed to the subject's onNext() runs on the thread that called onNext(), until an observeOn operator appears in the chain.
Applying subscribeOn, however, is not that simple.

Consider the following code:
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject
        .doOnNext(obj -> Logger.logThread("doOnNext"))
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                Logger.logThread("onComplete");
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Object o) {
                Logger.logThread("onNext");
            }
        });
subject.onNext("str");
Handler handler = new Handler();
handler.postDelayed(() -> subject.onNext("str"), 1000);
handler.postDelayed(() -> subject.onNext("str"), 2000);

Both observeOn and subscribeOn are indicated here, but the result will be as follows:
doOnNext: RxCachedThreadScheduler-1
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1

That is, when we subscribe to the subject, it immediately delivers the value, which is processed on a thread from Schedulers.io(). But when the next message arrives in the subject, the thread on which onNext() was called is used.

Therefore, if you start a long-running operation after receiving an object from a subject, you must explicitly put observeOn between them.
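The reason subscribeOn has no effect on later emissions becomes clearer with a stripped-down model of a subject. SimpleSubject below is a hypothetical class written for this illustration, not RxJava's implementation. It forwards each value synchronously, so subscribers always run on whichever thread called onNext.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class SubjectThreadSketch {

    /** Hypothetical minimal subject: forwards each value to its subscribers synchronously. */
    static final class SimpleSubject<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T value) {
            // No thread hop here: subscribers run on the thread that called onNext.
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        SimpleSubject<String> subject = new SimpleSubject<>();
        subject.subscribe(value -> log.add(value + " on " + Thread.currentThread().getName()));

        Thread first = new Thread(() -> subject.onNext("a"), "caller-1");
        Thread second = new Thread(() -> subject.onNext("b"), "caller-2");
        try {
            first.start();
            first.join();
            second.start();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The thread used for subscribing plays no part once values are pushed in from outside, which is why only an explicit hop downstream (observeOn in RxJava) controls where the processing happens.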

5. Backpressure


An article like this cannot avoid mentioning backpressure. MissingBackpressureException is an error that has frayed my nerves quite a bit. I will not retell what can be read in the official RxJava wiki: github.com/ReactiveX/RxJava/wiki/Backpressure. But if you use RxJava actively, you should definitely read about backpressure.
When your application has a data producer on one thread and a consumer on another, you must account for the case where the consumer cannot keep up with the data. The operators described at the link above help in that situation.
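The underlying problem can be shown without RxJava at all: a bounded buffer sits between producer and consumer, and once the consumer stalls, the buffer fills and further items have nowhere to go. The sketch below is a plain-JDK analogy with hypothetical names. RxJava's own internal buffering and the MissingBackpressureException it raises are not modelled here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedBufferSketch {

    /**
     * Offers {@code produced} items to a buffer of the given capacity while
     * nothing is consuming, and returns how many items were rejected.
     */
    static int rejectedWhenConsumerStalls(int capacity, int produced) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < produced; i++) {
            // offer() returns false instead of blocking when the buffer is full.
            if (!buffer.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("Rejected items: " + rejectedWhenConsumerStalls(2, 5));
    }
}
```

What to do with the rejected items (drop them, buffer more, or slow the producer down) is exactly the decision that backpressure strategies make explicit.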

Conclusion


RxJava makes it very convenient to manage the execution of tasks on different threads. But to use it well, you need a solid understanding of what subscribeOn and observeOn actually do, as well as how different operators behave.
Study the documentation for the operators you use carefully: it states which Scheduler each operator runs on. Be careful with Subjects, too, and do not forget about backpressure.
It is also worth keeping in mind a tip once given by Ben Christensen (@benjchristensen), one of the main authors of RxJava:
“It can be asynchronously delivered, even though it can be synchronized.”

“For the subscriber, it makes sense to assume that the data is delivered asynchronously, even in cases where it can be delivered synchronously.”

Source code for this article: github.com/HotIceCream/GrokkingRxSchedulers

Source: https://habr.com/ru/post/280388/

