
Reactive programming for Android

Fault tolerance, responsiveness, focus on events, and scalability are the four principles of reactive programming, which is popular today. Backends of large systems that support tens of thousands of simultaneous connections are built by following them.

Responsiveness, simplicity, flexibility, and extensibility of the code are the principles that can be ascribed to a reactive UI.

Surely, if you combine a reactive backend and a reactive UI, you can get a quality product. That is what we tried to do when developing 2GIS Dialer, a dialer that works through an API and at the same time has to remain fast and convenient.



Why do we need reactive programming


Consider an example:

    requestDataTask = new AsyncTask<Void, Void, JSONObject>() {
        @Override
        protected JSONObject doInBackground(Void... params) {
            final String requestResult = apiService.getData();
            final JSONObject json = JsonUtils.parse(requestResult);
            lruCache.cacheJson(json);
            return json;
        }
    };

Everything is simple: we create an AsyncTask in which we:

  1. Make a request to the 2GIS API.
  2. Create a JSONObject from the result of the request.
  3. Cache the JSONObject.
  4. Return the JSONObject.

Such code is found in many projects; it is easy to understand, and millions of lemmings cannot be wrong. But let's dig a little deeper:

  1. What do we do if an Exception is thrown somewhere at runtime?
  2. doInBackground(Void...) runs on a separate thread, so how do we tell the user about the error in the UI? Add fields for the Exception?
  3. And what do we return if the request failed? null?
  4. And what if the json is not valid?
  5. What do we do if the object could not be cached?

And this is not even the most difficult example. Imagine that you need to make another request based on the results of the previous one. With AsyncTasks this turns into callback hell that, at a minimum, is fragile in the face of crashes, errors, and so on.
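One common workaround for the second and third questions, sketched here under assumptions, is to return a small wrapper from the background step that holds either the value or the exception. The `Result` class is a hypothetical name introduced for illustration; it is not part of AsyncTask or of the 2GIS code, and it is written with Java 8 lambdas in mind for brevity:

```java
import java.util.concurrent.Callable;

/** Hypothetical holder: either a value or the exception that prevented it. */
final class Result<T> {
    final T value;          // null when the call failed
    final Exception error;  // null when the call succeeded

    private Result(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    /** Runs the work and captures its outcome instead of letting it throw. */
    static <T> Result<T> of(Callable<T> work) {
        try {
            return new Result<T>(work.call(), null);
        } catch (Exception e) {
            return new Result<T>(null, e);
        }
    }

    boolean isSuccess() {
        return error == null;
    }
}
```

With such a wrapper, doInBackground could return `Result.of(...)` and the UI callback could branch on `isSuccess()`. Every caller still has to remember that check, and chained requests nest one check inside another, which is the weakness described above.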

There are more questions than answers. One could write an entire article on the shortcomings of AsyncTask, seriously. But are there any better options?

RxJava Framework


Looking back at the principles of reactive programming, we began to look for a solution that would provide:


That turned out to be RxJava from the folks at Netflix: a reactive extension whose idea (but not its implementation) was borrowed from the Reactive Extensions for C#.

In RxJava, everything revolves around Observable. An Observable is like a data stream (it can also be viewed as a monad) that can receive and pass on data in any way. Observables support operators such as flatMap, filter, zip, merge, cast, and so on.

A simple example:

    // Observable that emits the items of an Iterable
    final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
            .subscribeOn(Schedulers.newThread())        // produce the data on a background thread
            .observeOn(AndroidSchedulers.mainThread())  // handle the result on the main thread
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    // do something with the result
                }
            });

We create an Observable that emits the numbers from an Iterable. We specify that the data is generated and emitted on a separate thread, while the result is handled on the main thread. We then subscribe to it, and in the subscriber's method we do whatever we need with each successive result.
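The thread hop itself can be illustrated without RxJava. The following is a minimal sketch using only `java.util.concurrent`; the class name, the thread names, and the single-thread executor standing in for Android's main thread are all assumptions made for the illustration, and this is not how RxJava implements its schedulers:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {
    /**
     * Produces a value on one thread and handles it on another.
     * Returns the names of the producing and the handling thread.
     */
    static String[] run() {
        // Stand-ins: "background" plays the role of subscribeOn's thread,
        // "main-stand-in" plays the role of the Android main thread.
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        try {
            return CompletableFuture
                    // the "request" runs on the background executor
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    // the "result handler" runs on the other executor
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            main)
                    .join();
        } finally {
            background.shutdown();
            main.shutdown();
        }
    }
}
```

The two operators in the RxJava example play the same two roles: one chooses where the work is produced, the other chooses where the result is consumed.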

The RxJava example above can be made more interesting:

    // Observable that emits the items of an Iterable
    final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
            // let only some of the items through
            .filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer % 2 == 0;  // keep even numbers only
                }
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    // do something with ONLY EVEN results
                }
            });

Now, with the filter operator in place, we process only the even values.
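To get a feel for what such an operator chain does, here is a toy push-based stream in plain Java. It is only a sketch of the idea and not RxJava's Observable: `MiniObservable` and its methods are hypothetical, there is no error handling, completion signal, or threading, and Java 8 lambdas are assumed for brevity:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Toy push-based stream; a sketch of the idea, not RxJava's Observable. */
final class MiniObservable<T> {
    /** The producer: pushes items into whichever consumer subscribes. */
    private final Consumer<Consumer<? super T>> source;

    private MiniObservable(Consumer<Consumer<? super T>> source) {
        this.source = source;
    }

    /** Creates a stream that pushes every item of the Iterable, in order. */
    static <T> MiniObservable<T> from(Iterable<T> items) {
        return new MiniObservable<T>(downstream -> {
            for (T item : items) {
                downstream.accept(item);
            }
        });
    }

    /** Transforms every item before passing it on. */
    <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniObservable<R>(downstream ->
                source.accept(item -> downstream.accept(mapper.apply(item))));
    }

    /** Passes on only the items that satisfy the predicate. */
    MiniObservable<T> filter(Predicate<? super T> predicate) {
        return new MiniObservable<T>(downstream ->
                source.accept(item -> {
                    if (predicate.test(item)) {
                        downstream.accept(item);
                    }
                }));
    }

    /** Nothing is produced until someone subscribes. */
    void subscribe(Consumer<? super T> onNext) {
        source.accept(onNext);
    }
}
```

Each operator wraps the previous stream and returns a new one, so a chain such as `from(...).filter(...).map(...).subscribe(...)` is just nested wrapping that starts to run when `subscribe` is called.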

How to use RxJava


Let's return to our first AsyncTask and see how we would solve the problem with the help of reactive programming.
First, create an Observable that performs the request:

    // Observable whose behavior is defined by an Observable.OnSubscribe<String>
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            // make the request and pass the result on
            subscriber.onNext(apiService.getData());
            // tell the subscriber that there will be no more data
            subscriber.onCompleted();
        }
    });

Here we create an Observable and specify its behavior. We make the request and pass the result to onNext(...), after which we tell the Subscriber that we are done by calling onCompleted().

This is easy to follow: we created an Observable that is responsible only for getting a String from the API. SRP in its purest form.
What if the request fails for some reason? Then we can call the Observable's retry(...) method, which repeats this Observable up to n times until it completes successfully (read: without an Exception). In addition, we can switch to another Observable if even retry() did not help. And if the backend is poorly written, it is better for us to drop the connection on a timeout; for that case there is the timeout(...) method. All together it would look like this:

    // The chain is not subscribed yet, so its type is Observable, not Subscription
    final Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext(apiService.getData());
                    subscriber.onCompleted();
                }
            })
            .timeout(5, TimeUnit.SECONDS)  // give up after 5 seconds
            .retry(3)                      // retry up to 3 times
            // if that did not help, switch to another Observable
            .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
                @Override
                public Observable<? extends String> call(Throwable throwable) {
                    // return a new Observable here that can rescue us from the error
                }
            });

After a bit of refactoring:

    final Observable<String> observable = createApiRequestObservable()  // Observable for the API request
            .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)    // timeout
            .retry(RETRY_COUNT_FOR_REQUEST)                   // number of retries
            .onErrorResumeNext(createRequestErrorHandler());  // fallback on error
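The retry-then-fall-back behavior that the chain expresses can be spelled out by hand. This is a minimal plain-Java sketch, not RxJava's implementation: `RetrySketch` and `callWithRetry` are hypothetical names, the timeout is not modeled, and "retries" here means extra attempts after the first one, which is an assumption of the sketch:

```java
import java.util.concurrent.Callable;

final class RetrySketch {
    /**
     * Calls the request once and, on failure, up to maxRetries more times.
     * If every attempt fails, returns the fallback instead of throwing.
     */
    static <T> T callWithRetry(Callable<T> request, int maxRetries, T fallback) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return request.call();
            } catch (Exception e) {
                // Swallowed on purpose in this sketch; real code would log it.
            }
        }
        return fallback;
    }
}
```

Written this way, the policy is tangled with the call itself. In the operator chain each concern (timeout, retry, fallback) is a separate line that can be added, removed, or reordered.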

Now let's create the json. To do this, the result of our first Observable (a String) must be converted. We use map(...), and if something goes wrong, we return a different json, prepared for the failure case, using onErrorReturn(...).
Like this:

    final Observable<JSONObject> observable = createApiRequestObservable()
            .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
            .retry(RETRY_COUNT_FOR_REQUEST)
            .onErrorResumeNext(createRequestErrorHandler())
            // Observable that converts the String into a JSONObject
            .map(new Func1<String, JSONObject>() {
                @Override
                public JSONObject call(String s) {
                    return JsonUtils.parse(s);
                }
            })
            // if something went wrong,
            // return the "error" json
            .onErrorReturn(new Func1<Throwable, JSONObject>() {
                @Override
                public JSONObject call(Throwable throwable) {
                    return jsonObjectForErrors;
                }
            });

OK, the json is sorted out. Caching is left. Caching is not a conversion of the result but an action performed on it. For that, Observable has the methods doOnNext(...), doOnEach(...), and so on. The result looks something like this:

    final Observable<JSONObject> observable = createApiRequestObservable()
            .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
            .retry(RETRY_COUNT_FOR_REQUEST)
            .onErrorResumeNext(createRequestErrorHandler())
            // Observable that converts the String into a JSONObject
            .map(new Func1<String, JSONObject>() {
                @Override
                public JSONObject call(String s) {
                    return JsonUtils.parse(s);
                }
            })
            // if something went wrong,
            // return the "error" json
            .onErrorReturn(new Func1<Throwable, JSONObject>() {
                @Override
                public JSONObject call(Throwable throwable) {
                    return jsonObjectForErrors;
                }
            })
            // action performed on every onNext(..) of the Observable
            .doOnNext(new Action1<JSONObject>() {
                @Override
                public void call(JSONObject jsonObject) {
                    lruCache.cacheJson(jsonObject);
                }
            });

Again, let's refactor the code a little:

    final Observable<JSONObject> observable = createApiRequestObservable()  // Observable for the API request
            .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)   // timeout
            .retry(RETRY_COUNT_FOR_REQUEST)                  // number of retries
            .onErrorResumeNext(createRequestErrorHandler())  // fallback on request error
            .map(createJsonMapOperator())                    // Observable that produces a JSONObject
            .onErrorReturn(createJsonErrorHandler())         // what to return if parsing fails
            .doOnNext(createCacheOperation());               // cache the JSONObject
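The difference between converting a result and acting on it can be shown with `java.util.stream` as a loose analogy. Streams are pull-based and are not RxJava, so this is only an illustration: `map` changes the item, while `peek` observes it and passes it through unchanged, much as doOnNext does. The class name and the list standing in for the cache are hypothetical:

```java
import java.util.List;
import java.util.stream.Collectors;

final class MapVersusSideEffect {
    /**
     * Parses every string and records each parsed value in the given cache.
     * The cache parameter is a stand-in; the article's lruCache is not shown.
     */
    static List<Integer> parseAndCache(List<String> raw, List<Integer> cache) {
        return raw.stream()
                .map(Integer::parseInt)  // conversion: String -> Integer
                .peek(cache::add)        // side effect: the item passes through unchanged
                .collect(Collectors.toList());
    }
}
```

Removing the `peek` line changes nothing about the returned values, only about what else happens along the way. The same holds for removing doOnNext from the chain above.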

We are almost done. As in the very first RxJava example, we add a result handler and specify the threads to run on.
Final version:

    final Subscription subscription = createApiRequestObservable()  // Observable for the API request
            .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)   // timeout
            .retry(RETRY_COUNT_FOR_REQUEST)                  // number of retries
            .onErrorResumeNext(createRequestErrorHandler())  // fallback on request error
            .map(createJsonMapOperator())                    // Observable that produces a JSONObject
            .onErrorReturn(createJsonErrorHandler())         // what to return if parsing fails
            .doOnNext(createCacheOperation())                // cache the JSONObject
            .subscribeOn(Schedulers.newThread())             // request, conversion, and caching run on a separate thread
            .observeOn(AndroidSchedulers.mainThread())       // the result is handled on the main thread
            .subscribe(subscriber);                          // result handler

Let's see what we have achieved here:

  1. The principle of fault tolerance in action: the result of all operations is always predictable. We know about all the errors and potential problem spots in the code and have already handled them. There will be no unhandled exceptions.
  2. The principle of responsiveness in action: the connection to the database or server will not hang, thanks to the timeout, and will try to recover on error. Just as important, the caching in doOnNext runs on the background thread, so it never blocks the main thread where the result is handled.
  3. The principle of focus on events in action: throughout the request and the parsing we always react to events, such as the successful or unsuccessful completion of the request and the completion of json parsing (with two reactions: handling in the UI and handling in the background for caching). In addition, you can subscribe to one Observable several times and keep the entire system in a consistent state.
  4. The code is easy to extend and requires almost no changes. If we need to log errors or save stack traces, we can add the doOnError(...) operator. If you want to filter the results, add a filter operator and implement its behavior.

Just like the disadvantages of AsyncTask, the advantages of this approach could, in my opinion, be listed for a very long time. The last of the principles of reactive programming, scalability, is demonstrated below.

RxJava in 2GIS Dialer


A real-life example:

    // Create an Observable that combines the data from several sources
    final Observable<AggregatedContactData> obs = Observable.combineLatest(
            createContactsObservable(context),  // Observable for contacts
            createPhonesObservable(context),    // Observable for phones
            createAccountsObservable(context),  // Observable for accounts
            createJobsObservable(context),      // Observable for jobs
            contactsInfoCombinator()            // function that combines the results of all the Observables
    ).onErrorReturn(createEmptyObservable()).cache()  // fallback value on error, then cache the result
            .subscribeOn(Schedulers.executor(ThreadPoolHolder.getGeneralExecutor()))  // run on a thread pool
            .observeOn(AndroidSchedulers.mainThread());  // handle the result on the main thread


There is a lot here that is interesting and more complicated than what was described above:

  1. The first thing that catches the eye is Observable.combineLatest(...). This operator waits for onNext(...) from every Observable passed to it and then applies the combining function to the latest results of all of them at once. It may sound complicated, but the diagram in the RxJava wiki makes it clearer. The most important point is that each Observable passed to Observable.combineLatest(...) is a CursorObservable, which sends a new cursor to its onNext(...) whenever the cursor changes in the database. The combining function therefore runs on every update of any of the four cursors, so the latest data is always delivered. This is the principle of focus on events.
  2. If something goes wrong, we return whatever suits our needs. In this case, Collections.emptyList().
  3. The cache() operator is very useful when several Subscribers may subscribe to this Observable. If this operator is applied to an Observable, a new subscriber receives the data instantly, provided the data has already been computed for those who subscribed earlier. Thus everyone has the same current data.
  4. And here we see the principle of scalability: in subscribeOn(...) I pass a pool of 4 threads so that each of my 4 Observables runs on a separate thread to maximize speed; RxJava takes care of the rest. That is, all 4 processor cores will be used, if the device has them.
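The combine-latest behavior from the first point can be sketched for two sources in plain Java. This is a toy illustration and not RxJava's implementation: `CombineLatestSketch` and its methods are hypothetical, and errors, completion, and more than two sources are left out:

```java
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Toy two-source version of the combine-latest idea; not RxJava's implementation. */
final class CombineLatestSketch<A, B, R> {
    private final BiFunction<? super A, ? super B, ? extends R> combiner;
    private final Consumer<? super R> downstream;
    private A latestA;
    private B latestB;
    private boolean hasA;
    private boolean hasB;

    CombineLatestSketch(BiFunction<? super A, ? super B, ? extends R> combiner,
                        Consumer<? super R> downstream) {
        this.combiner = combiner;
        this.downstream = downstream;
    }

    /** Called whenever the first source produces a new value. */
    synchronized void onNextA(A value) {
        latestA = value;
        hasA = true;
        emitIfReady();
    }

    /** Called whenever the second source produces a new value. */
    synchronized void onNextB(B value) {
        latestB = value;
        hasB = true;
        emitIfReady();
    }

    /** Emits only once every source has produced at least one value. */
    private void emitIfReady() {
        if (hasA && hasB) {
            downstream.accept(combiner.apply(latestA, latestB));
        }
    }
}
```

Nothing is emitted until both sources have produced a value; after that, every update from either side triggers a new combined result built from the most recent value of each.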

As you can see, the potential of reactive programming is huge, and RxJava's functionality realizes it quite well.

The problems we faced


Everything shown above, and much more, is used in one form or another in our dialer. And here are the problems we faced:


What else?


We saw how cool it is to work reactively with multithreading and requests on Android. But that is not all. For example, you can subscribe to changes of a Checkable or an EditText (this comes out of the box in RxJava for Android). It is all simple, but terribly convenient.
By the way, reactive programming for Java is not limited to RxJava. There are other libraries, for example, Bolts-Android.
In addition, Reactive-Streams is being actively developed; it is intended to unify work with various reactive providers in Java.

Conclusion


Did we like it? Definitely. Reactive applications really are much more resistant to bugs and crashes, and the code becomes clear and flexible (with lambdas it would be downright beautiful). A lot of routine work is shifted to the library, which does that work better than the native Android components. This lets you focus on implementing the things that really deserve attention.

Reactive programming is a slightly different way of thinking compared to traditional Android development. Data streams and functional operators look complex at first glance, but turn out to be much simpler once you dig in. Work with reactive programming for a while, and the brain starts to rewire itself from objects and states to monads and operators on them. It is a big, kind, powerful piece of FP inside OOP that makes life and code simpler and the application better. Try it, you will not regret it.

Links that will help you deal with reactive programming or may just be interesting:


A small digression. If you share our views on programming and building products, come join us: we will be glad to see you on the 2GIS Dialer team.

Source: https://habr.com/ru/post/228125/

