import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import scala.io.StdIn

/** Actor-based WebSocket echo server.
  *
  * Serves `ws://localhost:8080/ws`; every connection gets its own
  * [[ClientConnectionActor]] that answers strict text frames with `echo <text>`.
  */
object Boot extends App {
  // Explicit types on implicits keep implicit resolution predictable.
  implicit val system: ActorSystem = ActorSystem("example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Builds the per-connection flow.
    *
    * Incoming messages are forwarded to a freshly created [[ClientConnectionActor]];
    * `'sinkclose` is sent to it when the incoming side completes. Outgoing messages
    * are whatever that actor sends to the materialized source actor.
    */
  def flow: Flow[Message, Message, Any] = {
    val client = system.actorOf(Props(classOf[ClientConnectionActor]))
    val in = Sink.actorRef[Message](client, 'sinkclose)
    // Buffer of 8 outgoing messages; overflowing the buffer fails the stream.
    val out = Source.actorRef[Message](8, OverflowStrategy.fail).mapMaterializedValue { a =>
      // Hand the materialized source actor to the client so it can reply.
      client ! ('income -> a)
      a
    }
    Flow.fromSinkAndSource(in, out)
  }

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  // Release the port first, then shut the actor system down.
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => system.terminate())
}

/** Handles a single WebSocket connection.
  *
  * Protocol (all messages are handled in `receive`):
  *  - `('income, ref)`: registers `ref` as the outgoing channel and watches it;
  *  - `Terminated(ref)` for the registered channel: forgets it and stops this actor;
  *  - `'sinkclose`: stops this actor;
  *  - `TextMessage.Strict(t)`: replies `echo t` through the registered channel, if any;
  *  - anything else is ignored.
  */
class ClientConnectionActor extends Actor {
  // Outgoing channel to the client; None until the 'income handshake arrives.
  var connection: Option[ActorRef] = None

  val receive: Receive = {
    case ('income, a: ActorRef) =>
      connection = Some(a)
      context.watch(a)
    case Terminated(a) if connection.contains(a) =>
      connection = None
      context.stop(self)
    case 'sinkclose =>
      context.stop(self)
    case TextMessage.Strict(t) =>
      connection.foreach(_ ! TextMessage.Strict(s"echo $t"))
    case _ => // ignore
  }

  // Stop the outgoing channel together with this actor.
  override def postStop(): Unit = connection.foreach(context.stop)
}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import scala.io.StdIn

/** Stream-only WebSocket echo server.
  *
  * Serves `ws://localhost:8080/ws` and answers every strict text frame with
  * `echo: <text>`; no actors are involved in message handling.
  */
object Boot extends App {
  // Explicit types on implicits keep implicit resolution predictable.
  implicit val system: ActorSystem = ActorSystem("example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Per-connection flow: keeps strict text messages only and echoes their text.
    *
    * Messages that are not `TextMessage.Strict` do not match the `collect`
    * pattern and are therefore dropped.
    */
  def flow: Flow[Message, Message, Any] =
    Flow[Message]
      .collect { case TextMessage.Strict(t) => t }
      .map(text => TextMessage.Strict(s"echo: $text"))

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  // Release the port first, then shut the actor system down.
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => system.terminate())
}
/** Marker for messages received from a WebSocket client. */
trait WsIncome

/** Marker for messages sent to a WebSocket client. */
trait WsOutgoing

/** Message that is valid in both directions. */
@JsonCodec case class Say(name: String) extends WsIncome with WsOutgoing

// Decodes an incoming message by decoding a Say and widening it to WsIncome.
implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity)

// Encodes outgoing messages; Say is the only case handled here, so any other
// WsOutgoing value would raise a MatchError.
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
  case s: Say => s.asJson
}
// JSON request/response pipeline: text frame -> WsIncome -> greeting -> text frame.
// CORE_COUNT is not defined in this snippet; it sets the mapAsync parallelism.
Flow[Message]
  // Keep text messages only, exposing their (possibly chunked) text stream.
  .collect { case tm: TextMessage => tm.textStream }
  // Concatenate the chunks, then parse and decode the JSON. A parse or decode
  // error becomes a failed Future via Future.fromTry.
  .mapAsync(CORE_COUNT * 2 - 1) { chunks =>
    chunks
      .runFold("")(_ + _)
      .flatMap(text => Future.fromTry(parse(text).toTry.flatMap(_.as[WsIncome].toTry)))
  }
  // Only Say is answered; other decoded messages do not match and are dropped.
  .collect { case Say(name) => Say(s"hello: $name") }
  // Serialise the reply to compact JSON inside a Future.
  .mapAsync(CORE_COUNT * 2 - 1)(out => Future(TextMessage(out.asJson.noSpaces)))
/** Mutable per-connection state: the name the client has chosen, if any. */
class ClientContext {
  @volatile var userName: Option[String] = None
}

object ClientContext {
  /** Extractor returning the user name.
    *
    * Because it returns `userName` itself, the pattern `ClientContext(name)`
    * matches only when a name has been set.
    */
  def unapply(arg: ClientContext): Option[String] = arg.userName
}

/** Incoming request carrying a user name. */
@JsonCodec case class SetName(name: String) extends WsIncome

/** Chat line; valid both as an incoming and as an outgoing message. */
@JsonCodec case class Say(text: String) extends WsIncome with WsOutgoing

// Tries to decode a Say first and falls back to SetName.
implicit val WsIncomeDecoder: Decoder[WsIncome] =
  Decoder[Say].map[WsIncome](identity)
    .or(Decoder[SetName].map[WsIncome](identity))
/** Per-connection flow that remembers the user name sent with `SetName`.
  *
  * Text frames are decoded to `WsIncome`. `SetName` stores the name in the
  * `ClientContext` and produces no reply; `Say` is answered with the text
  * prefixed by the stored name, or by `unknown` when no name is set.
  * CORE_COUNT is not defined in this snippet; it sets the mapAsync parallelism.
  */
def flow: Flow[Message, Message, Any] = {
  Flow[Message]
    // Keep text messages only, exposing their (possibly chunked) text stream.
    .collect { case tm: TextMessage => tm.textStream }
    // Concatenate the chunks, then parse and decode the JSON. A parse or decode
    // error becomes a failed Future via Future.fromTry.
    .mapAsync(CORE_COUNT * 2 - 1) { chunks =>
      chunks
        .runFold("")(_ + _)
        .flatMap(text => Future.fromTry(parse(text).toTry.flatMap(_.as[WsIncome].toTry)))
    }
    // Create one ClientContext in the factory and pair it with every message.
    .statefulMapConcat { () =>
      val context = new ClientContext
      m => (context -> m) :: Nil
    }
    // SetName updates the context and emits nothing; everything else passes through.
    .mapConcat {
      case (c: ClientContext, SetName(name)) =>
        c.userName = Some(name)
        Nil
      case a => a :: Nil
    }
    // The first case matches only when a name is set (see ClientContext.unapply).
    .collect {
      case (ClientContext(userName), Say(text)) => Say(s"$userName: $text")
      case (_, Say(text)) => Say(s"unknown: $text")
    }
    // Serialise the reply to compact JSON inside a Future.
    .mapAsync(CORE_COUNT * 2 - 1)(out => Future(TextMessage(out.asJson.noSpaces)))
}
/** Stream stage that lets messages through only after authentication.
  *
  * Until a profile is known, every incoming element is consumed without being
  * emitted: an `AuthMessage` is handed to `auth`, anything else is discarded.
  * Once `auth` yields `Some(profile)`, each later element is emitted wrapped
  * in a `WsContextIncomeMessage` together with that profile. If `auth` yields
  * `None` the stage keeps waiting; if it fails, the stage fails with that error.
  *
  * @param auth asynchronous authentication function
  * @param ec   execution context used to register the completion callback
  */
class AuthFilter(auth: ws.AuthMessage => Future[Option[UserProfile]])(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage]] {

