この記事はこちらのコードを参考に自分がわかりやすいように噛み砕いたものになります。
ScalaでCQRS・ESをやりたいならPersistentActor
を使うといいらしい
PersistentActorとは
永続化・リカバリー機能などを備えたActor
...
依存
-
Akka
private val version = "2.5.19" "com.typesafe.akka" %% "akka-actor" % version "com.typesafe.akka" %% "akka-stream" % version "com.typesafe.akka" %% "akka-persistence" % version "com.typesafe.akka" %% "akka-persistence-query" % version "com.typesafe.akka" %% "akka-cluster" % version "com.typesafe.akka" %% "akka-cluster-tools" % version "com.typesafe.akka" %% "akka-cluster-sharding" % version
-
leveldb
"org.iq80.leveldb" % "leveldb" % "0.7", "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
PersistentActorを使ってWriteサイドを作ってみる
設定ファイルをかく
PersistentActor
ではpersist
メソッドにコマンドを渡すことで、それを永続化することができる。しかしどの方法で永続化するかを設定ファイル(reference.conf)で指定しないといけない。
例: ローカルファイルに保存する・MySQLに保存するなど
ここでは簡単のため標準で用意されているローカルに保存する方法を使う。
akka {
persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
leveldb {
dir = "target/example/journal"
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.local"
local.dir = "target/example/snapshots"
}
}
}
外部からのコマンドを用意する
sealed trait BankAccountCommandRequest {
val bankAccountId: String
}
case class OpenBankAccountRequest(bankAccountId: String)
extends BankAccountCommandRequest
case class GetBalanceRequest(bankAccountId: String)
extends BankAccountCommandRequest
case class GetBalanceResponse(bankAccountId: String, balance: BigDecimal)
extends BankAccountCommandRequest
case class DepositRequest(bankAccountId: String, deposit: BigDecimal)
extends BankAccountCommandRequest
case class WithdrawRequest(bankAccountId: String, withdraw: BigDecimal)
extends BankAccountCommandRequest
case class CloseBankAccountRequest(bankAccountId: String)
extends BankAccountCommandRequest
PersistentActor内部コマンドを用意する
リカバリー時などは自分(PersistentActor)に対して、内部コマンドが投げられます
sealed trait BankAccountEvent {
val bankAccountId: String
}
case class BankAccountOpened(bankAccountId: String) extends BankAccountEvent
case class BankAccountDeposited(bankAccountId: String, deposit: BigDecimal)
extends BankAccountEvent
case class BankAccountWithdrawn(bankAccountId: String, withdraw: BigDecimal)
extends BankAccountEvent
case class BankAccountClosed(bankAccountId: String) extends BankAccountEvent
PersistentActorを作る
-
一つの
PersistentActor
はあくまでも一つのアカウントに対応している。 -
外部からのコマンドリクエストは
receiveCommand
へ、リカバリー時の自身へのコマンドリクエストはreceiveRecover
に送られる。 -
外部からのコマンドリクエスト時に
tryToSaveSnapshot
を呼び、一定条件達成でスナップショットを保存する。
class BankAccountAggregate extends PersistentActor with ActorLogging {
context.setReceiveTimeout(120.seconds)
private var stateOpt: Option[BankAccount] = None
private def equalsId(requestId: String): Boolean =
stateOpt match {
case None =>
throw new IllegalStateException(
s"Invalid state: requestId = $requestId")
case Some(state) =>
state.bankAccountId == requestId
}
private def applyState(event: BankAccountOpened): BankAccount = {
BankAccount(
bankAccountId = event.bankAccountId,
isClosed = false,
balance = 0
)
}
private def foreachState(f: BankAccount => Unit): Unit =
stateOpt.filter(!_.isClosed).foreach(f)
private def tryToSaveSnapshot: Unit =
if (lastSequenceNr % 5 == 0) {
println(s"SaveSnapshot: lastSequenceNr=$lastSequenceNr")
foreachState(saveSnapshot)
}
override def receiveRecover: Receive = {
case event: BankAccountOpened =>
stateOpt = Some(applyState(event))
case BankAccountDeposited(_, deposit) =>
stateOpt = stateOpt.flatMap(_.deposit(deposit).toOption)
case BankAccountWithdrawn(_, withdraw) =>
stateOpt = stateOpt.flatMap(_.withdraw(withdraw).toOption)
case BankAccountClosed(_) =>
stateOpt = stateOpt.flatMap(_.close().toOption)
case SnapshotOffer(_, _state: BankAccount) =>
stateOpt = Some(_state)
case SaveSnapshotSuccess(metadata) =>
println(s"receiveRecover: SaveSnapshotSuccess succeeded: $metadata")
case RecoveryCompleted =>
println(s"Recovery completed: $persistenceId")
}
override def receiveCommand: Receive = {
case OpenBankAccountRequest(bankAccountId) if stateOpt.isEmpty =>
persist(BankAccountOpened(bankAccountId)) { event =>
stateOpt = Some(applyState(event))
tryToSaveSnapshot
}
case GetBalanceRequest(bankAccountId) if equalsId(bankAccountId) =>
foreachState { state =>
sender() ! GetBalanceResponse(state.bankAccountId, state.balance)
}
case DepositRequest(bankAccountId, deposit) if equalsId(bankAccountId) =>
persist(BankAccountDeposited(bankAccountId, deposit)) { event =>
stateOpt = stateOpt.flatMap(_.deposit(event.deposit).toOption)
tryToSaveSnapshot
}
case WithdrawRequest(bankAccountId, withdraw) if equalsId(bankAccountId) =>
persist(BankAccountWithdrawn(bankAccountId, withdraw)) { event =>
stateOpt = stateOpt.flatMap(_.withdraw(event.withdraw).toOption)
tryToSaveSnapshot
}
case CloseBankAccountRequest(bankAccountId) if equalsId(bankAccountId) =>
persist(BankAccountClosed(bankAccountId)) { _ =>
stateOpt = stateOpt.flatMap(_.close().toOption)
tryToSaveSnapshot
}
case SaveSnapshotSuccess(metadata) =>
println(s"receiveCommand: SaveSnapshotSuccess succeeded: $metadata")
}
override def persistenceId: String = self.path.name
}
ClusterShardingを使ってPersistentActorをまとめる
一つのアカウントに対応したPersistentActor
をまとめる親Actor
が必要になる。
親Actor
はきたコマンドリクエストのアカウントIDに対して、
1. そのアカウントIDに対応したPersistentActorがある場合→そのPersistentActorに対してコマンドリクエストを回す。
2. そのアカウントIDに対応したPersistentActorがない場合→新しくPersistentActorを作り、自身の管理下に置く
extractEntityId
の戻り値のタブルの1項の値が同じであれば同一のPersistentActor
にリクエストが飛ぶ。
今回はリクエストのアカウントID(cmd.bankAccountId)を渡すことによって、アカウントID毎にそれぞれのPersistentActor
にコマンドリクエストが振り分けられる。
class BankAccountAggregates extends Actor {
val shardName = "bank-accounts"
val extractEntityId: ShardRegion.ExtractEntityId = {
case cmd: BankAccountCommandRequest =>
(cmd.bankAccountId, cmd)
}
val extractShardId: ShardRegion.ExtractShardId = {
case cmd: BankAccountCommandRequest =>
(cmd.bankAccountId.toLong % 12).toString
}
ClusterSharding(context.system).start(
shardName,
Props(new BankAccountAggregate()),
ClusterShardingSettings(context.system),
extractEntityId,
extractShardId
)
override def receive: Receive = {
case cmd: BankAccountCommandRequest =>
ClusterSharding(context.system).shardRegion(shardName) forward cmd
}
}
ClusterSharding用の設定を追加する
akka {
persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
leveldb {
dir = "target/example/journal"
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.local"
local.dir = "target/example/snapshots"
}
}
actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [ "akka.tcp://bank-system@127.0.0.1:2551" ]
auto-down-unreachable-after = 10s
metrics {
enabled = off
native-library-extract-folder = ${user.dir}/target/native
}
}
}
動かしてみる
object Main extends App {
implicit val system: ActorSystem =
ActorSystem("bank-system", config = ConfigFactory.load())
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
val bankAccountAggregatesRef: ActorRef =
system.actorOf(Props(new BankAccountAggregates()), "sharded-bank-accounts")
implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS)
bankAccountAggregatesRef ! OpenBankAccountRequest("1")
bankAccountAggregatesRef ! OpenBankAccountRequest("2")
bankAccountAggregatesRef ! DepositRequest("1", 1000)
bankAccountAggregatesRef ! DepositRequest("2", 2000)
(bankAccountAggregatesRef ? GetBalanceRequest("1")).onComplete(println)
(bankAccountAggregatesRef ? GetBalanceRequest("2")).onComplete(println)
sys.addShutdownHook {
system.terminate()
}
}
ReadJournalを使ってReadサイドを作ってみる
WriteサイドでJournalに溜め込んだコマンドの集まりからReadModelを作る
編集中...
コメント