Pub/SubのCloud Storage import topicを使ってみた

記事タイトルとURLをコピーする

G-gen の杉村です。Pub/Sub の Cloud Storage インポートトピック(Cloud Storage import topic)を使うと、事前に指定した Cloud Storage バケットに Put されたテキストオブジェクトを、ノーコードで Pub/Sub トピックにパブリッシュし、簡単に Pub/Sub サブスクリプションに配信できます。

前提知識

Cloud Storage インポートトピックとは

Pub/Sub は、Google Cloud(旧称 GCP)のフルマネージドなメッセージングサービスです。Pub/Sub の意義や、実現できるアーキテクチャは以下の記事もご参照ください。

blog.g-gen.co.jp

Pub/Sub の Cloud Storage インポートトピック(Cloud Storage import topic)を使うと、事前に指定した Cloud Storage バケットに Put されたテキストオブジェクトを、ノーコードで Pub/Sub トピックにパブリッシュし、簡単に Pub/Sub サブスクリプションに配信できます。

この Cloud Storage インポートトピックを使わない場合は、Cloud Storage にオブジェクトが Put されたことを Eventarc で検知し、Cloud Run functions 等を起動、記述したプログラムで配信先に書き込むという処理の開発が必要になります。

一方で、当記事で紹介する Cloud Storage インポートトピックを使えば、Cloud Storage バケットに書き込まれたオブジェクトを読み取るプログラムを開発することなく、自動的に Pub/Sub トピックにパブリッシュすることができます。

そのようにして Pub/Sub トピックにパブリッシュされたメッセージは、サブスクリプションを経由して、BigQuery に書き込んだり、他の API エンドポイントに Push することなどが可能です。

通常のトピックと Cloud Storage インポートトピックの違い

設定値

Cloud Storage インポートトピックには、以下のような設定値があります。

設定名 説明
取り込み元バケット データを取り込む Cloud Storage バケットを指定
オブジェクトの形式 Text、Avro、Pub/Sub Avro から選択
区切り文字 Text の場合のみ指定。この区切り文字に基づいてメッセージが分割される。1文字まで。省略すると \n (改行)
最短のオブジェクト作成時間 取り込み開始時刻。この時刻より前に作成されたオブジェクトは取り込まれない
glob パターン ここで指定したパスパターンに一致するオブジェクトのみが取り込まれる。** で全オブジェクト。**.txt で拡張子指定等

以下は、Google Cloud コンソールにおける設定画面のスクリーンショットです。

トピック作成画面

検証の概要

当記事では、以下の構成で検証を行います。

検証 1

上記の構成では、Cloud Storage インポートトピックによる自動的なメッセージ取り込みを行います。取り込んだメッセージは、BigQuery サブスクリプション経由で BigQuery テーブルに書き込みます。これによりソースコードを一切書くことなく、Cloud Storage に到着したテキストデータを順次、BigQuery に書き込むことが可能です。

Pub/Sub の BigQuery サブスクリプションの詳細は、以下の記事を参照してください。

blog.g-gen.co.jp

さらに、追加検証として、メッセージ本文の構成を確かめるため、Pull サブスクリプションを作成して直接メッセージの内容を確認する検証も行います。

検証 2

環境構築

サービスエージェントへ IAM 権限の付与(パブリッシュ)

Pub/Sub が利用するサービスエージェント(Google Cloud サービスが利用するサービスアカウントのこと)に、必要な IAM 権限を付与します。

サービスエージェント名は service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com です。{PROJECT_NUMBER} にはプロジェクト番号が入ります。このサービスエージェントに、プロジェクトレベルで Pub/Sub パブリッシャー(roles/pubsub.publisher)ロールを付与します。このロールは、サービスエージェントが Cloud Storage インポートトピックにデータをパブリッシュするのに必要になります。

プロジェクトレベルの IAM バインディング一覧画面では、サービスエージェントに付与された IAM ロールは非表示になっています。以下のスクリーンショットのように「Google 提供のロール付与を含める」チェックボックスをオンにすると、IAM ロールが表示されます。

なお以下のスクリーンショットでは、デフォルトで付与されている「Cloud Pub/Sub サービス エージェント」、今回付与した「Pub/Sub パブリッシャー」に加えて「BigQuery データ編集者」が付与されていますが、これは今回の検証で BigQuery サブスクリプションを利用するためです。

プロジェクトの IAM 画面

バケットの作成

Cloud Storage バケットを作成します。今回は、以下のようなバケットを作成しました。

Cloud Storage バケットの作成

トピックの作成

Cloud Storage インポートトピックを作成します。

オブジェクトの形式は Text とし、区切り文字は空白とします。区切り文字を明示しない場合、\n (改行)が区切り文字として認識されます。つまり、あるテキストファイルをバケットに Put した場合、1行が1メッセージとして Pub/Sub トピックに取り込まれます。

Cloud Storage インポートトピックの作成

サービスエージェントへ IAM 権限の付与(バケット読み取り)

このとき、Pub/Sub のサービスエージェント(service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com)が対象バケットへの読み取り権限を持っていない場合、以下のようなメッセージが表示されます。

権限の確認と付与

「権限の設定」を押下すると、以下のような画面が表示され、「ロールの付与」をクリックすることで必要な IAM 権限を付与できます。

IAM 権限の付与

これを行うと、対象バケットのレベルで「Storage オブジェクト閲覧者」「Storage レガシー バケット読み取り」ロールが付与されます。以下のように、バケットの「権限」タブで付与された IAM ロールを確認できます。

バケットレベルの権限の確認

BigQuery テーブルと BigQuery サブスクリプションの作成

データ格納先の BigQuery テーブルを作成します。id 列と data 列を持つシンプルなテーブルを作成します。

