
Explore RxJava 2 for Android



My name is Arkady, and I'm an Android developer at Badoo. Our blog has recently carried a lot of posts about Go, PHP, JS, and QA, so I decided to balance them with some mobile development topics. I have just been porting an Android project from RxJava 1 to RxJava 2 and read everything I could find on the subject online, in particular Jake Wharton's talk at the GOTO Copenhagen 2016 conference. It struck me as a worthy candidate for translation: I think many Android developers are considering a move to RxJava 2 and want to know what has changed since the first version.


Jake gives a fairly extensive introduction to reactive programming, so no knowledge of RxJava 1 is needed to follow the article. The talk was prepared while RxJava 2 was still approaching release (version 2.1.0 is out at the time of writing).



Why Reactive?


Why has everyone suddenly started talking about reactive programming? Unless you can make an application completely synchronous, a single asynchronous resource breaks the traditional imperative programming style we are used to. It "breaks" it not in the sense that everything stops working, but in the sense that complexity grows, and as a result you start losing all the advantages of imperative programming.


To explain why I think this is a serious problem, I will give an example.


Let's start with a simple class that gives us a User object and has a couple of methods for modifying it.


    interface UserManager {
        User getUser();
        void setName(String name);
        void setAge(int age);
    }

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe");
    System.out.println(um.getUser());

If we lived in a synchronous, single-threaded world, then this code would do exactly what is expected: creating an instance, outputting a user, changing some properties, outputting a user.


The problem arises when we begin to resort to asynchrony. Suppose we need to reflect changes in properties on the server side. For this, the last two methods must be asynchronous. How would we then change the code?


One option is to do nothing at all: assume that the asynchronous call to the server will succeed and apply the changes locally, where they take effect instantly. As you can guess, this is not a good idea. Networks are unpredictable, the server can return an error, and then you have to roll back the local state somehow.


A simple solution is to pass a Runnable that is executed once the asynchronous call completes successfully. This is already reactive behavior: we update the displayed data only when we are sure that the change request succeeded.


    interface UserManager {
        User getUser();
        void setName(String name, Runnable callback);
        void setAge(int age, Runnable callback);
    }

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe", new Runnable() {
        @Override public void run() {
            System.out.println(um.getUser());
        }
    });

However, we do not handle problems that may arise (for example, network problems). So maybe it is worth creating a special Listener so that we can do something in case of an error?


    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe", new UserManager.Listener() {
        @Override public void success() {
            System.out.println(um.getUser());
        }
        @Override public void failure(IOException e) {
            // TODO show the error...
        }
    });

We can tell the user about the problem. We can retry automatically. Solutions like this work, and this is the direction you take when you need to combine asynchronous code with code that runs on a single thread (on Android, the UI thread).


The more asynchronous calls you need to make, the more problems arise. For example, when users fill out a form, they change several properties. Or you have a sequence of asynchronous calls, where the success of one call must start another asynchronous call, which in turn can succeed or fail.


    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe", new UserManager.Listener() {
        @Override public void success() {
            System.out.println(um.getUser());
        }
        @Override public void failure(IOException e) {
            // TODO show the error...
        }
    });

    um.setAge(40, new UserManager.Listener() {
        @Override public void success() {
            System.out.println(um.getUser());
        }
        @Override public void failure(IOException e) {
            // TODO show the error...
        }
    });

Do not forget that all of this happens in the context of Android, so many other factors have to be considered. For example, we could push information straight into the UI in the success callback, but Android activities are ephemeral: they can be destroyed at any time. Say the user receives an incoming call and the system moves your application to the background. Or the user presses Home or Back. If the asynchronous call returns after the UI has been destroyed, you are in trouble.


    public final class UserActivity extends Activity {
        private final UserManager um = new UserManager();

        @Override protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.user);

            TextView tv = (TextView) findViewById(R.id.user_name);
            tv.setText(um.getUser().toString());

            um.setName("Jane Doe", new UserManager.Listener() {
                @Override public void success() {
                    tv.setText(um.getUser().toString());
                }
                @Override public void failure(IOException e) {
                    // TODO show the error...
                }
            });
        }
    }

There are imperative approaches to solving the problem. We can check the status before calling UI methods.


    public final class UserActivity extends Activity {
        private final UserManager um = new UserManager();

        @Override protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.user);

            TextView tv = (TextView) findViewById(R.id.user_name);
            tv.setText(um.getUser().toString());

            um.setName("Jane Doe", new UserManager.Listener() {
                @Override public void success() {
                    // Touch the UI only if the Activity is still alive.
                    if (!isDestroyed()) {
                        tv.setText(um.getUser().toString());
                    }
                }
                @Override public void failure(IOException e) {
                    // TODO show the error...
                }
            });
        }
    }

In this example we create an anonymous class, which inevitably causes a short-lived memory leak: it holds a reference to our Activity until the asynchronous call completes.


The problem also lies in the fact that we do not know in which threads these callbacks are called. They may be called in a background thread, so you need to pass events to the main thread ( main/UI thread ).


    public final class UserActivity extends Activity {
        private final UserManager um = new UserManager();

        @Override protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.user);

            TextView tv = (TextView) findViewById(R.id.user_name);
            tv.setText(um.getUser().toString());

            um.setName("Jane Doe", new UserManager.Listener() {
                @Override public void success() {
                    // The callback may arrive on a background thread.
                    runOnUiThread(new Runnable() {
                        @Override public void run() {
                            if (!isDestroyed()) {
                                tv.setText(um.getUser().toString());
                            }
                        }
                    });
                }
                @Override public void failure(IOException e) {
                    // TODO show the error...
                }
            });
        }
    }

We have cluttered the Activity with a bunch of things that have nothing to do with the main task our code solves, and all of it just to start working asynchronously and process asynchronous results. We have only implemented a single asynchronous request; we do not block user input, do not handle button presses, and do not work with multiple fields.


Even when your code solves only one simple task, and you start turning it into a real application, you immediately have problems, and you are left alone with the need to manage a bunch of states and checks in your Activity .


Reactive thinking


In a real application everything works asynchronously. We have a network to which we send requests and from which responses take a long time to arrive. We cannot block the main thread, so network work has to happen on a background thread. Nor can we block the main thread while working with the file system, a database, storage, or even shared preferences, so those operations have to run on background threads too.


Users are also something like asynchronous data sources. We give them information through the UI, and they respond to it by pressing buttons and entering data into the fields.




The user may come back to the application at any time. The application must be ready to receive data and must be reactive: there must be no state in which the main thread is blocked, and no situation in which some data arrives asynchronously but the application does not expect it and therefore ignores it or even crashes. Therein lies the difficulty: you have to maintain all of these states in your Activity / Fragment. You have to accept that numerous asynchronous sources produce and consume data, possibly at different rates. And that is before we take into account Android itself, which is an asynchronous platform. We have push notifications, broadcasts, and configuration changes. Users can rotate the device from portrait to landscape and back at any moment, and if your code is not ready for that, the application will crash or misbehave.


Unless you can make the entire architecture of your application synchronous, a single asynchronous resource will break the traditional imperative programming style.


It is difficult to find an application that does not use network requests, and they are inherently asynchronous. You have a disk, a database - asynchronous sources. The UI should also be considered solely as an asynchronous source. So by default everything in Android functions asynchronously. If you cling to traditional imperative programming and state management techniques, you will harm yourself.




Instead of trying to coordinate all the asynchronous parts of the architecture ourselves, we can drop that responsibility by connecting them to each other directly. You can subscribe the UI directly to the database so that it reacts to data changes. You can make the database and network calls react to a button click, rather than receiving the click and then dispatching it yourself.


It would be great if the network response we receive updated the data, because when the data is updated the UI updates automatically. That way we give up responsibility for it. And if Android does something asynchronously (a screen rotation or a broadcast, for example), it would be great if that automatically affected the interface or automatically started some background task.




In general, this approach allows us not to write a bunch of code needed to support states: instead of managing states, we simply connect the components together.


RxJava


Let's move on to RxJava. This reactive library became the most popular one for Android development largely because it was the first full-fledged reactive tool for Java. RxJava 2 keeps support for older versions of Java, which matters for Android development.


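RxJava provides three things: classes that represent data sources, classes for listening to those sources, and methods for transforming and combining the data.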



Sources


A data source does some work when you start or stop listening to it. Think of a network request that is not sent until you start waiting for the response. And if you unsubscribe from the source before it has finished, it can in theory cancel the network request.


A source can work synchronously or asynchronously: for example, a blocking network request running on a background thread, or something purely asynchronous, such as calling into Android and waiting for onActivityResult. A source can produce one element or many. A network request returns a single response. But while your UI is alive, a stream of button clicks is potentially endless, even if you are subscribed to just one button.


A source may also be empty. This is a data source that contains no elements and whose work simply succeeds or fails. To make this concrete, imagine writing data to a database or a file. Nothing is returned to you; the write either succeeds or it does not. RxJava sources model this "complete or fail" behavior with the terminal events onComplete() / onError(). It is like a method that either returns normally or throws an exception.


A source may also never complete. For example, we model button presses as a data source that lives as long as the UI does. When the UI goes away you will probably unsubscribe from that source of button presses, but it never completes.


All this corresponds to the Observer pattern. We have something that can generate data; there is an agreement on how this data should look. And we want to watch them. We want to add a listener and receive notifications when something happens.
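The idea in the paragraph above can be sketched in plain Java, without RxJava. This is only an illustration under stated assumptions: `ClickSource`, `addListener`, and `emit` are hypothetical names invented for the example, and the `Runnable` returned by `addListener` plays the role of "stop listening".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical event source: notifies every registered listener. */
final class ClickSource {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    /** Starts listening; running the returned Runnable stops listening again. */
    Runnable addListener(Consumer<String> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    /** Delivers one event to everyone currently listening. */
    void emit(String event) {
        // Iterate over a copy so a listener may unsubscribe while being notified.
        for (Consumer<String> l : new ArrayList<>(listeners)) {
            l.accept(event);
        }
    }
}

final class ObserverSketch {
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        ClickSource clicks = new ClickSource();
        Runnable stop = clicks.addListener(seen::add);
        clicks.emit("click 1");
        clicks.emit("click 2");
        stop.run();               // unsubscribe: later events are not delivered
        clicks.emit("click 3");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [click 1, click 2]
    }
}
```

Note that the source never "completes" here: the listener simply stops listening, which is the situation described for button presses.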


Flowable vs. Observable


In RxJava 2, sources are represented by two main types, Flowable and Observable. They are built very similarly. Both produce from zero to n elements. Both can complete successfully or with an error. So why do we need two different types to represent the same data structure?


It all comes down to such a thing as backpressure . Without going into details, I’ll just say that backpressure slows down the data source. Existing systems have limited resources. And with the help of backpressure, we can tell everyone who sends us data to slow down, because we cannot process information at the speed with which it arrives to us.


RxJava 1 had backpressure support, but it was added rather late in the development of the API. In RxJava 1 every type in the system has a backpressure mechanism. Although the concept is supported by all types, far from all sources implement it, so using the mechanism can crash the application. Backpressure has to be designed for and accounted for in advance. That is why RxJava 2 has two different source types: the type of a source now tells you whether backpressure is supported.


Suppose we have a data source - touch screen events. We can't slow it down. You can’t tell the user: “Draw a half of the symbol, stop and wait for me to process and then draw the rest.” We can slow down the data entry otherwise, for example, by disabling the buttons or displaying another UI, but the source itself cannot be slowed down.


Take another example: we have a database containing a large set of rows from which we need to extract several at a time. A DB can very effectively solve this problem thanks to a tool such as cursors . But for the flow of touch events this is impossible to implement, because it is impossible to slow down the user's finger.


In RxJava 1 both of these kinds of source are implemented as Observable, so you may try to apply backpressure at runtime and end up with a MissingBackpressureException. This is why RxJava 2 represents sources with different types: one supports backpressure, the other does not. Observable and Flowable behave alike when it comes to delivering data to callbacks. There are two corresponding interfaces for this:


Observer :


    interface Observer<T> {
        void onNext(T t);
        void onComplete();
        void onError(Throwable t);
        void onSubscribe(Disposable d);
    }

And Subscriber :


    interface Subscriber<T> {
        void onNext(T t);
        void onComplete();
        void onError(Throwable t);
        void onSubscribe(Subscription s);
    }

The first method is onNext; this is where elements are delivered. It is called every time the Observable or Flowable produces an element, so that you can process it however you like. This can go on forever: if you are listening to button presses, onNext is called on every press. For finite sources there are two terminal events: onComplete, which signals successful completion, and onError, which signals a failure.


onComplete and onError are terminal events, that is, you will receive nothing more from the source after receiving either of them.

The difference between the Observer and Subscriber interfaces lies in the last method, onSubscribe. This method is new compared with RxJava 1. When you subscribe to an Observable or Flowable you create a resource, and resources often need to be cleaned up once you are done with them. onSubscribe is called as soon as you start listening to the Observable or Flowable, and it hands you an object of one of two types. For an Observer it is a Disposable:


    interface Observer<T> {
        void onNext(T t);
        void onComplete();
        void onError(Throwable t);
        void onSubscribe(Disposable d);
    }

    interface Disposable {
        void dispose();
    }

Or Subscription :


    interface Subscriber<T> {
        void onNext(T t);
        void onComplete();
        void onError(Throwable t);
        void onSubscribe(Subscription s);
    }

    interface Subscription {
        void cancel();
        void request(long r);
    }

For Observable, the Disposable type lets you call dispose(), meaning "I am done with this resource and do not need any more data." If the source is a network request, it can be cancelled. If you were listening to an endless stream of button presses, it means you no longer want those events, in which case the OnClickListener can be removed from the View.


All of this holds for the Subscription interface as well. Although the names differ, it is used in the same way: its cancel() method is the counterpart of dispose(). The only difference is the second method, request(long r), which is how backpressure shows up in the API. With this method we tell the Flowable that we are ready for more elements.
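To make the role of request(long) more concrete, here is a minimal sketch that does not use RxJava at all. It relies on the java.util.concurrent.Flow interfaces from JDK 9 and later, which mirror the Reactive Streams interfaces. `RangePublisher` and `CollectingSubscriber` are hypothetical names made up for the example, and the publisher deliberately ignores several rules of the specification (for instance, it does not validate non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical synchronous publisher of 1..count that honors request(n). */
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override public void subscribe(Flow.Subscriber<? super Integer> s) {
        s.onSubscribe(new Flow.Subscription() {
            int next = 1;
            boolean done;

            @Override public void request(long n) {
                // Emit at most n items: never more than the subscriber asked for.
                for (long i = 0; i < n && !done && next <= count; i++) {
                    s.onNext(next++);
                }
                if (!done && next > count) {
                    done = true;
                    s.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }
}

/** Records every signal and keeps the Subscription so the demo can pull. */
final class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<String> events = new ArrayList<>();
    Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Integer item) { events.add("next " + item); }
    @Override public void onError(Throwable t) { events.add("error"); }
    @Override public void onComplete() { events.add("complete"); }
}

final class BackpressureSketch {
    static List<String> demo() {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(3).subscribe(subscriber);
        // Nothing has been emitted yet: the subscriber has not asked for anything.
        subscriber.events.add("requesting 2");
        subscriber.subscription.request(2);
        subscriber.events.add("requesting 5");
        subscriber.subscription.request(5);
        return subscriber.events;
    }

    public static void main(String[] args) {
        // prints [requesting 2, next 1, next 2, requesting 5, next 3, complete]
        System.out.println(demo());
    }
}
```

The consumer sets the pace: the source only emits in response to request, which is exactly what a stream of touch events cannot do.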


                                   With backpressure support    Without backpressure support
0..n elements, complete | error    Flowable                     Observable

So the only difference between these two types is that one supports backpressure and the other does not.


Reactive Streams


I want to touch on why Disposable and Subscription have different names, as do their methods dispose() and cancel(). Why could one not simply extend the other and add the request() method? The answer is the Reactive Streams specification. It is the result of an initiative by a number of companies that got together and decided to develop a standard set of interfaces for reactive Java libraries. The specification contains four interfaces.


    interface Publisher<T> {
        void subscribe(Subscriber<? super T> s);
    }

    interface Subscriber<T> {
        void onNext(T t);
        void onComplete();
        void onError(Throwable t);
        void onSubscribe(Subscription s);
    }

    interface Subscription {
        void request(long n);
        void cancel();
    }

    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

In the code above you can see the Subscriber and Subscription types. They are part of the specification, which is why these names are used in RxJava 2. Because they belong to the standard, there is nothing we can do about them. But there is an upside. Suppose you need to use two different streaming libraries. If their authors implemented the standard, you can switch between them without serious consequences.


                                   Reactive Streams (with backpressure support)    Without backpressure support
0..n elements, complete | error    Flowable                                        Observable

The Flowable type implements the Reactive Streams specification, which implies backpressure support.


Let's return to our UserManager . We used to extract users from this class, displaying them when we thought it appropriate. Now we can use Observable:


 interface UserManager { Observable<User> getUser(); void setName(String name); void setAge(int age); } 

Observable<User> is a source of User objects. It emits an element on every change, and we can react by displaying the data on the screen. We no longer need to work out the right moment to do so from other events happening in the system.


Specialized sources


RxJava 2 has three specialized sources, each a subset of Observable. The first is Single. It either contains one element or produces an error, so it is less a sequence of elements than a potentially asynchronous source of a single element. It does not support backpressure. You can think of it as an ordinary method: you call the method and get a return value, or the method throws an exception. Single implements exactly this scheme: you subscribe to it and receive either an item or an error.


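The second is Completable. It resembles a method that returns void: it either completes successfully without producing any data, or it throws an exception. You can think of it as a reactive Runnable, a set of commands that either runs to completion or fails.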


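The third is Maybe, which is new compared with RxJava 1. It either contains an item, produces an error, or completes without an item, so you can think of it as something like an Optional. It does not support backpressure either.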


RxJava 2 has no counterparts of Single / Completable / Maybe that support backpressure (that is, that conform to the Reactive Streams Specification).


                                   Reactive Streams (with backpressure)    Without backpressure
0..n elements, complete | error    Flowable                                Observable
item | complete | error            -                                       Maybe
item | error                       -                                       Single
complete | error                   -                                       Completable

 interface UserManager { Observable<User> getUser(); void setName(String name); void setAge(int age); } 

The setName and setAge calls are asynchronous; they either succeed or fail and return no data. Completable is the right type for them.


 interface UserManager { Observable<User> getUser(); Completable setName(String name); Completable setAge(int age); } 


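Creating sources. The simplest way to create a source is the just method, which wraps one or more values that you already have.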


    Flowable.just("Hello");
    Flowable.just("Hello", "World");

    Observable.just("Hello");
    Observable.just("Hello", "World");

    Maybe.just("Hello");
    Single.just("Hello");

Sources can also be created from an array or an Iterable.


    String[] array = { "Hello", "World" };
    List<String> list = Arrays.asList(array);

    Flowable.fromArray(array);
    Flowable.fromIterable(list);

    Observable.fromArray(array);
    Observable.fromIterable(list);

When you are adapting code that you already have, one of the most useful factory methods is fromCallable.


    Observable.fromCallable(new Callable<String>() {
        @Override public String call() {
            return getName();
        }
    });

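fromCallable models synchronous behavior that returns a single value. It takes a standard Java Callable, whose call method is allowed to throw checked exceptions, so it can wrap, for example, a blocking HTTP request.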


    OkHttpClient client = // …
    Request request = // …

    // execute() returns a Response, so the Callable is typed accordingly.
    Observable.fromCallable(new Callable<Response>() {
        @Override public Response call() throws Exception {
            return client.newCall(request).execute();
        }
    });

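When the returned Observable is subscribed to, the request is executed. If it throws an exception, the observer receives onError; if it succeeds, the response is delivered in onNext.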
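The mapping from "returns a value or throws" to "onNext or onError" can be sketched in plain Java. This is not how RxJava implements fromCallable; it is a simplified illustration, and `Callbacks`, `subscribe`, and `run` are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class FromCallableSketch {
    /** Hypothetical callback trio mirroring onNext / onComplete / onError. */
    interface Callbacks<T> {
        void onNext(T item);
        void onComplete();
        void onError(Throwable t);
    }

    /** Runs the callable only now, at "subscribe" time, and routes the outcome. */
    static <T> void subscribe(Callable<T> work, Callbacks<T> cb) {
        T value;
        try {
            value = work.call();
        } catch (Exception e) {
            cb.onError(e);      // a thrown exception becomes the error signal
            return;
        }
        cb.onNext(value);       // a normal return becomes one item...
        cb.onComplete();        // ...followed by completion
    }

    /** Subscribes and records the signals as strings. */
    static List<String> run(Callable<String> work) {
        List<String> events = new ArrayList<>();
        subscribe(work, new Callbacks<String>() {
            @Override public void onNext(String item) { events.add("next " + item); }
            @Override public void onComplete() { events.add("complete"); }
            @Override public void onError(Throwable t) { events.add("error " + t.getMessage()); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "Hello"));
        // prints [next Hello, complete]
        System.out.println(run(() -> { throw new java.io.IOException("no network"); }));
        // prints [error no network]
    }
}
```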


fromCallable is available on all five types:


    Flowable.fromCallable(() -> "Hello");
    Observable.fromCallable(() -> "Hello");
    Maybe.fromCallable(() -> "Hello");
    Single.fromCallable(() -> "Hello");
    Completable.fromCallable(() -> "Ignored!");



Maybe and Completable have two additional factory methods, fromAction and fromRunnable, for code that does not return a value.


    Maybe.fromAction(() -> System.out.println("Hello"));
    Maybe.fromRunnable(() -> System.out.println("Hello"));

    Completable.fromAction(() -> System.out.println("Hello"));
    Completable.fromRunnable(() -> System.out.println("Hello"));

Probably the most powerful way to build an Observable is the create method.


    Observable.create(new ObservableOnSubscribe<String>() {
        @Override public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("Hello");
            e.onComplete();
        }
    });

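RxJava 1 has a method with the same name, but there it was easy to misuse and not the recommended way to build a source. In RxJava 2, create is the right tool. You pass it a callback, subscribe, which is invoked every time a new observer subscribes. The callback receives an ObservableEmitter, through which you send items and terminal events to the observer.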


The same code written as a lambda:


 Observable.create(e -> { e.onNext("Hello"); e.onComplete(); }); 

You can emit more than one item:


 Observable.create(e -> { e.onNext("Hello"); e.onNext("World"); e.onComplete(); }); 

onNext can be called as many times as needed.


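Another advantage of create is that the source can be asynchronous. For example, you can start an asynchronous HTTP request and call onNext from the HTTP callback.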


    OkHttpClient client = // …
    Request request = // …

    // The emitter is named "emitter" so that it is not shadowed by the
    // IOException parameter in onFailure.
    Observable.create(emitter -> {
        Call call = client.newCall(request);
        call.enqueue(new Callback() {
            @Override public void onResponse(Response r) throws IOException {
                emitter.onNext(r.body().string());
                emitter.onComplete();
            }
            @Override public void onFailure(IOException e) {
                emitter.onError(e);
            }
        });
    });

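create also lets you do work when the observer unsubscribes. If nobody is waiting for the result of an HTTP request any more, there is no reason to let it continue, so you can register a cancellation action that cancels the HTTP request and releases its resources.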


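    Observable.create(emitter -> {
        Call call = client.newCall(request);
        // Cancel the HTTP call when the observer unsubscribes.
        // (The talk predates the release; in released RxJava 2.x the method is setCancellable.)
        emitter.setCancellable(() -> call.cancel());
        call.enqueue(new Callback() {
            @Override public void onResponse(Response r) throws IOException {
                emitter.onNext(r.body().string());
                emitter.onComplete();
            }
            @Override public void onFailure(IOException e) {
                emitter.onError(e);
            }
        });
    });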

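This is very useful on Android. For example, when someone subscribes to an Observable of button clicks you attach a Listener to the view, and when they unsubscribe you remove the Listener so that it does not leak a reference to the view.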


    View view = // …

    Observable.create(e -> {
        // Remove the listener on unsubscribe (setCancellable in released RxJava 2.x).
        e.setCancellable(() -> view.setOnClickListener(null));
        view.setOnClickListener(v -> e.onNext(v));
    });

create is available on all five types:


    Flowable.create(e -> { … });
    Observable.create(e -> { … });
    Maybe.create(e -> { … });
    Single.create(e -> { … });
    Completable.create(e -> { … });


Observing sources. Recall the onSubscribe method of the Observer / Subscriber interfaces.


Observer :


    interface Observer<T> {
        void onNext(T t);
        void onComplete();
        void onError(Throwable t);
        void onSubscribe(Disposable d);
    }

    interface Disposable {
        void dispose();
    }

Subscriber :


    interface Subscriber<T> {
        void onNext(T t);
        void onComplete();
        void onError(Throwable t);
        void onSubscribe(Subscription s);
    }

    interface Subscription {
        void cancel();
        void request(long r);
    }

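In practice you rarely pass a raw Observer / Subscriber to subscribe. The reason is the fourth method, onSubscribe: you would have to decide what to do with the Disposable / Subscription it hands you.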


    Observable<String> o = Observable.just("Hello");

    o.subscribe(new Observer<String>() {
        @Override public void onNext(String s) { … }
        @Override public void onComplete() { … }
        @Override public void onError(Throwable t) { … }
        @Override public void onSubscribe(Disposable d) {
            ???
        }
    });

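Instead you can use DisposableObserver, which takes care of the fourth method for you, so that you only deal with the notifications from the Observable itself.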


    Observable<String> o = Observable.just("Hello");

    o.subscribe(new DisposableObserver<String>() {
        @Override public void onNext(String s) { … }
        @Override public void onComplete() { … }
        @Override public void onError(Throwable t) { … }
    });

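But how do you unsubscribe? One option is to keep a reference to the DisposableObserver instead of treating it as a plain Observer. It implements Disposable, so you can call dispose on it, and the call is passed up the chain.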


    Observable<String> o = Observable.just("Hello");

    DisposableObserver<String> observer = new DisposableObserver<String>() {
        @Override public void onNext(String s) { … }
        @Override public void onComplete() { … }
        @Override public void onError(Throwable t) { … }
    };
    o.subscribe(observer);

    observer.dispose();

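RxJava 2 adds a subscribeWith method, which returns the observer you pass in, much as subscribe returned a Subscription in RxJava 1. The result can be used as a Disposable.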


    Observable<String> o = Observable.just("Hello");

    Disposable d = o.subscribeWith(new DisposableObserver<String>() {
        @Override public void onNext(String s) { … }
        @Override public void onComplete() { … }
        @Override public void onError(Throwable t) { … }
    });

    d.dispose();

RxJava also lets you group Disposables: if you have several of them, you can add them to a CompositeDisposable and dispose of them all at once.


    Observable<String> o = Observable.just("Hello");

    CompositeDisposable disposables = new CompositeDisposable();

    disposables.add(o.subscribeWith(new DisposableObserver<String>() {
        @Override public void onNext(String s) { … }
        @Override public void onComplete() { … }
        @Override public void onError(Throwable t) { … }
    }));

    disposables.dispose();

On Android it is very common to keep one CompositeDisposable per Activity and to dispose of it in onDestroy (or in the equivalent callback of a Fragment).
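The "collect now, release everything in one call" idea behind CompositeDisposable can be sketched in plain Java. This is an illustration only, not RxJava's implementation: `Cancelable` and `CancelableBag` are hypothetical names, and unlike the real class this sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

final class CompositeSketch {
    /** Hypothetical stand-in for a disposable resource. */
    interface Cancelable {
        void cancel();
    }

    /** Collects cancelables so that they can all be released with one call. */
    static final class CancelableBag implements Cancelable {
        private final List<Cancelable> items = new ArrayList<>();
        private boolean cancelled;

        void add(Cancelable c) {
            if (cancelled) {
                c.cancel();     // the bag is already closed: release immediately
                return;
            }
            items.add(c);
        }

        @Override public void cancel() {
            if (cancelled) return;
            cancelled = true;
            for (Cancelable c : items) c.cancel();
            items.clear();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CancelableBag bag = new CancelableBag();            // lives as long as the screen
        bag.add(() -> log.add("user stream cancelled"));
        bag.add(() -> log.add("name update cancelled"));
        bag.cancel();                                       // e.g. called from onDestroy
        bag.add(() -> log.add("late subscription cancelled"));
        return log;
    }

    public static void main(String[] args) {
        // prints [user stream cancelled, name update cancelled, late subscription cancelled]
        System.out.println(demo());
    }
}
```

Releasing anything added after the bag is closed mirrors what CompositeDisposable does, and it protects against a subscription that starts just after the screen has gone away.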


subscribeWith is available on all the types that do not support backpressure.


    Observable<String> o = Observable.just("Hello");
    Disposable d2 = o.subscribeWith(new DisposableObserver<String>() { … });

    Maybe<String> m = Maybe.just("Hello");
    Disposable d3 = m.subscribeWith(new DisposableMaybeObserver<String>() { … });

    Single<String> s = Single.just("Hello");
    Disposable d4 = s.subscribeWith(new DisposableSingleObserver<String>() { … });

    Completable c = Completable.complete();
    Disposable d5 = c.subscribeWith(new DisposableCompletableObserver() { … });

Flowable has subscribeWith as well, even though for a Flowable the onSubscribe callback receives a Subscription rather than a Disposable.


 Flowable<String> f = Flowable.just("Hello"); Disposable d1 = f.subscribeWith(new DisposableSubscriber<String>() { … }); 

So you still get a Disposable back from a Flowable.


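Think of a subscription as a resource, like a file or a database cursor: you open it, use it, and have to close it when you are done. A Disposable is how you release a subscription to an Observable.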


Operators


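Operators do three kinds of things: they manipulate or combine data, manipulate threading, and manipulate emissions.

In imperative code you apply a method to a value to obtain a new value, for example toUpperCase() on a string.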


    String greeting = "Hello";
    String yelling = greeting.toUpperCase();

With an Observable you do the same thing using the map operator.


    Observable<String> greeting = Observable.just("Hello");
    Observable<String> yelling = greeting.map(s -> s.toUpperCase());

map applies the given function to every item the source emits and emits the result.
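What an operator such as map does can be sketched with a tiny hand-written source type. This is not RxJava: `Source`, `just`, and `map` below are hypothetical, single-callback simplifications (no errors, no completion, no disposal) that only show how an operator wraps one source in another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class MapSketch {
    /** Hypothetical minimal source: pushes its items to whoever subscribes. */
    interface Source<T> {
        void subscribe(Consumer<? super T> onNext);

        /** Returns a new source; nothing runs until that one is subscribed to. */
        default <R> Source<R> map(Function<? super T, ? extends R> fn) {
            // Subscribe upstream and forward each item after transforming it.
            return downstream -> subscribe(item -> downstream.accept(fn.apply(item)));
        }

        /** A source that emits exactly one known value. */
        static <T> Source<T> just(T item) {
            return downstream -> downstream.accept(item);
        }
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Source<String> greeting = Source.just("Hello");
        Source<String> yelling = greeting.map(s -> s.toUpperCase());
        yelling.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [HELLO]
    }
}
```

The original source is left untouched: map returns a new source, which is why operators can be chained.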


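Operators can also control threading. Take our User source: the data may be produced on a background thread, but we want to receive it on the main thread so that we can update the UI. The observeOn operator does this.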


    Observable<User> user = um.getUser();
    Observable<User> mainThreadUser = user.observeOn(AndroidSchedulers.mainThread());

In effect we are saying: "I want to observe the items of this Observable on a different thread."


The counterpart of observeOn is subscribeOn, which controls the thread on which the Observable does its work.


    OkHttpClient client = // …
    Request request = // …

    Observable<Response> response = Observable.fromCallable(() -> {
        return client.newCall(request).execute();
    });

    Observable<Response> backgroundResponse = response.subscribeOn(Schedulers.io());

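The request here is synchronous and blocking, so it must not run on the main thread. Schedulers.io() is a pool of threads intended for blocking I/O. subscribeOn determines the thread on which the work of the source is performed once it is subscribed to, which in this case is the blocking HTTP call.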


Each operator returns a new Observable, so operators can be chained. The order in which they are applied matters.


    OkHttpClient client = // …
    Request request = // …

    Observable<String> body = Observable.fromCallable(() -> {
            return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map(response -> response.body().string()); // NetworkOnMainThread!

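Here map comes after observeOn, so it runs on the Android main thread. Reading the body of an HTTP response is blocking I/O, and that must not happen on the main thread.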


    OkHttpClient client = // …
    Request request = // …

    Observable<String> body = Observable.fromCallable(() -> {
            return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io())
        .map(response -> response.body().string()) // Ok!
        .observeOn(AndroidSchedulers.mainThread());
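The effect of operator placement relative to the thread hop can be reproduced without RxJava or Android. The sketch below swaps in the JDK's CompletableFuture and two named single-thread executors as hypothetical stand-ins for Schedulers.io() and the main thread; it is an analogy for the ordering rule, not a model of how RxJava schedules work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {
    static String name() {
        return Thread.currentThread().getName();
    }

    static List<String> demo() {
        // Hypothetical stand-ins for Schedulers.io() and the Android main thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                .supplyAsync(() -> { log.add("request on " + name()); return "body"; }, io)
                // Placed before the hop, so the transformation stays on the background thread.
                .thenApplyAsync(r -> { log.add("read on " + name()); return r.toUpperCase(); }, io)
                // Only the final delivery happens on the "main" thread.
                .thenAcceptAsync(s -> log.add("show " + s + " on " + name()), main)
                .get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [request on io, read on io, show BODY on main]
        System.out.println(demo());
    }
}
```

Moving the transformation stage onto the `main` executor would reproduce the mistake from the first variant: the blocking read would then run on the thread that is supposed to stay responsive.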


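RxJava also has operators that convert an Observable into one of the other source types. For example, first() takes only the first item. In RxJava 1 it returned an Observable that emitted a single item, and if the source was empty it produced an error, much as get(0) on an empty list throws an exception. RxJava 2 expresses this in the type: first() returns a Single. (This describes the API as shown in the talk; in the released 2.x API the variant that fails on an empty source is firstOrError(), and first(defaultItem) takes a fallback value.)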




If the Observable is empty, the resulting Single signals an error, because a Single must either contain an item or fail.




There is also firstElement(), which returns a Maybe. If the Observable is empty, the Maybe simply completes without an item.




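There are also operators that return a Completable. For example, if you only care whether the source completes or fails, you can use ignoreElements.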




Flowable has the same operators, and they return the same types.




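Some operators "narrow" the type: when you take, say, a single item from a stream, you end up with a "narrower" type such as Single. Others "widen" it: for example, a Single can be converted to an Observable.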



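Returning to our example, we can now subscribe to the User source and say: "deliver the notifications on the main thread, and then update the UI." Every time a new User arrives, the text on the screen is updated.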


    um.getUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<User>() {
            @Override public void onNext(User user) {
                tv.setText(user.toString());
            }
            @Override public void onComplete() { /* ignored */ }
            @Override public void onError(Throwable t) { /* crash or show */ }
        });

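Of course, we must not forget the Disposable that is returned. This is Android, and the subscription has to end when the Activity goes away, so we dispose of our Disposables in onDestroy.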


    // onCreate
    disposables.add(um.getUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<User>() {
            @Override public void onNext(User user) {
                tv.setText(user.toString());
            }
            @Override public void onComplete() { /* ignored */ }
            @Override public void onError(Throwable t) { /* crash or show */ }
        }));

    // onDestroy
    disposables.dispose();

The same pattern applies to the asynchronous setName call, which now returns a Completable.


    disposables.add(um.setName("Jane Doe")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableCompletableObserver() {
            @Override public void onComplete() {
                // success! re-enable editing
            }
            @Override public void onError(Throwable t) {
                // retry or show
            }
        }));

Once again, do not forget the Disposable: add it to the same collection of Disposables.






Conclusion


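The idea behind RxJava 2 is this: take the things that are fundamentally asynchronous, whether the network, Android itself, a database, or even the UI, and write code that reacts to changes in those sources instead of trying to cope with the changes and manage state yourself.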


If you are using RxJava 1, there is an interop library that converts between the RxJava 1 types and those of RxJava 2.


    class RxJavaInterop {
        static <T> Flowable<T> toV2Flowable(rx.Observable<T> o) { … }
        static <T> Observable<T> toV2Observable(rx.Observable<T> o) { … }
        static <T> Maybe<T> toV2Maybe(rx.Single<T> s) { … }
        static <T> Maybe<T> toV2Maybe(rx.Completable c) { … }
        static <T> Single<T> toV2Single(rx.Single<T> s) { … }
        static Completable toV2Completable(rx.Completable c) { … }

        static <T> rx.Observable<T> toV1Observable(Publisher<T> p) { … }
        static <T> rx.Observable<T> toV1Observable(Observable<T> o, …) { … }
        static <T> rx.Single<T> toV1Single(Single<T> o) { … }
        static <T> rx.Single<T> toV1Single(Maybe<T> m) { … }
        static rx.Completable toV1Completable(Completable c) { … }
        static <T> rx.Completable toV1Completable(Maybe<T> m) { … }
    }




Source: https://habr.com/ru/post/328434/

