Akka Streamsで実装するリアクティブストリーム

2018年3月7日(水)
前出 祐吾
今回と次回の2回に分けて、Akka Streamsを使って実装するリアクティブストリームについて解説する。

もうひとつの出力値

Akka Streamsにはソースとシンク、フローを流れるデータ要素とは別に、グラフ実行時に得られるマテリアライズされた値という、もうひとつの出力値があります。

マテリアライズされた値

マテリアライズされた値

説明のため先ほどのソースコードに少し手を加え、さらに型も記述します(Scalaは型推論の機能を持っているため、記述は必須ではありません)。シンクはfoldを使用し、足し算をした結果を返すようにします。

参考までにScalaのコレクションで同様の操作をした場合は、以下の結果が返ります(リアクティブストリームで提供するmapfoldなどのメソッドは、コレクションのそれとは違うので、結果のイメージに留めてください)。

リスト3:1から10のコレクションの要素を足し算

scala> (1 to 10).fold(0)(_ + _)
res0: Int = 55

リスト4:修正したソースコード

// ソースを生成
val source: Source[Int, NotUsed] = Source(1 to 10)

// シンクを生成
val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// ソースをシンクに繋いでRunnableGraphを生成
val runnableGraph: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)

// RunnableGraphを実行
val result: Future[Int] = runnableGraph.run()

implicit val executionContext = system.dispatcher
result.foreach { r =>
  println(s"result:$r")
  // アクターシステムを終了
  system.terminate()
}

ソースの型はSource[Int, NotUsed]、シンクの型はSink[Int, Future[Int]]で、それぞれ2つの型引数を持ちます。1つ目はソースから出力する要素の型とシンクに入力する要素の型を示しています。ここでは、1から10の数値がソースから出力され、シンクに入力されるので共にInt型です。2つ目はマテリアライズされた値(materialized value)と呼ばれるもので、グラフの実行時に得られる補助値です。たとえば、処理結果の値やファイルI/O処理の成功/失敗の状態などが得られます。ソースは数値を出力するだけなので、マテリアライズされた値は使用しません。この場合、使用されていないことを示すNotUsedという型になります。一方シンクはFuture[Int]でグラフを実行するとマテリアライズされた値として、foldの結果を取得できます。型はIntで非同期に実行されるので、Futureに包まれた形になります。

マテリアライズされた値

マテリアライズされた値

上記の例ではソースのマテリアライズされた値はNotUsedでしたが、値が提供される場合もあります。たとえば、ファイルからソースを生成する場合はFileIO.fromPathを使用します。生成するSourceの2つの型引数はそれぞれ、出力するデータとしてByteString、マテリアライズされた値としてFuture[IOResult]となります。IOResultは、ファイルの読み込み結果を表す値です。マテリアライズされた値を利用することで、ファイルの読み込み結果に応じた処理も実装できます。

リスト5:ファイルを読み込むソース

val source: Source[ByteString, Future[IOResult]]
              = FileIO.fromPath(Paths.get("README.md"))

さきほどの、コードを実行してみましょう。RunnableGraphを生成・実行され、Future[Int]が返ってきました。この値は「55」で、コレクション操作で確認した値と同じ結果となることがわかります。

リスト6:実行結果

[info] result:55

55(Future[Int])は、シンクのマテリアライズされた値です。これは、ソースとシンクをつなげてマテリアライズするときにtoMatの引数に、シンクに加えKeep.rightという値を渡していたためです。Keep.rightは右側、つまりシンク側のマテリアライズされた値を保持することを示します。同様にKeep.leftと指定することで、左側、つまりソース側のマテリアライズされた値を保持することもできます。

ソースのマテリアライズされた値がFuture[IOResult]、シンクの同値がFuture[Done]の場合を見てみましょう。Doneforeachなど戻り値がない場合に使用します(Javaからも同じ型が利用できるようUnit型ではありません)。

リスト7:ソースとシンクのマテリアライズされた値

val source: Source[ByteString, Future[IOResult]]
              = FileIO.fromPath(Paths.get("README.md"))
