Pub/SubのStreamingPull APIで順序指定キーを含むメッセージを処理する

記事タイトルとURLをコピーする

G-gen の佐々木です。当記事では、Google Cloud (旧称 GCP) が提供するメッセージングサービスである Cloud Pub/Sub の StreamingPull API順序指定キー を使用し、メッセージを Pub/Sub トピックに送信された順にリアルタイム処理する仕組みを実装していきます。

前提知識

Cloud Pub/Sub とは

Cloud Pub/Sub(以下、Pub/Sub)とは、メッセージを生成するアプリケーション (パブリッシャー)とそれを処理するアプリケーション(サブスクライバー)を切り離すマネージドな メッセージング サービス です。

Pub/Sub を使用することで、パブリッシャーとサブスクライバーの互換性・拡張性を担保した、粗結合なシステムを構成できます。

StreamingPull API とは

Pub/Sub のクライアントライブラリでは、1 つの pull リクエストで 1 つの pull レスポンスが返る 単項 Pull のほかに、高スループット・低レイテンシのメッセージ処理を目的とした StreamingPull を使用することができます。

StreamingPull API を使用することで、メッセージを処理するアプリケーションと Pub/Sub との間に永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐにアプリケーションから pull されるようになります。

StreamingPull API の詳細については、以下の記事もご一読ください。

blog.g-gen.co.jp

Pub/Sub におけるメッセージの配信順序について

Pub/Sub のデフォルトの設定では、パブリッシュされたメッセージがサブスクリプションに配信される際、その配信順序は保証されていません。つまり、先にパブリッシュされたメッセージが次のメッセージよりも後に処理される可能性があるということです。

Pub/Subでは 順序指定キー (Ordering Key) を使用することで、メッセージの配信順序を制御することができます。 順序指定キーを使用するには、パブリッシャー側と Pub/Sub サブスクリプション側の 両方で 以下のような設定をします。

対象 設定内容
パブリッシャー側のコード クライアントライブラリで送信するメッセージに順序指定キーを設定する。
Pub/Sub サブスクリプション enable_message_ordering プロパティを true (有効)にする。

順序指定キーの特性

順序付け

同じ順序指定キーをもつメッセージは、パブリッシュされた順番を保持したままサブスクリプションに配信されるようになります(first-in-first-out)。

順序指定キーによるメッセージの順序付け

再配信の一貫性

順序指定が有効になっているメッセージのいずれかがエラーにより再配信される場合、Pub/Sub はメッセージの順序を維持するため、同じ順序指定キーを持つすべてのメッセージを再配信します。
したがって、メッセージ重複に対する追加の処理をサブスクライバー側に実装し、冪等性を担保する必要があります。

再配信の一貫性とメッセージの重複

アフィニティ

サブスクライバーが複数のワーカーで構成され、StreamingPull API を使用する場合、同じ順序指定キーを持つメッセージは ベストエフォート ベースで 同じサブスクライバーに送信されます。

StreamingPull API と順序指定キーを使用した場合のアフィニティ

ただしこのアフィニティの仕様は公式ガイドには記載されておらず、Google Cloud Pub/Sub チームの開発者によって以下のコラムに記載されていたものです。この記載は公式ガイドではないものの、Google Cloud 開発チームの名前と共に発表されていることに加え、公式ガイドからリンクが貼られていることもあり、当記事でも公式に準ずる仕様としてご紹介しました。

検証

ここからは、Pub/Subにおける StreamingPull API 使用時の順序指定キーの動作を検証していきます。

構成

使用するサービス

メッセージのパブリッシャーには Cloud Run jobs を使用し、並列実行されるタスクから複数のメッセージを送信します。
サブスクライバーとしては Google Kubernetes Engine(以下、GKE)を使用し、複数の Pod から StreamingPull API によるメッセージの Pull を行います。

Cloud Run jobs と GKE を使用した Pub/Sub 構成

Cloud Run jobs について

Cloud Run jobs とは、サーバーレス コンテナコンピューティングサービスである Cloud Run の 1 機能です。
HTTP リクエストを処理の起点とする Cloud Run services に対して、Cloud Run jobs はコンテナイメージとして実装したジョブを、手動、スケジュール、ワークフローなど、ユーザの任意タイミングで並列して実行することができます。

当記事では、同一の処理(タスク)を容易に並列実行できる特性を利用し、Pub/Sub にメッセージを送信する複数のパブリッシャーの役割を持たせます。

Cloud Run jobs の詳細については、以下の記事で解説しています。

blog.g-gen.co.jp

Google Kubernetes Engine (GKE) について

GKE はコンテナオーケストレーションツールである Kubernetes を、Google マネージドのクラスタで使用することができるサービスです。
当記事ではコンピュートリソースである Pod を容易に水平スケールできる特性を利用し、StreamingPull を実行するサブスクライバーが複数ある場合のメッセージ送信先のアフィニティを検証します。

GKE の詳細については以下の記事で解説しています。

blog.g-gen.co.jp

Pub/Sub トピック、サブスクリプションの作成

Pub/Sub のトピックとサブスクリプションをそれぞれ作成します。
始めは順序指定キーを設定しない場合の動作を確認するため、デフォルトの設定のまま作成します。

# Pub/Sub トピックの作成
$ gcloud pubsub topics create {トピック名}
  
