PR

リアクティブシステムを実現するツールキットAkka その2

2017年3月2日(木)
前出 祐吾
前回に引き続き、Lightbend Reactive Platformでリアクティブシステムを構築する際のキーとなるツールキットAkkaについて紹介する。

前回に引き続き、Lightbend Reactive Platformでリアクティブシステムを構築する際のキーとなるコンポーネントAkkaについて解説します。

アクターとスレッド

読者の皆さんの中には、システム開発の終盤にパフォーマンステストを実施し、目標としていたレスポンスタイムを実現することができず、マルチスレッド処理と対峙せざるを得なかったという経験をお持ちの方も少なくないでしょう。ムーアの法則が終焉に向かっているといわれる昨今、半導体の性能向上が頭打ちになるだろうと言われている一方で、世の中のデジタル化の進行により、ソフトウェアに求められる要求は高まるばかりです。その要求をハードウェアの性能向上により解決できないとなれば、メニーコアをフル活用するため、スケールアップやスケールアウトを実現する必要があります。

これまでソフトウェアエンジニアの頭を悩ませてきたマルチスレッド処理の実装は、避けることができないでしょう。そこで、前回の記事の冒頭で「アクターモデルは本質的に並列性を備えている」というお話をしました。このアクターモデルこそが、ムーアの法則終焉という時代に対峙するソフトウェアへのキラーアーキテクチャになり得るかもしれません。

アクターモデルがどのようにマルチスレッド処理を実現するかを知るために、まずはアクターモデルを使わずに並行処理を実装した場合を考えてみましょう。コーヒーショップにボブとアリスの二人が訪れました。ボブ、アリスの順に在庫数が1点しかないプレミアマグカップを注文したとします。最新の在庫数をメモリ上に保持し、注文の際にチェックします。 最初にボブが注文すると在庫は0となり、次にアリスが注文しようとすると在庫不足のため注文できません。

しかし、二人がほぼ同時に注文するとどうなるでしょう?ボブが注文した時も、アリスが注文した時もマグカップの在庫は1だったので、両者の注文を受けてしまいました。これでは在庫がマイナスになってしまいます。

ボブとアリスがほぼ同時に注文すると在庫はマイナスに?

ボブとアリスがほぼ同時に注文すると在庫はマイナスに?

通常このようなことを防ぐために、ロックを行います。ボブの注文を処理する間、在庫をロックすることでアリスはロックの解放後、つまりはボブの処理が終わった後に自分の処理をしようとするため、在庫不足に気づき注文を受け付けずに済みます。

最初の注文処理の間に在庫をロック

最初の注文処理の間に在庫をロック

それだけといえばそれだけなのですが、ロックの対象が複数になれば、デッドロック等の考慮も必要になります。扱うデータが増えたり、機能が増えたりすることにより、その複雑さは指数関数的に増していくことになります。また、実装もさておきながら、テストも複雑になり、バグの温床になることでしょう。それでも、目標とするレスポンスタイムを満たすためには、この複雑さに立ち向かうしかありません。

ロック不要の並行処理

一方Akkaでは、このような考慮は必要ありません。アクターモデルを用いることで、明示的なロックやスレッド管理という労力を軽減した上で、信頼性のある並行処理を実現するシステムを構築することができます。なぜなら、決してアクター間で状態を共有しないシェアードナッシングの特性を持っているためです。仮にアクターが在庫数のようなミュータブルな状態を持ったとしても、その在庫を変更するのは1つのスレッドで動作する1つのアクターのみであるため、ロックや同期を気にせず、逐次処理のように実装することができます。

アクターモデルではロックが不要に

アクターモデルではロックが不要に

アクターは、メールボックスに届いたメッセージを到着順に処理します。ミュータブルな状態を持っていたとしても、1つのアクターで同時に利用されるのは1スレッドのみですので、並行処理の実装にありがちな複雑なロック制御が必要ありません。

メッセージは到着順に逐次処理される

メッセージは到着順に逐次処理される

バリスタアクターが注文数をカウントする場合はマルチスレッドを意識する必要がないため、以下のようなシンプルな実装になります。

リスト1:バリスタアクターの実装例

class Barista(offset: Int) extends Actor with ActorLogging {
  private var orderCount = offset // 注文数
  override def receive: Receive = {
    case Order(product, count) =>
        orderCount += count // 受信した注文数を加算
        log.info(s"Receive your order: $product, $count. The number of orders: $orderCount ")
        sender() ! "Received your order."
  }
}

