G-gen の堂原です。本記事では Google Cloud(旧称 GCP)の BigQuery において、データセットとリモートモデルが異なるリージョンに存在する場合の、AI データ処理パイプラインを紹介します。
はじめに
本記事の趣旨
本記事では、BigQuery データセット・テーブルと BigQuery ML のリモートモデルが異なるリージョンに存在する状況下における、データ処理パイプラインについて紹介します。
具体的には、BigQuery Data Transfer Service を用いて、処理の対象となるデータをリモートモデルが存在するリージョンにコピーし、コピーしたデータに対して処理をします。
ML.GENERATE_TEXT 関数と Gemini 2.0 の対応リージョン
BigQuery ML の ML.GENERATE_TEXT
は、SQL クエリから直接大規模言語モデル(LLM)を呼び出し、テキスト生成や分類などのタスクを実行できる関数です。これにより、データの前処理から LLM による推論までを一貫して BigQuery で完結させることができます。
BigQuery ML の制約として、テーブルとリモートモデルは同一のリージョンに存在する必要があります。一方、2025 年 5 月現在、Gemini 2.0 や Gemini 2.5 は東京リージョン(asia-northeast1)で利用できず、東京リージョンに保存されている BigQuery のデータを直接処理できません。
- 参考 : Gemini 2.0 Flash | Generative AI on Vertex AI | Google Cloud
- 参考 : Gemini 2.5 Pro | Generative AI on Vertex AI | Google Cloud
概要
今回用いるデータ
今回使用するデータは、架空のスマートフォンに寄せられたユーザーフィードバックです。以下のスキーマを持つテーブルを想定しています。
フィールド名 | 型 | 説明 |
---|---|---|
timestamp | TIMESTAMP | フィードバックを受けた日時 |
userEmail | STRING | ユーザーのメールアドレス |
userFeedback | STRING | ユーザーのフィードバックメッセージ |
以下にサンプルデータを示します。
timestamp | userEmail | userFeedback |
---|---|---|
2025-05-24 09:20:00 UTC | user_gg@example.com | サポートセンターの対応が迅速で丁寧でした。これからも応援しています! |
2025-05-23 21:55:00 UTC | user_ee@test.jp | 通知音が小さくて気づかないことが多いです。もっと大きくしてください。マナーモードでも振動が弱いです。 |
テーブル構成と処理概要
当記事では、架空のスマートフォンに寄せられたユーザーフィードバックを、Gemini 2.0 Flash を使って「ポジティブ」「ネガティブ」「ニュートラル」に分類するパイプラインを構築します。
まず、登場する複数のデータセットとテーブルの関係性を図に示します。
パイプラインは以下の5つのステップで構成され、それぞれスケジュールされた時間で実行されます。
ステップ | リージョン | コンポーネント | 開始時間 | 処理内容 |
---|---|---|---|---|
1 | asia-northeast1 | スケジュールドクエリ | 毎日 01:00 | us-central1 へ転送するデータ抽出(「daily-feedback」テーブル更新) |
2 | us-central1 | Data Transfer Service | 毎日 01:30 | データセット「tokyo_from_tokyo_to_us」のテーブルをデータセット「us」にコピー |
3 | us-central1 | スケジュールドクエリ | 毎日 02:00 | リモートモデル「gemini-model-20-flash」を用いてテーブル「daily-feedback」を処理(「evaluation-of-daily-feedback」テーブル更新) |
4 | asia-northeast1 | Data Transfer Service | 毎日 02:30 | データセット「us_from_us_to_tokyo」のテーブルをデータセット「tokyo」にコピー |
5 | asia-northeast1 | スケジュールドクエリ | 毎日 03:00 | 評価済みデータの整形(「evaluation-of-feedback」テーブル更新) |
留意点
単純化するため、当記事ではスケジュールドクエリと Data Transfer Service の設定を個別に行い、30 分ごとに実行するようにしていますが、実際には Cloud Workflows などのオーケストレーションツールを用いて一連のパイプラインとして統合することが推奨されます。
また、本パイプラインでは、異なるリージョン間でデータを転送するため、BigQuery のデータ転送料金が発生します。同一リージョン内で処理を完結させる場合に比べて追加の費用がかかる点にご留意ください。詳細な料金については、Google Cloud の公式ドキュメントをご確認ください。
実装
1. us-central1 へ転送するデータ抽出
asia-northeast1 に存在するフィードバックデータ「product-feedback」から、当日に処理したいデータを抽出します。この抽出結果は、一時的にデータセット「tokyo_from_tokyo_to_us」のテーブル「daily-feedback」に格納されます。
これは、Data Transfer Service によるデータ転送時の課金対象データ量を削減するためです。
-- 挿入先のテーブルを空に TRUNCATE TABLE `tokyo_from_tokyo_to_us.daily-feedback`; -- データ挿入 INSERT INTO `tokyo_from_tokyo_to_us.daily-feedback` SELECT * FROM `tokyo.product-feedback` WHERE DATE(timestamp, "Asia/Tokyo") = DATE(@run_time, "Asia/Tokyo") - 1;
この SQL は、BigQuery のスケジュールドクエリとして毎日午前 1 時に実行されるように設定します。@run_time
パラメータは、スケジュールドクエリの実行時に自動的に現在のタイムスタンプが渡されます。
2. データセット「tokyo_from_tokyo_to_us」のテーブルをデータセット「us」にコピー
次に、ステップ 1 で抽出したデータを asia-northeast1 から us-central1 へコピーします。これには Data Transfer Service を使用します。 Data Transfer Service の設定は以下の通りです。
- 送信元データセット : tokyo_from_tokyo_to_us (asia-northeast1)
- 送信先データセット : us (us-central1)
- オプション
- Overwrite destination table : 有効
このデータ転送は、毎日午前 1 時半に実行されるようにスケジュールします。
3. リモートモデル「gemini-model-20-flash」を用いてテーブル「daily-feedback」を処理
us-central1 にコピーされたデータに対して、BigQuery ML の ML.GENERATE_TEXT
関数を用いてフィードバックの評価を行います。評価結果は、データセット「us_from_us_to_tokyo」内の「evaluation-of-daily-feedback」テーブルに格納されます。
-- プロンプト DECLARE prompt_template STRING; SET prompt_template = """スマートフォン「ryu-do phone」に対する、ユーザーからのフィードバックを与えます。そのフィードバックが以下の評価のうち、どれに最も当てはまるかを回答してください。: - ポジティブ - ネガティブ - ニュートラル # フィードバック UIが直感的でとても使いやすいです。 # 評価 ポジティブ # フィードバック 指紋認証の精度が悪く、なかなか認識してくれません。ストレスが溜まります。 # 評価 ネガティブ # フィードバック __feedback__ # 評価"""; -- 挿入先のテーブルを空に TRUNCATE TABLE `us_from_us_to_tokyo.evaluation-of-daily-feedback`; -- Geminiによるフィードバックの評価 INSERT INTO `us_from_us_to_tokyo.evaluation-of-daily-feedback` SELECT timestamp, userEmail, ml_generate_text_llm_result AS evaluation FROM ML.GENERATE_TEXT( MODEL `us.gemini-model-20-flash`, ( SELECT REPLACE(prompt_template, "__feedback__", userFeedback) AS prompt, * FROM `us.daily-feedback` ), STRUCT( 0 AS temperature, 8192 AS max_output_tokens, TRUE AS flatten_json_output ) );
この SQL は、BigQuery のスケジュールドクエリとして毎日午前 2 時に実行されるように設定します。
4. データセット「us_from_us_to_tokyo」のテーブルをデータセット「tokyo」にコピー
ステップ 3 で処理された評価結果を、再び asia-northeast1 へコピーします。これも Data Transfer Service を使用します。
- 送信元データセット : us_from_us_to_tokyo (us-central1)
- 送信先データセット : tokyo (asia-northeast1)
- オプション
- Overwrite destination table : 有効
このデータ転送は、毎日午前 2 時半に実行されるようにスケジュールします。
5. 評価済みデータの整形
最後に、asia-northeast1 にコピーされた評価済みデータを、元のフィードバックデータと結合し、最終的な形式に整形します。この整形結果は、分析用テーブル「evaluation-of-feedback」に格納されます。
-- データの重複を防ぐため、同一期間のデータを削除 DELETE FROM `tokyo.evaluation-of-feedback` WHERE DATE(timestamp, "Asia/Tokyo") = DATE(@run_time, "Asia/Tokyo") - 1; -- 処理内容を挿入 INSERT INTO `tokyo.evaluation-of-feedback` SELECT evaluation.timestamp, evaluation.userEmail, feedback.userFeedback, evaluation.evaluation[f:id:ggen-sugimura:20250627090010p:plain] FROM `tokyo.evaluation-of-daily-feedback` AS evaluation INNER JOIN `tokyo_from_tokyo_to_us.daily-feedback` AS feedback ON evaluation.timestamp = feedback.timestamp AND evaluation.userEmail = feedback.userEmail;
この SQL は、BigQuery のスケジュールドクエリとして毎日午前 3 時に実行されるように設定します。
処理結果
上記のパイプラインを実行することで、テーブル「evaluation-of-feedback」には以下のようなデータが格納されます。
堂原 竜希(記事一覧)
クラウドソリューション部クラウドエクスプローラ課。2023年4月より、G-genにジョイン。
Google Cloud Partner Top Engineer 2023, 2024, 2025に選出 (2024年はRookie of the year、2025年はFellowにも選出)。休みの日はだいたいゲームをしているか、時々自転車で遠出をしています。
Follow @ryu_dohara