📜 ⬆️ ⬇️

How to write polymorphic programs with Arrow



Hi, Habr!

My name is Artem Dobrovinsky, I work in the company Finch . I propose to read the article of one of the fathers of the functional programming library Arrow on how to write polymorphic programs. Often, people who are just starting to write in a functional style do not rush to part with old habits, and in fact write a little more elegant imperativeness, with DI-containers and inheritance. The idea of ​​reusing functions, regardless of the types they use, can push many to think in the right direction.

Enjoy!


***


What if we could write applications without thinking about the data types that will be used in runtime, but just describe how this data will be processed?


Imagine that we have an application that works with the Observable type from the RxJava library. This type allows us to write chains of calls and data manipulations, but in the end, won't this Observable just a container with additional properties?


Same story with types like Flowable , Deferred (cortutina), Future , IO , and many others.


Conceptually, all these types represent an operation (already done or planned for future), which supports manipulations such as casting an internal value to another type ( map ), using flatMap to create a chain of operations of a similar type, combining with other instances of the same type ( zip ), etc.


In order to write programs based on these behaviors, while preserving the declarativeness of the description, and also to make your programs independent of specific data types like Observable sufficient that the data types used correspond to specific contracts, such as map , flatMap , and others. .


This approach may seem strange or overly complicated, but it has interesting advantages. First, consider a simple example, and then talk about them.


Canonical problem


Imagine that we have an application with a to-do list, and we would like to retrieve from the local cache a list of objects of type Task . If they are not found in the local storage, we will try to request them over the network. We need a single contract for both data sources so that they both can get a list of Task objects for the appropriate User object, regardless of the source:


 interface DataSource { fun allTasksByUser(user: User): Observable<List<Task>> } 

Here, for simplicity, we return Observable , but it can be Single , Maybe , Flowable , Deferred — anything suitable for achieving the goal.


Add a couple of implementations of data sources, one for and one for .


 class LocalDataSource : DataSource { private val localCache: Map<User, List<Task>> = mapOf(User(UserId("user1")) to listOf(Task("LocalTask assigned to user1"))) override fun allTasksByUser(user: User): Observable<List<Task>> = Observable.create { emitter -> val cachedUser = localCache[user] if (cachedUser != null) { emitter.onNext(cachedUser) } else { emitter.onError(UserNotInLocalStorage(user)) } } } class RemoteDataSource : DataSource { private val internetStorage: Map<User, List<Task>> = mapOf(User(UserId("user2")) to listOf(Task("Remote Task assigned to user2"))) override fun allTasksByUser(user: User): Observable<List<Task>> = Observable.create { emitter -> val networkUser = internetStorage[user] if (networkUser != null) { emitter.onNext(networkUser) } else { emitter.onError(UserNotInRemoteStorage(user)) } } } 

The implementations of both data sources are almost identical. These are simply mocked versions of these sources, which ideally fetch data from local storage or the network API. In both cases, Map<User, List<Task>> is stored in memory.


Because we have two data sources, we need to somehow coordinate them. Create a repository:


 class TaskRepository(private val localDS: DataSource, private val remoteDS: RemoteDataSource) { fun allTasksByUser(user: User): Observable<List<Task>> = localDS.allTasksByUser(user) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .onErrorResumeNext { _: Throwable -> remoteDS.allTasksByUser(user) } } 

It simply tries to load the List<Task> from the LocalDataSource , and if that is not found, it tries to request them from the network using the RemoteDataSource .


Let's create a simple module to provide dependencies without using any dependency injection framework (DI):


 class Module { private val localDataSource: LocalDataSource = LocalDataSource() private val remoteDataSource: RemoteDataSource = RemoteDataSource() val repository: TaskRepository = TaskRepository(localDataSource, remoteDataSource) } 

