【Scala】CQRS・ESの練習で銀行システムを作る

この記事はこちらのコードを参考に自分がわかりやすいように噛み砕いたものになります。

ScalaでCQRS・ESをやりたいならPersistentActorを使うといいらしい

PersistentActorとは

永続化・リカバリー機能などを備えたActor
...

依存

  • Akka

    private val version     = "2.5.19"
    "com.typesafe.akka" %% "akka-actor" % version
    "com.typesafe.akka" %% "akka-stream" % version
    "com.typesafe.akka" %% "akka-persistence" % version
    "com.typesafe.akka" %% "akka-persistence-query" % version
    "com.typesafe.akka" %% "akka-cluster" % version
    "com.typesafe.akka" %% "akka-cluster-tools" % version
    "com.typesafe.akka" %% "akka-cluster-sharding" % version
  • leveldb

    "org.iq80.leveldb" % "leveldb" % "0.7",
    "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"

PersistentActorを使ってWriteサイドを作ってみる

設定ファイルをかく

PersistentActorではpersistメソッドにコマンドを渡すことで、それを永続化することができる。しかしどの方法で永続化するかを設定ファイル(reference.conf)で指定しないといけない。

例: ローカルファイルに保存する・MySQLに保存するなど

ここでは簡単のため標準で用意されているローカルに保存する方法を使う。

akka {
    persistence {
        journal {
            plugin = "akka.persistence.journal.leveldb"
            leveldb {
                dir = "target/example/journal"
            }
        }
        snapshot-store {
            plugin = "akka.persistence.snapshot-store.local"
            local.dir = "target/example/snapshots"
        }
    }
}

外部からのコマンドを用意する

sealed trait BankAccountCommandRequest {
    val bankAccountId: String
  }
  case class OpenBankAccountRequest(bankAccountId: String)
      extends BankAccountCommandRequest
  case class GetBalanceRequest(bankAccountId: String)
      extends BankAccountCommandRequest
  case class GetBalanceResponse(bankAccountId: String, balance: BigDecimal)
      extends BankAccountCommandRequest
  case class DepositRequest(bankAccountId: String, deposit: BigDecimal)
      extends BankAccountCommandRequest
  case class WithdrawRequest(bankAccountId: String, withdraw: BigDecimal)
      extends BankAccountCommandRequest
  case class CloseBankAccountRequest(bankAccountId: String)
      extends BankAccountCommandRequest

PersistentActor内部コマンドを用意する

リカバリー時などは自分(PersistentActor)に対して、内部コマンドが投げられます

sealed trait BankAccountEvent {
    val bankAccountId: String
  }
  case class BankAccountOpened(bankAccountId: String) extends BankAccountEvent
  case class BankAccountDeposited(bankAccountId: String, deposit: BigDecimal)
      extends BankAccountEvent
  case class BankAccountWithdrawn(bankAccountId: String, withdraw: BigDecimal)
      extends BankAccountEvent
  case class BankAccountClosed(bankAccountId: String) extends BankAccountEvent

PersistentActorを作る

  1. 一つのPersistentActorはあくまでも一つのアカウントに対応している。

  2. 外部からのコマンドリクエストはreceiveCommandへ、リカバリー時の自身へのコマンドリクエストはreceiveRecoverに送られる。

  3. 外部からのコマンドリクエスト時にtryToSaveSnapshotを呼び、一定条件達成でスナップショットを保存する。

class BankAccountAggregate extends PersistentActor with ActorLogging {
  context.setReceiveTimeout(120.seconds)

  private var stateOpt: Option[BankAccount] = None

  private def equalsId(requestId: String): Boolean =
    stateOpt match {
      case None =>
        throw new IllegalStateException(
          s"Invalid state: requestId = $requestId")
      case Some(state) =>
        state.bankAccountId == requestId
    }

  private def applyState(event: BankAccountOpened): BankAccount = {
    BankAccount(
      bankAccountId = event.bankAccountId,
      isClosed = false,
      balance = 0
    )
  }

  private def foreachState(f: BankAccount => Unit): Unit =
    stateOpt.filter(!_.isClosed).foreach(f)

  private def tryToSaveSnapshot: Unit =
    if (lastSequenceNr % 5 == 0) {
      println(s"SaveSnapshot: lastSequenceNr=$lastSequenceNr")
      foreachState(saveSnapshot)
    }

  override def receiveRecover: Receive = {
    case event: BankAccountOpened =>
      stateOpt = Some(applyState(event))
    case BankAccountDeposited(_, deposit) =>
      stateOpt = stateOpt.flatMap(_.deposit(deposit).toOption)
    case BankAccountWithdrawn(_, withdraw) =>
      stateOpt = stateOpt.flatMap(_.withdraw(withdraw).toOption)
    case BankAccountClosed(_) =>
      stateOpt = stateOpt.flatMap(_.close().toOption)
    case SnapshotOffer(_, _state: BankAccount) =>
      stateOpt = Some(_state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"receiveRecover: SaveSnapshotSuccess succeeded: $metadata")
    case RecoveryCompleted =>
      println(s"Recovery completed: $persistenceId")
  }

