// 'FState' is a function from a state to a (result, new state) pair whose
// outcome is delivered through a Future (see the FState case class further down).
def getNextConfig: FState[Config]
def getTemperature(from: String): FState[Int]
case class State(temperature: Int, sumTemp: Long, count: Int) { def isGood = ... }

// Example 1: a stateful asynchronous loop.
// NOTE(review): 'while_' is not defined in this snippet; presumably it repeats
// the body for as long as the predicate holds for the current state - confirm.
val handle = while_ ( _.isGood) {
  for (
    config <- getNextConfig();
    if (config.isDefined); // guard: go on only when a config is defined
    nextValue <- getTemperature(config().source); // read the temperature from the config's source
    state <- gets[State]; // fetch the current state
    newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);
    _ <- puts(newState); // store the updated state
    _ <- runInUiThread { drawOnScreen(newState) }
  ) yield()
}

val configs: AsyncStream[Config] = ... // some stream of configs
def getTemperature(from: String): FState[Int]
case class State(temperature: Int, sumTemp: Long, count: Int)

// Example 2: the same per-config body as above, but without 'getNextConfig':
// the configs are supplied by the stream that 'foreach' iterates over.
val handle = foreach(configs) { config =>
  for (
    nextValue <- getTemperature(config().source); // read the temperature from the config's source
    state <- gets[State]; // fetch the current state
    newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);
    _ <- puts(newState); // store the updated state
    _ <- runInUiThread { drawOnScreen(newState) }
  ) yield()
}

// S is the type of the state, A is the type of the result.
// 'func' takes a state and returns a Future of the result paired with the next state.
case class FState[S, +A](func: S => Future[(A, S)]) {
  def apply(s: S) = func(s)
}

// Monad instance for FState with a fixed state type S - signatures only.
class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {
  type F[X] = FState[S, X]
  override def point[A](a: => A): F[A]
  override def bind[A, B](m: F[A])(f: A => F[B]): F[B]
}

// The same monad instance with the implementations filled in.
class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {
  type F[X] = FState[S, X]
  // point: produce 'a' as the result and pass the incoming state on untouched.
  override def point[A](a: => A): F[A] = FState((s: S) => Future((a, s)))
  // bind: run 'm' on the incoming state, then feed its result to 'f' and run the
  // resulting FState on the state that 'm' produced.
  override def bind[A, B](m: F[A])(f: A => F[B]): F[B] =
    FState((s: S) => m(s) flatMap { pair => f(pair._1)(pair._2) })
}

// A looping combinator over F.
// NOTE(review): presumably provided by the Monad base type rather than written
// by hand (it is invoked as m.whileM_ on the monad instance) - confirm.
def whileM_[A](p: F[Boolean], body: => F[A]): F[Unit]

// Primitive operations for reading and replacing the state - signatures only.
def gets[S](): FState[S, S]
def puts[S](news: S): FState[S, S]

// 'gets' yields the current state as its result
// and passes the state on unchanged.
def gets[S](): FState[S, S] = FState((s: S) => Future((s, s)))
// 'puts' replaces the state with 'news'; the new state is also returned as the result.
def puts[S](news: S): FState[S, S] = FState((_: S) => Future((news, news)))

implicit val m = new FStateMonad[Int] // the state type is Int
// Keep incrementing the state while it is below 10...
val algo = for(
  _ <- m.whileM_(gets[Int] map (_ < 10), for(
    i <- gets[Int];
    _ <- puts(i + 1)
  ) yield(()));
  v1 <- gets[Int]
) yield (v1)
// algo(0)() should be ((10, 10))

implicit val m = new FStateMonad[Int] // the state type is Int
val algo = for( // the same loop in 'for' style: a condition and a step on the state
  _ <- m.forM_ ( _ < 10, _ + 1) {
    // some FState goes here as the loop body
  };
  v1 <- gets[Int]
) yield (v1)
// algo(0)() should be ((10, 10))

// A pair whose second component is passed by name and evaluated lazily, at most once.
class Pair[A, B](fp: A, sp: => B) {
  val first = fp
  lazy val second = sp
}
object Pair {
  def apply[A, B](first: A, second: => B) = new Pair[A, B](first, second)
}

