
One of the main ideas underlying Rx is that it is not known exactly when the sequence will produce a new value or end. However, we have the ability to control the time at which we start or end to receive these values. In addition, if our subscribers use external resources, we will probably want to release them at the end of a certain sequence.
Content:
- Part One - Introduction
- Why Rx?
- Key types
- Subscription life cycle
- Part Two - Sequences
- Creating a sequence
- Sequence filtering
- Study
- Aggregation
- Transformation
- Part Three - Sequence Management
- Part Four - Parallelism
Subscription
There are several overloaded
Observable :: subscribe methods that perform the same function.
Subscription subscribe() Subscription subscribe(Action1<? super T> onNext) Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError) Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete) Subscription subscribe(Observer<? super T> observer) Subscription subscribe(Subscriber<? super T> subscriber)
subscribe () absorbs events, but by itself does not perform direct actions. Its overloaded versions, having at least one parameter of type
Action , create a
Subscriber object. If you do not pass functions for the
onError and
onCompleted events, they will simply be ignored.
')
Subject<Integer, Integer> s = ReplaySubject.create(); s.subscribe( v -> System.out.println(v), e -> System.err.println(e)); s.onNext(0); s.onError(new Exception("Oops"));
Conclusion
0 java.lang.Exception: Oops
If you do not pass a function to handle errors,
OnErrorNotImplementedException will be thrown in the place where
s.onError is called on the provider side. In this case, the provider
[1] and the consumer
[2] are in the same code block, which allows the use of a traditional
try-catch . However, in reality, the provider and the consumer may be located in different places. In this case, if the consumer does not provide a function for handling errors, he will never know when and for what reason the sequence ended.
Unsubscribe
You can stop receiving data before the sequence ends. All overloads of the
subscribe method return a
Subscribtion interface object, which has 2 methods:
boolean isUnsubscribed() void unsubscribe()
An unsubscribe call
will stop the arrival of events in observer.
Subject<Integer, Integer> values = ReplaySubject.create(); Subscription subscription = values.subscribe( v -> System.out.println(v), e -> System.err.println(e), () -> System.out.println("Done") ); values.onNext(0); values.onNext(1); subscription.unsubscribe(); values.onNext(2);
Conclusion 0 1
By unsubscribing one subscriber, we will not affect other subscribers of the same ovbservable.
Subject<Integer, Integer> values = ReplaySubject.create(); Subscription subscription1 = values.subscribe( v -> System.out.println("First: " + v) ); Subscription subscription2 = values.subscribe( v -> System.out.println("Second: " + v) ); values.onNext(0); values.onNext(1); subscription1.unsubscribe(); System.out.println("Unsubscribed first"); values.onNext(2);
Conclusion First: 0 Second: 0 First: 1 Second: 1 Unsubscribed first Second: 2
onError and onCompleted
onError and
onCompleted indicate the completion of a sequence. A bona fide observable that follows contracts Rx will cease to issue values after the occurrence of one of these events. This is what you should always remember when creating your own
Observable .
Subject<Integer, Integer> values = ReplaySubject.create(); Subscription subscription1 = values.subscribe( v -> System.out.println("First: " + v), e -> System.out.println("First: " + e), () -> System.out.println("Completed") ); values.onNext(0); values.onNext(1); values.onCompleted(); values.onNext(2);
Conclusion First: 0 First: 1 Completed
Resource Release
A subscription keeps in memory the resources with which it is associated. These resources will not be automatically freed when the
Subscription object
goes out of scope. If, after calling the
subscribe method, to ignore the return value, there is a risk of losing the only opportunity to unsubscribe. Subscription will continue to exist, while access to it will be lost, which can lead to memory leaks and unwanted actions.
There are exceptions. For example, when you call overloaded
subscribe methods that implicitly construct a
Subscriber object, a mechanism will be created that will automatically unbind the subscribers at the moment the sequence ends. However, even in this case, one should keep in mind endless sequences. You will still need a
Subscription object to stop receiving data from them at some point.
There are several implementations of the
Subscription interface:
- BooleanSubscription
- CompositeSubscription
- MultipleAssignmentSubscription
- RefCountSubscription
- Safesubscriber
- Scheduler.Worker
- SerializedSubscriber
- SerialSubscription
- Subscriber
- TestSubscriber
We will meet them in future articles. It is worth noting that
Subscriber also implements the
Subscription , which means that we also have the opportunity to unsubscribe using the link to the
Subscriber object.
With an understanding of the life cycle of subscriptions, you gain control over their associated resources. This will allow your program to be more predictable, easily supported and expandable, and hopefully less susceptible to bugs.
In the next part, we will learn how to create and process sequences using the powerful library toolkit.
[1] He who controls (creates) observable - Note.
[2] Anyone who uses values issued by observable - Note.