Finally, we need a simple test that runs the entire stack of operations:


 object test { @JvmStatic fun main(args: Array<String>): Unit { val user1 = User(UserId("user1")) val user2 = User(UserId("user2")) val user3 = User(UserId("unknown user")) val dependenciesModule = Module() dependenciesModule.run { repository.allTasksByUser(user1).subscribe({ println(it) }, { println(it) }) repository.allTasksByUser(user2).subscribe({ println(it) }, { println(it) }) repository.allTasksByUser(user3).subscribe({ println(it) }, { println(it) }) } } } 

All the above code can be found on a githaba .


This program composes a run chain for three users, then subscribes to the resulting Observable .


The first two User objects are accessible; we are lucky with that. User1 is available in the local DataSource , and User2 is available on the remote.


But there is a problem with User3 , because it is not available in the local storage. The program will try to download it from a remote service - but there it is not there either. The search will fail, and we will display an error message to the console.


Here is what will be displayed in the console for all three cases:


 > [Task(value=LocalTask assigned to user1)] > [Task(value=Remote Task assigned to user2)] > UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user))) 

We are done with an example. Now we will try to program this logic in the style of .


Data type abstraction


Now the contract for the DataSource interface will look like this:


 interface DataSource<F> { fun allTasksByUser(user: User): Kind<F, List<Task>> } 

Everything seems to be similar, but there are two important differences:



Kind is how Arrow encodes what is commonly called the (higher kind) .
Let me explain this concept with a simple example.


Observable<A> has 2 parts:



We are used to seeing generic types like A as an abstraction. But not many know that we can also abstract container types like Observable . For this and there are high types.


The idea is that we can have a constructor like F<A> in which both F and A can be a generic type. This syntax is not yet supported by the Kotlin compiler ( still? ), So we mimic it with a similar approach.


Arrow supports this by using the Kind<F, A> intermediate meta interface, which holds references to both types, and also generates converters in both directions during compilation so that you can follow the path from Kind<Observable, List<Task>> to Observable<List<Task>> and vice versa. Not an ideal solution, but working.


Therefore, we will again look at the interface of our repository:


 interface DataSource<F> { fun allTasksByUser(user: User): Kind<F, List<Task>> } 

The DataSource function returns a high type: Kind<F, List<Task>> . It is translated to F<List<Task>> , where F remains generic.


We fix only the List<Task> in the signature. In other words, we don’t care what type F container is used as long as it contains List<Task> . We can pass different data containers to a function. Already clearer? Go ahead.


Let's take a look at the DataSource implemented in this way, but this time for each separately. First local:


 class LocalDataSource<F>(A: ApplicativeError<F, Throwable>) : DataSource<F>, ApplicativeError<F, Throwable> by A { private val localCache: Map<User, List<Task>> = mapOf(User(UserId("user1")) to listOf(Task("LocalTask assigned to user1"))) override fun allTasksByUser(user: User): Kind<F, List<Task>> = Option.fromNullable(localCache[user]).fold( { raiseError(UserNotInLocalStorage(user)) }, { just(it) } ) } 

A lot of new things have been added, we will analyze everything step by step.


This DataSource saves the generic type F because it implements the DataSource<F> . We want to keep the possibility of transmission of this type from the outside.


Now, let's forget about the possibly unfamiliar ApplicativeError in the constructor and focus on the allTasksByUser() function. And we will come back to ApplicativeError .


 override fun allTasksByUser(user: User): Kind<F, List<Task>> = Option.fromNullable(localCache[user]).fold( { raiseError(UserNotInLocalStorage(user)) }, { just(it) } ) 

It is seen that it returns Kind<F, List<Task>> . We still don’t care what the container F as long as it contains List<Task> .


But there is a problem. Depending on whether we can find a list of Task objects for the desired user in the local storage or not, we want to report an error ( Task not found) or return Task already wrapped in F ( Task found).


And for both cases we need to return: Kind<F, List<Task>> .


In other words: there is a type about which we know nothing ( F ), and we need a way to return an error wrapped in this type. Plus, we need a way to create an instance of this type, in which the value obtained after the successful completion of the function will be wrapped. Sounds like something impossible?