val sink:   Sink[ByteString, Future[Done]]
              = Sink.foreach(b => println(b.utf8String))

Keep.leftを指定した場合は、ソースのマテリアライズされた値であるFuture[IOResult]がRunnableGraphに保持されます。

リスト8:ソースのマテリアライズされた値を保持する場合

val graph: RunnableGraph[Future[IOResult]]
             = source.toMat(sink)(Keep.left)

Keep.rightを指定した場合は、シンクのマテリアライズされた値であるFuture[Done]がRunnableGraphに保持されます。

リスト9:シンクのマテリアライズされた値を保持する場合

val graph: RunnableGraph[Future[Done]]
             = source.toMat(sink)(Keep.right)

Keep.bothを指定すると、両方のマテリアライズされた値がRunnableGraphに保持されます。

リスト10:ソースとシンク両方のマテリアライズされた値を保持する場合

val graph: RunnableGraph[(Future[IOResult], Future[Done])]
             = sourcae.toMat(sink)(Keep.both)

ソースとシンクを繋げるtoメソッドはKeep.leftが内部で指定されているため、ソースのマテリアライズされた値であるFuture[IOResult]がRunnableGraphに保持されます。

リスト11:toメソッドを使用する場合

val graph: RunnableGraph[Future[IOResult]] = source.to(sink)

エラー処理

Akka Streamsのグラフ実行時に例外が発生した場合のふるまいを見ていきましょう。デフォルトのふるまいとしては、例外が発生した時点でストリーム処理を中断します。動作を確認するために要素の値が3の場合に「IllegalArgumentException」が発生するflowを定義した、次のようなソースコードを使用します。

リスト12:要素の値が3の時に例外が発生するグラフ

// ソースを作成
val source = Source(1 to 5)
// シンクを作成
val sink = Sink.foreach[Int](e => println(s"element=$e"))
// フローを作成
val flow = Flow[Int].map{e =>
  // 要素の値が3の場合に例外を発生させる
  if (e == 3) throw new IllegalArgumentException("oops!") else e
}
// ソースをシンクに繋いでRunnableGraphを生成
val runnableGraph = source.via(flow).toMat(sink)(Keep.right)

// RunnableGraphを実行
val result = runnableGraph.run()

result.onComplete { r =>
  println(s"result:$r")
  // アクターシステムを終了
  system.terminate()
}

上記のflowを介さないRunnableGraphを実行した場合と比較してみます。

リスト13:正常に処理された(flowを介さないグラフ)場合の実行結果

[info] element=1
[info] element=2
[info] element=3
[info] element=4
[info] element=5
[info] result:Success(Done)

flowを介さないグラフは例外が発生せず、ソースから流れる最後のデータ要素まで処理され、成功値(Success(Done))で終了しています。

リスト14:要素の値が3の場合に例外を発生させるグラフの実行結果

[info] element=1
[info] element=2
[info] result:Failure(java.lang.IllegalArgumentException: oops!)

これに対して、例外を発生させた場合は、2番目のデータ要素まで処理されたあと例外が発生し、異常値(Failure(java.lang.IllegalArgumentException: oops!))で終了します。

エラーからの回復

グラフ実行時にrecoverメソッドを使用すると、例外発生時に適切な回復処理を実行できます。ここではログを出力したあと、戻り値に「recovered」という文字列を設定します。

リスト15:recoverメソッドでリカバリ

// RunnableGraphを実行
val result = runnableGraph.run().recover {
  case _: IllegalArgumentException =>
    println("oops! IllegalArgumentException")
    "recovered"
}

リスト16:実行結果

[info] element=1
[info] element=2
[info] oops! IllegalArgumentException
[info] result:Success(recovered)

グラフを実行すると処理は中断されましたが、成功値(Success(recovered))で終了するようになりました。

代替ソースによる回復

例外が発生したときに何らかの処理を行ったあとにただ終了するのではなく、問題のあったソースの代わりに代替ソースを使用し、処理を継続させることもできます。グラフコンポーネントに対してrecoverWithRetriesを適用し、障害から回復したあとの再実行で使用する代替ソースを指定します。

