【Scala】配信型のWebSocketサーバーを立てる

概要

接続してきたユーザー全員に同じデータを配信する感じ(一対多)の
WebSocketServerを立てようとしたときにかなり時間かかったので記事にしました。

試行錯誤してそれっぽい動作にはなりましたが、これが最適解かどうかはわかりません笑
わかる方は教えてください!

サンプル

WebSocketサーバーの基本形

接続したユーザーがSendでなにか送ってきたら、それをそのまま送り返します。
handleWebSocketMessagesを使うのがポイント
webSocketFlowをあれこれ変えることで動作をカスタマイズします。

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives.{complete, get, handleWebSocketMessages, path, pathEndOrSingleSlash}
import akka.stream._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Broadcast, BroadcastHub, Flow, GraphDSL, Keep, Merge, Sink, Source}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
import scala.concurrent.ExecutionContextExecutor

object Sample extends App {

  val logger = LoggerFactory.getLogger(getClass)
  implicit val system: ActorSystem                        = ActorSystem("ws-server")
  implicit val materializer: ActorMaterializer            = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val host = "0.0.0.0"

  val port = 8080

  val routes: Route = get {
    pathEndOrSingleSlash {
      complete("Welcome to websocket server")
    }
  } ~ path("ws") {
    get {
      handleWebSocketMessages(webSocketFlow)
    }
  }

  val bindingFuture = Http().bindAndHandle(routes, host, port)

  sys.addShutdownHook {
    bindingFuture
      .flatMap(_.unbind())
      .onComplete(_ => {
        system.terminate()
      })
  }

  def webSocketFlow = Flow[Message].map {
    case TextMessage.Strict(m) => TextMessage(m)
    case other => TextMessage(other.toString)
  }

}

配信用のSource、Actorを作る

  1. actorRef にメッセージを送ると、データが source に溜まる
    ex.actorRef ! TextMessage("send message to all user")
  2. source を加工することで全員にメッセージを送る
  //上のつづき
  val actorRefSource: Source[Message, ActorRef] = Source.actorRef[Message](bufferSize = 1000, OverflowStrategy.fail)
  val (actorRef, source) = actorRefSource.toMat(BroadcastHub.sink[Message](bufferSize = 256))(Keep.both).run()

メインのFlowを作る

ざっと流れは2つです

  1. ユーザー自身がpingを送ったときはpongを送る(その人だけに)
  2. Sourceにメッセージが貯まったらMasterBusがそれを全員に送る
    これをGraphDSLを使って実装していきます

    //上のつづき
    def webSocketFlowGraph(source: Source[Message, NotUsed]) = {
    GraphDSL.create() { implicit builder =>
    
      import GraphDSL.Implicits._
      source.runWith(Sink.ignore)
    
      //TODO FlowのためSource.emptyをつけているが、ここもっと良い書き方ありそう?
      val MasterBus: Flow[Message, Message, UniqueKillSwitch] = Flow.fromSinkAndSource(Sink.ignore, source)
        .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
        .backpressureTimeout(3.seconds)
    
      val IN = builder.add(Broadcast[Message](1))
    
      //ユーザー入力に対する処理
      val UserFlow: FlowShape[Message, Message] = builder.add(Flow[Message].map {
        case TextMessage.Strict("ping") => TextMessage("pong")
        case TextMessage.Strict(m) => TextMessage(m)
        case other => TextMessage(other.toString)
      })
    
      // MasterBus と UserFlow を合成します
      val OUT = builder.add(Merge[Message](2))
    
      Source.empty ~> MasterBus ~> OUT
      IN ~> UserFlow ~> OUT
      FlowShape(IN.in, OUT.out)
    
    }
    }

上のFlowを結合する

