In the process of supporting various projects, I found myself several times in a situation in which, due to incorrect work with Promise
there were problems in production. Moreover, the pattern of this very wrong work was always the same, but it was hidden in different guises. Moreover, the erroneous code was written by different people. In addition, I haven’t really found a mention of the problem I want to cover in any article about working with Promise
. So I assume that many people forget about the problem, about which I will tell.
Interesting to read a lot of examples of asynchronous code on Scala, with promises, futures and actors? Welcome under the cut!
For a start, a little theory about Future
as an introduction. Those familiar with this type from the standard Scala library can safely skip this part.
In Scala, Future[T]
used to represent asynchronous computing. Suppose we need to pull the value from the DBMS by key. The synchronous signature for such a query would look something like this:
trait SyncKVStore[K, V] { def get(key: K): V }
Then it could be used like this:
val store: SyncKVStore[String, String] = ... val value = store.get("1234") // value String
This approach has a drawback: the get()
method can be blocking, and in view of the possible data transmission over the network for a relatively long time. Let's try to make it non-blocking, then the treit will look like this:
import scala.concurrent.Future trait KVStore[K, V] { def get(key: K): Future[V] }
It is possible that in order to correctly rewrite the implementation we will need to use an asynchronous driver for the DBMS we use. For example, let's write the in-memory implementation on the map:
import scala.concurrent.ExecutionContext class DummyKVStore(implicit ec: ExecutionContext) extends KVStore[String, String] { // Future(...) // ExecutionContext def get(key: String): Future[String] = Future(map(key)) private val map = Map("1234" -> "42", "42 -> 13", "1a" -> "b3") }
We can use the obtained value, for example, as follows:
// - ExecutionContext import scala.concurrent.ExecutionContext.Implicits.global val store = new DummyKVStore // map ExecutionContext // store.get("1234").map { value => log.info(value) // DummyKVStore 42 }
Future has several useful asynchronous value processing methods, we will briefly describe some of them:
map(f: A => B)
- upon completion of the futures, converts a successful result using the f
functionflatMap(f: A => Future[B])
- also converts a successful result, but takes, in fact, an asynchronous functionforeach(f: A => Unit)
- upon successful completion, performs the function f
, transferring to it the result of futuresonComplete(f: Try[A] => Unit)
is the same as foreach
, but performs the function on any termination, including with an errorIt is also worth clarifying that all of these methods also accept the implicit ec: ExecutionContext
parameter, which contains information about the execution context of the futures, as the name suggests.
For more information about futures, you can read, for example, here .
So what is a Promise
? In fact, this is a typed write-once container containing futures:
val p = Promise[Int]() p.success(42) p.future // Future 42
It has many methods to complete the futures, for example:
success(t: T)
- ends the futures with the result t
failure(e: Throwable)
- faylit futures with e
complete(try: Try[T])
- ends the futures if try
- Success
; feilit if failurecompleteWith(future: Future[T])
- completes the internal futures when the future
endsThus, promises can be used to create futures.
Imagine that we want to implement our own asynchronous API on top of a ready-made asynchronous Java API on callbacks. Since the result that we want to return in the futures is available only in a callback, we cannot directly use the Future.apply()
method. This is where Promise
will help us. In this answer to SO, there would seem to be a great example of using Promise
in the real world:
def makeHTTPCall(request: Request): Future[Response] = { val p = Promise[Response]() registerOnCompleteCallback { buffer => val response = makeResponse(buffer) p.success(response) } p.future }
Well, we use this function in our new web service, for example, on Akka-HTTP. First, let's connect the dependency to SBT:
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.10"
And write the service code:
import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshalling.GenericMarshallers.futureMarshaller import scala.concurrent.ExecutionContext.Implicits.global object WebService extends App { // Akka-HTTP implicit val system = ActorSystem() val store = new DummyKVStore val route = // GET /value/$id (get & path("value" / IntNumber)) { id => complete { for { value <- store.get(id) response <- makeHTTPCall(new RequestImpl(value)) } yield response } } Http().bindAndHandle(route, "localhost", 80) }
Note: the complete()
method from Akka-HTTP can accept Future[T]
, for this futureMarshaller
imported. He will respond to an HTTP request after the end of the futures.
We also decided to drop a task, which will send the value for some key to all emails from our database via some HTTP API. Moreover, it does this in a cycle: after the distribution is completed, all clients begin to do it again.
def allEmails: Seq[String] = ... def sendEmails(implicit ec: ExecutionContext): Future[Unit] = { Future.sequence { for { email <- allEmails } yield for { value <- store.get("42") response <- makeHTTPCall(new SendMailRequest(email, value)) } yield response }.flatMap(_ => sendEmails) // }
Put it all in production. However, in a couple of days our API consumers come to us and complain about the periodic service rollbacks by timeout. And after three days, we discover that the task has stopped sending mail! What's the matter?
In the log, we see the following spectra:
some.package.SomeException at some.package.makeResponse(...) at some.package.$anonfun$makeHTTPCall$1(...) ...
It turns out that the makeResponse()
method makeResponse()
thrown an exception. Looking at the sources of makeHTTPCall()
, you can see that in this case, the futures that it returns never end!
val p = Promise[Response] registerOnCompleteCallback(buffer => { val response = makeResponse(buffer) // p.success(response) // success() }) p.future
That is why our API fell off on a timeout, and the mailing cycle stopped working. Alas, in Scala we cannot program without thinking that any function can return an event as many want ...
So, we recall that the Try.apply()
method can intercept exceptions and return Success
with a value, or Failure
with an exception thrown. Fix the lambda body in a naive way and send a review to the code:
import scala.util._ Try(makeResponse(buffer)) match { case Success(r) => p.success(r) case Failure(e) => p.failure(e) }
However, the review tells us that the promise has a complete()
method, which itself does the same thing that we wrote with our hands:
p.complete(Try(makeResponse(buffer))
So, what we learned about the Promise
:
Promise
declared at the beginning of the method, and its futures are returned at the end, this does not mean that this futures will ever end.Promise
as a resource that must always be closed. However, promis is usually declared and closed in different threads, so using it as a resource using any standard language constructs ( try-finally
), or even libraries ( scala-arm
) is problematic.Perhaps someone will say that this is a far-fetched example and in real life no one forgets to close promises? Well, for such skeptics, I have the answer in the form of several real bugs / PRs in Akka related to inter-promises that are uncomplicated in certain situations:
In addition, it is not always so simple and obvious, as in this example.
Familiar with the actors can skip this part.
We will need some knowledge of the Akka actors later on in this article. Let's connect the akka-actor module to our project, an example for SBT:
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.7"
Actor in Akka is an object with some behavior, asynchronously receiving messages. By default, behavior is defined in the receive
method:
import akka.actor._ import akka.actor.Actor.Receive val log: Logger = ourLogObject() case object HelloRequest class HelloActor extends Actor { // Receive -- def receive: Receive = { // HelloRequest, "hello!" case HelloRequest => log.info("hello!") } } // val system = ActorSystem() // val actor: ActorRef = system.actorOf(Props(new HelloActor)) // "hello!" actor ! HelloRequest
After creation, the actor is available not directly, but through a proxy called ActorRef
. Through this proxy, you can asynchronously send messages using the method !
(alias for tell
), and these messages will be processed by the method that determines the behavior of the actor. Messages must be serializable, so often a case object
created for messages (with no message parameters) or a case class
. The actor can process at most one message at a time, so it can be used as a synchronization primitive as well.
There is one more important point: the actor can change its function of behavior, that is, in fact, the processing of messages. To do this, you need to call the context.become(newReceive)
method in the actor, where newReceive
is a Receive
type parameter. After that, starting with the next message, the processing by the function newReceive
will begin instead of the default receive
.
So let's move on to the next example.
Suppose we need to write a client for some service. For example, for booking. Suppose we want to be able to get hotel information by id.
case class Hotel(id: Long, name: String, country: String) // trait BookingClient { def getHotel(id: Long): Future[Hotel] }
Now you need to define a method that will access the booking API and process the response. For this we use the asynchronous HTTP client of the Akka-HTTP library. Let's connect it by dependency:
libraryDependencies ++= "com.typesafe.akka" %% "akka-http" % "10.0.10"
They want to run our method with a fairly large RPS for a short time, and the response time is not very important. The Akka-HTTP client has a feature: it does not allow to run more in parallel than akka.http.host-connection-pool.max-connections requests . We will use an extremely simple solution: we will make it so that all requests go through the actor, that is, into one stream (the actual solution was a bit more complicated, but it does not matter for our example). Since we want to return the futures, we will create a promise and pass it to the actor, and then in the actor we compile it.
// implicit system materializer Akka-HTTP; ec Future.foreach() class HttpBookingClient(baseUri: String)(implicit system: ActorSystem, materializer: ActorMaterializer, ec: ExecutionContext) { override def getHotel(id: Long): Future[Hotel] = { val p = Promise[Hotel]() actor ! Request(id, p) p.future } private val actor = system.actorOf(Props(new ClientActor)) private case class Request(id: Long, p: Promise[Hotel]) private case object Completed private class ClientActor extends Actor { // , override def receive: Receive = { case Request(id, p) => val uri = Uri(baseUri).withQuery(Query("id" -> id.toString)) val eventual = Http().singleRequest(HttpRequest(uri = uri)) // API eventual.foreach { response => p.completeWith { val unmarshalled = Unmarshal(response.entity) // 200, response.status match { case StatusCodes.OK => // Unmarshaller[Hotel], , akka-http-spray-json // unmarshalled.to[Hotel] case _ => unmarshalled.to[String].flatMap(error => Future.failed(new Exception(error))) } }) } p.future.onComplete(_ => self ! Completed) // running context.become(running) } // , // private def running: Receive = { case request: Request => // , ; self ! request case Completed => // context.become(receive) } } }
And again, after putting it out, everything went well at first, but then we received a bug report with the heading "The getHotel()
method returns incomplete futures". Why did this happen? It looks like we have foreseen everything, used the completeWith()
method on the whole body of the lambda ... Nevertheless, under certain conditions, the futures still stick.
The thing is that the lambda passed to the foreach()
method will start only when the eventual eventual
successfully complete. Thus, if this futures spilled (for example, the grid fell off), the promis will never fail to complete!
It can be assumed that the fix is relatively trivial: instead of foreach()
you need to use onComplete()
, and in the lambda passed to it, handle the error. Like that:
eventual.onComplete { case Success(response) => // , foreach... case Failure(e) => p.failure(e) }
This will solve the problem of sticking futures specifically because of the eventual eventual
, but it does not solve all possible problems with sticking futures transmitted in this way.
For the sake of simplicity, we will implement a simpler and at the same time more general example of the transfer of promis to the actor to complete the futures:
case class Request(arg: String, p: Promise[String]) trait GimmeActor { val actor: ActorRef def function(arg: String): Future[String] = { val p = Promise[String]() actor ! Request(arg, p) p.future } }
By the way, the funcion()
body type construction is often found, for example, in Akka source code and other libraries. In the same Akka you can find several dozen uses of Promise
, written according to this pattern:
Promise
actor.tell()
call)future
created Promise
.A couple of examples for clarity:
p
to the actor takes place.There are at least a couple of problems with this pattern.
class DoesntEvenKnowAboutRequest extends Actor { def receive: Receive = { case PoisonPill => } } class HandlesRequestWrongWay extends Actor { def receive: Receive = { case Request(arg, p) => } }
In both cases, the promise will obviously not be completed.
On the one hand, it can be said that these examples are too synthetic. On the other hand, the compiler does not protect us from such errors. In addition, this problem must be remembered not only for actors, but also for the transfer of promises to ordinary asynchronous functions.
class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg, p) => p.success("42") }
We are fine, our services using this actor work fine. However, we decided to start the service on one server at the beginning.
But now, the company is growing, the number of users too, and with it the number of requests to our actor. At peak loads, the actor stopped keeping up with the flow of messages in the required time and we decided to do horizontal scaling: to run AlwaysCompletesRequest
on different nodes in the cluster. To organize a cluster, you need to use akka-cluster, however, for simplicity, we will not organize a cluster in the article, but simply turn to one remote actor AlwaysCompletesRequest
.
We need to create an ActorSystem
with the support of akka-remote on both JVMs: accessing the actor and hosting it. To do this, add the following configuration to the application.conf
both services:
akka { actor { provider = remote } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "some-hostname" port = 2552 # ; - } } }
You also need to add a dependence on akka-remote for both services, an example for SBT:
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.5.7"
Now create an actor on the server:
val system = ActorSystem("server") system.actorOf(Props(new AlwaysCompletesRequest), "always-complete-actor")
Then get it on the client:
val system = ActorSystem("client") val actor = system.actorSelection("akka.tcp://server@some-hostname:2552/user/promise-actor") val gimme = new GimmeActor { override val actor: ActorRef = actor } val future = gimme.function("give me 42, please!")
And we started the server together with the client ... And immediately got stuck futures!
In the logs we see the following exception:
akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.actor.ActorSelectionMessage] using serializer [class akka.remote.serialization.MessageContainerSerializer]. Caused by: java.io.NotSerializableException: scala.concurrent.impl.CallbackRunnable
Thus, when we try to send a promise to a remote actor, we quite predictably received a promis serialization error. In fact, even if we could serialize and transmit the promise, it would only be compressed on the remote JVM, and in our JVM it would remain stuck. Thus, the transfer of promis to the actor works only with local message passing, that is, this pattern of sending promis to the actor does not scale well.
As the most obvious solution to the problem - timeless promise. Through Akka, you can do this, for example, as follows:
// .seconds import scala.concurrent.duration._ val system: ActorSystem = ... val timeout = 2.seconds // system.scheduler.scheduleOnce(timeout)(p.tryFailure(new SomeTimeoutException))
The Promise
methods that we discussed earlier are completed successfully only if the Promise
not completed earlier. If these methods are called already after the completion of the promise, they throw an IllegalStateException
. For cases when it is necessary to try to complete the promise, when it has probably already been completed, there are methods similar to those considered, but with the try
prefix in the name. They return true
if they completed the promise themselves; false
if the promise was completed before calling this method.
Also, as an option, you can fake on the timeout right inside the actor:
class AlwaysAlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(str, p) => p.completeWith(someFuture()) context.scheduler.scheduleOnce(timeout)(self ! Timeout(p)) case Timeout(p) => p.tryFailure(new SomeTimeoutException) } }
Of course, this option does not solve the problem of scalability.
It was possible, of course, to do otherwise. For example, use the ask-pattern, which requires the transmission of a timeout:
import akka.pattern.ask import akka.util.Timeout case class Request(arg: String) // def function(arg: String): Future[String] = { implicit val timeout = Timeout(2.seconds) val any: Future[Any] = actor ? Request(arg) // timeout ? any.mapTo[String] }
In this case, the implementation of the actor should be somewhat different: instead of completing the promise, you need to respond to the message:
class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg) => // : sender() ! "42" }
However, the ease of implementation is fraught with danger.
Any
.tell
design is usually easier to understand for an actor model.ask
" " , akka-remote, , "" ActorRef
(, remote, ). Akka tell
, ask
, ., ask- , , . Akka ask- Future[Any]
. , , :
(actor ? Request(arg)).mapTo[String]
, , String
, . Akka Typed. :
libraryDependencies += "com.typesafe.akka" %% "akka-typed" % "2.5.7"
, :
import akka.typed._ import akka.typed.scaladsl.Actor import akka.typed.scaladsl.AskPattern._ import akka.util.Timeout import scala.concurrent.Future import scala.concurrent.duration._ case class Request(arg: String, replyTo: ActorRef[String]) // val typeSafeActor = Actor.immutable[Request] { (_, msg) => msg.replyTo ! "42" // : Actor.same } val system: ActorSystem[Request] = ActorSystem(typeSafeActor, "type-safe-actor") def function(arg: String): Future[String] = { implicit val timeout: Timeout = Timeout(2.seconds) system ? (Request(arg, _)) }
. , , 2. , - API " ".
, , - , ActorSystem
. :
case class Request(id: Long) case class Response(hotel: Hotel) // API class CallingActor(actor: ActorRef) extends Actor { def receive: Receive = { case response: String => doSomethingWithActorResponse(response) case request: Request => actor ! request // } } // class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg) => // sender() ! "42" }
, . API , .
Akka , . , , , : 100%-, 100%- , , . , , . , , , .
, , , . , . , :
override def receive: Receive = { case Request(id, p) => p.completeWith(doRequest(id)) p.future.onComplete(self ! Completed) context.become(running) } def doRequest(id: Long): Future[Hotel] = { val uri = Uri(baseUri).withQuery(Query("id" -> id.toString)) val eventual = Http().singleRequest(HttpRequest(Uri(uri = uri))) eventual.flatMap { response => val unmarshalled = Unmarshal(response.entity) response.code match { case StatusCodes.OK => unmarshalled.to[Hotel] case _ => Future.failed(new Exception(unmarshalled.to[String])) } } }
:
flatMap
, onComplete
;, actor.tell()
.
:
+
— ;-
— .ask | typed ask | promise | |||
---|---|---|---|---|---|
- | + | + | + | - | |
ask/tell | tell | ask | ask | tell | tell |
± | - | + | ± | ± | |
+ | + | + | + | - | |
+ | + | - | + | + | |
API | + | + | + | - | + |
, , .
, , , , . , , :
, .
- , !
Source: https://habr.com/ru/post/344692/
All Articles