
The pattern of passing scala.concurrent.Promise to an actor: usage specifics and alternatives

While maintaining various projects, I have several times run into production problems caused by incorrect use of Promise. The erroneous pattern was always the same, although it hid behind different guises, and the faulty code was written by different people. Moreover, I have not found the problem I want to cover mentioned in any article about working with Promise. So I assume that many people forget about the problem I am going to describe.


Interested in reading lots of examples of asynchronous Scala code with promises, futures, and actors? Welcome under the cut!


A little bit about Future


To start, a little theory about Future as an introduction. Those familiar with this type from the Scala standard library can safely skip this part.


In Scala, Future[T] is used to represent asynchronous computations. Suppose we need to fetch a value from a DBMS by key. The synchronous signature for such a query would look something like this:


 trait SyncKVStore[K, V] {
   def get(key: K): V
 }

Then it could be used like this:


 val store: SyncKVStore[String, String] = ...
 val value = store.get("1234") // value has type String

This approach has a drawback: the get() method may block, and since the data may travel over the network, it can block for a relatively long time. Let's try to make it non-blocking; the trait will then look like this:


 import scala.concurrent.Future

 trait KVStore[K, V] {
   def get(key: K): Future[V]
 }

To rewrite the implementation correctly we may need an asynchronous driver for our DBMS. As an example, let's write an in-memory implementation on top of a Map:


 import scala.concurrent.{ExecutionContext, Future}

 class DummyKVStore(implicit ec: ExecutionContext) extends KVStore[String, String] {
   // Future(...) runs the computation asynchronously
   // on the implicit ExecutionContext
   def get(key: String): Future[String] = Future(map(key))

   private val map = Map("1234" -> "42", "42" -> "13", "1a" -> "b3")
 }

We can use the obtained value, for example, as follows:


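 // the default global ExecutionContext
 import scala.concurrent.ExecutionContext.Implicits.global

 val store = new DummyKVStore

 // map also takes the implicit ExecutionContext
 // and runs the callback once the value is available
 store.get("1234").map { value =>
   log.info(value) // DummyKVStore returns 42 for this key
 }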

Future has several useful methods for processing its value asynchronously, for example map(), flatMap(), foreach(), and onComplete().



It is also worth noting that all of these methods take an implicit ec: ExecutionContext parameter, which, as the name suggests, carries information about the execution context of the futures.
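A few of these methods can be tried out without any database. The following is only a minimal sketch: the `lookup` function and its hard-coded map are hypothetical stand-ins for a real asynchronous store, and the global ExecutionContext is assumed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureCombinators {
  // Hypothetical lookup standing in for a real asynchronous store.
  def lookup(key: String): Future[String] =
    Future(Map("1234" -> "42", "42" -> "13")(key))

  // map transforms the value once it is available.
  def asInt: Future[Int] = lookup("1234").map(_.toInt)

  // flatMap chains a second asynchronous step that depends on the first.
  def chained: Future[String] = lookup("1234").flatMap(lookup)

  // recover turns a failure (here: a missing key) into a fallback value.
  def withDefault: Future[String] =
    lookup("nope").recover { case _: NoSuchElementException => "default" }

  def main(args: Array[String]): Unit = {
    // onComplete receives both outcomes; foreach would only see a success.
    chained.onComplete {
      case Success(v) => println(s"got $v")
      case Failure(e) => println(s"failed: $e")
    }
    println(Await.result(asInt, 5.seconds))
    println(Await.result(withDefault, 5.seconds))
  }
}
```

Await is used here only to keep the demo short; in service code the result would normally be passed on with further combinators.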


For more information about futures, you can read, for example, here .


Promise: a couple more paragraphs of theory


So what is a Promise? In essence, it is a typed write-once container that holds a future:


 import scala.concurrent.Promise

 val p = Promise[Int]()
 p.success(42)
 p.future // a Future completed with the value 42

It has several methods for completing the future, for example success(), failure(), complete(), and completeWith().



Thus, promises can be used to create futures.
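As an illustration, the four completion methods used later in this article can be compared side by side. This is a sketch under simple assumptions: the object and method names are made up for the example, and only the standard library is used.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object PromiseCompletion {
  // success(value) completes the future with a value.
  def viaSuccess: Future[Int] = Promise[Int]().success(42).future

  // failure(exception) completes the future with an error.
  def viaFailure: Future[Int] =
    Promise[Int]().failure(new IllegalArgumentException("boom")).future

  // complete(Try) picks success or failure depending on the Try.
  def viaComplete(raw: String): Future[Int] =
    Promise[Int]().complete(Try(raw.toInt)).future

  // completeWith(future) mirrors the outcome of another future.
  def viaCompleteWith(other: Future[Int]): Future[Int] =
    Promise[Int]().completeWith(other).future

  def main(args: Array[String]): Unit = {
    println(Await.result(viaSuccess, 5.seconds))
    println(Await.ready(viaFailure, 5.seconds).value)
    println(Await.ready(viaComplete("not a number"), 5.seconds).value)
    println(Await.result(viaCompleteWith(Future(6 * 7)), 5.seconds))
  }
}
```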


Promise: diving into practice


Imagine that we want to implement our own asynchronous API on top of an existing callback-based asynchronous Java API. Since the result we want to return in the future is available only inside the callback, we cannot use Future.apply() directly. This is where Promise helps. This Stack Overflow answer contains what looks like a great real-world example of using Promise:


 def makeHTTPCall(request: Request): Future[Response] = {
   val p = Promise[Response]()
   registerOnCompleteCallback { buffer =>
     val response = makeResponse(buffer)
     p.success(response)
   }
   p.future
 }

Well, let's use this function in our new web service, for example, one built on Akka-HTTP. First, add the dependency in SBT:


 libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.10" 

And write the service code:


 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.marshalling.GenericMarshallers.futureMarshaller
 import akka.stream.ActorMaterializer
 import scala.concurrent.ExecutionContext.Implicits.global

 object WebService extends App {
   // the actor system and materializer required by Akka-HTTP
   implicit val system = ActorSystem()
   implicit val materializer = ActorMaterializer()

   val store = new DummyKVStore

   val route =
     // handle GET /value/$id
     (get & path("value" / Segment)) { id =>
       complete {
         for {
           value <- store.get(id)
           response <- makeHTTPCall(new RequestImpl(value))
         } yield response
       }
     }

   Http().bindAndHandle(route, "localhost", 80)
 }

Note: the complete() method from Akka-HTTP can accept a Future[T]; that is why futureMarshaller is imported. It responds to the HTTP request once the future completes.


We also decided to write a task that sends the value for some key to all emails from our database via some HTTP API. Moreover, it does this in a loop: once the mailing to all clients is finished, it starts over.


 def allEmails: Seq[String] = ...

 def sendEmails(implicit ec: ExecutionContext): Future[Unit] = {
   Future.sequence {
     for {
       email <- allEmails
     } yield for {
       value <- store.get("42")
       response <- makeHTTPCall(new SendMailRequest(email, value))
     } yield response
   }.flatMap(_ => sendEmails) // once the round is finished, start the next one
 }

We put it all in production. However, a couple of days later our API consumers come to us complaining about periodic request timeouts. And after three days, we discover that the task has stopped sending mail! What's the matter?


In the log, we see the following stack traces:


 some.package.SomeException
   at some.package.makeResponse(...)
   at some.package.$anonfun$makeHTTPCall$1(...)
   ...

It turns out that the makeResponse() method threw an exception. Looking at the source of makeHTTPCall(), you can see that in this case the future it returns never completes!


 val p = Promise[Response]()
 registerOnCompleteCallback(buffer => {
   val response = makeResponse(buffer) // the exception is thrown here
   p.success(response)                 // so success() is never called
 })
 p.future

That is why our API requests timed out and the mailing loop stopped working. Alas, in Scala we cannot program without keeping in mind that any function can throw an exception whenever it pleases...


So, recall that Try.apply() catches exceptions and returns Success with the value or Failure with the thrown exception. Let's fix the lambda body in a naive way and send the code for review:


 import scala.util._

 Try(makeResponse(buffer)) match {
   case Success(r) => p.success(r)
   case Failure(e) => p.failure(e)
 }

However, the reviewer points out that Promise has a complete() method, which does exactly what we wrote by hand:


 p.complete(Try(makeResponse(buffer)))

So, here is what we have learned about Promise:


  1. If a Promise is created at the beginning of a method and its future is returned at the end, this does not mean that the future will ever complete.
  2. It is useful to think of a Promise as a resource that must always be closed. However, a promise is usually created and completed in different threads, so managing it as a resource with standard language constructs (try-finally) or even libraries (scala-arm) is problematic.
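The difference between the stuck and the fixed variant can be reproduced without any HTTP client. Below is a minimal sketch, not the code from the service: `parse` is a hypothetical stand-in for makeResponse(), and `registerCallback` is a hypothetical stand-in for the callback-based API that simply runs the callback on another thread.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object StuckPromiseDemo {
  // Hypothetical stand-in for makeResponse(): throws on malformed input.
  def parse(buffer: String): Int = buffer.toInt

  // Hypothetical stand-in for the callback-based API:
  // runs the callback on another thread.
  def registerCallback(buffer: String)(callback: String => Unit): Unit =
    Future(callback(buffer))

  // Naive version: if parse() throws, success() is never reached.
  def naive(buffer: String): Future[Int] = {
    val p = Promise[Int]()
    registerCallback(buffer) { b => p.success(parse(b)) }
    p.future
  }

  // Fixed version: complete(Try(...)) also propagates the exception.
  def fixed(buffer: String): Future[Int] = {
    val p = Promise[Int]()
    registerCallback(buffer) { b => p.complete(Try(parse(b))) }
    p.future
  }

  def main(args: Array[String]): Unit = {
    // The naive future never completes, so waiting for it times out.
    println(Try(Await.result(naive("oops"), 500.millis)))
    // The fixed future fails with the NumberFormatException from parse().
    println(Try(Await.result(fixed("oops"), 5.seconds)))
  }
}
```

Note that in the naive variant the exception is not lost entirely: it ends up in the throwaway future created inside `registerCallback`, which nobody observes.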

Perhaps someone will say that this is a far-fetched example and that in real life no one forgets to complete promises? Well, for such skeptics I have an answer in the form of several real bugs / PRs in Akka related to promises that are not completed in certain situations:


  1. QueueSource does not complete onComplete future on abrupt termination
  2. IgnoreSink doesn’t complete mat
  3. Calling for a complete complementary future

In addition, things are not always as simple and obvious as in this example.


The last piece of theory: a short introduction to actors


Those familiar with actors can skip this part.


Later in this article we will need some knowledge of Akka actors. Let's add the akka-actor module to our project; an example for SBT:


 libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.7" 

An actor in Akka is an object with some behavior that receives messages asynchronously. By default, the behavior is defined in the receive method:


 import akka.actor._
 import akka.actor.Actor.Receive

 val log: Logger = ourLogObject()

 case object HelloRequest

 class HelloActor extends Actor {
   // Receive is an alias for PartialFunction[Any, Unit]
   def receive: Receive = {
     // on HelloRequest, write "hello!" to the log
     case HelloRequest => log.info("hello!")
   }
 }

 // create the actor system
 val system = ActorSystem()
 // create the actor and get a reference to it
 val actor: ActorRef = system.actorOf(Props(new HelloActor))
 // "hello!" is written to the log
 actor ! HelloRequest

After creation, the actor is not accessible directly, but only through a proxy called ActorRef. Through this proxy you can send messages asynchronously using the ! method (an alias for tell), and these messages are processed by the method that defines the actor's behavior. Messages must be serializable, so a case object (for messages without parameters) or a case class is usually defined for them. An actor processes at most one message at a time, so it can also be used as a synchronization primitive.


There is one more important point: an actor can change its behavior function, that is, the way it processes messages. To do this, call context.become(newReceive) inside the actor, where newReceive is a value of type Receive. After that, starting with the next message, processing is done by newReceive instead of the default receive.


Connecting the parts of the pattern: passing a promise to an actor


So let's move on to the next example.


Suppose we need to write a client for some service, for example, a booking service. Suppose we want to be able to get hotel information by id.


 case class Hotel(id: Long, name: String, country: String)

 // the client interface
 trait BookingClient {
   def getHotel(id: Long): Future[Hotel]
 }

Now we need to implement a method that calls the booking API and processes the response. For this we use the asynchronous HTTP client of the Akka-HTTP library. Let's add it as a dependency:


 libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.10"

Our method is going to be called at a fairly high RPS for short periods, while the response time is not very important. The Akka-HTTP client has a peculiarity: it does not allow more than akka.http.host-connection-pool.max-connections requests to run in parallel. We will use an extremely simple solution: all requests will go through an actor, that is, sequentially (the real solution was a bit more complicated, but that does not matter for our example). Since we want to return a future, we create a promise, pass it to the actor, and complete it inside the actor.


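 import akka.actor._
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.Uri.Query
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
 import scala.concurrent.{ExecutionContext, Future, Promise}

 // implicit system and materializer are required by Akka-HTTP; ec is needed for Future.foreach()
 class HttpBookingClient(baseUri: String)(implicit system: ActorSystem,
                                          materializer: ActorMaterializer,
                                          ec: ExecutionContext) extends BookingClient {

   override def getHotel(id: Long): Future[Hotel] = {
     val p = Promise[Hotel]()
     actor ! Request(id, p)
     p.future
   }

   private val actor = system.actorOf(Props(new ClientActor))

   private case class Request(id: Long, p: Promise[Hotel])
   private case object Completed

   private class ClientActor extends Actor {
     // initial state: no request is in flight
     override def receive: Receive = {
       case Request(id, p) =>
         val uri = Uri(baseUri).withQuery(Query("id" -> id.toString))
         val eventual = Http().singleRequest(HttpRequest(uri = uri))
         // when the response arrives, complete the promise with the parsed result
         eventual.foreach { response =>
           p.completeWith {
             val unmarshalled = Unmarshal(response.entity)
             // parse the body as a Hotel only on 200, otherwise treat it as an error message
             response.status match {
               case StatusCodes.OK =>
                 // requires an implicit Unmarshaller[Hotel], for example from akka-http-spray-json
                 unmarshalled.to[Hotel]
               case _ =>
                 unmarshalled.to[String].flatMap(error => Future.failed(new Exception(error)))
             }
           }
         }
         p.future.onComplete(_ => self ! Completed)
         // a request is in flight: switch to the running state
         context.become(running)
     }

     // state in which a request is in flight
     private def running: Receive = {
       case request: Request =>
         // busy with another request: put this one back into the mailbox
         self ! request
       case Completed =>
         // the request has finished: accept the next one
         context.become(receive)
     }
   }
 }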

And again, after the release everything went well at first, but then we received a bug report titled "The getHotel() method returns futures that never complete". Why did this happen? It looks like we have foreseen everything and wrapped the whole lambda body in completeWith()... Nevertheless, under certain conditions the futures still get stuck.


The thing is that the lambda passed to foreach() runs only when the eventual future completes successfully. So if that future fails (for example, the network goes down), the promise will never be completed!


It may seem that the fix is relatively trivial: use onComplete() instead of foreach() and handle the error in the lambda passed to it. Like this:


 eventual.onComplete {
   case Success(response) =>
     // the same code as in the foreach body...
   case Failure(e) => p.failure(e)
 }

This solves the problem of futures stuck because of a failed eventual, but it does not solve all possible problems with stuck futures passed around in this way.
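The different behavior of foreach() and onComplete() on a failed future is easy to check in isolation. The following is a sketch using only the standard library; the method names are made up for the example, and `eventual` here is just a Future[String] rather than an HTTP response.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object ForeachVsOnComplete {
  // foreach only runs its callback on success, so a failed `eventual`
  // leaves the promise untouched.
  def viaForeach(eventual: Future[String]): Future[Int] = {
    val p = Promise[Int]()
    eventual.foreach(s => p.complete(Try(s.toInt)))
    p.future
  }

  // onComplete receives both outcomes, so the failure can be forwarded.
  def viaOnComplete(eventual: Future[String]): Future[Int] = {
    val p = Promise[Int]()
    eventual.onComplete {
      case Success(s) => p.complete(Try(s.toInt))
      case Failure(e) => p.failure(e)
    }
    p.future
  }

  def main(args: Array[String]): Unit = {
    val failed = Future.failed[String](new RuntimeException("network is down"))
    // Never completed: Await gives up with a TimeoutException.
    println(Try(Await.result(viaForeach(failed), 500.millis)))
    // Completed with the original RuntimeException.
    println(Try(Await.result(viaOnComplete(failed), 5.seconds)))
  }
}
```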


To keep things simple, let's write a simpler and at the same time more general example of passing a promise to an actor in order to complete a future:


 case class Request(arg: String, p: Promise[String])

 trait GimmeActor {
   val actor: ActorRef

   def function(arg: String): Future[String] = {
     val p = Promise[String]()
     actor ! Request(arg, p)
     p.future
   }
 }

By the way, constructs like the body of function() are quite common, for example, in the Akka source code and in other libraries. In Akka alone you can find several dozen uses of Promise written according to this pattern:


  1. Create a Promise.
  2. Pass it as one of the arguments to an asynchronous function (in the Akka sources this is often just an actor.tell() call).
  3. Return the future field of the created Promise.

A couple of examples for clarity:


  1. Here, in the callback on line L129, p is passed to the actor.
  2. And here the promise is passed directly in the parameters used to create the actor.

There are at least a couple of problems with this pattern.



 class DoesntEvenKnowAboutRequest extends Actor {
   def receive: Receive = {
     case PoisonPill =>
   }
 }

 class HandlesRequestWrongWay extends Actor {
   def receive: Receive = {
     case Request(arg, p) =>
   }
 }

In both cases, the promise will obviously not be completed.


On the one hand, it can be said that these examples are too synthetic. On the other hand, the compiler does not protect us from such errors. In addition, this problem must be remembered not only for actors, but also for the transfer of promises to ordinary asynchronous functions.



 class AlwaysCompletesRequest extends Actor {
   def receive: Receive = {
     case Request(arg, p) => p.success("42")
   }
 }

Everything is fine: our services using this actor work well. However, at first we decided to run the service on a single server.


But now the company is growing, and so is the number of users and, with it, the number of requests to our actor. At peak load the actor could no longer keep up with the message flow, so we decided to scale horizontally: run AlwaysCompletesRequest on different nodes of a cluster. Organizing a cluster requires akka-cluster, but for simplicity we will not set one up in this article and will simply talk to a single remote AlwaysCompletesRequest actor.


We need to create an ActorSystem with akka-remote support on both JVMs: the one accessing the actor and the one hosting it. To do this, add the following configuration to application.conf of both services:


 akka {
   actor {
     provider = remote
   }
   remote {
     enabled-transports = ["akka.remote.netty.tcp"]
     netty.tcp {
       hostname = "some-hostname"
       port = 2552 # port to listen on
     }
   }
 }

You also need to add a dependency on akka-remote to both services; an example for SBT:


 libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.5.7" 

Now create an actor on the server:


 val system = ActorSystem("server")
 system.actorOf(Props(new AlwaysCompletesRequest), "always-complete-actor")

Then get it on the client:


 val system = ActorSystem("client")
 val remote = system.actorSelection("akka.tcp://server@some-hostname:2552/user/always-complete-actor")

 // the same steps as in GimmeActor.function(), sent through the selection
 val p = Promise[String]()
 remote ! Request("give me 42, please!", p)
 val future = p.future

We started the server together with the client... and immediately got a stuck future!


In the logs we see the following exception:


 akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.actor.ActorSelectionMessage] using serializer [class akka.remote.serialization.MessageContainerSerializer].
 Caused by: java.io.NotSerializableException: scala.concurrent.impl.CallbackRunnable

Thus, when we try to send a promise to a remote actor, we quite predictably get a promise serialization error. In fact, even if we could serialize and transmit the promise, it would only be completed on the remote JVM, while in our JVM it would remain stuck. So passing a promise to an actor works only with local message passing; in other words, this pattern does not scale well.


Solving the pattern's problems


Timeout


The most obvious solution to the problem is a promise with a timeout. With Akka, this can be done, for example, as follows:


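 // for .seconds
 import scala.concurrent.duration._

 val system: ActorSystem = ...
 // scheduleOnce needs an implicit ExecutionContext
 import system.dispatcher

 val timeout = 2.seconds
 // fail the promise after the timeout unless it has been completed by then
 system.scheduler.scheduleOnce(timeout)(p.tryFailure(new SomeTimeoutException))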

The Promise methods we discussed earlier succeed only if the Promise has not been completed yet. If they are called after the promise has been completed, they throw an IllegalStateException. For cases where you need to try to complete a promise that may already have been completed, there are analogous methods with the try prefix in the name. They return true if they completed the promise themselves, and false if the promise had already been completed before the call.
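The difference between the plain and the try-prefixed methods can be seen on a single promise. A minimal sketch using only the standard library (the object name is made up for the example):

```scala
import scala.concurrent.Promise
import scala.util.Try

object TryMethods {
  def main(args: Array[String]): Unit = {
    val p = Promise[Int]()

    println(p.trySuccess(1)) // true: this call completed the promise
    println(p.trySuccess(2)) // false: the promise was already completed
    println(p.tryFailure(new RuntimeException("too late"))) // false as well

    // The non-try variants throw IllegalStateException
    // on an already completed promise.
    println(Try(p.success(3)).isFailure) // true

    println(p.future.value) // Some(Success(1)): the first completion wins
  }
}
```

This is why a timeout implemented with tryFailure() is safe: if the promise has already been completed with a real result, the late timeout is simply ignored.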


Alternatively, you can fail the promise on timeout right inside the actor:


 case class Timeout(p: Promise[String])

 class AlwaysAlwaysCompletesRequest extends Actor {
   // scheduleOnce needs an implicit ExecutionContext
   import context.dispatcher

   def receive: Receive = {
     case Request(str, p) =>
       p.completeWith(someFuture())
       context.system.scheduler.scheduleOnce(timeout)(self ! Timeout(p))
     case Timeout(p) =>
       p.tryFailure(new SomeTimeoutException)
   }
 }

Of course, this option does not solve the problem of scalability.


ask pattern


Of course, we could have done it differently. For example, we could use the ask pattern, which requires passing a timeout:


 import akka.pattern.ask
 import akka.util.Timeout
 import scala.concurrent.duration._

 case class Request(arg: String) // the promise is no longer needed

 def function(arg: String): Future[String] = {
   implicit val timeout = Timeout(2.seconds)
   val any: Future[Any] = actor ? Request(arg) // the timeout is passed implicitly to ?
   any.mapTo[String]
 }

In this case, the implementation of the actor should be somewhat different: instead of completing the promise, you need to respond to the message:


 class AlwaysCompletesRequest extends Actor {
   def receive: Receive = {
     case Request(arg) =>
       // reply to the sender instead of completing a promise
       sender() ! "42"
   }
 }

However, the ease of implementation is fraught with danger.


  1. The futures "leaving" the actor are no longer type safe. The compiler catches us if we try to complete a promise with a value of the wrong type, but it does not protect us when converting from Any.
  2. This argument may seem somewhat subjective, but I cannot disagree with it: a design based on tell is often considered easier to understand in the actor model.
  3. ask " " , akka-remote, , "" ActorRef (, remote, ). Akka tell , ask , .
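The first point of this list can be observed without Akka at all, because the conversion is done by the standard Future.mapTo method. In the sketch below, `reply` is a hypothetical stand-in for the Future[Any] returned by ask, under the assumption that the actor replied with an Int instead of the expected String.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object MapToDemo {
  // Stand-in for the Future[Any] returned by ask; the actor is assumed
  // to have replied with an Int instead of the expected String.
  def reply: Future[Any] = Future.successful(42)

  def main(args: Array[String]): Unit = {
    // This compiles fine, but the resulting future fails
    // with a ClassCastException at runtime.
    val asString: Future[String] = reply.mapTo[String]
    println(Try(Await.result(asString, 5.seconds)))
  }
}
```

With a Promise[String], the same mistake (`p.success(42)`) would be rejected by the compiler.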

Akka Typed


, ask- , , . Akka ask- Future[Any] . , , :


 (actor ? Request(arg)).mapTo[String] 

, , String , . Akka Typed. :


 libraryDependencies += "com.typesafe.akka" %% "akka-typed" % "2.5.7" 

, :


 import akka.typed._
 import akka.typed.scaladsl.Actor
 import akka.typed.scaladsl.AskPattern._
 import akka.util.Timeout
 import scala.concurrent.Future
 import scala.concurrent.duration._

 case class Request(arg: String, replyTo: ActorRef[String])

 // the typed behavior of the actor
 val typeSafeActor = Actor.immutable[Request] { (_, msg) =>
   msg.replyTo ! "42" // the compiler checks the type of the reply
   Actor.same
 }

 val system: ActorSystem[Request] = ActorSystem(typeSafeActor, "type-safe-actor")

 def function(arg: String): Future[String] = {
   implicit val timeout: Timeout = Timeout(2.seconds)
   system ? (Request(arg, _))
 }

. , , 2. , - API " ".


, .. ,


, , - , ActorSystem . :


 case class Request(id: Long)
 case class Response(hotel: Hotel)

 // an actor that calls the API
 class CallingActor(actor: ActorRef) extends Actor {
   def receive: Receive = {
     case response: String => doSomethingWithActorResponse(response)
     case request: Request => actor ! request // the reply arrives as a message to this actor
   }
 }

 // the actor that answers requests
 class AlwaysCompletesRequest extends Actor {
   def receive: Receive = {
     case Request(arg) =>
       // reply to the sender
       sender() ! "42"
   }
 }

, . API , .


,


Akka , . , , , : 100%-, 100%- , , . , , . , , , .


, , , . , . , :


 override def receive: Receive = {
   case Request(id, p) =>
     p.completeWith(doRequest(id))
     p.future.onComplete(_ => self ! Completed)
     context.become(running)
 }

 def doRequest(id: Long): Future[Hotel] = {
   val uri = Uri(baseUri).withQuery(Query("id" -> id.toString))
   val eventual = Http().singleRequest(HttpRequest(uri = uri))
   eventual.flatMap { response =>
     val unmarshalled = Unmarshal(response.entity)
     response.status match {
       case StatusCodes.OK => unmarshalled.to[Hotel]
       case _ => unmarshalled.to[String].flatMap(error => Future.failed(new Exception(error)))
     }
   }
 }

:


  1. ;
  2. API flatMap , onComplete ;
  3. - Akka-HTTP.

, actor.tell() .


:



asktyped askpromise
-+++-
ask/telltellaskasktelltell
±-+±±
++++-
++-++
API+++-+

, , .


, , , , . , , :


  1. , ;
  2. , , , ;
  3. .

, .


- , !



Source: https://habr.com/ru/post/344692/

