Pub/SubでStreamingPull APIを使用してメッセージをリアルタイムで処理する

記事タイトルとURLをコピーする

G-gen の佐々木です。当記事では、Google Cloud (旧称 GCP) が提供するメッセージングサービスである Cloud Pub/Sub と、そのクライアントライブラリを使用することで、継続的に送信されるデータ(メッセージ)をリアルタイムで処理する仕組みを実装していきます。

構成

今回は Cloud Run jobs をパブリッシャーとして、Pub/Sub トピックにメッセージを送信するジョブを並列して実行する、という構成を検証します。

GKE クラスタ上に展開した Pod をサブスクライバーとし、Pub/Sub の API である StreamingPull API を使用することで、Pub/Sub に送信されたメッセージをリアルタイムで受信・処理します。

Pub/SubとGKEを使用したストリーミング処理

使用するサービスの解説

Cloud Pub/Sub

Cloud Pub/Sub(以下、Pub/Sub)は、メッセージを生成する パブリッシャー とそれを処理する サブスクライバー(コンシューマー) を切り離すマネージドな メッセージング サービス です。
Pub/Sub を使用することで、パブリッシャーとサブスクライバーの互換性・拡張性が担保された粗結合なシステムを構成することができます。

Cloud Run jobs

Cloud Run jobs とは、サーバーレス コンテナコンピューティングサービスである Cloud Run の 1 機能です。
Cloud Run jobs を使用することで、コンテナイメージとして実装したジョブを、手動、スケジュール、ワークフローによる任意タイミングで並列して実行することができます。

Cloud Run jobs の詳細については以下の記事で解説しています。

blog.g-gen.co.jp

Google Kubernetes Engine(GKE)

Google Kubernetes Engine(以下、GKE)は、コンテナ オーケストレーションツールである Kubernetes の Google マネージドなクラスタを利用することができるサービスです。

GKE の詳細については以下の記事で解説しています。

blog.g-gen.co.jp

StreamingPull API とは

Pub/Sub クライアントライブラリで StreamingPull API を使用すると、アプリケーションと Pub/Sub との間に永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐに pull されます。

この仕様により、1 つの pull リクエストで 1 つの pull レスポンスが返る通常の 単項 Pull と比較すると、高スループット・低レイテンシでメッセージを処理することができます。

StreamingPull API は以下の言語のクライアントライブラリで使用することができます。

  • C++
  • C#
  • Go(当記事で使用)
  • Java
  • Node.js
  • Python
  • Ruby

Pub/Sub のクライアントライブラリは上記言語のほか PHP でも利用できますが、StreamingPull API については PHP ではサポートされていません。

Pub/Sub の設定

Pub/Sub のトピックとサブスクリプションをそれぞれデフォルトの状態で作成します。
トピック名、サブスクリプション名は後ほど環境変数で使用します。

# Pub/Subトピックの作成
$ gcloud pubsub topics create {トピック名}

# 実行例
$ gcloud pubsub topics create mytopic
# Pub/Subサブスクリプションの作成
$ gcloud pubsub subscriptions create {サブスクリプション名} --topic={トピック名}

# 実行例
$ gcloud pubsub subscriptions create mysubscription --topic=mytopic

パブリッシャーの作成(Cloud Run jobs)

Pub/Sub にメッセージをパブリッシュするジョブを Cloud Run jobs で作成します。

コンテナイメージの作成

使用するコード

当記事では 公式ドキュメント のサンプルコードを参考に、Go 言語で処理を実装していきます。
メッセージのパブリッシュには Pub/Sub のクライアントライブラリである cloud.google.com/go/pubsub を使用します。

package main
  
import (
    "context"
    "fmt"
    "log"
    "os"
    "time"
  
    "cloud.google.com/go/pubsub" // Pub/Subクライアントライブラリ
    "github.com/google/uuid"
)
  
type Attributes struct {
    taskNum      string
    uuid         string
    creationTime string
}
  
// メッセージの内容を生成する関数
func generateAttributes() Attributes {
    // タスク番号を取得(Cloud Run jobsのデフォルトの環境変数)
    taskNum := os.Getenv("CLOUD_RUN_TASK_INDEX")
  
    attr := Attributes{
        taskNum:      taskNum,
        uuid:         fmt.Sprint(uuid.New()),
        creationTime: time.Now().Format("2006-01-02 15:04:05"),
    }
  
    return attr
}
  
