blog

DeNAのエンジニアが考えていることや、担当しているサービスについて情報発信しています

2020.03.24 技術記事

オートモーティブの大規模データ処理を支える技術 前編: クラウドアーキテクチャ

by Yoshitaro Makise

#automotive #dena-techcon #google-cloud #infrastructure

はじめに

こんにちは。オートモーティブ事業本部の牧瀬芳太郎です。

DeNA TechCon 2020 にて発表する予定だった内容をBlog記事としてお届けします。
今回は「オートモーティブの大規模データ処理を支える技術」と題して、前後編でお送りします。 (後編は こちら )

オートモーティブ分野のデータ処理には、2つの「大規模」が出てきます。 1つは車両の台数が多く、データの入力頻度が高いことからの大規模。 もう1つは、地図データという静的ながらも現実世界を反映した大規模なデータ。

この前後編では、オートモーティブ分野では欠かせない車両動態情報の加工・分析パイプライン、特に「マップマッチ」と呼ばれる処理を軸に、 クラウドアーキテクチャやアルゴリズムとデータ構造など様々なレイヤーでこれらの「大規模」に立ち向かう工夫をご紹介したいと思います。

前編となる本稿では、「マップマッチ」と呼ばれる処理を軸に、大規模な車両位置情報を処理するためのクラウドアーキテクチャを構築・改善した例をご紹介します。

車両位置情報の収集

オートモーティブ分野のサービスの多くで必要になることが、車両位置情報の収集です。サービスが管理している何千〜何万台の車両のそれぞれから、位置情報を収集します。 リアルタイムに収集する場合もあれば、バッチ的に収集する場合もあると思います。 また今回は扱いませんが、サービスによっては位置情報だけでなく、カメラ映像、タクシーのメーター情報といったものも収集する場合があります。

これら収集した位置情報は、様々な用途に活用されます。以下は MOV DRIVE CHART における例です。

  • 運行管理
  • サービスの実現
    • 空車車両の検索 (MOV)
    • 運行履歴からの需要予測 (MOV)
    • 速度超過・一時不停止等 違反運転の検出 etc. (DRIVE CHART)
  • サービスの改善
    • 運行履歴の分析、AI学習
  • etc.

車両位置情報収集システム

このような車両位置情報システムは、一般的には図のような構成になります。

車両位置情報収集システム

  • ①車両に専用デバイス、あるいは Androidデバイスなどを載せ、継続的にGPSデータを打ち上げる。
  • ②サーバー側で①のGPSデータを集約し、ストレージに保存。この時点のデータを活用することもあります。
  • ③GPSデータに対してマップマッチ処理(後述)を行った後、ストレージに保存。このデータを各種アプリケーションにて活用します。

マップマッチ(Map Matching)

一般に、生のGPSデータは誤差を含んでおり、そのままでは利用しづらい場合があります。 また、生のGPSデータ(緯度・経度)ではなく、どの道路を通ったか、が知りたいケースがあります。

そこで、生のGPSデータを道路ネットワークデータと突き合わせ、経路情報を得る処理が必要になります。 これをマップマッチ(Map Matching)と呼びます。

道路ネットワークデータ

道路ネットワークデータとは、地図情報を道路に着目してモデル化した、グラフ構造データです。

  • 道路がどんな形状をしているか、その座標
  • 道路同士がどのように接続しているか
  • 道路の属性(制限速度、幅員、有料道路 etc.)
  • 道路のID

などを含んでいます。

マップマッチの例

図のように、誤差を含んだGPSの座標列から、実際に走行したであろう経路と、真のGPS座標を推定しています。

マップマッチの例

MOVにおける移動体情報の収集・処理

ここからは、具体的に MOV において今まで述べてきたようなシステムをどのように実現しているか紹介します。

システム全体像

システムの全体像は図のようになっています。実際にはもう少し複雑ですが、ここではマップマッチ処理の流れにフォーカスして、関係ないものを省いています。

MOVにおける車両位置情報収集システム

①最初に、タクシーの位置情報が、タクシーに搭載した Android デバイスから、AWS IoT に送信されます。
このエッジおよびAWS側の詳細は AWS Summit 2018 での発表資料を参照してください。

