In the introductory article, we looked at the benefits of a reactive approach to Java programming, as well as situations in which the Rx library is more or less useful. In this part we will look at the main types on which the concept of reactive programming is based and several additional classes that will help us in understanding the principles of the library.Content:
Key types
Rx is based on two fundamental types, while some others extend their functionality. These basic types are
Observable and
Observer , which we will look at in this section. We will also look at
Subjects — they will help in understanding the basic concepts of Rx.
Rx is built on the
Observer pattern. There is nothing new in this, event handlers already exist in Java (for example, JavaFX EventHandler [
1 ]), but they lose out on Rx for the following reasons:
- Event handling in them is difficult to compose
- Their call cannot be postponed.
- May lead to memory leak
- There is no easy way to report the end of a stream of events.
- Requires manual multithreading control.
Observable
Observable is the first base type we will look at. This class contains the bulk of the Rx implementation, including all the basic operators. We will look at them later, but for now we should understand the principle of the
subscribe method. Here is the key overload [
2 ]:
')
public final Subscription subscribe(Observer<? super T> observer)
The
subscribe method is used to get the data returned by [
3 ] observable. These data are transmitted to the observer, who assumes their processing depending on the requirements of the consumer. The observer in this case is an implementation of the
Observer interface.
Observable reports three kinds of events:
- Data
- Signal of completion of the sequence [ 4 ] (which means that there will be no more data)
- An error if the sequence ended due to an exception (this event also implies the completion of the sequence)
Observer
Rx provides an abstract implementation of Observer,
Subscriber .
Subscriber implements additional functionality and, as a rule, it should be used to implement the
Observer . However, for starters, consider only the interface:
interface Observer<T> { void onCompleted(); void onError(java.lang.Throwable e); void onNext(T t); }
These three methods are behavior that describes the observer's reaction to a message from observable.
The observer's onNext will be called 0 or more times, optionally followed by
onCompleted or
onError . After them there will be no more calls.
When developing code with Rx, you will see a lot of
Observable , but a lot less
Observer . And although it is necessary to understand the concept of
Observer , there are ways that do not require the direct creation of its instance.
Implementing Observable and Observer
You can manually implement
Observer and
Observable . In reality, this is usually not necessary: Rx provides ready-made solutions to simplify development. It may also not be entirely secure, since the interaction between parts of the Rx library includes principles and internal infrastructure that may not be obvious to the newcomer. In any case, it will be easier to start using the many tools already provided by the library to create the functionality we need.
To subscribe to observable, there is absolutely no need for an
observer implementation. There are other overloads of the
subscribe method that take as arguments the corresponding functions for
onNext ,
onError, and
onSubscribe , which encapsulate the creation of an
Observer instance. It is not necessary to provide them all either, you can describe only a part of them, for example, only
onNext or only
onNext and
onError .
Lambda expressions in Java 1.8 make these overloads very suitable for use in short examples in this series of articles.
Subject
Subjects are extensions of the
Observable , while simultaneously implementing the Observer interface. The idea may seem strange, but in certain cases they make some things a lot easier. They can receive event messages (as observer) and report them to their subscribers (as observable). This makes them the ideal starting point for exploring Rx code: when you have data coming in from outside, you can pass it on to the Subject, turning it into observable in this way.
There are several implementations of Subject. Now we consider the most important of them.
PublishSubject
PublishSubject is the simplest implementation of
Subject . When data is sent to the
PublishSubject , it issues it to all subscribers who are currently subscribed to it.
public static void main(String[] args) { PublishSubject<Integer> subject = PublishSubject.create(); subject.onNext(1); subject.subscribe(System.out::println); subject.onNext(2); subject.onNext(3); subject.onNext(4); }
Conclusion: 2 3 4
As we see,
1 was not printed due to the fact that we were not signed at the moment when it was transmitted. After we signed up, we started getting all the values coming in to subject.
Here we use the
subscribe method for the first time, so it’s worth paying attention to. In this case, we use an overloaded version that accepts one object of the
Function class responsible for
onNext . This function accepts a value of type Integer and returns nothing. Functions that return nothing are also called actions. We can transfer this function in the following ways:
- Provide an object of class Action1 <Integer>
- Implicitly create one using lambda expression
- Send a link to an existing method with the appropriate signature. In this case, System.out :: println has an overloaded version that accepts Object , so we pass a reference to it. Thus, the subscription allows us to print all incoming numbers to the Subject into the main output stream.
Replaysubject
ReplaySubject has a special ability to cache all the data entered into it. When he gets a new subscriber, the sequence is issued to him from the beginning. All subsequent incoming data will be issued to subscribers as usual.
ReplaySubject<Integer> s = ReplaySubject.create(); s.subscribe(v -> System.out.println("Early:" + v)); s.onNext(0); s.onNext(1); s.subscribe(v -> System.out.println("Late: " + v)); s.onNext(2);
Conclusion Early:0 Early:1 Late: 0 Late: 1 Early:2 Late: 2
All values were obtained, despite the fact that one of the subscribers subscribed later than the other. Please note that before getting a new value, the subscriber gets all missing. Thus, the sequence order for the subscriber is not broken.
Caching everything is not always the best idea, since sequences can be long or even infinite. The factory method
ReplaySubject.createWithSize limits the size of the buffer, and
ReplaySubject.createWithTime limits the time that objects will remain in the cache.
ReplaySubject<Integer> s = ReplaySubject.createWithSize(2); s.onNext(0); s.onNext(1); s.onNext(2); s.subscribe(v -> System.out.println("Late: " + v)); s.onNext(3);
Conclusion Late: 1 Late: 2 Late: 3
This time our subscriber missed the first value that fell out of the buffer of size 2. In the same way, over time, objects drop out of the buffer.
Subject created with
createWithTime .
ReplaySubject<Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS, Schedulers.immediate()); s.onNext(0); Thread.sleep(100); s.onNext(1); Thread.sleep(100); s.onNext(2); s.subscribe(v -> System.out.println("Late: " + v)); s.onNext(3);
Conclusion Late: 1 Late: 2 Late: 3
Creating a time-
bound ReplaySubject requires a
Scheduler object, which is a time representation in Rx. We will definitely return to the schedulers in the section about multithreading.
ReplaySubject.createWithTimeAndSize restricts the buffer in both parameters.
BehaviorSubject
BehaviorSubject stores only the last value. This is the same as
ReplaySubject , but with a buffer size of 1. At creation time, it can be assigned an initial value, thus ensuring that the data will always be available to new subscribers.
BehaviorSubject<Integer> s = BehaviorSubject.create(); s.onNext(0); s.onNext(1); s.onNext(2); s.subscribe(v -> System.out.println("Late: " + v)); s.onNext(3);
Conclusion Late: 2 Late: 3
The initial value is provided in order to be available even before the data is received.
BehaviorSubject<Integer> s = BehaviorSubject.create(0); s.subscribe(v -> System.out.println(v)); s.onNext(1);
Conclusion 0 1
Since the
BehaviorSubject role is to always have available data, it is considered wrong to create it without an initial value, as well as complete it.
Asyncsubject
AsyncSubject also stores the last value. The difference is that it does not issue data until the sequence is completed. It is used when it is necessary to produce a single value and complete immediately.
AsyncSubject<Integer> s = AsyncSubject.create(); s.subscribe(v -> System.out.println(v)); s.onNext(0); s.onNext(1); s.onNext(2); s.onCompleted();
Conclusion 2
Note that if we did not
call s.onCompleted () , this code would not have printed anything.
Implicit infrastructure
As we already mentioned, there are principles that may not be obvious in the code. One of the most important is that no event will be issued after the sequence is completed (
onError or
onCompleted ). The
subject implementation respects these principles:
Subject<Integer, Integer> s = ReplaySubject.create(); s.subscribe(v -> System.out.println(v)); s.onNext(0); s.onCompleted(); s.onNext(1); s.onNext(2);
Conclusion 0
Security cannot be guaranteed wherever Rx is used, so you better be aware and not violate this principle, as this can lead to undefined consequences.
In the
sequel, we will look at the Observable life cycle.
[1] Or familiar to all Event Listeners. -
Note. The author[2] I, nevertheless, believe that the key overload here is exactly the version with the Observer as an argument, in the original the version of the subscribe version is given (Subscriber <? Super T> subscriber) -
Note. The author[3] I will use the word “issue” to describe the data transfer event from the Observable to the Observer (to emit in original). -
Note. The author[4] The author uses the term sequence to denote the set of all data that Observable can issue. -
Note. The author