IIJ社における分散DB技術「ddd」(2)
MapReduceの具体例:フロー情報の解析
MapReduceはシンプルなモデルですが、やり方次第でいろいろなデータ処理ができます。参考文献[1]では、分散grep、分散sort、URLアクセス回数のカウント、転置インデックスの生成などが例として挙げられています。
IIJ では、前回の話(http://thinkit.co.jp/article/1030/1/)の通り、バックボーンルータが出力するフロー情報(トラフィック統計情報)や、サーバーが出力するログを分散ストレージに格納しており、そのデータをMapReduceを使って抽出/加工しています。
ここでは、ルータのフロー情報を解析する場合を例にとり、MapReduceの具体的な動きを説明します(図2)。やりたいこと、すなわち MapReduce ジョブの内容は、「ある期間のトラフィックのうち、送信元IPアドレス(src ip)ごとに通信したバイト数を集計する」こととします。
処理の流れの詳細
・入力データ
入力データであるフロー情報は、ddd の分散ストレージ(分散キーバリューストア)上に蓄積されています。
フロー情報は、ルータIDと時刻をキーにして、フローレコード本体を値として保存されています。フローレコード本体は、送信元IPアドレス(src ip)、宛先IPアドレス(dst ip)、実際に流れたバイト数(bytes)など数十項目からなっています。
・ジョブの分割
クライアントは、解析対象ルータ、対象期間と、抽出条件などのパラメータを含むMapReduceジョブリクエストを準備し、それを ddd のノード群のどれかに送信します。
リクエストを受けたノードは、対象期間を一定時間間隔に区切る形で、MapReduceジョブを複数のmapタスクに分割し、各タスクを対応するデータを持っているノードに割り当てます。
なお、dddでは、GoogleやHadoopのMapReduceと異なり、明確なマスタノードというものがありません。クライアントは、複数あるノードの中からどれでも好きなものを選んでMapReduceジョブリクエストを送ります。すると、そのノードが、そのジョブについてだけの一時的なマスタとなります。もちろん、どのノードにジョブを送っても、ジョブパラメータが同じなら最終的な結果は同じになります。
・mapタスクの実行
mapタスクは、フローレコードの中から着目したい項目、この場合はsrc ipとbytesのカラムをとりだし、一時ファイルに書き出します。また、このときに条件(例えばdst ipの範囲)を指定してレコードを絞り込むこともよく行われます。
各mapタスクはお互いに独立していて依存関係がありません。よって別々のノードで並列に実行することができます。
・shuffleフェーズ
mapタスクの結果は、一定の規則でreduceタスク実行ノードに振り分けられます。このフェーズをshuffleフェーズと呼びます。shuffleは、可能な限りmapと並行して行われます。
mapやreduceと違い、shuffleでは利用者が処理するプログラムを書く必要はなく、MapReduceフレームワークが自動的に行います。ただし、振り分けの規則は利用者が変更することができます。
ここの例ではsrc ipの値をハッシュし、reduceタスク数で割った余りによって振り分け先を決めます。こうすることにより、同じsrc ipは必ず同じreduceノードに集まり、1つのsrc ipが複数のreduceノードに現れることがなくなります。
・reduce タスクの実行
すべてのmapタスクが終了するとreduceフェーズがはじまります。
reduceフェーズではshuffleで集められたデータを結合し、src ipアドレス順に並べ替えます。すると、同じsrc ipが連続するようになるため、src ipごとに対応するbytesの合計値を求めます。その結果を出力データとして分散ストレージに書き出します。あるいは、クライアントに返すこともできます。
shuffleフェーズで、同じsrc ipは同じ reduceノードに振り分けているため、reduceノードが複数あってもsrc ip単位の集計が可能です。
最終結果は、各reduceノードが出力した結果をすべて結合したものになりますが、MapReduceには最後の結合を行うフェーズはありません。これは、あるMapReduceジョブの結果が別のジョブへの入力となるケースが多く、その場合は分割されたままでも支障がないからです。どうしても結合したければ利用者が自分で行います。