# 実行例
$ gcloud pubsub topics create mytopic
# Pub/Sub サブスクリプションの作成
$ gcloud pubsub subscriptions create {サブスクリプション名} --topic={トピック名}
  
# 実行例
$ gcloud pubsub subscriptions create mysubscription --topic=mytopic

パブリッシャーの作成(Cloud Run jobs)

使用するコード(Go)

公式ドキュメント のサンプルコードを元に、Pub/Sub に順序指定キーを指定したメッセージを送信する処理を実装します。
メッセージのパブリッシュには Pub/Sub のクライアントライブラリである cloud.google.com/go/pubsub を使用します。

Cloud Run jobs のデフォルトの環境変数 CLOUD_RUN_TASK_INDEX からタスクのインデックス番号を取得し、これを Pub/Sub の順序指定キーとして使用します。
並列実行される Cloud Run jobs タスクごとに task#{タスクのインデックス番号} messageNumber={メッセージ番号} の形式でメッセージがパブリッシュされます。

package main
  
import (
    "context"
    "fmt"
    "log"
    "os"
    "sync"
    "sync/atomic"
  
    "cloud.google.com/go/pubsub" // Pub/Subクライアントライブラリ
    "google.golang.org/api/option"
)
  
type Message struct {
    message     string
    orderingKey string
}
  
// メッセージを生成する関数
func generateMessages(taskNum string) ([]Message, error) {
    var messages []Message
  
    // メッセージを 5 個生成
    for i := 0; i < 5; i++ {
        m := Message{
            message:     fmt.Sprintf("task#%v messageNumber=%v", taskNum, i),
            orderingKey: fmt.Sprintf("task#%v", taskNum),
        }
        messages = append(messages, m)
    }
  
    fmt.Printf("Generate messages: %v\n", messages)
    return messages, nil
}
  
// メッセージをパブリッシュする関数
func publishWithOrderingKey(messages []Message, projectID, topicID string) error {
  
    ctx := context.Background()
  
    // Pub/Sub Client
    client, err := pubsub.NewClient(ctx, projectID,
        option.WithEndpoint("asia-northeast1-pubsub.googleapis.com:443"))
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
  
    var wg sync.WaitGroup
    var totalErrors uint64
  
    t := client.Topic(topicID)     // トピックの指定
    t.EnableMessageOrdering = true // 順序指定キーの有効化
  
    // メッセージのパブリッシュ
    for _, m := range messages {
        res := t.Publish(ctx, &pubsub.Message{
            Data:        []byte(m.message),
            OrderingKey: m.orderingKey,
        })
  
        wg.Add(1)
        go func(res *pubsub.PublishResult) {
            defer wg.Done()
  
            _, err := res.Get(ctx)
            if err != nil {
                fmt.Printf("Failed to publish: %s\n", err)
                atomic.AddUint64(&totalErrors, 1)
                return
            } 
        }(res)
    }
  
    wg.Wait()
  
    if totalErrors > 0 {
        fmt.Printf("%d messages did not publish successfully", totalErrors)
        return nil
    }
  
    fmt.Println("Published messages with ordering keys successfully")
    return nil
}
  
// メイン関数
func main() {
    // タスク番号を取得(Cloud Run jobs のデフォルトの環境変数)
    taskNum := os.Getenv("CLOUD_RUN_TASK_INDEX")
  
    // Cloud Run jobs に設定した環境変数からプロジェクト ID と Pub/Sub トピック ID を取得
    projectID := os.Getenv("PROJECT_ID")
    topicID := os.Getenv("TOPIC_ID")
  
    messages, err := generateMessages(taskNum)
    if err != nil {
        log.Fatal(err)
    }
  
    err = publishWithOrderingKey(messages, projectID, topicID)
    if err != nil {
        log.Fatal(err)
    }
}
    

このコードは 1 回の実行で 5 つのメッセージを生成・パブリッシュするため、並列して実行する Cloud Run jobs のタスク 1 つにつき、以下に示す各行のメッセージが順番にパブリッシュされます。

# タスクのインデックス番号が 0 の場合
task#0 messageNumber=0
task#0 messageNumber=1
task#0 messageNumber=2
task#0 messageNumber=3
task#0 messageNumber=4

参考:順序指定キーを使用したパブリッシュ

コンテナイメージのビルド

Dockerfile を使用せず、Buildpack を使用してコンテナイメージをビルドします。Buildpack を使用することで、ソースコードを自動でパッケージ化し、デプロイ可能なコンテナイメージを生成することができます。
Artifact Registry にコンテナイメージをプッシュするため、リポジトリがない場合は以下のコマンドを実行する前に作成してください。

# Buildpack を使用してイメージをビルドする
$ gcloud builds submit --pack image={リポジトリの URL}/{コンテナイメージ名}
  
# 実行例(リポジトリに Artifact Registry を使用)
$ gcloud builds submit --pack image=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/publisher-orderingkey

ジョブの作成

Artifact Registry にプッシュしたコンテナイメージを使用して、Cloud Run jobs のジョブを作成します。
環境変数として プロジェクト IDPub/Sub トピック名 を設定し、同時に実行する Task の数を 50 に設定しています。

