G-gen の西島です。Google Cloud (旧称 GCP) の Cloud Run jobs 上に構築したジョブから Compute Engine 上の MySQL サーバーへ、サーバーレス VPC アクセス経由でクエリを発行し、その結果を BigQuery にロードするジョブの検証を行ったので、その紹介です。
はじめに
Compute Engine 上のリレーショナルデータベース上のデータに対して、BigQuery を用いてデータ分析を行いたいケースがあると思います。
今回は Cloud Run jobs を用いて、Private IP 経由で Compute Engine 上の MySQL サーバーへクエリを発行し、その結果を BigQuery にロードする仕組みを作成していきます。
構成
構成図
システム構成は、以下の図のとおりです。
認証情報
Compute Engine 上の MySQL への接続情報は Secret Manager へ保存します。
BigQuery への認証は、Cloud Run jobs にアタッチしたサービスアカウントを用います。
ビルドとデプロイ
コンテナイメージは Artifact Registry へ保存し、build・push・deploy の一連のフローは Cloud Build を用いて、開発者のローカルマシンからコマンドラインで実行します。
ネットワーク
Cloud Run jobs から Private IP 経由で VPC ネットワーク内のリソースにアクセスする場合、次のいずれかの方法を利用できます。
今回は Compute Engine 上に構築した MySQL データベースにサーバーレス VPC アクセス経由で接続しました。当検証実施した2023年9月現在は Direct VPC Egress が Preview 公開のため、本番環境に採用するのは GA 済みであるサーバーレス VPC アクセスが優先となるためです。
サーバーレス VPC アクセスでは、コネクタと呼ばれるリソースを作成する必要があり、コネクタの IP アドレス範囲は /28 の CIDR 範囲を指定しなければなりません。
Cloud Run jobs や Direct VPC Egress の詳細については、以下の記事もご参照ください。
ソースコード類の準備
Dockerfile
Cloud Run jobs はサーバーレスなコンテナサービスであるため、プログラムと実行に必要な依存関係を含んだコンテナイメージを作成する必要があります。
FROM python:3.10 WORKDIR /src COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY src . CMD ["python", "main.py"]
requirements.txt
MySQL データベースへの接続用ライブラリとして、PyMySQL を採用しました。
MIT License で配布されているため商用利用可能で、かつ Python におけるリレーショナルデータベースを操作するための仕様である PEP 249 に準拠しています。
google-cloud-bigquery==3.11.4 PyMySQL==1.1.0 pandas==2.1.1 pyarrow==13.0.0
credential.txt
Secret Manager に登録する MySQL データベースへの接続情報は、以下のように JSON 形式のテキストファイルにしました。
{ "DB_USER": "xxxxx", "DB_PASSWORD": "xxxxx", "DB_DATABASE": "xxxxx", "DB_HOST": "xxxxx", "DB_PORT": "xxxxx", "BQ_PROJECT_ID": "xxxxx", "BQ_DATASET_ID": "xxxxx" }
.gcloudignore
Cloud Build でのビルド時に、secret 情報をビルドパッケージに含まないようにするには、除外対象のファイル名を .gcloudignore
ファイルで明示的に指定します。
credential.txt
main.py
メインとなるソースコードです。 プログラムの内容は、以下の流れになります。
環境変数経由で、Secret Managerに保存した接続情報を取得
MySQLサーバーに接続、cityテーブルに対してクエリを発行し、結果をDataFrameに変換
load_table_from_dataframe メソッドを利用し、BigQueryにロード
import json import os import sys import logging import traceback import pymysql import pandas as pd from google.cloud import bigquery import time # ログの設定 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s:%(name)s - %(message)s") logger = logging.getLogger(__name__) def get_cred_config() -> dict[str, str]: secret = os.environ.get("CREDENTIALS_SECRET") if secret: return json.loads(secret) def get_connection(cred: dict[str, str]) -> pymysql.Connection: connection = pymysql.connect( user=cred["DB_USER"], password=cred["DB_PASSWORD"], host=cred["DB_HOST"], database=cred["DB_DATABASE"], port=int(cred["DB_PORT"]), charset='utf8', cursorclass=pymysql.cursors.DictCursor ) return connection def fetch_data(cred: dict[str, str]) -> pd.DataFrame: with get_connection(cred) as connection: with connection.cursor() as cursor: sql = "SELECT * FROM city;" cursor.execute(sql) result = cursor.fetchall() return pd.DataFrame(result).rename(columns={ "ID": "id", "Name": "name", "CountryCode": "country_code", "District": "district", "Population": "population" }) def main(): logger.info(f"Start") # 時間計測開始 start_time = time.perf_counter() cred = get_cred_config() df = fetch_data(cred) job_config = bigquery.LoadJobConfig( schema=[ bigquery.SchemaField( "id", bigquery.enums.SqlTypeNames.INT64), bigquery.SchemaField( "name", bigquery.enums.SqlTypeNames.STRING), bigquery.SchemaField( "country_code", bigquery.enums.SqlTypeNames.STRING), bigquery.SchemaField( "district", bigquery.enums.SqlTypeNames.STRING), bigquery.SchemaField( "population", bigquery.enums.SqlTypeNames.INT64), ], write_disposition="WRITE_TRUNCATE", ) client = bigquery.Client() table_id = f"{cred['BQ_PROJECT_ID']}.{cred['BQ_DATASET_ID']}.city" job = client.load_table_from_dataframe( dataframe=df, destination=table_id, job_config=job_config ) job.result() # 時間計測終了 end_time = time.perf_counter() logger.info(f"Finish, 経過時間(秒) #{end_time - start_time}") if __name__ == "__main__": try: main() except Exception as err: logger.error(f"TraceBack: {traceback.format_exc()}") sys.exit(1)
cloudbuild.yaml
Cloud Build では構成ファイルの記述に沿って各ステップが実行されます。
下記の構成ファイルでは、ビルド・プッシュ・デプロイのステップがあり、name フィールドで各ステップを実行するコンテナイメージを指定しています。
steps: - id: "build" name: "gcr.io/cloud-builders/docker" args: [ "build", "-t", "asia-northeast1-docker.pkg.dev/${PROJECT_ID}/docker-repo/job:latest", ".", ] dir: "." - id: "push" name: "gcr.io/cloud-builders/docker" args: [ "push", "asia-northeast1-docker.pkg.dev/${PROJECT_ID}/docker-repo/job:latest", ] - id: "deploy" name: "gcr.io/cloud-builders/gcloud" entrypoint: "gcloud" args: [ "run", "jobs", "deploy", "job", "--image", "asia-northeast1-docker.pkg.dev/${PROJECT_ID}/docker-repo/job:latest", "--region", "asia-northeast1", "--tasks", "1", "--max-retries", "0", "--service-account", "job-sa@${PROJECT_ID}.iam.gserviceaccount.com", "--set-secrets", "CREDENTIALS_SECRET=secret:latest", "--vpc-connector", "vpc-access-connecctor" ]
リソースの作成
プロジェクトと API
# 環境変数の設定 export PROJECT_ID={プロジェクト ID} export PROJECT_NAME={プロジェクト名} export BILLING_ACCOUNT_ID={請求先アカウント ID} # プロジェクト作成 gcloud projects create ${PROJECT_ID} --name=${PROJECT_NAME} # プロジェクト設定の変更 gcloud config set project ${PROJECT_ID} # 請求先アカウントの紐づけ gcloud beta billing projects link ${PROJECT_ID} \ --billing-account=${BILLING_ACCOUNT_ID} # API の有効化 gcloud services enable compute.googleapis.com \ secretmanager.googleapis.com \ run.googleapis.com \ artifactregistry.googleapis.com \ cloudbuild.googleapis.com \ vpcaccess.googleapis.com
ネットワーク
# VPC ネットワークの作成 gcloud compute networks create vpc1 \ --subnet-mode=custom \ --bgp-routing-mode=regional \ --mtu=1460 # サブネットの作成 ## VM用 gcloud compute networks subnets create subnet1 --network=vpc1 \ --range=10.0.0.0/28 --region=asia-northeast1 --enable-flow-logs \ --logging-flow-sampling=1.0 ## サーバーレス VPC アクセスコネクタ用サブネット gcloud compute networks subnets create subnet-for-serverless-vpc-access --network=vpc1 \ --range=10.8.0.0/28 --region=asia-northeast1 --enable-flow-logs \ --logging-flow-sampling=1.0 # Cloud Router の作成 gcloud compute routers create nat-router \ --network=vpc1 \ --region=asia-northeast1 # Cloud NAT の作成 gcloud compute routers nats create nat \ --router=nat-router \ --auto-allocate-nat-external-ips \ --nat-all-subnet-ip-ranges \ --region=asia-northeast1 # Firewallの作成 ## IAP 経由で VM に SSH する許可ルール gcloud compute firewall-rules create iap-rule --network=vpc1 --action=allow \ --source-ranges=35.235.240.0/20 \ --rules=tcp:22 --destination-ranges=10.0.0.4 # サーバーレス VPC アクセスコネクタの構築 gcloud compute networks vpc-access connectors create vpc-access-connector \ --region asia-northeast1 \ --subnet subnet-for-serverless-vpc-access \ --min-instances 2 \ --max-instances 3 \ --machine-type f1-micro
Compute Engine VM
# VM の作成 gcloud compute instances create mysql-server \ --zone=asia-northeast1-b \ --machine-type=e2-micro \ --network-interface=private-network-ip=10.0.0.4,stack-type=IPV4_ONLY,subnet=subnet1,no-address \ --create-disk=auto-delete=yes,boot=yes,device-name=mysql-server,image=projects/debian-cloud/global/images/debian-11-bullseye-v20230912,mode=rw,size=10
Artifact Registry
# Artifact Registryにコンテナリポジトリを作成 gcloud artifacts repositories create docker-repo \ --repository-format=docker \ --location=asia-northeast1 \ --description="Docker repository"
Secret Manager
# Secret Manager に認証情報を登録 gcloud secrets create secret && \ gcloud secrets versions add secret --data-file="./credential.txt"
BigQuery
# BigQueryにデータセットを作成 bq --location=asia-northeast1 mk \ --dataset \ ${PROJECT_ID}:world
サービスアカウント・IAM
# サービスアカウントの作成 gcloud iam service-accounts create job-sa \ --description="Cloud Run jobsで使用するサービスアカウント" # 各リソースにおけるサービスアカウントへのロール付与 ## Secret Manager Secret Accessor (シークレットレベル) gcloud secrets add-iam-policy-binding secret \ --member="serviceAccount:job-sa@${PROJECT_ID}.iam.gserviceaccount.com" \ --role="roles/secretmanager.secretAccessor" ## BigQuery Job User (プロジェクトレベル) gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member="serviceAccount:job-sa@${PROJECT_ID}.iam.gserviceaccount.com" \ --role="roles/bigquery.jobUser" ## BigQuery Data Editor (プロジェクトレベル) gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member="serviceAccount:job-sa@${PROJECT_ID}.iam.gserviceaccount.com" \ --role="roles/bigquery.dataEditor" # Cloud Build から Cloud Run jobs へデプロイするため Cloud Build サービスアカウントに # Cloud Run 管理者ロールとサービスアカウントユーザーロールを付与 (プロジェクトレベル) export PROJECT_NUMBER=`gcloud projects describe $PROJECT_ID --format="value(projectNumber)"` gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \ --role roles/run.admin gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \ --role roles/iam.serviceAccountUser
MySQL のセットアップ
Compute Engine 上の VM に MySQL をインストールする方法は、Google Cloud の公式ドキュメントを参照ください。
次に、インストールした MySQL にサンプルデータをインポートし、Cloud Run jobs から接続するためのユーザを作成します。
サンプルデータは MySQL 公式の world database を使用します。
今回作成したユーザでは、接続元ホストをすべて許可していますが、本番運用の際は許可するホストに制限をかける事を推奨します。
# VM へ SSH 接続 gcloud compute ssh --tunnel-through-iap mysql-server --zone=asia-northeast1-b
# サンプルデータのダウンロード sudo apt-get install -y unzip && \ wget https://downloads.mysql.com/docs/world-db.zip && \ unzip world-db.zip # データベースへ接続 mysql -u root -p
# インポート SOURCE ./world-db/world.sql; # DB 接続用ユーザ作成 CREATE USER ユーザ名@'%' IDENTIFIED BY '接続時のパスワード'; # 権限付与 (world データベースのすべてのテーブルに対して全権限を付与) GRANT ALL PRIVILEGES ON world.* TO ユーザ名@'%'; # 権限反映 flush privileges;
job のデプロイ
# Cloud Build で Docker イメージのビルド・プッシュ、Cloud Run jobs にデプロイ gcloud builds submit --region asia-northeast1 --config cloudbuild.yaml
動作検証
Cloud Run jobsの実行
以下のコマンドでデプロイ済みの job を実行します。
gcloud run jobs execute job --region=asia-northeast1
Cloud Run jobsのジョブ詳細画面から実行の成否が確認できます。
BigQueryのコンソール画面からは、MySQLにあったテーブルデータが転送されていることが確認できます。
次に、MySQLサーバーへの接続が、サーバーレス VPC アクセス経由でされているか確認します。確認方法として、Cloud Logging に出力される VPC フローログを確認する方法があります。
gcloud コマンドで確認する場合は、次のコマンドを実行します。
gcloud logging read logName="projects/${PROJECT_ID}/logs/compute.googleapis.com%2Fvpc_flows"
出力結果の jsonPayload.connection.dest_ip
、jsonPayload.connection.src_ip
は、接続先・接続元のIPアドレスです。
今回は、接続先 (Compute Engine 上の VM)のIPアドレスに 10.0.0.4 を割り当て、接続元 (サーバーレス VPC アクセスコネクタ)のサブネットの CIDR に 10.8.0.0/28 を割り当てたため、それぞれの値がフローログに出力されていることが確認出来ます。
insertId: r6y9jef2tr757 jsonPayload: bytes_sent: '9792' connection: dest_ip: 10.0.0.4 dest_port: 3306 protocol: 6 src_ip: 10.8.0.5 src_port: 42755 dest_instance: project_id: xxxxxx region: asia-northeast1 vm_name: mysql-server zone: asia-northeast1-b dest_vpc: project_id: xxxxxx subnetwork_name: subnet1 vpc_name: vpc1 end_time: '2023-09-30T11:50:20.864864005Z' packets_sent: '64' reporter: DEST rtt_msec: '1' src_instance: project_id: xxxxxx region: asia-northeast1 vm_name: aet-asianortheast1-vpc--access--connector-7zw0 zone: asia-northeast1-b src_vpc: project_id: xxxxxx subnetwork_name: subnet-for-serverless-vpc-access vpc_name: vpc1 start_time: '2023-09-30T11:50:20.796486189Z' logName: projects/xxxxxx/logs/compute.googleapis.com%2Fvpc_flows receiveTimestamp: '2023-09-30T11:50:33.956154690Z' resource: labels: location: asia-northeast1-b project_id: xxxxxx subnetwork_id: '6548845232760910546' subnetwork_name: subnet1 type: gce_subnetwork timestamp: '2023-09-30T11:50:33.956154690Z'
リソースの削除
以下のコマンドを実行し、検証で作成したプロジェクトを削除します。
プロジェクトを削除すると、プロジェクト内のリソースは全て削除されるため、実際のご利用時はご注意ください。
gcloud projects delete ${PROJECT_ID}
西島 昌太(記事一覧)
カスタマーサクセス課 データエンジニア
2023年4月に新卒入社。
元はフロントエンド開発を主戦場に、現在はデータエンジニアリングを勉強中。何でも屋さんを目指して、日々邁進。 休日は大体プログラムを書いてる人