requestDataTask = new AsyncTask<Void, Void, JSONObject>() { @Override protected JSONObject doInBackground(Void... params) { final String requestResult = apiService.getData(); final JSONObject json = JsonUtils.parse(requestResult); lruCache.cacheJson(json); return json; } };
JSONObject
based on the result of the query.JSONObject
.JSONObject
.Exception
dropped somewhere at runtime?doInBackground(Void...)
runs in a separate thread, how do we tell the user about a UI error? To get fields for Exception
?Observable
. Observable
is like data streams (they can also be considered as monads) that can receive and transmit this data in any way. Over Observable
you can use operations such as flatmap
, filter
, zip
, merge
, cast
, etc. //Observable, Iterable final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)) .subscribeOn(Schedulers.newThread()) // background .observeOn(AndroidSchedulers.mainThread()) //, main thread .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { //do something with result } });
Observable
, which in turn gives us numbers from Iterable
. We indicate that the generation and transmission of data will take place in a separate thread, and the processing of the result will be in main thread. We subscribe to it, and in the subscriber's method we perform any manipulations with each subsequent result. //Observable, Iterable final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)). // filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; // } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { //do something with ONLY EVEN result } });
filter
operator, we can process only even values. //Observable, Observable.OnSubscribe<String> Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { // , subscriber.onNext(apiService.getData()); // , subscriber.onCompleted(); } });
Observable
and specify its behavior. We make a request and give the result in onNext(...)
, after which we say to Subscriber
that we ended up by calling onCompleted()
.Observalble
, which is only responsible for getting the String
object from the API. SRP in its purest form.Observable
method retry(...)
, which will repeat this Observable
n times until it is completed successfully (read, without Exception
). In addition, we can give Observable
'from another Observable
, even if retry()
did not help. If the backend is written crookedly, it would be better for us to close the connection by timeout. And we have a timeout(...)
method for this case. All together it would look like this: final Subscription subscription = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(apiService.getData()); subscriber.onCompleted(); } }) .timeout(5, TimeUnit.SECONDS) // .retry(3) // 3 // , .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() { @Override public Observable<? extends String> call(Throwable throwable) { //return new observable here, that can rescure us from error } });
final Subscription subscription = createApiRequestObservable() // Observable .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) // .retry(RETRY_COUNT_FOR_REQUEST) // - .onErrorResumeNext(createRequestErrorHandler()); //
Observable
(and there the String
) must be converted. We use map(...)
, and if something suddenly goes wrong, we return another one we need in case of failure, json using onErrorReturn(...)
. final Subscription subscription = createApiRequestObservable() .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) .retry(RETRY_COUNT_FOR_REQUEST) .onErrorResumeNext(createRequestErrorHandler()) // Observable, String JSONObject .map(new Func1<String, JSONObject>() { @Override public JSONObject call(String s) { return JsonUtils.parse(s); } }) // // "" json .onErrorReturn(new Func1<Throwable, JSONObject>() { @Override public JSONObject call(Throwable throwable) { return jsonObjectForErrors; } });
Observable
has methods doOnNext(...)
, doOnEach(...)
, etc. It turns out something like this: final Subscription subscription = createApiRequestObservable() .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) .retry(RETRY_COUNT_FOR_REQUEST) .onErrorResumeNext(createRequestErrorHandler()) // Observable, String JSONObject .map(new Func1<String, JSONObject>() { @Override public JSONObject call(String s) { return JsonUtils.parse(s); } }) // // "" json .onErrorReturn(new Func1<Throwable, JSONObject>() { @Override public JSONObject call(Throwable throwable) { return jsonObjectForErrors; } }) //, onNext(..) Observable .doOnNext(new Action1<JSONObject>() { @Override public void call(JSONObject jsonObject) { lruCache.cacheJson(jsonObject); } });
final Subscription subscription = createApiRequestObservable() // Observable .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) // .retry(RETRY_COUNT_FOR_REQUEST) // - .onErrorResumeNext(createRequestErrorHandler()) // .map(createJsonMapOperator()) // Observable, JSONObject .onErrorReturn(createJsonErrorHandler()) // , .doOnNext(createCacheOperation()); // JSONObject
final Subscription subscription = createApiRequestObservable() // Observable .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) // .retry(RETRY_COUNT_FOR_REQUEST) // - .onErrorResumeNext(createRequestErrorHandler()) // .map(createJsonMapOperator()) // Observable, JSONObject .onErrorReturn(createJsonErrorHandler()) // , .doOnNext(createCacheOperation()); // JSONObject .subscribeOn(Schedulers.newThread()) // , , .observeOn(AndroidSchedulers.mainThread()) // - main thread .subscribe(subscriber); //
doOnNext
will be executed in parallel with the processing of the result.Observable
several times and keep the entire system in a consistent state.doOnError(Throwable thr)
method. You want to filter the results - add a filter
statement and implement its behavior. // Observable final Observable<AggregatedContactData> obs = Observable.combineLatest( createContactsObservable(context), //Observable createPhonesObservable(context), //Observable createAccountsObservable(context), //Observable createJobsObservable(context), //Observable contactsInfoCombinator() // Observable ).onErrorReturn(createEmptyObservable()).cache() // .subscribeOn(Schedulers.executor(ThreadPoolHolder.getGeneralExecutor())) // .observeOn(AndroidSchedulers.mainThread()); // - main thread
Observable.combineLatest(...)
. This operator waits for onNext(...)
from all Observable
passed to it and applies the combining function to all results at once. It may seem complicated, but the RxJava wiki image will make everything clearer. The most important thing is that each of the Observable
sent to Observable.combineLatest(...)
is a CursorObservable
, which sends a new cursor to its onNext(...)
as it changes in the database. Thus, a combination function is performed on any update to any of the four cursors, which allows you to always deliver the latest data. This is the principle of focus on events.Collections.emptyList();
cache()
operator is very useful if several Subscribers
can subscribe to this Observable
at once. If this operator is applied to Observable
, then its new subscriber instantly receives data, provided that this data has already been calculated for those who subscribed earlier. Thus, everyone has the same current data.subscribeOn(...)
I give the thread pool to 4 threads so that each of my 4 Observable
executed in a separate thread in order to maximize speed, RxJava takes care of the rest. That is, all 4 processors will be involved, with the presence of these.OutOfMemoryError
with OutOfMemoryError
. The solution was simple. Enter CachedThreadPool
for these cases and the problem is solved.cache()
operator from the example above. I would like the next request for the same url to be immediately taken from the cache. There is no such thing in RxJava. In principle, this is correct, because reactivity and cache are two different things. Therefore, we wrote our cache.Checkable
change or EditText
(this is out of the box in RxJava for Android). Everything is simple, but terribly convenient.Source: https://habr.com/ru/post/228125/
All Articles