// メッセージをパブリッシュする関数
func publishMessage(c context.Context, attr Attributes) error {
    // 環境変数からプロジェクトIDとPubSubトピックID を取得
    projectId := os.Getenv("PROJECT_ID")
    topicId := os.Getenv("TOPIC_ID")
  
    // クライアント作成
    client, err := pubsub.NewClient(c, projectId)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
  
    // Pub/Sub トピックの参照
    t := client.Topic(topicId)
  
    // トピックにメッセージをパブリッシュ
    result := t.Publish(c, &pubsub.Message{
        Attributes: map[string]string{
            "taskNum":      attr.taskNum,
            "uuid":         attr.uuid,
            "creationTime": attr.creationTime,
        }})
    id, err := result.Get(c)
    if err != nil {
        return fmt.Errorf("result.Get: %v", err)
    }
  
    fmt.Printf("Published a messsage; msg ID: %v\n", id)
  
    return nil
}
  
func main() {
    // 空のコンテキストを作成
    ctx := context.Background()
  
    // メッセージ内容の生成
    attr := generateAttributes()
  
    // メッセージのパブリッシュ
    err := publishMessage(ctx, attr)
    if err != nil {
        log.Fatal(err)
    }
}

コンテナイメージのビルド

ここでは Dockerfile を使用せず、Buildpack を使用してコンテナイメージをビルドします。 Buildpack を使用することで、ソースコードを自動でパッケージ化し、デプロイ可能なコンテナイメージを生成することができます。
コンテナイメージは Artifact Registry のリポジトリにプッシュします(リポジトリがない場合は作成してください)。

# Buildpackを使用してイメージをビルドする
$ gcloud builds submit --pack image={リポジトリのURL}/{コンテナイメージ名}
  
# 実行例(リポジトリにArtifact Registryを使用)
$ gcloud builds submit --pack image=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-publisher

ジョブの作成

コンテナイメージを使用して Cloud Run jobs でジョブを作成します。
環境変数として プロジェクトIDトピック名 を設定し、同時に実行する Task の数を 50 に設定しています。

# Cloud Run jobsのジョブを作成する(実行Task数=50)
$ gcloud run jobs create {ジョブ名} \
--image {イメージのURL} \
--region {リージョン} \
--tasks 50 \
--set-env-vars PROJECT_ID={プロジェクトID},TOPIC_ID={トピック名}
  
# 実行例
$ gcloud run jobs create jobs-pubsub-publisher \
--image asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-publisher \
--region asia-northeast1 \
--tasks 50 \
--set-env-vars PROJECT_ID=myproject,TOPIC_ID=mytopic

当記事では認証にデフォルトのサービスアカウントを使用します。サービスアカウントを個別に設定する場合、トピックに対する「Pub/Sub パブリッシャー(roles/pubsub.publisher)」ロールが紐付いたサービスアカウントを使用してください。

サブスクライバーの作成(GKE)

Pub/Sub からメッセージを Pull するアプリケーションを GKE クラスタにデプロイします。

コンテナイメージの作成

使用するコード

公式ドキュメント のサンプルコードを参考に、こちらも Go 言語で処理を実装していきます。
パブリッシュの処理と同様に cloud.google.com/go/pubsub ライブラリを使用し、メッセージをストリーミングで Pull するようにコードを記述します。
ここでは検証のため、Pull したメッセージはそのまま標準出力に出力します。

package main
  
import (
    "context"
    "fmt"
    "io"
    "log"
    "os"
  
    "cloud.google.com/go/pubsub" // Pub/Subクライアントライブラリ
)
  
// メッセージをStreamingPullする関数
func pullMessages(w io.Writer, c context.Context, projectId, subId string) error {
  
    // クライアント作成
    client, err := pubsub.NewClient(c, projectId)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
  
    // サブスクリプションの参照
    sub := client.Subscription(subId)
  
    // メッセージをpullし続ける
    err = sub.Receive(c, func(_ context.Context, msg *pubsub.Message) {
        fmt.Fprintf(w, "%v\n", msg.Attributes) // メッセージを標準出力に出力
        msg.Ack()
    })
    if err != nil {
        return fmt.Errorf("sub.Receive: %v", err)
    }
  
    return nil
}
  
func main() {
    // 空のコンテキスト作成
    ctx := context.Background()
  
    // プロジェクトID と PubSubトピックID を取得
    projectId := os.Getenv("PROJECT_ID")
    subId := os.Getenv("SUBSCRIPTION_ID")
  
    err := pullMessages(os.Stdout, ctx, projectId, subId)
    if err != nil {
        log.Fatal(err)
    }
}

