πŸ“œ ⬆️ ⬇️

Akso HTTP WebSocket in practice

For quite a long time there was only one worthy implementation of working with HTTP over Akka - spray . To this library, a couple of craftsmen have written extensions for WebSocket,
which was quite understandable in use and there were no problems. But years went by and spray, in one form or another, migrated to Akka HTTP with implemented WebSocket support out of the box.
To work with WebSocket, the guys from Akka suggest that we use the Akka Stream, thereby simplifying our life with streaming data and, at the same time, complicating it. Akka Stream is not so easy to understand. Next, I will try to show basic practical examples of use.

Akka Stream in brief


This is a kind of data processing pipeline, each iteration of which does something with the data falling into it. Flow is divided into 3 components: Source, GraphStage, Sink.
This is best shown in the diagram from the documentation.
image

To implement WebSocket, we need to implement GraphStag. Source gives us akka, this is exactly our client with messages flying from him. And Sink is the sending of our messages to the client.

Actor style


Perhaps one of the most inefficient methods of processing, but the easiest to understand.
His idea is that all incoming messages get into the actor, and he had ActorRef , which sent the data directly to the client.
')
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated} import akka.http.scaladsl.Http import akka.http.scaladsl.model.ws._ import akka.stream.{ActorMaterializer, OverflowStrategy} import akka.stream.scaladsl._ import akka.http.scaladsl.server.Directives._ import scala.io.StdIn object Boot extends App { implicit val system = ActorSystem("example") implicit val materializer = ActorMaterializer() def flow: Flow[Message, Message, Any] = { val client = system.actorOf(Props(classOf[ClientConnectionActor])) val in = Sink.actorRef(client, 'sinkclose) val out = Source.actorRef(8, OverflowStrategy.fail).mapMaterializedValue { a β‡’ client ! ('income β†’ a) a } Flow.fromSinkAndSource(in, out) } val route = path("ws")(handleWebSocketMessages(flow)) val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") StdIn.readLine() import system.dispatcher bindingFuture .flatMap(_.unbind()) .onComplete(_ β‡’ system.terminate()) } class ClientConnectionActor extends Actor { var connection: Option[ActorRef] = None val receive: Receive = { case ('income, a: ActorRef) β‡’ connection = Some(a); context.watch(a) case Terminated(a) if connection.contains(a) β‡’ connection = None; context.stop(self) case 'sinkclose β‡’ context.stop(self) case TextMessage.Strict(t) β‡’ connection.foreach(_ ! TextMessage.Strict(s"echo $t")) case _ β‡’ // ingone } override def postStop(): Unit = connection.foreach(context.stop) } 

For each client connection, we create the ClientConnectionActor actor. And also Source , which will be another actor that sends the received messages to flow. After its creation through the mapMaterializedValue method , we will get a link to it. In addition, we create a Sink , which will send all messages to ClientConnectionActor .

Thus, ClientConnectionActor will receive all messages from the socket. We will send them through the ActorRef that has arrived to him, which will deliver them to the client.

Cons: it is necessary to monitor the side actors; be careful with OverflowStrategy ; we have only one actor for processing all messages, it is, respectively, single-threaded, which can cause performance problems.

We will not consider the derivative using ActorPublisher and ActorSubscriber , since, judging by the official documentation, it is deprecated .

Flow style


The idea of ​​this approach is to fully use the Akka Stream to achieve the goals. The general view of it comes down to building a pipeline processing incoming client messages.

Skeleton
 import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.ws._ import akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.http.scaladsl.server.Directives._ import scala.io.StdIn object Boot extends App { implicit val system = ActorSystem("example") implicit val materializer = ActorMaterializer() def flow: Flow[Message, Message, Any] = { Flow[Message].collect { case TextMessage.Strict(t) β‡’ t }.map { text β‡’ TextMessage.Strict(s"echo: $text") } } val route = path("ws")(handleWebSocketMessages(flow)) val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") StdIn.readLine() import system.dispatcher bindingFuture .flatMap(_.unbind()) .onComplete(_ β‡’ system.terminate()) } 

In this case, we process only text messages and modify them. Next TextMessage is sent to the client.


Now let's make the skeleton a bit more complicated and add parsing and JSON serialization.

Classes for serialization
 trait WsIncome trait WsOutgoing @JsonCodec case class Say(name: String) extends WsIncome with WsOutgoing implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity) implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = { case s: Say β‡’ s.asJson } 


