概要
接続してきたユーザー全員に同じデータを配信する感じ(一対多)の
WebSocketServerを立てようとしたときにかなり時間かかったので記事にしました。
試行錯誤してそれっぽい動作にはなりましたが、これが最適解かどうかはわかりません笑
わかる方は教えてください!
サンプル
WebSocketサーバーの基本形
接続したユーザーがSendでなにか送ってきたら、それをそのまま送り返します。
handleWebSocketMessages
を使うのがポイント
webSocketFlow
をあれこれ変えることで動作をカスタマイズします。
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives.{complete, get, handleWebSocketMessages, path, pathEndOrSingleSlash}
import akka.stream._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Broadcast, BroadcastHub, Flow, GraphDSL, Keep, Merge, Sink, Source}
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.ExecutionContextExecutor
object Sample extends App {
val logger = LoggerFactory.getLogger(getClass)
implicit val system: ActorSystem = ActorSystem("ws-server")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
val host = "0.0.0.0"
val port = 8080
val routes: Route = get {
pathEndOrSingleSlash {
complete("Welcome to websocket server")
}
} ~ path("ws") {
get {
handleWebSocketMessages(webSocketFlow)
}
}
val bindingFuture = Http().bindAndHandle(routes, host, port)
sys.addShutdownHook {
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => {
system.terminate()
})
}
def webSocketFlow = Flow[Message].map {
case TextMessage.Strict(m) => TextMessage(m)
case other => TextMessage(other.toString)
}
}
配信用のSource、Actorを作る
actorRef
にメッセージを送ると、データがsource
に溜まる
ex.actorRef ! TextMessage("send message to all user")
- source を加工することで全員にメッセージを送る
//上のつづき
val actorRefSource: Source[Message, ActorRef] = Source.actorRef[Message](bufferSize = 1000, OverflowStrategy.fail)
val (actorRef, source) = actorRefSource.toMat(BroadcastHub.sink[Message](bufferSize = 256))(Keep.both).run()
メインのFlowを作る
ざっと流れは2つです
- ユーザー自身が
ping
を送ったときはpong
を送る(その人だけに) -
Sourceにメッセージが貯まったら
MasterBus
がそれを全員に送る
これをGraphDSL
を使って実装していきます//上のつづき def webSocketFlowGraph(source: Source[Message, NotUsed]) = { GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ source.runWith(Sink.ignore) //TODO FlowのためSource.emptyをつけているが、ここもっと良い書き方ありそう? val MasterBus: Flow[Message, Message, UniqueKillSwitch] = Flow.fromSinkAndSource(Sink.ignore, source) .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right) .backpressureTimeout(3.seconds) val IN = builder.add(Broadcast[Message](1)) //ユーザー入力に対する処理 val UserFlow: FlowShape[Message, Message] = builder.add(Flow[Message].map { case TextMessage.Strict("ping") => TextMessage("pong") case TextMessage.Strict(m) => TextMessage(m) case other => TextMessage(other.toString) }) // MasterBus と UserFlow を合成します val OUT = builder.add(Merge[Message](2)) Source.empty ~> MasterBus ~> OUT IN ~> UserFlow ~> OUT FlowShape(IN.in, OUT.out) } }
上のFlowを結合する
上で作ったwebSocketFlowGraph
をそのままhandleWebSocketMessages
に入れたいんですが、
そのまま入れられないので、両側からFlowを挟み新たなFlowを作成
(これも他にいい方法ありそうだなぁ。。。)
val actorRefSource: Source[Message, ActorRef] = Source.actorRef[Message](bufferSize = 1000, OverflowStrategy.fail)
val (actorRef, source) = actorRefSource.toMat(BroadcastHub.sink[Message](bufferSize = 256))(Keep.both).run()
//少し変更する
def webSocketFlow = {
fromFlow
.viaMat(webSocketFlowGraph(source))(Keep.right)
.viaMat(toFlow)(Keep.right)
}
//来たものは全て通す
def fromFlow = Flow[Message]
def toFlow = Flow[Message].map {
case TextMessage.Strict(m) => TextMessage(m)
}
全体コード
表示
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives.{complete, get, handleWebSocketMessages, path, pathEndOrSingleSlash}
import akka.stream._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Broadcast, BroadcastHub, Flow, GraphDSL, Keep, Merge, Sink, Source}
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.ExecutionContextExecutor
object Sample extends App {
val logger = LoggerFactory.getLogger(getClass)
implicit val system: ActorSystem = ActorSystem("ws-server")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
val host = "0.0.0.0"
val port = 8080
val routes: Route = get {
pathEndOrSingleSlash {
//localhost:8080
complete("Welcome to websocket server")
}
} ~ path("ws") {
//localhost:8080/ws
get {
handleWebSocketMessages(webSocketFlow)
}
}
val bindingFuture = Http().bindAndHandle(routes, host, port)
sys.addShutdownHook {
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => {
system.terminate()
})
}
val actorRefSource: Source[Message, ActorRef] = Source.actorRef[Message](bufferSize = 1000, OverflowStrategy.fail)
val (actorRef, source) = actorRefSource.toMat(BroadcastHub.sink[Message](bufferSize = 256))(Keep.both).run()
def webSocketFlow = {
//TODO webSocketFlowGraph を直接 handleWebSocketMessages に入れることはできないのか?
fromFlow
.viaMat(webSocketFlowGraph(source))(Keep.right)
.viaMat(toFlow)(Keep.right)
}
//来たものは全て通す
def fromFlow = Flow[Message]
def toFlow = Flow[Message].map {
case TextMessage.Strict(m) => TextMessage(m)
}
def webSocketFlowGraph(source: Source[Message, NotUsed]) = {
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
source.runWith(Sink.ignore)
//TODO FlowのためSource.emptyをつけているが、ここもっと良い書き方ありそう?
val MasterBus: Flow[Message, Message, UniqueKillSwitch] = Flow.fromSinkAndSource(Sink.ignore, source)
.joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
.backpressureTimeout(3.seconds)
val IN = builder.add(Broadcast[Message](1))
//ユーザー入力に対する処理
val UserFlow: FlowShape[Message, Message] = builder.add(Flow[Message].map {
case TextMessage.Strict("ping") => TextMessage("pong")
case TextMessage.Strict(m) => TextMessage(m)
case other => TextMessage(other.toString)
})
// MasterBus と UserFlow を合成します
val OUT = builder.add(Merge[Message](2))
Source.empty ~> MasterBus ~> OUT
IN ~> UserFlow ~> OUT
FlowShape(IN.in, OUT.out)
}
}
}
```
動作確認
$ sbt run
サーバーを立てたら
localhost:8080/ws
にWebSocket接続して
ping
を送ってpong
が帰ってきたらOKです!
ただこれだとサーバー側から何も送信していないので、定期実行などで
actorRef ! TextMessage("send message to all user")
などを実行すれば全員に送られていることが確認できるはずです。
そこの実装は読者の皆さんにおまかせします^^
おわりに
全体コードにある2つのTODOについてなにかわかる方、もっと良い書き方を知っている方などいましたら教えてください!
コメント