Now that we understand the basic principles of Rx, it is time to learn how to create and manage sequences. The sequence management style was borrowed from the original C # LINQ , which in turn was inspired by functional programming. We will divide all operations by topics, which are sorted in order of increasing complexity of operations. Most of the Rx operators control existing sequences, but first we learn how to create them.
Previously , we used Subject
and manually applied values to them to create a sequence. We did this to demonstrate some key points, including the main Rx method subscribe
. In most cases, the Subject
is not the best way to create a new Observable
. In this section, we will look at more elegant ways to do this.
just
creates an Observable
, which will produce a predetermined number of values, and then it will end.
Observable<String> values = Observable.just("one", "two", "three"); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: one Received: two Received: three Completed
This Observable
will only onCompleted
an onCompleted
event and nothing else.
Observable<String> values = Observable.empty(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Completed
This Observable
will never issue anything.
Observable<String> values = Observable.never(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
The code above will not print anything. But this does not mean that the program is blocked. In fact, it just ends instantly.
This Observable
will throw an onError event and terminate.
Observable<String> values = Observable.error(new Exception("Oops")); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Error: java.lang.Exception: Oops
defer
does not create a new Observable
, but allows you to determine how the Observable
will be created when subscribers appear. Think about how you would create an Observable
that will issue the current time? Since there is only one meaning, it seems that just
can help us here.
Observable<Long> now = Observable.just(System.currentTimeMillis()); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
1431443908375 1431443908375
Notice how the second subscriber, subscribing a second later, got the same time. This is because the time value was calculated only once: when the execution reaches the just
method. However, in our case, we want to calculate the current time for each subscription. defer
accepts a function that returns Observable
and will be executed for each new subscriber.
Observable<Long> now = Observable.defer(() -> Observable.just(System.currentTimeMillis())); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
1431444107854 1431444108858
create
is a very powerful method of creating Observable
.
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
Everything is much simpler than it looks. Inside there is only a function that accepts Subscriber
for type T
Inside it, we can manually determine the events that will be issued to the subscriber.
Observable<String> values = Observable.create(o -> { o.onNext("Hello"); o.onCompleted(); }); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: Hello Completed
When someone signs up to our Observable
( values
in this case), the corresponding Subscriber
instance will be passed to the create
function. As the code runs, the values will be transferred to the subscriber. Note that you need to call the onCompleted
method yourself to signal the end of the sequence.
This method is the recommended way to create an Observable
in case none of the other methods are appropriate. This is similar to how we created the Subject
and manually applied values to it, however there are several important differences. First of all, the event source is neatly encapsulated and separated from other code. Secondly, the Subject
have unobvious hazards: anyone who has access to the object can change the sequence. We will return to this problem later.
Another key difference with using Subject
is that the code is executed "lazily", only when a new subscriber arrives. In the example above, the code is executed not at the time of creation of Observable
(since there are no subscribers yet), but at the time of calling the subscribe
method. This means that the values will be recalculated for each subscriber, as in the ReplaySubject
. The end result is similar to ReplaySubject
, with the exception of caching. With the help of create
we can also easily transfer execution to a separate stream, while with the ReplaySubject
we would have to manually create threads to calculate values. We will also look at ways to make the execution of the onSubscribe
method parallel.
You may have noticed that any of the previous Observable
can be implemented using Observable.create
. Our create
example is equivalent to Observable.just("hello")
.
In functional programming, the creation of infinite sequences is commonplace.
Simple and familiar to functional programmers method. Gives values from the specified range.
Observable<Integer> values = Observable.range(10, 15);
This example consistently yields values from 10 to 24.
This function will create an infinite sequence of values separated by a specified time interval.
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); System.in.read();
Received: 0 Received: 1 Received: 2 Received: 3 ...
The sequence is not completed until we unsubscribe.
Attention should be paid to why blocking input at the end of the example is required. Without it, the program will end without printing. This is because all our operations are non-blocking: we create periodically issuing Observable
values, then register a subscriber who performs some actions at the time of arrival of these values. None of this blocks the main thread from terminating.
There are two overloads of Observable.timer
. The first option creates Observable
issuing 0L
after a specified period of time.
Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 0 Completed
The second option waits for a specified period of time, then begins to produce values as well as interval
with a specified frequency.
Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 0 Received: 1 Received: 2 ...
The example above waits 2 seconds, then starts counting every second.
In java, there are tools for working with sequences, collections, and asynchronous events that may not be directly compatible with Rx. Now we will look at how you can turn them into the input data of your Rx code.
If you use EventHandlers, then using Observable.create
you can create a sequence from events.
Observable<ActionEvent> events = Observable.create(o -> { button2.setOnAction(new EventHandler<ActionEvent>() { @Override public void handle(ActionEvent e) { o.onNext(e) } }); })
Depending on the specific event, its type (in this case, ActionEvent
) itself can carry enough information to become the type of your Observable
. However, very often you may need something else, for example, the value of a certain field at the time of the event. It is best to get the value of such a field inside the handler while the UI stream is blocked and the field values are relevant. And although there are no guarantees that the value will remain unchanged until reaching the final subscriber, in a correctly implemented Rx code, changes are controlled on the consumer side [1].
You can turn any input into Observable
using create
. However, for common data types, ready-made methods exist to make this process easier.
Future
's are part of Java and you must have come across them while working with frameworks that use multi-threading. They are a less powerful multi-threaded tool than Rx, since they return only one value. Typically, you will want to turn them into Observable
.
FutureTask<Integer> f = new FutureTask<Integer>(() -> { Thread.sleep(2000); return 21; }); new Thread(f).start(); Observable<Integer> values = Observable.from(f); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 21 Completed
Observable
FutureTask
result of FutureTask
in readiness, and then terminates. If the task has been canceled, observable will generate a java.util.concurrent.CancellationException
error.
If you are interested in the result of the Future
for a limited time only, it is possible to set a timeout as an argument.
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
If during this time Future
does not end, observable ignores the result and TimeoutException
.
With Observable.from
you can turn any collection into a sequence. An Observable
will be created, issuing each element of the collection separately and onCompleted
at the end.
Integer[] is = {1,2,3}; Observable<Integer> values = Observable.from(is); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 1 Received: 2 Received: 3 Completed
Observable
is not the same as Iterable
or Stream
. Observable
push-oriented, in the sense that the onNext
call onNext
stack of handlers to execute right up to the last subscribe
method. The remaining models are pull-oriented — values are requested from the other side and execution is blocked until the result is returned.
[1] consumer , who absorbs values issued by Observable
Now the project has its own public repository and anyone can join the creation of an in-depth Russian-language tutorial on Rx. The translation of this part is already there, the rest will appear soon, and with your help, even faster.
Source: https://habr.com/ru/post/281633/
All Articles