Modify flow

 Flow[Message] .collect { case tm: TextMessage β‡’ tm.textStream } .mapAsync(CORE_COUNT * 2 - 1)(in β‡’ in.runFold("")(_ + _).flatMap(in β‡’ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry)))) .collect { case Say(name) β‡’ Say(s"hello: $name") } .mapAsync(CORE_COUNT * 2 - 1)(out β‡’ Future(TextMessage(out.asJson.noSpaces))) 

First, we cut off all binary messages, then parse the incoming stream into JSON, process it and serialize it into text for sending to the client.

Complicate the design by adding context to each client. This will help us statefulMapConcat .

Clientcontext
 class ClientContext { @volatile var userName: Option[String] = None } object ClientContext { def unapply(arg: ClientContext): Option[String] = arg.userName } @JsonCodec case class SetName(name: String) extends WsIncome @JsonCodec case class Say(text: String) extends WsIncome with WsOutgoing implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity) .or(Decoder[SetName].map[WsIncome](identity)) 


 def flow: Flow[Message, Message, Any] = { Flow[Message] .collect { case tm: TextMessage β‡’ tm.textStream } .mapAsync(CORE_COUNT * 2 - 1)(in β‡’ in.runFold("")(_ + _).flatMap(in β‡’ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry)))) .statefulMapConcat(() β‡’ { val context = new ClientContext m β‡’ (context β†’ m) :: Nil }) .mapConcat { case (c: ClientContext, SetName(name)) β‡’ c.userName = Some(name) Nil case a β‡’ a :: Nil } .collect { case (ClientContext(userName), Say(text)) β‡’ Say(s"$userName: $text") case (_, Say(text)) β‡’ Say(s"unknown: $text") } .mapAsync(CORE_COUNT * 2 - 1)(out β‡’ Future(TextMessage(out.asJson.noSpaces))) } 

There is another way: you can implement your filter / map by inheriting GraphStage [FlowShape [A, A]] .

Example (not adapted for the previous code)
 class AuthFilter(auth: ws.AuthMessage β‡’ Future[Option[UserProfile]])(implicit ec: ExecutionContext) extends GraphStage[FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage]] { val in = Inlet[ws.WsIncomeMessage]("AuthFilter.in") val out = Outlet[ws.WsContextIncomeMessage]("AuthFilter.out") val shape = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { new GraphStageLogic(shape) { @volatile var profile: Option[UserProfile] = None setHandler(in, new InHandler { override def onPush(): Unit = profile match { case Some(p) β‡’ push(out, ws.WsContextIncomeMessage(p, grab(in))) case _ β‡’ grab(in) match { case a: ws.AuthMessage β‡’ auth(a) onComplete { case Success(p) β‡’ profile = p pull(in) case Failure(e) β‡’ fail(out, e) } case _ β‡’ pull(in) } } }) setHandler(out, new OutHandler { override def onPull(): Unit = pull(in) }) } } } 

In this embodiment, all messages are filtered until an authorization message is received. If the authorization is successful, the messages are passed on along with the user profile.

And finally, we will do so that to all connected users the current time is sent every second:

 case object Tick extends WsOutgoing implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = { case s: Say β‡’ s.asJson case Tick β‡’ Json.obj("time" β†’ DateTime.now.toIsoDateTimeString().asJson) } ... val broadcast = Source.tick[WsOutgoing](1.second, 1.second, Tick) ... .collect { case (ClientContext(userName), Say(text)) β‡’ Say(s"$userName: $text") case (_, Say(text)) β‡’ Say(s"unknown: $text") } .merge(broadcast) .mapAsync(CORE_COUNT * 2 - 1)(out β‡’ Future(TextMessage(out.asJson.noSpaces))) 

These are basic examples of how you can implement WebSocket support in your project. The Akka Stream package is large and diverse, it will help to solve a rather large layer of tasks, without worrying about scaling and parallelization.

PS: Using a new technology for you in a more or less loaded project, do not forget to carry out load testing, monitor memory and hot parts of the code ( gatling can help you with this). All good.

Source: https://habr.com/ru/post/319978/


All Articles