
Mixing functional programming into asynchronous code in Scala

Greetings! This article shows how, with nothing but ordinary Futures in hand, to build something resembling coroutines and asynchronous streams in Scala. Think of it as a small tutorial on functional programming.

What this is and why


What a Future is, in plain language
A Future is an entity that describes the result of some computation that we will receive not immediately but at some point in the future. There is one important feature: often, without yet knowing the result, we know exactly what we are going to do with it. For example, we asked the server for some config, and now we have a Future[Config]. We have not received the config itself yet, but we know for sure that when we do, we will take an address from it and ask the server for a picture at that address (Config => Future[Image]). A Future[Config] can be transformed so that instead of getting a config and then a picture, we get the picture directly, as a Future[Image]. Entities that can be combined in this way are called monads.
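The chaining described above can be sketched with scala.concurrent.Future from the standard library. The article does not say which Future implementation it uses, so that choice is an assumption, and the Config and Image types and both loader functions are hypothetical stand-ins.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureChainDemo {
  // Hypothetical stand-ins for the config and the picture.
  final case class Config(imageUrl: String)
  final case class Image(url: String)

  // Both loaders are stubs; real ones would go to the network.
  def loadConfig(): Future[Config] = Future(Config("http://example.com/pic.png"))
  def loadImage(url: String): Future[Image] = Future(Image(url))

  // flatMap turns "a config, and then a picture" into just "a picture".
  def loadPicture(): Future[Image] =
    loadConfig().flatMap(config => loadImage(config.imageUrl))

  // Blocks only for the sake of the demonstration.
  def run(): Image = Await.result(loadPicture(), 5.seconds)
}
```

Calling FutureChainDemo.run() waits for both steps and returns the Image built from the address in the config.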

Unfortunately, simple sequential composition of two or more asynchronous operations (download the config, then the picture at the address from the config) is all that ordinary Futures are capable of as monads. They cannot keep state, they cannot run asynchronous operations in a loop, and they cannot produce several (or infinitely many) values. These are the shortcomings we are going to address now.

To be concrete, let's imagine a widget. It waits for a config that is updated at regular intervals, loads a value (a temperature, say) from the address in the config, and draws on the screen the current value, the minimum, the maximum, the average and so on. And it does all of this in a loop, asynchronously.
Applying the knowledge from this article, we can describe this process like this:

Code
    // 'FState' is a Future that can also carry state from step to step
    def getNextConfig: FState[Config]
    def getTemperature(from: String): FState[Int]

    case class State(temperature: Int, sumTemp: Long, count: Int) {
      def isGood = ...
    }

    // A pseudo-algorithm that runs in a loop
    // for as long as the state is good
    val handle = while_(_.isGood) {
      for (
        config <- getNextConfig();
        if (config.isDefined);  // go on only if a config has arrived
        nextValue <- getTemperature(config().source);  // load the next value
        state <- gets[State];   // read the current state
        newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);
        _ <- puts(newState);    // i.e. replace the state
        _ <- runInUiThread { drawOnScreen(newState) }
      ) yield ()
    }


Or like this:

Code
    val configs: AsyncStream[Config] = ... // some stream of configs
    def getTemperature(from: String): FState[Int]

    case class State(temperature: Int, sumTemp: Long, count: Int)

    // The same pseudo-algorithm, but without 'getNextConfig':
    // instead of a loop, we iterate over the stream of configs
    val handle = foreach(configs) { config =>
      for (
        nextValue <- getTemperature(config().source);  // load the next value
        state <- gets[State];   // read the current state
        newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);
        _ <- puts(newState);    // i.e. replace the state
        _ <- runInUiThread { drawOnScreen(newState) }
      ) yield ()
    }


If that sounds interesting, read on.

Stateful asynchronous computations


This is a Future that lets you store and change the state of the pseudo-algorithm inside a for-comprehension: those very gets[State] and puts(newState) calls. It is also what makes the pseudo-algorithm look somewhat like a coroutine.

Let's look at this interesting entity:

    // S is the type of the state, A is the type of the result
    case class FState[S, +A](func: S => Future[(A, S)]) {
      def apply(s: S) = func(s)
    }

As you can see, this is a simple wrapper over a function that takes the current state and returns a Future of the result together with the new state. The shape of this entity comes from simply combining the Future and State monads (what we are doing here is known as a monad transformer).
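To make the wrapper tangible, here is a minimal sketch of a single FState step being run by hand. It assumes scala.concurrent.Future; the tick step is a hypothetical example, not something from the article.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FStateBasicsDemo {
  // Same shape as in the article: state in, Future of (result, new state) out.
  final case class FState[S, +A](func: S => Future[(A, S)]) {
    def apply(s: S): Future[(A, S)] = func(s)
  }

  // Hypothetical step: reports the current counter and increments it.
  def tick: FState[Int, String] =
    FState[Int, String](s => Future((s"tick $s", s + 1)))

  // Nothing happens until the step is given an initial state.
  def run(): (String, Int) = Await.result(tick(41), 5.seconds)
}
```

Running the step with the initial state 41 yields the result "tick 41" together with the new state 42.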

Let's teach this entity to be a monad. In principle, it is enough to define the unit and flatMap operations (plus map, which can be expressed through those two), but we will go straight down the thorny path of scalaz and get, as a bonus, a whole algebra of operations defined in terms of these two.

    class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {
      type F[X] = FState[S, X]

      override def point[A](a: => A): F[A]
      override def bind[A, B](m: F[A])(f: A => F[B]): F[B]
    }

answer
    class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {
      type F[X] = FState[S, X]

      override def point[A](a: => A): F[A] =
        FState((s: S) => Future((a, s)))

      override def bind[A, B](m: F[A])(f: A => F[B]): F[B] =
        FState((s: S) => m(s) flatMap { pair => f(pair._1)(pair._2) })
    }
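For readers who would rather not pull in scalaz, the same two operations can be sketched as plain methods, which is already enough for for-comprehensions to work. This is a simplified variant under stated assumptions: it uses scala.concurrent.Future, and the add step is a hypothetical example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FStateMonadDemo {
  final case class FState[S, +A](func: S => Future[(A, S)]) {
    def apply(s: S): Future[(A, S)] = func(s)

    // bind: run this step, then feed its result and new state to the next one.
    def flatMap[B](f: A => FState[S, B]): FState[S, B] =
      FState[S, B](s => func(s).flatMap { case (a, s1) => f(a)(s1) })

    // map expressed through flatMap and point, as the text suggests.
    def map[B](f: A => B): FState[S, B] =
      flatMap(a => point[S, B](f(a)))
  }

  // point (unit): wrap a value without touching the state.
  def point[S, A](a: => A): FState[S, A] =
    FState[S, A](s => Future.successful((a, s)))

  // Hypothetical step: returns the old state and adds n to it.
  def add(n: Int): FState[Int, Int] =
    FState[Int, Int](s => Future.successful((s, s + n)))

  def program: FState[Int, String] =
    for {
      before <- add(1)
      middle <- add(10)
    } yield s"$before then $middle"

  def run(): (String, Int) = Await.result(program(0), 5.seconds)
}
```

Starting from state 0, the first step sees 0 and the second sees 1, so the run ends with the result "0 then 1" and the final state 11.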


For example, we have just received, for free, this rather magnificent operation:

    // A while loop over monadic values, for free!
    def whileM_[A](p: F[Boolean], body: => F[A]): F[Unit]
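To show that there is no magic in it, here is a rough sketch of how such a loop can be expressed through bind and point alone. It is written directly against a hand-rolled FState on scala.concurrent.Future rather than against scalaz, and the below10 and increment steps are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WhileMDemo {
  final case class FState[S, +A](func: S => Future[(A, S)]) {
    def apply(s: S): Future[(A, S)] = func(s)
    def flatMap[B](f: A => FState[S, B]): FState[S, B] =
      FState[S, B](s => func(s).flatMap { case (a, s1) => f(a)(s1) })
  }

  def point[S, A](a: => A): FState[S, A] =
    FState[S, A](s => Future.successful((a, s)))

  // Evaluate the condition; if it holds, run the body and start over.
  def whileM_[S, A](p: FState[S, Boolean], body: => FState[S, A]): FState[S, Unit] =
    p.flatMap[Unit] { goOn =>
      if (goOn) body.flatMap[Unit](_ => whileM_(p, body))
      else point[S, Unit](())
    }

  // Hypothetical condition and body working on an Int state.
  def below10: FState[Int, Boolean] =
    FState[Int, Boolean](s => Future.successful((s < 10, s)))
  def increment: FState[Int, Unit] =
    FState[Int, Unit](s => Future.successful(((), s + 1)))

  def run(): (Unit, Int) =
    Await.result(whileM_(below10, increment)(0), 5.seconds)
}
```

The recursive call sits inside a flatMap callback, so each iteration is scheduled only after the previous one has completed.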

But how do we change the state inside the pseudo-algorithm? The bind combinator passes it along but never exposes it. So we write:

    def gets[S](): FState[S, S]
    def puts[S](news: S): FState[S, S]

answer
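    // Returns the current state as the result and leaves the state unchanged,
    // which makes the state "visible" inside a for-comprehension.
    def gets[S](): FState[S, S] = FState((s: S) => Future((s, s)))

    // Replaces the state with the given value
    def puts[S](news: S): FState[S, S] = FState((_: S) => Future((news, news)))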
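As an illustration, the statistics from the widget example can be kept with just these two operations. The sketch below assumes scala.concurrent.Future and a hand-rolled FState; the Stats type and the record step are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object GetsPutsDemo {
  final case class FState[S, +A](func: S => Future[(A, S)]) {
    def apply(s: S): Future[(A, S)] = func(s)
    def flatMap[B](f: A => FState[S, B]): FState[S, B] =
      FState[S, B](s => func(s).flatMap { case (a, s1) => f(a)(s1) })
    def map[B](f: A => B): FState[S, B] =
      FState[S, B](s => func(s).map { case (a, s1) => (f(a), s1) })
  }

  // Expose the current state as the result.
  def gets[S]: FState[S, S] = FState[S, S](s => Future.successful((s, s)))
  // Replace the state with a new value.
  def puts[S](news: S): FState[S, S] = FState[S, S](_ => Future.successful((news, news)))

  // Hypothetical statistics kept as state.
  final case class Stats(last: Int, sum: Long, count: Int)

  // Read the state, compute the new one, store it.
  def record(value: Int): FState[Stats, Unit] =
    for {
      st <- gets[Stats]
      _  <- puts(Stats(value, st.sum + value, st.count + 1))
    } yield ()

  def run(): Stats = {
    val both = record(20).flatMap(_ => record(22))
    Await.result(both(Stats(0, 0L, 0)), 5.seconds)._2
  }
}
```

After recording 20 and 22 the state holds the last value 22, the sum 42 and the count 2.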


That's basically it! Now we can write something like:

    implicit val m = new FStateMonad[Int] // the state is an Int

    // An algorithm that counts up to 10...
    val algo = for (
      _ <- m.whileM_(gets[Int] map (_ < 10), for (
        i <- gets[Int];
        _ <- puts(i + 1)
      ) yield (()));
      v1 <- gets[Int]
    ) yield (v1)

    // algo(0)() should be ((10, 10))

We can also write ourselves as much syntactic sugar as we like and end up with something like this:

    implicit val m = new FStateMonad[Int] // the state is an Int

    val algo = for (
      // an ordinary for loop, except that the counter is the state
      _ <- m.forM_(_ < 10, _ + 1) {
        // the loop body, some FState
      };
      v1 <- gets[Int]
    ) yield (v1)

    // algo(0)() should be ((10, 10))

Asynchronous stream


This is a Future that can asynchronously return more than one value. In the second example at the beginning of the article, we simply iterate over such a stream with a stateful pseudo-algorithm. There are other interesting things you can do with a stream, but let's take it in order and start by building one.

What is a stream? Put simply, it is a list whose tail is computed lazily, so it can be infinite. Our asynchronous AsyncStream will look something like this:

    class Pair[A, B](fp: A, sp: => B) {
      val first = fp
      lazy val second = sp
    }

    object Pair {
      def apply[A, B](first: A, second: => B) = new Pair[A, B](first, second)
    }

    // The end of the stream is marked by Future(null); otherwise the Future
    // holds a pair: the value and the rest of the stream.
    // Strictly speaking an Option belongs here, but for brevity we use null.
    case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])

So everything looks simple: we asynchronously return a value and a tail, and the laziness of the tail should protect us from a stack overflow when working with infinite streams.
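The laziness claim is easy to check on an infinite stream of integers. In the sketch below, nil and cons are assumed helper constructors (the article uses a nil later without showing it), from is a hypothetical generator, and scala.concurrent.Future stands in for whatever Future the article is built on.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AsyncStreamBasicsDemo {
  // Pair with a lazily evaluated second component, as in the article.
  final class Pair[A, B](fp: A, sp: => B) {
    val first: A = fp
    lazy val second: B = sp
  }
  final case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
  type Cell[A] = Pair[A, AsyncStream[A]]

  // Assumed helpers: an empty stream and a "prepend" constructor.
  def nil[A]: AsyncStream[A] = AsyncStream[A](Future.successful[Cell[A]](null))
  def cons[A](head: A, tail: => AsyncStream[A]): AsyncStream[A] =
    AsyncStream[A](Future.successful[Cell[A]](new Pair[A, AsyncStream[A]](head, tail)))

  // Infinite stream n, n + 1, ...; the tail is by-name, so this terminates.
  def from(n: Int): AsyncStream[Int] = cons[Int](n, from(n + 1))

  // Read the first two elements by hand.
  def firstTwo(): (Int, Int) = {
    val p1 = Await.result(from(10).data, 5.seconds)
    val p2 = Await.result(p1.second.data, 5.seconds)
    (p1.first, p2.first)
  }
}
```

Because the tail is passed by name and stored in a lazy val, each further element is created only when second is accessed.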

But streams have one very attractive feature: they can be folded! We write:

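    // An ordinary foldLeft, except that it returns Future[B]:
    // the result is available only once the whole stream has been consumed
    def foldLeft[B](start: B)(f: (B, A) => B): Future[B]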

answer
    def foldLeft[B](start: B)(f: (B, A) => B): Future[B] = {
      // A recursive helper that walks the stream,
      // starting from the given data
      def impl(d: Future[Pair[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
        d flatMap (pair => {
          if (pair eq null) acc
          else impl(pair.second.data, acc map (b => f(b, pair.first)))
        })

      impl(data, Future(start))
    }


You can also fold a stream like this:

 def flatten : Future[List[A]] 

answer
 def flatten : Future[List[A]] = foldLeft[List[A]](Nil)((list, el) => el :: list) map (_.reverse) 
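Both folds can be tried out on a small finite stream. The sketch below is a simplified standalone variant: foldLeft is a free function that keeps a plain accumulator instead of a Future, flatten is called toList to avoid confusion with Future.flatten, and nil, cons and fromList are assumed helpers. It uses scala.concurrent.Future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncStreamFoldDemo {
  final class Pair[A, B](fp: A, sp: => B) {
    val first: A = fp
    lazy val second: B = sp
  }
  final case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
  type Cell[A] = Pair[A, AsyncStream[A]]

  // Assumed helpers for building small streams.
  def nil[A]: AsyncStream[A] = AsyncStream[A](Future.successful[Cell[A]](null))
  def cons[A](head: A, tail: => AsyncStream[A]): AsyncStream[A] =
    AsyncStream[A](Future.successful[Cell[A]](new Pair[A, AsyncStream[A]](head, tail)))
  def fromList[A](xs: List[A]): AsyncStream[A] = xs match {
    case Nil       => nil[A]
    case h :: rest => cons[A](h, fromList(rest))
  }

  // Walk the stream; a null pair marks the end and yields the accumulator.
  def foldLeft[A, B](s: AsyncStream[A])(start: B)(f: (B, A) => B): Future[B] =
    s.data.flatMap { pair =>
      if (pair eq null) Future.successful(start)
      else foldLeft(pair.second)(f(start, pair.first))(f)
    }

  // Collect all elements: prepend while folding, reverse at the end.
  def toList[A](s: AsyncStream[A]): Future[List[A]] =
    foldLeft(s)(List.empty[A])((acc, a) => a :: acc).map(_.reverse)

  def sum(): Int =
    Await.result(foldLeft(fromList(List(1, 2, 3, 4)))(0)(_ + _), 5.seconds)
  def all(): List[Int] =
    Await.result(toList(fromList(List(1, 2, 3, 4))), 5.seconds)
}
```

Note that neither fold can finish on an infinite stream, which is exactly why the next pair of functions is useful.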


A couple more useful functions, mainly for working with infinite streams:

    def takeWhile(p: A => Boolean): AsyncStream[A]
    def take(n: Int): AsyncStream[A]

answer
    def takeWhile(p: A => Boolean): AsyncStream[A] =
      new AsyncStream[A](data map (pair => {
        if (pair eq null) null
        else if (!p(pair.first)) null
        else Pair(pair.first, pair.second.takeWhile(p))
      }))

    def take(n: Int): AsyncStream[A] =
      if (n <= 0) nil
      else new AsyncStream[A](data map (pair => {
        if (pair eq null) null
        else Pair(pair.first, pair.second.take(n - 1))
      }))
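Here is how the two functions might be used to cut a finite prefix out of an infinite stream. Again this is a standalone sketch on scala.concurrent.Future with free functions instead of methods; nil, cons, from and toList are assumed helpers.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncStreamTakeDemo {
  final class Pair[A, B](fp: A, sp: => B) {
    val first: A = fp
    lazy val second: B = sp
  }
  final case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
  type Cell[A] = Pair[A, AsyncStream[A]]

  def nil[A]: AsyncStream[A] = AsyncStream[A](Future.successful[Cell[A]](null))
  def cons[A](head: A, tail: => AsyncStream[A]): AsyncStream[A] =
    AsyncStream[A](Future.successful[Cell[A]](new Pair[A, AsyncStream[A]](head, tail)))
  // Infinite stream n, n + 1, ...
  def from(n: Int): AsyncStream[Int] = cons[Int](n, from(n + 1))

  // Keep at most n elements; the recursive call stays lazy inside the Pair.
  def take[A](s: AsyncStream[A], n: Int): AsyncStream[A] =
    if (n <= 0) nil[A]
    else AsyncStream[A](s.data.map[Cell[A]] { pair =>
      if (pair eq null) null
      else new Pair[A, AsyncStream[A]](pair.first, take(pair.second, n - 1))
    })

  // Keep elements until the predicate fails for the first time.
  def takeWhile[A](s: AsyncStream[A])(p: A => Boolean): AsyncStream[A] =
    AsyncStream[A](s.data.map[Cell[A]] { pair =>
      if ((pair eq null) || !p(pair.first)) null
      else new Pair[A, AsyncStream[A]](pair.first, takeWhile(pair.second)(p))
    })

  def toList[A](s: AsyncStream[A], acc: List[A] = Nil): Future[List[A]] =
    s.data.flatMap { pair =>
      if (pair eq null) Future.successful(acc.reverse)
      else toList(pair.second, pair.first :: acc)
    }

  def firstThree(): List[Int] = Await.result(toList(take(from(1), 3)), 5.seconds)
  def belowFour(): List[Int] = Await.result(toList(takeWhile(from(1))(_ < 4)), 5.seconds)
}
```

Both calls terminate even though from(1) never ends, because the truncated stream stops asking for further elements.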


We have learned how to fold a stream, but building one is still inconvenient. Let's fix that and write a generic stream generator:

    // 'gen' is the generator function; it should return
    // Future(null) when there is nothing more to generate
    def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A]

answer
    def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A] =
      new AsyncStream[A](
        gen(start) match {
          case _: NoFuture => Future(null)
          case future => future map (pair => { // Future[Pair[A, AsyncStream]]
            if (pair eq null) null
            else Pair(pair._1, genAsyncStream(pair._2)(gen))
          })
        })
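A small countdown shows the generator in action. The sketch assumes scala.concurrent.Future and treats a null pair as the only end-of-stream signal (the NoFuture case from the article's version is left out); countdown and toList are hypothetical helpers.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object GenAsyncStreamDemo {
  final class Pair[A, B](fp: A, sp: => B) {
    val first: A = fp
    lazy val second: B = sp
  }
  final case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
  type Cell[A] = Pair[A, AsyncStream[A]]

  // Unfold a stream from a state: gen yields (element, next state) or null.
  def genAsyncStream[S, A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A] =
    AsyncStream[A](gen(start).map[Cell[A]] { pair =>
      if (pair eq null) null
      else new Pair[A, AsyncStream[A]](pair._1, genAsyncStream(pair._2)(gen))
    })

  // Hypothetical generator: n, n - 1, ..., 1, then the end of the stream.
  def countdown(from: Int): AsyncStream[Int] =
    genAsyncStream[Int, Int](from) { n =>
      if (n <= 0) Future.successful[(Int, Int)](null)
      else Future((n, n - 1))
    }

  def toList[A](s: AsyncStream[A], acc: List[A] = Nil): Future[List[A]] =
    s.data.flatMap { pair =>
      if (pair eq null) Future.successful(acc.reverse)
      else toList(pair.second, pair.first :: acc)
    }

  def run(): List[Int] = Await.result(toList(countdown(3)), 5.seconds)
}
```

The generator is only called for the next element when the tail of the previous one is accessed.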


By the way, with the help of foldLeft + genAsyncStream streams can be copied.

Streams can be concatenated:

 def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] 

answer
    def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] =
      new AsyncStream[A](s1.data flatMap (pair => {
        if (pair eq null) s2.data
        else Future(Pair(pair.first, concat(pair.second, s2)))
      }))
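A quick check of concatenation on two short streams, as a standalone sketch on scala.concurrent.Future. The nil, cons, fromList and toList helpers are assumptions made for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncStreamConcatDemo {
  final class Pair[A, B](fp: A, sp: => B) {
    val first: A = fp
    lazy val second: B = sp
  }
  final case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
  type Cell[A] = Pair[A, AsyncStream[A]]

  def nil[A]: AsyncStream[A] = AsyncStream[A](Future.successful[Cell[A]](null))
  def cons[A](head: A, tail: => AsyncStream[A]): AsyncStream[A] =
    AsyncStream[A](Future.successful[Cell[A]](new Pair[A, AsyncStream[A]](head, tail)))
  def fromList[A](xs: List[A]): AsyncStream[A] = xs match {
    case Nil       => nil[A]
    case h :: rest => cons[A](h, fromList(rest))
  }

  // When s1 runs out (null pair), continue with the data of s2.
  def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] =
    AsyncStream[A](s1.data.flatMap { pair =>
      if (pair eq null) s2.data
      else Future.successful[Cell[A]](
        new Pair[A, AsyncStream[A]](pair.first, concat(pair.second, s2)))
    })

  def toList[A](s: AsyncStream[A], acc: List[A] = Nil): Future[List[A]] =
    s.data.flatMap { pair =>
      if (pair eq null) Future.successful(acc.reverse)
      else toList(pair.second, pair.first :: acc)
    }

  def run(): List[Int] =
    Await.result(toList(concat(fromList(List(1, 2)), fromList(List(3, 4)))), 5.seconds)
}
```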


And an asynchronous stream is also a monad:

    class AsyncStreamMonad extends Monad[AsyncStream] {
      override def point[A](a: => A): AsyncStream[A]
      override def bind[A, B](
        ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B]
    }

answer
    class AsyncStreamMonad extends Monad[AsyncStream] {
      override def point[A](a: => A): AsyncStream[A] = unit(a)

      override def bind[A, B](
        ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =
        new AsyncStream[B](ma.data flatMap (pair => {
          if (pair eq null) Future(null)
          else f(pair.first).data flatMap (pair2 =>
            // f may return an empty stream; in that case move on to the tail
            if (pair2 eq null) bind(pair.second)(f).data
            else Future(Pair(pair2.first, concat(pair2.second, bind(pair.second)(f)))))
        }))
    }
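To see bind at work without scalaz, here is a standalone sketch on scala.concurrent.Future. It maps every odd number to a two-element stream and every even number to an empty one, so it also exercises the case where f returns no elements. The helper names are assumptions made for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncStreamBindDemo {
  final class Pair[A, B](fp: A, sp: => B) {
    val first: A = fp
    lazy val second: B = sp
  }
  final case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
  type Cell[A] = Pair[A, AsyncStream[A]]

  def nil[A]: AsyncStream[A] = AsyncStream[A](Future.successful[Cell[A]](null))
  def fromList[A](xs: List[A]): AsyncStream[A] = xs match {
    case Nil       => nil[A]
    case h :: rest =>
      AsyncStream[A](Future.successful[Cell[A]](new Pair[A, AsyncStream[A]](h, fromList(rest))))
  }

  def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] =
    AsyncStream[A](s1.data.flatMap { pair =>
      if (pair eq null) s2.data
      else Future.successful[Cell[A]](
        new Pair[A, AsyncStream[A]](pair.first, concat(pair.second, s2)))
    })

  // For each element of ma, splice in the stream produced by f.
  def bind[A, B](ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =
    AsyncStream[B](ma.data.flatMap { pair =>
      if (pair eq null) Future.successful[Cell[B]](null)
      else f(pair.first).data.flatMap { inner =>
        if (inner eq null) bind(pair.second)(f).data // empty inner stream
        else Future.successful[Cell[B]](new Pair[B, AsyncStream[B]](
          inner.first, concat(inner.second, bind(pair.second)(f))))
      }
    })

  def toList[A](s: AsyncStream[A], acc: List[A] = Nil): Future[List[A]] =
    s.data.flatMap { pair =>
      if (pair eq null) Future.successful(acc.reverse)
      else toList(pair.second, pair.first :: acc)
    }

  def run(): List[Int] = {
    val result = bind[Int, Int](fromList(List(1, 2, 3))) { x =>
      if (x % 2 == 0) nil[Int] else fromList(List(x, x * 10))
    }
    Await.result(toList(result), 5.seconds)
  }
}
```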


That essentially completes the construction of the asynchronous stream.

FState + AsyncStream =?


In fact, they get along just fine. Take a look at the generator function in genAsyncStream. Does it remind you of anything? Yes, it is an FState!

Now let's learn how to iterate over a stream:

  def foreach[A, S] (stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit] 

answer
    // Implemented via foldLeft
    def foreach[A, S](stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit] =
      FState(s => {
        stream.foldLeft(Future(s))(
          (futureS, a) => futureS.flatMap(s2 => f(a)(s2).map(_._2))).flatten.map( ((), _) )
      })
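Putting the two halves together, the sketch below feeds a stream of temperatures through a stateful step, much like the widget from the beginning of the article. It is a standalone approximation on scala.concurrent.Future: foldLeft is a free function with a plain accumulator, and Stats, record and fromList are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForeachDemo {
  final case class FState[S, +A](func: S => Future[(A, S)]) {
    def apply(s: S): Future[(A, S)] = func(s)
  }
  final class Pair[A, B](fp: A, sp: => B) {
    val first: A = fp
    lazy val second: B = sp
  }
  final case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
  type Cell[A] = Pair[A, AsyncStream[A]]

  def fromList[A](xs: List[A]): AsyncStream[A] = xs match {
    case Nil       => AsyncStream[A](Future.successful[Cell[A]](null))
    case h :: rest =>
      AsyncStream[A](Future.successful[Cell[A]](new Pair[A, AsyncStream[A]](h, fromList(rest))))
  }

  def foldLeft[A, B](s: AsyncStream[A])(start: B)(f: (B, A) => B): Future[B] =
    s.data.flatMap { pair =>
      if (pair eq null) Future.successful(start)
      else foldLeft(pair.second)(f(start, pair.first))(f)
    }

  // Fold over the stream with a Future of the state as the accumulator,
  // running the step for each element on the state left by the previous one.
  def foreach[A, S](stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit] =
    FState[S, Unit](s =>
      foldLeft(stream)(Future.successful(s)) { (futureS, a) =>
        futureS.flatMap(s2 => f(a)(s2).map(_._2))
      }.flatten.map(finalS => ((), finalS)))

  // Hypothetical state and step: accumulate the sum and the count.
  final case class Stats(sum: Long, count: Int)
  def record(t: Int): FState[Stats, Any] =
    FState[Stats, Unit](st => Future.successful(((), Stats(st.sum + t, st.count + 1))))

  def run(): Stats = {
    val temps = fromList(List(20, 22, 24))
    val algo = foreach[Int, Stats](temps)(t => record(t))
    Await.result(algo(Stats(0L, 0)), 5.seconds)._2
  }
}
```

The producer of the stream and the consumer that iterates over it share nothing but the AsyncStream type.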


In other words, we can quite comfortably write a generator that yields values into an asynchronous stream, pass that stream somewhere else, and have another algorithm iterate over it. This is quite convenient, IMHO, when you need to move data from one program module to another without introducing dependencies between them.

Summary


As a result, we got a couple of entities that extend the capabilities of ordinary Futures. I hope it was interesting.
Thanks for your attention!

The code is here .

Source: https://habr.com/ru/post/281346/

