Cloud Functionsローカル環境(Functions Framework)でPub/Subトリガのイベント関数を検証する方法

記事タイトルとURLをコピーする

こんにちは、G-gen の武井です。今回は Google Cloud (旧称 GCP) の Cloud Functions ローカル実行環境 (Functions Framework) で、Pub/Sub トリガのイベント関数を検証する方法について紹介したいと思います。

検証イメージ

今回の検証における構成は以下のとおりです。

検証イメージ

Cloud Scheduler が定期的に Cloud Pub/Sub キューにメッセージを投入、そして Pub/Sub をトリガとして Cloud Functions のイベント関数が実行されるイメージです。
ローカル環境での検証にあたって、次のツールを準備しました。

Functions Framework
Cloud Functions のローカル実行環境です。インストール方法等についてはこちらのブログを参照ください。 blog.g-gen.co.jp

Pub/Sub エミュレータ
Cloud Pub/Sub のローカル実行環境です。インストール方法等についてはのちほど説明します。
cloud.google.com

※ Cloud Scheduler についてはローカル環境で使用可能なツールはありませんので、サンプルスクリプトを用意しました。

Pub/Sub エミュレータのセットアップ

1. Pub/Sub エミュレータのインストール

まずは Pub/Sub エミュレータをインストールします。

gcloud components install pubsub-emulator

apt-get で gcloud をインストールしている場合は以下のエラーが想定されます。

> ERROR: (gcloud.components.install)
> You cannot perform this action because the Google Cloud CLI component manager
> is disabled for this installation. You can run the following command
> to achieve the same result for this installation:
>
> sudo apt-get install google-cloud-sdk-pubsub-emulator

その場合は代替コマンドでエミュレータをインストールします。
(コマンドは Debian の場合)

sudo apt-get install google-cloud-sdk-pubsub-emulator

2. gcloud コンポーネントのアップグレード

続けて gcloud コンポーネントをアップグレードします。

gcloud components update

apt-get で gcloud をインストールしている場合は以下のエラーが想定されます。

> Beginning update. This process may take several minutes.
> ERROR: (gcloud.components.update)
> You cannot perform this action because the Google Cloud CLI component manager
> is disabled for this installation. You can run the following command
> to achieve the same result for this installation:
>
> sudo apt-get update && sudo apt-get --only-upgrade install google-cloud-sdk-app-engine-go

その場合は代替コマンドでコンポーネントをアップグレードします。
(コマンドは Debian の場合)

sudo apt-get update && sudo apt-get --only-upgrade install google-cloud-sdk-app-engine-go google-cloud-sdk-minikube google-cloud-sdk-spanner-emulator kubectl google-cloud-sdk-skaffold google-cloud-sdk-app-engine-grpc google-cloud-sdk-cloud-build-local google-cloud-sdk-pubsub-emulator google-cloud-sdk-datastore-emulator google-cloud-sdk-kubectl-oidc google-cloud-sdk-bigtable-emulator google-cloud-sdk google-cloud-sdk-cloud-run-proxy google-cloud-sdk-config-connector google-cloud-sdk-app-engine-java google-cloud-sdk-anthos-auth google-cloud-sdk-firestore-emulator google-cloud-sdk-cbt google-cloud-sdk-app-engine-python-extras google-cloud-sdk-datalab google-cloud-sdk-local-extract google-cloud-sdk-nomos google-cloud-sdk-app-engine-python google-cloud-sdk-gke-gcloud-auth-plugin google-cloud-sdk-terraform-tools google-cloud-sdk-kpt

3. Pub/Sub エミュレータの起動

プロジェクト名を指定して Pub/Sub エミュレータを起動します。

gcloud beta emulators pubsub start --project=my-project

なお、エミュレータを停止すると作成したトピック等は削除されてしまうので、その都度作り直す必要があります。
また、エミュレータはフォアグラウンドに滞在して実行されるため、以降の作業は別ターミナルを起動して実施する必要があります。

4. Pub/Sub エミュレータスクリプトの取得

GitHub から Python リポジトリをクローンして Pub/Sub エミュレータスクリプトを取得します。前述の通り、エミュレータを起動した際に使用したものとは別のターミナルを起動して実行します。

git clone https://github.com/googleapis/python-pubsub

5. venv 仮想環境の作成

先ほどクローンした python-pubsub/samples/snippets ディレクトリに venv 仮想環境を作成し、依存関係をインストールします。

cd python-pubsub/samples/snippets
python3 -m venv venv
source venv/bin/activate
pip3 install -r requirements.txt

6. 環境変数の設定

Pub/Sub エミュレータ上にトピックやサブスクリプションを作成する際に必要となる情報を環境変数を設定します。

export PUBSUB_PROJECT_ID=my-project
export TOPIC_ID=cf-test
export PUSH_SUBSCRIPTION_ID=my-subscription

また、Pub/Sub API の向き先がローカルにインストールした Pub/Sub エミュレータになるよう、以下の環境変数を設定します。