ではアクターが使用するスレッドは、どのように管理されるのでしょうか? ディスパッチャにより異なりますので、ここではデフォルトのふるまいを紹介します。

アクターが使用するスレッドの管理例

アクターが使用するスレッドの管理例

スレッドプールを利用し、メールボックスにメッセージが届いたらプールされたスレッドを使用し処理を行い、処理を終えると開放します。メールボックスが空になるとスレッドを開放しますが、自身のメールボックスに複数のメッセージがキューイングされている場合はどうなるのでしょうか? この場合、連続して処理するメッセージ数が一定数を超えたタイミングで開放します。たとえば、連続して処理するメッセージ数が5(初期値)の場合は、5件まで連続して処理した後、開放し、別のアクターが利用できるようになります。この値が小さすぎるとコンテクストスイッチが頻繁に発生しオーバーヘッドが問題になり、大きすぎると一部のアクターにスレッドを専有され、別のアクターに遅延が発生する可能性があります(アクターに対してスレッドの数が少ない場合)。

ディスパッチャの選択を含め、このあたりは、アクター数、スレッド数、メッセージ数、メッセージあたりの処理量等を考慮しチューニングしていくことにより、性能を確保することができます。

Akkaのレジリエンス(耐障害性)

次に、アクターでエラーが発生した場合を見ていきましょう。エラーには様々な種類がありますが、説明を簡単にするためここでは業務的に発生が想定できる「業務エラー」とシステム都合で発生する「システム障害」の2種類に分けて考えることとします。

コーヒーショップを例に考えてみます。お客様からの注文を受けたレジ係がバリスタへ「コーヒーを作って下さい」とお願いした際に、豆がなくなってしまってコーヒーを作れない場合、バリスタは依頼元のレジ係へ「在庫切れです」と返答します。そうすることで、レジ係がお客様に状況を伝え、別の商品を選ぶ、あるいは、注文をやめるなど次のアクションが可能になります。これが業務エラーの場合の流れです。

バリスタからレジ係へ在庫切れのメッセージが送られる

バリスタからレジ係へ在庫切れのメッセージが送られる

では、システム障害が発生した場合はどうなるでしょうか? アクターモデルの基本概念として「すべてのものはアクターである」といわれていますが、それぞれのアクターはまったく関連性のないものではなく、親子関係を持ちヒエラルキーを構成します。親アクターはスーパーバイザーとして子アクターを監視し、子アクターに問題があった時はその制御を親アクターに委譲します。例えば、キッチンアクターの子アクターであるパティシエアクターでシステム障害が発生した場合は、その対処をスーパーバイザーである親アクターにお願いすることになります。これにより、レジ係はパティシエアクターで起きたシステム障害に関与する必要がなくなります。

また、このスーパーバイザーヒエラルキーにより、障害を部分的に(ここではキッチン内で)食い止め、スーパーバイザーグループの外に波及させないというメリットがあります。このためにも、障害が発生し得るリスクのある処理は可能な限り子アクターに任せ、スーパーバイザーは子アクターの監視に徹し、スーパーバイザーで障害が発生する可能性を最小限に留めるべきです。

親アクターは子アクターを監視し、障害の波及を防ぐ

親アクターは子アクターを監視し、障害の波及を防ぐ

では、スーパーバイザーである親アクターは、子アクターからエスカレーションされた障害をどのように制御するのでしょうか? Akkaでは4つの制御方法が提供されており、障害の内容に応じて選択することができます。

Akkaが提供する障害の制御方法

種類制御内容
Resume内部状態を維持したまま下位のアクターを再開
Restart内部状態をクリアして下位のアクターを再起動
Stop下位のアクターを永久に停止
Escalate失敗を上位アクターにエスカレート

また、スーパーバイザーが再開したり、ストップしたりする制御対象において、2つの戦略を持ちます。One-For-OneとAll-For-Oneです。例えば、NullPointerExceptionというシステム障害が発生した時にアクターをリスタートしたいという場合、リスタートする対象はどのアクターになるか? ということです。

One-For-Oneの場合は、その名の通り問題のあったアクターのみをリスタートするのに対し、All-For-Oneの場合は、親アクター配下に構成されるヒエラルキー内の全アクターをリスタートすることになります。どちらの戦略を選択するかは、アクター間の関係性により決まります。あるいは、ヒエラルキーの外に影響をおよぼす問題となれば、Escalateにより、さらに親アクターに制御を委譲することになるでしょう。

