📜 ⬆️ ⬇️

Taming asynchronous processes in Android with RxJava. Yandex experience

Hello everyone, my name is Alexey Agapitov and today I want to tell you how to
using a library like RxJava, you can easily handle many
asynchronous processes in your Android application.


We will look at how to create our own cold and hot sequences, reversing
attention to some nuances when using RxJava, and also consider
how powerful are the tools provided by this library
operators.


I will talk about everything on the example of the application Yandex. Real estate and its
home screen with a map.


Screenshot

To begin with, we will look at the screen and see what is happening on it and what will we
to be implemented.



GIF, 13 mb
Gif

First of all, there is an interaction with the card: a person can move the card
and points will appear on it with ads matching its filters.
Points can be single announcements, new buildings, houses and clusters,
combining many ads. Note that single ads can be
marked as viewed (this flag is stored locally on the device).


The filters themselves change on a different screen, but they must be used when requesting
points of interest on the map.


Another component of the query is the geographical object for which we are looking
ads.


Screenshot geoobject


This element is necessary to quickly turn on / off the search in this object or in the area that is now open on the map.


Thus, the points on the map must be updated for each of the listed
actions (map, filter or geoobject change). For the sake of brevity, we will not
consider the possibility of drawing objects on the map, we believe that such objects
are a special case of a geoobject.


In addition, we have two processes occurring in other threads: getting points from the web API and checking which of these points have already been viewed on this device (for this we refer to the database).


Given that the map, filters and geoobject change faster and more often than answers with points from the server arrive, it is necessary to use only the most recent results and discard previous ones.


Thus, we need to implement a screen containing a considerable amount of
asynchronous processes dependent on each other.


Comparing RxJava with the traditional Android approach


In the traditional approach to Android, to monitor each of the considered
processes we would use callbacks. When a change event occurs
constituent elements on the map (for example, they moved the map), we read the rest
components and we connect them in one request which we execute.


When implementing this approach, there are some difficulties.


  1. Callbacks do not combine well with each other:
    1. harder to read the code - harder to understand the connections of callbacks with each other, to determine
      who depends on whom, callbacks are separated by code and it’s harder to navigate;
    2. code flexibility is lost - there is less opportunity for it
      reuse, it is more difficult to make changes to existing solutions.
  2. You must explicitly store the additional state associated with the executable.
    asynchronous operation and its callback. The more such state variables,
    the more likely to make a mistake (for example, when working with multiple threads).

We chose the RxJava library for the following reasons:


  1. The presence of a universal abstraction over asynchronous processes of any nature
    (event model, multi-threaded processing) called Observable -
    observed sequence;
  2. The ability to change the sequence through the use of operators and
    a large number of useful operators;
  3. Ability to combine sequences with each other;
  4. Reducing the number of state variables by using
    sequences and operators;
  5. Stability and quality of the library implementation;

We use this library in the application for a variety of purposes - starting with
background loading and data processing and ending with processing multiple
events occurring in the user interface.


Implementation


Let's look at examples of how the library is used in our application.


Watching map changes


First, take a look at the change in the state of the map. To do this, we use the following sequence, which reports that the coordinate borders of the map have changed:


