はじめまして。AIシステム部MLエンジニアリンググループ で学生インターンをしている 早坂( @takemioIO ) です。 普段はパケット処理などをやっているのですが、縁あってここでは MLOps の通常業務に携わっております。 私は二ヶ月間インターンとして開発に取り組んでいました。ここではその実装物の一つを紹介します。
この AI システム部 のとあるプロジェクトでは、 Cloud Functions と Cloud Pub/Sub を利用したデータパイプライン を構築しております。
そのプロジェクトは毎日のように変更が取り込まれ、非常に開発が盛んですが一方それゆえに破壊的な変更で足を撃ち抜いてしまいそれによって悩まされることがありました。
さらにはクラウドサービスを利用してるという部分からローカルでの検証環境がありませんでしたので、毎回 GCP に デプロイするしかなく、トライアンドエラーに時間がかかったり、新規機能を追加するといろいろな部分でコンフリクトする問題が存在していました。
そこでローカル上で Cloud Functions の実行環境のまま E2Eテスト 可能にすることで開発のアジリティを上げ、さらには簡単に立ち上げれることでコンポーネントごとの検証を可能にし、すぐに破壊的な変更に気がつけるようになることで辛さを減らすことを目指しました。
当記事では、これらがまとまってる情報がないことからTipsとして Cloud Functions をローカルで 統合テスト する知見とその具体的な実装をご紹介します。
この記事には以下のようなことが書いてあります。
- 現状の Cloud Functions に関してのテスト
- GCP Emulatorsについて
- 我々はどのようなアプローチをとっているのか
この記事ではなんとなく以下のことをを知っていることを前提とします。
- Cloud Functions とは何か
- Cloud Pub/Sub とは何か
- Cloud Firestore とは何か
現状のCloud Functionsに関してのテスト
とりあえず Cloud Functions に関しての GCP公式ドキュメント を眺めてみましょう。
- テストの基本
- HTTP 関数のテスト
- バックグラウンド関数のテスト
- CI / CD
とありますが、中身をざっと眺めてみますと、基本的に単体テストでは mock-up を使用し、統合テスト(つまり単一のfunctions)では Functions Framework を利用して実行環境をエミュレートすることが推奨されています。
Functions Framework とは Cloud Functionsの一つの関数をひとつのポートでlistenするツールです。これは Python, Node.js, Go… などと様々な言語で実装が書かれています。 実際 GitHub に転がってるいくつかのプロジェクトを調べても、Functions Framework だけで間に合わないものは基本的にmock-upだけで行うというのが主流のようです。
また、多くの場合は Cloud Functions を HTTP関数 として使っている場合が多く、Swagger や API Blueprint などで定義したものを Dredd などを使って投げ込んでテストすることが多いのではないでしょうか。
しかし、我々は前述の通り、Cloud Functions と Cloud Pub/Sub の組み合わせでパイプラインを作っている故に、そのような形のRESTらしい扱い方はしておらず、関数は Cloud Pub/Sub をトリガーとして実行され直接的な返り値を持ちません。また Cloud Functions という一貫したコンポーネントをテストしたかったというモチベーションもあり、単純に Functions Framework
を使うことは残念ながら要件とマッチしませんでした…
GCP Emulatorsについて
また、さらに面倒な用件が存在しました。 Cloud Functions の中から Cloud Firestore や Cloud Pub/Sub を利用していたので、それらをうまいこと動かしてあげる必要があったということです。 例えば ある関数で何かしらの条件チェック等を行って、問題がなければバリデーションをした情報をPub/Sub経由で受け渡すみたいなことが行われていました。
そこで GCP Emulators というツールを利用しました。 https://cloud.google.com/sdk/gcloud/reference/beta/emulators?hl=ja
これはGCP謹製のサービスエミュレーターです。これを使うとローカルでもあたかもGCPのサービスがあるように実行が可能になります。 対応しているモノとしては以下のものをが挙げられます。
- Cloud Pub/Sub
- Cloud Firestore
- Cloud Bigtable
- Cloud Datastore
- Cloud Spanner
弊社のプロジェクトでは Cloud Pub/Sub と Cloud Firestore を利用しているので選択肢としてはちょうどカバー可能でした。
これらを実行するアプリケーションに対して特定の環境変数にエミュレーターがlistenする接続先情報を渡すことで実行が可能になります。 以下は Cloud Pub/Sub の例です。
export PUBSUB_EMULATOR_HOST="localhost:8085"
また実行時の注意点としては以下を参照するとJavaを事前にインストールする必要があります。 https://cloud.google.com/pubsub/docs/emulator?hl=ja#prereq
Java JRE(バージョン 7 以降)がインストールされている。
ということでそれを踏まえると Cloud Pub/Sub のエミュレーターのインストールの方法は以下の通りです
brew cask install java
gcloud components update
gcloud components install pubsub-emulator
我々はどのようなアプローチをとっているのか
さて、使えそうなツールは見つけましたので、まずはどのように使っているかの全体像を説明します。 我々は以下のような仕組みを実装しました。
フローに沿って数字が振っていますのでそれに沿って説明します。 まず、ざっと一言ずつで説明すると以下のようになります。
- Fixtures に書いてる中身をtesterが読み込む
- Cloud Functions のロードと GCP Emulators の立ち上げ
- GCP Emulators のセットアップ
- Cloud Functions にリクエストを渡す
- GCP Emulators から結果を受け取り assert & GCP Emulators の中身を削除する
- Cloud Functions と GCP Emulators をシャットダウンさせる
これだけだとよくわからないので、一部わかりやすいように実際に使ってるコードを抜粋と改変を加えながら、もう少し詳しく説明します
1. Fixturesに書いてる中身をtesterが読み込む
我々は fixtures と呼んでいますが、接続先と実行関数と入出力のアセットが記述されたファイルを定義し、それらをテストクライアントが読み出します。 以下が実際にプロダクションで使っているフォーマットを抜粋したモノです。
function: 関数名
entrypoint: 実行される関数のエントリポイント
listenport: サーバーが立ち上がるポート
input: #Cloud Functionsへの入力データ
testcase1: #入力データ名
...
# メッセージの中身を辞書で書く
setup: #セットアップするためのデータ
awesome_firestore_lists: #入力データ名
...
output: #出力
testcase1: #出力データ名
type: 関数からの返ってくるところはどこかを書く。(firestore|pubsub)
...
expect: #期待される中身を書く。
...
2. Cloud Functions のロードと GCP Emulators の立ち上げ
ここでは先ほどの読み込んだ情報から Functions Framework と GCP Emulators を立ち上げます。 実際の実装の一部を示します。
def listen_pubsub():
async_exec_cmd(f"gcloud beta emulators pubsub start --project={PROJECT_ID}")
res = exec_cmd("gcloud beta emulators pubsub env-init")
env = res.split(" ")[1].split("=")
return {env[0]: env[1]}
env = listen_pubsub()
async_exec_cmd(
f"functions-framework --target={entrypoint} --port={port} --source={source} --signature-type=event --debug",
env=env,
stdout=stdout,
stderr=stderr,
)
これらは GCP Emulators を立ち上げた後、接続先情報を取得し関数を立ち上げる際にはそれを環境変数として渡すことでその GCP Emulators への参照を可能にします。
開発の所感として、せっかくコード上で STDIO
を握れるようになったので以下のようにしておくと便利だと思います。
- いっそのこと初めからデバッグオプションを渡す
- 内部的には Flask を使っているので、そちらに デバッグフラグを渡した時の挙動が動きます
- テストの実行時の条件分岐にデバッグフラグを用意し、デバッグフラグを渡した際の挙動としては以下を行う
- コンソールへの表示・非表示切り替える
- デバッグフラグを渡した際は後述する Cloud Functions と GCP Emulators のシャットダウンを無効にする
これらの利点としては以下ことが挙げられます。
- デバッグ時にはより詳細なログを表示させながら実行が行える
- エラーを吐いた時のスタックトレースを見ることができる
- 実装中にテストが落ちてしまっても、Cloud Functions はクローズされずに手元で実行可能になる
- アプリケーションコードにホットリロードが有効になる
これらが有効になってる方が開発中には有用だと感じました。
3. GCP Emulators のセットアップ
GCP Emulators は立ち上がりましたが、実際にはそれを動かすための設定が必要になります。例としては Cloud Pub/Sub の topicの設定、Cloud Firestore に事前にモックデータを入れるなどが挙げられます。
以下は、Cloud Pub/Sub のトピック、サブスクライブの設定例です。
def setup_pubsub_topic(output_case):
topicname = output_case["topic"]
topic_id = f"{topicname}-topic"
subscription_id = f"{topicname}-subscription"
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(PROJECT_ID, topic_id)
publisher.create_topic(topic_path)
subscription_path = subscriber.subscription_path(PROJECT_ID, subscription_id)
subscriber.create_subscription(subscription_path, topic_path)
4. Cloud Functions にリクエストを渡す
いよいよ実在する関数にリクエストをします。
以下のコードは post_request
を叩いて Cloud Functions にリクエストができます。
トリッキーなポイントとしては Functions Framework
は自動で "data"
というkeyの中身を括り出してくるのでわざと二重に書く必要があります。
def build_message(innermessage):
message = {"data": {"data": innermessage}}
return message
def post_request(dictmessage: dict, port: int):
innermessage = json.dumps(dictmessage).encode()
message = build_message(base64.b64encode(innermessage).decode())
response = requests.post(
f"http://localhost:{port}", data=json.dumps(message), headers={"Content-Type": "application/json"},
)
if response.status_code != requests.codes.ok:
raise Exception(f"failed http request. status is {response.status_code}")
5. GCP Emulators から結果を受け取り assert & GCP Emulators の中身を削除する
ここでは任意の GCP Emulators から結果を受け取ります。一般的には関数に引数を渡し、返り値を受け取って assert しますが、我々の実装としては前述の通り Cloud Pub/Sub をトリガーとして関数を実行してるので直接的な返り値を持ちません。つまり間接的に情報を受け取る必要があるので、実行中の Cloud Pub/Sub や Cloud Firestore から値を受け取ることになります。
以下のコードは Cloud Pub/Sub から値をpullし、それを期待される値と比較している例です。
def expect_check_pubsub(output_case):
topicname = output_case["topic"]
subscription_id = f"{topicname}-subscription"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, subscription_id)
response = subscriber.pull(subscription_path, max_messages=5)
for msg in response.received_messages:
mess = json.loads(msg.message.data.decode("utf-8"))
assert mess == output_case["expect"]
ack_ids = [msg.ack_id for msg in response.received_messages]
subscriber.acknowledge(subscription_path, ack_ids)
また、次のテストを実行する際には値を引き継がないで動いて欲しいというケースがあると思います。 例えば Cloud Firestore を利用してる際は以下のような方法で任意のプロジェクト以下を全て削除することができます。
## cf. https://firebase.google.com/docs/emulator-suite/connect_firestore
def flush_firestore():
FIRESTORE_DELETE_ENDPOINT = (
f"http://{FIRESTORE_EMULATOR_HOST}/emulator/v1/projects/{PROJECT_ID}/databases/(default)/documents"
)
requests.delete(FIRESTORE_DELETE_ENDPOINT)
6. Cloud Functions と GCP Emulators をシャットダウンさせる
最後にシャットダウンする時には Functions Framework と GCP Emulator に対して pkill を行いクローズさせます。
コマンドライン上から別プロセスとしてサービスを実行してるので、単純にクライアントをクローズするだけでは Functions Framework や emulator を停止させてあげる必要があります。
async_exec_cmd
という関数は、内部的には Popen
と呼ばれる Python組み込みのクラスを利用しており、その Popen
のメソッドには kill
というプロセスを止めるモノがあります。しかし今回の場合は Functions Framework や GCP Emulator は複数プロセス立ち上がるのでそれらを取り回して各プロセスを閉じる実装は煩雑になると判断して、pkill を採用しました。
def service_teardown():
async_exec_cmd("pkill -9 -f functions-framework")
async_exec_cmd("pkill -9 -f google-cloud-sdk")
これで全容と実装を示し、無事自分たちのサービスにテストを導入することができました!
まとめ
この記事では Functions Framework と GCP Emulator で実装するローカル上で動かすCloud Functions の統合テストの実装例を紹介しました。
GCPのプロダクトをちゃんと使うとなると、我々の実装バグだけではなく GCP 本体のバグに当たることもしばしばあります。その問題を手軽に切り分けるためにもどこまでの実装が自分の意図とマッチしているのかというのは常に比較して行くべきで、その一つの方策として我々はこのような方法を取りました。
もちろんエミュレーターですので微妙な挙動の違いはあると思いますが、(公式なので)一定の質が担保がされたものを利用することができます。function を跨ぐようなテストも可能なので、この方法はテスタビリティを上げるひとつの解になると思います。
我々もまだまだ模索中ですがこの実装例がみなさんの参考になれば嬉しいです。またこれよりも更に良い実装があったら是非教えてください!
最後になりますが、時に厳しく時に優しく指導してくださったメンターの林さん、藤原さん、大谷さん、川瀬さん、 yurfuwa さんをはじめ、受け入れてくださったAIシステム部のみなさんに感謝します。
最後まで読んでいただき、ありがとうございます!
この記事をシェアしていただける方はこちらからお願いします。