Let's return to the class declaration and note that the ApplicativeError is passed to the constructor and then used as a delegate for the class ( by A ).


 class LocalDataSource<F>(A: ApplicativeError<F, Throwable>) : DataSource<F>, ApplicativeError<F, Throwable> by A { //... } 

ApplicativeError inherited from Applicative ; they are both type classes.


Type classes define behaviors (contracts). They are encoded as interfaces that work with arguments as generic types, as in Monad<F> , Functor<F> and many others. This F is a data type. Thus we can transfer types like Either , Option , IO , Observable , Flowable and many others.


So back to our two problems:



For this we can use a class of type Applicative . Because ApplicativeError inherits from it, we can delegate its properties.


Applicative simply provides the just(a) function. just(a) wraps the value in the context of any high type. Thus, if we have Applicative<F> , it can call just(a) to wrap the value in container F , whatever that value is. Suppose we use Observable , we will have Applicative<Observable> , which knows how to wrap a in Observable , to end up with Observable.just(a) .



For this we can use ApplicativeError . It provides a raiseError(e) function that wraps the error in a container of type F For an example with Observable , the appearance of an error will create something like Observable.error<A>(t) , where t is Throwable , since we declared our type of error as a class of type ApplicativeError<F, Throwable> .


Let's look at our abstract implementation of LocalDataSource<F> .


 class LocalDataSource<F>(A: ApplicativeError<F, Throwable>) : DataSource<F>, ApplicativeError<F, Throwable> by A { private val localCache: Map<User, List<Task>> = mapOf(User(UserId("user1")) to listOf(Task("LocalTask assigned to user1"))) override fun allTasksByUser(user: User): Kind<F, List<Task>> = Option.fromNullable(localCache[user]).fold( { raiseError(UserNotInLocalStorage(user)) }, { just(it) } ) } 

The Map<User, List<Task>> saved in memory remains the same, but now the function does a couple of things that may be new to you:



Thus, we abstract the implementation of data sources using classes so that they do not know which container will be used for the type F .


The implementation of the network DataSource looks similar:


 class RemoteDataSource<F>(A: Async<F>) : DataSource<F>, Async<F> by A { private val internetStorage: Map<User, List<Task>> = mapOf(User(UserId("user2")) to listOf(Task("Remote Task assigned to user2"))) override fun allTasksByUser(user: User): Kind<F, List<Task>> = async { callback: (Either<Throwable, List<Task>>) -> Unit -> Option.fromNullable(internetStorage[user]).fold( { callback(UserNotInRemoteStorage(user).left()) }, { callback(it.right()) } ) } } 

But there is one small difference: instead of delegating to the ApplicativeError instance, we use another class of the type: Async .


This is due to the fact that, by their nature, network calls are asynchronous. We want to write code that will be executed asynchronously, it is logical to use a type class designed for this.


Async used to simulate asynchronous operations. He can simulate any operation based on callbacks. Note that we still do not know the specific data types, we simply describe an operation asynchronous in nature.


Consider the following function:


 override fun allTasksByUser(user: User): Kind<F, List<Task>> = async { callback: (Either<Throwable, List<Task>>) -> Unit -> Option.fromNullable(internetStorage[user]).fold( { callback(UserNotInRemoteStorage(user).left()) }, { callback(it.right()) } ) } 

We can use the async {} function, which is provided to us by a class of type Async to simulate the operation and create an instance of type Kind<F, List<Task>> which will be created asynchronously.


If we used a fixed data type like Observable , Async.async {} would be equivalent to Observable.create() , i.e. creating an operation that can be called from a synchronous or asynchronous code, such as Thread or AsyncTask .


The callback parameter is used to bundle the resulting callbacks into the context of container F , which is a high type.


Thus, our RemoteDataSource abstracted and depends on a still unknown type F container.


Let's rise on level of abstraction above and once again look at our repository. If you remember, we first need to search for the Task objects in the LocalDataSource , and only then (if they were not found locally) request them from the RemoteLocalDataSource .


 class TaskRepository<F>( private val localDS: DataSource<F>, private val remoteDS: RemoteDataSource<F>, AE: ApplicativeError<F, Throwable>) : ApplicativeError<F, Throwable> by AE { fun allTasksByUser(user: User): Kind<F, List<Task>> = localDS.allTasksByUser(user).handleErrorWith { when (it) { is UserNotInLocalStorage -> remoteDS.allTasksByUser(user) else -> raiseError(UnknownError(it)) } } } 

ApplicativeError<F, Throwable> again with us! It also provides the handleErrorWith() function, which runs on top of any high-type receiver.


It looks like this:


 fun <A> Kind<F, A>.handleErrorWith(f: (E) -> Kind<F, A>): Kind<F, A> 

Because localDS.allTasksByUser(user) returns Kind<F, List<Task>> , which can be viewed as F<List<Task>> , where F remains a generic type, we can call handleErrorWith() on top of it.


handleErrorWith() allows you to respond to errors using the passed lambda. Consider the function closer:


 fun allTasksByUser(user: User): Kind<F, List<Task>> = localDS.allTasksByUser(user).handleErrorWith { when (it) { is UserNotInLocalStorage -> remoteDS.allTasksByUser(user) else -> raiseError(UnknownError(it)) } } 

Thus, we get the result of the first operation, except when an exception was thrown. The exception will be handled by lambda. If the error is of type UserNotInLocalStorage , we will try to find Tasks objects in the remote DataSource . In all other cases, we wrap an unknown error in a type F container.


The dependency rendering module remains very similar to the previous version:


 class Module<F>(A: Async<F>) { private val localDataSource: LocalDataSource<F> = LocalDataSource(A) private val remoteDataSource: RemoteDataSource<F> = RemoteDataSource(A) val repository: TaskRepository<F> = TaskRepository(localDataSource, remoteDataSource, A) } 

The only difference is that it is now abstract and depends on F , which remains polymorphic. I deliberately did not pay attention to this in order to reduce the noise level, but Async inherited from ApplicativeError , so it can be used as its instance at all levels of program execution.


Testing polymorphism


Finally, our application is completely abstracted from using specific data types for containers ( F ) and we can focus on testing polyformism in runtime. We will test the same piece of code passing different data types for type F . The scenario is the same as when we used Observable .


The program is written in such a way that we have completely got rid of the boundaries of abstractions and can transfer implementation details as we like.


To begin with, we will try to use as a container for F Single from RxJava.


 object test { @JvmStatic fun main(args: Array<String>): Unit { val user1 = User(UserId("user1")) val user2 = User(UserId("user2")) val user3 = User(UserId("unknown user")) val singleModule = Module(SingleK.async()) singleModule.run { repository.allTasksByUser(user1).fix().single.subscribe(::println, ::println) repository.allTasksByUser(user2).fix().single.subscribe(::println, ::println) repository.allTasksByUser(user3).fix().single.subscribe(::println, ::println) } } } 

Compatibility for the sake of Arrow provides wrappers for known library data types. For example, there is a convenient wrapper SingleK . These wrappers allow you to use type classes in conjunction with data types as high types.


The following will be displayed on the console:


 [Task(value=LocalTask assigned to user1)] [Task(value=Remote Task assigned to user2)] UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user))) 

