Akka Streamsで実装するリアクティブストリーム
もうひとつの出力値
Akka Streamsにはソースとシンク、フローを流れるデータ要素とは別に、グラフ実行時に得られるマテリアライズされた値という、もうひとつの出力値があります。
説明のため先ほどのソースコードに少し手を加え、さらに型も記述します(Scalaは型推論の機能を持っているため、記述は必須ではありません)。シンクはfoldを使用し、足し算をした結果を返すようにします。
参考までにScalaのコレクションで同様の操作をした場合は、以下の結果が返ります(リアクティブストリームで提供するmapやfoldなどのメソッドは、コレクションのそれとは違うので、結果のイメージに留めてください)。
scala> (1 to 10).fold(0)(_ + _) res0: Int = 55
// ソースを生成 val source: Source[Int, NotUsed] = Source(1 to 10) // シンクを生成 val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) // ソースをシンクに繋いでRunnableGraphを生成 val runnableGraph: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // RunnableGraphを実行 val result: Future[Int] = runnableGraph.run() implicit val executionContext = system.dispatcher result.foreach { r => println(s"result:$r") // アクターシステムを終了 system.terminate() }
ソースの型はSource[Int, NotUsed]、シンクの型はSink[Int, Future[Int]]で、それぞれ2つの型引数を持ちます。1つ目はソースから出力する要素の型とシンクに入力する要素の型を示しています。ここでは、1から10の数値がソースから出力され、シンクに入力されるので共にInt型です。2つ目はマテリアライズされた値(materialized value)と呼ばれるもので、グラフの実行時に得られる補助値です。たとえば、処理結果の値やファイルI/O処理の成功/失敗の状態などが得られます。ソースは数値を出力するだけなので、マテリアライズされた値は使用しません。この場合、使用されていないことを示すNotUsedという型になります。一方シンクはFuture[Int]でグラフを実行するとマテリアライズされた値として、foldの結果を取得できます。型はIntで非同期に実行されるので、Futureに包まれた形になります。
上記の例ではソースのマテリアライズされた値はNotUsedでしたが、値が提供される場合もあります。たとえば、ファイルからソースを生成する場合はFileIO.fromPathを使用します。生成するSourceの2つの型引数はそれぞれ、出力するデータとしてByteString、マテリアライズされた値としてFuture[IOResult]となります。IOResultは、ファイルの読み込み結果を表す値です。マテリアライズされた値を利用することで、ファイルの読み込み結果に応じた処理も実装できます。
val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(Paths.get("README.md"))
さきほどの、コードを実行してみましょう。RunnableGraphを生成・実行され、Future[Int]が返ってきました。この値は「55」で、コレクション操作で確認した値と同じ結果となることがわかります。
[info] result:55
55(Future[Int])は、シンクのマテリアライズされた値です。これは、ソースとシンクをつなげてマテリアライズするときにtoMatの引数に、シンクに加えKeep.rightという値を渡していたためです。Keep.rightは右側、つまりシンク側のマテリアライズされた値を保持することを示します。同様にKeep.leftと指定することで、左側、つまりソース側のマテリアライズされた値を保持することもできます。
ソースのマテリアライズされた値がFuture[IOResult]、シンクの同値がFuture[Done]の場合を見てみましょう。Doneはforeachなど戻り値がない場合に使用します(Javaからも同じ型が利用できるようUnit型ではありません)。
val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(Paths.get("README.md")) val sink: Sink[ByteString, Future[Done]] = Sink.foreach(b => println(b.utf8String))
Keep.leftを指定した場合は、ソースのマテリアライズされた値であるFuture[IOResult]がRunnableGraphに保持されます。
val graph: RunnableGraph[Future[IOResult]] = source.toMat(sink)(Keep.left)
Keep.rightを指定した場合は、シンクのマテリアライズされた値であるFuture[Done]がRunnableGraphに保持されます。
val graph: RunnableGraph[Future[Done]] = source.toMat(sink)(Keep.right)
Keep.bothを指定すると、両方のマテリアライズされた値がRunnableGraphに保持されます。
val graph: RunnableGraph[(Future[IOResult], Future[Done])] = sourcae.toMat(sink)(Keep.both)
ソースとシンクを繋げるtoメソッドはKeep.leftが内部で指定されているため、ソースのマテリアライズされた値であるFuture[IOResult]がRunnableGraphに保持されます。
val graph: RunnableGraph[Future[IOResult]] = source.to(sink)
エラー処理
Akka Streamsのグラフ実行時に例外が発生した場合のふるまいを見ていきましょう。デフォルトのふるまいとしては、例外が発生した時点でストリーム処理を中断します。動作を確認するために要素の値が3の場合に「IllegalArgumentException」が発生するflowを定義した、次のようなソースコードを使用します。
// ソースを作成 val source = Source(1 to 5) // シンクを作成 val sink = Sink.foreach[Int](e => println(s"element=$e")) // フローを作成 val flow = Flow[Int].map{e => // 要素の値が3の場合に例外を発生させる if (e == 3) throw new IllegalArgumentException("oops!") else e } // ソースをシンクに繋いでRunnableGraphを生成 val runnableGraph = source.via(flow).toMat(sink)(Keep.right) // RunnableGraphを実行 val result = runnableGraph.run() result.onComplete { r => println(s"result:$r") // アクターシステムを終了 system.terminate() }
上記のflowを介さないRunnableGraphを実行した場合と比較してみます。
[info] element=1 [info] element=2 [info] element=3 [info] element=4 [info] element=5 [info] result:Success(Done)
flowを介さないグラフは例外が発生せず、ソースから流れる最後のデータ要素まで処理され、成功値(Success(Done))で終了しています。
[info] element=1 [info] element=2 [info] result:Failure(java.lang.IllegalArgumentException: oops!)
これに対して、例外を発生させた場合は、2番目のデータ要素まで処理されたあと例外が発生し、異常値(Failure(java.lang.IllegalArgumentException: oops!))で終了します。
エラーからの回復
グラフ実行時にrecoverメソッドを使用すると、例外発生時に適切な回復処理を実行できます。ここではログを出力したあと、戻り値に「recovered」という文字列を設定します。
// RunnableGraphを実行 val result = runnableGraph.run().recover { case _: IllegalArgumentException => println("oops! IllegalArgumentException") "recovered" }
[info] element=1 [info] element=2 [info] oops! IllegalArgumentException [info] result:Success(recovered)
グラフを実行すると処理は中断されましたが、成功値(Success(recovered))で終了するようになりました。
代替ソースによる回復
例外が発生したときに何らかの処理を行ったあとにただ終了するのではなく、問題のあったソースの代わりに代替ソースを使用し、処理を継続させることもできます。グラフコンポーネントに対してrecoverWithRetriesを適用し、障害から回復したあとの再実行で使用する代替ソースを指定します。
val flow = Flow[Int].map(e => if (e == 3) throw new IllegalArgumentException("oops!") else e ).recoverWithRetries(1, { case _: IllegalArgumentException => Source(List(30,40)) })
[info] element=1 [info] element=2 [info] element=30 [info] element=40 [info] result:Success(Done)
例外発生後、代替ソース(List(30,40))で処理が継続されました。
スーパーバイザー戦略
代替ソースではなく、そのままのソースでストリーム処理を継続したい場合は、Akkaのアクターと同じようにスーパーバイザー戦略を指定できます。指定できる戦略にはResume(状態維持)、Stop(停止)、Restart(状態破棄)があります。スーパーバイザー戦略はマテリアライザー、またはグラフコンポーネントに指定できます。デフォルトではStopが適用されますので、すでに見たように例外が発生すると失敗値(Failure)で終了します。ここでは、Resumeを指定して、例外が発生したあとも処理を継続するようにします。
// アクターシステムを生成 implicit val system = ActorSystem("simple-stream") // Supervision.Deciderの定義 val decider: Supervision.Decider = { case _: IllegalArgumentException => Supervision.Resume case _ => Supervision.Stop } // マテリアライザーにスーパーバイザー戦略を適用 implicit val materializer = ActorMaterializer( ActorMaterializerSettings(system).withSupervisionStrategy(decider))
IllegalArgumentExceptionが発生したときの対処として、Supervision.Resumeを設定したSupervision.Deciderを定義しました。withSupervisionStrategyを使用して、定義したDeciderをActorMaterializerに適用します。実行結果は次のようになります。
[info] element=1 [info] element=2 [info] element=4 [info] element=5 [info] result:Success(Done)
ストリーム処理で例外が発生した3番目のデータ要素を除き、最後のソースまで処理されていることがわかります。結果は成功値(Success(Done))で終了しています。withAttributes(ActorAttributes.supervisionStrategy(decider))を使用して、グラフコンポーネントごとにスーパーバイザー戦略を指定することもできます。
// Supervision.Deciderの定義 val decider: Supervision.Decider = { case _: IllegalArgumentException => Supervision.Stop case _ => Supervision.Stop } // フローを作成 val flow = Flow[Int].map { e => // 要素の値が3の場合に例外を発生させる if (e == 3) throw new IllegalArgumentException("oops!") else e }.withAttributes(ActorAttributes.supervisionStrategy(decider))