②AWS IoT にて集約したデータは GCP に受け渡した後、Cloud Dataflow にて 5分毎に1ファイルにまとめ、いったん Cloud Storage に保存します。

③マップマッチ処理を行い、結果を BigQuery に格納します。これについては後述します。

Dataflow とは

上の図に出てきた、また、この後も出てくる Google Cloud Dataflow について紹介します。

GCP上のフルマネージド分散処理サービスです。 Apache Beam というプログラミングモデルを採用しており、Java, Scala, Python, Go で処理を記述できます。

Apache Beam は、バッチ処理とストリーミング処理を統合していることや、様々な分散処理ランタイム上で実行可能であることが特徴です。
Dataflow は Apache Beam が動作するランタイムの1つ、という位置付けになります。

Dataflow は私達のシステムにおいて大規模データ処理を行う上でのキーとなるコンポーネントの1つとなっています。

マップマッチ処理フロー(初期版)

マップマッチ処理の部分について詳しく説明します。まず、単純ではあるが効率が悪かった初期実装について説明します。

マップマッチ処理フロー(初期版)

①マップマッチ処理は日次のバッチ処理で行っており、Cloud Composer (Apache Airflow) でトリガーします。

②前出の図の通り、Cloud Storage に全車両のGPSデータが含まれるファイルが5分毎に1ファイルずつ配置されます。

③Dataflow にて、以下の処理を行います。

    1. Cloud Storage から1日分のファイルを読み込む
    1. 車両IDで GroupByし、タイムスタンプでソート。これにより車両ID毎に1日分のGPSデータが得られます
    1. 車両ID毎にMapMatch APIを呼び出す
    1. BigQuery に結果を格納

④MapMatch API は GKE 上の API サーバーとして構築しています。1リクエスト毎に1車両分のGPS時系列データを受け取り、道路IDと補正後の緯度経度を付加して返却します。
なお、1日分では処理に時間がかかり Dataflow 側がタイムアウトしてしまうため、1時間単位でデータを切って繰り返し MapMatch API を呼び出しています。

MapMatch API のインフラ構成

MapMatch API は python で書かれていますが、インフラは下図のような構成になっています。

MapMatch API

GKE 上の Deployment として実装しており、道路ネットワークデータを ReadOnlyMany モードの Persistent Volume で共有しています。
Pod を増やしていくらでもスケールできる構成になっています。また、Pod および Node のオートスケール(HorizontalPodAutoscaler, Cluster autoscaler)を設定しています。

また GKE のノードプールには Preemptible VM を使用し、コストを節約しています。

マップマッチ処理フロー(初期版)の問題点

初期版フローがこのような実装になっている背景には、元々、Cloud Storage から BigQuery にインポートする Dataflow の汎用的なフレームワークを作成していたことがありました。
そのフレームワークに乗る形で、MapMatch API への呼び出しを最小工数で追加したものが初期版フローになります。

しばらくはこの構成で運用していたのですが、コスト・運用上の課題がいくつかありました。

  • Dataflow が MapMatch API のレスポンスを待つ時間が長く、時間・コストの無駄
  • 並列度を上げられずスケールしない
  • 不正GPSデータがあると全車両分やり直しになる

次項で具体的に説明します。

初期版フローにおける Dataflow ワーカーの動作

Dataflow は複数のワーカーノードの複数のスレッドに処理を分散しますが、今回のジョブの場合には図のような処理の流れになります。

初期版フローにおける Dataflow ワーカーの動作

ここで問題となるのが、以下の点です。

  • MapMatch API を呼び出し、そのレスポンスを待つ処理の繰り返しが、ジョブ全体の実行時間の大部分を占め、その間 Dataflow の CPU は空いている
  • MapMatch API を呼び出す処理を行っている間、他に並行してできる作業がなく、空いている CPU を有効活用できない

さらに悪いことに、Dataflow の料金体系はワーカーインスタンスの時間課金です。 初期版フローでは日次のバッチ処理に 5〜8時間かかっていました。 CPU を遊ばせた状態で時間課金のインスタンスを長時間動かし続けるという、コストの無駄が大きい構成になっていました。

