リアクティブシステムを実現するツールキットAkka

2017年2月23日(木)
前出 祐吾
Lightbend Reactive Platformでリアクティブシステムを構築する際のキーとなるツールキットAkkaについて、2回に分けて紹介する。

はじめに

前回は、リアクティブシステムを作るためのプラットフォームとして、Lightbend Reactive Platformを紹介しました。Lightbend Reactive PlatformはJVM上で動くリアクティブシステムを作るための統合プラットフォームで、Play FrameworkやAkkaなど様々なプロダクトから構成されます。前回のPlay FrameworkとSlickに引き続き、今回はAkkaを解説していきたいと思います。

本題に入る前に前回のおさらいをしておきましょう。Akkaはアクター同士がメッセージパッシングでやり取りを行います。そのため、それぞれのアクターの処理を同時に実行できることや、アクターはヒエラルキーを構成し障害の影響を最小限に抑えることができることなど、様々な恩恵を受けることができました。また、アクターを追加したり削除したりすることにより、容易にスケールアウトやスケールインにも対応でき、メッセージの送信先となるアクターは、ローカルのJVM上にいても、別のJVM、あるいは別のサーバにいても同じようにメッセージを送ることができるという位置透過性の特徴も持っています。本記事では、こういった特徴を次の流れに沿って紹介していきます。

  • Akkaとは
  • メッセージの送信とふるまいの定義
  • アクターとスレッド
  • Akkaのレジリエンス
  • 位置透過性
  • メッセージの振り分け
  • ふるまいの変更

分量が多いので、まず今回の記事では「メッセージの送信とふるまいの定義」を説明いたします。

Akkaとは

Akkaは、並行・分散アプリケーションを構築するための強力なツールキットです。Lightbend Reactive Platformでリアクティブを実現するための最も重要なプロダクトといえるでしょう。コンポーネント同士がメッセージをやり取りするメッセージ駆動(Message Driven)のアーキテクチャを持っており、リアクティブシステムの「高レスポンス(Responsive)」、「レジリエンス(Resilient)」、「伸縮性(Elastic)」を支えています。Akkaでは、メッセージをやり取りするコンポーネントをアクターといい、そのアーキテクチャをアクターモデルと呼びます。

アクターモデル

アクターモデルは、並行計算の数学的モデルの一種です。アクターは並行的に受信するメッセージに対応した次のようなふるまいを備えています。そのふるまいに逐次性は前提とされておらず、並列的にこれらを実行します(Wikipedia)。

  • (他の)アクターに有限個のメッセージを送信する
  • 有限個の新たなアクターを生成する
  • 次に受信するメッセージに対する動作を指定する

従来型の多くの計算モデルは、基本的に逐次的に実行するのに対して、アクターモデルは本質的に並行性を備えている点が異なります。それぞれのアクターが互いに独立して並行にメッセージを処理します。冒頭から少し小難しい説明になってしまいましたので、レジ係アクターがバリスタアクターにコーヒーのドリップをお願いするユースケースで流れを整理してみましょう。

アクターの機能をコーヒーショップにたとえると

アクターの機能をコーヒーショップにたとえると

