Observables
and Subscribers
1 . Observable
is the data source, and Subscriber
is the consumer.Observable
data generation always occurs in accordance with the same order of actions: Observable
"radiates" a certain amount of data (including Observable
may not emit anything), and completes its work, either successfully or with an error. For each Subscriber
subscribed to Observable
, the Subscriber.onNext()
method is called for each element of the data stream, after which Subscriber.onComplete()
and Subscriber.onError()
can be called.Observables
often do not begin to generate data until someone explicitly subscribes to them 2 . In other words: if a tree falls and nobody is near, then the sound of its fall is not audible .Observable
: Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> sub) { sub.onNext("Hello, world!"); sub.onCompleted(); } } );
Observable
spawns the string “Hello, world!”, And ends its work. Now we will create a Subscriber
to accept the data and do something with it. Subscriber<String> mySubscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } };
Subscriber
does is print the lines passed to it by Observable
. Now that we have myObservable
and mySubscriber
, we can link them together using the subscribe()
method: myObservable.subscribe(mySubscriber); // "Hello, world!"
mySubscriber
to myObservable
, myObservable
calls the mySubscriber
onNext()
and onComplete()
methods of onNext()
, and as a result, mySubscriber
outputs “Hello, world!” To the console and finishes its execution.Observable
. In RxJava, there are methods for creating Observable
, suitable for solving the most typical tasks. In our case, Observable.just()
spawns one data element, and then completes its execution, just like our first version 3 : Observable<String> myObservable = Observable.just("Hello, world!");
Subscriber
. We are not interested in the onCompleted()
and onError()
methods, so we can use another base class to determine what to do in onNext()
: Action1<String> onNextAction = new Action1<String>() { @Override public void call(String s) { System.out.println(s); } };
Action
can be used to replace any part of Subscriber
: Observable.subscribe()
can accept one, two, or three Action
parameters that will be executed instead of onNext()
, onError()
and onCompete()
. That is, we can replace our Subscriber
like this: myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);
onError()
and onCompete()
, we can simplify the code even more: myObservable.subscribe(onNextAction); // "Hello, world!"
Observable.just("Hello, world!") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });
Observable.just("Hello, world!") .subscribe(s -> System.out.println(s));
Observable
: Observable.just("Hello, world! -Dan") .subscribe(s -> System.out.println(s));
Observable
is defined, but this will not always be the case - for example, when you use someone’s library. Another problem: what if we use our Observable
in many places, but we want to add a signature only in some cases ?Subscriber
: Observable.just("Hello, world!") .subscribe(s -> System.out.println(s + " -Dan"));
Observable
and Subscriber
for data manipulation. There are a lot of operators in RxJava, so for a start it will be better to focus on only a few.map()
operator would be best suited for converting one data element to another: Observable.just("Hello, world!") .map(new Func1<String, String>() { @Override public String call(String s) { return s + " -Dan"; } }) .subscribe(s -> System.out.println(s));
Observable.just("Hello, world!") .map(s -> s + " -Dan") .subscribe(s -> System.out.println(s));
map()
operator, roughly speaking, is Observable
, which transforms a data item into it. We can create a chain of as many map()
as we see fit to give the data the most convenient and simple form in order to facilitate the task to our Subscriber
.map()
is that it does not have to generate data of the same type as the original Observable
.Subscriber
should display not a generated text, but its hash: Observable.just("Hello, world!") .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.hashCode(); } }) .subscribe(i -> System.out.println(Integer.toString(i)));
Subscriber
accepts Integer. By the way, we again forgot about lambdas: Observable.just("Hello, world!") .map(s -> s.hashCode()) .subscribe(i -> System.out.println(Integer.toString(i)));
Subscriber
do as little work as possible, so let's apply another map()
to convert our hash back to String
: Observable.just("Hello, world!") .map(s -> s.hashCode()) .map(i -> Integer.toString(i)) .subscribe(s -> System.out.println(s));
Observable
and Subscriber
now look the same as at the very beginning! We just added a few intermediate steps that transform our data. We could even add the code again, adding my signature to the generated lines: Observable.just("Hello, world!") .map(s -> s + " -Dan") .map(s -> s.hashCode()) .map(i -> Integer.toString(i)) .subscribe(s -> System.out.println(s));
Observable
and Subscriber
can do anythingObservable
can be a database query, and Subscriber
can display the results of a query. Observable
can also be a click on the screen, Subscriber
can contain a reaction to this click. Observable
can be a stream of bytes received from the network, while a Subscriber
can write this data to a storage device.Observable
and Subscriber
do not depend on intermediate steps between themmap()
calls as you like between the Observable
and the Subscriber
subscribed to it. The system is easily composable , and with its help it is very easy to manage the flow of data. If the operators work with the correct input / output data, you can write a chain of transformations of infinite length 4 .map()
operator, and you cannot write much with it. In the second part of this article, we will look at the large number of operators available to you out of the box when you use RxJava.Subscriber
implements the Observer
interface, and therefore the “basic building block” can be called the last, but in practice you will most often use Subscriber
, because it has several additional useful methods, including Subscriber.unsubscribe()
.Observables
. The hot Observable
spawns data all the time, even if no one subscribes to it. Cold Observable
, respectively, generates data only if it has at least one subscriber (it is the cold Observables
that are used in the article). For the initial stages of learning RxJava, this difference is not so important.Observable.just()
not a complete analog of our original code, but I will explain why this happens only in the third part of the article.Source: https://habr.com/ru/post/265269/
All Articles