# Cloud Run jobs のジョブを作成する(実行 Task 数 = 50)
$ gcloud run jobs create {ジョブ名} \
--image {イメージの URL} \
--region {リージョン} \
--tasks 50 \
--set-env-vars PROJECT_ID={プロジェクト ID},TOPIC_ID={Pub/Sub トピック名}
  
# 実行例
$ gcloud run jobs create jobs-publisher-orderingkey \
--image asia-northeast1-docker.pkg.dev/myproject/pubsub-container/publisher-orderingkey \
--region asia-northeast1 \
--tasks 50 \
--set-env-vars PROJECT_ID=myproject,TOPIC_ID=mytopic

サブスクライバーの作成(GKE)

使用するコード(Go)

公式ドキュメント を参考に、StreamingPull API を使用して Pub/Sub のメッセージを受け取り、そのままログ出力する処理を実装します。
このアプリケーションは GKE クラスタ上の Pod で動作し、Pub/Sub から Pull したメッセージをそのまま標準出力に出力します。

パブリッシュの処理と同様、cloud.google.com/go/pubsub ライブラリを使用して Pub/Sub API にアクセスします。

package main
  
import (
    "context"
    "fmt"
    "io"
    "log"
    "os"
  
    "cloud.google.com/go/pubsub" // Pub/Sub クライアントライブラリ
)
  
// メッセージを StreamingPull する関数
func pullMessages(w io.Writer, c context.Context, projectId, subId string) error {
  
    // Pub/Sub Client
    client, err := pubsub.NewClient(c, projectId)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
  
    // サブスクリプションの参照
    sub := client.Subscription(subId)
  
    // メッセージを pull し続ける
    err = sub.Receive(c, func(_ context.Context, msg *pubsub.Message) {
        fmt.Fprintf(w, "%v\n", string(msg.Data)) // メッセージを標準出力に出力
        msg.Ack()
    })
    if err != nil {
        return fmt.Errorf("sub.Receive: %v", err)
    }
  
    return nil
}
  
func main() {
    ctx := context.Background()
  
    // 環境変数からプロジェクト ID と PubSub トピック ID を取得
    projectId := os.Getenv("PROJECT_ID")
    subId := os.Getenv("SUBSCRIPTION_ID")
  
    err := pullMessages(os.Stdout, ctx, projectId, subId)
    if err != nil {
        log.Fatal(err)
    }
}
  

コンテナイメージを Artifact Registry にプッシュ

GKE クラスタ上で実行される Pod にアプリケーションをデプロイするため、こちらもコンテナイメージを作成して Artifact Registry にプッシュします。

# Buildpack を使用してイメージをビルドする
$ gcloud builds submit --pack image={リポジトリの URL}/{コンテナイメージ名}
  
# 実行例(リポジトリに Artifact Registry を使用)
$ gcloud builds submit --pack image=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/subscriber-orderingkey

GKE クラスタの作成

当記事では Autopilot モードの GKE クラスタ上でサブスクライバー用の Pod を実行します。
使用できる VPC、サブネットがない場合は以下のコマンドを実行する前に作成してください。

# Autopilot モードの GKE クラスタを作成する
$ gcloud container clusters create-auto {クラスタ名} \
--region {リージョン} \
--network {VPC名}
--subnetwork {サブネット名}
  
# 実行例
$ gcloud container clusters create-auto mycluster-autopilot \
--region asia-northeast1 \
--network myvpc \
--subnetwork mysubnet

Workload Identity の設定

Autopilot モードの GKE クラスタ上で実行される Pod から Pub/Sub などの Google Cloud APIs を使用するためには、Pod に設定する Kubernetes の ServiceAccount と Pub/Sub の権限を付与した Google Cloud のサービスアカウントWorkload Identity によって紐づける必要があります。

当記事では便宜上、Kubernetes の ServiceAccount を KSA、Google Cloud のサービスアカウントを GSA と呼びます。

Workload Identity は以下の記事で解説しているので、詳細についてはこちらもご一読ください。

blog.g-gen.co.jp

GSA の作成

まず、何も権限を持たない GSA を作成します。

# GSA を作成する
$ gcloud iam service-accounts create {GSA の名前} --project {プロジェクト ID}
  
# 実行例
$ gcloud iam service-accounts create my-gsa --project myproject

次に、作成した Pub/Sub サブスクリプションからメッセージを Pull するために「Pub/Sub サブスクライバーroles/pubsub.subscriber)」 ロールを GSA に付与します。

# GSA に IAM ロールを紐付ける
$ gcloud pubsub subscriptions add-iam-policy-binding {サブスクリプション名} \
--role "roles/pubsub.subscriber" \
--member "serviceAccount:{GSA の名前}@{プロジェクト ID}.iam.gserviceaccount.com"
  
# 実行例
$ gcloud pubsub subscriptions add-iam-policy-binding mysubscription \
--role "roles/pubsub.subscriber" \
--member "serviceAccount:my-gsa@myproject.iam.gserviceaccount.com"

KSA の作成

GKE クラスタに接続し、クラスタに KSA を作成します。
以下の内容でマニフェストファイル(ksa.yaml)を作成し、クラスタに適用します。

# ksa.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-ksa
  annotations:
    # Workload Identity で紐付ける GSA を指定する
    iam.gke.io/gcp-service-account: my-gsa@myproject.iam.gserviceaccount.com
