G-gen の佐々木です。当記事では、Google Cloud (旧称 GCP) が提供するメッセージングサービスである Cloud Pub/Sub の StreamingPull API と 順序指定キー を使用し、メッセージを Pub/Sub トピックに送信された順にリアルタイム処理する仕組みを実装していきます。
- 前提知識
- Pub/Sub におけるメッセージの配信順序について
- 順序指定キーの特性
- 検証
前提知識
Cloud Pub/Sub とは
Cloud Pub/Sub(以下、Pub/Sub)とは、メッセージを生成するアプリケーション (パブリッシャー)とそれを処理するアプリケーション(サブスクライバー)を切り離すマネージドな メッセージング サービス です。
Pub/Sub を使用することで、パブリッシャーとサブスクライバーの互換性・拡張性を担保した、粗結合なシステムを構成できます。
- 参考 (公式ドキュメント) : Pub/Sub とは
- 参考 (G-gen Tech Blog) : Google Cloudで理解する疎結合アーキテクチャとメッセージングサービス
StreamingPull API とは
Pub/Sub のクライアントライブラリでは、1 つの pull リクエストで 1 つの pull レスポンスが返る 単項 Pull のほかに、高スループット・低レイテンシのメッセージ処理を目的とした StreamingPull を使用することができます。
StreamingPull API を使用することで、メッセージを処理するアプリケーションと Pub/Sub との間に永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐにアプリケーションから pull されるようになります。
StreamingPull API の詳細については、以下の記事もご一読ください。
Pub/Sub におけるメッセージの配信順序について
Pub/Sub のデフォルトの設定では、パブリッシュされたメッセージがサブスクリプションに配信される際、その配信順序は保証されていません。つまり、先にパブリッシュされたメッセージが次のメッセージよりも後に処理される可能性があるということです。
Pub/Subでは 順序指定キー (Ordering Key) を使用することで、メッセージの配信順序を制御することができます。 順序指定キーを使用するには、パブリッシャー側と Pub/Sub サブスクリプション側の 両方で 以下のような設定をします。
対象 | 設定内容 |
---|---|
パブリッシャー側のコード | クライアントライブラリで送信するメッセージに順序指定キーを設定する。 |
Pub/Sub サブスクリプション | enable_message_ordering プロパティを true (有効)にする。 |
- 参考 : メッセージの順序指定
順序指定キーの特性
順序付け
同じ順序指定キーをもつメッセージは、パブリッシュされた順番を保持したままサブスクリプションに配信されるようになります(first-in-first-out)。
再配信の一貫性
順序指定が有効になっているメッセージのいずれかがエラーにより再配信される場合、Pub/Sub はメッセージの順序を維持するため、同じ順序指定キーを持つすべてのメッセージを再配信します。
したがって、メッセージ重複に対する追加の処理をサブスクライバー側に実装し、冪等性を担保する必要があります。
アフィニティ
サブスクライバーが複数のワーカーで構成され、StreamingPull API を使用する場合、同じ順序指定キーを持つメッセージは ベストエフォート ベースで 同じサブスクライバーに送信されます。
ただしこのアフィニティの仕様は公式ガイドには記載されておらず、Google Cloud Pub/Sub チームの開発者によって以下のコラムに記載されていたものです。この記載は公式ガイドではないものの、Google Cloud 開発チームの名前と共に発表されていることに加え、公式ガイドからリンクが貼られていることもあり、当記事でも公式に準ずる仕様としてご紹介しました。
検証
ここからは、Pub/Subにおける StreamingPull API 使用時の順序指定キーの動作を検証していきます。
構成
使用するサービス
メッセージのパブリッシャーには Cloud Run jobs を使用し、並列実行されるタスクから複数のメッセージを送信します。
サブスクライバーとしては Google Kubernetes Engine(以下、GKE)を使用し、複数の Pod から StreamingPull API によるメッセージの Pull を行います。
Cloud Run jobs について
Cloud Run jobs とは、サーバーレス コンテナコンピューティングサービスである Cloud Run の 1 機能です。
HTTP リクエストを処理の起点とする Cloud Run services に対して、Cloud Run jobs はコンテナイメージとして実装したジョブを、手動、スケジュール、ワークフローなど、ユーザの任意タイミングで並列して実行することができます。
当記事では、同一の処理(タスク)を容易に並列実行できる特性を利用し、Pub/Sub にメッセージを送信する複数のパブリッシャーの役割を持たせます。
Cloud Run jobs の詳細については、以下の記事で解説しています。
Google Kubernetes Engine (GKE) について
GKE はコンテナオーケストレーションツールである Kubernetes を、Google マネージドのクラスタで使用することができるサービスです。
当記事ではコンピュートリソースである Pod を容易に水平スケールできる特性を利用し、StreamingPull を実行するサブスクライバーが複数ある場合のメッセージ送信先のアフィニティを検証します。
GKE の詳細については以下の記事で解説しています。
Pub/Sub トピック、サブスクリプションの作成
Pub/Sub のトピックとサブスクリプションをそれぞれ作成します。
始めは順序指定キーを設定しない場合の動作を確認するため、デフォルトの設定のまま作成します。
# Pub/Sub トピックの作成 $ gcloud pubsub topics create {トピック名} # 実行例 $ gcloud pubsub topics create mytopic
# Pub/Sub サブスクリプションの作成 $ gcloud pubsub subscriptions create {サブスクリプション名} --topic={トピック名} # 実行例 $ gcloud pubsub subscriptions create mysubscription --topic=mytopic
パブリッシャーの作成(Cloud Run jobs)
使用するコード(Go)
公式ドキュメント のサンプルコードを元に、Pub/Sub に順序指定キーを指定したメッセージを送信する処理を実装します。
メッセージのパブリッシュには Pub/Sub のクライアントライブラリである cloud.google.com/go/pubsub
を使用します。
Cloud Run jobs のデフォルトの環境変数 CLOUD_RUN_TASK_INDEX
からタスクのインデックス番号を取得し、これを Pub/Sub の順序指定キーとして使用します。
並列実行される Cloud Run jobs タスクごとに task#{タスクのインデックス番号} messageNumber={メッセージ番号}
の形式でメッセージがパブリッシュされます。
package main import ( "context" "fmt" "log" "os" "sync" "sync/atomic" "cloud.google.com/go/pubsub" // Pub/Subクライアントライブラリ "google.golang.org/api/option" ) type Message struct { message string orderingKey string } // メッセージを生成する関数 func generateMessages(taskNum string) ([]Message, error) { var messages []Message // メッセージを 5 個生成 for i := 0; i < 5; i++ { m := Message{ message: fmt.Sprintf("task#%v messageNumber=%v", taskNum, i), orderingKey: fmt.Sprintf("task#%v", taskNum), } messages = append(messages, m) } fmt.Printf("Generate messages: %v\n", messages) return messages, nil } // メッセージをパブリッシュする関数 func publishWithOrderingKey(messages []Message, projectID, topicID string) error { ctx := context.Background() // Pub/Sub Client client, err := pubsub.NewClient(ctx, projectID, option.WithEndpoint("asia-northeast1-pubsub.googleapis.com:443")) if err != nil { return fmt.Errorf("pubsub.NewClient: %v", err) } defer client.Close() var wg sync.WaitGroup var totalErrors uint64 t := client.Topic(topicID) // トピックの指定 t.EnableMessageOrdering = true // 順序指定キーの有効化 // メッセージのパブリッシュ for _, m := range messages { res := t.Publish(ctx, &pubsub.Message{ Data: []byte(m.message), OrderingKey: m.orderingKey, }) wg.Add(1) go func(res *pubsub.PublishResult) { defer wg.Done() _, err := res.Get(ctx) if err != nil { fmt.Printf("Failed to publish: %s\n", err) atomic.AddUint64(&totalErrors, 1) return } }(res) } wg.Wait() if totalErrors > 0 { fmt.Printf("%d messages did not publish successfully", totalErrors) return nil } fmt.Println("Published messages with ordering keys successfully") return nil } // メイン関数 func main() { // タスク番号を取得(Cloud Run jobs のデフォルトの環境変数) taskNum := os.Getenv("CLOUD_RUN_TASK_INDEX") // Cloud Run jobs に設定した環境変数からプロジェクト ID と Pub/Sub トピック ID を取得 projectID := os.Getenv("PROJECT_ID") topicID := os.Getenv("TOPIC_ID") messages, err := generateMessages(taskNum) if err != nil { log.Fatal(err) } err = publishWithOrderingKey(messages, projectID, topicID) if err != nil { log.Fatal(err) } }
このコードは 1 回の実行で 5 つのメッセージを生成・パブリッシュするため、並列して実行する Cloud Run jobs のタスク 1 つにつき、以下に示す各行のメッセージが順番にパブリッシュされます。
# タスクのインデックス番号が 0 の場合 task#0 messageNumber=0 task#0 messageNumber=1 task#0 messageNumber=2 task#0 messageNumber=3 task#0 messageNumber=4
コンテナイメージのビルド
Dockerfile を使用せず、Buildpack を使用してコンテナイメージをビルドします。Buildpack を使用することで、ソースコードを自動でパッケージ化し、デプロイ可能なコンテナイメージを生成することができます。
Artifact Registry にコンテナイメージをプッシュするため、リポジトリがない場合は以下のコマンドを実行する前に作成してください。
# Buildpack を使用してイメージをビルドする $ gcloud builds submit --pack image={リポジトリの URL}/{コンテナイメージ名} # 実行例(リポジトリに Artifact Registry を使用) $ gcloud builds submit --pack image=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/publisher-orderingkey
ジョブの作成
Artifact Registry にプッシュしたコンテナイメージを使用して、Cloud Run jobs のジョブを作成します。
環境変数として プロジェクト ID
と Pub/Sub トピック名
を設定し、同時に実行する Task の数を 50
に設定しています。
# Cloud Run jobs のジョブを作成する(実行 Task 数 = 50) $ gcloud run jobs create {ジョブ名} \ --image {イメージの URL} \ --region {リージョン} \ --tasks 50 \ --set-env-vars PROJECT_ID={プロジェクト ID},TOPIC_ID={Pub/Sub トピック名} # 実行例 $ gcloud run jobs create jobs-publisher-orderingkey \ --image asia-northeast1-docker.pkg.dev/myproject/pubsub-container/publisher-orderingkey \ --region asia-northeast1 \ --tasks 50 \ --set-env-vars PROJECT_ID=myproject,TOPIC_ID=mytopic
サブスクライバーの作成(GKE)
使用するコード(Go)
公式ドキュメント を参考に、StreamingPull API を使用して Pub/Sub のメッセージを受け取り、そのままログ出力する処理を実装します。
このアプリケーションは GKE クラスタ上の Pod で動作し、Pub/Sub から Pull したメッセージをそのまま標準出力に出力します。
パブリッシュの処理と同様、cloud.google.com/go/pubsub
ライブラリを使用して Pub/Sub API にアクセスします。
package main import ( "context" "fmt" "io" "log" "os" "cloud.google.com/go/pubsub" // Pub/Sub クライアントライブラリ ) // メッセージを StreamingPull する関数 func pullMessages(w io.Writer, c context.Context, projectId, subId string) error { // Pub/Sub Client client, err := pubsub.NewClient(c, projectId) if err != nil { return fmt.Errorf("pubsub.NewClient: %v", err) } defer client.Close() // サブスクリプションの参照 sub := client.Subscription(subId) // メッセージを pull し続ける err = sub.Receive(c, func(_ context.Context, msg *pubsub.Message) { fmt.Fprintf(w, "%v\n", string(msg.Data)) // メッセージを標準出力に出力 msg.Ack() }) if err != nil { return fmt.Errorf("sub.Receive: %v", err) } return nil } func main() { ctx := context.Background() // 環境変数からプロジェクト ID と PubSub トピック ID を取得 projectId := os.Getenv("PROJECT_ID") subId := os.Getenv("SUBSCRIPTION_ID") err := pullMessages(os.Stdout, ctx, projectId, subId) if err != nil { log.Fatal(err) } }
コンテナイメージを Artifact Registry にプッシュ
GKE クラスタ上で実行される Pod にアプリケーションをデプロイするため、こちらもコンテナイメージを作成して Artifact Registry にプッシュします。
# Buildpack を使用してイメージをビルドする $ gcloud builds submit --pack image={リポジトリの URL}/{コンテナイメージ名} # 実行例(リポジトリに Artifact Registry を使用) $ gcloud builds submit --pack image=asia-northeast1-docker.pkg.dev/myproject/pubsub-container/subscriber-orderingkey
GKE クラスタの作成
当記事では Autopilot モードの GKE クラスタ上でサブスクライバー用の Pod を実行します。
使用できる VPC、サブネットがない場合は以下のコマンドを実行する前に作成してください。
# Autopilot モードの GKE クラスタを作成する $ gcloud container clusters create-auto {クラスタ名} \ --region {リージョン} \ --network {VPC名} --subnetwork {サブネット名} # 実行例 $ gcloud container clusters create-auto mycluster-autopilot \ --region asia-northeast1 \ --network myvpc \ --subnetwork mysubnet
Workload Identity の設定
Autopilot モードの GKE クラスタ上で実行される Pod から Pub/Sub などの Google Cloud APIs を使用するためには、Pod に設定する Kubernetes の ServiceAccount と Pub/Sub の権限を付与した Google Cloud のサービスアカウント を Workload Identity によって紐づける必要があります。
当記事では便宜上、Kubernetes の ServiceAccount を KSA
、Google Cloud のサービスアカウントを GSA
と呼びます。
Workload Identity は以下の記事で解説しているので、詳細についてはこちらもご一読ください。
GSA の作成
まず、何も権限を持たない GSA を作成します。
# GSA を作成する $ gcloud iam service-accounts create {GSA の名前} --project {プロジェクト ID} # 実行例 $ gcloud iam service-accounts create my-gsa --project myproject
次に、作成した Pub/Sub サブスクリプションからメッセージを Pull するために「Pub/Sub サブスクライバー(roles/pubsub.subscriber
)」 ロールを GSA に付与します。
# GSA に IAM ロールを紐付ける $ gcloud pubsub subscriptions add-iam-policy-binding {サブスクリプション名} \ --role "roles/pubsub.subscriber" \ --member "serviceAccount:{GSA の名前}@{プロジェクト ID}.iam.gserviceaccount.com" # 実行例 $ gcloud pubsub subscriptions add-iam-policy-binding mysubscription \ --role "roles/pubsub.subscriber" \ --member "serviceAccount:my-gsa@myproject.iam.gserviceaccount.com"
KSA の作成
GKE クラスタに接続し、クラスタに KSA を作成します。
以下の内容でマニフェストファイル(ksa.yaml
)を作成し、クラスタに適用します。
# ksa.yaml apiVersion: v1 kind: ServiceAccount metadata: name: my-ksa annotations: # Workload Identity で紐付ける GSA を指定する iam.gke.io/gcp-service-account: my-gsa@myproject.iam.gserviceaccount.com
# GKE クラスタに接続する $ gcloud container clusters get-credentials {クラスタ名} --region asia-northeast1 --project {プロジェクト名} # 実行例 $ gcloud container clusters get-credentials mycluster-autopilot --region asia-northeast1 --project myproject
# GKE クラスタに KSA を作成する $ kubectl apply -f ksa.yaml
KSA と GSA の紐付け
KSA が GSA の権限を借用して Google Cloud APIs にアクセスできるように、 GSA に対する「Workload Identity User (roles/iam.workloadIdentityUser
)」ロールを KSA に紐付けます。
# KSA と GSA を紐付ける $ gcloud iam service-accounts add-iam-policy-binding {GSAの名前}@{プロジェクトID}.iam.gserviceaccount.com \ --role roles/iam.workloadIdentityUser \ --member "serviceAccount:{プロジェクトID}.svc.id.goog[{KSAを作成したNamespace}/{KSAの名前}]" # 実行例(default名前空間を使用している場合) $ gcloud iam service-accounts add-iam-policy-binding my-gsa@myproject.iam.gserviceaccount.com \ --role roles/iam.workloadIdentityUser \ --member "serviceAccount:myproject.svc.id.goog[default/my-ksa]"
アプリケーションのデプロイ(GKE)
サブスクライバーのアプリケーションを GKE にデプロイするマニフェストファイル(deployment.yaml
)は以下のようになります。
当記事では始めにサブスクライバー(Pod)の数が 1 の場合の動作検証をするため、spec.replicas
の値を 1
にしています。
マニフェストファイルは、この時点ではまだクラスタに適用しません。
# deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: pubsub-subscriber spec: replicas: 1 selector: matchLabels: app: subscriber template: metadata: labels: app: subscriber spec: containers: - name: subsc-container image: "asia-northeast1-docker.pkg.dev/myproject/pubsub-container/subscriber-orderingkey" # サブスクライバーのコンテナイメージ env: - name: "PROJECT_ID" value: "myproject" # Pub/Sub を作成したプロジェクトの ID - name: "SUBSCRIPTION_ID" value: "mysubscription" # Pub/Sub サブスクリプションの名前 resources: requests: cpu: "500m" serviceAccountName: my-ksa # Workload Identity で使用する ServiceAccount
動作確認
動作確認の基本的な流れ
動作確認は、基本的に以下の流れで行います。
① Cloud Run jobs のタスク実行(メッセージのパブリッシュ)
↓
② GKE クラスタにサブスクライバー用 Pod を作成(メッセージの StreamingPull 処理)
↓
③ Pod のログを確認
↓
④ Pod の削除
①と②は順番が逆のように見えますが、②→①の順に実施してしまうと、(メッセージがそれほど多くない場合)メッセージが Pub/Sub に貯まることなくすぐに Pod によって処理されてしまうため、順序指定を有効化するまでもなく順番通りに処理されてしまい、順序指定の効果が確認できなくなります。したがって、この検証では①→②の順に実施します。
メッセージの順序指定を有効化しない場合の動作
まず、順序指定を 有効化しない 場合の動作を確認します。
ここまでに作成したリソースは以下のような構成になっています。パブリッシャーのコードで順序指定キーを設定していますが、サブスクリプションでメッセージの順序指定を有効化していません。この場合は順序指定キーは機能しません。
以下のコマンドで Cloud Run のジョブを実行し、メッセージをパブリッシュします。Cloud Run jobs では 50 個のタスクを並列実行し、各タスク 5 つ、合計 250 個のメッセージを Pub/Sub トピックにパブリッシュします。
# ジョブの実行(Cloud Run jobs) $ gcloud run jobs execute jobs-publisher-orderingkey --region asia-northeast1 --wait
ジョブの完了を確認したら、GKE クラスタに deployment.yaml
を適用し、Pod を作成します。
# Pod の作成 $ kubectl apply -f deployment.yaml
# Pod のステータスを確認 $ kubectl get pods # 出力例 $ kubectl get pods NAME READY STATUS RESTARTS AGE pubsub-subscriber-545b97bb97-lnkkb 1/1 Running 0 3m37s
Pod のステータスが Running
になったら、Pod のログを確認します。
順序指定が有効になっていないため、タスクのインデックス番号(task#の数字)が同じであっても messageNumber
の順番が 0→1→2→3→4
になっていない(メッセージのパブリッシュ順に処理されていない)ケースがいくつかあることがわかります。
# Pod のログを確認する(順序指定キーを設定していない場合) $ kubectl logs {Pod 名} # 出力例(抜粋) $ kubectl logs pubsub-subscriber-545b97bb97-lnkkb task#5 messageNumber=0 task#5 messageNumber=1 task#7 messageNumber=4 ~~~省略~~~ task#43 messageNumber=0 task#43 messageNumber=1 task#43 messageNumber=2 task#43 messageNumber=3 task#43 messageNumber=4 task#7 messageNumber=0 task#7 messageNumber=1 task#7 messageNumber=2 task#7 messageNumber=3 task#13 messageNumber=2 task#13 messageNumber=3 task#13 messageNumber=4 task#35 messageNumber=0 task#35 messageNumber=1 task#35 messageNumber=2 task#35 messageNumber=3 task#35 messageNumber=4 task#11 messageNumber=4 task#11 messageNumber=0 task#11 messageNumber=1 task#11 messageNumber=2 task#11 messageNumber=3 ~~~省略~~~
次の検証のため、Pod を一旦削除します。
# Pod の削除 $ kubectl delete -f deployment.yaml
メッセージの順序指定が有効化されたサブスクリプションの作成
順序指定が有効化されたサブスクリプションを作成します。
順序指定の設定はサブスクリプション作成後に変更することはできないため、一度サブスクリプションを削除します。
# 順序指定が有効化されていない Pub/Sub サブスクリプションの削除 $ gcloud pubsub subscriptions delete {サブスクリプション名} # 実行例 $ gcloud pubsub subscriptions delete mysubscription
サブスクリプションを削除したら、順序指定を有効化した同名のサブスクリプションを作成します。
# 順序指定が有効化された Pub/Sub サブスクリプションの作成 $ gcloud pubsub subscriptions create {サブスクリプション名} \ --topic={トピック名} \ --enable-message-ordering # 実行例 $ gcloud pubsub subscriptions create mysubscription \ --topic=mytopic \ --enable-message-ordering
Pub/Sub サブスクリプションを作り直したため、再度サブスクリプションに対する「Pub/Sub サブスクライバー(roles/pubsub.subscriber
)」 ロールを GSA に付与します。
# GSA に IAM ロールを紐付ける $ gcloud pubsub subscriptions add-iam-policy-binding {サブスクリプション名} \ --role "roles/pubsub.subscriber" \ --member "serviceAccount:{GSA の名前}@{プロジェクト ID}.iam.gserviceaccount.com" # 実行例 $ gcloud pubsub subscriptions add-iam-policy-binding mysubscription \ --role "roles/pubsub.subscriber" \ --member "serviceAccount:my-gsa@myproject.iam.gserviceaccount.com"
サブスクライバーが単一の場合の動作
次は、順序指定を有効化した状態で、メッセージを単一のサブスクライバーで処理します。
先ほどと同様の手順で Cloud Run job のジョブを実行し、ジョブの完了を確認してから GKE クラスタに Pod を作成します。
Cloud Run jobs で 50 個のタスクを並列実行し、各タスク 5 つ、合計 250 個のメッセージをパブリッシュした場合のサブスクライバー側 Pod のログは以下のようになります。
タスクのインデックス番号を順序指定キーとして設定したため、同一タスク(task#で識別)からのメッセージがパブリッシュされた順(messageNumber が 0→1→2→3→4
)に処理されていることがわかります。
# Pod のログを確認する(全文) # 出力例 $ kubectl logs pubsub-subscriber-944bcdb7b-9zrbz task#27 messageNumber=0 task#8 messageNumber=0 task#7 messageNumber=0 task#46 messageNumber=0 task#16 messageNumber=0 task#29 messageNumber=0 task#3 messageNumber=0 task#43 messageNumber=0 task#14 messageNumber=0 task#21 messageNumber=0 task#21 messageNumber=1 task#21 messageNumber=2 task#21 messageNumber=3 task#21 messageNumber=4 task#45 messageNumber=0 task#45 messageNumber=1 task#45 messageNumber=2 task#45 messageNumber=3 task#45 messageNumber=4 task#41 messageNumber=0 task#39 messageNumber=0 task#49 messageNumber=0 task#17 messageNumber=0 task#17 messageNumber=1 task#17 messageNumber=2 task#32 messageNumber=0 task#0 messageNumber=0 task#0 messageNumber=1 task#40 messageNumber=0 task#35 messageNumber=0 task#15 messageNumber=0 task#11 messageNumber=0 task#2 messageNumber=0 task#2 messageNumber=1 task#2 messageNumber=2 task#2 messageNumber=3 task#2 messageNumber=4 task#36 messageNumber=0 task#36 messageNumber=1 task#9 messageNumber=0 task#9 messageNumber=1 task#9 messageNumber=2 task#9 messageNumber=3 task#9 messageNumber=4 task#23 messageNumber=0 task#23 messageNumber=1 task#23 messageNumber=2 task#10 messageNumber=0 task#10 messageNumber=1 task#10 messageNumber=2 task#10 messageNumber=3 task#10 messageNumber=4 task#37 messageNumber=0 task#22 messageNumber=0 task#22 messageNumber=1 task#22 messageNumber=2 task#24 messageNumber=0 task#24 messageNumber=1 task#24 messageNumber=2 task#24 messageNumber=3 task#24 messageNumber=4 task#4 messageNumber=0 task#4 messageNumber=1 task#30 messageNumber=0 task#30 messageNumber=1 task#30 messageNumber=2 task#38 messageNumber=0 task#48 messageNumber=0 task#44 messageNumber=0 task#44 messageNumber=1 task#44 messageNumber=2 task#42 messageNumber=0 task#42 messageNumber=1 task#42 messageNumber=2 task#42 messageNumber=3 task#42 messageNumber=4 task#33 messageNumber=0 task#26 messageNumber=0 task#26 messageNumber=1 task#26 messageNumber=2 task#26 messageNumber=3 task#26 messageNumber=4 task#20 messageNumber=0 task#20 messageNumber=1 task#20 messageNumber=2 task#31 messageNumber=0 task#31 messageNumber=1 task#31 messageNumber=2 task#47 messageNumber=0 task#25 messageNumber=0 task#25 messageNumber=1 task#25 messageNumber=2 task#25 messageNumber=3 task#25 messageNumber=4 task#19 messageNumber=0 task#19 messageNumber=1 task#19 messageNumber=2 task#19 messageNumber=3 task#19 messageNumber=4 task#13 messageNumber=0 task#13 messageNumber=1 task#13 messageNumber=2 task#13 messageNumber=3 task#13 messageNumber=4 task#12 messageNumber=0 task#1 messageNumber=0 task#6 messageNumber=0 task#6 messageNumber=1 task#6 messageNumber=2 task#6 messageNumber=3 task#18 messageNumber=0 task#6 messageNumber=4 task#5 messageNumber=0 task#5 messageNumber=1 task#5 messageNumber=2 task#5 messageNumber=3 task#34 messageNumber=0 task#34 messageNumber=1 task#34 messageNumber=2 task#34 messageNumber=3 task#34 messageNumber=4 task#28 messageNumber=0 task#3 messageNumber=1 task#3 messageNumber=2 task#3 messageNumber=3 task#3 messageNumber=4 task#7 messageNumber=1 task#7 messageNumber=2 task#7 messageNumber=3 task#7 messageNumber=4 task#8 messageNumber=1 task#8 messageNumber=2 task#8 messageNumber=3 task#8 messageNumber=4 task#27 messageNumber=1 task#27 messageNumber=2 task#27 messageNumber=3 task#27 messageNumber=4 task#29 messageNumber=1 task#29 messageNumber=2 task#29 messageNumber=3 task#29 messageNumber=4 task#14 messageNumber=1 task#14 messageNumber=2 task#14 messageNumber=3 task#14 messageNumber=4 task#46 messageNumber=1 task#46 messageNumber=2 task#46 messageNumber=3 task#46 messageNumber=4 task#16 messageNumber=1 task#16 messageNumber=2 task#16 messageNumber=3 task#16 messageNumber=4 task#41 messageNumber=1 task#41 messageNumber=2 task#41 messageNumber=3 task#41 messageNumber=4 task#43 messageNumber=1 task#43 messageNumber=2 task#43 messageNumber=3 task#43 messageNumber=4 task#47 messageNumber=1 task#47 messageNumber=2 task#47 messageNumber=3 task#47 messageNumber=4 task#31 messageNumber=3 task#31 messageNumber=4 task#12 messageNumber=1 task#30 messageNumber=3 task#30 messageNumber=4 task#28 messageNumber=1 task#28 messageNumber=2 task#28 messageNumber=3 task#28 messageNumber=4 task#44 messageNumber=3 task#44 messageNumber=4 task#5 messageNumber=4 task#38 messageNumber=1 task#38 messageNumber=2 task#38 messageNumber=3 task#38 messageNumber=4 task#4 messageNumber=2 task#4 messageNumber=3 task#4 messageNumber=4 task#1 messageNumber=1 task#1 messageNumber=2 task#1 messageNumber=3 task#1 messageNumber=4 task#12 messageNumber=2 task#12 messageNumber=3 task#12 messageNumber=4 task#48 messageNumber=1 task#48 messageNumber=2 task#48 messageNumber=3 task#48 messageNumber=4 task#20 messageNumber=3 task#33 messageNumber=1 task#20 messageNumber=4 task#18 messageNumber=1 task#18 messageNumber=2 task#18 messageNumber=3 task#18 messageNumber=4 task#33 messageNumber=2 task#33 messageNumber=3 task#33 messageNumber=4 task#36 messageNumber=2 task#36 messageNumber=3 task#36 messageNumber=4 task#23 messageNumber=3 task#23 messageNumber=4 task#22 messageNumber=3 task#22 messageNumber=4 task#37 messageNumber=1 task#37 messageNumber=2 task#37 messageNumber=3 task#37 messageNumber=4 task#35 messageNumber=1 task#35 messageNumber=2 task#35 messageNumber=3 task#35 messageNumber=4 task#17 messageNumber=3 task#17 messageNumber=4 task#39 messageNumber=1 task#39 messageNumber=2 task#39 messageNumber=3 task#39 messageNumber=4 task#15 messageNumber=1 task#15 messageNumber=2 task#15 messageNumber=3 task#15 messageNumber=4 task#0 messageNumber=2 task#0 messageNumber=3 task#0 messageNumber=4 task#11 messageNumber=1 task#11 messageNumber=2 task#11 messageNumber=3 task#11 messageNumber=4 task#49 messageNumber=1 task#49 messageNumber=2 task#49 messageNumber=3 task#49 messageNumber=4 task#40 messageNumber=1 task#40 messageNumber=2 task#40 messageNumber=3 task#40 messageNumber=4 task#32 messageNumber=1 task#32 messageNumber=2 task#32 messageNumber=3 task#32 messageNumber=4
サブスクライバーが複数の場合の動作(アフィニティの検証)
最後に Pod の数を 3 つに増やし、順序指定が有効化されたメッセージを StreamingPull API で Pull した場合に、メッセージがどのように分散するかを確認します。
マニフェストファイルの spec.replicas
の値を 3
に変更します。
# deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: pubsub-subscriber spec: replicas: 3 # ここを変更する selector: matchLabels: app: subscriber template: metadata: labels: app: subscriber spec: containers: - name: subsc-container image: "asia-northeast1-docker.pkg.dev/myproject/pubsub-container/subscriber-orderingkey" # サブスクライバーのコンテナイメージ env: - name: "PROJECT_ID" value: "myproject" # Pub/Sub を作成したプロジェクトの ID - name: "SUBSCRIPTION_ID" value: "mysubscription" # Pub/Sub サブスクリプションの名前 resources: requests: cpu: "500m" serviceAccountName: my-ksa # Workload Identity で使用する ServiceAccount
今まで同様、Cloud Run jobs のジョブを実行してメッセージをパブリッシュした後、マニフェストファイルを適用して 3 つの Pod を作成します。
Pod がすべて正常に実行されているのを確認したら 各 Pod のログを確認します。
始めに、各 Pod に送られたメッセージ数を確認するために、ログの行数を見てみます。
メッセージはタスクごとに 5 つ送信されるため、タスクのインデックス番号を順序を指定キーとして分散処理を行った場合、メッセージの再配信が行われていなければ、アフィニティによって各 Pod が処理するメッセージの数は 5 の倍数になるはずです(※アフィニティがベストエフォートベースである点は注意)。
今回の結果を見たところ、各 Pod で 5 の倍数の数だけメッセージを処理しているようです。
# 出力例 # Pod のログの行数を確認する $ kubectl logs pubsub-subscriber-944bcdb7b-pch4t | wc -l 75 $ kubectl logs pubsub-subscriber-944bcdb7b-s7xdq | wc -l 95 $ kubectl logs pubsub-subscriber-944bcdb7b-wp6jt | wc -l 80
実際のログを確認してみます。3 つの Pod のログを以下に記載します。
順序指定キーと StreamingPull API を使用した際のアフィニティにより、同一タスクからパブリッシュされたメッセージ(task#が同じもの)は同一の Pod に送信され、パブリッシュされた順(messageNumber が 0→1→2→3→4
)に処理されていることがわかります。
# 1 つ目の Pod のログを確認する(全文) # 出力例 $ kubectl logs pubsub-subscriber-944bcdb7b-pch4t task#34 messageNumber=0 task#34 messageNumber=1 task#27 messageNumber=0 task#44 messageNumber=0 task#44 messageNumber=1 task#44 messageNumber=2 task#44 messageNumber=3 task#31 messageNumber=0 task#32 messageNumber=0 task#32 messageNumber=1 task#32 messageNumber=2 task#37 messageNumber=0 task#37 messageNumber=1 task#37 messageNumber=2 task#6 messageNumber=0 task#49 messageNumber=0 task#49 messageNumber=1 task#26 messageNumber=0 task#26 messageNumber=1 task#26 messageNumber=2 task#26 messageNumber=3 task#10 messageNumber=0 task#10 messageNumber=1 task#10 messageNumber=2 task#10 messageNumber=3 task#42 messageNumber=0 task#42 messageNumber=1 task#48 messageNumber=0 task#48 messageNumber=1 task#28 messageNumber=0 task#37 messageNumber=3 task#37 messageNumber=4 task#11 messageNumber=0 task#11 messageNumber=1 task#11 messageNumber=2 task#11 messageNumber=3 task#11 messageNumber=4 task#44 messageNumber=4 task#26 messageNumber=4 task#34 messageNumber=2 task#34 messageNumber=3 task#34 messageNumber=4 task#8 messageNumber=0 task#8 messageNumber=1 task#8 messageNumber=2 task#8 messageNumber=3 task#31 messageNumber=1 task#31 messageNumber=2 task#31 messageNumber=3 task#31 messageNumber=4 task#10 messageNumber=4 task#32 messageNumber=3 task#32 messageNumber=4 task#42 messageNumber=2 task#42 messageNumber=3 task#42 messageNumber=4 task#48 messageNumber=2 task#48 messageNumber=3 task#48 messageNumber=4 task#28 messageNumber=1 task#28 messageNumber=2 task#28 messageNumber=3 task#28 messageNumber=4 task#49 messageNumber=2 task#49 messageNumber=3 task#49 messageNumber=4 task#27 messageNumber=1 task#27 messageNumber=2 task#27 messageNumber=3 task#27 messageNumber=4 task#6 messageNumber=1 task#6 messageNumber=2 task#6 messageNumber=3 task#6 messageNumber=4 task#8 messageNumber=4
# 2 つ目の Pod のログを確認する(全文) # 出力例 $ kubectl logs pubsub-subscriber-944bcdb7b-s7xdq task#14 messageNumber=0 task#14 messageNumber=1 task#14 messageNumber=2 task#14 messageNumber=3 task#14 messageNumber=4 task#45 messageNumber=0 task#43 messageNumber=0 task#43 messageNumber=1 task#43 messageNumber=2 task#43 messageNumber=3 task#5 messageNumber=0 task#5 messageNumber=1 task#2 messageNumber=0 task#17 messageNumber=0 task#17 messageNumber=1 task#17 messageNumber=2 task#19 messageNumber=0 task#19 messageNumber=1 task#19 messageNumber=2 task#19 messageNumber=3 task#19 messageNumber=4 task#24 messageNumber=0 task#24 messageNumber=1 task#39 messageNumber=0 task#2 messageNumber=1 task#22 messageNumber=0 task#22 messageNumber=1 task#22 messageNumber=2 task#22 messageNumber=3 task#22 messageNumber=4 task#4 messageNumber=0 task#4 messageNumber=1 task#4 messageNumber=2 task#4 messageNumber=3 task#4 messageNumber=4 task#30 messageNumber=0 task#30 messageNumber=1 task#30 messageNumber=2 task#30 messageNumber=3 task#20 messageNumber=0 task#20 messageNumber=1 task#20 messageNumber=2 task#20 messageNumber=3 task#47 messageNumber=0 task#9 messageNumber=0 task#35 messageNumber=0 task#46 messageNumber=0 task#46 messageNumber=1 task#46 messageNumber=2 task#46 messageNumber=3 task#46 messageNumber=4 task#43 messageNumber=4 task#1 messageNumber=0 task#1 messageNumber=1 task#1 messageNumber=2 task#1 messageNumber=3 task#1 messageNumber=4 task#5 messageNumber=2 task#29 messageNumber=0 task#29 messageNumber=1 task#29 messageNumber=2 task#29 messageNumber=3 task#29 messageNumber=4 task#5 messageNumber=3 task#5 messageNumber=4 task#17 messageNumber=3 task#17 messageNumber=4 task#20 messageNumber=4 task#30 messageNumber=4 task#45 messageNumber=1 task#45 messageNumber=2 task#45 messageNumber=3 task#45 messageNumber=4 task#47 messageNumber=1 task#47 messageNumber=2 task#47 messageNumber=3 task#47 messageNumber=4 task#39 messageNumber=1 task#39 messageNumber=2 task#39 messageNumber=3 task#39 messageNumber=4 task#9 messageNumber=1 task#9 messageNumber=2 task#9 messageNumber=3 task#9 messageNumber=4 task#35 messageNumber=1 task#2 messageNumber=2 task#2 messageNumber=3 task#2 messageNumber=4 task#35 messageNumber=2 task#35 messageNumber=3 task#35 messageNumber=4 task#24 messageNumber=2 task#24 messageNumber=3 task#24 messageNumber=4
# 3 つ目の Pod のログを確認する(全文) # 出力例 $ kubectl logs pubsub-subscriber-944bcdb7b-wp6jt task#21 messageNumber=0 task#21 messageNumber=1 task#21 messageNumber=2 task#12 messageNumber=0 task#13 messageNumber=0 task#13 messageNumber=1 task#13 messageNumber=2 task#13 messageNumber=3 task#13 messageNumber=4 task#18 messageNumber=0 task#41 messageNumber=0 task#41 messageNumber=1 task#41 messageNumber=2 task#41 messageNumber=3 task#41 messageNumber=4 task#23 messageNumber=0 task#23 messageNumber=1 task#23 messageNumber=2 task#23 messageNumber=3 task#23 messageNumber=4 task#33 messageNumber=0 task#15 messageNumber=0 task#15 messageNumber=1 task#0 messageNumber=0 task#0 messageNumber=1 task#0 messageNumber=2 task#0 messageNumber=3 task#0 messageNumber=4 task#40 messageNumber=0 task#40 messageNumber=1 task#25 messageNumber=0 task#25 messageNumber=1 task#25 messageNumber=2 task#25 messageNumber=3 task#25 messageNumber=4 task#36 messageNumber=0 task#16 messageNumber=0 task#16 messageNumber=1 task#16 messageNumber=2 task#16 messageNumber=3 task#16 messageNumber=4 task#33 messageNumber=1 task#33 messageNumber=2 task#33 messageNumber=3 task#33 messageNumber=4 task#12 messageNumber=1 task#12 messageNumber=2 task#12 messageNumber=3 task#12 messageNumber=4 task#7 messageNumber=0 task#7 messageNumber=1 task#7 messageNumber=2 task#7 messageNumber=3 task#7 messageNumber=4 task#38 messageNumber=0 task#38 messageNumber=1 task#38 messageNumber=2 task#38 messageNumber=3 task#38 messageNumber=4 task#3 messageNumber=0 task#3 messageNumber=1 task#3 messageNumber=2 task#3 messageNumber=3 task#3 messageNumber=4 task#40 messageNumber=2 task#40 messageNumber=3 task#40 messageNumber=4 task#21 messageNumber=3 task#21 messageNumber=4 task#15 messageNumber=2 task#15 messageNumber=3 task#15 messageNumber=4 task#36 messageNumber=1 task#36 messageNumber=2 task#36 messageNumber=3 task#36 messageNumber=4 task#18 messageNumber=1 task#18 messageNumber=2 task#18 messageNumber=3 task#18 messageNumber=4
佐々木 駿太 (記事一覧)
G-gen最北端、北海道在住のクラウドソリューション部エンジニア
2022年6月にG-genにジョイン。Google Cloud Partner Top Engineer 2024に選出。好きなGoogle CloudプロダクトはCloud Run。
趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。
Follow @sasashun0805