// An empty stream (and the end of a stream) is represented by Future(null);
// a non-empty stream is a Future holding the head and the lazily evaluated tail.
// An Option could be used here instead of null.
case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])

// An ordinary foldLeft, except that the result is a Future[B]:
// the stream is traversed through its chain of futures.
def foldLeft[B](start: B)(f: (B, A) => B): Future[B]

def foldLeft[B](start: B)(f: (B, A) => B): Future[B] = {
  // Recursive helper: follows the 'data' futures element by element,
  // threading the accumulator; a null pair marks the end of the stream.
  def impl(d: Future[Pair[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
    d flatMap (pair => {
      if (pair eq null) acc
      else impl(pair.second.data, acc map (b => f(b, pair.first)))
    })
  impl(data, Future(start))
}

def flatten : Future[List[A]]

// Collects all elements into a list; elements are prepended while folding,
// hence the final reverse.
def flatten : Future[List[A]] =
  foldLeft[List[A]](Nil)((list, el) => el :: list) map (_.reverse)

def takeWhile(p: A => Boolean): AsyncStream[A]
def take(n: Int): AsyncStream[A]

// Ends the stream (null) at the first element that fails 'p'.
def takeWhile(p: A => Boolean): AsyncStream[A] =
  new AsyncStream[A](data map (pair => {
    if (pair eq null) null
    else if (!p(pair.first)) null
    else Pair(pair.first, pair.second.takeWhile(p))
  }))

// Keeps at most 'n' elements; 'nil' is the empty stream (defined outside this snippet).
def take(n: Int): AsyncStream[A] =
  if (n <= 0) nil
  else new AsyncStream[A](data map (pair => {
    if (pair eq null) null
    else Pair(pair.first, pair.second.take(n - 1))
  }))

// 'gen' maps a generator state to a future (element, next state) pair;
// it ends the stream by returning a NoFuture or a Future holding null.
def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A]

def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A] =
  new AsyncStream[A](
    gen(start) match {
      case _: NoFuture => Future(null)
      case future => future map (pair => { // Future[Pair[A, AsyncStream]]
        if (pair eq null) null
        else Pair(pair._1, genAsyncStream(pair._2)(gen))
      })})

def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A]

// Emits the elements of 's1'; once 's1' is exhausted (null pair) continues with 's2'.
def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] =
  new AsyncStream[A](s1.data flatMap (pair => {
    if (pair eq null) s2.data
    else Future(Pair(pair.first, concat(pair.second, s2)))
  }))

// Monad instance for AsyncStream - signatures only.
class AsyncStreamMonad extends Monad[AsyncStream] {
  override def point[A](a: => A): AsyncStream[A]
  override def bind[A, B](
    ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B]
}

// The same monad instance with the implementations filled in.
class AsyncStreamMonad extends Monad[AsyncStream] {
  // 'unit' is defined outside this snippet.
  // NOTE(review): presumably it builds a one-element stream - confirm.
  override def point[A](a: => A): AsyncStream[A] = unit(a)

  // Applies 'f' to every element of 'ma' and concatenates the resulting streams.
  override def bind[A, B](
    ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =
    new AsyncStream[B](ma.data flatMap (pair => {
      if (pair eq null) Future(null)
      else f(pair.first).data flatMap (pair2 => {
        // 'f' may return an empty stream (null pair). Dereferencing it would
        // throw a NullPointerException and drop the rest of 'ma', so in that
        // case skip straight to the streams produced by the remaining elements.
        if (pair2 eq null) bind(pair.second)(f).data
        // The tail is a by-name argument of Pair, so it is still built lazily.
        else Future(Pair(pair2.first, concat(pair2.second, bind(pair.second)(f))))
      })
    }))
}

def foreach[A, S] (stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit]

// Built on top of foldLeft: the accumulator is the future state, and each element
// runs f(a) on the state produced so far, keeping only the new state.
def foreach[A, S] (stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit] =
  FState(s => {
    stream.foldLeft(Future(s))(
      (futureS, a) => futureS.flatMap(s2 => f(a)(s2).map(_._2))).flatten.map( ((), _) )
  })

// Source: https://habr.com/ru/post/281346/
All Articles