# GKE クラスタに接続する
$ gcloud container clusters get-credentials {クラスタ名} --region asia-northeast1 --project {プロジェクト名}
  
# 実行例
$ gcloud container clusters get-credentials mycluster-autopilot --region asia-northeast1 --project myproject
# GKE クラスタに KSA を作成する
$ kubectl apply -f ksa.yaml

KSA と GSA の紐付け

KSA が GSA の権限を借用して Google Cloud APIs にアクセスできるように、 GSA に対する「Workload Identity User (roles/iam.workloadIdentityUser)」ロールを KSA に紐付けます。

# KSA と GSA を紐付ける
$ gcloud iam service-accounts add-iam-policy-binding {GSAの名前}@{プロジェクトID}.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:{プロジェクトID}.svc.id.goog[{KSAを作成したNamespace}/{KSAの名前}]"
  
# 実行例(default名前空間を使用している場合)
$ gcloud iam service-accounts add-iam-policy-binding my-gsa@myproject.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:myproject.svc.id.goog[default/my-ksa]"

アプリケーションのデプロイ(GKE)

サブスクライバーのアプリケーションを GKE にデプロイするマニフェストファイル(deployment.yaml)は以下のようになります。 当記事では始めにサブスクライバー(Pod)の数が 1 の場合の動作検証をするため、spec.replicas の値を 1 にしています。
マニフェストファイルは、この時点ではまだクラスタに適用しません。

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pubsub-subscriber
spec:
  replicas: 1
  selector:
    matchLabels:
      app: subscriber
  template:
    metadata:
      labels:
        app: subscriber
    spec:
      containers:
        - name: subsc-container
          image: "asia-northeast1-docker.pkg.dev/myproject/pubsub-container/subscriber-orderingkey"  # サブスクライバーのコンテナイメージ
          env: 
          - name: "PROJECT_ID"
            value: "myproject"  # Pub/Sub を作成したプロジェクトの ID
          - name: "SUBSCRIPTION_ID"
            value: "mysubscription"  # Pub/Sub サブスクリプションの名前
          resources:
            requests:
              cpu: "500m"
      serviceAccountName: my-ksa  # Workload Identity で使用する ServiceAccount

動作確認

動作確認の基本的な流れ

動作確認は、基本的に以下の流れで行います。

① Cloud Run jobs のタスク実行(メッセージのパブリッシュ)

② GKE クラスタにサブスクライバー用 Pod を作成(メッセージの StreamingPull 処理)

③ Pod のログを確認

④ Pod の削除

①と②は順番が逆のように見えますが、②→①の順に実施してしまうと、(メッセージがそれほど多くない場合)メッセージが Pub/Sub に貯まることなくすぐに Pod によって処理されてしまうため、順序指定を有効化するまでもなく順番通りに処理されてしまい、順序指定の効果が確認できなくなります。したがって、この検証では①→②の順に実施します。

メッセージの順序指定を有効化しない場合の動作

まず、順序指定を 有効化しない 場合の動作を確認します。
ここまでに作成したリソースは以下のような構成になっています。パブリッシャーのコードで順序指定キーを設定していますが、サブスクリプションでメッセージの順序指定を有効化していません。この場合は順序指定キーは機能しません。

順序指定が有効化されていないメッセージを単一の Pod で StreamingPull する

以下のコマンドで Cloud Run のジョブを実行し、メッセージをパブリッシュします。Cloud Run jobs では 50 個のタスクを並列実行し、各タスク 5 つ、合計 250 個のメッセージを Pub/Sub トピックにパブリッシュします。

# ジョブの実行(Cloud Run jobs)
$ gcloud run jobs execute jobs-publisher-orderingkey --region asia-northeast1 --wait

ジョブの完了を確認したら、GKE クラスタに deployment.yaml を適用し、Pod を作成します。

# Pod の作成
$ kubectl apply -f deployment.yaml
# Pod のステータスを確認
$ kubectl get pods
  
# 出力例
$ kubectl get pods
NAME                                 READY   STATUS    RESTARTS   AGE
pubsub-subscriber-545b97bb97-lnkkb   1/1     Running   0          3m37s