public static Observable<BoundingBox> observeMapBoundingBox(final MapController mapController) { return Observable.create(new Observable.OnSubscribe<BoundingBox>() { @Override public void call(final Subscriber<? super BoundingBox> subscriber) { final OnMapListener listener = new OnMapListener() { @Override public void onMapActionEvent(MapEvent mapEvent) { switch (mapEvent.getMsg()) { case MapEvent.MSG_SCALE_END: case MapEvent.MSG_SCROLL_END: case MapEvent.MSG_ZOOM_END: if (!subscriber.isUnsubscribed()) { subscriber.onNext(getViewportBoundingBox(mapController)); } break; } } }; mapController.addMapListener(listener); //    -    subscriber.add(Subscriptions.create(() -> { mapController.removeMapListener(listener); })); } }); } 

In this implementation, two main parts:


  1. creating and adding a map event listener
  2. deleting a given listener when unsubscribed from the sequence.

Notice that we create and register a listener inside OnSubscribe, that is, when the sequence has become active (someone has subscribed to it).


Here we are dealing with a classic example of a cold sequence - one that releases new elements while it is signed. An excellent example of the implementation of such sequences is the RxBinding library, which allows you to monitor the events in widgets present in the standard API, as well as the support library.


Watching filters change


Now consider the second component of the request for points - filters. Suppose we have a class that stores current filters and provides methods for updating them. And we want to observe the change in the value of this field. We can go the same way as in the case of the map, adding a field with an observer of changes to this field and notifying the observer when the field has changed. But, the field may have a lot of observers, which means that either it is necessary to store their array, or use the operators share , publish + autoConnect when creating the sequence
in order to send the event to multiple sequence watchers. However, I want to make it transparent for the consumer, and here we will be helped by a class from the RxJava library, such as Subject, to which we will pass all the listed
duties.


Subject is a sequence that can simultaneously have multiple subscribers who receive all the data and notifications about its completion or error. At the same time, work with Subject occurs using the same methods that its subscribers possess: onNext , onCompleted , onError . That is, Subject itself is a subscriber, and therefore, if necessary, he can subscribe to another sequence and relay it to all his subscribers.


Let's look at an example of what this gives us:


 public class FilterHolder { private final PublishSubject<Filter> subject = PublishSubject.create(); private Filter current; public Observable<Filter> observeChanges(boolean emitCurrentValue) { return emitCurrentValue ? subject.startWith(current) : subject; } public void set(Filter filter) { this.current = filter; subject.onNext(filter); } } 

As you can see, when setting a new value, we send it to all subscribers. In this case, we use PublishSubject , which sends freshly received data, to all of its subscribers. In principle, it would be possible to use ReplaySubject , which is able to store the latest received data and repeat it for those subscribers who have subscribed after receiving this data. But in this case, we would have to change the implementation of the observeChanges method — instead of sending the current value, we would have missed it.


Similarly, you can extend existing classes and add them
reactive capabilities.


Subject is an example of a hot sequence, that is, it remains active and will receive / send items, even if nobody has subscribed to it. The main thing is to remember that a Subject can accept new sequence elements in onNext and send them to its subscribers until it has called onCompleted or onError .


This is important in those situations when the data / event source is infinite and there are no calls for onCompleted and onError , so calling these methods on a Subject sending this data to its subscribers can lead to unexpected effects.


Monitoring the third component of the request to the API for points - a geoobject is similar
filters and implemented using Subject.


Finally, you need to put these three elements together and send them in a network request.


API call


To access the API, we use the well-known library Retrofit, and all the results of network calls are represented as Observable.


As a result, the method in the network layer will look like this:


 public Observable<ClustersData> getClusters(MapBoundingBox box, Filter filter, GeoObject geo) { //   API } 

Putting it all together


So, we combine all the listed asynchronous processes:


 Observable.combineLatest( observeMapBoundingBox(mapController).debounce(300L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()), filterHolder.observeChanges(true), observeGeoObject(true), SearchRequest::clusters//      ) .switchMap(request -> networkHelper .getClusters(request.boundingBox, request.filter, request.geoObject) .observeOn(AndroidSchedulers.mainThread()) .doOnError(handleErrorAction())//  .onErrorResumeNext(Observable.empty())//  ) .observeOn(Schedulers.computation()) //  ,      .map(this::processViewedClusters) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<ClustersData>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { //     ,    } @Override public void onNext(ClustersData clustersData) { //    } }); 

We use the combineLatest operator to follow the changes in each of the three values, and when one of them changes, we create an object describing
network request.


This operator works as follows: it waits until each of the sequences passed to it provides one element and calls a function that converts all these elements into some other object. Then, each time a new element appears in any of the sequences, the operator again calls this function passing it the new value and the last values ​​of the others.


CombineLatest operator


Thus, it is very similar to zip , with the only difference being that it uses the last known values ​​of the elements, unless the sequences provide new ones. This is convenient when you combine sequences that produce elements with different frequencies, for example, a change in the state of the map, which happens more often than filters.


After the network request object is built, we go directly to the request. To do this, we refer to the object responsible for interacting with the API, and pass the parameters we collected to it. There are two interesting points here.


But first, consider the flatMap operator, which for each element of the original sequence returns a new sequence, and then merges them all into one resultant sequence.


Flatmap operator


The switchMap operator works the same way, with the only difference being that it unsubscribes from the sequence received from the previous element, switches to a new one and waits for results from it.


SwitchMap operator


This is necessary because network requests are slow, so if, for example, a person has moved the map, the previous request loses its relevance and we must request new points.


The second point is to suppress network errors with the help of the doOnError and onErrorResumeNext (with which we return an empty sequence).
This is done so that the sequence map / filters / geo-object -> network request -> points on the map does not break if one of the requests ends with a (network) error - in fact, in this case the new map changes will not give any result, and network errors may well occur, and we need to handle them.


The next step after receiving points on the map is to identify those that the user has already viewed. For this, a database query is made, after which all scanned points are flagged. Since this is a long operation, we release the network scheduler and switch calculations to computation: observeOn(Schedulers.computation()) . To query the database, we use Cupboard and our Rx wrappers on top of it, but in this case we managed the usual synchronous method, although you could use the method that returns Observable .


You, probably, have already noticed that the debounce operator appeared in the sequence of monitoring the change in the position of the card, which allows you to drop extra elements if they all came at a specified time interval. This is necessary in order not to make too often requests to the server while the user is viewing the map. By default, this operator uses the computation-scheduler, but since we know that our events occur in the main thread, we can override it with the scheduler for the main flow. This avoids unnecessary switching of streams at a given place, and also saves the computation-scheduler from unnecessary tasks (since the number of threads in it is by default limited by the number of cores).


And now let's summarize.


A small amount of code.


All the logic fit into one sequence, in which the movement of data and the logic of their processing are understood.


Subjectively, such code seems simpler than if we used callbacks. But here we must make a reservation that this requires at least a knowledge of the basics of RxJava.


More simple implementation


All the work on synchronization and storage of the intermediate state of asynchronous operations was passed on to the library. As a result, we keep a minimum of the intermediate state. This reduces the likelihood of errors when working with multiple threads and asynchronous processes.


In addition, instead of implementing the processing of a set of asynchronous processes, we operate on data streams and implement our business tasks directly. At the same time, it is easy to add new processing steps to our sequence and change existing ones.


I also want to mention that the code with sequences is easier to test, because
the sequence can be replaced with the one needed for testing (in the case of callbacks this will be more difficult).


For example, you can replace all sequences associated with an interface with
predefined values ​​using the just operator.


')

Source: https://habr.com/ru/post/311084/


All Articles