
Reactive Messenger, or CQRS and ES with Akka and Scala

Lately we hear a lot about reactive programming and keep running into buzzwords: message-driven architecture, event sourcing, CQRS. Unfortunately, very little is written about these on Habr, so I decided to fix that and share what I know.

In this article we will learn the main features of reactive applications and see how the CQRS and Event Sourcing patterns help in building them. To keep things from getting boring, we will build, step by step, our own messenger with a web socket and actors that follows all the canons of reactive programming. To implement it we will use the wonderful Scala language together with the equally excellent Akka library, which implements the actor model. We will also use the Play Framework to write the web part of our application. So let's get started.

The article is intended for readers who are already familiar with Scala and have heard of the actor model. Everyone else is welcome too: the principles of reactive programming apply regardless of language and framework.

What is reactive programming


The idea of reactive programming is described in the Reactive Manifesto, www.reactivemanifesto.org. A translation of its first version has already appeared on Habr, and the second version differs slightly from the first. Let's look at a brief excerpt from the second version. The manifesto states that reactive applications have several important properties:

Responsiveness


The application responds as quickly as possible. Responsiveness is the basis of usability and utility, for the simple reason that long delays in the interface make nobody want to use it; responsiveness also means that problems can be detected quickly and dealt with effectively. Responsive systems focus on providing rapid and consistent response times, establishing reliable upper bounds so that they deliver a consistent quality of service. This consistent, predictable behavior in turn simplifies error handling, builds user confidence, and encourages further interaction.

Fault tolerance


The application remains responsive when a failure occurs. This applies not only to high-availability, mission-critical systems: any system that is not fault tolerant will be unresponsive after a failure. Resilience is achieved through replication, containment, isolation and delegation. Failures are contained within each module, and isolating modules from one another ensures that parts of the application can fail and recover without bringing down the whole application. Recovery of each failed module is delegated to another, external module, and high availability is ensured by replication. Clients of a module are not burdened with handling its failures.

Elasticity


The application remains responsive under varying load. Reactive applications can react to changes in load by increasing or decreasing the resources allocated to handle it. This implies an architecture with no contention points or central bottlenecks, which makes it possible to shard or replicate modules and distribute load among them. The result is scalability on cheap commodity hardware (hello, Google!).

Message Orientation


Reactive applications rely on asynchronous message passing to establish boundaries between modules; this ensures loose coupling, isolation and location transparency, and provides the means to delegate errors as messages. Explicit message passing enables load management, elasticity and flow control by shaping and monitoring message queues and applying back-pressure when necessary. Location transparency makes it possible to use the same error-handling logic across a cluster and within a single node. Non-blocking communication allows recipients to consume resources only while they are active, which leads to less system overhead.



CQRS


CQRS stands for Command Query Responsibility Segregation. Unlike the widely used CRUD approach (Create, Retrieve, Update, Delete), this way of building an application's architecture allows different models to be used for updating and for reading information. A natural question arises: why go to such lengths? Because once the read model and the write model are separated, each can be optimized for its own task. If the data is easier to read in some other shape, nothing stops us from storing it that way for reads. If reads are more convenient from a graph database, go ahead. If you want to keep everything in a key-value store, be my guest. Moreover, if you want to add new capabilities to the read model, all you need to do after adding them is regenerate the model. (One caveat: with many gigabytes of events this will not be fast, but we can always take a snapshot, which speeds up recovery considerably.)

For the same reason, you need not bother normalizing the read model at all. By using CQRS to optimize reads, we make our application responsive. What else does it need in order to be truly reactive? Right: elasticity and resilience. We get these properties from the Event Sourcing pattern.
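To make the split concrete, here is a minimal sketch in plain Scala (no Akka), assuming that the write side is nothing more than an append-only list of events. The names CqrsSketch, MessagePosted, WriteModel, countsByAuthor and latestPerAuthor are invented for this illustration and are not part of the messenger code.

```scala
object CqrsSketch {
  // The only thing the write side stores: immutable facts, in order.
  final case class MessagePosted(author: String, content: String, time: Long)

  // Write model: append-only, optimized for recording events.
  final class WriteModel {
    private var log = Vector.empty[MessagePosted]
    def append(e: MessagePosted): Unit = log = log :+ e
    def events: Vector[MessagePosted] = log
  }

  // Read model 1: number of messages per author.
  def countsByAuthor(events: Seq[MessagePosted]): Map[String, Int] =
    events.groupBy(_.author).map { case (author, es) => author -> es.size }

  // Read model 2, added later: the latest message of each author.
  // Nothing on the write side changes; we only replay the same events.
  def latestPerAuthor(events: Seq[MessagePosted]): Map[String, String] =
    events.foldLeft(Map.empty[String, String]) { (acc, e) =>
      acc.updated(e.author, e.content)
    }
}
```

Each read model is just a function of the event list, which is why a new one can be introduced by regenerating it from the existing events.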



Event sourcing


The idea of ES is that we store not the current state of our data model but the entire history of changes to the state of the application (strictly speaking, not all changes, only those that matter to us). To get the current state, we simply fold the changes from all recorded events. What do we mean by an event, and how does it differ from a command? A command means that somebody wants something from us, and it can be ignored. An event is something that has already happened, an immutable fact.

The advantage of this approach is that we never delete or modify anything. As you may have guessed, append-only storage gives us plenty of room to scale, and as a database we can use proven NoSQL solutions such as Cassandra or HBase. Event Sourcing gives us resilience and elasticity.

Enough talk, show us the code
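The difference between a command and an event, and the idea of folding events into the current state, can be sketched in a few lines of plain Scala. This is an illustration only: PostMessage, MessagePosted, decide, applyEvent and replay are hypothetical names, and the rule that rejects empty messages is an assumption made up for the example.

```scala
object EventSourcingSketch {
  // A command is a request; it may be rejected.
  final case class PostMessage(author: String, content: String)
  // An event is a fact that has already happened; it is never changed or deleted.
  final case class MessagePosted(author: String, content: String)

  // State of a chat room: the lines posted so far.
  type RoomState = Vector[String]

  // Validation happens on commands: an empty message produces no event.
  def decide(cmd: PostMessage): Option[MessagePosted] =
    if (cmd.content.trim.isEmpty) None
    else Some(MessagePosted(cmd.author, cmd.content))

  // Applying one event to the state is a pure function.
  def applyEvent(state: RoomState, e: MessagePosted): RoomState =
    state :+ s"${e.author}: ${e.content}"

  // The current state is the fold of all stored events.
  def replay(events: Seq[MessagePosted]): RoomState =
    events.foldLeft(Vector.empty[String])(applyEvent)
}
```

Only the events returned by decide would be written to the journal; rejected commands leave no trace, and replay over the same journal always yields the same state.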


So, as mentioned earlier, we will implement all of this using the Typesafe stack.

The architecture of our application will look like this:



The user can read and send messages. Messages are sent and received over a web socket, which the UserConnection actor has access to. This actor passes messages to the RoomWriter actor, which writes them to the log and also nudges the RoomReader actor; RoomReader reads messages from the log and sends them back to the UserConnection actor. On top of this we have the Receptionist actor, which hands out names and makes sure that no two users in the application share the same name. With the architecture more or less sorted out, let's start writing code.

RoomWriter


First we implement the actor that writes incoming messages to the log.

RoomWriter class code
import akka.actor.{Actor, ActorRef, Props, Terminated}
import akka.persistence.PersistentActor
import scala.collection.mutable

class RoomWriter(roomLogId: String) extends PersistentActor {
  import RoomWriter._

  override def persistenceId = roomLogId

  val listeners = mutable.Set.empty[ActorRef]

  // The writer keeps no state of its own, so there is nothing to recover
  def receiveRecover = Actor.emptyBehavior

  def receiveCommand = {
    case msg: Message    => persistAsync(msg) { _ => listeners foreach (_ ! Update) }
    case Listen(ref)     => listeners add context.watch(ref)
    case Terminated(ref) => listeners remove ref
  }
}

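What is written here? As you might guess, we declared the RoomWriter class. It consists of the log identifier (persistenceId), the set of listeners that want to hear about updates, and the two handlers receiveRecover and receiveCommand.

Let's look at the receiveCommand method in more detail. This method handles three different messages:
  1. Message is persisted to the log asynchronously, after which every listener is sent an Update,
  2. Listen(ref) adds the actor to the listeners and starts watching its life cycle,
  3. Terminated(ref) removes a stopped actor from the listeners.

It is good practice to declare all the messages an actor handles, together with a factory method for creating the actor, in its companion object: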

RoomWriter companion object code
object RoomWriter {
  case class Listen(ref: ActorRef)
  case class Message(author: String, content: String, time: Long)
  case object Update

  def props(roomId: String) = Props(new RoomWriter(roomId))
}

That settles RoomWriter. Now it is time to look at the RoomReader actor, which receives updates from the log and passes them up the hierarchy.

RoomReader


RoomReader class code
class RoomReader(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef)
    extends PersistentView {
  import RoomReader._ // currentTime and tenMinutes live in the companion object
  import RoomWriter._

  roomWriter ! Listen(self)

  override def persistenceId = roomLogId
  override def viewId = roomLogId + "-view"

  def receive = {
    case msg @ Message(_, _, sendingTime) if currentTime - sendingTime < tenMinutes =>
      userConnection ! msg
    case msg: Message => // older than ten minutes: ignore
    case Update       => self ! akka.persistence.Update()
  }
}

RoomReader depends on the log identifier, which determines where it gets its updates from. In our case this identifier matches the identifier of the RoomWriter actor, which means that everything RoomWriter writes to the log will reach RoomReader. Messages are processed as follows:
  1. a Message sent less than ten minutes ago is forwarded to the UserConnection actor,
  2. any older Message is ignored,
  3. Update, sent by RoomWriter, makes the view ask itself to read new events from the log.

As before, here is the companion object:
RoomReader companion object code
object RoomReader {
  def currentTime = System.currentTimeMillis()
  val tenMinutes = Duration(10, MINUTES).toMillis

  def props(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) =
    Props(new RoomReader(roomLogId, roomWriter, userConnection))
}


We move on to the most interesting part: the UserConnection actor, which handles messages from the web socket.

UserConnection


UserConnection class code
class UserConnection(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String)
    extends Actor {
  import actors.UserConnection._

  def receive = waitingForUsername

  def waitingForUsername: Receive = {
    case WebSocketInMsg(RegisterMeWithName, username) =>
      receptionist ! UsernameRequest(username)
    case Ack(username) =>
      context become readyToChat(username)
      context actorOf RoomReader.props(roomLogId, roomWriter, self)
      out ! WebSocketOutMsg(currentTime, "system", "welcome")
    case NAck =>
      out ! WebSocketOutMsg(currentTime, "system", "taken")
  }

  def readyToChat(username: String): Receive = {
    case WebSocketInMsg(SendMessage, message) =>
      roomWriter ! Message(username, message, currentMillis)
    case Message(author, content, time) =>
      out ! WebSocketOutMsg(formatTime(time), author, content)
  }
}

This actor differs from the others in one respect: it can change its behavior and state. Initially it waits for a username. In this state it accepts the client's request for a name and forwards it to the actor responsible for issuing names. Once the name has been granted, the actor switches to the ready-to-chat state and starts relaying messages between the parts of the system.

The companion object turned out rather large this time:
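The behavior switch can be modelled without Akka as a function that handles one message and returns the behavior for the next one. The sketch below is a simplified stand-in for context.become, not the way Akka implements it; BecomeSketch, In, Chat, Behavior and run are names invented for the example, and the outgoing messages are reduced to plain strings.

```scala
object BecomeSketch {
  sealed trait In
  final case class Ack(username: String) extends In
  case object NAck extends In
  final case class Chat(text: String) extends In

  // A behavior handles one message and returns what to send out,
  // plus the behavior to use for the next message.
  final case class Behavior(handle: In => (List[String], Behavior))

  lazy val waitingForUsername: Behavior = Behavior {
    case Ack(name) => (List("welcome"), readyToChat(name)) // switch state
    case NAck      => (List("taken"), waitingForUsername)  // stay in this state
    case Chat(_)   => (Nil, waitingForUsername)            // not registered yet: dropped
  }

  def readyToChat(username: String): Behavior = Behavior {
    case Chat(text) => (List(s"$username: $text"), readyToChat(username))
    case _          => (Nil, readyToChat(username))
  }

  // Feeds the messages through the behaviors one by one and collects the output.
  def run(start: Behavior, msgs: List[In]): List[String] = {
    val (out, _) = msgs.foldLeft((List.empty[String], start)) { case ((acc, b), m) =>
      val (produced, next) = b.handle(m)
      (acc ++ produced, next)
    }
    out
  }
}
```

The username becomes part of the state simply by being a parameter of readyToChat, which is the same trick the actor uses.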

Companion object code for the UserConnection class
object UserConnection {
  def props(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) =
    Props(new UserConnection(receptionist, roomWriter, out, roomLogId))

  case class WebSocketInMsg(messageType: Int, messageText: String)
  case class WebSocketOutMsg(time: String, from: String, messageText: String)
  case class UsernameRequest(name: String)
  case class Ack(username: String)
  case object NAck

  val RegisterMeWithName = 0
  val SendMessage = 1

  val formatter = DateTimeFormat.forPattern("HH:mm:ss").withLocale(Locale.US)

  def currentTime = DateTime.now().toString(formatter)
  def currentMillis = System.currentTimeMillis()
  def formatTime(timeStamp: Long) = new DateTime(timeStamp).toString(formatter)
}


The last actor that deserves our attention is the Receptionist.

Receptionist


Receptionist class code
class Receptionist extends Actor {
  var takenNames = mutable.Map("system" -> self)

  def receive = {
    case UsernameRequest(username) =>
      if (takenNames contains username) {
        sender() ! NAck
      } else {
        takenNames += (username -> context.watch(sender()))
        sender() ! Ack(username)
      }
    case Terminated(ref) =>
      takenNames collectFirst { case (name, actor) if actor == ref => name } foreach takenNames.remove
  }
}

Its job is to issue names to users: it holds an associative array that maps names to ActorRefs. Just as in RoomWriter, we watch the life cycle of the actors we have given names to, and when one of them dies we remove its name from the list of registered names.

Do not forget the companion object, where we put the factory method for creating the actor:

Companion object code of the Receptionist class
object Receptionist {
  def props() = Props[Receptionist]
}

Controller


We have now finished all the actors we planned to implement. Next, let's see how to connect a web socket to an actor. For this we will use the tools that the Play Framework offers. We implement the controller of our application as follows:

Controller code
object Application extends Controller {
  val logId = "akka-is-awesome"

  val roomWriter = Akka.system.actorOf(RoomWriter.props(logId), "writer")
  val receptionist = Akka.system.actorOf(Receptionist.props(), "receptionist")

  def index = Action { implicit request =>
    Ok(views.html.chat())
  }

  implicit val InMsgFormat = Json.format[WebSocketInMsg]
  implicit val InMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketInMsg]
  implicit val OutMsgFormat = Json.format[WebSocketOutMsg]
  implicit val OutMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketOutMsg]

  def socket = WebSocket.acceptWithActor[WebSocketInMsg, WebSocketOutMsg] { request => out =>
    UserConnection.props(receptionist, roomWriter, out, logId)
  }
}

First, we create two actors: roomWriter and receptionist. They are dependencies of the UserConnection actor. Next, we describe how messages are formatted for transmission over the web socket. Finally, we describe how incoming web socket connections are handled. The helper built into the Play Framework makes this remarkably easy.

It's time to create the web interface. For the layout we will use Twitter Bootstrap, and AngularJS to implement the client-side logic.

Client part of the code
angular.module('chatApp', [])
  .controller('ChatCtrl', ['$scope', function($scope) {
    var wsUri = "ws://" + window.location.host + "/ws";
    var websocket = new WebSocket(wsUri);

    $scope.name = "";
    $scope.messages = [];
    $scope.registered = false;
    $scope.taken = false;

    $scope.sendMessage = function () {
      websocket.send(angular.toJson({
        "messageType": 1,
        "messageText": $scope.messageText
      }));
      $scope.messageText = "";
    };

    $scope.sendName = function () {
      if (!$scope.registered) {
        websocket.send(angular.toJson({
          "messageType": 0,
          "messageText": $scope.name
        }));
      }
    };

    websocket.onmessage = function (e) {
      var msg = angular.fromJson(e.data);
      console.log(e.data);
      if (!$scope.registered) {
        switch (msg.from) {
          case "system":
            handleSystemMsg(msg.messageText);
            break;
        }
      } else {
        $scope.messages.push(msg);
        $scope.$apply();
        var chatWindow = $("#chat-window");
        chatWindow.scrollTop(chatWindow[0].scrollHeight);
      }
    };

    function handleSystemMsg(msg) {
      switch (msg) {
        case "welcome":
          $scope.registered = true;
          break;
        case "taken":
          $scope.taken = true;
          break;
      }
    }
  }]);

Here is what our HTML page looks like:

HTML code of the application
<!DOCTYPE html>
<html ng-app="chatApp">
<head>
  <meta charset="utf-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <meta name="description" content="">
  <meta name="author" content="">
  <title>Akka WebSocket Chat</title>
  <!-- Bootstrap core CSS -->
  <link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet">
  <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.3.5/angular.min.js"></script>
  <!-- Custom styles for this template -->
  <link href="@routes.Assets.at("stylesheets/main.css")" rel="stylesheet">
  <script src="@routes.Assets.at("javascripts/chatApp.js")"></script>
</head>
<body>
  <div ng-controller="ChatCtrl">
    <nav class="navbar navbar-inverse navbar-fixed-top" role="navigation">
      <div class="container">
        <div class="navbar-header">
          <a class="navbar-brand" href="#">Reactive Messenger</a>
        </div>
        <form class="navbar-form navbar-left" ng-submit="sendName()" ng-show="!registered">
          <div class="form-group">
            <input type="text" class="form-control" ng-model="name" placeholder="Username" required>
          </div>
          <button type="submit" class="btn btn-default">Set name</button>
        </form>
      </div>
    </nav>
    <div class="container" >
      <div class="chat col-lg-6">
        <div id="chat-window">
          <ul class="list-group">
            <li class="list-group-item" ng-repeat="message in messages">
              <span class="label label-info">{{message.time}}</span>
              <span class="label label-default">{{message.from}}</span>
              {{message.messageText}}
            </li>
          </ul>
        </div>
        <form ng-submit="sendMessage()">
          <div>
            <div class="input-group">
              <input type="text" ng-model="messageText" class="form-control" required>
              <span class="input-group-btn">
                <button class="btn btn-default" type="submit">
                  Send<span class="glyphicon glyphicon-send" aria-hidden="true"></span>
                </button>
              </span>
            </div> <!-- /input-group -->
          </div> <!-- /.col-lg-6 -->
        </form>
      </div>
    </div> <!-- /.container -->
  </div>
  <!-- Bootstrap core JavaScript
  ================================================== -->
  <!-- Placed at the end of the document so the pages load faster -->
  <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js"></script>
  <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/js/bootstrap.min.js"></script>
</body>
</html>



Scaling out


We have a prototype of the application, but before rolling it out to production we should improve it a little. The improvements, each covered below, are: running the application on several servers, using Cassandra as the journal, serializing messages with protobuf, and keeping only the messages from the last ten minutes.

When the application runs on several servers, its architecture changes very little. Because actors in Akka are location transparent, we can safely spread our application across several servers. Moreover, our actors will not even notice that they are now separated and run on different servers, communicating over the network. All we need to do is add a little code, and Akka will do the rest for us.
Jumping ahead, here is a picture of how our application will look after all the improvements. The architecture undergoes minor changes overall, but the idea stays the same.



To use Cassandra as the journal (our message log), we need to:
  1. install Cassandra on the nodes,
  2. use the plugin that keeps the log in Cassandra.

The first item is well covered in the official manual, so there is no particular reason to repeat it here. It is only worth noting that not every Cassandra node needs to be a seed node; for a cluster of three machines, one seed is enough.

As for the second, we need to specify the journal type in the config and list the addresses of the Cassandra nodes. This can be done as follows:
Akka-persistence configuration
akka.persistence.journal.plugin = "cassandra-journal"
cassandra-journal.contact-points = ["ip1", "ip2", "ip3"]
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
cassandra-snapshot-store.contact-points = ["ip1", "ip2", "ip3"]

With Cassandra connected, we will write our own class to serialize and deserialize messages: first we use the protobuf code generator to generate the necessary classes, and then build a serializer on top of them.

This is what the protobuf file looks like:

Content of the protobuf file
option java_package = "actors.messages";
option optimize_for = SPEED;

message ChatMessage {
  optional string author = 1;
  optional string content = 2;
  optional int64 timestamp = 3;
}

Once protobuf has generated the required class, we write our serializer:

Message serializer code
class ChatMessageSerializer extends Serializer {
  def identifier: Int = 193823

  def includeManifest: Boolean = false

  def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case ChatMessage(author, content, timestamp) =>
      ProtoChatMessage.newBuilder()
        .setAuthor(author)
        .setContent(content)
        .setTimestamp(timestamp)
        .build()
        .toByteArray
    case _ =>
      throw new IllegalArgumentException("unknown type " + obj.getClass)
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val proto = ProtoChatMessage.parseFrom(bytes)
    ChatMessage(proto.getAuthor, proto.getContent, proto.getTimestamp)
  }
}
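The property that matters for any such serializer is that fromBinary(toBinary(m)) returns a message equal to m. The sketch below shows that round trip in plain Scala. It deliberately swaps protobuf for java.io.DataOutputStream so that it runs without generated classes, and ChatMessage is redeclared here as an assumption about its shape; it is not the serializer used in the article.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object RoundTripSketch {
  // Assumed shape of the message, matching the three protobuf fields.
  final case class ChatMessage(author: String, content: String, timestamp: Long)

  def toBinary(m: ChatMessage): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeUTF(m.author)
    out.writeUTF(m.content)
    out.writeLong(m.timestamp)
    out.flush()
    bytes.toByteArray
  }

  def fromBinary(data: Array[Byte]): ChatMessage = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    // Fields must be read in the same order they were written.
    val author = in.readUTF()
    val content = in.readUTF()
    val timestamp = in.readLong()
    ChatMessage(author, content, timestamp)
  }
}
```

Unlike this hand-written format, protobuf identifies fields by their numeric tags, which is what allows the message schema to evolve without breaking events already stored in the journal.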

So we now have a proper journal and a way of writing to it. Next we need a way to keep only the messages from the last 10 minutes. To do this, we write our own buffer that retains messages no older than 10 minutes.

Buffer code
class FixedTimeMessageBuffer(duration: Long) extends Traversable[ChatMessage] {
  val list = ListBuffer[ChatMessage]()

  def now = System.currentTimeMillis()
  def old = now - duration

  def append(elem: ChatMessage) = {
    if (elem.timestamp > old) {
      // Drop expired messages from the front before adding the new one
      while (list.nonEmpty && list.head.timestamp < old) {
        list.remove(0)
      }
      list.append(elem)
    }
  }

  override def toList = list.toList

  def replace(newList: List[ChatMessage]) = {
    list.clear()
    list ++= newList
  }

  def foreach[U](f: ChatMessage => U) = list.foreach(f)
}

We chose ListBuffer as the data structure for storing messages because we only append elements to the end and remove them from the front. ListBuffer performs both operations in constant time. Later we will use this buffer in our Reader actor to limit the number of messages sent to newly connected clients.
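To see the eviction at work without waiting ten minutes, here is a variation of the same idea in which the clock is passed in as a parameter. It is a sketch under assumptions: TimeWindowBuffer and the now parameter are hypothetical and do not appear in the application, and ChatMessage is redeclared so that the example is self-contained.

```scala
import scala.collection.mutable.ListBuffer

object BufferSketch {
  final case class ChatMessage(author: String, content: String, timestamp: Long)

  // Same logic as FixedTimeMessageBuffer, but with an injectable clock.
  final class TimeWindowBuffer(durationMillis: Long, now: () => Long) {
    private val list = ListBuffer.empty[ChatMessage]

    def append(elem: ChatMessage): Unit = {
      val oldest = now() - durationMillis
      if (elem.timestamp > oldest) {
        // Drop expired messages from the front, then add to the back.
        while (list.nonEmpty && list.head.timestamp < oldest) list.remove(0)
        list.append(elem)
      }
    }

    def toList: List[ChatMessage] = list.toList
  }
}
```

Note that, as in the original buffer, expired messages are dropped only when something new is appended, so a reader of toList may still see stale entries during a quiet period.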

Now consider how we split the actors across the network. So that our application neither crashes when a node is disconnected nor hangs waiting for it to come back, we need to write the corresponding logic in the actor. The RoomWriter actor has to notify RoomReader about new messages, so it is useful for it to know whether RoomReader is available. This logic is expressed naturally by giving the actor two states.

New methods for the RoomWriter class
...
sendIdentifyRequest()

def sendIdentifyRequest(): Unit = {
  log.info(s"Trying to connect to $roomReaderPath")
  context.actorSelection(roomReaderPath) ! Identify(roomReaderPath)
  import context.dispatcher // execution context required by the scheduler
  context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
}

def receiveRecover = Actor.emptyBehavior

def receiveCommand = identifying

def identifying: Receive = {
  case msg: ChatMessage =>
    persistAsync(msg) { m => log.info(s"Message $m persisted, but the reader isn't available") }
  case ActorIdentity(`roomReaderPath`, Some(actor)) =>
    log.info(s"Successfully connected to $roomReaderPath")
    context.watch(actor)
    context.become(active(actor))
  case ActorIdentity(`roomReaderPath`, None) =>
    log.info(s"Remote actor is not available: $roomReaderPath")
  case ReceiveTimeout => sendIdentifyRequest()
  case _ => log.info("Not ready yet")
}

def active(reader: ActorRef): Receive = {
  case msg: ChatMessage => persistAsync(msg) { _ => reader ! Update }
  case "snap" => saveSnapshot("foo")
  case Terminated(`reader`) =>
    log.info("reader terminated")
    sendIdentifyRequest()
    context.become(identifying)
  case ReceiveTimeout => // ignore
}
...

In the sendIdentifyRequest method, we try to obtain the ActorRef of the remote actor by sending it an Identify message. Every actor understands this message and replies with an ActorIdentity message containing the ActorRef we need. Having received the ActorRef, we switch to the active state and start working. Note that we also begin watching the life cycle of the remote actor, and if it becomes unavailable we try to reach it again.

To implement the same logic for the UserConnection actor, we will create a separate actor that acts as an intermediary in communication with the backend.

BackendTalker class code
class BackendTalker(roomWriterPath: String, roomReaderPath: String) extends Actor with ActorLogging {
  import BackendTalker._

  val listeners = collection.mutable.Set[ActorRef]()

  sendReaderIdentifyRequest()
  sendWriterIdentifyRequest()

  def sendReaderIdentifyRequest(): Unit = {
    log.info("sending identify request to reader")
    context.actorSelection(roomReaderPath) ! Identify(roomReaderPath)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReaderReceiveTimeout)
  }

  def sendWriterIdentifyRequest(): Unit = {
    log.info("sending identify request to writer")
    context.actorSelection(roomWriterPath) ! Identify(roomWriterPath)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, WriterReceiveTimeout)
  }

  def receive = identifying

  // Neither remote actor is known yet
  def identifying: Receive = {
    case ActorIdentity(`roomWriterPath`, Some(actor)) =>
      log.info(s"Successfully identified writer at $roomWriterPath")
      context.watch(actor)
      context.become(waitingForReader(actor))
    case ActorIdentity(`roomReaderPath`, Some(actor)) =>
      log.info(s"Successfully identified reader at $roomReaderPath")
      listeners.foreach(actor ! Listen(_))
      context.watch(actor)
      context.become(waitingForWriter(actor))
    case ActorIdentity(path, None) =>
      log.info(s"Remote actor not available: $path")
    case ReaderReceiveTimeout => sendReaderIdentifyRequest()
    case WriterReceiveTimeout => sendWriterIdentifyRequest()
    case msg: ChatMessage =>
      listeners += context.watch(sender())
      sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
    case Terminated(userCon) => listeners -= userCon
    case _ => log.info("Not ready yet")
  }

  // The writer is known, the reader is not
  def waitingForReader(writer: ActorRef): Receive = {
    case ActorIdentity(`roomReaderPath`, Some(reader)) =>
      log.info(s"Successfully identified reader at $roomReaderPath")
      listeners.foreach(reader ! Listen(_))
      context.watch(reader)
      context.become(active(reader, writer))
    case ActorIdentity(`roomReaderPath`, None) =>
      log.info(s"Reader actor not available: $roomReaderPath")
    case ReaderReceiveTimeout => sendReaderIdentifyRequest()
    case WriterReceiveTimeout => sendWriterIdentifyRequest()
    case Terminated(`writer`) =>
      log.info("writer terminated")
      sendWriterIdentifyRequest()
      context.become(identifying)
    case msg: ChatMessage =>
      listeners += context.watch(sender())
      sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
    case Terminated(userCon) => listeners -= userCon
    case _ => log.info("Not ready yet")
  }

  // The reader is known, the writer is not
  def waitingForWriter(reader: ActorRef): Receive = {
    case ActorIdentity(`roomWriterPath`, Some(writer)) =>
      log.info(s"Successfully identified writer at $roomWriterPath")
      context.watch(writer)
      context.become(active(reader, writer))
    case ActorIdentity(`roomWriterPath`, None) =>
      log.info(s"Writer actor not available: $roomWriterPath")
    case ReaderReceiveTimeout => sendReaderIdentifyRequest()
    case WriterReceiveTimeout => sendWriterIdentifyRequest()
    case Terminated(`reader`) =>
      log.info("reader terminated")
      sendReaderIdentifyRequest()
      context.become(identifying)
    case msg: ChatMessage =>
      listeners += context.watch(sender())
      sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis())
    case Terminated(userCon) => listeners -= userCon
    case _ => log.info("Not ready yet")
  }

  // Both remote actors are known: simply route the traffic
  def active(reader: ActorRef, writer: ActorRef): Receive = {
    case l: Listen => reader ! l
    case msg: ChatMessage => writer ! msg
    case Terminated(`reader`) =>
      log.info("reader terminated")
      sendReaderIdentifyRequest()
      context.become(waitingForReader(writer))
    case Terminated(`writer`) =>
      log.info("writer terminated")
      sendWriterIdentifyRequest()
      context.become(waitingForWriter(reader))
    case ReaderReceiveTimeout => // ignore
    case WriterReceiveTimeout => // ignore
  }
}

In it we implement the logic of waiting for remote actors, by analogy with what we did in the RoomWriter actor. Here we have to wait for a connection to two actors at once, so the logic is a bit more complicated.
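The four behaviors of BackendTalker form a small state machine, and it can help to look at the transitions in isolation. The following sketch restates the context.become calls from the listing as a pure function; State, Event and next are names invented for this illustration, and timeouts, logging and message routing are left out on purpose.

```scala
object ConnectionStateSketch {
  sealed trait State
  case object Identifying extends State
  case object WaitingForReader extends State // writer known, reader not yet
  case object WaitingForWriter extends State // reader known, writer not yet
  case object Active extends State

  sealed trait Event
  case object WriterFound extends Event
  case object ReaderFound extends Event
  case object WriterLost extends Event
  case object ReaderLost extends Event

  // Pure transition function mirroring the context.become calls.
  def next(state: State, event: Event): State = (state, event) match {
    case (Identifying, WriterFound)      => WaitingForReader
    case (Identifying, ReaderFound)      => WaitingForWriter
    case (WaitingForReader, ReaderFound) => Active
    case (WaitingForReader, WriterLost)  => Identifying
    case (WaitingForWriter, WriterFound) => Active
    case (WaitingForWriter, ReaderLost)  => Identifying
    case (Active, ReaderLost)            => WaitingForReader
    case (Active, WriterLost)            => WaitingForWriter
    case (s, _)                          => s // anything else leaves the state unchanged
  }
}
```

Written this way, the transitions can be checked with ordinary unit tests, without starting an actor system or a remote node.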

One final touch remains: we will rewrite RoomReader slightly to limit the number of messages that users receive.

To do this, we add a couple of lines to it.

In the constructor, we define our buffer for storing messages and write a helper method for working with it. In addition, we start a scheduler that issues a command to create a snapshot every 10 minutes. Note that the command is issued by sending a message to the actor; we do not call the saveSnapshot method directly. This is deliberate: it preserves the principle that an actor's mutable data must be touched only by the actor itself. Violating this principle can produce subtle bugs.

Supplement to RoomReader
import context.dispatcher // execution context required by the scheduler
// Assumes tenMinutes is still the Long (milliseconds) from the RoomReader
// companion object, while the scheduler expects a FiniteDuration
context.system.scheduler.schedule(tenMinutes.millis, tenMinutes.millis, self, Snap)

val state = new FixedTimeMessageBuffer(tenMinutes)

def updateState(msg: ChatMessage) = state.append(msg)

In the receive method, we implement saving a snapshot when a special message arrives. We also implement correct recovery of the state from a snapshot.

Supplement to RoomReader
case msg: ChatMessage =>
  updateState(msg)
  sendAll(msg)
case Listen(ref) =>
  listeners add context.watch(ref)
  state.foreach(ref ! _)
case Snap =>
  saveSnapshot(state.toList)
case SnapshotOffer(_, snapshot: List[ChatMessage]) =>
  state.replace(snapshot)
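Why a snapshot speeds up recovery can be shown with a small sketch in plain Scala: instead of folding the whole journal, we start from the saved state and replay only the events recorded after it. The names Snapshot, eventsCovered, recover and recoverFrom are hypothetical; Akka Persistence tracks the position with a sequence number, for which eventsCovered is a simplified stand-in.

```scala
object SnapshotSketch {
  final case class ChatMessage(author: String, content: String, timestamp: Long)

  // A snapshot stores the state together with the number of events it covers.
  final case class Snapshot(state: List[ChatMessage], eventsCovered: Int)

  def applyEvent(state: List[ChatMessage], e: ChatMessage): List[ChatMessage] =
    state :+ e

  // Full replay: fold every event from the beginning of the journal.
  def recover(journal: Vector[ChatMessage]): List[ChatMessage] =
    journal.foldLeft(List.empty[ChatMessage])(applyEvent)

  // Recovery from a snapshot: start from the saved state, replay only the tail.
  def recoverFrom(snapshot: Snapshot, journal: Vector[ChatMessage]): List[ChatMessage] =
    journal.drop(snapshot.eventsCovered).foldLeft(snapshot.state)(applyEvent)
}
```

Both paths must produce the same state; the snapshot only shortens the part of the journal that has to be read.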

To sum up, we have implemented a modern web application in the spirit of reactive programming. It responds quickly to user requests and has a certain degree of resilience. However, there is still much to improve. For our application to keep working even when individual nodes go down, we should use the akka-cluster module, which makes it possible to quickly build decentralized applications without a single point of failure. In addition, we need to handle the situation where the flow of messages is too large for the actors to keep up with. For this there is the experimental akka-streams module. We will look at this and much more in the next article.

Source: https://habr.com/ru/post/257477/

