Multi-threaded programming in Android using RxJava 2

If you are new to RxJava, or have tried to figure it out and never quite got there, you should find something useful below.

The original article was written on November 29, 2017. This is a loose translation.

At GO-JEK, our applications perform a large number of asynchronous operations, and we cannot afford to sacrifice the speed and smoothness of the user interface.

Writing complex multi-threaded Android applications can be quite time-consuming, and from time to time it becomes overwhelming because so many interconnected things need attention. This and many other reasons convinced us to use RxJava in our Android applications.
In this article we describe how we used RxJava's real multi-threading capabilities to make application development as simple, easy and fun as possible. All the code examples below use RxJava 2, but the concepts apply to other reactive extensions as well.

Why reactive programming?


Every article about reactive programming begins with this obligatory section, and we will not break the tradition. A reactive approach to building Android applications has several advantages; let's look at the ones that really matter.

No more callbacks


If you have been developing for Android for a long time, you have probably noticed how quickly nested callbacks become complicated and get out of control.

This happens when you perform several asynchronous operations in sequence and each subsequent action depends on the result of the previous one. Almost immediately, the code becomes cluttered and difficult to maintain.

Simple error handling


In the imperative world, when many complex asynchronous operations are performed, errors can occur in many places. Each of those places needs its own error handling, which produces a lot of duplicated boilerplate and makes methods cumbersome.

Very simple multithreading


We all know (and secretly admit) how difficult working with multithreading in Java can be, for example running part of the code on a background thread and returning the result to the main thread. It only sounds simple; in practice there are many pitfalls to avoid.
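To make the pain concrete, here is a minimal plain-JDK sketch of doing that hand-off by hand, without RxJava. The class `ManualThreading`, the method `loadAndDeliver` and the "fake-main" executor are made up for illustration; on Android the second executor would typically be a Handler bound to the main Looper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class ManualThreading {

    /** Runs work on a background thread, then hands the result to a stand-in "main" thread. */
    static String loadAndDeliver() {
        // Hypothetical stand-ins for a worker thread and the UI thread.
        ExecutorService background = Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        AtomicReference<String> report = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            background.execute(() -> {
                String producedOn = Thread.currentThread().getName();
                int result = 21 * 2; // pretend this is slow work
                // The hop back has to be written by hand; forgetting it is a classic bug.
                main.execute(() -> {
                    report.set(result + " produced on " + producedOn
                            + ", delivered on " + Thread.currentThread().getName());
                    done.countDown();
                });
            });
            done.await();
            return report.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            background.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadAndDeliver());
    }
}
```

Even this toy version needs two executors, a latch, a shared reference and explicit shutdown, and it still has no error propagation or cancellation.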

RxJava makes it remarkably easy to run several complex operations on any thread you choose, taking care of correct synchronization and letting you switch between threads easily.

The list of RxJava's benefits goes on. We could talk about them for hours and bore you to death, but instead let's dig deeper and explore how multithreading really works in RxJava.

RxJava is NOT multi-threaded by default


Yes, you read that right. RxJava is not multi-threaded by default. The definition given on the official website reads roughly as follows:
"A library for composing asynchronous and event-based programs using observable sequences for the Java VM."

After seeing the word "asynchronous", many people mistakenly conclude that RxJava is multi-threaded by default. RxJava does support multithreading and offers many powerful features for working with asynchronous operations, but that does not make its default behavior multi-threaded.

If you have already worked a little with RxJava, then you know its basic constructs:


    // println(...) and currentThread() are assumed to be static helpers here,
    // e.g. wrappers around System.out.println and Thread.currentThread().
    Observable.just(1, 2, 3, 4, 5)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    println("Emitting item on: " + currentThread().getName());
                }
            })
            .map(new Function<Integer, Integer>() {
                @Override
                public Integer apply(@NonNull Integer integer) throws Exception {
                    println("Processing item on: " + currentThread().getName());
                    return integer * 2;
                }
            })
            .subscribeWith(new DisposableObserver<Integer>() {
                @Override
                public void onNext(@NonNull Integer integer) {
                    println("Consuming item on: " + currentThread().getName());
                }

                @Override
                public void onError(@NonNull Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });

If you run this sample, you will see that every action is performed on the main application thread (watch the thread names in the console log). This shows that RxJava's default behavior is blocking: everything executes on the thread from which the code is called.

Bonus: wondering what doOnNext() does? It is nothing more than a side-effect operator. It lets you hook into a chain of observables and perform impure operations, for example inserting extra code into the call chain for debugging.

Simple example


To start working with multithreading in RxJava, you need to be familiar with the basic classes and methods: Schedulers, observeOn() and subscribeOn().

Let's look at one of the simplest examples. Suppose we want to fetch a list of Book objects with a network request and show it on the main application thread. It is a common and clear example to start with.

    getBooks()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<Book>() {
                @Override
                public void onNext(@NonNull Book book) {
                    // Handle each Book as it arrives
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    // Handle the error
                }

                @Override
                public void onComplete() {
                    // All Book items have been received
                }
            });


Here we see the getBooks() method, which makes a network call and fetches the books for us. The network call takes time (from a few milliseconds to several seconds), so we use subscribeOn() with the Schedulers.io() scheduler to perform the operation on an I/O thread.

We also use the observeOn() operator with the AndroidSchedulers.mainThread() scheduler to process the result on the main thread and show the list of books in the application's user interface.

Do not worry, we will soon move on to more advanced things. This example is only meant to refresh the basic concepts before we dive deeper.

Make friends with Schedulers


RxJava provides a powerful set of schedulers. You cannot access or control threads directly; if you need to work with threads, you use the built-in schedulers.

You can think of schedulers as threads or thread pools (collections of threads) for performing different kinds of work.

Simply put, if you need to run a task on a separate thread, pick the appropriate scheduler; it takes a thread from its pool of available threads and runs the task on it.

RxJava offers several types of schedulers. The hardest part is choosing the right scheduler for your task: a task will not run optimally on the wrong one. Let's go through each scheduler.

Schedulers.io()


This scheduler is backed by an unbounded thread pool and is intended for I/O-bound work that does not load the CPU, such as accessing the file system, making network calls or querying a database. The number of threads in this scheduler is not limited and can grow as needed.
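What "grows as needed" means can be seen with a plain-JDK analogy. The sketch below is not RxJava's actual implementation; it uses a ThreadPoolExecutor configured like Executors.newCachedThreadPool(), and the names `UnboundedPoolDemo` and `threadsCreatedFor` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnboundedPoolDemo {

    /** Submits `tasks` blocking jobs at once and reports how many threads the pool created. */
    static int threadsCreatedFor(int tasks) {
        // Same configuration as Executors.newCachedThreadPool(): no core threads, no upper bound.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch allStarted = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    allStarted.countDown();
                    try {
                        release.await(); // simulates a thread parked on slow I/O
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allStarted.await(); // every job is running, so each one needed its own thread
            return pool.getLargestPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Threads created for 8 blocked jobs: " + threadsCreatedFor(8));
    }
}
```

Because none of the jobs finishes before the next one is submitted, the pool cannot reuse a thread and has to create one per job. That is convenient for blocking I/O, and it is also why such a pool can grow without limit under load.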

Schedulers.computation()


This scheduler is intended for CPU-intensive work, such as processing large amounts of data, images and so on. It is backed by a bounded thread pool whose size equals the number of available processors.
Because this scheduler is meant only for CPU-intensive work, the number of threads is limited. This keeps threads from competing for CPU time and from sitting idle.
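The sizing rule can be illustrated without RxJava. The following is a sketch under the assumption that a fixed JDK pool with one thread per core is a reasonable stand-in for a computation scheduler; `CpuBoundPoolDemo` and `sumOfSquares` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CpuBoundPoolDemo {

    /** Sums the squares of 1..n by splitting the range into at most one chunk per core. */
    static long sumOfSquares(int n) {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores); // one thread per core
        try {
            List<Future<Long>> parts = new ArrayList<>();
            int chunk = (n + cores - 1) / cores; // ceiling division
            for (int start = 1; start <= n; start += chunk) {
                final int from = start;
                final int to = Math.min(n, start + chunk - 1);
                parts.add(pool.submit(() -> {
                    long sum = 0;
                    for (long i = from; i <= to; i++) {
                        sum += i * i;
                    }
                    return sum;
                }));
            }
            long total = 0;
            for (Future<Long> part : parts) {
                total += part.get(); // wait for each chunk and add it up
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(1000));
    }
}
```

With more threads than cores, the extra threads would only take turns on the same CPUs and add context-switching overhead, which is the reason the pool is capped.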

Schedulers.newThread()


This scheduler creates a brand-new thread every time it is used, so it gets none of the benefits of a thread pool. Threads are expensive to create and destroy. Be careful not to create threads excessively, as this can slow the system down and lead to out-of-memory errors. Note that the new thread is created per worker, that is, each time the scheduler is applied to a subscription through subscribeOn() or observeOn(), not for each individual item emitted by the source observable.
Ideally, you should use this scheduler rarely, mainly to move long-running parts of the program onto a separate thread.
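The cost model is easy to see with a thread-per-task Executor from the plain JDK. This is only an analogy for the "no pooling" behavior, and `ThreadPerTaskDemo` and `distinctThreadsFor` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

final class ThreadPerTaskDemo {

    /** Runs `tasks` jobs on a thread-per-task executor and counts the distinct threads used. */
    static int distinctThreadsFor(int tasks) {
        Executor threadPerTask = command -> new Thread(command).start(); // no pooling, no reuse
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            threadPerTask.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("Distinct threads for 5 jobs: " + distinctThreadsFor(5));
    }
}
```

Every job pays the full price of starting and tearing down a thread, which is why this style suits a few long-running jobs rather than many short ones.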

Schedulers.single()


This scheduler is backed by a single thread that executes tasks sequentially. It is very useful when you have background tasks in different places in your application but cannot allow more than one of them to run at the same time.
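The sequential guarantee can be sketched with a single-threaded JDK executor, used here as an analogy rather than as RxJava's implementation. `SingleThreadDemo` and `runInOrder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SingleThreadDemo {

    /** Queues jobs 1..n on one shared worker thread and returns the order in which they ran. */
    static List<Integer> runInOrder(int n) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= n; i++) {
            final int job = i;
            single.execute(() -> order.add(job)); // jobs wait in a FIFO queue
        }
        single.shutdown(); // accept no new jobs, but finish the queued ones
        try {
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("jobs did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```

Since only one thread ever drains the queue, two jobs can never overlap, and they run in submission order.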

Schedulers.from(Executor executor)


This scheduler is backed by your own Executor. It is useful when certain tasks must run on a scheduler that follows your own thread-allocation logic.

Suppose you want to limit the number of concurrent network calls your application makes. You can create your own scheduler backed by a bounded thread pool ( Schedulers.from(Executors.newFixedThreadPool(n)) ) and use it everywhere network calls are made.
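The limiting effect comes from the Executor itself, so it can be checked with the JDK alone. In this sketch `BoundedConcurrencyDemo`, `peakConcurrency` and `pretendNetworkCall` are hypothetical names, and the sleep merely stands in for network latency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedConcurrencyDemo {

    /** Runs `tasks` fake network calls on a pool of `limit` threads and returns the peak overlap. */
    static int peakConcurrency(int limit, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> calls = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                calls.add(pool.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest overlap seen
                    pretendNetworkCall();
                    running.decrementAndGet();
                }));
            }
            for (Future<?> call : calls) {
                call.get(); // wait for every call to finish
            }
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    private static void pretendNetworkCall() {
        try {
            Thread.sleep(20); // stand-in for network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrent calls: " + peakConcurrency(2, 6));
    }
}
```

However many calls are submitted, the peak never exceeds the pool size; the remaining calls wait in the executor's queue. Wrapping such a pool with Schedulers.from(...) gives the same cap to every RxJava chain that uses the resulting scheduler.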

AndroidSchedulers.mainThread()


This is a special scheduler that is not part of the RxJava library itself; you need the RxAndroid extension library to use it. In Android applications it is used to perform actions on the UI thread.
By default, this scheduler queues work on the Looper associated with the main thread, but you can target another Looper with AndroidSchedulers.from(Looper looper).

Note: be careful with schedulers backed by unbounded thread pools, such as Schedulers.io(). There is always a risk that the number of threads will grow without limit.

Understanding subscribeOn() and observeOn()


Now that you have an idea of the types of schedulers, let's look at subscribeOn() and observeOn() in detail.

To work with multithreading in RxJava professionally, you need a deep understanding of how these two operators behave, both separately and together.

subscribeOn()


Put simply, this operator specifies the thread on which the source observable emits its items. The word "source" matters here. In a chain of observables, the source observable is always the root, the top of the chain where events originate.

As you have already seen, without subscribeOn() all events occur on the thread from which the code was called (in our case, the main thread).

Let's move the events to a computation thread using subscribeOn() with the Schedulers.computation() scheduler. When you run the following example, you will see that the events occur on one of the computation threads available in the pool, RxComputationThreadPool-1.

To keep the code short, we will not override every method of DisposableObserver, since we do not need onError() and onComplete() here. We use doOnNext() and lambdas instead.

    Observable.just(1, 2, 3, 4, 5, 6)
            .subscribeOn(Schedulers.computation())
            .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
            .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

It does not matter where in the call chain you place subscribeOn(). It acts only on the source observable and controls the thread on which the source emits events.

In the following example, further observables are created after the source (using the map() and filter() operators), and subscribeOn() is placed at the end of the call chain. Yet when you run this code, you will notice that all events occur on the thread specified in subscribeOn(). This will become clearer once observeOn() is added to the chain. Even if we place subscribeOn() below observeOn(), the logic does not change: subscribeOn() acts only on the source observable.

    Observable.just(1, 2, 3, 4, 5, 6)
            .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
            .map(integer -> integer * 3)
            .filter(integer -> integer % 2 == 0)
            .subscribeOn(Schedulers.computation())
            .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

It is also important to understand that using subscribeOn() several times in the same call chain has no additional effect. You can write it again, of course, but it changes nothing. In the example below we chain three different schedulers in a row. Can you guess which one will be used at run time?

    Observable.just(1, 2, 3, 4, 5, 6)
            .subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.computation())
            .subscribeOn(Schedulers.newThread())
            .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
            .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

If you answered Schedulers.io(), you are right! Even if you call it multiple times, only the first subscribeOn(), the one closest to the source observable, takes effect.

Under the hood


It is worth spending a little more time on this example. Why does only Schedulers.io() take effect? People usually expect Schedulers.newThread() to win, since it comes last in the chain.

You need to understand that in RxJava a subscription is established by walking back up through all the Observable instances. The code below helps to see this. It is a chain like the ones above, written out step by step.

    Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5);
    Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0);
    Observable<Integer> o3 = o2.map(integer -> integer * 10);
    o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

To understand how it all works, let's start from the last line of the example. There, the target subscriber calls subscribe() on the observable o3, which then implicitly calls subscribe() on its parent observable o2. The observer that o3 passes upstream multiplies the numbers it receives by 10.

The process repeats: o2 implicitly calls subscribe() on o1, passing an observer that lets only even numbers through. Now we have reached the root ( o1 ), which has no parent to subscribe to. At this point the chain of observables is complete, and the source observable starts emitting items.

This is the reason only Schedulers.io() takes effect: each subscribeOn() moves the subscribe() call that travels upstream onto its own scheduler, so the one closest to the source is applied last and determines the thread on which the source emits. By now you should understand how subscriptions work in RxJava, how chains of observables are formed, and how events propagate from the source observable.
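The upstream walk can be modelled in a few lines of plain Java. The sketch below is a toy, not RxJava's real implementation: `Source` is a hypothetical stand-in for Observable, its `subscribeOn` simply runs the upstream subscribe() call on an executor, and the three named single-thread executors play the role of schedulers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class SubscribeOnModel {

    /** Toy stand-in for Observable: subscribing hands an item consumer to the stage above. */
    interface Source {
        void subscribe(Consumer<Integer> downstream);

        /** Toy subscribeOn: performs the upstream subscribe() call on the given executor. */
        default Source subscribeOn(ExecutorService executor) {
            return downstream -> executor.execute(() -> this.subscribe(downstream));
        }
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Chains three subscribeOn calls and returns the name of the thread the source emitted on. */
    static String emittingThread() {
        ExecutorService io = named("io");
        ExecutorService computation = named("computation");
        ExecutorService fresh = named("newThread");
        AtomicReference<String> emittedOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Root of the chain: emits on whichever thread ends up calling its subscribe().
        Source source = downstream -> {
            emittedOn.set(Thread.currentThread().getName());
            downstream.accept(1);
        };

        try {
            source.subscribeOn(io)          // closest to the source, so it is applied last
                  .subscribeOn(computation) // hop 2 of the upstream walk
                  .subscribeOn(fresh)       // hop 1 of the upstream walk
                  .subscribe(item -> done.countDown());
            done.await();
            return emittedOn.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            computation.shutdown();
            fresh.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Source emitted on: " + emittingThread());
    }
}
```

The subscribe() call hops from "newThread" to "computation" to "io" on its way up, and the source emits on the last thread it lands on, which is the one set by the subscribeOn closest to it.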

observeOn()


As we have seen, subscribeOn() instructs the source observable to emit items on a specific thread, and that thread is responsible for pushing the items all the way down to the subscriber. So by default the subscriber receives the processed items on that same thread.

But this may not be the behavior you want. Suppose you want to fetch some data from the network and display it in the user interface.

Two things need to happen: the network call must be made on an I/O thread, and the result must be consumed on the main (UI) thread.

You will have an Observable that makes a network call on an I/O thread and sends the result to the subscriber. If you use only subscribeOn(Schedulers.io()), the target subscriber processes the result on that same I/O thread. That is a problem, because in Android you can touch the user interface only from the main thread.

So we need to switch threads, and observeOn() is the tool for that. When observeOn() appears in a call chain, the items emitted by the source observable are handed over to the thread specified in observeOn() for everything downstream of that point.

    getIntegersFromRemoteSource()
            .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

In this contrived example, integers are fetched from the network and emitted by the source observable. In real code this could be any other asynchronous operation, such as reading a large file or fetching data from a database. Run the example and watch the console log to see the results.

Now consider a more complex example in which observeOn() is called several times to switch threads during data processing.

    getIntegersFromRemoteSource()
            .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .map(integer -> {
                println("Mapping item " + integer + " on: " + currentThread().getName());
                return integer * integer;
            })
            .observeOn(Schedulers.newThread())
            .filter(integer -> {
                println("Filtering item " + integer + " on: " + currentThread().getName());
                return integer % 2 == 0;
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

In the example above, the source observable emits items into the chain on an I/O thread, because we used subscribeOn() with Schedulers.io(). Next we want to transform each item with the map() operator, but on a computation thread. To do that, we call observeOn() with Schedulers.computation() before map(), which switches threads and hands the items over to a computation thread.

The next step is to filter some items, and for some reason we want to do that on a newly created thread. We use observeOn() again, this time with Schedulers.newThread(), before the filter() operator, so the items are handed over to a new thread.

Finally, we want the subscriber to receive the result on the UI thread. For that we use observeOn() with the AndroidSchedulers.mainThread() scheduler.

But what happens if we use observeOn() several times in a row? In the example below, on which thread does the subscriber receive the result?

    getIntegersFromRemoteSource()
            .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.single())
            .observeOn(Schedulers.computation())
            .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

If you run the example, you will see that the subscriber receives the items on the computation thread RxComputationThreadPool-1. This means the last observeOn() in the chain won. Why is that?

Under the hood


Perhaps you have already guessed. As we know, subscription proceeds by walking back up through all the Observable instances, but emission goes the other way, that is, in the order in which the code is written. Items travel from the source observable down the call chain to the subscriber.

The observeOn() operator always works in this top-down order, so the threads are switched one after another, and the last switch lands on the computation thread ( observeOn(Schedulers.computation()) ). So when you need to process data on a different thread, just call observeOn() and then process the items. Synchronization, avoiding race conditions and many other multithreading difficulties are handled for you by RxJava.
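The downstream direction can be modelled the same way in plain Java. This is again a toy rather than RxJava's implementation: the hypothetical `Source.observeOn` wraps the downstream consumer so that every item is re-dispatched to an executor, and the named single-thread executors stand in for schedulers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ObserveOnModel {

    /** Toy stand-in for Observable: subscribing hands an item consumer to the stage above. */
    interface Source {
        void subscribe(Consumer<Integer> downstream);

        /** Toy observeOn: re-dispatches every item to the executor before passing it down. */
        default Source observeOn(ExecutorService executor) {
            return downstream ->
                    this.subscribe(item -> executor.execute(() -> downstream.accept(item)));
        }
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Chains three observeOn calls and returns the name of the thread the subscriber ran on. */
    static String consumingThread() {
        ExecutorService fakeMain = named("fake-main");
        ExecutorService single = named("single");
        ExecutorService computation = named("computation");
        AtomicReference<String> consumedOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Source source = downstream -> downstream.accept(1); // emits on the caller's thread

        try {
            source.observeOn(fakeMain)      // first hop on the way down
                  .observeOn(single)        // second hop
                  .observeOn(computation)   // last hop before the subscriber
                  .subscribe(item -> {
                      consumedOn.set(Thread.currentThread().getName());
                      done.countDown();
                  });
            done.await();
            return consumedOn.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            fakeMain.shutdown();
            single.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Subscriber ran on: " + consumingThread());
    }
}
```

Each item hops from "fake-main" to "single" to "computation" on its way down, so the subscriber always runs on the thread chosen by the observeOn nearest to it.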

Summary


By now you should have a fairly good idea of how to use RxJava properly to write multi-threaded applications with a fast and smooth user interface.

If it did not click right away, do not worry. Read the article again and experiment with the code examples. There are many nuances to absorb, so take your time.

Source: https://habr.com/ru/post/344016/