Pod のステータスが Running になったら、Pod のログを確認します。
順序指定が有効になっていないため、タスクのインデックス番号(task#の数字)が同じであっても messageNumber の順番が 0→1→2→3→4 になっていない(メッセージのパブリッシュ順に処理されていない)ケースがいくつかあることがわかります。

# Pod のログを確認する(順序指定キーを設定していない場合)
$ kubectl logs {Pod 名}
  
# 出力例(抜粋)
$ kubectl logs pubsub-subscriber-545b97bb97-lnkkb 
task#5 messageNumber=0
task#5 messageNumber=1
task#7 messageNumber=4
  
~~~省略~~~
  
task#43 messageNumber=0
task#43 messageNumber=1
task#43 messageNumber=2
task#43 messageNumber=3
task#43 messageNumber=4
task#7 messageNumber=0
task#7 messageNumber=1
task#7 messageNumber=2
task#7 messageNumber=3
task#13 messageNumber=2
task#13 messageNumber=3
task#13 messageNumber=4
task#35 messageNumber=0
task#35 messageNumber=1
task#35 messageNumber=2
task#35 messageNumber=3
task#35 messageNumber=4
task#11 messageNumber=4
task#11 messageNumber=0
task#11 messageNumber=1
task#11 messageNumber=2
task#11 messageNumber=3
  
~~~省略~~~

次の検証のため、Pod を一旦削除します。

# Pod の削除
$ kubectl delete -f deployment.yaml

メッセージの順序指定が有効化されたサブスクリプションの作成

順序指定が有効化されたサブスクリプションを作成します。
順序指定の設定はサブスクリプション作成後に変更することはできないため、一度サブスクリプションを削除します。

# 順序指定が有効化されていない Pub/Sub サブスクリプションの削除
$ gcloud pubsub subscriptions delete {サブスクリプション名}
  
# 実行例
$ gcloud pubsub subscriptions delete mysubscription

サブスクリプションを削除したら、順序指定を有効化した同名のサブスクリプションを作成します。

# 順序指定が有効化された Pub/Sub サブスクリプションの作成
$ gcloud pubsub subscriptions create {サブスクリプション名} \
--topic={トピック名} \
--enable-message-ordering
  
# 実行例
$ gcloud pubsub subscriptions create mysubscription \
--topic=mytopic \
--enable-message-ordering

Pub/Sub サブスクリプションを作り直したため、再度サブスクリプションに対する「Pub/Sub サブスクライバーroles/pubsub.subscriber)」 ロールを GSA に付与します。

# GSA に IAM ロールを紐付ける
$ gcloud pubsub subscriptions add-iam-policy-binding {サブスクリプション名} \
--role "roles/pubsub.subscriber" \
--member "serviceAccount:{GSA の名前}@{プロジェクト ID}.iam.gserviceaccount.com"
  
# 実行例
$ gcloud pubsub subscriptions add-iam-policy-binding mysubscription \
--role "roles/pubsub.subscriber" \
--member "serviceAccount:my-gsa@myproject.iam.gserviceaccount.com"

サブスクライバーが単一の場合の動作

次は、順序指定を有効化した状態で、メッセージを単一のサブスクライバーで処理します。

順序指定が有効化されたメッセージを単一の Pod から StreamingPull する

先ほどと同様の手順で Cloud Run job のジョブを実行し、ジョブの完了を確認してから GKE クラスタに Pod を作成します。