並列度を上げられずスケールしない

別の問題として、MapMatch API はいくらでも横に並べることができますが、 Dataflow はワーカーノード数 x vCPU 数までしか並列動作しないため、MapMatch API を呼び出す際の並列数がそこでサチってしまうという点がありました。

各ワーカースレッド内でさらに並行に HTTP リクエストを投げる実装にすればこの点は解決しますが、ナイーブな実装ではハマるポイントです。

不正GPSデータがあると全車両分やり直しになる

MapMatch API は不正なGPSデータを読み込むとエラーを返すのですが、それをそのまま Dataflow のジョブのエラーとしてハンドリングしていたため、 リトライして4回目の実行が失敗すると、Dataflow のジョブ全体が失敗してしまい、全車両分の処理をやり直す必要がありました。

改善プロジェクト

とある事情で、過去6ヶ月分の軌跡データに対してマップマッチを実行し直す必要性が生じました。 初期版フローのまま実施すると、100万円以上かかる見込みでした。

さすがに初期版フローのまま実施するのは費用の無駄が大きく、以前から改善したいと考えていたこともあり、マップマッチ処理フローの改善プロジェクトが発足しました。

改善案

改善の基本的なアイデアとしては、Dataflow で待ち時間の長いタスクは避ける、ということです。 待ち時間がある部分は Dataflow から切り離し非同期化します。

非同期化にあたり、Cloud Pub/Sub または Cloud Tasks を使うことを検討しましたが、 Cloud Tasks は Push型で HTTP Target をサポートしており、MapMatch API の構成を大きく変えずに済みそうだったため、Cloud Tasks を採用しました。

マップマッチ処理フロー(改良版)

改善後の構成が以下になります。

マップマッチ処理フロー(改良版)

大きく3つのパートに分かれます。順次説明します。

改良版フロー(1)

まず Dataflow の処理です。

①入力は初期版と同じで、Cloud Storage に全車両のGPSデータが含まれるファイルが5分毎に1ファイルずつ配置されます。

②Dataflow の処理で、車両IDで GroupByするところまでは同じですが、直接 MapMatch API を呼び出していたところを、Cloud Tasks に enqueue するのみとします。

③MapMatch API に与えるGPSデータは車両ID毎に1ファイルにして Cloud Storage にアップロードします。

④Cloud Tasks に後述する Adaptor API の呼び出しを enqueue します。これを1日の車両の数だけ行います。

Cloud Tasks に enqueue する際のパラメータは以下のようになります。

  • 後述する Adapter API のエンドポイント URL
  • ペイロード(JSON)
{
    "dak_id": 車両ID,
    "source_path": 入力GPSデータのGCS上のパス,
    "output_path": 出力データのGCS上のパス
}

Cloud Tasks にジョブを一通り enqueue したら、Dataflow の役目は終わりです。

改良版フロー(2)

次は MapMatch API のパートです。

⑤Cloud Tasks とのインターフェースを取るために、MapMatch API の前段に、Adaptor と呼んでいる API サーバーをかませています。
これが Cloud Tasks からキックされると、Cloud Storage からファイルを読み込んで MapMatch API を呼び出し、結果を Cloud Storage の別の場所に格納します(⑥)。
入力と出力の Cloud Storage のパスは、前節で Cloud Tasks に enqueue されたパラメータ source_path, output_path を利用します。

Adaptor を導入することにより、MapMatch API 自体には改修を入れずに Cloud Tasks や Cloud Storage との接続を実現できました。

Adaptor は MapMatch API と同じ Pod の中に別コンテナとして差し込んでいます。

改良版フロー(3)

ここまでで、マップマッチ済みのデータが1車両1ファイルずつ Cloud Storage 上にアップロードされました(⑥)。これを BigQuery に投入するのが最後のステップです。

ここで1つ解決しなければならない課題があります。 非同期化したおかげで、「1日分の処理がいつ完了したか」がわからなくなり、後続の処理に影響が出ます。 BigQuery を参照する側は、いつになったら対象日の全車両分のデータが揃うのかがわかりません。