  override def receiveCommand: Receive = {
    case OpenBankAccountRequest(bankAccountId) if stateOpt.isEmpty =>
      persist(BankAccountOpened(bankAccountId)) { event =>
        stateOpt = Some(applyState(event))
        tryToSaveSnapshot
      }
    case GetBalanceRequest(bankAccountId) if equalsId(bankAccountId) =>
      foreachState { state =>
        sender() ! GetBalanceResponse(state.bankAccountId, state.balance)
      }
    case DepositRequest(bankAccountId, deposit) if equalsId(bankAccountId) =>
      persist(BankAccountDeposited(bankAccountId, deposit)) { event =>
        stateOpt = stateOpt.flatMap(_.deposit(event.deposit).toOption)
        tryToSaveSnapshot
      }
    case WithdrawRequest(bankAccountId, withdraw) if equalsId(bankAccountId) =>
      persist(BankAccountWithdrawn(bankAccountId, withdraw)) { event =>
        stateOpt = stateOpt.flatMap(_.withdraw(event.withdraw).toOption)
        tryToSaveSnapshot
      }
    case CloseBankAccountRequest(bankAccountId) if equalsId(bankAccountId) =>
      persist(BankAccountClosed(bankAccountId)) { _ =>
        stateOpt = stateOpt.flatMap(_.close().toOption)
        tryToSaveSnapshot
      }
    case SaveSnapshotSuccess(metadata) =>
      println(s"receiveCommand: SaveSnapshotSuccess succeeded: $metadata")
  }

  override def persistenceId: String = self.path.name
}

ClusterShardingを使ってPersistentActorをまとめる

一つのアカウントに対応したPersistentActorをまとめる親Actorが必要になる。

親ActorはきたコマンドリクエストのアカウントIDに対して、

1. そのアカウントIDに対応したPersistentActorがある場合→そのPersistentActorに対してコマンドリクエストを回す。
2. そのアカウントIDに対応したPersistentActorがない場合→新しくPersistentActorを作り、自身の管理下に置く

extractEntityIdの戻り値のタブルの1項の値が同じであれば同一のPersistentActorにリクエストが飛ぶ。

今回はリクエストのアカウントID(cmd.bankAccountId)を渡すことによって、アカウントID毎にそれぞれのPersistentActorにコマンドリクエストが振り分けられる。

class BankAccountAggregates extends Actor {
  val shardName = "bank-accounts"

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case cmd: BankAccountCommandRequest =>
      (cmd.bankAccountId, cmd)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case cmd: BankAccountCommandRequest =>
      (cmd.bankAccountId.toLong % 12).toString
  }

  ClusterSharding(context.system).start(
    shardName,
    Props(new BankAccountAggregate()),
    ClusterShardingSettings(context.system),
    extractEntityId,
    extractShardId
  )

  override def receive: Receive = {
    case cmd: BankAccountCommandRequest =>
      ClusterSharding(context.system).shardRegion(shardName) forward cmd
  }

}

ClusterSharding用の設定を追加する

akka {
    persistence {
        journal {
            plugin = "akka.persistence.journal.leveldb"
            leveldb {
                dir = "target/example/journal"
            }
        }

        snapshot-store {
            plugin = "akka.persistence.snapshot-store.local"
            local.dir = "target/example/snapshots"
        }
    }

    actor {
        provider = "cluster"
      }

    remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 2551
        }
    }

    cluster {
        seed-nodes = [ "akka.tcp://bank-system@127.0.0.1:2551" ]
        auto-down-unreachable-after = 10s
        metrics {
            enabled = off
            native-library-extract-folder = ${user.dir}/target/native
        }
    }

}

動かしてみる

object Main extends App {

  implicit val system: ActorSystem =
    ActorSystem("bank-system", config = ConfigFactory.load())
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val bankAccountAggregatesRef: ActorRef =
    system.actorOf(Props(new BankAccountAggregates()), "sharded-bank-accounts")
  implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS)
  bankAccountAggregatesRef ! OpenBankAccountRequest("1")
  bankAccountAggregatesRef ! OpenBankAccountRequest("2")
  bankAccountAggregatesRef ! DepositRequest("1", 1000)
  bankAccountAggregatesRef ! DepositRequest("2", 2000)
  (bankAccountAggregatesRef ? GetBalanceRequest("1")).onComplete(println)
  (bankAccountAggregatesRef ? GetBalanceRequest("2")).onComplete(println)

  sys.addShutdownHook {
    system.terminate()
  }
}

ReadJournalを使ってReadサイドを作ってみる

WriteサイドでJournalに溜め込んだコマンドの集まりからReadModelを作る

編集中...

コメント

タイトルとURLをコピーしました