上で作ったwebSocketFlowGraphをそのままhandleWebSocketMessagesに入れたいんですが、
そのまま入れられないので、両側からFlowを挟み新たなFlowを作成
(これも他にいい方法ありそうだなぁ。。。)

  val actorRefSource: Source[Message, ActorRef] = Source.actorRef[Message](bufferSize = 1000, OverflowStrategy.fail)
  val (actorRef, source) = actorRefSource.toMat(BroadcastHub.sink[Message](bufferSize = 256))(Keep.both).run()

  //少し変更する
  def webSocketFlow = {
    fromFlow
      .viaMat(webSocketFlowGraph(source))(Keep.right)
      .viaMat(toFlow)(Keep.right)
  }

  //来たものは全て通す
  def fromFlow = Flow[Message]

  def toFlow = Flow[Message].map {
    case TextMessage.Strict(m) => TextMessage(m)
  }

全体コード

表示
```scala
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives.{complete, get, handleWebSocketMessages, path, pathEndOrSingleSlash}
import akka.stream._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Broadcast, BroadcastHub, Flow, GraphDSL, Keep, Merge, Sink, Source}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
import scala.concurrent.ExecutionContextExecutor

object Sample extends App {

val logger = LoggerFactory.getLogger(getClass)

implicit val system: ActorSystem = ActorSystem("ws-server")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

val host = "0.0.0.0"
val port = 8080
val routes: Route = get {
pathEndOrSingleSlash {
//localhost:8080
complete("Welcome to websocket server")
}
} ~ path("ws") {
//localhost:8080/ws
get {
handleWebSocketMessages(webSocketFlow)
}
}
val bindingFuture = Http().bindAndHandle(routes, host, port)

sys.addShutdownHook {
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => {
system.terminate()
})
}

val actorRefSource: Source[Message, ActorRef] = Source.actorRef[Message](bufferSize = 1000, OverflowStrategy.fail)
val (actorRef, source) = actorRefSource.toMat(BroadcastHub.sink[Message](bufferSize = 256))(Keep.both).run()

def webSocketFlow = {
//TODO webSocketFlowGraph を直接 handleWebSocketMessages に入れることはできないのか?
fromFlow
.viaMat(webSocketFlowGraph(source))(Keep.right)
.viaMat(toFlow)(Keep.right)
}

//来たものは全て通す
def fromFlow = Flow[Message]

def toFlow = Flow[Message].map {
case TextMessage.Strict(m) => TextMessage(m)
}

def webSocketFlowGraph(source: Source[Message, NotUsed]) = {
GraphDSL.create() { implicit builder =>

import GraphDSL.Implicits._
source.runWith(Sink.ignore)

//TODO FlowのためSource.emptyをつけているが、ここもっと良い書き方ありそう?
val MasterBus: Flow[Message, Message, UniqueKillSwitch] = Flow.fromSinkAndSource(Sink.ignore, source)
.joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
.backpressureTimeout(3.seconds)

val IN = builder.add(Broadcast[Message](1))

//ユーザー入力に対する処理
val UserFlow: FlowShape[Message, Message] = builder.add(Flow[Message].map {
case TextMessage.Strict("ping") => TextMessage("pong")
case TextMessage.Strict(m) => TextMessage(m)
case other => TextMessage(other.toString)
})

// MasterBus と UserFlow を合成します
val OUT = builder.add(Merge[Message](2))

Source.empty ~> MasterBus ~> OUT
IN ~> UserFlow ~> OUT
FlowShape(IN.in, OUT.out)

}
}

}
```

動作確認

$ sbt run

サーバーを立てたら
localhost:8080/wsにWebSocket接続して
pingを送ってpongが帰ってきたらOKです!
ただこれだとサーバー側から何も送信していないので、定期実行などで
actorRef ! TextMessage("send message to all user")
などを実行すれば全員に送られていることが確認できるはずです。
そこの実装は読者の皆さんにおまかせします^^

おわりに

全体コードにある2つのTODOについてなにかわかる方、もっと良い書き方を知っている方などいましたら教えてください!

コメント

タイトルとURLをコピーしました