Mockito と Junit を使用した Hadoop Unit Test

まだまだ、Hadoop の勉強中です。今は Hadoop プログラムのテストおよびデバッグ方法について調べてました。

Hadoop 本 (http://oreilly.com/catalog/9780596521981)には Junit と Mockito を利用したテストの書き方が紹介されています。勉強がてら簡単なプログラムを書いてみました。

まずは、今回テストされる単語をカウントするためのマッパークラスを以下のように定義します。このクラスの map メソッドは入力文をスペースで分割してキーが単語 (Text), バリューが 1 (LongWritable) の要素を Context.write によって追加します。

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  protected void map(LongWritable key, Text value,
                     Context context) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
          context.write(new Text(itr.nextToken()), new LongWritable(1));
      }
  }
}

この map メソッドが期待通り動いているのかチェックするために、以下のテストプログラムを書きました。このプログラムでは、WordCountMapper内の map メソッドで期待した出力が Context オブジェクトの write メソッドを通じて期待された回数だけ出力されているのかを verify メソッドでチェックします。

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

import org.junit.*;
import org.junit.runner.JUnitCore;
import static org.mockito.Mockito.*;

import java.lang.InterruptedException;
import java.io.IOException;

public class WordCountMapperTest {

    public static void main(String[] args) {
        JUnitCore.main(WordCountMapperTest.class.getName());
    }

    @Test
        public void testValidInput() throws IOException, InterruptedException {
        WordCountMapper wordCountMapper = new WordCountMapper();
        Text    value   = new Text("This is a pen and that is a note.");
        Context mock_context = mock(Context.class);

        wordCountMapper.map(null, value, mock_context);

        verify(mock_context, times(1)).write(new Text("This"), new LongWritable(1));
        verify(mock_context, times(2)).write(new Text("is"), new LongWritable(1));
        verify(mock_context, times(2)).write(new Text("a"), new LongWritable(1));
        verify(mock_context, times(1)).write(new Text("pen"), new LongWritable(1));
        verify(mock_context, times(1)).write(new Text("that"), new LongWritable(1));
        verify(mock_context, times(1)).write(new Text("note."), new LongWritable(1));
    }
}

Hadoop プログラムの単純なデバッグ方法について

最近 Hadoop ライブラリを用いて書かれたプログラムをデバッグする方法について調べてました。標準エラー出力を使用する方法と Context オブジェクトを利用する方法が簡単なようです。

標準エラー出力Hadoop プログラムから出力すると、プロンプトには出力されないのですが、とあるログファイルには出力されます。また、オブジェクト Context (昔は Reporter オブジェクトを使用していましたが 0.2 以降は Context を使用する方が良いらしいです) を利用して、別の場所に出力することもできます。

以下デバッグ用の文を含む単語カウントプログラムを書きました。このプログラムは通常の単語カウントプログラムなのですが、'invalid' という単語の場合だけはカウントを行わず、不正な入力として警告します (人工的なサンプルで申し訳ございません)。また入力ファイルに入っている各単語が標準エラー出力に出力されます。

import java.io.IOException;
import java.lang.Iterable;
import java.lang.InterruptedException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

    public static class WordCountMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {

        private final static String invalidWord = new String("invalid");

        @Override
            public void map(LongWritable key, Text value,
                            Context context) throws IOException, InterruptedException {

            System.err.println("Enter WordCountMapper");
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");

            while (tokenizer.hasMoreTokens()) {
                String str = tokenizer.nextToken();
                System.err.println("Input str: " + str);

                if (str.equals(invalidWord)) {
                    context.setStatus("Detected invalid word: " + str);
                    continue;
                } else {
                    context.write(new Text(str), new LongWritable(1));
                }
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedEx$
            int sum = 0;

            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception, InterruptedException {
        Job job = new Job();
        job.setJarByClass(WordCountJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

このプログラムを jar コマンドで WordCount.jar とまとめます。次に以下の内容を持つテキストファイルを作り、HDFS上に invalid.txt という名前で保存します。

this sentence contains word, invalid

最後に Hadoop からプログラム (WordCount.jar) を起動してください。

hadoop jar WordCount.jar WordCountJob invalid.txt output-wc

残念ながら、プロンプトには期待したデバッグ用の出力が現れませんが、その代わりデバッグ用の出力は Hadoop のログファイルの中に出力されています。標準エラー出力への出力は ${HADOOP_LOG_DIR}/userlogs/attempt___/stderr に出力されます。ここで attempt-id はタスクの ID です。たとえば、上記のプログラムの場合にはファイル stderr に以下の内容が記載されています。

Enter WordCountMapper
input str: this
input str: sentence
input str: contains
input str: word,
input str: invalid

ちなみに上記のプログラムでのもうひとつのデバッグ用のコードがあります。このコードは入力となる単語が invalid のときだけ "Detected possibly invalid word" と不平を出力します。この出力、

context.setStatus("Detected possibly invalid word: " + str);

はタスクトラッカー用のログファイル 
${HADOOP_LOG_DIR}/hadoop-username-tasktracker-nodename.log に出力されます。
たとえば上記の例だとファイルに以下のような行が出力が記載されます。

2009-10-23 14:41:08,378 INFO org.apache.hadoop.mapred.TaskTracker: attempt_200910231149_0013_m_000000_0 1.0% Detected possibly invalid record: invalid

このようなデバッグログを見るには、Hadoopのログディレクトリを探すのも良いですが、http://node-url:50030/jobtracker.jsp にアクセスすると、ブラウザ上からもログファイルを見ることができ便利です。

Hadoopには他にもいろいろなデバッグ方法がありますが、今回はこんな感じで。