G-gen の佐々木です。当記事では、Google Cloud (旧称 GCP) が提供するメッセージングサービスである Cloud Pub/Sub と、そのクライアントライブラリを使用することで、継続的に送信されるデータ(メッセージ)をリアルタイムで処理する仕組みを実装していきます。
構成
今回は Cloud Run jobs をパブリッシャーとして、Pub/Sub トピックにメッセージを送信するジョブを並列して実行する、という構成を検証します。
GKE クラスタ上に展開した Pod をサブスクライバーとし、Pub/Sub の API である StreamingPull API を使用することで、Pub/Sub に送信されたメッセージをリアルタイムで受信・処理します。

使用するサービスの解説
Cloud Pub/Sub
Cloud Pub/Sub(以下、Pub/Sub)は、メッセージを生成する パブリッシャー とそれを処理する サブスクライバー(コンシューマー) を切り離すマネージドな メッセージング サービス です。
Pub/Sub を使用することで、パブリッシャーとサブスクライバーの互換性・拡張性が担保された粗結合なシステムを構成することができます。
- 参考:公式ドキュメント
Cloud Run jobs
Cloud Run jobs とは、サーバーレス コンテナコンピューティングサービスである Cloud Run の 1 機能です。
Cloud Run jobs を使用することで、コンテナイメージとして実装したジョブを、手動、スケジュール、ワークフローによる任意タイミングで並列して実行することができます。
Cloud Run jobs の詳細については以下の記事で解説しています。
Google Kubernetes Engine(GKE)
Google Kubernetes Engine(以下、GKE)は、コンテナ オーケストレーションツールである Kubernetes の Google マネージドなクラスタを利用することができるサービスです。
GKE の詳細については以下の記事で解説しています。
StreamingPull API とは
Pub/Sub クライアントライブラリで StreamingPull API を使用すると、アプリケーションと Pub/Sub との間に永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐに pull されます。
この仕様により、1 つの pull リクエストで 1 つの pull レスポンスが返る通常の 単項 Pull と比較すると、高スループット・低レイテンシでメッセージを処理することができます。
StreamingPull API は以下の言語のクライアントライブラリで使用することができます。
- C++
- C#
- Go(当記事で使用)
- Java
- Node.js
- Python
- Ruby
Pub/Sub のクライアントライブラリは上記言語のほか PHP でも利用できますが、StreamingPull API については PHP ではサポートされていません。
Pub/Sub の設定
Pub/Sub のトピックとサブスクリプションをそれぞれデフォルトの状態で作成します。
トピック名、サブスクリプション名は後ほど環境変数で使用します。
# Pub/Subトピックの作成 $ gcloud pubsub topics create {トピック名} # 実行例 $ gcloud pubsub topics create mytopic
# Pub/Subサブスクリプションの作成 $ gcloud pubsub subscriptions create {サブスクリプション名} --topic={トピック名} # 実行例 $ gcloud pubsub subscriptions create mysubscription --topic=mytopic
パブリッシャーの作成(Cloud Run jobs)
Pub/Sub にメッセージをパブリッシュするジョブを Cloud Run jobs で作成します。
コンテナイメージの作成
使用するコード
当記事では 公式ドキュメント のサンプルコードを参考に、Go 言語で処理を実装していきます。
メッセージのパブリッシュには Pub/Sub のクライアントライブラリである cloud.google.com/go/pubsub
を使用します。
package main import ( "context" "fmt" "log" "os" "time" "cloud.google.com/go/pubsub" // Pub/Subクライアントライブラリ "github.com/google/uuid" ) type Attributes struct { taskNum string uuid string creationTime string } // メッセージの内容を生成する関数 func generateAttributes() Attributes { // タスク番号を取得(Cloud Run jobsのデフォルトの環境変数) taskNum := os.Getenv("CLOUD_RUN_TASK_INDEX") attr := Attributes{ taskNum: taskNum, uuid: fmt.Sprint(uuid.New()), creationTime: time.Now().Format("2006-01-02 15:04:05"), } return attr } // メッセージをパブリッシュする関数 func publishMessage(c context.Context, attr Attributes) error { // 環境変数からプロジェクトIDとPubSubトピックID を取得 projectId := os.Getenv("PROJECT_ID") topicId := os.Getenv("TOPIC_ID") // クライアント作成 client, err := pubsub.NewClient(c, projectId) if err != nil { return fmt.Errorf("pubsub.NewClient: %v", err) } defer client.Close() // Pub/Sub トピックの参照 t := client.Topic(topicId) // トピックにメッセージをパブリッシュ result := t.Publish(c, &pubsub.Message{ Attributes: map[string]string{ "taskNum": attr.taskNum, "uuid": attr.uuid, "creationTime": attr.creationTime, }}) id, err := result.Get(c) if err != nil { return fmt.Errorf("result.Get: %v", err) } fmt.Printf("Published a messsage; msg ID: %v\n", id) return nil } func main() { // 空のコンテキストを作成 ctx := context.Background() // メッセージ内容の生成 attr := generateAttributes() // メッセージのパブリッシュ err := publishMessage(ctx, attr) if err != nil { log.Fatal(err) } }
コンテナイメージのビルド
ここでは Dockerfile を使用せず、Buildpack を使用してコンテナイメージをビルドします。
Buildpack を使用することで、ソースコードを自動でパッケージ化し、デプロイ可能なコンテナイメージを生成することができます。
コンテナイメージは Artifact Registry のリポジトリにプッシュします(リポジトリがない場合は作成してください)。
# Buildpackを使用してイメージをビルドする $ gcloud builds submit --pack image={リポジトリのURL}/{コンテナイメージ名} # 実行例(リポジトリにArtifact Registryを使用) $ gcloud builds submit --pack image=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-publisher
ジョブの作成
コンテナイメージを使用して Cloud Run jobs でジョブを作成します。
環境変数として プロジェクトID
と トピック名
を設定し、同時に実行する Task の数を 50
に設定しています。
# Cloud Run jobsのジョブを作成する(実行Task数=50) $ gcloud run jobs create {ジョブ名} \ --image {イメージのURL} \ --region {リージョン} \ --tasks 50 \ --set-env-vars PROJECT_ID={プロジェクトID},TOPIC_ID={トピック名} # 実行例 $ gcloud run jobs create jobs-pubsub-publisher \ --image asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-publisher \ --region asia-northeast1 \ --tasks 50 \ --set-env-vars PROJECT_ID=myproject,TOPIC_ID=mytopic
当記事では認証にデフォルトのサービスアカウントを使用します。サービスアカウントを個別に設定する場合、トピックに対する「Pub/Sub パブリッシャー(roles/pubsub.publisher
)」ロールが紐付いたサービスアカウントを使用してください。
サブスクライバーの作成(GKE)
Pub/Sub からメッセージを Pull するアプリケーションを GKE クラスタにデプロイします。
コンテナイメージの作成
使用するコード
公式ドキュメント のサンプルコードを参考に、こちらも Go 言語で処理を実装していきます。
パブリッシュの処理と同様に cloud.google.com/go/pubsub
ライブラリを使用し、メッセージをストリーミングで Pull するようにコードを記述します。
ここでは検証のため、Pull したメッセージはそのまま標準出力に出力します。
package main import ( "context" "fmt" "io" "log" "os" "cloud.google.com/go/pubsub" // Pub/Subクライアントライブラリ ) // メッセージをStreamingPullする関数 func pullMessages(w io.Writer, c context.Context, projectId, subId string) error { // クライアント作成 client, err := pubsub.NewClient(c, projectId) if err != nil { return fmt.Errorf("pubsub.NewClient: %v", err) } defer client.Close() // サブスクリプションの参照 sub := client.Subscription(subId) // メッセージをpullし続ける err = sub.Receive(c, func(_ context.Context, msg *pubsub.Message) { fmt.Fprintf(w, "%v\n", msg.Attributes) // メッセージを標準出力に出力 msg.Ack() }) if err != nil { return fmt.Errorf("sub.Receive: %v", err) } return nil } func main() { // 空のコンテキスト作成 ctx := context.Background() // プロジェクトID と PubSubトピックID を取得 projectId := os.Getenv("PROJECT_ID") subId := os.Getenv("SUBSCRIPTION_ID") err := pullMessages(os.Stdout, ctx, projectId, subId) if err != nil { log.Fatal(err) } }
コンテナイメージを Artifact Registry にプッシュ
アプリケーションを GKE にデプロイするために、コンテナイメージを作成します。
以下の Dockerfile を使用します。
FROM golang:1.20 WORKDIR /usr/src/app COPY go.mod go.sum main.go ./ RUN go mod download && go mod verify RUN go build -v -o /usr/local/bin/app ./... CMD ["app"]
Cloud Build を使用し、ビルドしたコンテナイメージを Artifact Registry のリポジトリにプッシュします。
# コンテナイメージ $ gcloud builds submit --tag={リポジトリのURL}/{コンテナイメージ名} # 実行例 $ gcloud builds submit --tag=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-subscriber
GKE クラスタの作成
当記事では Autopilot モードの GKE クラスタを使用します。
# AutopilotモードのGKEクラスタを作成する $ gcloud container clusters create-auto {クラスタ名} \ --region {リージョン} \ --network {VPC名} --subnetwork {サブネット名} # 実行例 $ gcloud container clusters create-auto mycluster-autopilot \ --region asia-northeast1 \ --network myvpc \ --subnetwork mysubnet
Workload Identity の設定
アプリケーションが Pub/Sub からメッセージを Pull できるように、Workload Identity によって Pod から IAM サービスアカウントを使用できるようにします。
当記事では便宜上、Google Cloud APIs にアクセスするためのサービスアカウントを GSA
、Kubernetes の ServiceAccount リソースを KSA
と呼びます。
GSA の作成
Pub/Sub に対するアクセス権限を持った GSA を作成します。
# GSA を作成する $ gcloud iam service-accounts create {GSAの名前} --project {プロジェクトID} # 実行例 $ gcloud iam service-accounts create my-gsa --project myproject
作成した Pub/Sub サブスクリプションからメッセージを Pull できるように、「Pub/Sub サブスクライバー(roles/pubsub.subscriber
)」を付与します。
# GSA に IAM ロールを紐付ける $ gcloud pubsub subscriptions add-iam-policy-binding {サブスクリプション名} \ --role "roles/pubsub.subscriber" \ --member "serviceAccount:{GSAの名前}@{プロジェクトID}.iam.gserviceaccount.com" # 実行例 $ gcloud pubsub subscriptions add-iam-policy-binding mysubscription \ --role "roles/pubsub.subscriber" \ --member "serviceAccount:my-gsa@myproject.iam.gserviceaccount.com"
GKE クラスタに ServiceAccount リソースを作成
以下のマニフェストファイルを使用して、KSA を GKE クラスタに作成します。
apiVersion: v1 kind: ServiceAccount metadata: name: my-ksa annotations: # Workload Identity で紐付ける GSA を指定する iam.gke.io/gcp-service-account: my-gsa@myproject.iam.gserviceaccount.com
KSA と GSA の紐付け
GSA に対する Workload Identity User (roles/iam.workloadIdentityUser)
ロールを KSA に紐付け、KSA が GSA の権限を借用できるようにします。
# KSA と GSA を紐付ける $ gcloud iam service-accounts add-iam-policy-binding {GSAの名前}@{プロジェクトID}.iam.gserviceaccount.com \ --role roles/iam.workloadIdentityUser \ --member "serviceAccount:{プロジェクトID}.svc.id.goog[{KSAを作成したNamespace}/{KSAの名前}]" # 実行例(default名前空間を使用している場合) $ gcloud iam service-accounts add-iam-policy-binding my-gsa@myproject.iam.gserviceaccount.com \ --role roles/iam.workloadIdentityUser \ --member "serviceAccount:myproject.svc.id.goog[default/my-ksa]"
アプリケーションのデプロイ(GKE)
以下のマニフェストを使用して、GKE クラスタにアプリケーションをデプロイします。
複数の Pod を使用してメッセージを Pull できるように Deployment リソースを作成します。
apiVersion: apps/v1 kind: Deployment metadata: name: pubsub-subscriber spec: replicas: 3 selector: matchLabels: app: subscriber template: metadata: labels: app: subscriber spec: containers: - name: subsc-container image: "asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-subscriber:latest" # コンテナイメージのURL env: - name: "PROJECT_ID" value: "myproject" # Pub/Subを作成したプロジェクトのID - name: "SUBSCRIPTION_ID" value: "mysubscription" # Pub/Subサブスクリプションの名前 serviceAccountName: my-ksa # Workload Identityで使用するServiceAccount
Pod のステータスがすべて Running
になるまで待機します。
# Podのステータスを確認する $ kubectl get po NAME READY STATUS RESTARTS AGE pubsub-subscriber-555ffcb5df-fxm2z 1/1 Running 0 3m3s pubsub-subscriber-555ffcb5df-sz4xl 1/1 Running 0 3m3s pubsub-subscriber-555ffcb5df-xmrr2 1/1 Running 0 3m3s
動作確認
ジョブの実行(Cloud Run jobs)
Cloud Run jobs のジョブを実行し、Pub/Sub にメッセージをパブリッシュします。
# ジョブを実行する $ gcloud run jobs execute {ジョブ名} --region {リージョン} # 実行例 $ gcloud run jobs execute jobs-pubsub-publisher --region asia-northeast1
Pod のログを確認(GKE)
kubectl logs
コマンドで Pod のログを確認すると、Cloud Run jobs のジョブがパブリッシュしたメッセージが記録されています。
複数の Pod を展開しているため、Pod ごとにメッセージが分散処理されています。
# Podのログを確認する $ kubectl logs {Pod名} # 実行例 $ kubectl logs pubsub-subscriber-555ffcb5df-fxm2z map[creationTime:2023-05-03 15:15:49 taskNum:35 uuid:c38275b5-da20-4527-9e28-cec2744f6374] map[creationTime:2023-05-03 15:15:49 taskNum:2 uuid:f5ab93a7-6a93-4637-b50e-ee491cdfcc8d] map[creationTime:2023-05-03 15:15:49 taskNum:30 uuid:e82aa1ef-c53b-4812-a3a3-e74437b436ef] map[creationTime:2023-05-03 15:15:49 taskNum:23 uuid:8bc9d8c0-cc4d-49d8-b565-c3e26570c92c] map[creationTime:2023-05-03 15:15:50 taskNum:39 uuid:1594b54e-2ed0-4a2e-b248-6613677c4cf3] map[creationTime:2023-05-03 15:15:49 taskNum:21 uuid:b81177da-d6de-4ee2-acb9-bf1442286cd5] map[creationTime:2023-05-03 15:15:50 taskNum:31 uuid:c4950a18-5fba-4a8c-aae1-6c628db22c65] map[creationTime:2023-05-03 15:15:50 taskNum:49 uuid:20b3ca60-8a83-4116-ab54-a26e187bd8d0] map[creationTime:2023-05-03 15:15:50 taskNum:16 uuid:ede7ab24-d246-485f-8bcb-2a71fcf9e3a5] map[creationTime:2023-05-03 15:15:49 taskNum:48 uuid:166a1bd9-f0ac-4c93-8129-7110b2429f55] map[creationTime:2023-05-03 15:15:50 taskNum:42 uuid:2faa0967-46f6-4a86-84f5-c1e821607c83] map[creationTime:2023-05-03 15:15:52 taskNum:46 uuid:0eea95d7-a673-4e44-b214-ca49dd34be91] $ kubectl logs pubsub-subscriber-555ffcb5df-sz4xl map[creationTime:2023-05-03 15:15:48 taskNum:41 uuid:86be2dd1-196f-4d04-8990-fbd1fa6bc0db] map[creationTime:2023-05-03 15:15:48 taskNum:5 uuid:b0f957d8-e113-451d-ba9e-19dc061b1632] map[creationTime:2023-05-03 15:15:48 taskNum:0 uuid:e3690426-4c7d-4fe8-b927-8a6343f23a54] map[creationTime:2023-05-03 15:15:49 taskNum:18 uuid:760c5c63-d41f-4d78-b2fe-9d839d872859] map[creationTime:2023-05-03 15:15:49 taskNum:12 uuid:dbfdab23-687e-4dce-9ae9-8c775e88d873] map[creationTime:2023-05-03 15:15:48 taskNum:17 uuid:f13c4842-5503-4e0b-a91b-57879563448b] map[creationTime:2023-05-03 15:15:49 taskNum:33 uuid:f307f4f9-4735-4abd-8232-b2793f6dd08c] map[creationTime:2023-05-03 15:15:49 taskNum:1 uuid:e5f5a0d2-7430-4816-8a24-4bc0464c1c9c] map[creationTime:2023-05-03 15:15:49 taskNum:44 uuid:9d23a4f4-4c3e-40b9-be85-ea191936ecb6] map[creationTime:2023-05-03 15:15:48 taskNum:14 uuid:5389088c-0452-46a2-ba96-255f79404741] map[creationTime:2023-05-03 15:15:48 taskNum:32 uuid:171f0b3a-a19f-4574-b75d-87ddf30e9b7f] map[creationTime:2023-05-03 15:15:48 taskNum:47 uuid:6cc0f34c-d9ac-4ab9-8308-45c21a7c1c52] map[creationTime:2023-05-03 15:15:49 taskNum:24 uuid:9fef79ac-a637-451f-9c3c-5ddbfd99631d] map[creationTime:2023-05-03 15:15:50 taskNum:40 uuid:8ac572b6-e02a-4505-b7bf-d491410f28e3] map[creationTime:2023-05-03 15:15:51 taskNum:19 uuid:59f52dc0-56e5-4780-a6ac-8e0b9d0ffad8] map[creationTime:2023-05-03 15:15:51 taskNum:28 uuid:6cf456b1-b234-4847-a8f7-7ee8c6e06c97] map[creationTime:2023-05-03 15:15:51 taskNum:37 uuid:cad50885-e41e-4d97-ae99-24c92507cfcf] map[creationTime:2023-05-03 15:15:51 taskNum:25 uuid:54fc2bec-c510-4ed4-a247-64a1141dd1e0] map[creationTime:2023-05-03 15:15:51 taskNum:7 uuid:1ee32849-261a-4e2e-82d8-04426675b88b] map[creationTime:2023-05-03 15:15:50 taskNum:15 uuid:bae917a3-01e5-4c20-b678-6f21bfe878e2] map[creationTime:2023-05-03 15:15:50 taskNum:27 uuid:d6a5a28f-4e24-460f-8d09-5f79af09f74c] map[creationTime:2023-05-03 15:15:51 taskNum:43 uuid:f873af32-edc5-45fc-9752-3f87cc3309e1] map[creationTime:2023-05-03 15:15:51 taskNum:10 uuid:120f48fb-3377-4896-9cb2-a32dfe00f25b] $ kubectl logs pubsub-subscriber-555ffcb5df-xmrr2 map[creationTime:2023-05-03 15:15:49 taskNum:26 uuid:39cbb823-592c-40e9-99cb-bf3ce52f2989] map[creationTime:2023-05-03 15:15:48 taskNum:38 uuid:447a296c-564d-4dc9-a04a-155f67c760e9] map[creationTime:2023-05-03 15:15:49 taskNum:11 uuid:0989fe2c-2fc0-4668-95d6-b1829fa6d92d] map[creationTime:2023-05-03 15:15:48 taskNum:29 uuid:db6e7299-47d8-473d-aebb-e18f2c846de6] map[creationTime:2023-05-03 15:15:49 taskNum:8 uuid:501f74cf-ccb8-4b1e-b388-32f565da889d] map[creationTime:2023-05-03 15:15:49 taskNum:6 uuid:108e22df-629d-4d88-9ab0-5758f608237f] map[creationTime:2023-05-03 15:15:49 taskNum:36 uuid:6e85faa5-d2ed-48b6-8866-d0114386303c] map[creationTime:2023-05-03 15:15:49 taskNum:20 uuid:aa07a78a-e478-4df7-9fef-835e215bc2ee] map[creationTime:2023-05-03 15:15:49 taskNum:45 uuid:4a43bad3-5a3d-4315-a565-ca7137104aa8] map[creationTime:2023-05-03 15:15:49 taskNum:9 uuid:f3f4c132-80ff-4850-8bea-94371b9cdfe5] map[creationTime:2023-05-03 15:15:50 taskNum:34 uuid:d026ad8e-f1cb-49df-bd33-9cf03cb0ee55] map[creationTime:2023-05-03 15:15:50 taskNum:13 uuid:f6731fca-d48b-41d7-a505-61e44425d817] map[creationTime:2023-05-03 15:15:51 taskNum:22 uuid:6ade1f2a-229b-4664-bf82-55aec79ba8fa] map[creationTime:2023-05-03 15:15:49 taskNum:3 uuid:95291f1f-94d3-44bb-a22c-97bfdd59fa25] map[creationTime:2023-05-03 15:15:51 taskNum:4 uuid:469e54b9-de42-4fb2-9c38-3f5c3b625d7b]
佐々木 駿太 (記事一覧)
G-gen 最北端、北海道在住のクラウドソリューション部エンジニア。
2022 年 6 月に G-gen にジョイン。Google Cloud All Certifications Engineer。
好きな Google Cloud プロダクトは Cloud Run。
Follow @sasashun0805