コンテナイメージを Artifact Registry にプッシュ

アプリケーションを GKE にデプロイするために、コンテナイメージを作成します。
以下の Dockerfile を使用します。

FROM golang:1.20
  
WORKDIR /usr/src/app
  
COPY go.mod go.sum main.go ./
RUN go mod download && go mod verify
  
RUN go build -v -o /usr/local/bin/app ./...
  
CMD ["app"]

Cloud Build を使用し、ビルドしたコンテナイメージを Artifact Registry のリポジトリにプッシュします。

# コンテナイメージ
$ gcloud builds submit --tag={リポジトリのURL}/{コンテナイメージ名}
  
# 実行例
$ gcloud builds submit --tag=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-subscriber

GKE クラスタの作成

当記事では Autopilot モードの GKE クラスタを使用します。

# AutopilotモードのGKEクラスタを作成する
$ gcloud container clusters create-auto {クラスタ名} \
--region {リージョン} \
--network {VPC名}
--subnetwork {サブネット名}

# 実行例
$ gcloud container clusters create-auto mycluster-autopilot \
--region asia-northeast1 \
--network myvpc \
--subnetwork mysubnet

Workload Identity の設定

アプリケーションが Pub/Sub からメッセージを Pull できるように、Workload Identity によって Pod から IAM サービスアカウントを使用できるようにします。

当記事では便宜上、Google Cloud APIs にアクセスするためのサービスアカウントを GSA、Kubernetes の ServiceAccount リソースを KSA と呼びます。

GSA の作成

Pub/Sub に対するアクセス権限を持った GSA を作成します。

# GSA を作成する
$ gcloud iam service-accounts create {GSAの名前} --project {プロジェクトID}

# 実行例
$ gcloud iam service-accounts create my-gsa --project myproject

作成した Pub/Sub サブスクリプションからメッセージを Pull できるように、「Pub/Sub サブスクライバー(roles/pubsub.subscriber)」を付与します。

# GSA に IAM ロールを紐付ける
$ gcloud pubsub subscriptions add-iam-policy-binding {サブスクリプション名} \
--role "roles/pubsub.subscriber" \
--member "serviceAccount:{GSAの名前}@{プロジェクトID}.iam.gserviceaccount.com"

# 実行例
$ gcloud pubsub subscriptions add-iam-policy-binding mysubscription \
--role "roles/pubsub.subscriber" \
--member "serviceAccount:my-gsa@myproject.iam.gserviceaccount.com"

GKE クラスタに ServiceAccount リソースを作成

以下のマニフェストファイルを使用して、KSA を GKE クラスタに作成します。

apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-ksa
  annotations:
    # Workload Identity で紐付ける GSA を指定する
    iam.gke.io/gcp-service-account: my-gsa@myproject.iam.gserviceaccount.com

KSA と GSA の紐付け

GSA に対する Workload Identity User (roles/iam.workloadIdentityUser) ロールを KSA に紐付け、KSA が GSA の権限を借用できるようにします。

# KSA と GSA を紐付ける
$ gcloud iam service-accounts add-iam-policy-binding {GSAの名前}@{プロジェクトID}.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:{プロジェクトID}.svc.id.goog[{KSAを作成したNamespace}/{KSAの名前}]"

# 実行例(default名前空間を使用している場合)
$ gcloud iam service-accounts add-iam-policy-binding my-gsa@myproject.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:myproject.svc.id.goog[default/my-ksa]"

アプリケーションのデプロイ(GKE)

以下のマニフェストを使用して、GKE クラスタにアプリケーションをデプロイします。
複数の Pod を使用してメッセージを Pull できるように Deployment リソースを作成します。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: pubsub-subscriber
spec:
  replicas: 3
  selector:
    matchLabels:
      app: subscriber
  template:
    metadata:
      labels:
        app: subscriber
    spec:
      containers:
        - name: subsc-container
          image: "asia-northeast1-docker.pkg.dev/myproject/pubsub-container/pubsub-subscriber:latest"  # コンテナイメージのURL
          env: 
          - name: "PROJECT_ID"
            value: "myproject"       # Pub/Subを作成したプロジェクトのID
          - name: "SUBSCRIPTION_ID"
            value: "mysubscription"  # Pub/Subサブスクリプションの名前
      serviceAccountName: my-ksa     # Workload Identityで使用するServiceAccount

Pod のステータスがすべて Running になるまで待機します。

