📜 ⬆️ ⬇️

How did I replace RxJava with quicksies in my project and why you probably should also do it

Hi, Habr! I present to you the translation of the article by Paulo Sato on the topic of using Kotlin Coroutines instead of RxJava in my Android projects.

RxJava as a bazooka, most applications do not use half of its firepower. The article will discuss how to replace it with korutin (coroutine) Kotlin.

I worked with RxJava for several years. This is definitely one of the best libraries for any Android project, which is still in good shape, especially if you are programming in Java. If you use Kotlin, then we can say that the city has a new sheriff.
')
Most use RxJava only to control the threads and to prevent callback hell (if you don’t know what it is, consider yourself lucky and here’s why ). The fact is that we must bear in mind that the real power of RxJava is reactive programming and backpressure. If you use it to control asynchronous requests, you use a bazooka to kill a spider. She will do her job, but this is overkill.

One notable drawback of RxJava is the number of methods. It is huge and tends to crawl all over the code. In Kotlin, you can use Korutin to implement most of the behavior you previously created using RxJava.

But ... what are korutins?

Korutin is a way to handle competitive tasks in a stream. The flow will not work until it is stopped and the context will change for each quortine without creating a new flow.
The korutins in Kotlin are still experimental, but they are included in Kotlin 1.3, so I wrote below a new class UseCase (for clean architecture) that uses them. In this example, a call for corutin is encapsulated in a single file. Thus, the other layers will not be affected by the coroutines performed, providing a more disconnected architecture.

