Hello everyone.
We continue to introduce you to the topics we are considering for publication, and this time we would like to gauge public opinion on RxJava.

In the near future we plan to publish a more general piece on reactive programming, a topic that has interested us for more than a year. Today, though, we suggest reading about RxJava on Android, since dynamism and responsiveness are especially important on this platform.
In most Android applications, we respond to user actions (taps, swipes, etc.) while other work, such as network requests, runs in the background.
Orchestrating all of these processes is difficult, and any code risks quickly turning into a shapeless mess.
For example, it is not easy to send a request to a database over the network, then, as soon as it completes, fetch the user's messages and settings simultaneously, and finally display a welcome message once all of that work is done.
This is where RxJava comes in.
RxJava (ReactiveX) is a library that lets you organize many actions triggered by particular events in the system.
With RxJava, you can forget about callbacks and the nightmare of managing global state.
Why?
Let's return to our example:
send a request to a database over the network; as soon as it completes, fetch the user's messages and settings simultaneously; once all of that work is done, display a welcome message.
If we analyze this scenario in more detail, we find three main stages, all of which take place in the background:
- Fetch the user from the database
- Fetch the user's settings and messages simultaneously
- Combine the results of both queries into one
To do the same with plain Java SE and Android, we would need to:
- Create 3-4 different AsyncTasks
- Create a semaphore that waits for both requests (settings and messages) to complete
- Add object-level fields to store the results
It is already clear that this requires managing state and using some of Java's blocking mechanisms.
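To make the cost of the manual approach concrete, here is a minimal sketch in plain Java. It is an illustration under stated assumptions, not code from a real project: a plain ExecutorService stands in for the AsyncTasks so the example runs outside Android, a CountDownLatch plays the role of the semaphore, and the class name, the fetch methods, and their return values are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the manual approach: shared fields plus a blocking latch.
class ManualCoordination {
    // Object-level fields that hold intermediate results (shared mutable state).
    private volatile String settings;
    private volatile List<String> messages;

    // Placeholder queries; a real app would go to the network or a database here.
    private String fetchSettings(String user) {
        return "settings-of-" + user;
    }

    private List<String> fetchMessages(String user) {
        return Arrays.asList("hello " + user, "welcome back");
    }

    // Runs both queries in parallel and blocks the caller until both have finished.
    String loadWelcome(final String user) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        final CountDownLatch bothDone = new CountDownLatch(2);
        try {
            pool.execute(() -> {
                try {
                    settings = fetchSettings(user);
                } finally {
                    bothDone.countDown(); // always release the waiter, even on failure
                }
            });
            pool.execute(() -> {
                try {
                    messages = fetchMessages(user);
                } finally {
                    bothDone.countDown();
                }
            });
            bothDone.await(); // the blocking mechanism the text refers to
            return "Welcome, " + user + " (" + settings + ", " + messages.size() + " messages)";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(new ManualCoordination().loadWelcome("alice"));
    }
}
```

Even in this small form, the result lives in fields outside the method that produces it, and the caller has to block until both tasks report back.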
All of this can be avoided with RxJava (see the examples below): the entire code reads as a single stream defined in one place and is built on the functional paradigm.
Quick start in Android
To get the libraries you will most likely need for your project, add the following lines to your build.gradle file:
    compile 'io.reactivex:rxjava:1.1.0'
    compile 'io.reactivex:rxjava-async-util:0.21.0'
    compile 'io.reactivex:rxandroid:1.1.0'
    compile 'com.jakewharton.rxbinding:rxbinding:0.3.0'
    compile 'com.trello:rxlifecycle:0.4.0'
    compile 'com.trello:rxlifecycle-components:0.4.0'
This will include:
- RxJava - the core ReactiveX library for Java.
- RxAndroid - RxJava extensions for Android that help with Android threads and Loopers.
- RxBinding - bindings between RxJava and Android user interface elements such as Button and TextView.
- RxJavaAsyncUtil - helps bridge Callable- and Future-based code with Observables.
Example
Let's start with an example:
    Observable.just("1", "2")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });
Here we created an Observable that emits two items, 1 and 2.
We subscribed to the Observable, so each item is printed to standard output as soon as it is received.
Some details
An Observable is an entity that you can subscribe to in order to receive the items it emits. Observables can be created in a variety of ways. However, an Observable usually does not start emitting items until you subscribe to it.
After you subscribe to an Observable, you receive a Subscription. The subscriber keeps receiving items from the Observable until the Observable signals that it has completed; in very rare cases it never sends that signal, and emission continues indefinitely.
Moreover, unless you specify otherwise, all of these actions run on the thread that calls subscribe(); in the example above, that is the main thread.
Advanced example
    Observable.from(fetchHttpNetworkContentFuture())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
Here we see something new:
- subscribeOn(Schedulers.io()) - makes the Observable do its waiting and computation on the thread pool intended for input/output (Schedulers.io()).
- observeOn(AndroidSchedulers.mainThread()) - makes the subscriber's callbacks run on the Android main thread. This is required whenever you need to change something in the Android user interface.
- The second argument to .subscribe() is an error handler that is called if something goes wrong in the stream. It should almost always be present.
Complex flow control
Remember the complex flow we described at the very beginning?
This is how it looks with RxJava:
    Observable.fromCallable(createNewUser())
        .subscribeOn(Schedulers.io())
        .flatMap(new Func1<User, Observable<Pair<Settings, List<Message>>>>() {
            @Override
            public Observable<Pair<Settings, List<Message>>> call(User user) {
                return Observable.zip(
                    Observable.from(fetchUserSettings(user)),
                    Observable.from(fetchUserMessages(user)),
                    new Func2<Settings, List<Message>, Pair<Settings, List<Message>>>() {
                        @Override
                        public Pair<Settings, List<Message>> call(Settings settings, List<Message> messages) {
                            return Pair.create(settings, messages);
                        }
                    });
            }
        })
        .doOnNext(new Action1<Pair<Settings, List<Message>>>() {
            @Override
            public void call(Pair<Settings, List<Message>> pair) {
                System.out.println("Received settings" + pair.first);
            }
        })
        .flatMap(new Func1<Pair<Settings, List<Message>>, Observable<Message>>() {
            @Override
            public Observable<Message> call(Pair<Settings, List<Message>> settingsListPair) {
                return Observable.from(settingsListPair.second);
            }
        })
        .subscribe(new Action1<Message>() {
            @Override
            public void call(Message message) {
                System.out.println("New message " + message);
            }
        });
Here a new user is created (createNewUser()); once that result is returned, fetching the user's messages (fetchUserMessages()) and settings (fetchUserSettings()) proceeds simultaneously. We wait for both actions to complete and return the combined result (Pair.create()).
Do not forget: all of this happens on a separate thread (in the background).
Then the program logs the received settings. Finally, the list of messages is converted into another Observable that emits the messages one by one rather than as a whole list, and each message is printed to the console.
Functional approach
It is much easier to work with RxJava if you are familiar with functional programming, in particular with the concepts of map and zip. In addition, the general logic of RxJava closely resembles that of FP.
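For readers new to these two concepts, the following sketch shows map and zip on ordinary lists using java.util.stream rather than RxJava. It only illustrates the ideas; the class and method names are hypothetical, and since java.util.stream has no built-in zip, the example pairs elements by index.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration of map and zip on plain lists (not RxJava).
class MapAndZip {
    // map: apply a function to every element, producing a new list.
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // zip: combine the elements found at the same position in two lists.
    // Pairing stops at the end of the shorter list.
    static List<String> zip(List<String> names, List<Integer> ages) {
        int n = Math.min(names.size(), ages.size());
        return IntStream.range(0, n)
                .mapToObj(i -> names.get(i) + "=" + ages.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("rx", "java")));
        // prints [2, 4]
        System.out.println(zip(Arrays.asList("ann", "bob"), Arrays.asList(30, 25, 41)));
        // prints [ann=30, bob=25]
    }
}
```

The RxJava operators of the same names apply these ideas to items that arrive over time instead of elements that already sit in a list.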
How to create your own observable?
If your code becomes heavily tied to RxJava, you will often have to write your own Observables so that they fit the logic of your program.
Consider an example:
    public Observable<String> customObservable() {
        return rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> subscriber) {
And here is a similar variant that does not require the action to run on a specific thread:
    Observable<String> observable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hi");
                subscriber.onCompleted();
            }
        }
    );
Three methods are important here:
- onNext(v) - sends a new value to the subscriber
- onError(e) - notifies the subscriber that an error has occurred
- onCompleted() - notifies the subscriber that the Observable has finished, so no more items will arrive from it
In addition, it will probably be convenient to use RxJavaAsyncUtil.
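The ordering of the three methods listed above can be sketched without RxJava at all. The example below uses a hand-rolled interface, SimpleSubscriber, which is a hypothetical stand-in and not rx.Subscriber; it only illustrates the usual contract of zero or more onNext calls followed by a single terminal call, either onError or onCompleted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the emission contract; no RxJava classes are used.
class EmissionContract {
    // Mirrors the three callbacks discussed in the text.
    interface SimpleSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Emits each input parsed as an integer, then exactly one terminal event:
    // onCompleted() on success, or onError() at the first input that fails to parse.
    static void emitParsed(List<String> inputs, SimpleSubscriber<Integer> subscriber) {
        try {
            for (String input : inputs) {
                subscriber.onNext(Integer.parseInt(input));
            }
        } catch (NumberFormatException e) {
            subscriber.onError(e);
            return; // nothing is emitted after a terminal event
        }
        subscriber.onCompleted();
    }

    // Records the sequence of callbacks as strings so it can be inspected.
    static List<String> record(List<String> inputs) {
        final List<String> events = new ArrayList<>();
        emitParsed(inputs, new SimpleSubscriber<Integer>() {
            @Override public void onNext(Integer value) { events.add("next:" + value); }
            @Override public void onError(Throwable error) { events.add("error"); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("1", "2")));
        // prints [next:1, next:2, completed]
        System.out.println(record(Arrays.asList("1", "x", "3")));
        // prints [next:1, error]
    }
}
```

In the second run the value "3" is never delivered, because the error ends the sequence.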
Integration with other libraries
As RxJava grows in popularity and becomes the de facto standard for asynchronous programming on Android, more and more libraries integrate with it.
Just a few examples:
Retrofit - "Type-safe HTTP-client for Android and Java"
SqlBrite - “Lightweight wrapper for SQLiteOpenHelper, enriching SQL operations with semantics of reactive flows.”
StorIO - “Beautiful API for SQLiteDatabase and ContentResolver”
All of these libraries greatly simplify working with HTTP requests and databases.
Interactivity with Android UI
This introduction would be incomplete if we did not consider how to use native UI elements in Android.
    TextView finalText;
    EditText editText;
    Button button;
    ...
    RxView.clicks(button)
        .subscribe(new Action1<Void>() {
            @Override
            public void call(Void aVoid) {
                System.out.println("Click");
            }
        });
    RxTextView.textChanges(editText)
        .subscribe(new Action1<CharSequence>() {
            @Override
            public void call(CharSequence charSequence) {
                finalText.setText(charSequence);
            }
        });
    ...
Obviously, you can simply rely on setOnClickListener, but in the long run, RxBinding might be better for you, because it allows you to connect the UI to the general flow of RxJava.
Tips
Practice shows that you should follow a few rules when working with RxJava.
Always use an error handler
Omitting the error handler like this
    .subscribe(new Action1<Void>() {
        @Override
        public void call(Void aVoid) {
            System.out.println("Click");
        }
    });
is usually a bad idea. An exception thrown in the Observable or in one of the actions will propagate and will most likely crash the entire application.
It would be even better to make a generic handler:
.subscribe(..., myErrorHandler);
Extract action methods
If you have many anonymous inner classes, the readability of the code may deteriorate over time (especially if you are not using RetroLambda).
Therefore, this code:
    .doOnNext(new Action1<Pair<Settings, List<Message>>>() {
        @Override
        public void call(Pair<Settings, List<Message>> pair) {
            System.out.println("Received settings" + pair.first);
        }
    })
would look better after this refactoring:
    .doOnNext(logSettings())

    @NonNull
    private Action1<Pair<Settings, List<Message>>> logSettings() {
        return new Action1<Pair<Settings, List<Message>>>() {
            @Override
            public void call(Pair<Settings, List<Message>> pair) {
                System.out.println("Received settings" + pair.first);
            }
        };
    }
Use own classes or tuples
There are cases in which one value depends on another (for example, a user and that user's settings), and you want to obtain both values using two asynchronous requests.
In such cases, I recommend using JavaTuples.
Example:
    Observable.fromCallable(createNewUser())
        .subscribeOn(Schedulers.io())
        .flatMap(new Func1<User, Observable<Pair<User, Settings>>>() {
            @Override
            public Observable<Pair<User, Settings>> call(final User user) {
                return Observable.from(fetchUserSettings(user))
                    .map(new Func1<Settings, Pair<User, Settings>>() {
                        @Override
                        public Pair<User, Settings> call(Settings o) {
                            return Pair.create(user, o);
                        }
                    });
            }
        });
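The other option named in the heading, a class of your own, might look like the sketch below. It is only an illustration: User and Settings here are minimal hypothetical stand-ins for the types used in the example above, and UserWithSettings is an invented name.

```java
import java.util.Objects;

// Hypothetical sketch of a dedicated result class used in place of a generic pair.
class OwnResultClass {
    // Minimal stand-ins for the article's User and Settings types.
    static final class User {
        final String name;

        User(String name) {
            this.name = name;
        }
    }

    static final class Settings {
        final boolean notificationsEnabled;

        Settings(boolean notificationsEnabled) {
            this.notificationsEnabled = notificationsEnabled;
        }
    }

    // Immutable holder with named fields instead of pair.first and pair.second.
    static final class UserWithSettings {
        final User user;
        final Settings settings;

        UserWithSettings(User user, Settings settings) {
            this.user = Objects.requireNonNull(user, "user");
            this.settings = Objects.requireNonNull(settings, "settings");
        }

        String describe() {
            return user.name + " (notifications "
                    + (settings.notificationsEnabled ? "on" : "off") + ")";
        }
    }

    public static void main(String[] args) {
        UserWithSettings result = new UserWithSettings(new User("alice"), new Settings(true));
        System.out.println(result.describe()); // prints alice (notifications on)
    }
}
```

Inside the stream, the map step would then return new UserWithSettings(user, o) where the example above calls Pair.create(user, o), and downstream operators would read result.user and result.settings.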
Lifecycle Management
It often happens that a background process (subscription) outlives the activity or fragment that contains it. But what if you no longer care about the result once the user leaves the activity?
In such cases, the RxLifecycle project will help you.
Wrap your Observable like this (taken from the documentation), and the subscription will end automatically at the matching lifecycle event; for a binding made in onResume(), that is onPause():
    public class MyActivity extends RxActivity {
        @Override
        public void onResume() {
            super.onResume();
            myObservable
                .compose(bindToLifecycle())
                .subscribe();
        }
    }
Conclusion
Of course, this is not a complete guide to using RxJava on Android, but I hope I have convinced you that in some respects RxJava is better than a plain AsyncTask.