そこで、③で Cloud Storage に配置したマップマッチ前のファイルと、⑥で Cloud Storage に出力されたファイルの一覧を比較することで、 全てのファイルが処理されたことをチェックしています(ファイル名には日付や車両IDの情報が含まれます)。
Cloud Composer の Sensor によって実現しており(⑦)、ポーリングにより上記チェックを行うことで処理の完了を待ちます。

以上が改良版のマップマッチ処理フローになります。

工夫した点

以下は改良版フローにおいて工夫した点のまとめです。

  • 待ちが多い処理を Dataflow から追い出す
  • Cloud Tasks を利用して非同期処理
  • 非同期になった部分の待ち合わせを Composer で行う

成果

この改良により、以下の成果がありました。

  • 大幅なコスト削減
  • 処理時間短縮
  • デバッグが楽に
  • リトライの柔軟性向上
  • スケール単純化

順に説明します。

大幅なコスト削減

  • 過去分のマップマッチが100万円以上 → 数万円弱で完了し、96%コスト削減できました。
  • また、Dataflow 費用が 1日数千円かかっていたのが数十円になり、99%コスト削減できました(MapMatch API 費用は別)。
  • Cloud Tasksは最初の100万リクエストは無料であり、無料枠に収まります。

処理時間短縮

  • 1日分の処理が5〜8時間 → 約2時間で完了するようになりました。
  • 処理時間は Cloud Tasks の同時実行数、MapMatch API の HPA 設定を変えることで柔軟に調節可能です。
  • 過去6ヶ月分のマップマッチは8時間で完了。この時、4800 vCPU分の MapMatch API サーバーを並列に立てて処理しました。

デバッグが楽に

  • マップマッチに失敗したものは Cloud Tasks のキューに残るので、どの車両/どこにデータがあるのかわかりやすくなりました。
    失敗したデータのみローカルでデバッグ実行することもできます。

リトライの柔軟性向上

  • マップマッチに失敗した車両のデータだけをリトライすればよくなりました。

スケール単純化

  • MapMatch API (GKE Node/K8s Pod)のスケールだけを考えればよくなりました。
  • Cloud Tasks はフルマネージドで、スケーリングについて考える必要がありません。

今後の改善

今後改善していきたいこととしては、2つあります。

1つは、運用性向上です。現状、不正GPSデータがあった場合に MapMatch API がエラー終了したりタイムアウトする場合があり、 その際 Cloud Tasks にキューが残ったままになり、手動で対応が必要になります。

もう1つは、Pull型 Worker です。現状の MapMatch API では、オートスケーリングでいくらでもスケールできる反面、 細かいスケーリングの制御ができず、GKEノードの CPU を余らせてしまうケースがあります。
現状は Cloud Tasks からの Push型で動作している所を、Pull型でデータを取ってくる形にすれば、CPU 利用に関してよりプログラマが制御できる余地が増えると考えています。

DRIVE CHART の Pull型 MapMatch Worker

実は DRIVE CHART ではこの Pull型 MapMatch Worker を既に実装しています。

DRIVE CHART でもマップマッチ処理を行っていますが、MOV とは完全に別のシステムになっており、AWS 上に構築されています。 マップマッチのロジック実装のみ MOV と共通化しています。

DRIVE CHART の Pull型マップマッチ

DRIVE CHART では MapMatch Worker を ECS 上に立て、SQS からタスクを取得する形になっています。

まとめ

以上、大規模な車両位置情報の収集・処理を行うクラウドアーキテクチャの MOV における例をご紹介しました。

Dataflow, Cloud Tasks, GKE 等マネージドサービスを活用し、スケーラビリティの確保、運用負荷の削減を行った例をご紹介しました。

Dataflowは大規模データの分散処理に有用ですが、タスクの性質に注意して利用する必要がある、ということをご説明しました。

最後まで読んでいただき、ありがとうございます!
この記事をシェアしていただける方はこちらからお願いします。

recruit

DeNAでは、失敗を恐れず常に挑戦し続けるエンジニアを募集しています。