// Abridged view of the abstract Scheduler class; every method body is elided with `...`.
// Visible members:
//   - scheduleDirect(Runnable): returns a Disposable.
//   - scheduleDirect(Runnable, long delay, TimeUnit unit): returns a Disposable.
//   - schedulePeriodicallyDirect(Runnable, long initialDelay, long period, TimeUnit unit): returns a Disposable.
//   - when(Function<Flowable<Flowable<Completable>>, Completable>): returns an S that is both a Scheduler and a Disposable.
// NOTE(review): the braces in this excerpt do not balance (there is an extra `{` before `when`);
// presumably members were cut out at that point - confirm against the full class.
public abstract class Scheduler { @NonNull public Disposable scheduleDirect(@NonNull Runnable run) { ... } @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... } @NonNull public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { ... } { @NonNull public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) { ... } }
// Excerpt of the login chain showing only the two scheduler-related operators.
// Each operator sits on its own line so that the "omitted" marker below stays a
// comment on its own and does not comment out the calls that follow it.
apiClient.login(auth)
    // some code omitted
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
/**
 * Logs in through [apiClient], requests the repositories behind the user's `repos_url`
 * and passes their `full_name` values to `showRepositories`.
 *
 * Progress is switched on before the chain is assembled and switched off in `doFinally`.
 * A failure is only written to the log.
 */
private fun attemptLoginRx() {
    showProgress(true)

    // Data part of the chain: login -> repositories -> list of full names.
    val repositoryNames = apiClient.login(auth)
        .flatMap { user -> apiClient.getRepositories(user.repos_url, auth) }
        .map { repositories -> repositories.map { repository -> repository.full_name } }

    // Scheduling, cleanup and the terminal subscription.
    repositoryNames
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doFinally { showProgress(false) }
        .subscribe(
            { names -> showRepositories(this, names) },
            { error -> Log.e("TAG", "Failed to show repos", error) }
        )
}
// The login chain written as one named value per operator. The comment above each
// step is the original annotation naming the object that step is said to create.

// new SingleFlatMap()
val flatMap = apiClient.login(auth)
    .flatMap { apiClient.getRepositories(it.repos_url, auth) }

// new SingleMap
val map = flatMap
    .map { list -> list.map { it.full_name } }

// new SingleSubscribeOn
val subscribeOn = map
    .subscribeOn(Schedulers.io())

// new SingleObserveOn
val observeOn = subscribeOn
    .observeOn(AndroidSchedulers.mainThread())

// new SingleDoFinally
val doFinally = observeOn
    .doFinally { showProgress(false) }

// new ConsumerSingleObserver
val subscribe = doFinally
    .subscribe(
        { list -> showRepositories(this@LoginActivity, list) },
        { error -> Log.e("TAG", "Failed to show repos", error) }
    )
// NOTE(review): this brace closes an enclosing block whose opening line is not part of the excerpt.
}
/**
 * Subscribes the given observer to this source.
 *
 * The observer is null-checked, passed through {@code RxJavaPlugins.onSubscribe},
 * null-checked again and then handed to {@code subscribeActual}.
 *
 * @param subscriber the observer to subscribe; must not be null
 * @throws NullPointerException if {@code subscriber} (or the observer returned by the
 *         plugin hook) is null, if {@code subscribeActual} throws a NullPointerException
 *         itself, or as a wrapper (with the original as its cause) around any other
 *         Throwable that gets past {@code Exceptions.throwIfFatal}
 */
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(SingleObserver<? super T> subscriber) {
    ObjectHelper.requireNonNull(subscriber, "subscriber is null");

    // The plugin hook may substitute a different observer, hence the second null check.
    subscriber = RxJavaPlugins.onSubscribe(this, subscriber);

    ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

    try {
        subscribeActual(subscriber);
    } catch (NullPointerException npe) {
        // NullPointerExceptions pass through unchanged.
        throw npe;
    } catch (Throwable failure) {
        Exceptions.throwIfFatal(failure);
        NullPointerException wrapper = new NullPointerException("subscribeActual failed");
        wrapper.initCause(failure);
        throw wrapper;
    }
}
// Calls subscribe() on the upstream `source`; no arguments are shown in this excerpt.
source.subscribe()
/**
 * Requests `https://api.github.com/user` with [auth] and turns the JSON body into a [GithubUser].
 *
 * The whole request is wrapped in `Single.fromCallable`. Any status code other than 200
 * is reported by throwing a [RuntimeException] from inside the callable.
 *
 * @param auth credentials passed to the HTTP call
 * @return a [Single] producing the user built from the `login`, `id`, `repos_url` and `name` fields
 */
override fun login(auth: Authorization): Single<GithubUser> = Single.fromCallable {
    val response = get("https://api.github.com/user", auth = auth)
    if (response.statusCode != 200) {
        throw RuntimeException("Incorrect login or password")
    }

    val json = response.jsonObject
    GithubUser(
        json.getString("login"),
        json.getInt("id"),
        json.getString("repos_url"),
        json.getString("name")
    )
}
/**
 * Wraps the downstream observer and the upstream source in a SubscribeOnObserver,
 * hands that object to the observer via {@code onSubscribe}, and then submits the
 * same object to the scheduler with {@code scheduleDirect}.
 *
 * @param s the downstream observer
 */
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
    final SubscribeOnObserver<T> subscribeTask = new SubscribeOnObserver<T>(s, source);

    // onSubscribe is called before anything is scheduled; keep this order.
    s.onSubscribe(subscribeTask);

    // The Disposable returned by the scheduler is stored in the task's `task` field.
    Disposable scheduled = scheduler.scheduleDirect(subscribeTask);
    subscribeTask.task.replace(scheduled);
}
/**
 * Subscribes to the upstream source with an ObserveOnSingleObserver built from the
 * downstream observer and this operator's scheduler.
 *
 * @param s the downstream observer
 */
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
    ObserveOnSingleObserver<T> relay = new ObserveOnSingleObserver<T>(s, scheduler);
    source.subscribe(relay);
}
/**
 * Stores the downstream observer and the scheduler for later use.
 *
 * @param actual    the downstream observer
 * @param scheduler the scheduler this observer is submitted to in {@code onSuccess}
 */
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
    this.scheduler = scheduler;
    this.actual = actual;
}

/**
 * Keeps the received value in a field and submits this observer to the scheduler.
 * The Disposable returned by {@code scheduleDirect} is passed to
 * {@code DisposableHelper.replace} together with this observer.
 *
 * @param value the success value received from upstream
 */
@Override
public void onSuccess(T value) {
    this.value = value;
    DisposableHelper.replace(this, scheduler.scheduleDirect(this));
}
From the author: the Mobius conference takes place tomorrow and the day after, and I will be giving a talk there about coroutines in Kotlin. If you are interested in this series of articles and would like it to continue, it is not too late to decide to attend!
Source: https://habr.com/ru/post/353852/