I have long been afraid to use RxJava in production. Its purpose and principle of operation remained a mystery to me. Reading the source code did not add clarity, and the articles only confused. Under the cat an attempt to answer the questions: “What tasks does this technology solve better than analogs?” And “How does it work?” Using analogies from classical Java and simple metaphors.

Application
RxJava perfectly replaces the Java 8
Streams API on earlier versions of Java. Since Android Java 8 is far from being supported with 4.0, Rx will be the optimal solution. The article describes RxJava from this angle, because, in my opinion, it is the most understandable and truly reactive Android application using pure Rx is difficult to implement.
Emitter
We all know the Iterator pattern.
')
interface Iterator<T> { T next(); boolean hasNext(); }
There is some data source behind the interface, and it doesn’t matter which one. Iterator completely hides all the implementation details, providing only two methods:
next - get the next item
hasNext - find out if there is more data in the source
This pattern has one feature: the consumer requests the data and waits (“freezes”) until the source issues it. Therefore, the source is usually the final, often pre-formed collection.
Let's do a little refactoring.
interface Iterator<T> { T getNext(); boolean isComplete(); }
I think you already understand what I mean. The Emitter interface from RxJava
(for consumers, it is duplicated in the Observer (Subscriber in RxJava 1)) :
interface Emitter<T> { void onNext(T value); void onComplete(); void onError(Throwable error); }
It is similar to Iterator, but it works in the opposite direction: the
source informs the consumer that new data has appeared.This allows you to solve all the problems with multithreading on the source side and, for example, if you are designing a UI, then you can
expect that all the code responsible for the graphical interface is sequential. Incredibly comfortable. Goodbye callbacks! I will not be bored.
The analogy with Iterator is taken from [1]Sources
Now a little about the sources themselves. They are of many types: Observable, Single, Maybe ... And they all look like cabbage
(and monads, but this is not so important) .
Because by creating one source, you can wrap it in another source, which can be wrapped again into another source and so on until OutOfMemory.
(But since the normal source weighs less than 100 bytes, rather, until the charge is over.)Let's wrap the answer to that question in the source.
Observable.just(42)
As we know, getting an answer is quite a long operation. Therefore, we wrap it in a source that performs calculations in a special stream:
Observable.just(42) .subscribeOn(computation())
And we also want the application not to fall when responding. Wrap in the source, which returns the answer in the main thread:
Observable.just(42) .subscribeOn(computation()) .observeOn(mainThread())
And finally, run:
Observable.just(42) .subscribeOn(computation()) .observeOn(mainThread()) .subscribe(new DisposableObserver<Integer>() { @Override public void onNext(Integer answer) { System.out.print(answer); } @Override public void onComplete() {} @Override public void onError(Throwable e) {} });
The console got the answer, but what happened?
The
subscribe method is defined in
Observable . He does the checks and preparation, and then calls the
subscribeActual method, which is already defined differently for different sources.
In our case, the
subscribe method called the
subscribeActual method of ObservableObserveOn, which calls the subscribe method of the wrapped source, specifying to which thread the result should be returned.
In ObservableObserveOn is ObservableSubscribeOn. Its
subscribeActual starts the
subscribe wrapped in the specified stream.
And finally, ObservableSubscribeOn wraps ObservableJust, which simply gives its value to onNext.
Naturally, just with the number is not interesting. Therefore, here is a source that receives a list of products and finds out prices for them. Prices can be received only for 20 pieces (the same restriction for the InAppBilling API).
→
github.com/a-dminator/rx-products-and-pricesThis example was created to demonstrate the principle of operation, and not for use in real projects.RxJava has a huge number of different source implementations. All of them work on the same principle, and the details are perfectly described in the documentation. Therefore, I will not dwell on them.
Operations
All operations on sources are divided into 2 types:
-
Non-terminal returns a new source that wrapped the original
-
Terminal execute a chain and receive data
(subscribe, map ...)And yes, nothing will be fulfilled until the terminal operation is performed. A chain can be stored in memory as many things as possible without doing anything at all. And this is good, because if we do not receive data, then why produce them?
(Lazy calculations without Haskell included!).Similar to the Streams API from [2]Dispose (Unsubscribe in RxJava 1)
The execution of the chain can be interrupted. This is done by calling dispose () on DisposableObserver
(unsubscribe () on Subscriber in RxJava 1) .
After that, RxJava will stop executing chains, write off all Observers and cause iterrupt () on threads that are no longer needed.
You can also find out if the execution is not interrupted from sources. For this, Emitter has an isDispose () method
(isUnsubscribe () for RxJava 1) .
This has a logical but unpleasant feature: since the Observer is responsible for error handling, now all the errors are crashing the application. I have not yet found a solution that I am ready to write about.
Conclusion
RxJava:
- Allows you to easily compose requests to the network, database, etc; organizing their asynchronous execution. This means that your users will get a faster and more responsive application.
- Does not contain any magic. Only the design and execution of chains of sources.
- (For me) Solves more problems than it creates!
Thanks to all.
[1]
Video[2]
Video