# Podのステータスを確認する
$ kubectl get po
NAME                                 READY   STATUS    RESTARTS   AGE
pubsub-subscriber-555ffcb5df-fxm2z   1/1     Running   0          3m3s
pubsub-subscriber-555ffcb5df-sz4xl   1/1     Running   0          3m3s
pubsub-subscriber-555ffcb5df-xmrr2   1/1     Running   0          3m3s

動作確認

ジョブの実行(Cloud Run jobs)

Cloud Run jobs のジョブを実行し、Pub/Sub にメッセージをパブリッシュします。

# ジョブを実行する
$ gcloud run jobs execute {ジョブ名} --region {リージョン}
  
# 実行例
$ gcloud run jobs execute jobs-pubsub-publisher --region asia-northeast1

Pod のログを確認(GKE)

kubectl logs コマンドで Pod のログを確認すると、Cloud Run jobs のジョブがパブリッシュしたメッセージが記録されています。
複数の Pod を展開しているため、Pod ごとにメッセージが分散処理されています。

# Podのログを確認する
$ kubectl logs {Pod名}

# 実行例
$ kubectl logs pubsub-subscriber-555ffcb5df-fxm2z
map[creationTime:2023-05-03 15:15:49 taskNum:35 uuid:c38275b5-da20-4527-9e28-cec2744f6374]
map[creationTime:2023-05-03 15:15:49 taskNum:2 uuid:f5ab93a7-6a93-4637-b50e-ee491cdfcc8d]
map[creationTime:2023-05-03 15:15:49 taskNum:30 uuid:e82aa1ef-c53b-4812-a3a3-e74437b436ef]
map[creationTime:2023-05-03 15:15:49 taskNum:23 uuid:8bc9d8c0-cc4d-49d8-b565-c3e26570c92c]
map[creationTime:2023-05-03 15:15:50 taskNum:39 uuid:1594b54e-2ed0-4a2e-b248-6613677c4cf3]
map[creationTime:2023-05-03 15:15:49 taskNum:21 uuid:b81177da-d6de-4ee2-acb9-bf1442286cd5]
map[creationTime:2023-05-03 15:15:50 taskNum:31 uuid:c4950a18-5fba-4a8c-aae1-6c628db22c65]
map[creationTime:2023-05-03 15:15:50 taskNum:49 uuid:20b3ca60-8a83-4116-ab54-a26e187bd8d0]
map[creationTime:2023-05-03 15:15:50 taskNum:16 uuid:ede7ab24-d246-485f-8bcb-2a71fcf9e3a5]
map[creationTime:2023-05-03 15:15:49 taskNum:48 uuid:166a1bd9-f0ac-4c93-8129-7110b2429f55]
map[creationTime:2023-05-03 15:15:50 taskNum:42 uuid:2faa0967-46f6-4a86-84f5-c1e821607c83]
map[creationTime:2023-05-03 15:15:52 taskNum:46 uuid:0eea95d7-a673-4e44-b214-ca49dd34be91]
  
