Pub/SubのBigQueryサブスクリプションを使ってみた

記事タイトルとURLをコピーする

G-gen の杉村です。Pub/Sub の BigQuery サブスクリプションを使うと、Pub/Sub に対して発行したメッセージを簡単に BigQuery テーブルに書き込むことができます。

前提知識

BigQuery サブスクリプションとは

Pub/Sub は、Google Cloud(旧称 GCP)のフルマネージドなメッセージングサービスです。Pub/Sub の意義や、実現できるアーキテクチャは以下の記事もご参照ください。

blog.g-gen.co.jp

Pub/Sub の BigQuery サブスクリプションを使うと、Pub/Sub に発行(パブリッシュ)されたメッセージを、直接 BigQuery に書き込むことができます。

BigQuery サブスクリプションを使わない場合は、Pub/Sub から取得したメッセージを BigQuery に書き込むためのサブスクライバー(メッセージを受け取る主体)を、Cloud Functions や Dataflow で実装する必要があります。一方で BigQuery サブスクリプションを使えば、サブスクライバーの実装は必要なく、Pub/Sub に発行された メッセージがそのまま BigQuery に書き込まれます。

テーブルスキーマの利用

BigQuery サブスクリプションには、設定方法が2つあります。トピックスキーマと、テーブルスキーマです。

前者のトピックスキーマは、あらかじめ Pub/Sub トピックにスキーマ(メッセージの形式)を定義しておきます。トピックスキーマは、Apache Avro と Protocol Buffer の2つの形式に対応しています。

後者のテーブルスキーマは、Pub/Sub に投入された JSON メッセージのキー名と対応する列に、データが書き込まれます。あらかじめ Pub/Sub 側にスキーマを設定しておく必要はありません。

テーブルスキーマの使用例を示します。以下のような JSON メッセージであれば、BigQuery テーブルの「key1」列に「value1」が書き込まれ、「key2」列に「value2」が書き込まれ...というように、JSON キーに対応する列に値が書き込まれます。

{
    "key1" : "value1",
    "key2" : "value2",
    "key3" : "value3"
}

当記事では、テーブルスキーマを使用した BigQuery サブスクリプション の利用方法について紹介します。

テーブルの作成

以下のように、任意のプロジェクトの任意のデータセットに、2つのカラムを持つシンプルなテーブルを作成しました。

Pub/Sub トピックの作成

Pub/Sub のトピックとサブスクリプションを作成します。

まず、bq-test という名前のトピックを作成しました。トピックの詳細画面から以下のボタンを押すと、そのトピックに紐づくサブスクリプションを作成できます(次のステップ)。

Pub/Sub サブスクリプションの作成

サブスクリプションの作成画面では、配信タイプとして BigQuery への書き込み を選択します。

書き込み先の BigQuery テーブルのプロジェクト、データセット名、テーブル名を指定します。

また、Schema Option には テーブルスキーマを使用する を選択します。

なお、テーブル名を指定した際に、サービスアカウント(service-(プロジェクト番号)@gcp-sa-pubsub.iam.gserviceaccount.com)に権限が不足している旨のエラーメッセージが表示されることがあります。その場合、該当の BigQuery データセットまたはテーブル、あるいはプロジェクトレベルで、同サービスアカウントに BigQuery データ編集者 (roles/bigquery.dataEditor) のロールを付与してください。

サービス アカウント service-(プロジェクト番号)@gcp-sa-pubsub.iam.gserviceaccount.com に、次の BigQuery テーブルへの書き込みに必要な権限がありません: bigquery.tables.get、bigquery.tables.updateData。

メッセージ送信のテスト

ここまでで、事前準備が完了しました。動作確認をしてみます。

作成した Pub/Sub トピック bq-test に、テストメッセージを送信(公開またはパブリッシュともいいます)してみます。

トピックの詳細画面から、「メッセージ」タブを開き、「メッセージをパブリッシュ」ボタンを押します。

表示された画面に、JSON メッセージを書き込みます。今回は以下のような JSON を書き込んでみます。

{
    "id" : "10001",
    "data" : "Hello. This is a test message."
}

最下部の「公開」ボタンを押下すると、メッセージがパブリッシュされます。

BigQuery テーブルの確認

書き込み対象のテーブルに SELECT 文を実行してみます。以下のように、テーブルにデータが書き込まれていました。

なお、メッセージをパブリッシュしてから SELECT 文を実行するまでには30秒ほどしか経っていませんでしたが、既に反映されていました。

杉村 勇馬 (記事一覧)

執行役員 CTO / クラウドソリューション部 部長

元警察官という経歴を持つ現 IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 12資格、Google Cloud認定資格11資格。X (旧 Twitter) では Google Cloud や AWS のアップデート情報をつぶやいています。