Hadoopコースを攻略しよう(HDInsightによる大量ログ解析編)
この記事では、『Tuning Maniax 2014 - 蒼き調律者たち』Hadoop編に参加される方に向けて、 次の事柄を説明いたします。
- 競技の内容
- 解析対象ファイルセットの入手方法
- MapReduceプログラムの作成方法
競技内容の再確認
前回の記事にもありましたが、この競技は簡単に言えば以下のようなものです。
総計1TBのWebアクセスログ(を模して生成されたデータ)を解析し、次の3項目を抽出する時間を競う
- アクセス数の多いURI:上位10件
- 多く利用されているユーザーエージェント:上位10件
- URIごとの平均レスポンスタイム:下位10件(時間が長い方から10件)
解析対象となるファイルは次のような行の繰り返しです。
2014-04-01 02:52:48 192.168.47.74 user734 100.77.60.35 /javadocs /BucketizedHiveInputFormat.html 759 Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:22.0) Gecko/20130328 Firefox/22.0
フィールドは下記の7項目で、スペースではなくタブで区切られています。
- 日時
- クライアントのIPアドレス
- ユーザー名
- サーバーのIPアドレス
- リクエストURI
- リクエスト所要時間
- ユーザーエージェント
解析対象ファイルセット
こちらのBLOBストレージで公開しております。リージョンは「東南アジア」です。
> http://tuningmaniax.blob.core.windows.net/hdinsight/logs/full-1GBx1280
1GB程度(500万行)のファイルが1280個、総計1.2 TBほどあります。「ビッグデータ」というほどではない量ですが、あまり多いとAzure無償評価枠の中で解析できなくなってしまうので!
また、何度もジョブを実行して試行錯誤する際に、毎回1.2 TBを解析するのは大変ですから、「練習用」のファイルセットをいくつか用意しました。
■「本番」と同じ1 GBのファイルを、64個に減らしたもの。
> http://tuningmaniax.blob.core.windows.net/hdinsight/logs/medium-1GBx64
■さらに軽い練習用に、1ファイルのサイズを100 MBにしたもの。
こちらは16個、32個、64個の3セットあります。
> http://tuningmaniax.blob.core.windows.net/hdinsight/logs/small-100MBx16
> http://tuningmaniax.blob.core.windows.net/hdinsight/logs/small-100MBx32
> http://tuningmaniax.blob.core.windows.net/hdinsight/logs/small-100MBx64
自分のBLOBストレージへコピー
前述の解析対象ファイルセットは、公開されたものをそのまま利用することも原理的には可能ですが、多数のHadoopクラスタから同時にアクセスされると、性能面で悪影響が出る可能性もあります。そのため、前回の記事にあったように「東南アジア」リージョンにストレージアカウントを作成し、そちらへコピーしてのご利用をお勧めします。
コピーの方法ですが、前回記事の手順でWindows Azure PowerShellをインストールしてあれば、同時にazcopyというツールが導入されています。これは「Azure用のrobocopy」とでも言うべきもので、ストレージアカウント間のコピーを効率的に行うことができます(一旦ダウンロードすることなく、Azure-to-Azureでコピーできます)。
なお、インストールしただけではパスが通っていませんので、下記のフルパス名を使うか、環境変数PATHにフォルダ名を追加してください。
"C:\Program Files (x86)\Microsoft SDKs\Windows Azure\AzCopy\AzCopy.exe"
azcopyのコマンドラインは、BLOBのURLやストレージアクセスキーを指定するため長くなりがちですから、下記のようなバッチファイルにして実行するのも便利です。
@echo off setlocal set azcopy="C:\Program Files (x86)\Microsoft SDKs\Windows Azure\AzCopy\AzCopy.exe" set src="http://tuningmaniax.blob.core.windows.net/hdinsight/logs/small-100MBx16" set dest="http://sasakinsight.blob.core.windows.net/maniax/logs/small-100MBx16" set destkey="Azure管理ポータルからコピーしたアクセスキー" %azcopy% %src% %dest% /S /DestKey:%destkey%
※これは私の環境で100MBx16フォルダをコピーした際の例です。実行の際は、src, dest, destkeyあたりを適切に書き換えてください。
HDInsight Emulatorのファイルシステムへコピー
前回の記事で、Windows AzurePowerShellと共に”HDInsight Emulator”をインストールしました。これを使ってローカルで練習できるように、解析対象ファイルをHDInsight Emulatorのファイルシステムにもコピーしておくと便利です。
HDInsight Emulatorをインストールしてあると、デスクトップに”Hadoop Command Line”というショートカットがあるはずです。これを起動するとコマンドプロンプトが出てきますから、その中で次のようなコマンドを実行します。
hadoop distcp wasb://hdinsight@tuningmaniax.blob.core.windows.net/logs/small-100MBx16 /examples/logs
※一番小さな、100MBのファイルが16個配置されたフォルダを、/examples/logsへコピーする例です。
MapReduceプログラムの作成
では、いよいよログファイル解析のMapReduceプログラムを書いてみましょう。前回の記事でEclipseをインストールしましたので、それを使って今回のお題の一つである
「アクセス数の多いURI: 上位10件」
を、抽出するプログラムを書いてみましょう。
全く0からプログラムを作るのも大変ですから、Hadoop-1.2.1に含まれるサンプル(WordCount.java)をベースに、改造を加えていく形にしたいと思います。
Hadoopのアーカイブファイルの入手
サンプルプログラムは、Hadoopの配布物一式に含まれています。下記のページを参照し、いずれかのミラーサイトからダウンロードしてください。
> http://www.apache.org/dyn/closer.cgi/hadoop/common/
私は、こちらのファイルをダウンロードしてみました。
> http://ftp.tsukuba.wide.ad.jp/software/apache/hadoop/common/hadoop-1.2.1/hadoop-1.2.1.tar.gz
※なお、hadoop-1.2.1-bin.tar.gzにはソースファイルが入っていませんのでご注意。
このファイルを、どこか適当なところに展開してください。私はC:\Users\ksasaki\Downloads\hadoop-1.2.1に展開しました。Windows環境での.tar.gzの展開には、7-ZIP等のツールやCygwinに含まれるGNU Tarを使うのが手軽かと思います。
EclipseにJavaプロジェクトを作成
Eclipseを起動し、File→New→Java ProjectでJavaプロジェクトを一つ作ります。
※記事中の図をクリックすると拡大表示します(一部除く)。
初期状態ではプロジェクトは空っぽです。ここに、先ほど展開したhadoopのサンプルソースファイルをインポートしましょう。
1. ソースフォルダ”src”を右クリックして、”Import…”をクリックします。
2. “General”→”File System”をクリックして、”Next”をクリックします。
3. “From directory:”に、先ほど展開したhadoopアーカイブの”src/examples”フォルダを指定し、”examples”下の”org”をインポート対象としてチェックします。
最下層の”dancing”と”terasort”は不要なのでチェックを外します。
"Finish"をクリックします。
このようになりましたか?WorcCountクラスがあるのがわかると思います。
※私の好きなJava Browsingパースペクティブに切り替えてあります。
このパッケージにはたくさんのサンプルプログラムが含まれているのですが、わかりやすくするためにWordCount以外のクラスはひとまず削除してしまいましょう。すると、下図のようにすっきりとします。
また、インポートしただけの状態では、JAR参照が足りずにコンパイルエラーが出ていますので、参照を追加します。
1. プロジェクトを右クリックして、”Properties”をクリックします。
2. “Java Build Path”→”Libraries”タブ→”Add External JARs”をクリックします。
3. 先ほど展開したhadoop-1.2.1.tar.gzに含まれていた、”hadoop-core-1.2.1.jar”と” commons-cli-1.2.jar”を選択して、「OK」をクリックします。
以上で、WordCountクラスがコンパイルできるようになったはずです。これを元にして、リクエストURIのトップ10を数えてみましょう。
第1段階:下準備
まずは、正しく動かすための下準備です。
WordCount#main()で、Job#setCombinerClass()を呼んでいる箇所がありますが、これをコメントアウトします。単純なワードカウントであれば、ReducerをそのままCombiner(中間集計処理)としても使えますが、最終的にトップ10の抽出処理を行う我々のReducerは、Combinerにはできません。
それから、クラス名あるいはパッケージ名を変えておきましょう。
org.apache.hadoop.examples.WordCount のままだと、Hadoop実行環境にもともと存在する同名クラスが優先されてしまい、自分で書いたクラスが実行されません。
ここでは、クラス名を”UriCount”にしてみました。
第2段階:Map処理の改造
入力ファイル(解析対象のログファイル)は、ある程度の大きさごとに「入力スプリット」に分割され、各入力スプリットがデータノードに割り当てられます。
各データノードでは、この入力スプリットから1行ずつ取り出しては、Mapperのmapメソッドに引き渡します。
Mapperは、TokenizerMapperというネスト型として定義されています。Mapメソッドをちょっと見てみましょう。
valueにログファイルの一行が入ってくるので、これを処理するわけです。WordCountでは、単純にStringTokenizerで単語に分割して、「単語名=1」というkey-valueペアをcontext.write()に渡しています。例えば、以下のような1行があったとすると、
Quick brown fox jumps over the lazy dog
このようなkey-valueペアが生成されるわけです。
Key | Value |
---|---|
quick | 1 |
brown | 1 |
fox | 1 |
jumps | 1 |
over | 1 |
the | 1 |
lazy | 1 |
dog | 1 |
我々が実装すべきURIカウントでは、すべての単語を数える必要はありませんから、
- 入力レコード(これは前述の通りタブ区切り)をバラしてURIフィールドを取り出す。
- そのURIをカウントアップする。
という簡単な処理で良さそうです。こんな感じでしょうか。
public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); // datetime, clientip, user, serverip, uri, timetaken, useragent word.set(fields[4]); context.write(word, one); }
# 変更点を最小にすべく”word”変数は名前を変えずにそのまま使っています。
さて、これでWordCount改めUriCountクラスは単語ではなくURIを数えるプログラムになったはずです。「TOP10の算出」処理をまだ実装していませんが、ひとまず動かしてみましょう。こんなとき便利なのが、手元で動くHDInsight Emulatorです。
第3段階:エミュレーターで動かしてみる。
まずはWordCountプログラムをjarファイルにまとめなければなりません。
1. クラスあるいはパッケージを右クリックして、”Export…”をクリックします。
2. “Java”→”JAR file”を選択し、”Next”をクリックします。
3. JARファイルの出力先を指定します。ここでは"C:\work\maniax.jar"としました。
4. “Finish”をクリックします。
これで、JARファイルが出力できましたので、”Hadoop Command Line”で実行してみましょう。
1. デスクトップの”Hadoop Command Line”アイコンをクリックして、HDInsight Emulator用のコマンドプロンプトを出してください。
2. 入力ファイルは、先ほど事務局のBLOBストレージからコピーしたものを使いましょう。所在を確認してみます。
c:\work>hadoop fs -lsr /examples/logs/2014* -rw-r--r-- 1 ksasaki supergroup 104491633 2014-04-25 15:28 /examples/logs/20140101-m-00000 -rw-r--r-- 1 ksasaki supergroup 105489682 2014-04-25 15:28 /examples/logs/20140101-m-00001 <以下略>
ありますね。
3. 次のコマンドを実行します。とりあえずのテストですから、入力ファイルは一つだけとします。結果は/out/test1へ出力してみます。
hadoop jar JARファイル名 org.apache.hadoop.examples.UriCount /examples/logs/20140101-m-00000 /out/test1
これで、ジョブの実行が始まるはずです。
4. ジョブが完了したら、出力ファイルを確認してみます。
c:\work>hadoop fs -ls /out/test1 Found 3 items -rw-r--r-- 1 ksasaki supergroup 0 2014-04-25 23:35 /out/test1/_SUCCESS drwxr-xr-x - ksasaki supergroup 0 2014-04-25 23:34 /out/test1/_logs -rw-r--r-- 1 ksasaki supergroup 5235601 2014-04-25 23:35 /out/test1/part-r-00000
_SUCCESSというのは、ジョブが正常に完了したことを示すフラグファイルです。
part-r-00000がReducerの出力した結果ファイルです。内容を確認してみましょう。全部出力すると多すぎるので、-tailコマンドを使って最後だけ見てみます。
c:\work>hadoop fs -tail /out/test1/part-r-00000 /xref/org/apache/hadoop/metrics2/MetricHistogram.html 11 /xref/overview-summary.html 9 /xref/stylesheet.css 13 /yoko/index.html 11 /~jim/projects.html 9
URIと、その出現回数がカウントされていますね!
では次に、「TOP10の集計」をしてみましょうか。
第4段階:TOP10の集計
これを実現するには、Reducerを改造する必要があります。そのためにまず、MapperからReducerへと至るデータの流れを再確認しましょう。
今、解析対象のログファイルに、”/foo”というURIが3回、”/bar”が2回、”/baz”が4回含まれているとしましょう。
Mapperは受け取ったレコードを処理した結果、次のようなkey-valueペアを出力します。
{/foo => 1} {/bar => 1} {/baz => 1} {/foo => 1} {/baz => 1} {/bar => 1} {/baz => 1} {/foo => 1} {/baz => 1}
Mapperの出力を受け取ったHadoopはこれを、キーでソート・集約し、Reducerのreduceを呼び出します。イメージとしては次のような感じです。
reduce({/foo => [1,1,1]}) reduce({/bar => [1,1]}) reduce({/baz => [1,1,1,1]})
※この時、同じキーを持つ値はまとめて一度のreduce呼び出しで処理されます。
WordCountの場合、キーごとの数値列を全部足し込んで出力するだけです。値の比較が不要ですから、極めてシンプルです。
「TOP10の集計」をする場合は、「どのキー(Uri)はいくつあった」というのを全部覚えておいて、最後の上位10エントリを抽出する必要があるので、一手間増えます。
どこに「一手間」を記述すべきなのか、Reducer実行の流れを疑似コードで見てみましょう。このような3段階の処理になっていて、それぞれをオーバーライドできるようになっています。Template Methodパターンですね。
setup(context); while(hasMoreKeys){ reduce(key, values);} cleanup(context);
となれば、
- reduceメソッドに「Uriごとの出現回数を全部覚えておく」処理を追加し、
- cleanupメソッドで「TOP10の集計」をする。
という感じが良さそうですね。
やってみましょう。
まず、reduceメソッドに関しては、次のように変更します。
- 複数回のreduce呼び出しにわたって、Uriとその出現回数を保持しておくHashMapをフィールド”uris”として追加。
- reduceメソッド内では、context.writeを呼び出さない。
結果として、reduceメソッドは以下のようにシンプルになりました。
private HashMap<String, Integer> uris = new HashMap<String, Integer>(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } uris.put(key.toString(), sum); }
次に、集計処理を実装すべく、cleanupメソッドをオーバーライドします。実行すべき処理は、
- urisフィールドを、値の降順にソート。
- ソート結果の先頭10件を出力(context.write)
という二つだけなのですが、Comparator#compareをオーバーライドする処理等で少し長ったらしくなっていますね…
@Override protected void cleanup(Context context) throws IOException, InterruptedException { List<Entry<String, Integer>> l = new ArrayList<Entry<String, Integer>>(uris.entrySet()); Collections.sort(l, new Comparator<Entry<String, Integer>>() { @Override public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) { return o2.getValue().compareTo(o1.getValue()); } }); Text key = new Text(); for (int i = 0; i < 10; i++) { Entry<String, Integer> e = l.get(i); key.set(e.getKey()); result.set(e.getValue()); context.write(key, result); } }
これを、先ほどと同じようにHDInsight Emulatorで実行してみると、次のような結果が得られました。
/index.html 1023 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/index.html 176 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html 170 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/MapredAppMasterRest.html 169 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html 153 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YARN.html 152 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html 151 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html 146 /docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/NodeManagerRest.html 146 /docs/r2.2.0/hadoop-auth/images/logos/maven-feather.png 133
アクセス数の多い順に10件、出力できましたね!
本物のHDInsightで動かすには?
エミュレーターではなく、Azure上のHDInsightで動かす方法は、前回の記事で説明したサンプルプログラムの実行方法と同じです。EclipseからエクスポートしたJARファイルを、HDInsightで使用するBLOBストレージへ配置したうえで、ジョブを実行してください。
工夫のしどころは?
さて、全員「Azure上のHDInsight」という同じ土俵で戦う今回のTuningManiax、ライバルに差を付けるためにはどんな工夫の余地があるのでしょうか。考えられるポイントをいくつか挙げておきます。
- 複数の問題を一度のジョブで処理する。
- 今回、問題は3つあります。個別に3回のジョブとして実行しても良いですが、うまくまとめて一つのジョブとして実行することも可能かもしれませんね。
- Combinerを組み込む。
- Mapperが出力したkey-valueペア群は、Reducerに渡される前に大規模なソート・集約処理が行われます。この処理負荷を軽減するための”Combiner”と呼ばれるクラスを定義することが可能です。
- Hadoopの実行時パラメータの調整。
- Mapperの数やJVMのヒープサイズなど、変更可能な設定項目はいくつもありそうです…
ここには書きませんが他にも工夫ポイントはありますよ!ぜひ、いろいろ試して高速なMapReduce処理を実現してください!