$ kubectl logs pubsub-subscriber-555ffcb5df-sz4xl 
map[creationTime:2023-05-03 15:15:48 taskNum:41 uuid:86be2dd1-196f-4d04-8990-fbd1fa6bc0db]
map[creationTime:2023-05-03 15:15:48 taskNum:5 uuid:b0f957d8-e113-451d-ba9e-19dc061b1632]
map[creationTime:2023-05-03 15:15:48 taskNum:0 uuid:e3690426-4c7d-4fe8-b927-8a6343f23a54]
map[creationTime:2023-05-03 15:15:49 taskNum:18 uuid:760c5c63-d41f-4d78-b2fe-9d839d872859]
map[creationTime:2023-05-03 15:15:49 taskNum:12 uuid:dbfdab23-687e-4dce-9ae9-8c775e88d873]
map[creationTime:2023-05-03 15:15:48 taskNum:17 uuid:f13c4842-5503-4e0b-a91b-57879563448b]
map[creationTime:2023-05-03 15:15:49 taskNum:33 uuid:f307f4f9-4735-4abd-8232-b2793f6dd08c]
map[creationTime:2023-05-03 15:15:49 taskNum:1 uuid:e5f5a0d2-7430-4816-8a24-4bc0464c1c9c]
map[creationTime:2023-05-03 15:15:49 taskNum:44 uuid:9d23a4f4-4c3e-40b9-be85-ea191936ecb6]
map[creationTime:2023-05-03 15:15:48 taskNum:14 uuid:5389088c-0452-46a2-ba96-255f79404741]
map[creationTime:2023-05-03 15:15:48 taskNum:32 uuid:171f0b3a-a19f-4574-b75d-87ddf30e9b7f]
map[creationTime:2023-05-03 15:15:48 taskNum:47 uuid:6cc0f34c-d9ac-4ab9-8308-45c21a7c1c52]
map[creationTime:2023-05-03 15:15:49 taskNum:24 uuid:9fef79ac-a637-451f-9c3c-5ddbfd99631d]
map[creationTime:2023-05-03 15:15:50 taskNum:40 uuid:8ac572b6-e02a-4505-b7bf-d491410f28e3]
map[creationTime:2023-05-03 15:15:51 taskNum:19 uuid:59f52dc0-56e5-4780-a6ac-8e0b9d0ffad8]
map[creationTime:2023-05-03 15:15:51 taskNum:28 uuid:6cf456b1-b234-4847-a8f7-7ee8c6e06c97]
map[creationTime:2023-05-03 15:15:51 taskNum:37 uuid:cad50885-e41e-4d97-ae99-24c92507cfcf]
map[creationTime:2023-05-03 15:15:51 taskNum:25 uuid:54fc2bec-c510-4ed4-a247-64a1141dd1e0]
map[creationTime:2023-05-03 15:15:51 taskNum:7 uuid:1ee32849-261a-4e2e-82d8-04426675b88b]
map[creationTime:2023-05-03 15:15:50 taskNum:15 uuid:bae917a3-01e5-4c20-b678-6f21bfe878e2]
map[creationTime:2023-05-03 15:15:50 taskNum:27 uuid:d6a5a28f-4e24-460f-8d09-5f79af09f74c]
map[creationTime:2023-05-03 15:15:51 taskNum:43 uuid:f873af32-edc5-45fc-9752-3f87cc3309e1]
map[creationTime:2023-05-03 15:15:51 taskNum:10 uuid:120f48fb-3377-4896-9cb2-a32dfe00f25b]
  
$ kubectl logs pubsub-subscriber-555ffcb5df-xmrr2 
map[creationTime:2023-05-03 15:15:49 taskNum:26 uuid:39cbb823-592c-40e9-99cb-bf3ce52f2989]
map[creationTime:2023-05-03 15:15:48 taskNum:38 uuid:447a296c-564d-4dc9-a04a-155f67c760e9]
map[creationTime:2023-05-03 15:15:49 taskNum:11 uuid:0989fe2c-2fc0-4668-95d6-b1829fa6d92d]
map[creationTime:2023-05-03 15:15:48 taskNum:29 uuid:db6e7299-47d8-473d-aebb-e18f2c846de6]
map[creationTime:2023-05-03 15:15:49 taskNum:8 uuid:501f74cf-ccb8-4b1e-b388-32f565da889d]
map[creationTime:2023-05-03 15:15:49 taskNum:6 uuid:108e22df-629d-4d88-9ab0-5758f608237f]
map[creationTime:2023-05-03 15:15:49 taskNum:36 uuid:6e85faa5-d2ed-48b6-8866-d0114386303c]
map[creationTime:2023-05-03 15:15:49 taskNum:20 uuid:aa07a78a-e478-4df7-9fef-835e215bc2ee]
map[creationTime:2023-05-03 15:15:49 taskNum:45 uuid:4a43bad3-5a3d-4315-a565-ca7137104aa8]
map[creationTime:2023-05-03 15:15:49 taskNum:9 uuid:f3f4c132-80ff-4850-8bea-94371b9cdfe5]
map[creationTime:2023-05-03 15:15:50 taskNum:34 uuid:d026ad8e-f1cb-49df-bd33-9cf03cb0ee55]
map[creationTime:2023-05-03 15:15:50 taskNum:13 uuid:f6731fca-d48b-41d7-a505-61e44425d817]
map[creationTime:2023-05-03 15:15:51 taskNum:22 uuid:6ade1f2a-229b-4664-bf82-55aec79ba8fa]
map[creationTime:2023-05-03 15:15:49 taskNum:3 uuid:95291f1f-94d3-44bb-a22c-97bfdd59fa25]
map[creationTime:2023-05-03 15:15:51 taskNum:4 uuid:469e54b9-de42-4fb2-9c38-3f5c3b625d7b]

佐々木 駿太 (記事一覧)

G-gen最北端、北海道在住のクラウドソリューション部エンジニア

2022年6月にG-genにジョイン。Google Cloud Partner Top Engineer 2024に選出。好きなGoogle CloudプロダクトはCloud Run。

趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。