FC2ブログ

スポンサーサイト

このエントリーのカテゴリ : スポンサー広告

上記の広告は1ヶ月以上更新のないブログに表示されています。
新しい記事を書く事で広告が消せます。

【GAEでMapReduceを使おう!】実装編3:MapperとReducer

このエントリーのカテゴリ : 【GAEでMapReduceを使おう!】

【GAEでMapReduceを使おう!】実装編2:モデルの実装の次は、MapperReducerの実装部分を説明します。



Mapperの実装】
・2:TrainInfoMapper …Javaクラス
電車遅延情報の明細を解析して、日別に集計するMap処理を行います。

こちらのページを参考にMapperを実装しました。
 http://d.hatena.ne.jp/eller/20110206/1296965559

com.google.appengine.tools.mapreduce.AppEngineMapperを継承し、
map(Key key, Entity value, Context context) メソッドを実装します。

後述の4:StartMapReduceServletにてデータストアのクラスを指定すると、
そのインスタンスが1レコードずつ第2引数(Entiy value)に渡されます。
第1引数のKeyはそのレコードのKeyです。


import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.appengine.tools.mapreduce.AppEngineMapper;

/**
* TrainInfoのMapper
*/
public class TrainInfoDataRailMapper extends
 AppEngineMapper<Key, Entity, NullWritable, NullWritable> {
(省略)
 @Override
 public void map(Key key, Entity value, Context context) {

  // valueが日付、路線名の値を持っている場合のみ処理
  if (value.hasProperty(INFO_DATE)
    && value.hasProperty(TRAIN_NAME)) {

   // 1. 日別の報告数集計をカウントアップ
   String date= (String) value.getProperty(INFO_DATE);
   context.getCounter(TOTAL_COUNT,date).increment(1);
      
   // 2. 日別の路線別集計をカウントアップ
   String train=(String)value.getProperty(TRAIN_NAME);
   // 複数属性の場合は「日付+","+路線名」とする
   String dateTrain=createDateRailKey(date,train);
   context.getCounter(KEY_PARAM,dateTrain).increment(1);
      
   // 3. 日別の時刻別集計をカウントアップ
   String infoTime = (String) value.getProperty(INFO_TIME);
   // 時分(HH:mm)からHHの部分のみ取得
   String hour = null;
   int index = infoTime.indexOf(':');
   if (index > 0) {
    hour = infoTime.substring(0, index);
    // 「日+","+時刻」を値とする
    String dateTime = createDateTimeKey(date, hour);
    if(dateTime != null){
     // 「日+","+時刻」をカウントアップ
     context.getCounter(HOUR_COUNTS, dateTime)
      .increment(1);
    }
   }
 }
}



日別の報告数、日別の路線数、日別の時刻別報告数の3つを集計しています。

1. 日別の報告数集計をカウントアップ
 1-1. 遅延報告情報の報告日を取得
  String date = (String) value.getProperty(INFO_DATE);

 1-2. "日別サマリの集計"という属性が"date"である合計数を1つ加える
  context.getCounter(TraTOTAL_COUNT, date).increment(1);

appengine-mapreduceでは、属性名とその値の組み合わせで集計できます。
context.getCounter(属性名,値)に合計データが格納されているので、increment(1)します。
Mapper処理は複数インスタンスで同時並列実行されますが、context.getCounter()すればすべての処理が集約されます。だからincrement(1)なんですね。

ただし、集計の属性名は文字列しか設定できません。
「5月1日の東海道線」のように日時と路線の組み合わせの場合、文字列にする必要があります。

2. 日別の路線別集計をカウントアップ
 2-1. 路線名を取得
  String train = (String) value.getProperty(TRAIN_NAME);
 2-2. 1-1.で取得済みの日付と路線名を組み合わせて属性名とする
  String dateTrain = createDateRailKey(infoDate, train);
  ※createDateRailKeyはdateとtrainを","で文字列結合する処理のみ
 2-3. "日別路線名"が"date,train"の合計を1加える
  context.getCounter(KEY_PARAM, dateTrain).increment(1);

3. 日別時刻別集計データ追加も同様に、日付と時刻を文字列で組み合わせた値で集計しています。(データの構造上、"HH:mm"で保持していたので"HH"のみ取得するロジックが入っています。)



Reducerの実装】
・3:ReducerServlet …Servlet

Mapperの集計処理がすべて終わった後に、集計結果をデータストアに保存します。
appengine-mapreduceの場合、ReducerというよりMapperの集計結果を処理するサーブレットになります。
こちらを参考に実装しています。
 http://d.hatena.ne.jp/eller/20110403#1301840205


/**
* ReducerServlet
*/
public class TrainInfoSumStoreServlet extends HttpServlet {

 // doGetだと来ません。POSTのようです。
 @Override
 public void doPost(HttpServletRequest req,
   HttpServletResponse resp) throws ServletException{

  // データストアへの保存用のアクセスインスタンスを取得
  PersistenceManager pm = GaePersistentManagerFactory
     .get().getPersistenceManager();
  try {
   
   // 1. JobIDをキーとして処理結果を取り出す
   DatastoreService ds = DatastoreServiceFactory
     .getDatastoreService();
   JobID id = JobID.forName(req.getParameter("job_id"));
   MapReduceState state = MapReduceState
     .getMapReduceStateFromJobID(ds, id);

   // 2. 集計結果を取り出す
   Counters counters = state.getCounters();
      
   // 3. 日別報告数の集計結果を保持するマップ
   Map<String,TrainInfoDateSum> dateSumMap
     = new HashMap<String,TrainInfoDateSum>();
      
   // 4. 日付報告数の集計結果を一旦マップに詰め込む
   CounterGroup dateTotalCounters
      = counters.getGroup(TOTAL_COUNT);
   for (Counter c : dateTotalCounters) {
    String date = c.getName();
    long value = c.getValue();
        
    // dateSumMapからdateをキーにTrainInfoDateSumを取得
    TrainInfoDateSum dateSum
      = getTrainInfoDataSum(dateSumMap, date);
    // dateSumMapに集計結果を格納
    dateSum.setTotalCount((int)value);
   }

   // 5. 日別時刻別の集計も同様に一旦マップで保持する
   CounterGroup dateTimeCounters = counters
     .getGroup(HOUR_COUNTS);
   for (Counter c : dateTimeCounters) {
    String dateTime = c.getName();
    long value = c.getValue();
        
    // 「日+","+時刻」を区切って24個の配列に結果を格納する
    TrainInfoDateSum dateSum = getTrainInfoDateSum(
      dateSumMap,dateTime,(int)value);
   }
      
   // 6. 日別路線別の集計も同様に一旦マップで保持する

   // 「1日に何路線が遅延したか」と、
   //「ある路線が何日遅延したか」も集計
   List<TrainInfoDateRailSum> dateRailList
     = new ArrayList<TrainInfoDateRailSum>();
   Map<String,TrainInfoRailSum> railSumMap
     = new HashMap<String,TrainInfoRailSum>();

   CounterGroup rails = counters.getGroup(KEY_PARAM);
   for (Counter c : rails) {
    String name = c.getName();
    long value = c.getValue();
                
    // 6-1. 日別路線別集計結果を設定
    // (2011年4月1日に東海道線の遅延報告が100件、など)
    TrainInfoDateRailSum dateRailSum
      = createTrainInfoDateRailSum(name);
    dateRailSum.setCount(value);
    dateRailList.add(dateRailSum);
        
    // 6-2. 日別の路線数集計を設定
    //(2011年4月1日は全国で20路線が遅延していた、など)
    String infoDate = dateRailSum.getInfoDate();
    TrainInfoDateSum dateSum
      = getTrainInfoDataSum(dateSumMap, infoDate);
    dateSum.setTrainCount(dateSum.getTrainCount() +1 );
        
    // 6-3. 路線別遅延日数を追加
    // (東海道線はこれまで60日遅延した、など)
    TrainInfoRailSum rSum = getTrainInfoRailSum(
      railSumMap, dateRailSum.getTrainName());
    rSum.setTotalCount(rSum.getTotalCount()+(int)value);
    rSum.setDateCount(rSum.getDateCount() + 1);
  }
      
  // 7. 集計結果をデータストアに保存する
  if(dateSumMap.size() > 0){
pm.makePersistentAll(dateSumMap.values());
  }
    
  if(dateRailList.size() > 0){
pm.makePersistentAll(dateRailList);
  }      
      
  if(railSumMap.size() > 0){
pm.makePersistentAll(railSumMap.values());
  }
 } catch (Exception e) {
  throw new ServletException(e);
}finally {
pm.close();
}
}



1. JobIDをキーとして処理結果を取り出す
 おまじないですね。ひとまずこのままで問題ないと思います。

2. 集計結果を取り出す
  Counters counters = state.getCounters();
 このcountersにMapperにてcontext.getCounter(XX,XX).increment(1)で集計した結果が格納されています。

3. 日別の集計結果を保持するマップ
 集計結果を1行(この場合1日)ずつデータストアに保存するのはどうかと思うので、日別のデータをすべて溜めてから保存するようにします。
 ※この辺りはデータ量(日付)が多くなったら要チューニングポイントです。

4. 日付報告数の集計結果を一旦マップに詰め込む
 4-1. 日別の集計結果のリストを取得します。
  CounterGroup dateTotalCounters = counters.getGroup(TOTAL_COUNT);
 4-2. 日別に集計結果を3.のマップにつめていきます。
  for (Counter c : dateTotalCounters) { ←Mapperで設定した集計結果
   String infoDate = c.getName(); ←2011/04/01、など
   long value = c.getValue(); ←100、など
        
 4-3. dateSumMapにinfoDateをキーとしてTrainInfoDataSumがなければ新規作成&マップ格納する処理を隠蔽化
   TrainInfoDateSum dateSum
      = getTrainInfoDataSum(dateSumMap, infoDate);
   dateSum.setTotalCount((int)value);

5. 日別時刻別の集計も同様に一旦マップで保持する
 4.の処理とほぼ同じですが、時刻別にTrainInfoDataSumに保持しているので若干面倒です。
  TrainInfoDateSum dateSum = getTrainInfoDateSum(dateSumMap,
   infoDateTime,(int)value);

なお、getTrainInfoDateSumの中身はこんな感じです。


public static TrainInfoDateSum getTrainInfoDateSum(
   Map<String, TrainInfoDateSum> dateSumMap,
   String dateTimeKey,int value) {
 if (dateTimeKey == null) return null;
 TrainInfoDateSum dateSum = null;
 StringTokenizer token = new StringTokenizer(dateTimeKey, KEY_SEP);
 if (token.hasMoreTokens()) {
  String infoDate = token.nextToken();
  if (token.hasMoreTokens()) {
   String infoTime = token.nextToken();
   try {
    int hour = Integer.parseInt(infoTime);

    dateSum = dateSumMap.get(infoDate);
    if (dateSum == null) {
     dateSum = new TrainInfoDateSum();
     dateSum.setInfoDate(infoDate);
     dateSumMap.put(infoDate, dateSum);
    }
    int[] hourCounts = dateSum.getCounts();
    if (hourCounts == null) {
     hourCounts = new int[24];
     dateSum.setCounts(hourCounts);
    }
    hourCounts[hour] = value;
   } catch (Exception e) {
   // 時刻のパースに失敗したらカウントしない
  }
 }
 return dateSum;
}



6. 日別路線別の集計も同様に一旦マップで保持する
 ここも日別路線別の集計(2011年4月1日に東海道線の遅延報告が100件、など)は4.や5.と同じですが、ついでに以下も集計して保持しています。
  ・1日に何路線遅延したか(2011年4月1日は全国で20路線が遅延)
  ・各路線の遅延日数(東海道線はこれまで60日遅延)
 前回まで説明していなかったのでこの辺りの説明は割愛させていただきます。

7. 集計結果をデータストアに保存する
 PersistentManagerのmakePersistentAllにMapのvalues()を渡しています。
pm.makePersistentAll(dateSumMap.values());
 日別データ(日別報告数、日別路線別報告数、日別時刻別報告数)のみであればdateSumMapのみですが、6.で説明したとおり、他にも2つのサマリを追加するため、合計3つのテーブルにデータを追加しています。

(2011年5月25日追記)
さて、このReducerもどきのサーブレット処理ですが、案の定30秒以上かかってしまっています。
なんとか分散処理させたいなぁ、と思っていたらこんなニュースがでました。
グーグル、フル機能のMapReduceをGoogle App Engineで提供へ
「さらにShuffler機能もオープンソースで公開する。Python版は今日公開、Java版もまもなく公開する。」
おー、Shuffle機能が出たら試してみよう!

次は、MapReduce処理を開始するサーブレットの実装です。
サンプルをそのまま使った場合、/mapreduce/startのページで「start」ボタンを押すとMapReduceが開始されますが、毎回手動で実施するのも面倒なので日次で自動処理できるようにサーブレットを用意しておきます。

【GAEでMapReduceを使おう!】実装編4:MapReduce起動サーブレットとweb.xml



ここで実装したアプリはこちら
 ・日別の報告数と路線数のグラフ
 ・日別時刻別の報告数集計表



【GAEでMapReduceを使おう!】GAEの制約とMapReduceの活用
【GAEでMapReduceを使おう!】環境準備編
【GAEでMapReduceを使おう!】実装編1:追加開発部分
【GAEでMapReduceを使おう!】実装編2:モデルの実装
【GAEでMapReduceを使おう!】実装編3:MapperとReducerの実装
【GAEでMapReduceを使おう!】実装編4:MapReduce起動サーブレットとweb.xml
【GAEでMapReduceを使おう!】実装編5:cronで日次処理にする
【GAEでMapReduceを使おう!】管理コンソールでMapReduce状況を確認する
【GAEでMapReduceを使おう!】MapReduceを使ってみた感想
【GAEでMapReduceを使おう!】JavaでのShardの増やし方
スポンサーサイト

テーマ : Google関連
ジャンル : コンピュータ

コメントの投稿

非公開コメント

プロフィール

toronic

Author:toronic
IT関係で10年働いたのでそろそろ独立したいと考えているけどなかなか一歩が踏み出しきれないありきたりなプログラマ

カテゴリ
ブックマーク
最新記事
月別アーカイブ
検索フォーム
最新コメント
ブロとも申請フォーム

この人とブロともになる

メールフォーム

名前:
メール:
件名:
本文:

スポンサーリンク
リンク
一攫千金?
RSSリンクの表示
QRコード
QRコード
    助成金
    上記広告は1ヶ月以上更新のないブログに表示されています。新しい記事を書くことで広告を消せます。