Cloud Run jobs で 50 個のタスクを並列実行し、各タスク 5 つ、合計 250 個のメッセージをパブリッシュした場合のサブスクライバー側 Pod のログは以下のようになります。
タスクのインデックス番号を順序指定キーとして設定したため、同一タスク(task#で識別)からのメッセージがパブリッシュされた順(messageNumber が 0→1→2→3→4)に処理されていることがわかります。

# Pod のログを確認する(全文)
# 出力例
$ kubectl logs pubsub-subscriber-944bcdb7b-9zrbz
task#27 messageNumber=0
task#8 messageNumber=0
task#7 messageNumber=0
task#46 messageNumber=0
task#16 messageNumber=0
task#29 messageNumber=0
task#3 messageNumber=0
task#43 messageNumber=0
task#14 messageNumber=0
task#21 messageNumber=0
task#21 messageNumber=1
task#21 messageNumber=2
task#21 messageNumber=3
task#21 messageNumber=4
task#45 messageNumber=0
task#45 messageNumber=1
task#45 messageNumber=2
task#45 messageNumber=3
task#45 messageNumber=4
task#41 messageNumber=0
task#39 messageNumber=0
task#49 messageNumber=0
task#17 messageNumber=0
task#17 messageNumber=1
task#17 messageNumber=2
task#32 messageNumber=0
task#0 messageNumber=0
task#0 messageNumber=1
task#40 messageNumber=0
task#35 messageNumber=0
task#15 messageNumber=0
task#11 messageNumber=0
task#2 messageNumber=0
task#2 messageNumber=1
task#2 messageNumber=2
task#2 messageNumber=3
task#2 messageNumber=4
task#36 messageNumber=0
task#36 messageNumber=1
task#9 messageNumber=0
task#9 messageNumber=1
task#9 messageNumber=2
task#9 messageNumber=3
task#9 messageNumber=4
task#23 messageNumber=0
task#23 messageNumber=1
task#23 messageNumber=2
task#10 messageNumber=0
task#10 messageNumber=1
task#10 messageNumber=2
task#10 messageNumber=3
task#10 messageNumber=4
task#37 messageNumber=0
task#22 messageNumber=0
task#22 messageNumber=1
task#22 messageNumber=2
task#24 messageNumber=0
task#24 messageNumber=1
task#24 messageNumber=2
task#24 messageNumber=3
task#24 messageNumber=4
task#4 messageNumber=0
task#4 messageNumber=1
task#30 messageNumber=0
task#30 messageNumber=1
task#30 messageNumber=2
task#38 messageNumber=0
task#48 messageNumber=0
task#44 messageNumber=0
task#44 messageNumber=1
task#44 messageNumber=2
task#42 messageNumber=0
task#42 messageNumber=1
task#42 messageNumber=2
task#42 messageNumber=3
task#42 messageNumber=4
task#33 messageNumber=0
task#26 messageNumber=0
task#26 messageNumber=1
task#26 messageNumber=2
task#26 messageNumber=3
task#26 messageNumber=4
task#20 messageNumber=0
task#20 messageNumber=1
task#20 messageNumber=2
task#31 messageNumber=0
task#31 messageNumber=1
task#31 messageNumber=2
task#47 messageNumber=0
task#25 messageNumber=0
task#25 messageNumber=1
task#25 messageNumber=2
task#25 messageNumber=3
task#25 messageNumber=4
task#19 messageNumber=0
task#19 messageNumber=1
task#19 messageNumber=2
task#19 messageNumber=3
task#19 messageNumber=4
task#13 messageNumber=0
task#13 messageNumber=1
task#13 messageNumber=2
task#13 messageNumber=3
task#13 messageNumber=4
task#12 messageNumber=0
task#1 messageNumber=0
task#6 messageNumber=0
task#6 messageNumber=1
task#6 messageNumber=2
task#6 messageNumber=3
task#18 messageNumber=0
task#6 messageNumber=4
task#5 messageNumber=0
task#5 messageNumber=1
task#5 messageNumber=2
task#5 messageNumber=3
task#34 messageNumber=0
task#34 messageNumber=1
task#34 messageNumber=2
task#34 messageNumber=3
task#34 messageNumber=4
task#28 messageNumber=0
task#3 messageNumber=1
task#3 messageNumber=2
task#3 messageNumber=3
task#3 messageNumber=4
task#7 messageNumber=1
task#7 messageNumber=2
task#7 messageNumber=3
task#7 messageNumber=4
task#8 messageNumber=1
task#8 messageNumber=2
task#8 messageNumber=3
task#8 messageNumber=4
task#27 messageNumber=1
task#27 messageNumber=2
task#27 messageNumber=3
task#27 messageNumber=4
task#29 messageNumber=1
task#29 messageNumber=2
task#29 messageNumber=3
task#29 messageNumber=4
task#14 messageNumber=1
task#14 messageNumber=2
task#14 messageNumber=3
task#14 messageNumber=4
task#46 messageNumber=1
task#46 messageNumber=2
task#46 messageNumber=3
task#46 messageNumber=4
task#16 messageNumber=1
task#16 messageNumber=2
task#16 messageNumber=3
task#16 messageNumber=4
task#41 messageNumber=1
task#41 messageNumber=2
task#41 messageNumber=3
task#41 messageNumber=4
task#43 messageNumber=1
task#43 messageNumber=2
task#43 messageNumber=3
task#43 messageNumber=4
task#47 messageNumber=1
task#47 messageNumber=2
task#47 messageNumber=3
task#47 messageNumber=4
task#31 messageNumber=3
task#31 messageNumber=4
task#12 messageNumber=1
task#30 messageNumber=3
task#30 messageNumber=4
task#28 messageNumber=1
task#28 messageNumber=2
task#28 messageNumber=3
task#28 messageNumber=4
task#44 messageNumber=3
task#44 messageNumber=4
task#5 messageNumber=4
task#38 messageNumber=1
task#38 messageNumber=2
task#38 messageNumber=3
task#38 messageNumber=4
task#4 messageNumber=2
task#4 messageNumber=3
task#4 messageNumber=4
task#1 messageNumber=1
task#1 messageNumber=2
task#1 messageNumber=3
task#1 messageNumber=4
task#12 messageNumber=2
task#12 messageNumber=3
task#12 messageNumber=4
task#48 messageNumber=1
task#48 messageNumber=2
task#48 messageNumber=3
task#48 messageNumber=4
task#20 messageNumber=3
task#33 messageNumber=1
task#20 messageNumber=4
task#18 messageNumber=1
task#18 messageNumber=2
task#18 messageNumber=3
task#18 messageNumber=4
task#33 messageNumber=2
task#33 messageNumber=3
task#33 messageNumber=4
task#36 messageNumber=2
task#36 messageNumber=3
task#36 messageNumber=4
task#23 messageNumber=3
task#23 messageNumber=4
task#22 messageNumber=3
task#22 messageNumber=4
task#37 messageNumber=1
task#37 messageNumber=2
task#37 messageNumber=3
task#37 messageNumber=4
task#35 messageNumber=1
task#35 messageNumber=2
task#35 messageNumber=3
task#35 messageNumber=4
task#17 messageNumber=3
task#17 messageNumber=4
task#39 messageNumber=1
task#39 messageNumber=2
task#39 messageNumber=3
task#39 messageNumber=4
task#15 messageNumber=1
task#15 messageNumber=2
task#15 messageNumber=3
task#15 messageNumber=4
task#0 messageNumber=2
task#0 messageNumber=3
task#0 messageNumber=4
task#11 messageNumber=1
task#11 messageNumber=2
task#11 messageNumber=3
task#11 messageNumber=4
task#49 messageNumber=1
task#49 messageNumber=2
task#49 messageNumber=3
task#49 messageNumber=4
task#40 messageNumber=1
task#40 messageNumber=2
task#40 messageNumber=3
task#40 messageNumber=4
task#32 messageNumber=1
task#32 messageNumber=2
task#32 messageNumber=3
task#32 messageNumber=4

サブスクライバーが複数の場合の動作(アフィニティの検証)

最後に Pod の数を 3 つに増やし、順序指定が有効化されたメッセージを StreamingPull API で Pull した場合に、メッセージがどのように分散するかを確認します。

順序指定が有効化されたメッセージを複数の Pod から StreamingPull する

マニフェストファイルの spec.replicas の値を 3 に変更します。

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pubsub-subscriber
spec:
  replicas: 3  # ここを変更する
  selector:
    matchLabels:
      app: subscriber
  template:
    metadata:
      labels:
        app: subscriber
    spec:
      containers:
        - name: subsc-container
          image: "asia-northeast1-docker.pkg.dev/myproject/pubsub-container/subscriber-orderingkey"  # サブスクライバーのコンテナイメージ
          env: 
          - name: "PROJECT_ID"
            value: "myproject"  # Pub/Sub を作成したプロジェクトの ID
          - name: "SUBSCRIPTION_ID"
            value: "mysubscription"  # Pub/Sub サブスクリプションの名前
          resources:
            requests:
              cpu: "500m"
      serviceAccountName: my-ksa  # Workload Identity で使用する ServiceAccount

今まで同様、Cloud Run jobs のジョブを実行してメッセージをパブリッシュした後、マニフェストファイルを適用して 3 つの Pod を作成します。
Pod がすべて正常に実行されているのを確認したら 各 Pod のログを確認します。

始めに、各 Pod に送られたメッセージ数を確認するために、ログの行数を見てみます。
メッセージはタスクごとに 5 つ送信されるため、タスクのインデックス番号を順序を指定キーとして分散処理を行った場合、メッセージの再配信が行われていなければ、アフィニティによって各 Pod が処理するメッセージの数は 5 の倍数になるはずです(※アフィニティがベストエフォートベースである点は注意)。
今回の結果を見たところ、各 Pod で 5 の倍数の数だけメッセージを処理しているようです。

# 出力例
# Pod のログの行数を確認する
$ kubectl logs pubsub-subscriber-944bcdb7b-pch4t | wc -l
75
$ kubectl logs pubsub-subscriber-944bcdb7b-s7xdq | wc -l
95
$ kubectl logs pubsub-subscriber-944bcdb7b-wp6jt | wc -l
80

実際のログを確認してみます。3 つの Pod のログを以下に記載します。
順序指定キーと StreamingPull API を使用した際のアフィニティにより、同一タスクからパブリッシュされたメッセージ(task#が同じもの)は同一の Pod に送信され、パブリッシュされた順(messageNumber が 0→1→2→3→4)に処理されていることがわかります。

# 1 つ目の Pod のログを確認する(全文)
# 出力例
$ kubectl logs pubsub-subscriber-944bcdb7b-pch4t
task#34 messageNumber=0
task#34 messageNumber=1
task#27 messageNumber=0
task#44 messageNumber=0
task#44 messageNumber=1
task#44 messageNumber=2
task#44 messageNumber=3
task#31 messageNumber=0
task#32 messageNumber=0
task#32 messageNumber=1
task#32 messageNumber=2
task#37 messageNumber=0
task#37 messageNumber=1
task#37 messageNumber=2
task#6 messageNumber=0
task#49 messageNumber=0
task#49 messageNumber=1
task#26 messageNumber=0
task#26 messageNumber=1
task#26 messageNumber=2
task#26 messageNumber=3
task#10 messageNumber=0
task#10 messageNumber=1
task#10 messageNumber=2
task#10 messageNumber=3
task#42 messageNumber=0
task#42 messageNumber=1
task#48 messageNumber=0
task#48 messageNumber=1
task#28 messageNumber=0
task#37 messageNumber=3
task#37 messageNumber=4
task#11 messageNumber=0
task#11 messageNumber=1
task#11 messageNumber=2
task#11 messageNumber=3
task#11 messageNumber=4
task#44 messageNumber=4
task#26 messageNumber=4
task#34 messageNumber=2
task#34 messageNumber=3
task#34 messageNumber=4
task#8 messageNumber=0
task#8 messageNumber=1
task#8 messageNumber=2
task#8 messageNumber=3
task#31 messageNumber=1
task#31 messageNumber=2
task#31 messageNumber=3
task#31 messageNumber=4
task#10 messageNumber=4
task#32 messageNumber=3
task#32 messageNumber=4
task#42 messageNumber=2
task#42 messageNumber=3
task#42 messageNumber=4
task#48 messageNumber=2
task#48 messageNumber=3
task#48 messageNumber=4
task#28 messageNumber=1
task#28 messageNumber=2
task#28 messageNumber=3
task#28 messageNumber=4
task#49 messageNumber=2
task#49 messageNumber=3
task#49 messageNumber=4
task#27 messageNumber=1
task#27 messageNumber=2
task#27 messageNumber=3
task#27 messageNumber=4
task#6 messageNumber=1
task#6 messageNumber=2
task#6 messageNumber=3
task#6 messageNumber=4
task#8 messageNumber=4
# 2 つ目の Pod のログを確認する(全文)
# 出力例
$ kubectl logs pubsub-subscriber-944bcdb7b-s7xdq
task#14 messageNumber=0
task#14 messageNumber=1
task#14 messageNumber=2
task#14 messageNumber=3
task#14 messageNumber=4
task#45 messageNumber=0
task#43 messageNumber=0
task#43 messageNumber=1
task#43 messageNumber=2
task#43 messageNumber=3
task#5 messageNumber=0
task#5 messageNumber=1
task#2 messageNumber=0
task#17 messageNumber=0
task#17 messageNumber=1
task#17 messageNumber=2
task#19 messageNumber=0
task#19 messageNumber=1
task#19 messageNumber=2
task#19 messageNumber=3
task#19 messageNumber=4
task#24 messageNumber=0
task#24 messageNumber=1
task#39 messageNumber=0
task#2 messageNumber=1
task#22 messageNumber=0
task#22 messageNumber=1
task#22 messageNumber=2
task#22 messageNumber=3
task#22 messageNumber=4
task#4 messageNumber=0
task#4 messageNumber=1
task#4 messageNumber=2
task#4 messageNumber=3
task#4 messageNumber=4
task#30 messageNumber=0
task#30 messageNumber=1
task#30 messageNumber=2
task#30 messageNumber=3
task#20 messageNumber=0
task#20 messageNumber=1
task#20 messageNumber=2
task#20 messageNumber=3
task#47 messageNumber=0
task#9 messageNumber=0
task#35 messageNumber=0
task#46 messageNumber=0
task#46 messageNumber=1
task#46 messageNumber=2
task#46 messageNumber=3
task#46 messageNumber=4
task#43 messageNumber=4
task#1 messageNumber=0
task#1 messageNumber=1
task#1 messageNumber=2
task#1 messageNumber=3
task#1 messageNumber=4
task#5 messageNumber=2
task#29 messageNumber=0
task#29 messageNumber=1
task#29 messageNumber=2
task#29 messageNumber=3
task#29 messageNumber=4
task#5 messageNumber=3
task#5 messageNumber=4
task#17 messageNumber=3
task#17 messageNumber=4
task#20 messageNumber=4
task#30 messageNumber=4
task#45 messageNumber=1
task#45 messageNumber=2
task#45 messageNumber=3
task#45 messageNumber=4
task#47 messageNumber=1
task#47 messageNumber=2
task#47 messageNumber=3
task#47 messageNumber=4
task#39 messageNumber=1
task#39 messageNumber=2
task#39 messageNumber=3
task#39 messageNumber=4
task#9 messageNumber=1
task#9 messageNumber=2
task#9 messageNumber=3
task#9 messageNumber=4
task#35 messageNumber=1
task#2 messageNumber=2
task#2 messageNumber=3
task#2 messageNumber=4
task#35 messageNumber=2
task#35 messageNumber=3
task#35 messageNumber=4
task#24 messageNumber=2
task#24 messageNumber=3
task#24 messageNumber=4
# 3 つ目の Pod のログを確認する(全文)
# 出力例
$ kubectl logs pubsub-subscriber-944bcdb7b-wp6jt
task#21 messageNumber=0
task#21 messageNumber=1
task#21 messageNumber=2
task#12 messageNumber=0
task#13 messageNumber=0
task#13 messageNumber=1
task#13 messageNumber=2
task#13 messageNumber=3
task#13 messageNumber=4
task#18 messageNumber=0
task#41 messageNumber=0
task#41 messageNumber=1
task#41 messageNumber=2
task#41 messageNumber=3
task#41 messageNumber=4
task#23 messageNumber=0
task#23 messageNumber=1
task#23 messageNumber=2
task#23 messageNumber=3
task#23 messageNumber=4
task#33 messageNumber=0
task#15 messageNumber=0
task#15 messageNumber=1
task#0 messageNumber=0
task#0 messageNumber=1
task#0 messageNumber=2
task#0 messageNumber=3
task#0 messageNumber=4
task#40 messageNumber=0
task#40 messageNumber=1
task#25 messageNumber=0
task#25 messageNumber=1
task#25 messageNumber=2
task#25 messageNumber=3
task#25 messageNumber=4
task#36 messageNumber=0
task#16 messageNumber=0
task#16 messageNumber=1
task#16 messageNumber=2
task#16 messageNumber=3
task#16 messageNumber=4
task#33 messageNumber=1
task#33 messageNumber=2
task#33 messageNumber=3
task#33 messageNumber=4
task#12 messageNumber=1
task#12 messageNumber=2
task#12 messageNumber=3
task#12 messageNumber=4
task#7 messageNumber=0
task#7 messageNumber=1
task#7 messageNumber=2
task#7 messageNumber=3
task#7 messageNumber=4
task#38 messageNumber=0
task#38 messageNumber=1
task#38 messageNumber=2
task#38 messageNumber=3
task#38 messageNumber=4
task#3 messageNumber=0
task#3 messageNumber=1
task#3 messageNumber=2
task#3 messageNumber=3
task#3 messageNumber=4
task#40 messageNumber=2
task#40 messageNumber=3
task#40 messageNumber=4
task#21 messageNumber=3
task#21 messageNumber=4
task#15 messageNumber=2
task#15 messageNumber=3
task#15 messageNumber=4
task#36 messageNumber=1
task#36 messageNumber=2
task#36 messageNumber=3
task#36 messageNumber=4
task#18 messageNumber=1
task#18 messageNumber=2
task#18 messageNumber=3
task#18 messageNumber=4

佐々木 駿太 (記事一覧)

G-gen最北端、北海道在住のクラウドソリューション部エンジニア

2022年6月にG-genにジョイン。Google Cloud Partner Top Engineer 2024に選出。好きなGoogle CloudプロダクトはCloud Run。

趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。