$(gcloud beta emulators pubsub env-init)

Pub/Sub トピック、サブスクリプションの作成

1. Pub/Sub トピックの作成

Pub/Sub トピックを作成します。トピックとはメッセージキューに相当します。
作業ディレクトリは 引数で指定するファイル (publisher.py) が存在する python-pubsub/samples/snippets です。

python3 publisher.py ${PUBSUB_PROJECT_ID} create ${TOPIC_ID}

作成したトピックを確認するには以下のコマンドを実行します。

python3 publisher.py ${PUBSUB_PROJECT_ID} list

2. Push サブスクリプションの作成

イベント関数のトリガとなる Push サブスクリプションを作成します。
Push サブスクリプションが宛先としている http://localhost:8080 は、のちほど Functions Framework で実行するイベント関数となります。

python3 subscriber.py $PUBSUB_PROJECT_ID create-push $TOPIC_ID $PUSH_SUBSCRIPTION_ID http://localhost:8080

作成ができると、以下のような戻り値が表示されます。

Push subscription created: name: "projects/my-project/subscriptions/my-subscription"
topic: "projects/my-project/topics/cf-test"
push_config {
  push_endpoint: "http://localhost:8080"
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}
.
Endpoint for subscription is: http://localhost:8080

動作確認

1. Functions Framework の起動

Functions Framework を起動してイベント関数をデバッグモードでモニタするため、これまでの作業で使用したものとは別のターミナルを起動します。

今回準備したイベント関数 (main.py) は、Pub/Sub エミュレータのサブスクリプションからメッセージを受信したら、それをトリガとしてメッセージ内容を出力する仕組みです。任意のディレクトリに配置してからコマンドを実行します。

(以下のコードはご自由にご利用いただけますが、あくまでサンプルコードであり、ご利用によって発生したトラブル等に関して当社では責任を持ちかねます。以下、全てのコードで同様です)

main.py

import base64
import json
 
def main(event, context):
    if 'data' in event:
         message = base64.b64decode(event['data']).decode('utf-8')
    
    message_dict = json.loads(message)
 
    if message_dict["text"]:
         print("Received:")
         print(message_dict["text"])

実行コマンド

functions-framework --target=main --signature-type=event --debug --port=8080

なお、イベント関数についてもフォアグラウンドに滞在して実行されますので、以降の作業は別のターミナルを起動して実行します。

2. イベント関数のテスト

Cloud Scheduler が担うはずの「メッセージの送信 (パブリッシュ)」については、ローカル環境で代替可能なツールが存在しないため、自作のサンプルスクリプト (ggen_publisher.py) を用いて実現します。

ggen_publisher.py

#!/usr/bin/env python
 
import datetime  
import sys

from google.cloud import pubsub_v1
    
    
def publish_message(project_id: str, topic_id: str, file_path: str):
    # パブリッシャークライアントを作成
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
 
    # JSON ファイルを読み込み
    with open(file_path) as f:
        data_str = f.read() 
 
    # メッセージをパブリッシュする
    data = data_str.encode("utf-8")
    future = publisher.publish(topic_path, data )
    future.result()
 
    return 0
 
   
if __name__ == "__main__":
    args = sys.argv
    project_id = args[1]
    topic_id = args[2]
    file_path = args[3]
 
    publish_message(project_id, topic_id, file_path)

次に test.json というファイル名で、 Cloud Functions に送信するメッセージの本文を定義し、スクリプトと同じディレクトリに配置します。

test.json

{
    "text" : "this is a test message"
}

次のコマンドを実行してスクリプトを動かします。

Pub/Sub 関連の変数を定義した際の作業ターミナルとは異なるため、こちらのターミナルでも変数を定義する必要があります。

引数で指定したファイル (test.json) ですが、Cloud Scheduler で送信するメッセージに相当します。スクリプトと同じディレクトリに配置した上でコマンドを実行します。

実行コマンド

export PUBSUB_PROJECT_ID=my-project
export TOPIC_ID=cf-test
export PUSH_SUBSCRIPTION_ID=my-subscription
$(gcloud beta emulators pubsub env-init)
python3 ggen_publisher.py $PUBSUB_PROJECT_ID $TOPIC_ID ./test.json

コマンドを実行したターミナルでは、トピック (cf-test) に対して JSON 形式のメッセージをパブリッシュした旨のログが出力されたことがわかります。

コマンド実行画面の出力

また、イベント関数を実行したターミナルでは、サブスクリプションから受信したメッセージを出力していることがわかります。

デバッグ画面の出力

これにより、今回の目的であった Pub/Sub をトリガとしたイベント関数が無事に実行されたことがわかりましたので、検証は成功です。

武井 祐介 (記事一覧)

クラウドソリューション部

2022年4月より G-gen にジョイン。前職では AWS を中心にインフラ環境の設計構築、運用保守業務を担当。先日 Associate Cloud Engineer に続いて Professional Cloud Architect を取得。

6月末の Professional DataEngineer の受験に向けて追い込み中。