/** * (C) Copyright 2018 Paulo Vitor Sato Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package com.psato.devcamp.interactor.usecase import android.util.Log import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.android.UI import kotlin.coroutines.experimental.CoroutineContext /** * Abstract class for a Use Case (Interactor in terms of Clean Architecture). * This interface represents a execution unit for different use cases (this means any use case * in the application should implement this contract). * <p> * By convention each UseCase implementation will return the result using a coroutine * that will execute its job in a background thread and will post the result in the UI thread. */ abstract class UseCase<T> { protected var parentJob: Job = Job() //var backgroundContext: CoroutineContext = IO var backgroundContext: CoroutineContext = CommonPool var foregroundContext: CoroutineContext = UI protected abstract suspend fun executeOnBackground(): T fun execute(onComplete: (T) -> Unit, onError: (Throwable) -> Unit) { parentJob.cancel() parentJob = Job() launch(foregroundContext, parent = parentJob) { try { val result = withContext(backgroundContext) { executeOnBackground() } onComplete.invoke(result) } catch (e: CancellationException) { Log.d("UseCase", "canceled by user") } catch (e: Exception) { onError(e) } } } protected suspend fun <X> background(context: CoroutineContext = backgroundContext, block: suspend () -> X): Deferred<X> { return async(context, parent = parentJob) { block.invoke() } } fun unsubscribe() { parentJob.cancel() } } 

First of all, I created a parent task. This is the key to undo all korutinov that were created in the class UseCase. When we call execution, it is important that the old tasks are canceled, to be sure that we have not missed a single coroutine (this also happens if we unsubscribe from the given UseCase).

Also, I trigger the launch (UI). This means that I want to create a corutin that will be executed in the UI thread. After that, I call the background method that creates async in CommonPool (this approach, in fact, will have poor performance). In turn, async will return Deffered, and then, I will call its wait method. He is waiting for the completion of the background quorutine, which will bring a result or error.

This can be used to implement most of what we did with RxJava. Below are some examples.

Map


I downloaded the searchShow results and changed them to return the name of the first show.
RxJava code:
 public class SearchShows extends UseCase { private ShowRepository showRepository; private ResourceRepository resourceRepository; private String query; @Inject public SearchShows(ShowRepository showRepository, ResourceRepository resourceRepository) { this.showRepository = showRepository; this.resourceRepository = resourceRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<String> buildUseCaseObservable() { return showRepository.searchShow(query).map(showInfos -> { if (showInfos != null && !showInfos.isEmpty() && showInfos.get(0).getShow() != null) { return showInfos.get(0).getShow().getTitle(); } else { return resourceRepository.getNotFoundShow(); } }); } } 

Code on corutines:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<String>() { var query: String? = null override suspend fun executeOnBackground(): String { query?.let { val showsInfo = showRepository.searchShow(it) val showName: String? = showsInfo?.getOrNull(0)?.show?.title return showName ?: resourceRepository.notFoundShow } return "" } } 

ZIP


Zip will take two issues from the Observer and put them together in a new issue. Note that with RxJava you must specify to make the call in parallel using subscribeOn in each Single. We want to get both at the same time and return them together.

RxJava code:

 public class ShowDetail extends UseCase { private ShowRepository showRepository; private String id; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setId(String id) { this.id = id; } @Override protected Single<Show> buildUseCaseObservable() { Single<ShowDetail> singleDetail = showRepository.showDetail(id).subscribeOn(Schedulers.io()); Single<ShowBanner> singleBanner = showRepository.showBanner(id).subscribeOn(Schedulers.io()); return Single.zip(singleDetail, singleBanner, (detail, banner) -> new Show(detail,banner)); } 

Code on corutines:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<Show>() { var id: String? = null override suspend fun executeOnBackground(): Show { id?.let { val showDetail = background{ showRepository.showDetail(it) } val showBanner = background{ showRepository.showBanner(it) } return Show(showDetail.await(), showBanner.await()) } return Show() } } 

Flatmap


In this case, I'm looking for shows that have a query string and for each result (limited to 200 results), I also get a show rating. At the end, I return the list of shows with the corresponding ratings.

RxJava code:

 public class SearchShows extends UseCase { private ShowRepository showRepository; private String query; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<List<ShowResponse>> buildUseCaseObservable() { return showRepository.searchShow(query).flatMapPublisher( (Function<List<ShowInfo>, Flowable<ShowInfo>>) Flowable::fromIterable) .flatMapSingle((Function<ShowInfo, SingleSource<ShowResponse>>) showInfo -> showRepository.showRating(showInfo.getShow().getIds().getTrakt()) .map(rating -> new ShowResponse(showInfo.getShow().getTitle(), rating .getRating())).subscribeOn(Schedulers.io()), false, 4).toList(); } } 

Code on corutines:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository) : UseCase<List<ShowResponse>>() { var query: String? = null override suspend fun executeOnBackground(): List<ShowResponse> { query?.let { query -> return showRepository.searchShow(query).map { background { val rating: Rating = showRepository.showRating(it.show!!.ids!!.trakt!!) ShowResponse(it.show.title!!, rating.rating) } }.map { it.await() } } return arrayListOf() } } 

Let me explain. Using RxJava, my repository returns single List emissions, so I need several emissions, one for each ShowInfo. To do this, I called flatMapPublisher. For each issue, I have to highlight ShowResponse, and at the end collect all of them in the list.

We end up with this construction: List foreach → (ShowInfo → ShowRating → ShowResponse) → List.

With Corutin, I made a map for each List element to convert it to a List <Deffered>.

As you can see, much of what we have done with RxJava is easier to implement with synchronous calls. Korutiny can even handle flatMap, which I believe are some of the most complex functions in RxJava.

It is well known that cortinas can be lightweight ( here is an example), but the results have puzzled me. In this example, RxJava ran about 3.1 seconds, while the cortinas took about 5.8 seconds to run on CommonPool.

These results put before me the question that there could be something inappropriate in them. Later, I found it. I used retrofit Call, which blocked the stream.

There are two ways to fix this error, the choice depends on which version of Android Studio you are using. In the version of Android Studio 3.1, we need to make sure that we are not blocking the background thread. For this, I used this library:
implementation 'ru.gildor.coroutines: kotlin-coroutines-retrofit: 0.12.0'

This code creates an extension to the function of retrofit Call to suspend the flow:

 public suspend fun <T : Any> Call<T>.await(): T { return suspendCancellableCoroutine { continuation -> enqueue(object : Callback<T> { override fun onResponse(call: Call<T>?, response: Response<T?>) { if (response.isSuccessful) { val body = response.body() if (body == null) { continuation.resumeWithException( NullPointerException("Response body is null: $response") ) } else { continuation.resume(body) } } else { continuation.resumeWithException(HttpException(response)) } } override fun onFailure(call: Call<T>, t: Throwable) { // Don't bother with resuming the continuation if it is already cancelled. if (continuation.isCancelled) return continuation.resumeWithException(t) } }) registerOnCompletion(continuation) } } 

In Android Studio 3.2, you can update the Corutin library to version 0.25.0. This version has CoroutineContext IO (you can see the corresponding comment in my class UseCase).

Running on CommonPool without a blocking call took 2.3 seconds and 2.4 seconds with IO and blocking calls.

image

I hope this article will inspire you to use Corutin, a more lightweight and, perhaps, faster alternative to RxJava and make it a little easier to understand that you are writing synchronized code that is executed asynchronously.

Source: https://habr.com/ru/post/421739/


All Articles