BigQuery テーブル

その後、Cloud Storage インポートトピックに紐づくサブスクリプションを「BigQuery サブスクリプション」タイプで作成します。

BigQuery サブスクリプションの作成

BigQuery サブスクリプションは、トピックが受け取ったメッセージを自動的に BigQuery に書き込んでくれます。今回は「スキーマを使用しない」に設定したので、メッセージは data という名称を持つ列に文字列として書き込まれます。

トピックの状態を確認

ここで、トピックの状態を確認します。作成した Cloud Storage インポートトピックの詳細画面で、トピックのステータスが以下のように緑色で表示されていれば、正しく設定が完了しています。

トピックが正しく設定されている状態

もし以下のように表示されていた場合、Pub/Sub のサービスエージェントがトピックへのパブリッシュ権限を持っていないことが考えられます。当記事の サービスエージェントへ IAM 権限の付与(パブリッシュ) に戻り、プロジェクトレベルで Pub/Sub パブリッシャー(roles/pubsub.publisher)ロールを付与してください。あるいは、トピックレベルでロールを付与することも可能です。

取り込みのリソースエラー: 権限の公開が拒否されました

取り込みのリソースエラー: 権限の公開が拒否されました

動作確認

オブジェクトの Put

動作確認のため、Cloud Storage バケットにオブジェクトを Put します。

今回は、以下のようなファイルを Put しました。改行で区切られた、2行のテキスト情報です。

test-file-01.txt

This is the first message.
This is the second message.

オブジェクトの Put

テーブルの確認

Put されたテキストファイル内のテキスト情報は、Cloud Storage インポートトピックに自動的に取り込まれ、BigQuery サブスクリプションによって、対象テーブルの data 列に書き込まれるはずです。

対象テーブルに SELECT 文を実行すると、想定どおり、2行のテキスト情報が書き込まれていました。オブジェクトの Put からテーブルに情報が書き込まれるまで、およそ1分程度のラグがありました。

BigQuery にテキストデータが格納された

追加検証(区切り文字の追加)

追加検証として、先程は省略したトピックの「区切り文字」の設定を追加してみます。

以下のように、区切り文字を ,(カンマ)として設定します。

区切り文字をカンマに設定

その後、Cloud Storage バケットに次のファイルを Put します。半角スペースの扱いを確かめるため、3列目の文字列 3rd field の手前には、あえて半角スペースを入れています。

test-csv-02.csv

1st field,2nd field, 3rd field

結果は、以下のように想定どおり格納されました。カンマのあとの半角スペースも反映されており、3rd field の手前には半角スペースが入っています。

カンマ区切りのデータが格納された

追加検証(メッセージ本文の確認)

さらに追加検証として、Pull サブスクリプションを作成して、Cloud Storage インポートトピックから発行されるメッセージの本文を確認します。

以下のように、Pull サブスクリプションを作成します。

Pull サブスクリプションを作成

以下のファイルを Put します。

another 1st field,another 2nd field, another 3rd field

その後、gcloud pubsub subscriptions pull コマンドでサブスクリプションからメッセージを Pull します。--limit オプションをつけずに実行すると、1個のメッセージのみが Pull されます。今回は --limit=10 を指定し、3つのメッセージが取得できることを確認します。

3つのメッセージが Pull できた

想定どおり、3つのメッセージが Pull できました。データ本文(data)は Base64 でエンコードされているため、echo "${data}" | base64 --decode のようにしてデコードすると、内容が確認できます。

メッセージ本文は、以下のような形式であることがわかりました。

[
  {
    "ackId": "UAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsGCBQFfH1wU1x1XVx0aFENGXJ9YHxpW0VQAEVWe1lRGwdoTm11H7aF5ftLQ1RrWBYHBEBae19TGQhoXFp3D3nlneOW2-TYfQk9OqLbgtZtO-vw5OtHZiM9XxJLLD5-MSpFQV5AEkw6H0RJUytDCypYEU4EISE-MD5FU0Q",
    "message": {
      "data": "YW5vdGhlciAxc3QgZmllbGQ=",
      "messageId": "12566595082500628",
      "publishTime": "2024-11-12T09:08:52.185Z"
    }
  },
  {
    "ackId": "UAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsGCBQFfH1wU1x1XVx0aFENGXJ9YHxpW0VQAEVWe1lRGgdoTm11H7aF5ftLQ1RrWBYHBEBae19TGQhoXFp3DnnlneOW2-TYfQk9OqLbgtZtO-vw5OtHZiM9XxJLLD5-MSpFQV5AEkw6H0RJUytDCypYEU4EISE-MD5FU0Q",
    "message": {
      "data": "YW5vdGhlciAybmQgZmllbGQ=",
      "messageId": "12566595082500629",
      "publishTime": "2024-11-12T09:08:52.185Z"
    }
  },
  {
    "ackId": "UAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsGCBQFfH1wU1x1XVx0aFENGXJ9YHxpW0VQAEVWe1lRGQdoTm11H7aF5ftLQ1RrWBYHBEBae19TGQhoXFp2B3nlneOW2-TYfQk9OqLbgtZtO-vw5OtHZiM9XxJLLD5-MSpFQV5AEkw6H0RJUytDCypYEU4EISE-MD5FU0Q",
    "message": {
      "data": "IGFub3RoZXIgM3JkIGZpZWxk",
      "messageId": "12566595082500630",
      "publishTime": "2024-11-12T09:08:52.185Z"
    }
  }
]

杉村 勇馬 (記事一覧)

執行役員 CTO / クラウドソリューション部 部長

元警察官という経歴を持つ現 IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 12資格、Google Cloud認定資格11資格。X (旧 Twitter) では Google Cloud や AWS のアップデート情報をつぶやいています。