
Akka, actors and reactive programming

Hello, dear readers.

Today we would like to talk about the idea that “everything new is well-forgotten old” and recall the actors described by Carl Hewitt in the early 1970s. The occasion is a book that came out recently:


[Image: book cover]

It is quite a hefty volume: the translation should run to more than 500 pages.

Despite the book's pointedly narrow focus (Akka and Scala), its author Vaughn Vernon (a leading DDD specialist) is confident that the architectural patterns it describes can be fully implemented in .NET and C#, as the appendix explains. Below the cut we also publish a translation of an article whose author considers the actor paradigm portable to Java. Since the book's rating on Amazon is consistently high and the topic is universal, please share your views on both the book and actor architecture in general.

The first article in this series gave an overview of Akka. Now we will dive deeper into the realm of Akka actors, armed with the akka-actor module, which lays the foundation for all the other Akka modules.

In our opinion, it is impossible to learn programming without reading and writing code. So here we will develop a small actor library step by step: a PubSub event bus that works on the “publish-subscribe” principle. Of course, Akka already ships with ready-made local and distributed solutions of this kind, so here we are simply tinkering with a well-known example. We will work in Scala, simply because it is much more convenient for writing Akka code, but exactly the same results can be achieved in Java.

Actor model

In the actor model, invented in 1973 by Carl Hewitt and others, actors are “fundamental units of computation that implement processing, storage, and communication.” OK, let's take this one step at a time.

The notion of a “fundamental unit of computation” means that when we write a program according to the actor model, our design and implementation work revolves around actors. In a terrific interview with Erik Meijer, Carl Hewitt explains that “actors are everywhere” and also that “there are no single actors, they exist in systems.” We can summarize this as follows: when using the actor model, all our code consists of actors.

What does an actor look like? And what exactly are “processing”, “storage” and “communication”? In essence, communication means asynchronous messaging, storage means that actors can have state, and processing means that actors can handle messages. Processing is also referred to as “behavior”. That doesn't sound too hard, does it? So let's take the next step and look at Akka actors.

Anatomy of an Akka actor

As the following picture shows, an Akka actor consists of several cooperating components. ActorRef is the logical address of the actor, which lets us send messages to it asynchronously on a “fire and forget” basis. The dispatcher (by default there is one per actor system) is responsible for enqueuing messages into the actor's mailbox, and also for scheduling that mailbox to dequeue one or more messages, but only one at a time, and hand them to the actor for processing. Last but not least, the Actor, usually the only API we have to implement, encapsulates state and behavior.



As will be shown below, Akka does not allow direct access to the actor and thereby ensures that asynchronous messages are the only way to interact with it. You cannot call a method on an actor.
In addition, note that sending a message to an actor and the actor processing that message are two separate operations that most likely happen on different threads. Of course, Akka provides the necessary synchronization to ensure that any state changes are visible to all threads.

Akka therefore lets us program under the illusion of single-threadedness: we do not need synchronization primitives such as volatile or synchronized in actor code, and indeed we should not use them.
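To make the single-threaded illusion more tangible, here is a minimal sketch, assuming the classic akka-actor module is on the classpath. The names CounterSketch, Counter, Increment and GetCount are made up for this illustration, and the ask pattern (akka.pattern.ask), which this article does not cover, is used only to read the result back.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object CounterSketch {
  // Hypothetical message protocol for this sketch.
  case object Increment
  case object GetCount

  class Counter extends Actor {
    // Plain var: no volatile, no synchronized, no atomics.
    private var count = 0

    override def receive: Receive = {
      case Increment => count += 1
      case GetCount  => sender() ! count
    }
  }

  def run(): Int = {
    val system = ActorSystem("counter-sketch")
    try {
      val counter = system.actorOf(Props(new Counter), "counter")

      // Four threads send 1000 messages each, concurrently.
      val threads = (1 to 4).map { _ =>
        new Thread(new Runnable {
          override def run(): Unit = (1 to 1000).foreach(_ => counter ! Increment)
        })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())

      // GetCount is enqueued after all increments, so it is processed last.
      implicit val timeout: Timeout = Timeout(3.seconds)
      Await.result((counter ? GetCount).mapTo[Int], 3.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

Although the messages come from four different threads, the actor handles them one at a time, so CounterSketch.run() is expected to return 4000 without any locking in Counter.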

Actor implementation

Enough talk, on to the code! In Akka, an actor is a class that mixes in the Actor trait:

    class MyActor extends Actor {
      override def receive = ???
    }

The receive method returns the so-called initial behavior of the actor. It is simply a partial function that Akka uses to process the messages sent to the actor. Since the behavior has the type PartialFunction[Any, Unit], it is currently not possible to define actors that accept only messages of a given type. Akka already has an experimental akka-typed module that brings type safety to the platform, but it is still being finalized. By the way, the behavior of an actor can change, which is why the return value of receive is called the initial behavior.
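Partial functions are plain Scala and can be tried out without Akka. The sketch below uses only the standard library; BehaviorSketch, greet and increment are names made up for this illustration, and the functions return String rather than Unit purely so that the results can be inspected.

```scala
object BehaviorSketch {
  // Defined only for String messages.
  val greet: PartialFunction[Any, String] = {
    case name: String => s"Hello, $name"
  }

  // Defined only for Int messages.
  val increment: PartialFunction[Any, String] = {
    case n: Int => s"Number ${n + 1}"
  }

  // orElse chains partial functions: the second one is tried
  // only where the first one is not defined.
  val combined: PartialFunction[Any, String] = greet.orElse(increment)

  // lift turns "not defined" into None instead of throwing a MatchError.
  def handle(message: Any): Option[String] = combined.lift(message)
}
```

A message for which the partial function is not defined, such as a Double here, is simply not handled. An Akka behavior treats messages that match none of its cases in a comparable way.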

Ok, let's implement the basic actor for our PubSub library:

    class PubSubMediator extends Actor {
      override def receive = Actor.emptyBehavior
    }

So far we do not need PubSubMediator to process any messages, so we use the partial function Actor.emptyBehavior, which is not defined for any value.

Actor systems and actor creation

As mentioned above, “there are no single actors, they exist in systems”. In Akka, an actor system is a cohesive ensemble whose members are organized hierarchically. Thus every actor has a parent actor, as shown in the following picture.



When an actor system is created, Akka (which internally uses many so-called “system actors”) creates three actors: the “root guardian” at the root of the actor hierarchy, plus the system guardian and the user guardian. The user guardian, often simply called the “guardian”, is the parent of all the top-level actors we create (here “top-level” means “the highest level we have access to”).

Fine, but how do we create an actor system? We just call the factory method provided by the ActorSystem singleton object:

 val system = ActorSystem("pub-sub-mediator-spec-system") 

And why do we create an ActorSystem at all? Why not just create actors? The latter is impossible, because calling an actor's constructor directly throws an exception. Instead, we have to use the factory method provided (you guessed it) by the ActorSystem to create a top-level actor:

 system.actorOf(Props(new PubSubMediator), "pub-sub-mediator") 

Of course, actorOf does not return an Actor instance but an ActorRef. This is how Akka prevents us from getting access to the Actor instance, which in turn guarantees that we can only communicate with the actor through asynchronous messages. The name we specify must be unique among the siblings of this actor, otherwise an exception is thrown. If we do not specify a name, Akka creates one for us, since every actor must have a name.

And what is Props? It is simply a configuration object for the actor. It takes the constructor call as a by-name (that is, lazily evaluated) parameter and may contain other important information, for example about routing or deployment.

When it comes to remote communication, it is important to bear in mind that Props may be serialized, so it is established practice to add a Props factory to the actor's companion object. It is also convenient to define a constant for the actor's name there.

Knowing all this, let's extend PubSubMediator and also write a test for it with ScalaTest and Akka Testkit, another Akka module that simplifies testing Akka actors:
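The two failure cases mentioned here can be observed directly. The following is a minimal sketch, assuming classic akka-actor; CreationSketch and Quiet are names made up for this illustration. It wraps both offending calls in Try and checks which exception type comes back.

```scala
import akka.actor.{Actor, ActorInitializationException, ActorSystem, InvalidActorNameException, Props}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object CreationSketch {
  class Quiet extends Actor {
    override def receive: Receive = Actor.emptyBehavior
  }

  /** Returns (direct construction rejected, duplicate name rejected). */
  def run(): (Boolean, Boolean) = {
    val system = ActorSystem("creation-sketch")
    try {
      // Calling the constructor outside of actorOf is not allowed.
      val direct = Try(new Quiet)

      // The first actor named "quiet" is created normally ...
      system.actorOf(Props(new Quiet), "quiet")
      // ... but a sibling with the same name is refused.
      val duplicate = Try(system.actorOf(Props(new Quiet), "quiet"))

      (
        direct.failed.toOption.exists(_.isInstanceOf[ActorInitializationException]),
        duplicate.failed.toOption.exists(_.isInstanceOf[InvalidActorNameException])
      )
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

Both elements of the returned pair should be true: the direct constructor call fails with ActorInitializationException, and the second actorOf call with the same name fails with InvalidActorNameException.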

    import akka.actor.{ Actor, ActorSystem, Props }
    import akka.testkit.EventFilter
    import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    object PubSubMediator {

      final val Name = "pub-sub-mediator"

      def props: Props = Props(new PubSubMediator)
    }

    class PubSubMediator extends Actor {
      override def receive = Actor.emptyBehavior
    }

    // EventFilter only sees log events if the test configuration registers
    // akka.testkit.TestEventListener as a logger and enables lifecycle debug logging.
    class PubSubMediatorSpec extends WordSpec with Matchers with BeforeAndAfterAll {

      implicit val system = ActorSystem("pub-sub-mediator-spec-system")

      "A PubSubMediator" should {
        "be suited for getting started" in {
          EventFilter.debug(occurrences = 1, pattern = s"started.*${classOf[PubSubMediator].getName}").intercept {
            system.actorOf(PubSubMediator.props)
          }
        }
      }

      override protected def afterAll() = {
        Await.ready(system.terminate(), Duration.Inf)
        super.afterAll()
      }
    }

As you can see, we create the ActorSystem and the PubSubMediator actor in PubSubMediatorSpec. The test itself is a bit contrived, because our PubSubMediator is still rather bare. It relies on lifecycle debugging and expects a debug message of the form “started ... PubSubMediator ...” to be logged. The full code of the current version is on GitHub under the tag step-01.

Communication

So, having learned to create actors, let's talk about communication, which - as mentioned above - is based on asynchronous messages and is closely related to two other properties of the actor: behavior (that is, the ability to process messages) and state.

To send a message to an actor, you need to know its address, that is, its ActorRef:

 mediator ! GetSubscribers("topic") 

As you can see, ActorRef has a ! operator, the so-called “bang”, which sends the given message to the corresponding actor. As soon as the message has been handed over for delivery, the operation completes and the sending code continues. This implies that there is no return value (other than Unit), so messages really are sent on a “fire and forget” basis.

Simple as this is, we often need a response. Because the ! operator implicitly takes the sender as an ActorRef, this is easy to do:

    override def receive = {
      case Subscribe(topic) =>
        // ...
        sender() ! Subscribed
    }

In this example, the behavior of the receiving actor handles a specific message, the Subscribe command, and sends a message, the Subscribed event, back to the sender. The sender method gives access to the sender of the message currently being processed.

Given all this, let's further improve PubSubMediator and the corresponding test.
First, let's add the message protocol, that is, the set of all messages related to PubSubMediator, to the companion object:

    object PubSubMediator {

      case class Publish(topic: String, message: Any)
      case class Published(publish: Publish)

      case class Subscribe(topic: String, subscriber: ActorRef)
      case class Subscribed(subscribe: Subscribe)
      case class AlreadySubscribed(subscribe: Subscribe)

      case class Unsubscribe(topic: String, subscriber: ActorRef)
      case class Unsubscribed(unsubscribe: Unsubscribe)
      case class NotSubscribed(unsubscribe: Unsubscribe)

      case class GetSubscribers(topic: String)

      final val Name = "pub-sub-mediator"

      def props: Props = Props(new PubSubMediator)
    }

Next, let's implement the behavior that has been empty so far:

    class PubSubMediator extends Actor {
      import PubSubMediator._

      // State: the set of subscribers per topic; unknown topics yield an empty set.
      private var subscribers = Map.empty[String, Set[ActorRef]].withDefaultValue(Set.empty)

      override def receive = {
        // Forward the payload to every subscriber of the topic, then confirm.
        case publish @ Publish(topic, message) =>
          subscribers(topic).foreach(_ ! message)
          sender() ! Published(publish)

        // Reject a duplicate subscription.
        case subscribe @ Subscribe(topic, subscriber) if subscribers(topic).contains(subscriber) =>
          sender() ! AlreadySubscribed(subscribe)

        case subscribe @ Subscribe(topic, subscriber) =>
          subscribers += topic -> (subscribers(topic) + subscriber)
          sender() ! Subscribed(subscribe)

        // Reject unsubscribing someone who is not subscribed.
        case unsubscribe @ Unsubscribe(topic, subscriber) if !subscribers(topic).contains(subscriber) =>
          sender() ! NotSubscribed(unsubscribe)

        case unsubscribe @ Unsubscribe(topic, subscriber) =>
          subscribers += topic -> (subscribers(topic) - subscriber)
          sender() ! Unsubscribed(unsubscribe)

        case GetSubscribers(topic) =>
          sender() ! subscribers(topic)
      }
    }

As you can see, the behavior handles all commands, for example Publish or Subscribe, and always sends a positive or negative response to the sender. Whether a command is valid and produces a positive result, for example Subscribed, depends both on the command and on the state, which is held in the private mutable field subscribers.

As mentioned above, only one message is processed at a time, and Akka ensures that state changes remain visible when the next message is processed, so there is no need to synchronize access to subscribers manually. Concurrency made easy!
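The state relies on a Map with a default value, and one detail of the Scala standard library is worth knowing here: adding and removing entries keeps the default, while transformer methods such as map do not. The sketch below uses only the standard library. SubscriberMapSketch and its helpers are names made up for this illustration, and String stands in for ActorRef.

```scala
object SubscriberMapSketch {
  type Subscribers = Map[String, Set[String]]

  // Unknown topics yield an empty set instead of throwing.
  val empty: Subscribers =
    Map.empty[String, Set[String]].withDefaultValue(Set.empty[String])

  // Adding an entry keeps the default value.
  def subscribe(s: Subscribers, topic: String, who: String): Subscribers =
    s + (topic -> (s(topic) + who))

  // map returns a plain Map: the default value is lost.
  def removeNaively(s: Subscribers, who: String): Subscribers =
    s.map { case (topic, subs) => topic -> (subs - who) }

  // Re-applying withDefaultValue after map restores the default.
  def removeEverywhere(s: Subscribers, who: String): Subscribers =
    removeNaively(s, who).withDefaultValue(Set.empty[String])
}
```

So whenever the whole map is rebuilt with map, the default has to be applied again, otherwise a later lookup of an unknown topic throws NoSuchElementException.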

Finally, let's look at a fragment of the extended test:

    val subscribe01 = Subscribe(topic01, subscriber01.ref)
    mediator ! subscribe01
    sender.expectMsg(Subscribed(subscribe01))

    val subscribe02 = Subscribe(topic01, subscriber02.ref)
    mediator ! subscribe02
    sender.expectMsg(Subscribed(subscribe02))

    val subscribe03 = Subscribe(topic02, subscriber03.ref)
    mediator ! subscribe03
    sender.expectMsg(Subscribed(subscribe03))

As you can see, we send Subscribe messages to the mediator with the ! operator and expect to receive the corresponding responses. As before, the complete project code at this stage is on GitHub under the tag step-02.

Life cycle

So far we have neglected one important aspect of actors: their existence is finite, that is, an actor can be stopped at any time.

Holding an ActorRef, we do not know whether the corresponding actor is still “alive”. In particular, we do not get an exception when we send messages to a terminated actor. In that case the ActorRef remains valid, but Akka redirects the messages to the so-called dead letters, where delivery is on a best-effort basis only. These messages are logged, which is useful for testing, but this mechanism is by no means suitable for implementing something like resending or even guaranteed delivery.

But sometimes we really need to know whether an actor is still alive. In our case, we need to be able to get rid of terminated subscribers, because otherwise PubSubMediator sends unnecessary messages and may sooner or later even use up all the memory.

For these reasons, Akka provides a way to monitor the life cycle of actors. Since we can only observe the termination of an actor, this mechanism is called “death watch”. To watch an actor, we simply call the watch method provided by ActorContext, which is available in Actor via context:
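Dead letters are published on the actor system's event stream, so they can be observed. The following is a minimal sketch, assuming classic akka-actor; DeadLetterSketch, Quiet and Listener are names made up for this illustration. The gracefulStop helper from akka.pattern is used only to make sure the target has really terminated before the message is sent.

```scala
import akka.actor.{Actor, ActorSystem, DeadLetter, Props}
import akka.pattern.gracefulStop
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object DeadLetterSketch {
  class Quiet extends Actor {
    override def receive: Receive = Actor.emptyBehavior
  }

  // Completes the promise with the first String payload seen in a dead letter.
  class Listener(captured: Promise[String]) extends Actor {
    override def receive: Receive = {
      case DeadLetter(message: String, _, _) => captured.trySuccess(message)
    }
  }

  def run(): String = {
    val system = ActorSystem("dead-letter-sketch")
    try {
      val captured = Promise[String]()
      val listener = system.actorOf(Props(new Listener(captured)), "listener")
      system.eventStream.subscribe(listener, classOf[DeadLetter])

      val target = system.actorOf(Props(new Quiet), "quiet")
      // Wait until the target has actually terminated.
      Await.result(gracefulStop(target, 3.seconds), 5.seconds)

      // No exception here: the ActorRef is still valid,
      // the message just ends up in dead letters.
      target ! "hello"
      Await.result(captured.future, 3.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

Such a listener is handy for diagnostics and tests. It does not change the fact that the message was never processed by its intended recipient.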

 context.watch(subscriber) 

Akka will then send a Terminated message to the watching actor once the watched actor has terminated. This message is guaranteed to be the last one received from that actor, even in the case of remote communication.

So let's finish PubSubMediator:

    class PubSubMediator extends Actor {
      import PubSubMediator._
      ...
      override def receive = {
        ...
        case subscribe @ Subscribe(topic, subscriber) =>
          subscribers += topic -> (subscribers(topic) + subscriber)
          context.watch(subscriber) // start death watch for the new subscriber
          sender() ! Subscribed(subscribe)
        ...
        case Terminated(subscriber) =>
          // map returns a plain Map, so the default value has to be applied again;
          // otherwise looking up an unknown topic would throw afterwards.
          subscribers = subscribers
            .map { case (topic, ss) => topic -> (ss - subscriber) }
            .withDefaultValue(Set.empty[ActorRef])
      }
    }

As you can see, we start watching every subscriber when handling a valid Subscribe command, and remove each terminated subscriber when handling the corresponding Terminated message. Again, the complete code for this example is on GitHub under the tag step-03.
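Death watch can also be tried out in isolation. Here is a minimal sketch, assuming classic akka-actor; DeathWatchSketch, Worker and Watcher are names made up for this illustration. A watcher registers interest in a worker, the worker is stopped with PoisonPill, and the watcher reports the ActorRef carried by the Terminated message.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object DeathWatchSketch {
  class Worker extends Actor {
    override def receive: Receive = Actor.emptyBehavior
  }

  class Watcher(target: ActorRef, done: Promise[ActorRef]) extends Actor {
    // Register the death watch as soon as the watcher is created.
    context.watch(target)

    override def receive: Receive = {
      case Terminated(ref) => done.trySuccess(ref)
    }
  }

  /** Returns true if the Terminated message refers to the stopped worker. */
  def run(): Boolean = {
    val system = ActorSystem("death-watch-sketch")
    try {
      val worker = system.actorOf(Props(new Worker), "worker")
      val done = Promise[ActorRef]()
      system.actorOf(Props(new Watcher(worker, done)), "watcher")

      // Even if the worker stops before the watcher has registered,
      // watching an already terminated actor still yields Terminated.
      worker ! PoisonPill
      Await.result(done.future, 3.seconds) == worker
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```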

Conclusion

This concludes our introduction to Akka actors. We looked at the most important aspects of the actor model (communication, behavior, and state) and also talked about actor systems. We also discussed how Akka implements these concepts and covered death watch.

Of course, a lot of interesting and important material had to be left out: creating child actors, supervision, and so on. For more, we refer you to additional resources such as the excellent Akka documentation.

Source: https://habr.com/ru/post/266103/