One-For-One戦略で障害を制御

リスト2:One-For-One戦略による障害(例外)制御

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }

このような障害制御の仕組みを持つAkkaは、「起こりうるすべての障害を完全に予測し、絶対に障害が起きないシステム」を作るのではなく、「想定外の障害も含め障害が起きた際にいかに対処するか」を重要視しているため、そのコンセプトは「let it crash」と呼ばれています。障害が発生した際に部分的にクラッシュさせてその障害を隔離し、システム全体は動かし続け、エンドユーザーにはできる限り何ごともなかったように見せ続け、復旧させるという仕組みこそ、リアクティブシステムの要素の1つであるレジリエンスなのです。さらに、分散システムにおいてレジリエンスを実現するために、Akkaクラスタという強力な機能がありますが、こちらは別の機会に紹介します。

位置透過性(Location Transparency)

Akkaのアクターは同じJVM内だけでなく、「akka-remote」というモジュールを使って、異なるJVM、異なるサーバにメッセージを送信することができます。そこで、Akkaは位置透過性という特徴を持っていて、メッセージの送信先がローカルマシン内であっても、ネットワークを介した別マシン内であっても、同じようにメッセージを送ることができます。これはAkkaのメッセージ送信が、分散環境で動作することを前提で設計されているためです。

例えば、開発中はローカルシステム内でメッセージのやり取りをし、プロダクション環境では異なるサーバにアクターシステムを配置するということもできます。また、システムの性能に問題があった場合はボトルネックとなる要因に応じた対処が必要ですが、アクターを別サーバに配置しスケールアウトさせることもできます。メッセージ送信は「!」(あるいは「?」)を使用していますので、サーバを跨いだやり取りをしている(可能性のある)場所は明確です。ここでは詳しく述べませんが、ネットワーク越しの通信によるレイテンシや様々なメッセージの信頼性の違いに対する十分な考慮が必要です。

メッセージの振り分け

並行処理の難しさについては前述しましたが、アクターモデルを使わない場合は、より低レベルな実装でマルチスレッド処理を実現しスケールアップを図ってきました。一方Akkaでは、アクターを追加することで容易にスケールアップにも対応でき、さらには、位置透過性の特徴を備えていますので、スケールアウトやスケールインも可能です。また責務によりアクターを分割して、一部のアクターがボトルネックとなりパフォーマンスが劣化している場合は、その部分を並行に処理するようにしたり、分散させたりすることでボトルネックを取り払うといった対処が可能となります。

ボトルネックを並行・分散処理することで解消する

ボトルネックを並行・分散処理することで解消する

Akkaでは、ルーターを介して宛先となるアクターへメッセージを振り分けることができます。宛先となるアクターはPoolとGroupの2種類があります。Poolはルーターが作成したアクター、つまりルーターの子アクターが振り分けの対象となります。それに対してGroupは別途作成したアクターでグループを作り、メッセージを振り分けます。そして、このPoolやGroup内のアクターへの振り分け方法は、ランダムに振り分けたり、暇をしているアクターに振り分けたりと様々ですので、どのような方法で振り分けができるのかを以下に紹介します。

メッセージ振り分けの方法

種別振り分け方法
RoundRobinメッセージを順番に振り分ける
Randomメッセージをランダムに振り分ける
Balancing忙しいアクターから仕事のないアクターへメッセージの再振り分けを行う
SmallestMailbox次の優先順位でメッセージを振り分ける
 1.メールボックスが空、かつ仕事をしていないアクター
 2.メールボックスが空のアクター
 3.メールボックス内のメッセージが少ないアクター
 4.リモートアクター(メールボックス内のメッセージ量が分からないため)
Broadcastすべてのアクターにメッセージを送る
ScatterGatherFirstCompletedすべてのアクターにメッセージを送り、最初の応答を送り主に返し、それ以外は捨てる
TailChopping任意の間隔を空けてすべてのアクターにメッセージを送り、最初の応答を送り主に返し、それ以外は捨てる
ConsistentHashing送信メッセージに基づき、コンシステントハッシュ法を使って選んだアクターに振り分け

このように複数のアクターを使用し単に並行処理を可能にするというだけでなく、並行処理を可能にしたアクターに対するメッセージの振り分け方法もアクターの特性に応じてチューニングできます。

ふるまいの変更

Akkaのアクターは、メッセージを受信した時のふるまいをメッセージの内容(型や値など)ごとに定義できることは前述しましたが、それ以外にもbecome()メソッドやFSM(Finite State Machine)を使うことで、状態によって異なるふるまいを定義できます。

