G-gen の杉村です。Pub/Sub の Cloud Storage インポートトピック(Cloud Storage import topic)を使うと、事前に指定した Cloud Storage バケットに Put されたテキストオブジェクトを、ノーコードで Pub/Sub トピックにパブリッシュし、簡単に Pub/Sub サブスクリプションに配信できます。
前提知識
Cloud Storage インポートトピックとは
Pub/Sub は、Google Cloud(旧称 GCP)のフルマネージドなメッセージングサービスです。Pub/Sub の意義や、実現できるアーキテクチャは以下の記事もご参照ください。
Pub/Sub の Cloud Storage インポートトピック(Cloud Storage import topic)を使うと、事前に指定した Cloud Storage バケットに Put されたテキストオブジェクトを、ノーコードで Pub/Sub トピックにパブリッシュし、簡単に Pub/Sub サブスクリプションに配信できます。
この Cloud Storage インポートトピックを使わない場合は、Cloud Storage にオブジェクトが Put されたことを Eventarc で検知し、Cloud Run functions 等を起動、記述したプログラムで配信先に書き込むという処理の開発が必要になります。
一方で、当記事で紹介する Cloud Storage インポートトピックを使えば、Cloud Storage バケットに書き込まれたオブジェクトを読み取るプログラムを開発することなく、自動的に Pub/Sub トピックにパブリッシュすることができます。
そのようにして Pub/Sub トピックにパブリッシュされたメッセージは、サブスクリプションを経由して、BigQuery に書き込んだり、他の API エンドポイントに Push することなどが可能です。
設定値
Cloud Storage インポートトピックには、以下のような設定値があります。
設定名 | 説明 |
---|---|
取り込み元バケット | データを取り込む Cloud Storage バケットを指定 |
オブジェクトの形式 | Text、Avro、Pub/Sub Avro から選択 |
区切り文字 | Text の場合のみ指定。この区切り文字に基づいてメッセージが分割される。1文字まで。省略すると \n (改行) |
最短のオブジェクト作成時間 | 取り込み開始時刻。この時刻より前に作成されたオブジェクトは取り込まれない |
glob パターン | ここで指定したパスパターンに一致するオブジェクトのみが取り込まれる。** で全オブジェクト。**.txt で拡張子指定等 |
以下は、Google Cloud コンソールにおける設定画面のスクリーンショットです。
検証の概要
当記事では、以下の構成で検証を行います。
上記の構成では、Cloud Storage インポートトピックによる自動的なメッセージ取り込みを行います。取り込んだメッセージは、BigQuery サブスクリプション経由で BigQuery テーブルに書き込みます。これによりソースコードを一切書くことなく、Cloud Storage に到着したテキストデータを順次、BigQuery に書き込むことが可能です。
Pub/Sub の BigQuery サブスクリプションの詳細は、以下の記事を参照してください。
さらに、追加検証として、メッセージ本文の構成を確かめるため、Pull サブスクリプションを作成して直接メッセージの内容を確認する検証も行います。
環境構築
サービスエージェントへ IAM 権限の付与(パブリッシュ)
Pub/Sub が利用するサービスエージェント(Google Cloud サービスが利用するサービスアカウントのこと)に、必要な IAM 権限を付与します。
サービスエージェント名は service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
です。{PROJECT_NUMBER}
にはプロジェクト番号が入ります。このサービスエージェントに、プロジェクトレベルで Pub/Sub パブリッシャー(roles/pubsub.publisher
)ロールを付与します。このロールは、サービスエージェントが Cloud Storage インポートトピックにデータをパブリッシュするのに必要になります。
- 参考 : Add the Pub/Sub publisher role to the Pub/Sub service account
- 参考 : サービスエージェントとは何か ‐ G-gen Tech Blog
プロジェクトレベルの IAM バインディング一覧画面では、サービスエージェントに付与された IAM ロールは非表示になっています。以下のスクリーンショットのように「Google 提供のロール付与を含める」チェックボックスをオンにすると、IAM ロールが表示されます。
なお以下のスクリーンショットでは、デフォルトで付与されている「Cloud Pub/Sub サービス エージェント」、今回付与した「Pub/Sub パブリッシャー」に加えて「BigQuery データ編集者」が付与されていますが、これは今回の検証で BigQuery サブスクリプションを利用するためです。
バケットの作成
Cloud Storage バケットを作成します。今回は、以下のようなバケットを作成しました。
トピックの作成
Cloud Storage インポートトピックを作成します。
オブジェクトの形式は Text とし、区切り文字は空白とします。区切り文字を明示しない場合、\n
(改行)が区切り文字として認識されます。つまり、あるテキストファイルをバケットに Put した場合、1行が1メッセージとして Pub/Sub トピックに取り込まれます。
サービスエージェントへ IAM 権限の付与(バケット読み取り)
このとき、Pub/Sub のサービスエージェント(service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
)が対象バケットへの読み取り権限を持っていない場合、以下のようなメッセージが表示されます。
「権限の設定」を押下すると、以下のような画面が表示され、「ロールの付与」をクリックすることで必要な IAM 権限を付与できます。
これを行うと、対象バケットのレベルで「Storage オブジェクト閲覧者」「Storage レガシー バケット読み取り」ロールが付与されます。以下のように、バケットの「権限」タブで付与された IAM ロールを確認できます。
BigQuery テーブルと BigQuery サブスクリプションの作成
データ格納先の BigQuery テーブルを作成します。id 列と data 列を持つシンプルなテーブルを作成します。
その後、Cloud Storage インポートトピックに紐づくサブスクリプションを「BigQuery サブスクリプション」タイプで作成します。
BigQuery サブスクリプションは、トピックが受け取ったメッセージを自動的に BigQuery に書き込んでくれます。今回は「スキーマを使用しない」に設定したので、メッセージは data
という名称を持つ列に文字列として書き込まれます。
トピックの状態を確認
ここで、トピックの状態を確認します。作成した Cloud Storage インポートトピックの詳細画面で、トピックのステータスが以下のように緑色で表示されていれば、正しく設定が完了しています。
もし以下のように表示されていた場合、Pub/Sub のサービスエージェントがトピックへのパブリッシュ権限を持っていないことが考えられます。当記事の サービスエージェントへ IAM 権限の付与(パブリッシュ)
に戻り、プロジェクトレベルで Pub/Sub パブリッシャー(roles/pubsub.publisher
)ロールを付与してください。あるいは、トピックレベルでロールを付与することも可能です。
取り込みのリソースエラー: 権限の公開が拒否されました
動作確認
オブジェクトの Put
動作確認のため、Cloud Storage バケットにオブジェクトを Put します。
今回は、以下のようなファイルを Put しました。改行で区切られた、2行のテキスト情報です。
test-file-01.txt
This is the first message. This is the second message.
テーブルの確認
Put されたテキストファイル内のテキスト情報は、Cloud Storage インポートトピックに自動的に取り込まれ、BigQuery サブスクリプションによって、対象テーブルの data 列に書き込まれるはずです。
対象テーブルに SELECT
文を実行すると、想定どおり、2行のテキスト情報が書き込まれていました。オブジェクトの Put からテーブルに情報が書き込まれるまで、およそ1分程度のラグがありました。
追加検証(区切り文字の追加)
追加検証として、先程は省略したトピックの「区切り文字」の設定を追加してみます。
以下のように、区切り文字を ,
(カンマ)として設定します。
その後、Cloud Storage バケットに次のファイルを Put します。半角スペースの扱いを確かめるため、3列目の文字列 3rd field
の手前には、あえて半角スペースを入れています。
test-csv-02.csv
1st field,2nd field, 3rd field
結果は、以下のように想定どおり格納されました。カンマのあとの半角スペースも反映されており、3rd field
の手前には半角スペースが入っています。
追加検証(メッセージ本文の確認)
さらに追加検証として、Pull サブスクリプションを作成して、Cloud Storage インポートトピックから発行されるメッセージの本文を確認します。
以下のように、Pull サブスクリプションを作成します。
以下のファイルを Put します。
another 1st field,another 2nd field, another 3rd field
その後、gcloud pubsub subscriptions pull
コマンドでサブスクリプションからメッセージを Pull します。--limit
オプションをつけずに実行すると、1個のメッセージのみが Pull されます。今回は --limit=10
を指定し、3つのメッセージが取得できることを確認します。
想定どおり、3つのメッセージが Pull できました。データ本文(data
)は Base64 でエンコードされているため、echo "${data}" | base64 --decode
のようにしてデコードすると、内容が確認できます。
メッセージ本文は、以下のような形式であることがわかりました。
[ { "ackId": "UAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsGCBQFfH1wU1x1XVx0aFENGXJ9YHxpW0VQAEVWe1lRGwdoTm11H7aF5ftLQ1RrWBYHBEBae19TGQhoXFp3D3nlneOW2-TYfQk9OqLbgtZtO-vw5OtHZiM9XxJLLD5-MSpFQV5AEkw6H0RJUytDCypYEU4EISE-MD5FU0Q", "message": { "data": "YW5vdGhlciAxc3QgZmllbGQ=", "messageId": "12566595082500628", "publishTime": "2024-11-12T09:08:52.185Z" } }, { "ackId": "UAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsGCBQFfH1wU1x1XVx0aFENGXJ9YHxpW0VQAEVWe1lRGgdoTm11H7aF5ftLQ1RrWBYHBEBae19TGQhoXFp3DnnlneOW2-TYfQk9OqLbgtZtO-vw5OtHZiM9XxJLLD5-MSpFQV5AEkw6H0RJUytDCypYEU4EISE-MD5FU0Q", "message": { "data": "YW5vdGhlciAybmQgZmllbGQ=", "messageId": "12566595082500629", "publishTime": "2024-11-12T09:08:52.185Z" } }, { "ackId": "UAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsGCBQFfH1wU1x1XVx0aFENGXJ9YHxpW0VQAEVWe1lRGQdoTm11H7aF5ftLQ1RrWBYHBEBae19TGQhoXFp2B3nlneOW2-TYfQk9OqLbgtZtO-vw5OtHZiM9XxJLLD5-MSpFQV5AEkw6H0RJUytDCypYEU4EISE-MD5FU0Q", "message": { "data": "IGFub3RoZXIgM3JkIGZpZWxk", "messageId": "12566595082500630", "publishTime": "2024-11-12T09:08:52.185Z" } } ]
杉村 勇馬 (記事一覧)
執行役員 CTO / クラウドソリューション部 部長
元警察官という経歴を持つ現 IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 12資格、Google Cloud認定資格11資格。X (旧 Twitter) では Google Cloud や AWS のアップデート情報をつぶやいています。
Follow @y_sugi_it