アクターモデルでは、アクターを生成し、アクターには受信したメッセージに対する動作を指定します。(1)アクターは他のアクターにメッセージを送信し、(2)メッセージを受信したアクターは一旦「メールボックス」と呼ばれるキューにメッセージを格納します。そして(3)メールボックスに届いたメッセージに対する動作を実行します。メッセージ送信者はその応答を待つことなく(2')次の作業を行い、メッセージを受信したアクターは自分のメールボックスにあるメッセージをただひたすら処理していくというモデルです。

アクターモデルはErlangによって世に広められ、Ericsson社から稼働率99.9999999%(nine nines)という信頼性の高いシステムの構築に成功したという事例も発表されています。

メッセージの送信とふるまいの定義

Akkaでメッセージを送信する際のアクターに対する操作は、ActorRefを介して行います。

メッセージ送信にはActorRefが介在する

メッセージ送信にはActorRefが介在する

ActorRefを仲介することにより、メッセージの受信側アクターが同一サーバ上にあっても、ネットワークを介した別サーバにあっても同じように実装できることや、障害等でインスタンスを作りなおしたとしてもメッセージを届けることができること、さらにはメッセージの宛先としてActorの代わりにシリアライズ化できるなど、様々な恩恵があります。

また、アクターにはミュータブルな状態を持つことができるのに対して、アクター間をやり取りするメッセージはイミュータブルであるべきです。アクターモデルを使用することにより、並行性を意識せず並行処理を実装することができますが、メッセージをミュータブルにしてしまうと異なるスレッドで同じ値を変更できるようになってしまい、並行性を意識しなければならなくなります。

Akkaでのメッセージ送信はActorRefの「!」を使用します。メッセージはイミュータブルかつ、シリアライザブルである必要があり、次のように様々な型を定義することができます。

リスト1:メッセージ送信は「!」

private val barista: ActorRef = createBarista()

barista ! "Coffee"            // String型
barista ! 2                   // Int型
barista ! Order("Coffee", 2)  // Order型

アクターは、receiveメソッドにメッセージを受信した時のふるまいを定義します。そのふるまいはメッセージの型や値に応じて定義することができます。Scalaではパターンマッチを使用することで、次のようにシンプルに記述することができます。

リスト2:メッセージの型や値によりふるまいを変えるアクター

import akka.actor.{Actor, ActorLogging}

class Barista extends Actor with ActorLogging {

  override def receive: Receive = {
    case product: String =>  // String型のメッセージを受信した場合
        log.info(s"Received your order: $product")
    case count: Int   =>     // Int型のメッセージを受信した場合
        log.info(s"Received your order: $count")
    case _            =>     // String型、Int型以外のメッセージを受信した場合
        log.info("Received your order.")
  }
}

パターンマッチ

ここで少しScalaの話になりますが、Scalaの強力なパターンマッチにより様々な場合分けができます。上述の例のように「String」や「Int」のように型でマッチングさせる他、「_」を使ってすべての場合に処理させることもできますし、Ifでガード条件を追加したり、リテラル値そのものや変数の値を条件にすることもできます。

ガード条件による判定

リスト3:ガード条件による判定

// 10より小さい数値(Int型)の場合
case count: Int if(count < 10) =>

リテラル値でマッチング

リスト4:リテラル値でマッチング

// 1の場合
case 1 =>

変数の値でマッチング

リスト5:変数の値でマッチング

// 変数「product」の値と一致する場合
case `product` =>

また、ケースクラスでマッチングし、そのプロパティを変数にバインドしたり、ケースクラスそのものをバインドしたりと様々な条件を定義することができます。

ケースクラスでマッチング

リスト6:ケースクラスでマッチング

// Order型の場合
case Order(product, count) =>
    log.info(s"product: $product")
    log.info(s"count  : $count")

マッチしたケースクラスを変数にバインド

リスト7:マッチしたケースクラスを変数にバインド

// Order型で第一引数productが「Coffee」の場合
case order @ Order("Coffee", _) =>
    log.info(s"product: ${order.product}")
    log.info(s"count  : ${order.count}")

他にも正規表現等、様々な方法でマッチングすることができるため、パターンマッチを利用して、その条件に応じたアクターのふるまいを実装することができます。

アクター間のメッセージのやり取りにおいて注意すべき点として、メッセージは型安全ではないことが挙げられます。送信側は、あらゆる型をメッセージとして渡すことができます。そのため、メッセージプロトコルとしてアクターのコンパニオンオブジェクトにケースクラスを定義し、使用するとよいでしょう。アクターが処理可能なメッセージの型を明示しておくことで、送信側が誤った型のメッセージを送信することの抑止に繋がります。

メッセージを受信するアクター

リスト8:メッセージを受信するアクター

// バリスタアクターのコンパニオンオブジェクト
object Barista {
  // メッセージプロトコルの定義
  case class Order(product: String, count: Int)
}

// バリスタアクター
class Barista extends Actor with ActorLogging {
  override def receive: Receive = {
    case Order(product, count) =>
        log.info("Receive your order.") // ケースクラスを受信した時のふるまい
  }
}

メッセージを送信するアクター

リスト9:メッセージを送信するアクター

barista ! Barista.Order("Coffee", 2)  // メッセージをケースクラスで送信

処理されないメッセージ

receiveメソッドに定義したどの条件にもマッチしないメッセージが送られてきた場合はどうなるのでしょうか? この場合は、処理されない(できない)メッセージを意味する「UnhandledMessage」がイベントストリームにパブリッシュされます。また、宛先となるアクターシステムがストップしていたり、存在しないアクターにメッセージを送ろうとしたりすると、そのメッセージはデッドレター(DeadLetter)となります。DeadLetterもUnhandledMessageと同様にイベントストリーム内にパブリッシュされ、サブスクライブすることができます。これらは、パブリッシュすると同時にログに出力され、多くの場合はプログラムに対して何らかの対処が必要となります。実際の運用では、これらの状況をLightbend Monitoring等を使用して監視しておくとよいでしょう。

メッセージの応答

先に紹介したとおり、アクターはメッセージ送信後その応答を待つことなく非同期に異なる処理を実行します。では、送信したメッセージの結果を受け取りたい場合は、どうすればよいのでしょうか?

一つの方法は、送信先が送信元に対してメッセージを返信するというものです。その際、送信元のアドレスはActorRefとしてメッセージとともに渡されるため、受信側は「sender()」というメソッドで取得できるActorRefに対してメッセージを返信することで、応答できます。送信元は、返信メッセージを受信した際のふるまいをreceiveメソッドに定義するだけです。なお「!」はtellメソッドのエイリアスであり、そのためtellパターンと呼ばれます。

tellパターン(送信側)

リスト10:送信側のtellパターン

barista ! Barista.Order("Coffee", 2)  // 「!」でメッセージを送信

tellパターン(受信側)

リスト11:受信側のtellパターン

override def receive: Receive = {
  case Order(product, count) =>
      // Baristaからの応答を受け取った時のふるまい
      log.info(s"Your order has been completed. (product: $product, count: $count)")
      // 送信元に注文に対する調理が完了したことを返す
      sender() ! OrderCompleted("ok")
}

もう一つの方法は、askパターンです。前述の「!」の代わりに「?」でメッセージを送信します。「?」はaskメソッドのエイリアスなので、askパターンと呼ばれます。askパターンではFutureをレスポンスとして得ることができます。前回紹介したとおり、Futureを得ると応答があった後の未来の処理としてふるまいを定義することができます。その際、応答を無限に待ち続けるわけにはいきませんので、必ずタイムアウトを設定するようにしましょう。ただし「?」はFutureの生成などを行う分、「!」に比べコストがかかる処理になるため、必要性を十分に考慮したうえで使用しましょう。

実際には「!」を使用し、「Fire and Forget」で設計するほうが、メッセージを送る時点で、そのメッセージがどのような結果を及ぼすかまで考慮する必要がなくなり、実装がシンプルになります。

askパターン(送信側)

リスト12:送信側のaskパターン

import scala.concurrent.duration._
import akka.util.Timeout
import akka.pattern.ask

// タイムアウトの設定
implicit val timeout = Timeout(5 seconds)
// 「?」でメッセージを送信
val response: Future[Any] = barista ? Barista.Order("Coffee", 2)

// 応答があった後の未来の処理
response.mapTo[OrderCompleted] onComplete {
  case Success(result) =>
      log.info(s"result: ${result.message}")
  case Failure(t) =>
      log.info(t.getMessage)
}

また、リアクティブシステムを設計するに際しては、メッセージのフローは一方向に流れることがベストです。双方向でやり取りすると無駄なメッセージが発生したり、リアルタイム性を失ったり、システムを複雑にする可能性がありますので、そういった場合はアクターの責務が適切か? 分割することができないかをもう一度検討するとよいでしょう。

TIS株式会社

生産技術R&D室所属。これまで社内向けWebアプリケーションフレームワークの開発などシステム開発の効率化に取り組んできた。現在はリアクティブシステムをエンタープライズの分野に適用するための技術検証を行う傍ら、Lightbendの認定コンサルティングパートナーとして、同技術を用いたPoCや開発の支援などに従事している。

リアクティブシステムが日本国内で広く活用されることでエンジニアが障害対応から解放されることを願い、執筆/講演活動、Akkaのドキュメント翻訳などにも取り組んでいる。

連載バックナンバー

設計/手法/テスト技術解説
第9回

Akka HTTPでWeb APIに仕立てる

2018/9/20
第9回目となる今回は、Akkaの拡張機能Akka HTTPを用いてWeb APIを作成する方法を紹介します。
開発言語
第8回

Akka Streamsで実装するリアクティブストリーム(その2)

2018/3/13
前回に引き続き、Akka Streamsを使って実装するリアクティブストリームについて解説する。
開発言語技術解説
第7回

Akka Streamsで実装するリアクティブストリーム

2018/3/7
今回と次回の2回に分けて、Akka Streamsを使って実装するリアクティブストリームについて解説する。

Think ITメルマガ会員登録受付中

Think ITでは、技術情報が詰まったメールマガジン「Think IT Weekly」の配信サービスを提供しています。メルマガ会員登録を済ませれば、メルマガだけでなく、さまざまな限定特典を入手できるようになります。

Think ITメルマガ会員のサービス内容を見る

他にもこの記事が読まれています