
ReactiveX 2.0 with examples, or grokking reactive programming 2.0. Part 1: Observable vs Flowable, Backpressure


Hi, my name is Alex. I write backends in Kotlin and also develop Android applications. For a long time I suffered through callback hell, imperative style, thread synchronization, and the other classic Java-on-Android problems. It was a huge pain, and I started looking for a way to get rid of it. Then came a happy accident: I ran into the growing hype around RxJava. After trying it, I haven't been able to stop to this day. At the time of writing, RxJava 2.0 has been released, and I really wanted to understand what is new in it. The official source, the GitHub wiki, now has a chapter called RxJava 2.0: What's different in 2.0. Unfortunately, my English is far from fluent, and reading such important docs took time. I accumulated some notes, a picture emerged, and I want to share it. But to avoid producing yet another banal translation and to actually add some value, this article is a mix of tutorial and wiki translation, seasoned with real use-case examples in RxKotlin.


Since Web and Android development differ in approach, and so does the context in which Rx is used, the discussion will be held in the context of Android development. If you're interested, read on.


Brief introduction


Before starting, I would advise you to read an excellent introductory series of articles.



I also liked this article, which I recommend reading as well.


RxJava 2.0 has been completely rewritten from scratch on top of the Reactive Streams specification. The specification itself evolved from RxJava 1.x and provides a common baseline for reactive systems and libraries.


Since Reactive Streams has a different architecture, it required changes to some well-known RxJava types. Over a few articles I will try to summarize what has changed and show how to migrate 1.x code to 2.x.


NullPointerException


Yes, RxJava 2.x no longer accepts null values: the following code either results in a NullPointerException immediately, or the emitter (the producing side) signals it downstream as an onError event.


 class NullPointerActivity : AppCompatActivity() {

     override fun onCreate(savedInstanceState: Bundle?) {
         super.onCreate(savedInstanceState)

         var obsJust: TextView? = null
         var singleJust: TextView? = null
         var callable: TextView? = null
         var nullMaping: TextView? = null

         verticalLayout {
             obsJust = textView()
             singleJust = textView()
             callable = textView()
             nullMaping = textView()
         }

         try {
             Observable.just(null).subscribe()
         } catch (e: Exception) {
             obsJust?.text = e.localizedMessage
             e.printStackTrace()
         }

         try {
             Single.just(null).subscribe()
         } catch (e: Exception) {
             singleJust?.text = e.localizedMessage
             e.printStackTrace()
         }

         Observable.fromCallable { null }.subscribe({ Log.d("NullPointerActivity", it.toString()) }, {
             callable?.text = it.localizedMessage
             it.printStackTrace()
         })

         Observable.just(1).map { null }.subscribe({ Log.d("NullPointerActivity", it.toString()) }, {
             nullMaping?.text = it.localizedMessage
             it.printStackTrace()
         })
     }
 }

This means that an Observable<Void> can no longer emit any events: a stream, whether hot or cold, can now only terminate with onComplete or onError. In some cases the exception is even thrown before the value gets wrapped into a stream, as happens with Single.just(null) and Observable.just(null).
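Since null can no longer travel through a stream, a natural question is how to model an optional value. Below is a small sketch of my own showing two options; findUserName, userNames, and the Optional wrapper are illustrative names, not RxJava APIs: either use Maybe, which simply completes empty, or carry a tiny wrapper type inside an ordinary Observable.

 import io.reactivex.Maybe
 import io.reactivex.Observable

 // Option 1: Maybe emits either exactly one value or nothing at all
 fun findUserName(id: Long): Maybe<String> =
     if (id == 42L) Maybe.just("Alex") else Maybe.empty()

 // Option 2: a tiny sealed wrapper carried inside an ordinary Observable
 sealed class Optional<out T> {
     data class Some<T>(val value: T) : Optional<T>()
     object None : Optional<Nothing>()
 }

 fun userNames(ids: List<Long>): Observable<Optional<String>> =
     Observable.fromIterable(ids)
         .map { id -> if (id == 42L) Optional.Some("Alex") else Optional.None }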


When designing or implementing a custom Observable, you no longer need to invent a specific type for ObservableEmitter<Any> (which replaced Observable.OnSubscribe<Any>). For example, if you need a source that works purely as a signal, you can define an enum and emit its single instance in onNext:


 enum class Irrelevant { INSTANCE }

 val source = Observable.create<Any> { emitter ->
     Log.d(TAG, "Side-effect 1")
     emitter.onNext(Irrelevant.INSTANCE)

     Log.d(TAG, "Side-effect 2")
     emitter.onNext(Irrelevant.INSTANCE)

     Log.d(TAG, "Side-effect 3")
     emitter.onNext(Irrelevant.INSTANCE)
 }

 source.subscribe({ Log.d(TAG, it.toString()) }, Throwable::printStackTrace)

As a result, the log shows exactly what you would expect:


D/NullPointerActivity: Side-effect 1
D/NullPointerActivity: INSTANCE
D/NullPointerActivity: Side-effect 2
D/NullPointerActivity: INSTANCE
D/NullPointerActivity: Side-effect 3
D/NullPointerActivity: INSTANCE

Observable vs. Flowable


Unfortunately, before Rx 2.0 backpressure was baked directly into Observable rather than living in a separate class that supports it. The main problem is that many hot Observables cannot reliably honor backpressure and, under certain circumstances, throw an unexpected MissingBackpressureException. Without a certain amount of accumulated experience, such an exception is very hard to predict.


Rx 2.0 fixes this. Observable is now the class without backpressure, while the new Flowable supports backpressure out of the box. Next, let's look at when to use Flowable and when to use Observable.


So which one should you use, and when?


This is perhaps the most natural question. When implementing your repository and business-logic classes, and deciding whether they should accept and return Observable or Flowable, you should weigh several factors that help you avoid exceptions such as MissingBackpressureException or OutOfMemoryError; careless use of the wrong type can also cost you frames per second at runtime. A sketch of such an interface follows below.
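To make those factors concrete, here is a hypothetical repository interface of my own (the names are invented, not from any real project) where the expected volume of each stream dictates the type:

 import io.reactivex.Flowable
 import io.reactivex.Observable
 import io.reactivex.Single

 data class Note(val id: Long, val text: String)
 enum class SyncEvent { STARTED, FINISHED, FAILED }

 interface NotesRepository {
     fun noteCount(): Single<Int>               // exactly one value: Single
     fun syncEvents(): Observable<SyncEvent>    // a handful of infrequent events: no backpressure needed
     fun allNotes(): Flowable<Note>             // potentially tens of thousands of rows: Flowable
 }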


When to use Observable


Use Observable when an iteration deals with, roughly speaking, no more than 1,000 elements, that being the worst case with no scaling expected, so you do not need backpressure. More generally, if you feel there is no chance of an OutOfMemoryError in a particular case, that is exactly where Observable can and should be used. These are mostly UI cases: all kinds of onClick, touch, and pointer-movement events, and in general any events whose frequency does not exceed roughly 1,000 Hz. You are unlikely to tap the touchscreen more than 1,000 times per second. Still, do not forget the debounce operator.


 class MainActivity : AppCompatActivity() {

     override fun onCreate(savedInstanceState: Bundle?) {
         super.onCreate(savedInstanceState)
         setContentView(R.layout.activity_main)

         RxView.clicks(backpressure)
             .debounce(200, TimeUnit.MILLISECONDS)
             .subscribe({
                 openNewScreen(BackpressureExampleActivity::class.java)
             }, Throwable::printStackTrace)

         RxView.clicks(nullpointer)
             .debounce(200, TimeUnit.MILLISECONDS)
             .subscribe({
                 openNewScreen(NullPointerActivity::class.java)
             }, Throwable::printStackTrace)
     }
 }

I think everyone knows RxBinding, Jake Wharton's library: it returns Observable when binding views, and that fits the Rx 2.x concept perfectly.


When to use Flowable


Use Flowable when you are dealing with large or unpredictable amounts of data, say more than 10,000 items, or with continuously generated data. Parsing and reading data from various storage media (internal / external storage) are also reasons to use Flowable.


Reading from a database via SQLiteOpenHelper is another reason to use Flowable. So if you use the https://github.com/square/sqlbrite or https://github.com/pushtorefresh/storio libraries, it makes sense to convert their Observable to Flowable, as sketched below. Querying backends is yet another reason to use Flowable.
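A minimal sketch of such a bridge, assuming a library that still hands you an Observable (the generic helper names below are mine; the actual SqlBrite or StorIO query types differ):

 import io.reactivex.BackpressureStrategy
 import io.reactivex.Flowable
 import io.reactivex.Observable

 // Observable -> Flowable: you must choose a strategy for the moments the consumer lags behind.
 fun <T> toBackpressured(queries: Observable<T>): Flowable<T> =
     queries.toFlowable(BackpressureStrategy.LATEST)   // keep only the freshest query result

 // Flowable -> Observable: drops backpressure support entirely.
 fun <T> toPlain(rows: Flowable<T>): Observable<T> =
     rows.toObservable()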


If you look closely, all the cases listed share a common trait: they involve blocking work. Accordingly, if you perform any operation that would block the main thread, that is a reason to use Flowable.


And finally: the many blocking and/or pull-based data sources that you may eventually want to expose through a non-blocking, reactive API (as Retrofit does) are also grounds for using Flowable; a sketch of wrapping such a source is shown below.
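As a sketch of wrapping a pull-based, blocking source, here is a file read line by line with Flowable.generate, so lines are pulled only as fast as the downstream requests them. The path is a placeholder of mine, and error handling is left to the downstream onError:

 import io.reactivex.Flowable
 import io.reactivex.functions.BiConsumer
 import io.reactivex.functions.Consumer
 import io.reactivex.schedulers.Schedulers
 import java.io.BufferedReader
 import java.io.File
 import java.util.concurrent.Callable

 fun linesFrom(path: String): Flowable<String> =
     Flowable.generate<String, BufferedReader>(
         Callable { File(path).bufferedReader() },          // open the resource lazily, once per subscriber
         BiConsumer { reader, emitter ->
             val line = reader.readLine()
             if (line != null) emitter.onNext(line) else emitter.onComplete()
         },
         Consumer { reader -> reader.close() }              // always release the resource
     ).subscribeOn(Schedulers.io())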


More about Backpressure


Backpressure is what happens on the producing side when asynchronous operations downstream cannot process values fast enough and the producer needs to be slowed down.


A classic case that calls for backpressure is a hot producing thread:


 class BackpressureExampleActivity : AppCompatActivity() {

     private val n = 10000000

     override fun onCreate(savedInstanceState: Bundle?) {
         super.onCreate(savedInstanceState)
         setContentView(R.layout.activity_backpressure)

         val source = PublishProcessor.create<Int>()

         source
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

         for (i in 0..n) {
             source.onNext(i)
             if (i == n) {
                 source.onComplete()
             }
         }
     }

     private fun addToIntListAdapter(number: Int?) {
         Log.d("number", number.toString())
         // do something
     }

     private fun onComplete() {
         textView?.text = "completed"
     }
 }

PublishProcessor is a somewhat different beast from the Observable we are used to; as you can see, it has a more imperative form.


In this example, the main thread produces n elements. Imagine that addToIntListAdapter(number) adds each incoming element to an adapter attached to a RecyclerView. That takes time, and the overhead of handling one element can exceed the interval between emissions. The producing for-loop, however, has no way of knowing that and keeps calling onNext.


Asynchronous operators contain internal buffers that hold such elements until they are processed. In RxJava 1.x those buffers were unbounded, which means they would likely end up holding all n elements from the example. The trouble starts when n = 1000000: in the classic setup this leads to an OutOfMemoryError or, more commonly, to UI freezes caused by heavy and therefore frequent GC work.


Just as error handling became a mandatory part of Rx and got its own operators (onErrorReturn, onErrorResumeNext, doOnError), backpressure is another property of data streams that the developer must think about and handle, via onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest.
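A tiny illustration of that parallel, with both policies declared on the same stream (loadPages here is a made-up source of mine, not a real API):

 import io.reactivex.Flowable

 fun pages(loadPages: () -> Flowable<String>): Flowable<String> =
     loadPages()
         .onErrorResumeNext(Flowable.just("<empty page>"))  // error policy: fall back to a stub page
         .onBackpressureBuffer(256)                         // backpressure policy: buffer up to 256 pages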


Returning to the PublishProcessor example above, with n = 1000000 we get:


W/System.err: io.reactivex.exceptions.MissingBackpressureException: Could not emit value due to lack of requests
W/System.err: at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:322)
W/System.err: at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:198)
W/System.err: at mobile.geekbit.rx20habrahabrproject.backpressure.BackpressureExampleActivity.onCreate(BackpressureExampleActivity.kt:30)
W/System.err: at android.app.Activity.performCreate(Activity.java:6679)
W/System.err: at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1118)
W/System.err: at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2618)
W/System.err: at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:2726)
W/System.err: at android.app.ActivityThread.-wrap12(ActivityThread.java)
W/System.err: at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1477)
W/System.err: at android.os.Handler.dispatchMessage(Handler.java:102)
W/System.err: at android.os.Looper.loop(Looper.java:15)
W/System.err: at android.app.ActivityThread.main(ActivityThread.java:6119)
W/System.err: at java.lang.reflect.Method.invoke(Native Method)
W/System.err: at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
W/System.err: at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)

And if we also call Thread.sleep(10000) on the UI thread, the Activity will start, of course, with terrible freezes, triggering GC 7-8 times!


Besides the PublishProcessor mentioned above, there are other sources that do not respect backpressure. For example, Observable.interval periodically generates values that can easily arrive faster than they are processed; a sketch of taming its Flowable counterpart is shown below.
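A minimal sketch with a deliberately slow consumer: assuming each tick is disposable, onBackpressureLatest keeps only the most recent one instead of blowing up.

 import io.reactivex.Flowable
 import io.reactivex.schedulers.Schedulers
 import java.util.concurrent.TimeUnit

 fun main() {
     Flowable.interval(1, TimeUnit.MILLISECONDS)
         .onBackpressureLatest()                  // without this line a MissingBackpressureException is likely
         .observeOn(Schedulers.single())
         .subscribe { tick ->
             Thread.sleep(100)                    // deliberately slow consumer
             println("processed tick $tick")
         }

     Thread.sleep(2000)                           // keep the short demo alive
 }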


In RxJava 2.x, most asynchronous operators now have a bounded internal buffer, and any attempt to overflow it terminates the whole sequence with a MissingBackpressureException. The documentation has a section on backpressure and the operators that support it: http://reactivex.io/documentation/operators/backpressure.html


However, backpressure also shows up in less expected places: in ordinary cold streams that do not, and should not, produce a MissingBackpressureException. If we take the example above and refactor it:


 class BackpressureExampleActivity : AppCompatActivity() {

     private val n = 10000000

     override fun onCreate(savedInstanceState: Bundle?) {
         super.onCreate(savedInstanceState)
         setContentView(R.layout.activity_backpressure)

         Flowable.range(1, n)
             .subscribeOn(Schedulers.computation())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)
     }

     private fun addToIntListAdapter(number: Int?) {
         Log.d("number", number.toString())
         // do something
     }

     private fun onComplete() {
         textView?.text = "completed"
     }
 }

Everything starts without a MissingBackpressureException, the UI runs at a stable frame rate, smoothly and with far more modest memory usage. The reason is that many producing operators can generate values on demand and are thus slowed down by the downstream operator. We can say that Flowable.range(1, n) generates only as many values as the downstream buffer can hold without overflowing. Let's look at a more concrete example:


 Flowable.range(1, n)
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(object : DisposableSubscriber<Int>() {

         public override fun onStart() {
             request(1)
         }

         override fun onNext(v: Int?) {
             addToIntListAdapter(v)
             request(1)
         }

         override fun onError(ex: Throwable) {
             ex.printStackTrace()
         }

         override fun onComplete() {
             this@BackpressureExampleActivity.onComplete()
         }
     })

Here the onStart implementation asks range to produce its first value, which is then handled in onNext. Once that value has been processed, the next one is requested. In a naive implementation of range such a call would recursively invoke onNext, resulting in a StackOverflowError, which is obviously undesirable.


To prevent a StackOverflowError, operators use trampolining, the conventional name for logic that flattens re-entrant calls; you will often meet it in functional languages such as Clojure and Scala, particularly around recursion. In terms of range, the trampoline remembers that request(1) was called while onNext() was still running, and as soon as onNext() returns it invokes the next onNext() with the following value. Thanks to this, even if the two calls swap places, the example above still works identically:


 override fun onNext(v: Int?) {
     request(1)
     addToIntListAdapter(v)
 }

However, this does not hold for onStart. Although the Flowable infrastructure guarantees it is called at most once per Subscriber, the call to request(1) may immediately trigger the emission of an element. If you have initialization logic placed after request(1) that onNext relies on, you can get exceptions:


 class IntMapper {
     private val KOFF = 2
     fun map(int: Int?): Int = (int ?: 0) * KOFF
 }

 Flowable.range(1, n)
     .subscribe(object : DisposableSubscriber<Int>() {

         lateinit var mapper: IntMapper

         public override fun onStart() {
             request(1)
             mapper = IntMapper()   // initialized only after the first value may already have arrived
         }

         override fun onNext(v: Int?) {
             addToIntListAdapter(mapper.map(v))
             request(1)
         }

         override fun onError(ex: Throwable) {
             ex.printStackTrace()
         }

         override fun onComplete() {
             this@BackpressureExampleActivity.onComplete()
         }
     })

In this case you can get a NullPointerException, and given Kotlin's null safety I actually had to work to reproduce it. Note one detail: I removed the operators


 .subscribeOn(Schedulers.computation())
 .observeOn(AndroidSchedulers.mainThread())

because with them in place you will not get a NullPointerException: these operators are asynchronous by nature. It is precisely in the synchronous case that you get an immediate NullPointerException as soon as onStart is called. Worse, the error becomes much harder to catch when request(1) is followed by an asynchronous onNext call on another thread, producing a race condition.


So: perform all initialization in onStart, or better yet before it, and call request() last. A corrected version of the example above is sketched below.
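For completeness, here is the same subscriber reworked to follow that rule; it reuses IntMapper and addToIntListAdapter from the example above, so it is a sketch rather than a standalone snippet:

 Flowable.range(1, n)
     .subscribe(object : DisposableSubscriber<Int>() {

         private val mapper = IntMapper()       // initialized before any value can possibly arrive

         public override fun onStart() {
             request(1)                         // request only once everything is ready
         }

         override fun onNext(v: Int?) {
             addToIntListAdapter(mapper.map(v))
             request(1)
         }

         override fun onError(ex: Throwable) {
             ex.printStackTrace()
         }

         override fun onComplete() {
             Log.d("Backpressure", "all values processed")
         }
     })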


The onBackpressureBuffer, onBackpressureDrop, and onBackpressureLatest operators


Most developers first meet backpressure when their application crashes with a MissingBackpressureException, and the stack trace usually points to the observeOn operator. The actual cause is, as a rule, a PublishProcessor or Observable source, the timer() or interval() factories, a custom lift() operator, or the work done inside create().


There are several ways to solve such situations, and now we will consider them.


1. Increasing buffer sizes

Sometimes such overflows come from genuinely bursty sources. For example, the user suddenly taps the screen very fast and the observeOn buffer fills up in no time.


Let's dwell on the buffer itself for a moment. A fair question is: what is it, and how does it participate in the Rx machinery? It's actually quite simple. The default buffer size is computed as Math.max(1, Integer.getInteger("rx2.buffer-size", 128)), and in my observations this always returns 128 (Captain Obvious is clearly on duty). So we end up with a buffer of size 128... of what, exactly? Bits? Bytes? Parrots? The answer is here:


 @CheckReturnValue
 @BackpressureSupport(BackpressureKind.FULL)
 @SchedulerSupport(SchedulerSupport.CUSTOM)
 public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
     ObjectHelper.requireNonNull(scheduler, "scheduler is null");
     ObjectHelper.verifyPositive(bufferSize, "bufferSize");
     return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
 }

The bufferSize parameter is checked for being positive and passed on to the FlowableObserveOn instance.


 final int prefetch; // this is our bufferSize

 public FlowableObserveOn(Flowable<T> source, Scheduler scheduler, boolean delayError, int prefetch) {
     super(source);
     this.scheduler = scheduler;
     this.delayError = delayError;
     this.prefetch = prefetch;
 }

Further in onSubscribe we already see something familiar.


 @Override
 public void onSubscribe(Subscription s) {
     // ...
     s.request(prefetch);
     // ...
 }

So it turns out we have been using the buffer all along without knowing it. Remember request(1)? This is what the Subscription interface actually looks like:


 public interface Subscription {
     public void request(long n);
     public void cancel();
 }

Here request(long n) is nothing more than a request for n elements from the upstream. So we can conclude that the buffer size boils down to that same request(n). But how do you tell a Flowable what buffer size to use? More on that below.


Most backpressure-sensitive operators in recent versions of RxJava let the developer specify the size of their internal buffers; the corresponding parameters are called bufferSize, prefetch, or capacityHint. Returning to the overflowing example from the introduction, we can simply enlarge the buffer used by observeOn so there is enough room for all the values from the PublishProcessor.


 val source = PublishProcessor.create<Int>()

 source
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread(), false, 1024)
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

 for (i in 0..n) {
     source.onNext(i)
     if (i == n) {
         source.onComplete()
     }
 }

 Thread.sleep(10000)

Note, though, that this is essentially a crutch: an overflow can still occur if the producer generates more than the configured buffer can hold. In that case you can use one of the operators from the next section.


2. Batching / skipping / buffering values with standard operators

If the source data can be processed more efficiently in batches, you can reduce the likelihood of a MissingBackpressureException by using one of the standard batching operators (by size or by time).


 val source = PublishProcessor.create<Int>()

 source
     .buffer(1024 * 1024)
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread(), false, 1024)
     .flatMap { Flowable.fromIterable(it) }
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

 for (i in 0..n) {
     source.onNext(i)
     if (i == n) {
         source.onComplete()
     }
 }

 Thread.sleep(10000)

When some of the values can safely be ignored, you can use the sampling operator (sample) and the throttling operators (throttleFirst, throttleLast, throttleWithTimeout).


 val source = PublishProcessor.create<Int>()

 source
     .sample(1, TimeUnit.MILLISECONDS)
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread(), false, 1024)
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

 for (i in 0..n) {
     source.onNext(i)
     if (i == n) {
         source.onComplete()
     }
 }

Keep in mind, however, that these operators only reduce the rate at which values reach the downstream, so they may still end in a MissingBackpressureException.


3. The onBackpressureBuffer() operator

In its basic form this operator re-introduces an unbounded buffer between the original source and the downstream operator. This means that, until memory runs out, it can absorb almost any amount of data from a continuously generating source.


 Flowable.range(1, n)
     .onBackpressureBuffer()
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

In this example observeOn keeps its very small buffer (recall that it is 128 by default), yet there is no MissingBackpressureException, because onBackpressureBuffer absorbs all n values from our example and hands them to the downstream in small batches.
It is worth noting that onBackpressureBuffer consumes its source in an unbounded way, i.e. without applying backpressure to it. As a consequence, even producers such as range() run to completion right away.


onBackpressureBuffer also has several overloads; I will mention the most useful ones.


onBackpressureBuffer(int capacity)


This is a bounded version that signals a BufferOverflowException when the buffer reaches the specified capacity.


 Flowable.range(1, n)
     .onBackpressureBuffer(16)
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

The usefulness of this overload is diminishing as more and more operators let you set their buffer sizes directly; apart from that, it gives you a way to enlarge a buffer beyond an operator's default.


onBackpressureBuffer(int capacity, Action onOverflow)


This overload invokes a callback when an overflow happens. Action itself is nothing out of the ordinary:


 public interface Action {
     void run() throws Exception;
 }

Note, though, that its usefulness is rather limited, since the callback gets no information about the overflow beyond the current call stack.


onBackpressureBuffer(int capacity, Action onOverflow, BackpressureOverflowStrategy strategy)


This variant is genuinely interesting: it lets you decide what to do once the capacity is reached. BackpressureOverflowStrategy is an enum offering three values that represent the typical reactions:


ERROR: the default behavior of the previous overloads, throwing a BufferOverflowException.


DROP_OLDEST: on overflow, the oldest value in the buffer is dropped to make room for the new one.


DROP_LATEST: slightly different behavior, where the value that has just arrived is simply ignored, and only the values already in the buffer are delivered once the downstream requests them.


 Flowable.range(1, n)
     .onBackpressureBuffer(1024, {
         Toast.makeText(baseContext, BufferOverflowException::class.simpleName, Toast.LENGTH_SHORT).show()
     }, BackpressureOverflowStrategy.DROP_LATEST)
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

Here, once the 1024-element buffer fills up, the newest values are simply dropped, and the onOverflow callback shows a toast with the name of BufferOverflowException.


Besides BackpressureOverflowStrategy, there are dedicated operators for the drop-style behaviors: onBackpressureDrop() and onBackpressureLatest(). Here is what they look like.


 Flowable.range(1, n)
     .onBackpressureDrop()
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

 Flowable.range(1, n)
     .onBackpressureLatest()
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

These operators let you handle backpressure without an explicit buffer: onBackpressureDrop simply discards values the downstream has not requested, while onBackpressureLatest keeps only the most recent one. If every single value matters to you, the onBackpressure family is not the right tool.


In the next part we will look at building backpressure-aware data sources of our own. The source code for all the examples is available here:


https://github.com/scrobot/Rx2.0habrahabrproject





Source: https://habr.com/ru/post/336268/