  val in: Inlet[ws.WsIncomeMessage] = Inlet[ws.WsIncomeMessage]("AuthFilter.in")
  val out: Outlet[ws.WsContextIncomeMessage] = Outlet[ws.WsContextIncomeMessage]("AuthFilter.out")
  val shape: FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Only touched from handlers and the async callback below, so it needs no
      // @volatile.
      private var profile: Option[UserProfile] = None

      // Stage operations (pull/push/fail) must not be called from a Future
      // callback thread. The async callback re-enters the stage safely with the
      // authentication result.
      private val authCompleted =
        getAsyncCallback[scala.util.Try[Option[UserProfile]]] {
          case Success(p) =>
            profile = p
            pull(in)
          case Failure(e) =>
            // Fail the whole stage so upstream is cancelled as well.
            failStage(e)
        }

      setHandler(in, new InHandler {
        override def onPush(): Unit = profile match {
          case Some(p) =>
            push(out, ws.WsContextIncomeMessage(p, grab(in)))
          case _ =>
            grab(in) match {
              case a: ws.AuthMessage =>
                // No pull here: demand is resumed once authentication completes.
                auth(a).onComplete(authCompleted.invoke)
              case _ =>
                // Unauthenticated, non-auth message: drop it and ask for the next.
                pull(in)
            }
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
/** Outgoing message emitted periodically by `broadcast`. */
case object Tick extends WsOutgoing

// Encodes outgoing messages: Say uses its own codec, Tick becomes an object
// whose "time" field holds the current time as an ISO date-time string.
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
  case s: Say => s.asJson
  case Tick => Json.obj("time" -> DateTime.now.toIsoDateTimeString().asJson)
}

// ...

// Emits Tick after an initial delay of one second and then every second.
val broadcast = Source.tick[WsOutgoing](1.second, 1.second, Tick)

// ...

  .collect {
    case (ClientContext(userName), Say(text)) => Say(s"$userName: $text")
    case (_, Say(text)) => Say(s"unknown: $text")
  }
  // Interleave the periodic ticks with the replies before serialisation.
  .merge(broadcast)
  .mapAsync(CORE_COUNT * 2 - 1)(out => Future(TextMessage(out.asJson.noSpaces)))
Source: https://habr.com/ru/post/319978/
All Articles