FC2ブログ

スポンサーサイト

このエントリーのカテゴリ : スポンサー広告

上記の広告は1ヶ月以上更新のないブログに表示されています。
新しい記事を書く事で広告が消せます。

【GAEでMapReduceを使おう!】appengine-mapreduceをカスタマイズしてInputの条件指定する!

このエントリーのカテゴリ : 【GAEでMapReduceを使おう!】

GoogleAppEngie上でappengine-mapreduceを使って作った電車遅延情報の収集サイト<http://monitorequest.appspot.com/traininfo/>ですが、【GAEでMapReduceを使おう!】MapReduceを使ってみた感想で「2.MapInputの条件指定ができない」ために、【GAEでMapReduceを使おう!】MapInputの条件指定でやったように、
 ①テーブルのレコードを全件(全期間)取得する
 ②レコードのMap処理で対象日以外は集計しない
としていましたが、【GAEでMapReduceを使おう!】GAEの料金体系が変わって動かない!にあるように、①全件取得すると1日のDataStoreのRead上限に達してどうにもこうにもなっていませんでした。

(RangeInputFormatというのがあったので使ってみようかとソースを読んでみると、本当に数値の上限と下限を渡すだけだった…。)

appengine-mapreduceDatastoreInputFormatもそのうち条件指定できるようになるかなー、と思っていましたが、
全く拡張される気配がありません。
仕方がないのでappengine-mapreduceをカスタマイズすることにしました。

適当なクラス(FilterDatastoreInputFormatクラスとか)を作ってappengine-mapreduceのソースから
 com.google.appengine.tools.mapreduce.DatastoreInputFormat
をそのままコピペしてクラス名を変えます。


