
Introduction to RxJava: Creating a Sequence


Now that we understand the basic principles of Rx, it is time to learn how to create and manage sequences. The style of sequence manipulation was borrowed from the original C# LINQ, which in turn was inspired by functional programming. We will group the operations by topic, ordered by increasing complexity. Most Rx operators manipulate existing sequences, but first we will learn how to create them.


Contents:

  • Part One - Introduction
    1. Why Rx?
    2. Key types
    3. Subscription life cycle
  • Part Two - Sequences
    1. Creating a sequence
    2. Sequence filtering
    3. Inspection
    4. Aggregation
    5. Transformation
  • Part Three - Sequence Management
  • Part Four - Parallelism

Part 2 - Sequence Basics


Creating a sequence


Previously, we used Subjects and manually pushed values into them to create a sequence. We did this to demonstrate some key points, including the main Rx method, subscribe. In most cases, a Subject is not the best way to create a new Observable. In this section, we will look at more elegant ways to do this.


Simple factory methods


Observable.just


just creates an Observable that emits a predefined set of values and then completes.


    Observable<String> values = Observable.just("one", "two", "three");
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Received: one
    Received: two
    Received: three
    Completed

Observable.empty


This Observable emits only an onCompleted event and nothing else.


    Observable<String> values = Observable.empty();
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Completed

Observable.never


This Observable never emits anything.


    Observable<String> values = Observable.never();
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

The code above will not print anything. This does not mean that the program blocks, though: in fact, it simply terminates immediately.


Observable.error


This Observable emits an onError event and terminates.


    Observable<String> values = Observable.error(new Exception("Oops"));
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Error: java.lang.Exception: Oops

Observable.defer


defer does not define a new kind of Observable; it lets you declare how an Observable should be created each time a subscriber arrives. Consider how you would create an Observable that emits the current time. Since there is only one value, it seems that just could help us here.


    Observable<Long> now = Observable.just(System.currentTimeMillis());

    now.subscribe(System.out::println);
    Thread.sleep(1000);
    now.subscribe(System.out::println);

Output

    1431443908375
    1431443908375

Notice how the second subscriber, subscribing a second later, received the same time. This is because the time value was computed only once, when execution reached the just method. In our case, however, we want to compute the current time for each subscription. defer accepts a function that returns an Observable and executes it for each new subscriber.


    Observable<Long> now = Observable.defer(() ->
        Observable.just(System.currentTimeMillis()));

    now.subscribe(System.out::println);
    Thread.sleep(1000);
    now.subscribe(System.out::println);

Output

    1431444107854
    1431444108858
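The difference between the two versions comes down to when the expression is evaluated. As a rough plain-Java analogy (no RxJava involved; the class EagerVsDeferred and its counter-based tick() clock are made up for this sketch), a value computed up front is shared by every reader, while a Supplier recomputes it on each call, which is roughly what defer arranges per subscriber:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of RxJava.
final class EagerVsDeferred {
    // A fake clock that advances on every read, so results are deterministic.
    private static final AtomicLong CLOCK = new AtomicLong();

    static long tick() {
        return CLOCK.incrementAndGet();
    }

    // Like just(tick()): the value is computed once, before any reader exists.
    static long[] eager() {
        long value = tick();
        return new long[] { value, value };
    }

    // Like defer(() -> just(tick())): the supplier runs again for each reader.
    static long[] deferred() {
        Supplier<Long> source = EagerVsDeferred::tick;
        return new long[] { source.get(), source.get() };
    }

    public static void main(String[] args) {
        long[] e = eager();
        long[] d = deferred();
        System.out.println("eager:    " + e[0] + " " + e[1]);
        System.out.println("deferred: " + d[0] + " " + d[1]);
    }
}
```

Both readers of the eager version see the same number, while the deferred version yields a fresh one each time.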

Observable.create


create is a very powerful way to build an Observable.


    static <T> Observable<T> create(Observable.OnSubscribe<T> f)

Everything is much simpler than it looks. The argument is just a function that accepts a Subscriber for type T. Inside it, we can manually determine the events that are pushed to the subscriber.


    Observable<String> values = Observable.create(o -> {
        o.onNext("Hello");
        o.onCompleted();
    });
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Received: Hello
    Completed

When someone subscribes to our Observable (values in this case), the corresponding Subscriber instance is passed to the function given to create. As the code runs, values are pushed to the subscriber. Note that you need to call onCompleted yourself to signal the end of the sequence.


This method is the recommended way to create an Observable when none of the other methods is appropriate. It is similar to how we created a Subject and manually pushed values into it, but there are several important differences. First, the event source is neatly encapsulated and separated from other code. Second, Subjects carry non-obvious hazards: anyone who has access to the object can change the sequence. We will return to this problem later.


Another key difference from using a Subject is that the code is executed lazily, only when a new subscriber arrives. In the example above, the code runs not when the Observable is created (there are no subscribers yet) but when subscribe is called. This means that the values are computed again for each subscriber, which gives an end result similar to ReplaySubject, except that nothing is cached. With create we can also easily move execution to a separate thread, whereas with a ReplaySubject we would have to create threads manually to compute the values. We will look at ways to run the onSubscribe function concurrently later.
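This lazy, per-subscriber behaviour can be illustrated without RxJava. The following is a toy sketch, not RxJava's implementation: LazySource and Sink are hypothetical names standing in for Observable and Subscriber, and only onNext and onCompleted are modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for Observable.create; hypothetical, not RxJava code.
final class LazySource<T> {
    // Toy stand-in for Subscriber, reduced to two callbacks.
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    private final Consumer<Sink<T>> onSubscribe;

    private LazySource(Consumer<Sink<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Only stores the function; nothing runs yet.
    static <T> LazySource<T> create(Consumer<Sink<T>> onSubscribe) {
        return new LazySource<>(onSubscribe);
    }

    // Runs the stored function once for this particular subscriber.
    void subscribe(Sink<T> sink) {
        onSubscribe.accept(sink);
    }

    // Demo: returns a log showing when the producer body actually executes.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        LazySource<String> values = LazySource.create(sink -> {
            log.add("producer runs");
            sink.onNext("Hello");
            sink.onCompleted();
        });
        log.add("created");
        for (int i = 1; i <= 2; i++) {
            final int id = i;
            values.subscribe(new Sink<String>() {
                @Override public void onNext(String value) {
                    log.add("subscriber " + id + " received " + value);
                }
                @Override public void onCompleted() {
                    log.add("subscriber " + id + " completed");
                }
            });
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the resulting log, "created" appears before the first "producer runs", and "producer runs" appears once per subscriber.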


You may have noticed that any of the previous Observables can be implemented using Observable.create. Our create example is equivalent to Observable.just("Hello").


Functional methods


In functional programming, the creation of infinite sequences is commonplace.


Observable.range


A simple method familiar to functional programmers. It emits the values from the specified range.


    Observable<Integer> values = Observable.range(10, 15);

This example emits the values from 10 to 24 in order.
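Note that the second argument is a count, not an upper bound. The arithmetic can be checked with a plain-Java sketch; RangeBounds.values is a hypothetical helper built on java.util.stream.IntStream, not an RxJava call:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper for checking the bounds; not part of RxJava.
final class RangeBounds {
    // Mirrors the (start, count) convention: count values beginning at start.
    static List<Integer> values(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> v = values(10, 15);
        System.out.println(v.get(0) + " .. " + v.get(v.size() - 1)
                + " (" + v.size() + " values)");
    }
}
```

With a start of 10 and a count of 15, the last value is 10 + 15 - 1 = 24.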


Observable.interval


This function creates an infinite sequence of values, emitted at the specified time interval.


    Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );
    System.in.read();

Output

    Received: 0
    Received: 1
    Received: 2
    Received: 3
    ...

The sequence is not completed until we unsubscribe.


Note why the blocking read at the end of the example is required. Without it, the program would end without printing anything. This is because all our operations are non-blocking: we create an Observable that emits values periodically, then register a subscriber that performs some action when those values arrive. None of this prevents the main thread from terminating.
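The same effect can be reproduced with a plain java.util.concurrent scheduler, which may make the reason easier to see. This is an analogy under assumptions, not how interval is implemented: the DaemonTicker class is hypothetical, the worker is deliberately a daemon thread so that it does not keep the JVM alive, and a CountDownLatch plays the role that System.in.read() plays in the example above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration class; not part of RxJava.
final class DaemonTicker {
    // Schedules a periodic task, then blocks until `count` ticks have arrived.
    static List<Long> collectTicks(int count, long periodMillis) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "ticker");
                    t.setDaemon(true); // a daemon thread does not prevent JVM exit
                    return t;
                });
        List<Long> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        AtomicLong next = new AtomicLong();

        // Returns immediately: scheduling does not block the calling thread.
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                received.add(next.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            // Without this wait the caller would return before any tick fires.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        collectTicks(3, 100).forEach(v -> System.out.println("Received: " + v));
    }
}
```

Removing the await() call makes collectTicks return an empty list, which mirrors the program that ends without printing.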


Observable.timer


There are two overloads of Observable.timer. The first creates an Observable that emits 0L after a specified period of time.


    Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Received: 0
    Completed

The second overload waits for a specified period of time, then begins emitting values in the same way as interval, at the specified frequency.


    Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Received: 0
    Received: 1
    Received: 2
    ...

The example above waits 2 seconds, then starts counting every second.


Turning into an Observable


Java has tools for working with sequences, collections, and asynchronous events that may not be directly compatible with Rx. We will now look at how to turn them into inputs for your Rx code.


If you use EventHandlers, you can use Observable.create to build a sequence from events.


    Observable<ActionEvent> events = Observable.create(o -> {
        // Forward every button action to the subscriber
        button2.setOnAction(new EventHandler<ActionEvent>() {
            @Override
            public void handle(ActionEvent e) {
                o.onNext(e);
            }
        });
    });

Depending on the specific event, the event type itself (ActionEvent in this case) may carry enough information to serve as the type of your Observable. Very often, however, you will need something else, for example the value of a certain field at the time of the event. It is best to read such a field inside the handler, while the UI thread is blocked and the field values are up to date. Although there is no guarantee that the value will remain unchanged by the time it reaches the final subscriber, in correctly implemented Rx code changes are controlled on the consumer side [1].


Observable.from


You can turn any input into an Observable using create. For common data types, however, ready-made methods exist to make this easier.


Futures are part of Java, and you have probably come across them when working with frameworks that use multithreading. They are a less powerful concurrency tool than Rx, since they return only one value. Typically, you will want to turn them into an Observable.


    FutureTask<Integer> f = new FutureTask<Integer>(() -> {
        Thread.sleep(2000);
        return 21;
    });
    new Thread(f).start();

    Observable<Integer> values = Observable.from(f);

    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Received: 21
    Completed

The Observable emits the result of the FutureTask when it becomes available and then terminates. If the task has been cancelled, the Observable emits a java.util.concurrent.CancellationException error.


If you are interested in the result of the Future only for a limited time, you can pass a timeout as an argument.


    Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);

If the Future does not complete within this time, the Observable ignores the result and emits a TimeoutException.
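Both failure cases originate in java.util.concurrent.Future itself, so they can be observed without RxJava. The sketch below assumes, as the text describes, that the Observable simply forwards whatever exception the Future raises; FutureOutcomes and its methods are hypothetical names, and the 100 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class; not part of RxJava.
final class FutureOutcomes {
    // Waits for the future for at most timeoutMillis and describes what happened.
    static String describe(FutureTask<Integer> f, long timeoutMillis) {
        try {
            return "Received: " + f.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | CancellationException | ExecutionException e) {
            return "Error: " + e.getClass().getName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Error: " + e.getClass().getName();
        }
    }

    // A task that is slower than the timeout: get(...) gives up first.
    static String slowTask() {
        FutureTask<Integer> f = new FutureTask<Integer>(() -> {
            Thread.sleep(2000);
            return 21;
        });
        Thread worker = new Thread(f);
        worker.setDaemon(true);
        worker.start();
        String outcome = describe(f, 100);
        f.cancel(true); // interrupt the worker so the demo finishes promptly
        return outcome;
    }

    // A task that is cancelled before it ever runs.
    static String cancelledTask() {
        FutureTask<Integer> f = new FutureTask<Integer>(() -> 21);
        f.cancel(false);
        return describe(f, 100);
    }

    public static void main(String[] args) {
        System.out.println(slowTask());
        System.out.println(cancelledTask());
    }
}
```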


With Observable.from you can also turn an array or any Iterable collection into a sequence. The resulting Observable emits each element separately, followed by onCompleted.


    Integer[] is = {1, 2, 3};
    Observable<Integer> values = Observable.from(is);
    Subscription subscription = values.subscribe(
        v -> System.out.println("Received: " + v),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Completed")
    );

Output

    Received: 1
    Received: 2
    Received: 3
    Completed

Observable is not the same as Iterable or Stream. An Observable is push-based: a call to onNext causes the stack of handlers to execute all the way down to the final subscriber method. The other models are pull-based: values are requested by the consuming side, and execution blocks until the result is returned.
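The contrast can be made concrete in plain Java. In this sketch (the class PushVsPull and its methods are hypothetical, and the push side is a bare callback rather than a real Observable), the pull version has the consumer ask for each value, while the push version has the producer call the consumer's handler:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of RxJava.
final class PushVsPull {
    private static final List<Integer> DATA = Arrays.asList(1, 2, 3);

    // Pull: the consumer drives the loop and asks for the next value.
    static List<String> pull() {
        List<String> log = new ArrayList<>();
        Iterator<Integer> it = DATA.iterator();
        while (it.hasNext()) {
            log.add("consumer asks");
            log.add("got " + it.next());
        }
        return log;
    }

    // Push: the producer drives the loop and invokes the handlers itself.
    static void produce(Consumer<Integer> onNext, Runnable onCompleted) {
        for (Integer v : DATA) {
            onNext.accept(v);
        }
        onCompleted.run();
    }

    static List<String> push() {
        List<String> log = new ArrayList<>();
        produce(v -> log.add("handler called with " + v),
                () -> log.add("completed"));
        return log;
    }

    public static void main(String[] args) {
        pull().forEach(System.out::println);
        push().forEach(System.out::println);
    }
}
```

In pull() the loop lives in the consumer; in push() it lives in produce, and the consumer only supplies handlers.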


[1] Consumer: the party that consumes the values emitted by an Observable.


Now the project has its own public repository and anyone can join the creation of an in-depth Russian-language tutorial on Rx. The translation of this part is already there, the rest will appear soon, and with your help, even faster.



Source: https://habr.com/ru/post/281633/

