📜 ⬆️ ⬇️

Rx Event Source Reference



RxJava is used in a large number of android-applications, but at the same time many do not know other sources of events, except Observable and, maybe, Flowable. They forget about the specialized classes Single, Maybe and Completable, which are often able to add more clarity to the code.

Under the cut you are waiting for a cheat sheet on the sources of events that exist in RxJava.

Completable is in fact the Rx equivalent of Runnable. It is an operation that can be performed or not. If we draw an analogy with Kotlin, then this is fun completable () from the world of Rx. Accordingly, to subscribe to it, you must implement onComplete and onError. It cannot be created from the value (Observable # just, ...), because it is not designed for this.
')
Single - reactive Callable, because here it is possible to return the result of the operation. Continuing the comparison with Kotlin, we can say that Single is a fun single (): T {}. Thus, to subscribe to it, you must implement onSuccess (T) and onError.

Maybe is a cross between Single and Completable, because it supports one value, no values, and an error. It’s harder to draw a straightforward parallel with the methods, but I think Maybe is fun maybe (): T? {}, which returns null when there is no result. It is easy to guess that for a subscription you will need to define onSuccess (T), onComplete and onError.
It is important to note here that onSuccess (T) and onComplete are mutually exclusive. Those. in the case of the first call, you can not wait for the second.
Observable is the most common source due to its versatility. He knows how not to produce events at all, and to generate many of those, so it can be used whenever the other options are not suitable. Despite this, Observable has a flaw - it doesn’t know how to handle backpressure at all. To subscribe to it, you need onNext (T), onError and onComplete.

Backpressure - a situation when new events arrive much faster than they are processed, and begin to accumulate in the buffer, overflowing it. This can lead to troubles like OutOfMemoryError. More details can be found here .

ConnectableObservable - a heated version of Observable. All data sources start issuing their event stream at the time of subscription. But not this guy. To do this, ConnectableObservable is waiting for a connect call. This is done so that several observers can observe one stream of events without restarting it at each subscription. For illustration, I will give the following snippet:

val observable = Observable.fromCallable { Log.d("RxLogs", "observable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe() 
The console will be:
observable from Callable executed
observable from Callable executed
observable from Callable executed
observable from Callable executed

 val connectedObservable = Observable.fromCallable { Log.d("RxLogs", "connectedObservable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) .publish() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.connect() 

And in this case: observable from Callable executed

Flowable is a source that provides additional operators for backpressure processing. When you need to handle more than 10,000 events that occur quickly one after the other, it is recommended to use it instead of Observable.

The latter can create ConnectableFlowable, which opens the same possibilities as ConnectableObservable.

Speaking of event generators, not to mention Subject and Processor.

Subject - a class that can be both a source and a browser. This allows you to use it, for example, in all sorts of controllers, which will give it out in the form of Observable and inside notify as Observer. Next, we go through the different implementations of this class.

AsyncSubject / AsyncProcessor holds the last event until the thread correctly ends, and then gives it to subscribers. If an error occurs, no events will be forwarded.

image

PublishSubject / PublishProcessor sends events coming to it further, until a terminal signal arrives. After the end of the stream or error, it returns the corresponding events.

image

BehaviorSubject / BehaviorProcessor works in the same way as PublishSubject / PublishProcessor, but when subscribing, it returns the last event, if it exists and if the Subject did not go to the terminal state.

image

ReplaySubject / ReplayProcessor - BehaviourSubject / BehaviorProcessor on steroids. Returns not one last event, but as much as you like. If you subscribe to a completed ReplaySubject or ReplayProcessor, then all accumulated data will be received.

image
Thus, ReplaySubject.createWithSize (1) and BehaviourSubject.create () work differently after switching to the terminal state. During the subscription, the first will return the last event, and the second will not. The same is true for the ReplayProcessor.
CompletableSubject , MaybeSubject and SingleSubject work in the same way as PublishSubject, only designed for use with Completable, Maybe and Single respectively.

UnicastSubject / UnicastProcessor is actually a ReplaySubject, which ensures that it has only one subscriber. It throws an IllegalStateException when trying to re-subscribe.

image

Those. next snippet

 val subject = UnicastSubject.create<String>(3) repeat(3) { subject.onNext(it.toString()) } subject.onComplete() subject.subscribe({ Log.d("RxLogs", it) }, { }, { Log.d("RxLogs", "complete") }) 

will log
0
one
2
complete

MulticastProcessor works similarly to PublishProcessor, except for one small feature. He is able to handle backpressure for the stream entering it. MulticastProcessor allows you to set the size of the buffer in which it will pre-request items from upstream for future subscribers.

In the diagram below, a processor is created with a storage for 2 elements, which it immediately requests from its source. Therefore, when the first observer subscribes to it, it immediately gives the contents of the buffer, which is instantly filled with new events. After the terminal event, MulticastProcessor clears its repository and new subscribers immediately receive the completion of the stream.

image

Source: https://habr.com/ru/post/459174/


All Articles