The same result will be if you use Observable .


Now let's work with Maybe , for which the MaybeK wrapper is MaybeK :


 @JvmStatic fun main(args: Array<String>): Unit { val user1 = User(UserId("user1")) val user2 = User(UserId("user2")) val user3 = User(UserId("unknown user")) val maybeModule = Module(MaybeK.async()) maybeModule.run { repository.allTasksByUser(user1).fix().maybe.subscribe(::println, ::println) repository.allTasksByUser(user2).fix().maybe.subscribe(::println, ::println) repository.allTasksByUser(user3).fix().maybe.subscribe(::println, ::println) } } 

The same result will be output to the console, but now using a different data type:


 [Task(value=LocalTask assigned to user1)] [Task(value=Remote Task assigned to user2)] UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user))) 

What about ObservableK / FlowableK ?
Let's try:


 object test { @JvmStatic fun main(args: Array<String>): Unit { val user1 = User(UserId("user1")) val user2 = User(UserId("user2")) val user3 = User(UserId("unknown user")) val observableModule = Module(ObservableK.async()) observableModule.run { repository.allTasksByUser(user1).fix().observable.subscribe(::println, ::println) repository.allTasksByUser(user2).fix().observable.subscribe(::println, ::println) repository.allTasksByUser(user3).fix().observable.subscribe(::println, ::println) } val flowableModule = Module(FlowableK.async()) flowableModule.run { repository.allTasksByUser(user1).fix().flowable.subscribe(::println) repository.allTasksByUser(user2).fix().flowable.subscribe(::println) repository.allTasksByUser(user3).fix().flowable.subscribe(::println, ::println) } } } 