リスト17:recoverWithRetriesを適用

val flow = Flow[Int].map(e =>
  if (e == 3) throw new IllegalArgumentException("oops!") else e
).recoverWithRetries(1, {
  case _: IllegalArgumentException => Source(List(30,40))
})

リスト18:実行結果

[info] element=1
[info] element=2
[info] element=30
[info] element=40
[info] result:Success(Done)

例外発生後、代替ソース(List(30,40))で処理が継続されました。

スーパーバイザー戦略

代替ソースではなく、そのままのソースでストリーム処理を継続したい場合は、Akkaのアクターと同じようにスーパーバイザー戦略を指定できます。指定できる戦略にはResume(状態維持)、Stop(停止)、Restart(状態破棄)があります。スーパーバイザー戦略はマテリアライザー、またはグラフコンポーネントに指定できます。デフォルトではStopが適用されますので、すでに見たように例外が発生すると失敗値(Failure)で終了します。ここでは、Resumeを指定して、例外が発生したあとも処理を継続するようにします。

リスト19:スーパーバイザー戦略にResumeを指定

// アクターシステムを生成
implicit val system = ActorSystem("simple-stream")

// Supervision.Deciderの定義
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Resume
  case _                           => Supervision.Stop
}
// マテリアライザーにスーパーバイザー戦略を適用
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))

IllegalArgumentExceptionが発生したときの対処として、Supervision.Resumeを設定したSupervision.Deciderを定義しました。withSupervisionStrategyを使用して、定義したDeciderActorMaterializerに適用します。実行結果は次のようになります。

リスト20:実行結果

[info] element=1
[info] element=2
[info] element=4
[info] element=5
[info] result:Success(Done)

ストリーム処理で例外が発生した3番目のデータ要素を除き、最後のソースまで処理されていることがわかります。結果は成功値(Success(Done))で終了しています。withAttributes(ActorAttributes.supervisionStrategy(decider))を使用して、グラフコンポーネントごとにスーパーバイザー戦略を指定することもできます。

リスト21:グラフコンポーネントにスーパーバイザー戦略を指定

// Supervision.Deciderの定義
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Stop
  case _                           => Supervision.Stop
}

// フローを作成
val flow = Flow[Int].map { e =>
  // 要素の値が3の場合に例外を発生させる
 if (e == 3) throw new IllegalArgumentException("oops!") else e
}.withAttributes(ActorAttributes.supervisionStrategy(decider))
TIS株式会社

生産技術R&D室所属。これまで社内向けWebアプリケーションフレームワークの開発などシステム開発の効率化に取り組んできた。現在はリアクティブシステムをエンタープライズの分野に適用するための技術検証を行う傍ら、Lightbendの認定コンサルティングパートナーとして、同技術を用いたPoCや開発の支援などに従事している。

リアクティブシステムが日本国内で広く活用されることでエンジニアが障害対応から解放されることを願い、執筆/講演活動、Akkaのドキュメント翻訳などにも取り組んでいる。

連載バックナンバー

設計/手法/テスト技術解説
第9回

Akka HTTPでWeb APIに仕立てる

2018/9/20
第9回目となる今回は、Akkaの拡張機能Akka HTTPを用いてWeb APIを作成する方法を紹介します。
開発言語
第8回

Akka Streamsで実装するリアクティブストリーム(その2)

2018/3/13
前回に引き続き、Akka Streamsを使って実装するリアクティブストリームについて解説する。
開発言語技術解説
第7回

Akka Streamsで実装するリアクティブストリーム

2018/3/7
今回と次回の2回に分けて、Akka Streamsを使って実装するリアクティブストリームについて解説する。

Think ITメルマガ会員登録受付中

Think ITでは、技術情報が詰まったメールマガジン「Think IT Weekly」の配信サービスを提供しています。メルマガ会員登録を済ませれば、メルマガだけでなく、さまざまな限定特典を入手できるようになります。

Think ITメルマガ会員のサービス内容を見る

他にもこの記事が読まれています