こんにちは、G-gen の武井です。今回は Google Cloud (旧称 GCP) の Cloud Functions ローカル実行環境 (Functions Framework) で、Pub/Sub トリガのイベント関数を検証する方法について紹介したいと思います。
検証イメージ
今回の検証における構成は以下のとおりです。
Cloud Scheduler が定期的に Cloud Pub/Sub キューにメッセージを投入、そして Pub/Sub をトリガとして Cloud Functions のイベント関数が実行されるイメージです。
ローカル環境での検証にあたって、次のツールを準備しました。
Functions Framework
Cloud Functions のローカル実行環境です。インストール方法等についてはこちらのブログを参照ください。
blog.g-gen.co.jp
Pub/Sub エミュレータ
Cloud Pub/Sub のローカル実行環境です。インストール方法等についてはのちほど説明します。
cloud.google.com
※ Cloud Scheduler についてはローカル環境で使用可能なツールはありませんので、サンプルスクリプトを用意しました。
Pub/Sub エミュレータのセットアップ
1. Pub/Sub エミュレータのインストール
まずは Pub/Sub エミュレータをインストールします。
gcloud components install pubsub-emulator
apt-get で gcloud をインストールしている場合は以下のエラーが想定されます。
> ERROR: (gcloud.components.install) > You cannot perform this action because the Google Cloud CLI component manager > is disabled for this installation. You can run the following command > to achieve the same result for this installation: > > sudo apt-get install google-cloud-sdk-pubsub-emulator
その場合は代替コマンドでエミュレータをインストールします。
(コマンドは Debian の場合)
sudo apt-get install google-cloud-sdk-pubsub-emulator
2. gcloud コンポーネントのアップグレード
続けて gcloud コンポーネントをアップグレードします。
gcloud components update
apt-get で gcloud をインストールしている場合は以下のエラーが想定されます。
> Beginning update. This process may take several minutes. > ERROR: (gcloud.components.update) > You cannot perform this action because the Google Cloud CLI component manager > is disabled for this installation. You can run the following command > to achieve the same result for this installation: > > sudo apt-get update && sudo apt-get --only-upgrade install google-cloud-sdk-app-engine-go
その場合は代替コマンドでコンポーネントをアップグレードします。
(コマンドは Debian の場合)
sudo apt-get update && sudo apt-get --only-upgrade install google-cloud-sdk-app-engine-go google-cloud-sdk-minikube google-cloud-sdk-spanner-emulator kubectl google-cloud-sdk-skaffold google-cloud-sdk-app-engine-grpc google-cloud-sdk-cloud-build-local google-cloud-sdk-pubsub-emulator google-cloud-sdk-datastore-emulator google-cloud-sdk-kubectl-oidc google-cloud-sdk-bigtable-emulator google-cloud-sdk google-cloud-sdk-cloud-run-proxy google-cloud-sdk-config-connector google-cloud-sdk-app-engine-java google-cloud-sdk-anthos-auth google-cloud-sdk-firestore-emulator google-cloud-sdk-cbt google-cloud-sdk-app-engine-python-extras google-cloud-sdk-datalab google-cloud-sdk-local-extract google-cloud-sdk-nomos google-cloud-sdk-app-engine-python google-cloud-sdk-gke-gcloud-auth-plugin google-cloud-sdk-terraform-tools google-cloud-sdk-kpt
3. Pub/Sub エミュレータの起動
プロジェクト名を指定して Pub/Sub エミュレータを起動します。
gcloud beta emulators pubsub start --project=my-project
なお、エミュレータを停止すると作成したトピック等は削除されてしまうので、その都度作り直す必要があります。
また、エミュレータはフォアグラウンドに滞在して実行されるため、以降の作業は別ターミナルを起動して実施する必要があります。
4. Pub/Sub エミュレータスクリプトの取得
GitHub から Python リポジトリをクローンして Pub/Sub エミュレータスクリプトを取得します。前述の通り、エミュレータを起動した際に使用したものとは別のターミナルを起動して実行します。
git clone https://github.com/googleapis/python-pubsub
5. venv 仮想環境の作成
先ほどクローンした python-pubsub/samples/snippets
ディレクトリに venv 仮想環境を作成し、依存関係をインストールします。
cd python-pubsub/samples/snippets python3 -m venv venv source venv/bin/activate pip3 install -r requirements.txt
6. 環境変数の設定
Pub/Sub エミュレータ上にトピックやサブスクリプションを作成する際に必要となる情報を環境変数を設定します。
export PUBSUB_PROJECT_ID=my-project export TOPIC_ID=cf-test export PUSH_SUBSCRIPTION_ID=my-subscription
また、Pub/Sub API の向き先がローカルにインストールした Pub/Sub エミュレータになるよう、以下の環境変数を設定します。
$(gcloud beta emulators pubsub env-init)
Pub/Sub トピック、サブスクリプションの作成
1. Pub/Sub トピックの作成
Pub/Sub トピックを作成します。トピックとはメッセージキューに相当します。
作業ディレクトリは 引数で指定するファイル (publisher.py) が存在する python-pubsub/samples/snippets
です。
python3 publisher.py ${PUBSUB_PROJECT_ID} create ${TOPIC_ID}
作成したトピックを確認するには以下のコマンドを実行します。
python3 publisher.py ${PUBSUB_PROJECT_ID} list
2. Push サブスクリプションの作成
イベント関数のトリガとなる Push サブスクリプションを作成します。
Push サブスクリプションが宛先としている http://localhost:8080
は、のちほど Functions Framework で実行するイベント関数となります。
python3 subscriber.py $PUBSUB_PROJECT_ID create-push $TOPIC_ID $PUSH_SUBSCRIPTION_ID http://localhost:8080
作成ができると、以下のような戻り値が表示されます。
Push subscription created: name: "projects/my-project/subscriptions/my-subscription" topic: "projects/my-project/topics/cf-test" push_config { push_endpoint: "http://localhost:8080" } ack_deadline_seconds: 10 message_retention_duration { seconds: 604800 } . Endpoint for subscription is: http://localhost:8080
動作確認
1. Functions Framework の起動
Functions Framework を起動してイベント関数をデバッグモードでモニタするため、これまでの作業で使用したものとは別のターミナルを起動します。
今回準備したイベント関数 (main.py) は、Pub/Sub エミュレータのサブスクリプションからメッセージを受信したら、それをトリガとしてメッセージ内容を出力する仕組みです。任意のディレクトリに配置してからコマンドを実行します。
(以下のコードはご自由にご利用いただけますが、あくまでサンプルコードであり、ご利用によって発生したトラブル等に関して当社では責任を持ちかねます。以下、全てのコードで同様です)
main.py
import base64 import json def main(event, context): if 'data' in event: message = base64.b64decode(event['data']).decode('utf-8') message_dict = json.loads(message) if message_dict["text"]: print("Received:") print(message_dict["text"])
実行コマンド
functions-framework --target=main --signature-type=event --debug --port=8080
なお、イベント関数についてもフォアグラウンドに滞在して実行されますので、以降の作業は別のターミナルを起動して実行します。
2. イベント関数のテスト
Cloud Scheduler が担うはずの「メッセージの送信 (パブリッシュ)」については、ローカル環境で代替可能なツールが存在しないため、自作のサンプルスクリプト (ggen_publisher.py) を用いて実現します。
ggen_publisher.py
#!/usr/bin/env python import datetime import sys from google.cloud import pubsub_v1 def publish_message(project_id: str, topic_id: str, file_path: str): # パブリッシャークライアントを作成 publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id) # JSON ファイルを読み込み with open(file_path) as f: data_str = f.read() # メッセージをパブリッシュする data = data_str.encode("utf-8") future = publisher.publish(topic_path, data ) future.result() return 0 if __name__ == "__main__": args = sys.argv project_id = args[1] topic_id = args[2] file_path = args[3] publish_message(project_id, topic_id, file_path)
次に test.json というファイル名で、 Cloud Functions に送信するメッセージの本文を定義し、スクリプトと同じディレクトリに配置します。
test.json
{ "text" : "this is a test message" }
次のコマンドを実行してスクリプトを動かします。
Pub/Sub 関連の変数を定義した際の作業ターミナルとは異なるため、こちらのターミナルでも変数を定義する必要があります。
引数で指定したファイル (test.json) ですが、Cloud Scheduler で送信するメッセージに相当します。スクリプトと同じディレクトリに配置した上でコマンドを実行します。
実行コマンド
export PUBSUB_PROJECT_ID=my-project export TOPIC_ID=cf-test export PUSH_SUBSCRIPTION_ID=my-subscription $(gcloud beta emulators pubsub env-init) python3 ggen_publisher.py $PUBSUB_PROJECT_ID $TOPIC_ID ./test.json
コマンドを実行したターミナルでは、トピック (cf-test) に対して JSON 形式のメッセージをパブリッシュした旨のログが出力されたことがわかります。
また、イベント関数を実行したターミナルでは、サブスクリプションから受信したメッセージを出力していることがわかります。
これにより、今回の目的であった Pub/Sub をトリガとしたイベント関数が無事に実行されたことがわかりましたので、検証は成功です。
武井 祐介 (記事一覧)
クラウドソリューション部所属。G-gen唯一の山梨県在住エンジニア
Google Cloud Partner Top Engineer 2025 選出。IaC や CI/CD 周りのサービスやプロダクトが興味分野です。
趣味はロードバイク、ロードレースやサッカー観戦です。
Follow @ggenyutakei