
Recently, we often hear about reactive programming and see various basvords: message-driven architecture, event-sourcing, CQRS. Unfortunately, on Habré they write quite a bit about it, so I decided to correct the situation and share my knowledge with everyone.
In this article we will learn about the main features of reactive applications, consider how the CQRS and EventSourcing patterns will help us in their creation, and in order not to be bored, we will step by step create your messenger with a websocket and actors that matches all canons of reactive programming. To implement all this stuff, we will use the wonderful Scala language along with the equally excellent Akk library that implements the model of actors. Also, we will use the Play Framework to write the web part of our application. So let's get started.
The article is intended for those who are already familiar with Scala and heard about the model of actors. All others are also invited to read, the principles of reactive programming can be applied regardless of the language and framework.
What is reactive programming
The idea of ​​reactive programming is described in the reactive manifest
www.reactivemanifesto.org . Translation of its
first version was already on Habré, and the second version is slightly different from the first. Let's look at a brief clipping from the second version. A reactive manifest states that reactive applications have several important properties:
')
Responsiveness
The application responds as quickly as possible. Responsiveness is the basis of usability and utility, for the simple reason that long interface delays do not add a desire to use it, and also responsiveness means that problems can be quickly detected and effectively solved. Responsive systems focus on delivering fast and consistent response times, using suitable upper time boundaries to ensure consistent quality of service. This constant and predictable behavior, in turn, simplifies error handling, enhances user confidence, and encourages them to further interact.
fault tolerance
The application remains responsive when a failure occurs. This applies not only to high-availability, mission-critical systems — any failure-unstable system will not be responsive in the event of a failure. Stability is achieved through replication, localization, isolation and delegation. Failures do not go beyond the module, and by isolating the modules from each other, you can be sure that individual parts of the application can fail and recover from a failure, while not causing the entire application to fall. Recovery of each failed module is delegated to another, external module, and high availability is achieved through replication. Module clients do not have a headache with module failure handling.
Elasticity
The app remains responsive under various loads. Reactive applications can respond to changes in load by increasing or decreasing the resources intended for its processing. This implies an architecture that does not have locking points or central bottlenecks, which is reflected in the ability to sharding and replication of modules, and the further distribution of the load between them. The result of this is scalability with the help of cheap commonly used hardware (hello, google!).
Message Orientation
Reactive applications focus on asynchronous message passing to set boundaries between modules that provide weak connectivity, isolation, location transparency, and provides means for delegating errors as messages. Introducing explicit message passing provides opportunities for load balancing, elasticity, and flow control by generating and monitoring message queues, and reducing bandwidth when needed. Location transparency allows the same error handling logic to be used on the cluster as well as on the same node. Non-blocking communications allow message recipients to consume resources only when they are active, which leads to a smaller overhead monitor when an application is running.
CQRS
CQRS stands for Command Query Responsibility Segregation (division of responsibility into teams and requests). This approach to building the application architecture, in contrast to the widely used CRUD (Create Retrieve Update Delete), implies that it is possible to use different models for updating and reading information. A natural question arises, why do we need such perversions? The fact is that based on the fact that the reading model and the writing model are separated, we can optimize them for these tasks. For example, if data is better suited for reading tasks, then no one bothers us to do this. It is more convenient to read if the data in the graph database - please. I want to store everything in the Key-Value store - for God's sake. Moreover, if you want to add new features to the read model, then all you need to do after adding them is to regenerate the model (it’s worth making a reservation that if we have events for many gigabytes, then this process will not be so fast, but we can always make snapshot, which will significantly increase the speed of recovery).
In principle, over the normalization of the Read-model, you can not bother at all, for the same reason. Using CQRS to optimize read operations in our application, we ensure the responsiveness of our application. What do we still have in order for our application to be truly reactive? Correct, elasticity and resiliency. We implement these features using the Event Sourcing pattern.
Event sourcing
The meaning of ES is that we do not store the current state of our data model, but the entire history of changes that change the state of our application (in fact, not all changes, but only those that matter to us). To get the current state, we simply summarize the changes from all existing events. What do we mean by an event, and how does an event differ from a team? The team means that someone wants from us, besides, it can be ignored. An event is something that happened, an unchangeable fact.
The advantage of this approach is that we never delete or change anything. As you may have guessed, this gives us ample opportunities to scale our application, and as a database we can use well-proven NoSQL solutions such as Cassandra or HBase. EventSourcing gives us resiliency and elasticity.
Stop talking, show us the code.
So, as mentioned earlier, we will implement the whole thing using the
Typesafe stack .
The architecture of our application will look like this:
The user has the ability to read and send messages. Sending and receiving messages occurs through the web socket, which the UserConnection actor has access to. This actor sends messages to the RoomWriter actor, which, in addition to writing messages to the log, is engaged in kicking the RoomReader actor, which reads messages from the log and sends them back to the UserConnection actor. In addition to all this, we have the Receptionist actor, which handles the issuance of names and ensures that the application does not have users with two identical names. C architecture more or less figured out, now begin to write code.
RoomWriter
The very first we implement the actor that records incoming messages in the log.
RoomWriter Class Codeclass RoomWriter(roomLogId: String) extends PersistentActor { import RoomWriter._ override def persistenceId = roomLogId val listeners = mutable.Set.empty[ActorRef] def receiveRecover = Actor.emptyBehavior def receiveCommand = { case msg: Message => persistAsync(msg) { _ => listeners foreach (_ ! Update) } case Listen(ref) => listeners add context.watch(ref) case Terminated(ref) => listeners remove ref } }
What is it written here? As you might guess, we declared the RoomWriter class, which has three parts:
- identifier persistenceId, which is necessary to uniquely identify the events that were produced by this actor;
- a set of listeners containing a set of links to actors that should be notified that something has changed in the log;
- two methods, receiveRecover, which is called when replaying messages from the log that occurs when creating an actor, and receiveCommand, which is used to process messages during normal operation.
Let's look at the receiveCommand method in more detail. This method handles three different messages:
- when a Message is received, it is asynchronously written to the log, and a message is sent to each listener that the log has been updated.
- when we receive a Listen, we begin to follow the life cycle of the actor, the link to which lies in the message, among other things, the link to the actor is added to the set of listeners
- A terminated message containing a link to the deceased actor will be received if the actor over the life cycle of which we are watching will suddenly die. If this happens (the user has closed the browser), then we remove this actor from the mailing list.
The rule of good tone is the declaration of all processed messages and the factory method for creating an actor in a companion object:
RoomWriter companion object code object RoomWriter { case class Listen(ref: ActorRef) case class Message(author: String, content: String, time: Long) case object Update def props(roomId: String) = Props(new RoomWriter(roomId)) }
With RoomWriter, we figured out, now is the time to look at the RoomReader actor, which receives updates from the log, and sends them through the hierarchy above.
RoomReader
RoomReader class class RoomReader(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) extends PersistentView { import RoomWriter._ roomWriter ! Listen(self) override def persistenceId = roomLogId override def viewId = roomLogId + "-view" def receive = { case msg @ Message(_, _,sendingTime) if currentTime - sendingTime < tenMinutes => userConnection ! msg case msg: Message => case Update => self ! akka.persistence.Update() } }
RoomReader depends on the log identifier, depending on which it will receive its updates. In our case, this identifier will match with the identifier of the actor RoomWriter, which will mean that everything that RoomWriter writes to the log will come to RoomReader. Consider how message processing takes place:
- when a Message is received, the time it is sent is checked, and if the message is older than ten minutes, it will not be shown to the user. This is done so that thousands of previously accumulated messages do not arrive to the user.
- when receiving Update, the actor reads the log, and sends the read messages to the user.
As in the previous case, our companion object:
RoomReader Companion Object Code object RoomReader { def currentTime = System.currentTimeMillis() val tenMinutes = Duration(10, MINUTES).toMillis def props(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) = Props( new RoomReader(roomLogId, roomWriter, userConnection) ) }
We proceed to the most interesting part, the UserConnection actor, which is responsible for processing messages from the web socket.
Userconnection
Class code userconnection class UserConnection(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) extends Actor { import actors.UserConnection._ def receive = waitingForUsername def waitingForUsername: Receive = { case WebSocketInMsg(RegisterMeWithName, username) => receptionist ! UsernameRequest(username) case Ack(username) => context become readyToChat(username) context actorOf RoomReader.props(roomLogId, roomWriter, self) out ! WebSocketOutMsg(currentTime, "system", "welcome") case NAck => out ! WebSocketOutMsg(currentTime, "system", "taken") } def readyToChat(username: String): Receive = { case WebSocketInMsg(SendMessage, message) => roomWriter ! Message(username, message, currentMillis) case Message(author, content, time) => out ! WebSocketOutMsg(formatTime(time), author, content) } }
This actor has one feature that distinguishes it from others: it can change its behavior and state. Initially, he is in a state of waiting for a username. In this state, it can accept client requests for a name, and forward them to the actor responsible for issuing names. Upon successful receipt of the name, the actor enters the state of readiness for the chat, and begins to send messages between parts of the system.
This companion turned out to be quite large this time:
Companion object code for the UserConnection class object UserConnection { def props(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) = Props( new UserConnection(receptionist, roomWriter, out, roomLogId) ) case class WebSocketInMsg(messageType: Int, messageText: String) case class WebSocketOutMsg(time: String, from: String, messageText: String) case class UsernameRequest(name: String) case class Ack(username: String) case object NAck val RegisterMeWithName = 0 val SendMessage = 1 val formatter = DateTimeFormat.forPattern("HH:mm:ss").withLocale(Locale.US) def currentTime = DateTime.now().toString(formatter) def currentMillis = System.currentTimeMillis() def formatTime(timeStamp: Long) = new DateTime(timeStamp).toString(formatter) }
The last actor to be given our attention is the Receptionist.
Receptionist
Receptionist class code class Receptionist extends Actor { var takenNames = mutable.Map("system" -> self) def receive = { case UsernameRequest(username) => if (takenNames contains username) { sender() ! NAck } else { takenNames += (username -> context.watch(sender())) sender() ! Ack(username) } case Terminated(ref) => takenNames collectFirst { case (name, actor) if actor == ref => name } foreach takenNames.remove } }
Its task is to issue names to users: it contains an associative array that maps names to actorRefs. Just like in RoomWriter, we follow the life cycle of the actors to whom we gave out the names, and in case of their death, remove their names from the list of registered names.
Do not forget about the companion object, we place the factory method there to create the actor:
Companion object code of the Receptionist class object Receptionist { def props() = Props[Receptionist] }
Controller
At the moment we are finished with all the actors that we had plans for implementation. Now let's look at how we can connect a web socket and an actor. For this, we will use the tools that the play framework can offer us. We implement the controller of our application as follows:
Controller code object Application extends Controller { val logId = "akka-is-awesome" val roomWriter = Akka.system.actorOf(RoomWriter.props(logId), "writer") val receptionist = Akka.system.actorOf(Receptionist.props(), "receptionist") def index = Action { implicit request => Ok(views.html.chat()) } implicit val InMsgFormat = Json.format[WebSocketInMsg] implicit val InMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketInMsg] implicit val OutMsgFormat = Json.format[WebSocketOutMsg] implicit val OutMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketOutMsg] def socket = WebSocket.acceptWithActor[WebSocketInMsg, WebSocketOutMsg] { request => out => UserConnection.props(receptionist, roomWriter, out, logId) } }
First, we create two actors: roomWriter and receptionist. They are dependencies for the UserConnection actor. Next, we describe how to format messages for transmission through the webcast. Finally, we describe how we handle incoming connections to the web socket. The helper built into the Play Framework makes it incredibly easy to do.
It's time to create a web interface. For the layout we will use the twitter bootstrap framework, and angular.js - to implement the business logic on the client.
Client part of the code angular.module('chatApp', []) .controller('ChatCtrl', ['$scope', function($scope) { var wsUri = "ws://"+window.location.host+"/ws"; var websocket = new WebSocket(wsUri); $scope.name = ""; $scope.messages = []; $scope.registered = false; $scope.taken = false; $scope.sendMessage = function () { websocket.send(angular.toJson({ "messageType": 1, "messageText":$scope.messageText })); $scope.messageText = ""; }; $scope.sendName = function () { if (!$scope.registered) { websocket.send(angular.toJson({ "messageType": 0, "messageText": $scope.name })); } }; websocket.onmessage = function (e) { var msg = angular.fromJson(e.data); console.log(e.data); if (!$scope.registered) { switch (msg.from) { case "system": handleSystemMsg(msg.messageText); break; } } else { $scope.messages.push(msg); $scope.$apply(); var chatWindow = $("#chat-window"); chatWindow.scrollTop(chatWindow[0].scrollHeight); } }; function handleSystemMsg(msg) { switch (msg) { case "welcome": $scope.registered = true; break; case "taken": $scope.taken = true; break; } } }]);
What our html page will look like:
Html code of the application <!DOCTYPE html> <html ng-app="chatApp"> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="description" content=""> <meta name="author" content=""> <title>Akka WebSocket Chat</title> <link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet"> <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.3.5/angular.min.js"></script> <link href="@routes.Assets.at("stylesheets/main.css")" rel="stylesheet"> <script src="@routes.Assets.at("javascripts/chatApp.js")"></script> </head> <body> <div ng-controller="ChatCtrl"> <nav class="navbar navbar-inverse navbar-fixed-top" role="navigation"> <div class="container"> <div class="navbar-header"> <a class="navbar-brand" href="#">Reactive Messenger</a> </div> <form class="navbar-form navbar-left" ng-submit="sendName()" ng-show="!registered"> <div class="form-group"> <input type="text" class="form-control" ng-model="name" placeholder="Username" required> </div> <button type="submit" class="btn btn-default">Set name</button> </form> </div> </nav> <div class="container" > <div class="chat col-lg-6"> <div id="chat-window"> <ul class="list-group"> <li class="list-group-item" ng-repeat="message in messages"> <span class="label label-info">{{message.time}}</span> <span class="label label-default">{{message.from}}</span> {{message.messageText}} </li> </ul> </div> <form ng-submit="sendMessage()"> <div> <div class="input-group"> <input type="text" ng-model="messageText" class="form-control" required> <span class="input-group-btn"> <button class="btn btn-default" type="submit"> Send<span class="glyphicon glyphicon-send" aria-hidden="true"></span> </button> </span> </div> </div> </form> </div> </div> </div> <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js"></script> <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/js/bootstrap.min.js"></script> </body> </html>
Scaling out
We have a prototype of the application, but before rolling it out into production, we should pump it a little. We will pump it in the following way:
- Let's replace our ersatz magazine with something really good. In this case, we will take Cassandra, and will use it to store events.
- Default Java serialization does not differ in its stability when changing the format of messages or the speed in their serialization. It is worth replacing it with Google Protobuf or Kryo. In this case, we will use Protobuf.
- Users of our messenger want to stay up to date with the latest news, and do not want to read messages older than half an hour. To do this, we will change the logic of our actors, and we will create a snapshot every half hour, so we do not have to restore the entire message history every time a user connects.
- In order for the application to process a large number of users, it is worth making it distributed.
When you run an application on multiple servers, its architecture will change very slightly. Due to the fact that the actors in Akka have the property location transparency, we can safely pull away our application on several servers. Moreover, our actors will not even guess that they are now separated and work on different servers communicating over the network. All we need is to add some code, and Akka will do the rest of the work for us.
I'll run ahead and provide a picture of how our application will look after all the improvements. In general, the architecture will undergo minor changes, but the idea will remain the same.
To use cassandra as a magazine, we need
- install cassandra on nodes,
- use the plugin to keep the log in cassandra.
The first paragraph is well described in the official manual, so there is no special reason to bring it here. It is worth noting only that it is not necessary to make all the nodes of the cassanda seed-nodes, for a cluster of three machines one sid will be enough.
Regarding the second, we need to specify the type of log in the config, and register the addresses of the cassandra nodes. This can be done as follows:
Akka-persistence configuration akka.persistence.journal.plugin = "cassandra-journal" cassandra-journal.contact-points = ["ip1,ip2,ip3"] akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store" cassandra-snapshot-store.contact-points = ["ip1,ip2,ip3"]
After connecting cassandra, we will write our class to serialize and deserialize messages: first, we will use the protobuff code generator to generate the necessary classes, and then with their help we will make a serializer.
This is how the protobuf file will look like:
Content of the protobuf file option java_package = "actors.messages"; option optimize_for = SPEED; message ChatMessage { optional string author = 1; optional string content = 2; optional int64 timestamp = 3; }
After the generation of the required class by the protobuf, we will write our serializer:
Message Serializer Code class ChatMessageSerializer extends Serializer { def identifier: Int = 193823 def includeManifest: Boolean = false def toBinary(obj: AnyRef): Array[Byte] = obj match { case ChatMessage(author, content, timestamp) => ProtoChatMessage.newBuilder() .setAuthor(author) .setContent(content) .setTimestamp(timestamp) .build() .toByteArray case _ => throw new IllegalArgumentException("unknown type " + obj.getClass) } def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { val proto = ProtoChatMessage.parseFrom(bytes) ChatMessage(proto.getAuthor, proto.getContent, proto.getTimestamp) } }
So, we now have a normal log, and a way to write to it. Now we need to think of a way to save messages no longer than 10 minutes. To do this, we write our own buffer that will save messages in the last 10 minutes.
Buffer code class FixedTimeMessageBuffer(duration: Long) extends Traversable[ChatMessage] { val list = ListBuffer[ChatMessage]() def now = System.currentTimeMillis() def old = now - duration def append(elem: ChatMessage) = { if (elem.timestamp > old) { while (list.nonEmpty && list.head.timestamp < old) { list.remove(0) } list.append(elem) } } override def toList = list.toList def replace(newList: List[ChatMessage]) = { list.clear() list ++= newList } def foreach[U](f: ChatMessage => U) = list.foreach(f) }
We chose ListBuffer as the data structure for storing messages - for the reason that we only add elements to the end and remove them from the beginning. ListBuffer allows you to do these operations for a constant time. In the future, we will apply this buffer in our Reader actor in order to limit the number of messages sent to newly connected clients.
Consider how we divide actors across the network. In order for our application not to fall when one node is disconnected, and waiting for it to be turned on, we need to write the appropriate logic in the actor. Actor RoomWriter should notify RoomReader about new messages, so it will be useful for him to know the state of RoomReader. This logic is well described by the introduction of two states in the actor.
New methods for the RoomReader class ... sendIdentifyRequest() def sendIdentifyRequest(): Unit = { log.info(s"Trying connecting to $roomReaderPath") context.actorSelection(roomReaderPath) ! Identify(roomReaderPath) context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout) } def receiveRecover = Actor.emptyBehavior def receiveCommand = identifying def identifying: Receive = { case msg: ChatMessage => persistAsync(msg) { m => log.info(s"Message $m persisted, but the reader isn't available") } case ActorIdentity(`roomReaderPath`, Some(actor)) => log.info(s"Successfully connected to $roomReaderPath") context.watch(actor) context.become(active(actor)) case ActorIdentity(`roomReaderPath`, None) => log.info(s"Remote actor is not available: $roomReaderPath") case ReceiveTimeout => sendIdentifyRequest() case _ => log.info("Not ready yet") } def active(reader: ActorRef): Receive = { case msg: ChatMessage => persistAsync(msg) { _ => reader ! Update } case "snap" => saveSnapshot("foo") case Terminated(`reader`) => log.info("reader terminated") sendIdentifyRequest() context.become(identifying) case ReceiveTimeout =>
In the sendIdentifyRequest method, we try to get the ActorRef of the remote actor by sending it a Identify message. All actors understand this message, and in response they send us the ActorRef we need. After receiving the ActorRef, we go back to normal and begin work. It should be noted that we are also starting to monitor the life cycle of the remote actor and, if it is not available, we will try to reach it again.
In order to implement such a logic of work for the UserConnection actor, we will create a separate actor that will act as an intermediary in communicating with the backend.
BackendTalker class code class BackendTalker(roomWriterPath: String, roomReaderPath: String) extends Actor with ActorLogging { import BackendTalker._ val listeners = collection.mutable.Set[ActorRef]() sendReaderIdentifyRequest() sendWriterIdentifyRequest() def sendReaderIdentifyRequest(): Unit = { log.info("sending identify request to reader") context.actorSelection(roomReaderPath) ! Identify(roomReaderPath) import context.dispatcher context.system.scheduler.scheduleOnce(3.seconds, self, ReaderReceiveTimeout) } def sendWriterIdentifyRequest(): Unit = { log.info("sending identify request to writer") context.actorSelection(roomWriterPath) ! Identify(roomWriterPath) import context.dispatcher context.system.scheduler.scheduleOnce(3.seconds, self, WriterReceiveTimeout) } def receive = identifying def identifying: Receive = { case ActorIdentity(`roomWriterPath`, Some(actor)) => log.info(s"Successfully identified writer at $roomWriterPath") context.watch(actor) context.become(waitingForReader(actor)) case ActorIdentity(`roomReaderPath`, Some(actor)) => log.info(s"Successfully identified reader at $roomReaderPath") listeners.foreach(actor ! Listen(_)) context.watch(actor) context.become(waitingForWriter(actor)) case ActorIdentity(path, None) => log.info(s"Remote actor not available: $path") case ReaderReceiveTimeout => sendReaderIdentifyRequest() case WriterReceiveTimeout => sendWriterIdentifyRequest() case msg: ChatMessage => listeners += context.watch(sender()) sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis()) case Terminated(userCon) => listeners -= userCon case _ => log.info("Not ready yet") } def waitingForReader(writer: ActorRef): Receive = { case ActorIdentity(`roomReaderPath`, Some(reader)) => log.info(s"Successfully identified reader at $roomReaderPath") listeners.foreach(reader ! Listen(_)) context.watch(reader) context.become(active(reader, writer)) case ActorIdentity(`roomReaderPath`, None) => log.info(s"Reader actor not available: $roomReaderPath") case ReaderReceiveTimeout => sendReaderIdentifyRequest() case WriterReceiveTimeout => sendWriterIdentifyRequest() case Terminated(`writer`) => log.info("writer terminated") sendWriterIdentifyRequest() context.become(identifying) case msg: ChatMessage => listeners += context.watch(sender()) sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis()) case Terminated(userCon) => listeners -= userCon case _ => log.info("Not ready yet") } def waitingForWriter(reader: ActorRef): Receive = { case ActorIdentity(`roomWriterPath`, Some(writer)) => log.info(s"Successfully identified writer at $roomWriterPath") context.watch(writer) context.become(active(reader, writer)) case ActorIdentity(`roomWriterPath`, None) => log.info(s"Reader actor not available: $roomWriterPath") case ReaderReceiveTimeout => sendReaderIdentifyRequest() case WriterReceiveTimeout => sendWriterIdentifyRequest() case Terminated(`reader`) => log.info("reader terminated") sendReaderIdentifyRequest() context.become(identifying) case msg: ChatMessage => listeners += context.watch(sender()) sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis()) case Terminated(userCon) => listeners -= userCon case _ => log.info("Not ready yet") } def active(reader: ActorRef, writer: ActorRef): Receive = { case l: Listen => reader ! l case msg: ChatMessage => writer ! msg case Terminated(`reader`) => log.info("reader terminated") sendReaderIdentifyRequest() context.become(waitingForReader(writer)) case Terminated(`writer`) => log.info("writer terminated") sendWriterIdentifyRequest() context.become(waitingForWriter(reader)) case ReaderReceiveTimeout => case WriterReceiveTimeout =>
In it, we implement the logic of waiting for remote actors by analogy with what we did in the RoomWriter actor. In this case, we need to expect connection to two actors at once, so the logic of the work is a bit more complicated.
The final touch remains: we will rewrite RoomReader a bit in order to limit the number of messages that users receive.
To do this, we will add a couple of lines in it.
In the constructor, we define our buffer for storing messages, and write an auxiliary method for working with it. In addition, we will launch the scheduler, which every 10 minutes will give the command to create snapshots. It is worth noting that the command is given by sending a message to the actor, and we do not call the saveSnapshot method directly. This is done specifically in order not to violate the principle that the work with mutable data of the actor should be done only by the actor. Violating this principle, we can get subtle bugs.
Supplement to RoomReader context.system.scheduler.schedule(tenMinutes, tenMinutes, self, Snap) val state = FixedTimeMessageBuffer(tenMinutes) def updateState(msg: ChatMessage) = state.append(msg)
In the receive method, we implement the ability to save snapshots after a special message arrives. We also implement the correct recovery of the state from snapshot.
Supplement to RoomReader case msg:ChatMessage => updateState(msg) sendAll(msg) case Listen(ref) => listeners add context.watch(ref) state.foreach(ref ! _) case Snap => saveSnapshot(state.toList) case SnapshotOffer(_, snapshot: List[ChatMessage]) => state.replace(snapshot)
Summarizing, we can say that we have implemented a modern web application, made in the spirit of reactive programming. It allows you to quickly respond to user requests and also has some degree of stability. However, it has much to improve. In order for our application to work even in the case of the fall of individual nodes, we should use the akka-cluster module, which allows you to quickly implement decentralized applications that do not have a single point of failure.
In addition, we need to somehow handle the situation when the flow of messages is too large and the actors do not have time to process it. To work with this, there is an experimental module akka-streams. We will find out about this and many other things in the next article.