Hadoop: Counters

Hadoopを利用してMap関数やReduce関数を利用する際、処理し終わったデータがどのようなものだったのか知りたいものです。調べてみると Hadoop の象さん本に書いてありますね。

MapReduce 処理が終わった後、Jobオブジェクトの getCounters メソッドを使うと Counters というクラスのオブジェクトがかえされます。このオブジェクトを調べると、Map関数に対する入力行数など処理したデータに関する情報が得られます。

以下のプログラムはワードカウントするだけですが、最後に入力ファイルの行数を出力します。

import java.io.IOException;
import java.lang.Iterable;
import java.lang.InterruptedException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counters;

public class WordCountJob {

    public static class WordCountMapper
	extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value,
                            Context context) throws IOException, InterruptedException {
             StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");

            while (tokenizer.hasMoreTokens()) {
                String str = tokenizer.nextToken();
                context.write(new Text(str), new LongWritable(1));
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                                  Context context) throws IOException, InterruptedException  {
            int sum = 0;

            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception, InterruptedException {
        Job job = new Job();
        job.setJarByClass(WordCountJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.waitForCompletion(true);
        Counters counters = job.getCounters();
        long mapIn = counters.findCounter("org.apache.hadoop.mapred.Task$Counter",
                                         "MAP_INPUT_RECORDS").getValue();
        System.out.println("the number of lines in input is " + mapIn);
    }
}

たとえば、入力ファイルが以下のように三行からなるとき、

this is a pen
that is a hoge
those are hogehoge

先のプログラムは以下のように、入力ファイルの行数を出力します。

the number of lines in input is 3

デフォルトでも Map に対する入力だけでなく、Reduce を含め色々な入出力データに対するカウンタが定義してあります。

ちなみに、以下のページはデフォルトカウンタの種類や自分用にカスタマイズしたカウンタの作り方について親切に書いてます。
http://www.umiacs.umd.edu/~jimmylin/cloud9/docs/content/counters.html