G-gen の杉村です。Pub/Sub の BigQuery サブスクリプションを使うと、Pub/Sub に対して発行したメッセージを簡単に BigQuery テーブルに書き込むことができます。
前提知識
BigQuery サブスクリプションとは
Pub/Sub は、Google Cloud(旧称 GCP)のフルマネージドなメッセージングサービスです。Pub/Sub の意義や、実現できるアーキテクチャは以下の記事もご参照ください。
Pub/Sub の BigQuery サブスクリプションを使うと、Pub/Sub に発行(パブリッシュ)されたメッセージを、直接 BigQuery に書き込むことができます。
BigQuery サブスクリプションを使わない場合は、Pub/Sub から取得したメッセージを BigQuery に書き込むためのサブスクライバー(メッセージを受け取る主体)を、Cloud Functions や Dataflow で実装する必要があります。一方で BigQuery サブスクリプションを使えば、サブスクライバーの実装は必要なく、Pub/Sub に発行された メッセージがそのまま BigQuery に書き込まれます。
スキーマ
BigQuery サブスクリプションには、設定方法が3つあります。トピックスキーマ、テーブルスキーマ、そしてスキーマなしです。
前者のトピックスキーマは、あらかじめ Pub/Sub トピックにスキーマ(メッセージの形式)を定義しておきます。トピックスキーマは、Apache Avro と Protocol Buffer の2つの形式に対応しています。
後者のテーブルスキーマは、Pub/Sub に投入された JSON メッセージのキー名と対応する列に、データが書き込まれます。あらかじめ Pub/Sub 側にスキーマを設定しておく必要はありません。
テーブルスキーマの使用例を示します。以下のような JSON メッセージであれば、BigQuery テーブルの「key1」列に「value1」が書き込まれ、「key2」列に「value2」が書き込まれ...というように、JSON キーに対応する列に値が書き込まれます。
{ "key1" : "value1", "key2" : "value2", "key3" : "value3" }
3つ目のスキーマなしを選択すると、データは対象 BigQuery テーブルの data という名称の列にメッセージを書き込みます。
当記事の検証では、2つ目のテーブルスキーマを使用しました。その流れと結果をご紹介します。
- 参考 : BigQuery properties
テーブルの作成
以下のように、任意のプロジェクトの任意のデータセットに、2つのカラムを持つシンプルなテーブルを作成しました。
Pub/Sub トピックの作成
Pub/Sub のトピックとサブスクリプションを作成します。
まず、bq-test
という名前のトピックを作成しました。トピックの詳細画面から以下のボタンを押すと、そのトピックに紐づくサブスクリプションを作成できます(次のステップ)。
Pub/Sub サブスクリプションの作成
サブスクリプションの作成画面では、配信タイプとして BigQuery への書き込み
を選択します。
書き込み先の BigQuery テーブルのプロジェクト、データセット名、テーブル名を指定します。
また、Schema Option には テーブルスキーマを使用する
を選択します。
なお、テーブル名を指定した際に、サービスアカウント(service-(プロジェクト番号)@gcp-sa-pubsub.iam.gserviceaccount.com
)に権限が不足している旨のエラーメッセージが表示されることがあります。その場合、該当の BigQuery データセットまたはテーブル、あるいはプロジェクトレベルで、同サービスアカウントに BigQuery データ編集者 (roles/bigquery.dataEditor)
のロールを付与してください。
サービス アカウント service-(プロジェクト番号)@gcp-sa-pubsub.iam.gserviceaccount.com に、次の BigQuery テーブルへの書き込みに必要な権限がありません: bigquery.tables.get、bigquery.tables.updateData。
メッセージ送信のテスト
ここまでで、事前準備が完了しました。動作確認をしてみます。
作成した Pub/Sub トピック bq-test
に、テストメッセージを送信(公開またはパブリッシュともいいます)してみます。
トピックの詳細画面から、「メッセージ」タブを開き、「メッセージをパブリッシュ」ボタンを押します。
表示された画面に、JSON メッセージを書き込みます。今回は以下のような JSON を書き込んでみます。
{ "id" : "10001", "data" : "Hello. This is a test message." }
最下部の「公開」ボタンを押下すると、メッセージがパブリッシュされます。
BigQuery テーブルの確認
書き込み対象のテーブルに SELECT 文を実行してみます。以下のように、テーブルにデータが書き込まれていました。
なお、メッセージをパブリッシュしてから SELECT 文を実行するまでには30秒ほどしか経っていませんでしたが、既に反映されていました。
杉村 勇馬 (記事一覧)
執行役員 CTO / クラウドソリューション部 部長
元警察官という経歴を持つ現 IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 12資格、Google Cloud認定資格11資格。X (旧 Twitter) では Google Cloud や AWS のアップデート情報をつぶやいています。
Follow @y_sugi_it