public class FilterDataStoreInputFormat extends InputFormat {



Servletから渡すパラメータのキーを定義します。


public static final String FILTER_KEY = "filterdatastoreinputformat.filterkey";
public static final String FILTER_START_VALUE = "filterdatastoreinputformat.startvalue";
public static final String FILTER_END_VALUE = "filterdatastoreinputformat.endvalue";



getSplitsの中でServletから渡されるパラメータの値を取得してQueryの条件に指定します。


public List getSplits(JobContext context) throws IOException {
 String entityKind = context.getConfiguration().get(ENTITY_KIND_KEY);
 if (entityKind == null) {
  throw new IOException("No entity kind specified in job.");
 }
 log.info("Getting input splits for: " + entityKind);

 // filterkeyの追加
 String filterKey = context.getConfiguration().get(FILTER_KEY);
 String filterStartValue = context.getConfiguration().get(FILTER_START_VALUE);
 String filterEndValue = context.getConfiguration().get(FILTER_END_VALUE);
・・・
 //上限と下限が指定されていればクエリのfilter条件に設定する
 // key <= '上限' AND key >= '下限'みたいな感じ
 Query scatter = new Query(entityKind);
 if(filterKey != null ){
  if(filterStartValue != null){
   scatter.addFilter(filterKey, FilterOperator.GREATER_THAN_OR_EQUAL, filterStartValue);
  }
  if(filterEndValue != null){
   scatter.addFilter(filterKey, FilterOperator.LESS_THAN_OR_EQUAL, filterEndValue);
  }
  scatter.addSort(filterKey);
 }
 scatter.addSort(SCATTER_RESERVED_PROPERTY);
 scatter.setKeysOnly();
・・・



getStartKeyでも同様にクエリに条件を追加します。


private static Key getStartKey(String entityKind, DatastoreService datastoreService,
    String filterKey,String filterStartValue,String filterEndValue)
  throws IOException {
 Query ascending = new Query(entityKind);
 if(filterKey != null ){
  if(filterStartValue != null){
   ascending.addFilter(filterKey, FilterOperator.GREATER_THAN_OR_EQUAL, filterStartValue);
  }
  if(filterEndValue != null){
   ascending.addFilter(filterKey, FilterOperator.LESS_THAN_OR_EQUAL, filterEndValue);
  }
  ascending.addSort(filterKey);
 }
 ascending.addSort(Entity.KEY_RESERVED_PROPERTY);
 ascending.setKeysOnly();
・・・



呼び出しのServlet側では下記のように昨日今日を指定するようにします。
※MapReduceのその他の実装コードについては【GAEでMapReduceを使おう!】を参照ください。



//Mapinputクラスに上記で作成したクラスを指定
conf.setClass("mapreduce.inputformat.class", FilterDataStoreInputFormat.class, InputFormat.class);

//Datastoreのテーブルを指定
conf.set(FilterDataStoreInputFormat.ENTITY_KIND_KEY, "TrainInfo");

//条件指定のキーを指定
conf.set(FilterDataStoreInputFormat.FILTER_KEY, "InfoDate");

//上限値(今日)
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"));
conf.set(FilterDataStoreInputFormat.FILTER_END_VALUE, DateUtil.formatYMD(cal.getTime()));

//下限値(昨日)
cal.add(Calendar.HOUR, -24);
conf.set(FilterDataStoreInputFormat.FILTER_START_VALUE, DateUtil.formatYMD(cal.getTime()));



実行してみると…、ばっちり!
DatastoreのRead上限に達せず集計できました!

http://monitorequest.appspot.com/traininfo/monitor

きれいに負荷分散されないのが気になるけど、動くなったので、まーいいか。
スポンサーサイト

テーマ : Google関連
ジャンル : コンピュータ

【GAEでMapReduceを使おう!】GAEの料金体系が変わって動かない!

このエントリーのカテゴリ : 【GAEでMapReduceを使おう!】

GAE上で無料で電車遅延情報収集サービスを作って遊んでいましたが、
http://monitorequest.appspot.com/traininfo/
2011/11/7から突然エラーが発生して見れなくなってしましました。

ダッシュボードを見てみると「Datastore Read Operations」が無料分の100%を超えていて、
データが読み込めないようでした。

エラーログを見てみると、MapReduceの集計処理で
The API call datastore_v3.RunQuery() required more quota than is available.
が頻発しています。

ちょっと調べてみると、2011/11/7からGAEの料金体系が変わって無料分が大幅に削減されたようですね。
http://sourceforge.jp/forum/message.php?msg_id=60410
とかに載っています。
Hadoopで7万件程度のレコードを集約する処理なので、一発で無料分は終わりだな。

http://www.chrome-life.com/google-plus/2245/
に記載されていますが、GAE上を前提としたサービスを開始している事業者は痛手でしょうねぇ。
無料なのをいいことにサービス拡大して、いつのまにかGoogleにロックインされてしまって、
今回のように突然値上げされると出さざるを得なくなる。

まあそんなに高くなさそうなので問題ないかもしれませんが、
ビジネスをやるつもりでもなかった私とかはゼロだったのが10円でも必要になるんで考えるかなー。

ロックインの危険性を肌で感じられたのはいい経験かも。

とはいえ、なんとか対応しないといけないな。
うーん、どうしよ。

テーマ : Google関連
ジャンル : コンピュータ

【GAEでMapReduceを使おう!】TaskQueueが詰まってえらいこっちゃ

このエントリーのカテゴリ : 【GAEでMapReduceを使おう!】

こちらのサイトで
http://monitorequest.appspot.com/traininfo/
電車遅延の口コミをひたすら収集していましたが、お盆休みで目を離したスキに集計処理が止まってました。
GoogleAppEngine上で、appengine-mapreduceのモジュールを使って集計処理をしています。

GoogleAppEngineのコンソールを見てみると、MapReduceの処理が開始されるや常にCPUが100%近くになって、6時間くらい経つと止まってました。
原因を見てみるとTaskQueueがフリーで使用できる上限に達しているようです。
QuotaDetailsの「Task Queue API Calls」が上限の100,000になってました。

どうもMapReduceの処理が失敗して、再実行するのを繰り返し行うために無限ループに入ってしまったようです。
フリーだから10万回で止まったから良かったものの、お金払ってたとしたら…怖いですねぇ。

で、止めようとしたのですが、一筋縄ではいきませんでした。

1. EclipseからGoogleAppEngineのアプリケーション更新しようとしたら500エラー!
 とりあえずcronで半日ごとに起動させていたMapReduceの処理を止めようと思いました。
 cron.xmlの定義をコメントアウトしてEclipseから更新!しましたが、500エラーが出て更新できません(ToT)

2. TaskQueueが削除できない!
 管理コンソールからTaskQueueを見ると、上限の5つたまっていたので、これをdeleteしようとするとこれも失敗!
 Queueを停止させてもダメ。

おそらく、Eclipseからのアプリケーションの更新も、TaskQueueの削除も全部TaskQueueで処理してるんじゃないかと思います。
うーん、TaskQueueを削除するのもTaskQueueで、フリーの上限に来ているから消せない。ちょっと滑稽。
これはお金を払うしかないのか?でも払っても上限が来たらもっと払わないとダメになるのでは…。


なんて思いましたが対処できました。

1. とりあえずTaskQueueを停止(Suspend)
2. 日が変わってTaskQueueの受付が可能になる
3. MapReduceのTaskQueueがリクエストされても実行されずに待ち状態

EclipseからGoogleAppEngineのアプリケーション更新時に500エラー出なくなりました。
また、管理コンソールのTaskQueueでQueueのdeleteでの失敗のなくなりました。

さらに、MapReduceの処理も失敗しなくなりました。。。
これは何もアプリを変えていないので微妙です。
いつ失敗して無限ループに陥るか。

はやいところ失敗したらリトライしないように修正しよう。

テーマ : Google関連
ジャンル : コンピュータ

【GAEでMapReduceを使おう!】MapReduceからの非同期連携

このエントリーのカテゴリ : 【GAEでMapReduceを使おう!】

サイトはコチラ→http://monitorequest.appspot.com/traininfo/

【GAEでMapReduceを使おう!】MapInputの条件指定で、MapReduceで行う集計処理の対象を全期間から直近2日分に減らしました。

うまくいったと思いきや、全期間の集計データまで2日分に減ってしまっていました。

非同期処理追加前

うーん、MySQLとかRDBだったら画面からのリクエスト時に

SELECT 路線名,count(*) AS 日数, sum(報告数) AS 累積 FROM 日別路線別サマリ
GROUP BY 路線名


で済むんだけど…。
Java上で集計してもいいけど、日数x路線数だとそのうちまたOutOfMemory問題になりそうだし。
せっかくなので集計処理にします。

最初は日別路線別サマリからのMapReduce処理にしようかと思いましたが、上記SQLで単純に記述できるように日別路線別サマリのCOUNTと報告数のSUMを路線別サマリにINSERTするだけなので余計に複雑になってしまいました。
そこで、この集計処理をサーブレット一つにして、MapReduce処理の非同期後続処理にしました。

非同期後続処理後

サーブレット処理は単純に、以下の処理だけです。
 1. 日別路線別サマリを全件取得
 2. 路線別に日数と報告数を集計
 3. 2を保存(Update or Insert、Oracleで言うところのmerge)

MapReduceのReduce処理サーブレットに集計処理サーブレットのURLをキューに追加するだけでできました。

Queue queue = QueueFactory.getDefaultQueue();
TaskOptions task = TaskOptions.Builder.withDefaults()
  .url("集計処理サーブレットのURL").method(Method.POST);
queue.add(task);



真面目にやるとすれば、MapReduceは1日分のみ対象で、明細に集計済みフラグかなにかをつけて、路線別サマリにも日数と報告数を追加するだけにすればいいんだろうけど、路線別サマリにコミットできなかったら明細の集計済みフラグをロールバックするとかできなさそうだし。。。
バッチを再実行可能なようにつくるには結構面倒だなぁ。

テーマ : Google関連
ジャンル : コンピュータ

【GAEでMapReduceを使おう!】MapInputの条件指定

このエントリーのカテゴリ : 【GAEでMapReduceを使おう!】

【GAEでMapReduceを使おう!】実装編3:MapperとReducer【GAEでMapReduceを使おう!】MapReduceを使ってみた感想の「【感想】2.MapInputの条件指定ができない」ために「【課題】1.Reducerサーブレットの負荷軽減」が問題でしたが、案の上エラーになりました。

エラー内容は

2011-06-06 21:03:55.439 /Reducer処理URL/ 500 32557ms 283878cpu_ms 277928api_cpu_ms 0kb AppEngine-Google; (+http://code.google.com/appengine)
com.google.apphosting.api.ApiProxy$ApiDeadlineExceededException: The API call datastore_v3.Put() took too long to respond and was cancelled.



2011-06-05 21:04:21.211 /Reducer処理URL/ 500 30847ms 274708cpu_ms 271768api_cpu_ms 0kb AppEngine-Google; (+http://code.google.com/appengine)
com.google.appengine.api.datastore.DatastoreTimeoutException: Unknown


です。

Reducerのサーブレット処理が30秒以上掛ってますね…^^;

でもまだappengine-mapreduceはフル実装のMapReduceになっていません。
Subversionのリポジトリからソースをダウンロードしても、AppEngineReducer.javaはありますが、MapReduceServlet.javaには呼ばれている気配すらありません。

そこでMapInputを絞り込むことにしました。

と言っても、MapInputの条件を絞り込めないので読み込んだ後で集計対象にしないようにしただけです。
こんな感じです。


 @Override
 public void taskSetup(Context context) {
  // 1日前までしか対象にしない対応
  Calendar cal = Calendar.getInstance(TimeZone
    .getTimeZone("Asia/Tokyo"));
  cal.add(Calendar.HOUR, -24);
  try {
   yesterday = DateUtil.parse00(cal);
  } catch (ParseException e) {
   log.warning("Parse Error");
  }
 }

・・・

 public void map(Key key, Entity value, Context context) {
  log.info("Mapping key: " + key);
  if (value.hasProperty(TrainInfoParser.INFO_DATE)
    && value.hasProperty(TrainInfoParser.TRAIN_NAME)) {

   String infoDate = (String) value
     .getProperty(TrainInfoParser.INFO_DATE);

   // 1日前までしか対象にしない対応
   if(yesterday != null){
    try {
     if(yesterday.after(DateUtil.parseYMD(infoDate))){
      return;
     }
    } catch (ParseException e) {
     log.warning("Parse Error");
    }
   }
   
   context.getCounter(TrainInfoDateSum.TOTAL_COUNT, infoDate)
     .increment(1);
 ・・・



集計の対象を1日前までのデータを対象にしています。
というのも、MapReduce処理は12時間おきに走らせていますが、0時台に実行されると前日分の集計がされないためです。

また、yesterday = DateUtil.parse00(cal);としていますが、内部的には0時0分0秒に変換しています。
yesterday = cal.getTime()にした場合、前日のHH:MMになり比較対象はYMDのため00:00なので前日分が含まれないためです。

そしてこの処理はtaskSetup()にて行います。
setUp()で処理を行った場合、map処理ではインスタンスフィールドyesterdayがnullになりますが、taskSetup()にて設定しておけばmap処理中でもyesterdayが取得できます。

Map処理は現状でも分散処理されているので、エントリーデータが多くなってもそれほど問題にはならないと考えています。
なので、Reducer処理に渡るデータの量を減らしてみました。

結果は…

2011-06-07 09:03:28.349 /Reducer処理URL/ 200 2689ms 14616cpu_ms 11606api_cpu_ms 0kb AppEngine-Google; (+http://code.google.com/appengine)
↑修正後

↓修正前
2011-06-06 21:03:55.439 /Reducer処理URL/ 500 32557ms 283878cpu_ms 277928api_cpu_ms 0kb AppEngine-Google; (+http://code.google.com/appengine)



32秒掛っていたのが2.7秒になりました!
大成功!!!L(@^▽^@)」

テーマ : Google関連
ジャンル : コンピュータ

プロフィール

toronic

Author:toronic
IT関係で10年働いたのでそろそろ独立したいと考えているけどなかなか一歩が踏み出しきれないありきたりなプログラマ

カテゴリ
ブックマーク
最新記事
月別アーカイブ
検索フォーム
最新コメント
ブロとも申請フォーム

この人とブロともになる

メールフォーム

名前:
メール:
件名:
本文:

スポンサーリンク
リンク
一攫千金?
RSSリンクの表示
QRコード
QRコード
    助成金
    上記広告は1ヶ月以上更新のないブログに表示されています。新しい記事を書くことで広告を消せます。