HBase と Serialization
Hadoop で計算したデータは HDFS にファイルとして保存するのが手軽ですが、出力されたファイルに含まれるデータ片にアクセスするにはファイルを全ロードする必要があって面倒です (MapFile にはランダムアクセスできますが)。このような場合データベースにデータを格納すると格納された個々のデータ片アクセスできて便利です。
そこで HBase という データベースにデータを格納し、後でそのデータを取り出すという処理について調べてました。HBase は Hadoop のサブプロジェクトであり、キーバリューペアのデータを格納できます。HBase ではシリアライズされたオブジェクトを入れておいて、後でデシリアライズすることでオブジェクトを元通り復元することができます。
HBase のシリアライズの仕方については、HBase の Serialization テストを見ると書いてあります。ただ少々と分かりづらいので勉強がてら単純なサンプルを作ってみました。このサンプルプログラムで格納するデータは図書館の利用履歴です。ユーザ ID がキーで、借りられた本の ID からなる配列がバリューです。サンプルのシリアライズは Writable というインターフェースに基づいています。処理は単純で HBase のテーブル "library" を作って、そのテーブルに ユーザ IDと利用履歴を表す Array (LongArrayWritable) データを入れてからそれを取り出してデシリアライズする処理を行っています。このサンプルプログラムを実行すると、以下の用に本とそれを借りたユーザペアが表示されます。
book 21 is borrowed by 1-th user.
book 121 is borrowed by 1-th user.
book 821 is borrowed by 2-th user.
book 26 is borrowed by 2-th user.
book 40 is borrowed by 2-th user.
動作確認はできたので次はもうちっと本格的な自作のクラスのシリアライズをやってみようと思います。自作クラスをシリアライズするには Writable インタフェースを実装する必要があるそうです。ただシリアライズの方法も他にあるそうなのでそこいら辺をチェックしなくては。
import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; class LongArrayWritable extends ArrayWritable { public LongArrayWritable() { super(LongWritable.class); } } public class HBaseSampleAppendArray { static final String TABLE_NAME = "library"; static final String BOOK_FAMILY = "book_number"; public static void main(String[] args) throws Exception { HBaseConfiguration config = new HBaseConfiguration(); HBaseAdmin admin = new HBaseAdmin(config); if (admin.tableExists(TABLE_NAME)) { admin.disableTable(TABLE_NAME); admin.deleteTable(TABLE_NAME); } // create table HTableDescriptor desc = new HTableDescriptor(TABLE_NAME.getBytes()); desc.addFamily(new HColumnDescriptor(BOOK_FAMILY.getBytes())); admin.createTable(desc); HTable table = new HTable(config, TABLE_NAME); // put ids of users and borrowed books to HBase Put userOnePut = new Put(Writables.getBytes(new LongWritable(1))); Put userTwoPut = new Put(Writables.getBytes(new LongWritable(2))); LongArrayWritable userOneHistory = new LongArrayWritable(); LongArrayWritable userTwoHistory = new LongArrayWritable(); userOneHistory.set(new LongWritable[] {new LongWritable(21), new LongWritable(121)}); userTwoHistory.set(new LongWritable[] {new LongWritable(821), new LongWritable(26), new LongWritable(40)}); byte[] ueserOneHistoryByte = Writables.getBytes(userOneHistory); byte[] ueserTwoHistoryByte = Writables.getBytes(userTwoHistory); userOnePut.add((BOOK_FAMILY).getBytes(), null, ueserOneHistoryByte); userTwoPut.add((BOOK_FAMILY).getBytes(), null, ueserTwoHistoryByte); table.put(userOnePut); table.put(userTwoPut); // get the borrowing histories from HBase Scan scan = new Scan(); scan.addFamily(BOOK_FAMILY.getBytes()); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { LongWritable userId = (LongWritable) Writables.getWritable(result.getRow(), new LongWritable()); for(KeyValue key : result.sorted()) { LongArrayWritable deserializedUserHistory = (LongArrayWritable) Writables.getWritable(result.getValue(Bytes.toBytes(BOOK_FAMILY)), new LongArrayWritable()); Object [] userHistoryArray = deserializedUserHistory.get(); for (int j = 0; j < userHistoryArray.length; j++) { LongWritable bookId = (LongWritable) userHistoryArray[j]; System.out.println("book " + new String(Long.toString(bookId.get())) + " is borrowed by " + userId + "-th user." ); } } } } }