See in the console:


 [Task(value=LocalTask assigned to user1)] [Task(value=Remote Task assigned to user2)] UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user))) [Task(value=LocalTask assigned to user1)] [Task(value=Remote Task assigned to user2)] UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user))) 

Everything works as expected.


Let's try to use DeferredK , a wrapper for the type kotlinx.coroutines.Deferred :


 object test { @JvmStatic fun main(args: Array<String>): Unit { val user1 = User(UserId("user1")) val user2 = User(UserId("user2")) val user3 = User(UserId("unknown user")) val deferredModule = Module(DeferredK.async()) deferredModule.run { runBlocking { try { println(repository.allTasksByUser(user1).fix().deferred.await()) println(repository.allTasksByUser(user2).fix().deferred.await()) println(repository.allTasksByUser(user3).fix().deferred.await()) } catch (e: UserNotInRemoteStorage) { println(e) } } } } } 

As you know, exception handling when using corutin has to be explicitly prescribed. , , .


— :


 [Task(value=LocalTask assigned to user1)] [Task(value=Remote Task assigned to user2)] UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user))) 

Arrow API DeferredK . runBlocking :


 object test { @JvmStatic fun main(args: Array<String>): Unit { val user1 = User(UserId("user1")) val user2 = User(UserId("user2")) val user3 = User(UserId("unknown user")) val deferredModuleAlt = Module(DeferredK.async()) deferredModuleAlt.run { println(repository.allTasksByUser(user1).fix().unsafeAttemptSync()) println(repository.allTasksByUser(user2).fix().unsafeAttemptSync()) println(repository.allTasksByUser(user3).fix().unsafeAttemptSync()) } } } 

[ Try ]({{ '/docs/arrow/core/try/ru' | relative_url }}) (.., Success Failure ).


 Success(value=[Task(value=LocalTask assigned to user1)]) Success(value=[Task(value=Remote Task assigned to user2)]) Failure(exception=UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user)))) 

, , IO .
IO , in/out , , .


 object test { @JvmStatic fun main(args: Array<String>): Unit { val user1 = User(UserId("user1")) val user2 = User(UserId("user2")) val user3 = User(UserId("unknown user")) val ioModule = Module(IO.async()) ioModule.run { println(repository.allTasksByUser(user1).fix().attempt().unsafeRunSync()) println(repository.allTasksByUser(user2).fix().attempt().unsafeRunSync()) println(repository.allTasksByUser(user3).fix().attempt().unsafeRunSync()) } } } 

 Right(b=[Task(value=LocalTask assigned to user1)]) Right(b=[Task(value=Remote Task assigned to user2)]) Left(a=UserNotInRemoteStorage(user=User(userId=UserId(value=unknown user)))) 

IO — . Either<L,R> ( ). , "" Either , "" , . Right(...) , , Left(...) .


.


, . , , , .


.


… ?


, , . .



Additionally


, .
, , , , .


, . — Twitter: @JorgeCastilloPR .


(, ) :



FP to the max John De Goes FpToTheMax.kt , arrow-examples . , , .


')

Source: https://habr.com/ru/post/447234/


All Articles