例えば、バリスタアクターはOrder型のメッセージを受けたらコーヒーを作ってその結果を返しますが、「オープン状態の時だけコーヒーを作り、クローズ状態の時は注文だけ受け付けて、オープン後にコーヒーを作るといったように処理する」というように、アクターの状態によってふるまいを変えることができます。

オープン状態の場合

オープンの場合のバリスタアクターのふるまい

オープンの場合のバリスタアクターのふるまい

クローズ状態の場合

クローズの場合のバリスタアクターのふるまい

クローズの場合のバリスタアクターのふるまい

まず、become()メソッドを使った例を紹介します。メッセージプロトコルとしてOrderの他に、OpenケースクラスとCloseケースオブジェクトを追加します。バリスタクラスにはオープン状態とクローズ状態それぞれのふるまいを定義します。

  • オープン状態の場合
    • Orderを受信すると、「Receive your order: <商品名>, <注文数>」というログを 出力
    • Closeを受信すると、become()メソッドによりクローズ状態に遷移
  • クローズ状態の場合
    • Orderを受信すると、stash()メソッドにより、受信したメッセージを退避
    • Openを受信すると、become()メソッドによりオープン状態に遷移し、蓄えておいたメッセージに対する処理を実行

クローズ状態に受け付けた注文もオープン後に対応するというちょっと親切なバリスタさんになりました。実装例は次のようになります。

状態によってふるまいを変えるアクター

リスト3:状態によりふるまいを変更するアクターの実装例

object Barista {
  // メッセージプロトコルの定義
  case class Order(product: String, count: Int)
  case class Open(bean: Bean)
  case object Close

  // コーヒー豆の種類
  case class Bean(name: String)
}

class Barista extends Actor with ActorLogging with Stash {

  // 初期状態の設定
  override def receive: Receive = close
  // 注文数
  var orderCount = 0

  // オープン状態のふるまい
  def open(bean: Bean): Receive = {
    case Order(product, count) =>
        orderCount += count         // 受信した注文数を加算
        log.info(s"Receive your order: $product, $count.")
        sender() ! s"Received your order. Today's coffee is ${bean.name}."
    case Close =>
        context.become(close)       // クローズへ状態変更
  }

  // クローズ状態のふるまい
  def close: Receive = {
    case Order(product, count) =>
        stash()                     // オーダーを退避しておく
        log.info("I'm closed.")
    case Open(bean) =>
        unstashAll()                // 退避したオーダーを引き戻す
        context.become(open(bean))  // オープンへ状態変更
  }
}

バリスタがオープンするたびに、コーヒー豆の種類が変わるといったような場合、オープンする時に豆の種類を指定することもできます。上記の実装例のように、クローズからオープンに状態変更する時「become(open(bean))」のように、コーヒー豆の種類を指定します。

今回は「Lightbend Reactive Platform」の最も重要なプロダクトであるAkkaについて紹介しました。

なお、本記事で紹介したサンプルコードはGitHub上で公開しています。また、本記事執筆にあたり参考にしたAkkaの公式ドキュメントは、Akka日本語化コミュニティにて鋭意翻訳中です。どなたでもご参加いただけますので、ご興味のある方はGitHub上のAkka日本語翻訳プロジェクトまでアクセス下さい。

参考資料

TIS株式会社

生産革新本部 生産革新部 生産技術R&D室

連載記事一覧

開発言語技術解説
第4回

リアクティブシステムを実現するツールキットAkka その2

2017/3/2
前回に引き続き、Lightbend Reactive Platformでリアクティブシステムを構築する際のキーとなるツールキットAkkaについて紹介する。
開発言語技術解説
第3回

リアクティブシステムを実現するツールキットAkka

2017/2/23
Lightbend Reactive Platformでリアクティブシステムを構築する際のキーとなるツールキットAkkaについて、2回に分けて紹介する。
システム設計技術解説
第2回

Lightbend Reactive Platformによるリアクティブシステムの構築

2016/5/30
「リアクティブ」なシステムを構築するための統合プラットフォーム「Lightbend Reactive Platform」を用いたプログラミングを紹介します。

Think IT会員サービスのご案内

Think ITでは、より付加価値の高いコンテンツを会員サービスとして提供しています。会員登録を済ませてThink ITのWebサイトにログインすることでさまざまな限定特典を入手できるようになります。

Think IT会員サービスのご案内

関連記事