はじめに
前回は、リアクティブシステムを作るためのプラットフォームとして、Lightbend Reactive Platformを紹介しました。Lightbend Reactive PlatformはJVM上で動くリアクティブシステムを作るための統合プラットフォームで、Play FrameworkやAkkaなど様々なプロダクトから構成されます。前回のPlay FrameworkとSlickに引き続き、今回はAkkaを解説していきたいと思います。
本題に入る前に前回のおさらいをしておきましょう。Akkaはアクター同士がメッセージパッシングでやり取りを行います。そのため、それぞれのアクターの処理を同時に実行できることや、アクターはヒエラルキーを構成し障害の影響を最小限に抑えることができることなど、様々な恩恵を受けることができました。また、アクターを追加したり削除したりすることにより、容易にスケールアウトやスケールインにも対応でき、メッセージの送信先となるアクターは、ローカルのJVM上にいても、別のJVM、あるいは別のサーバにいても同じようにメッセージを送ることができるという位置透過性の特徴も持っています。本記事では、こういった特徴を次の流れに沿って紹介していきます。
- Akkaとは
- メッセージの送信とふるまいの定義
- アクターとスレッド
- Akkaのレジリエンス
- 位置透過性
- メッセージの振り分け
- ふるまいの変更
分量が多いので、まず今回の記事では「メッセージの送信とふるまいの定義」を説明いたします。
Akkaとは
Akkaは、並行・分散アプリケーションを構築するための強力なツールキットです。Lightbend Reactive Platformでリアクティブを実現するための最も重要なプロダクトといえるでしょう。コンポーネント同士がメッセージをやり取りするメッセージ駆動(Message Driven)のアーキテクチャを持っており、リアクティブシステムの「高レスポンス(Responsive)」、「レジリエンス(Resilient)」、「伸縮性(Elastic)」を支えています。Akkaでは、メッセージをやり取りするコンポーネントをアクターといい、そのアーキテクチャをアクターモデルと呼びます。
アクターモデル
アクターモデルは、並行計算の数学的モデルの一種です。アクターは並行的に受信するメッセージに対応した次のようなふるまいを備えています。そのふるまいに逐次性は前提とされておらず、並列的にこれらを実行します(Wikipedia)。
- (他の)アクターに有限個のメッセージを送信する
- 有限個の新たなアクターを生成する
- 次に受信するメッセージに対する動作を指定する
従来型の多くの計算モデルは、基本的に逐次的に実行するのに対して、アクターモデルは本質的に並行性を備えている点が異なります。それぞれのアクターが互いに独立して並行にメッセージを処理します。冒頭から少し小難しい説明になってしまいましたので、レジ係アクターがバリスタアクターにコーヒーのドリップをお願いするユースケースで流れを整理してみましょう。
アクターの機能をコーヒーショップにたとえると
アクターモデルでは、アクターを生成し、アクターには受信したメッセージに対する動作を指定します。(1)アクターは他のアクターにメッセージを送信し、(2)メッセージを受信したアクターは一旦「メールボックス」と呼ばれるキューにメッセージを格納します。そして(3)メールボックスに届いたメッセージに対する動作を実行します。メッセージ送信者はその応答を待つことなく(2')次の作業を行い、メッセージを受信したアクターは自分のメールボックスにあるメッセージをただひたすら処理していくというモデルです。
アクターモデルはErlangによって世に広められ、Ericsson社から稼働率99.9999999%(nine nines)という信頼性の高いシステムの構築に成功したという事例も発表されています。
メッセージの送信とふるまいの定義
Akkaでメッセージを送信する際のアクターに対する操作は、ActorRefを介して行います。
メッセージ送信にはActorRefが介在する
ActorRefを仲介することにより、メッセージの受信側アクターが同一サーバ上にあっても、ネットワークを介した別サーバにあっても同じように実装できることや、障害等でインスタンスを作りなおしたとしてもメッセージを届けることができること、さらにはメッセージの宛先としてActorの代わりにシリアライズ化できるなど、様々な恩恵があります。
また、アクターにはミュータブルな状態を持つことができるのに対して、アクター間をやり取りするメッセージはイミュータブルであるべきです。アクターモデルを使用することにより、並行性を意識せず並行処理を実装することができますが、メッセージをミュータブルにしてしまうと異なるスレッドで同じ値を変更できるようになってしまい、並行性を意識しなければならなくなります。
Akkaでのメッセージ送信はActorRefの「!」を使用します。メッセージはイミュータブルかつ、シリアライザブルである必要があり、次のように様々な型を定義することができます。
リスト1:メッセージ送信は「!」
1 | private val barista : ActorRef = createBarista() |
5 | barista ! Order( "Coffee" , 2 ) |
アクターは、receiveメソッドにメッセージを受信した時のふるまいを定義します。そのふるまいはメッセージの型や値に応じて定義することができます。Scalaではパターンマッチを使用することで、次のようにシンプルに記述することができます。
リスト2:メッセージの型や値によりふるまいを変えるアクター
01 | import akka.actor.{Actor, ActorLogging} |
03 | class Barista extends Actor with ActorLogging { |
05 | override def receive : Receive = { |
06 | case product : String = > |
07 | log.info(s "Received your order: $product" ) |
09 | log.info(s "Received your order: $count" ) |
11 | log.info( "Received your order." ) |
パターンマッチ
ここで少しScalaの話になりますが、Scalaの強力なパターンマッチにより様々な場合分けができます。上述の例のように「String」や「Int」のように型でマッチングさせる他、「_」を使ってすべての場合に処理させることもできますし、Ifでガード条件を追加したり、リテラル値そのものや変数の値を条件にすることもできます。
ガード条件による判定
リスト3:ガード条件による判定
2 | case count : Int if (count < 10 ) = > |
リテラル値でマッチング
変数の値でマッチング
また、ケースクラスでマッチングし、そのプロパティを変数にバインドしたり、ケースクラスそのものをバインドしたりと様々な条件を定義することができます。
ケースクラスでマッチング
リスト6:ケースクラスでマッチング
2 | case Order(product, count) = > |
3 | log.info(s "product: $product" ) |
4 | log.info(s "count : $count" ) |
マッチしたケースクラスを変数にバインド
リスト7:マッチしたケースクラスを変数にバインド
2 | case order @ Order( "Coffee" , _ ) = > |
3 | log.info(s "product: ${order.product}" ) |
4 | log.info(s "count : ${order.count}" ) |
他にも正規表現等、様々な方法でマッチングすることができるため、パターンマッチを利用して、その条件に応じたアクターのふるまいを実装することができます。
アクター間のメッセージのやり取りにおいて注意すべき点として、メッセージは型安全ではないことが挙げられます。送信側は、あらゆる型をメッセージとして渡すことができます。そのため、メッセージプロトコルとしてアクターのコンパニオンオブジェクトにケースクラスを定義し、使用するとよいでしょう。アクターが処理可能なメッセージの型を明示しておくことで、送信側が誤った型のメッセージを送信することの抑止に繋がります。
メッセージを受信するアクター
リスト8:メッセージを受信するアクター
04 | case class Order(product : String, count : Int) |
08 | class Barista extends Actor with ActorLogging { |
09 | override def receive : Receive = { |
10 | case Order(product, count) = > |
11 | log.info( "Receive your order." ) |
メッセージを送信するアクター
リスト9:メッセージを送信するアクター
1 | barista ! Barista.Order( "Coffee" , 2 ) |
処理されないメッセージ
receiveメソッドに定義したどの条件にもマッチしないメッセージが送られてきた場合はどうなるのでしょうか? この場合は、処理されない(できない)メッセージを意味する「UnhandledMessage」がイベントストリームにパブリッシュされます。また、宛先となるアクターシステムがストップしていたり、存在しないアクターにメッセージを送ろうとしたりすると、そのメッセージはデッドレター(DeadLetter)となります。DeadLetterもUnhandledMessageと同様にイベントストリーム内にパブリッシュされ、サブスクライブすることができます。これらは、パブリッシュすると同時にログに出力され、多くの場合はプログラムに対して何らかの対処が必要となります。実際の運用では、これらの状況をLightbend Monitoring等を使用して監視しておくとよいでしょう。
メッセージの応答
先に紹介したとおり、アクターはメッセージ送信後その応答を待つことなく非同期に異なる処理を実行します。では、送信したメッセージの結果を受け取りたい場合は、どうすればよいのでしょうか?
一つの方法は、送信先が送信元に対してメッセージを返信するというものです。その際、送信元のアドレスはActorRefとしてメッセージとともに渡されるため、受信側は「sender()」というメソッドで取得できるActorRefに対してメッセージを返信することで、応答できます。送信元は、返信メッセージを受信した際のふるまいをreceiveメソッドに定義するだけです。なお「!」はtellメソッドのエイリアスであり、そのためtellパターンと呼ばれます。
tellパターン(送信側)
リスト10:送信側のtellパターン
1 | barista ! Barista.Order( "Coffee" , 2 ) |
tellパターン(受信側)
リスト11:受信側のtellパターン
1 | override def receive : Receive = { |
2 | case Order(product, count) = > |
4 | log.info(s "Your order has been completed. (product: $product, count: $count)" ) |
6 | sender() ! OrderCompleted( "ok" ) |
もう一つの方法は、askパターンです。前述の「!」の代わりに「?」でメッセージを送信します。「?」はaskメソッドのエイリアスなので、askパターンと呼ばれます。askパターンではFutureをレスポンスとして得ることができます。前回紹介したとおり、Futureを得ると応答があった後の未来の処理としてふるまいを定義することができます。その際、応答を無限に待ち続けるわけにはいきませんので、必ずタイムアウトを設定するようにしましょう。ただし「?」はFutureの生成などを行う分、「!」に比べコストがかかる処理になるため、必要性を十分に考慮したうえで使用しましょう。
実際には「!」を使用し、「Fire and Forget」で設計するほうが、メッセージを送る時点で、そのメッセージがどのような結果を及ぼすかまで考慮する必要がなくなり、実装がシンプルになります。
askパターン(送信側)
リスト12:送信側のaskパターン
01 | import scala.concurrent.duration. _ |
02 | import akka.util.Timeout |
03 | import akka.pattern.ask |
06 | implicit val timeout = Timeout( 5 seconds) |
08 | val response : Future[Any] = barista ? Barista.Order( "Coffee" , 2 ) |
11 | response.mapTo[OrderCompleted] onComplete { |
12 | case Success(result) = > |
13 | log.info(s "result: ${result.message}" ) |
15 | log.info(t.getMessage) |
また、リアクティブシステムを設計するに際しては、メッセージのフローは一方向に流れることがベストです。双方向でやり取りすると無駄なメッセージが発生したり、リアルタイム性を失ったり、システムを複雑にする可能性がありますので、そういった場合はアクターの責務が適切か? 分割することができないかをもう一度検討するとよいでしょう。