サーバーレスVPCアクセス経由でMySQLサーバーからBigQueryにデータ転送してみた話

記事タイトルとURLをコピーする

G-gen の西島です。Google Cloud (旧称 GCP) の Cloud Run jobs 上に構築したジョブから Compute Engine 上の MySQL サーバーへ、サーバーレス VPC アクセス経由でクエリを発行し、その結果を BigQuery にロードするジョブの検証を行ったので、その紹介です。

はじめに

Compute Engine 上のリレーショナルデータベース上のデータに対して、BigQuery を用いてデータ分析を行いたいケースがあると思います。

今回は Cloud Run jobs を用いて、Private IP 経由で Compute Engine 上の MySQL サーバーへクエリを発行し、その結果を BigQuery にロードする仕組みを作成していきます。

構成

構成図

システム構成は、以下の図のとおりです。

構成図

認証情報

Compute Engine 上の MySQL への接続情報は Secret Manager へ保存します。

BigQuery への認証は、Cloud Run jobs にアタッチしたサービスアカウントを用います。

ビルドとデプロイ

コンテナイメージは Artifact Registry へ保存し、build・push・deploy の一連のフローは Cloud Build を用いて、開発者のローカルマシンからコマンドラインで実行します。

ネットワーク

Cloud Run jobs から Private IP 経由で VPC ネットワーク内のリソースにアクセスする場合、次のいずれかの方法を利用できます。

今回は Compute Engine 上に構築した MySQL データベースにサーバーレス VPC アクセス経由で接続しました。当検証実施した2023年9月現在は Direct VPC Egress が Preview 公開のため、本番環境に採用するのは GA 済みであるサーバーレス VPC アクセスが優先となるためです。

サーバーレス VPC アクセスでは、コネクタと呼ばれるリソースを作成する必要があり、コネクタの IP アドレス範囲は /28 の CIDR 範囲を指定しなければなりません。

Cloud Run jobs や Direct VPC Egress の詳細については、以下の記事もご参照ください。

blog.g-gen.co.jp

blog.g-gen.co.jp

ソースコード類の準備

Dockerfile

Cloud Run jobs はサーバーレスなコンテナサービスであるため、プログラムと実行に必要な依存関係を含んだコンテナイメージを作成する必要があります。

FROM python:3.10
WORKDIR /src
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY src .
CMD ["python", "main.py"]

requirements.txt

MySQL データベースへの接続用ライブラリとして、PyMySQL を採用しました。

MIT License で配布されているため商用利用可能で、かつ Python におけるリレーショナルデータベースを操作するための仕様である PEP 249 に準拠しています。

google-cloud-bigquery==3.11.4
PyMySQL==1.1.0
pandas==2.1.1
pyarrow==13.0.0

credential.txt

Secret Manager に登録する MySQL データベースへの接続情報は、以下のように JSON 形式のテキストファイルにしました。

{
    "DB_USER": "xxxxx",
    "DB_PASSWORD": "xxxxx",
    "DB_DATABASE": "xxxxx",
    "DB_HOST":  "xxxxx",
    "DB_PORT": "xxxxx",
    "BQ_PROJECT_ID": "xxxxx",
    "BQ_DATASET_ID": "xxxxx"
}

.gcloudignore

Cloud Build でのビルド時に、secret 情報をビルドパッケージに含まないようにするには、除外対象のファイル名を .gcloudignore ファイルで明示的に指定します。

credential.txt

main.py

メインとなるソースコードです。 プログラムの内容は、以下の流れになります。

  1. 環境変数経由で、Secret Managerに保存した接続情報を取得

  2. MySQLサーバーに接続、cityテーブルに対してクエリを発行し、結果をDataFrameに変換

  3. load_table_from_dataframe メソッドを利用し、BigQueryにロード

import json
import os
import sys
import logging
import traceback
import pymysql
import pandas as pd
from google.cloud import bigquery
import time
  
  
# ログの設定
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s:%(name)s - %(message)s")
logger = logging.getLogger(__name__)
  
  
def get_cred_config() -> dict[str, str]:
    secret = os.environ.get("CREDENTIALS_SECRET")
    if secret:
        return json.loads(secret)
  
  
def get_connection(cred: dict[str, str]) -> pymysql.Connection:
    connection = pymysql.connect(
        user=cred["DB_USER"],
        password=cred["DB_PASSWORD"],
        host=cred["DB_HOST"],
        database=cred["DB_DATABASE"],
        port=int(cred["DB_PORT"]),
        charset='utf8',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection
  
  
def fetch_data(cred: dict[str, str]) -> pd.DataFrame:
    with get_connection(cred) as connection:
        with connection.cursor() as cursor:
            sql = "SELECT * FROM city;"
            cursor.execute(sql)
            result = cursor.fetchall()

            return pd.DataFrame(result).rename(columns={
                "ID": "id",
                "Name": "name",
                "CountryCode": "country_code",
                "District": "district",
                "Population": "population"
            })
  
  
def main():
    logger.info(f"Start")
    # 時間計測開始
    start_time = time.perf_counter()
  
    cred = get_cred_config()
    df = fetch_data(cred)
  
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField(
                "id", bigquery.enums.SqlTypeNames.INT64),
            bigquery.SchemaField(
                "name", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField(
                "country_code", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField(
                "district", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField(
                "population", bigquery.enums.SqlTypeNames.INT64),
        ],
        write_disposition="WRITE_TRUNCATE",
    )
  
    client = bigquery.Client()
    table_id = f"{cred['BQ_PROJECT_ID']}.{cred['BQ_DATASET_ID']}.city"
    job = client.load_table_from_dataframe(
        dataframe=df, destination=table_id, job_config=job_config
    )
    job.result()
  
    # 時間計測終了
    end_time = time.perf_counter()
    logger.info(f"Finish, 経過時間(秒) #{end_time - start_time}")
  
  
if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        logger.error(f"TraceBack: {traceback.format_exc()}")
  
        sys.exit(1)

cloudbuild.yaml

Cloud Build では構成ファイルの記述に沿って各ステップが実行されます。

下記の構成ファイルでは、ビルド・プッシュ・デプロイのステップがあり、name フィールドで各ステップを実行するコンテナイメージを指定しています。

steps:
  - id: "build"
    name: "gcr.io/cloud-builders/docker"
    args:
      [
        "build",
        "-t",
        "asia-northeast1-docker.pkg.dev/${PROJECT_ID}/docker-repo/job:latest",
        ".",
      ]
    dir: "."
  - id: "push"
    name: "gcr.io/cloud-builders/docker"
    args:
      [
        "push",
        "asia-northeast1-docker.pkg.dev/${PROJECT_ID}/docker-repo/job:latest",
      ]
  - id: "deploy"
    name: "gcr.io/cloud-builders/gcloud"
    entrypoint: "gcloud"
    args:
      [
        "run",
        "jobs",
        "deploy",
        "job",
        "--image",
        "asia-northeast1-docker.pkg.dev/${PROJECT_ID}/docker-repo/job:latest",
        "--region",
        "asia-northeast1",
        "--tasks",
        "1",
        "--max-retries",
        "0",
        "--service-account",
        "job-sa@${PROJECT_ID}.iam.gserviceaccount.com",
        "--set-secrets",
        "CREDENTIALS_SECRET=secret:latest",
        "--vpc-connector",
        "vpc-access-connecctor"
      ]

リソースの作成

プロジェクトと API

# 環境変数の設定
export PROJECT_ID={プロジェクト ID}
export PROJECT_NAME={プロジェクト名}
export BILLING_ACCOUNT_ID={請求先アカウント ID}
  
# プロジェクト作成
gcloud projects create ${PROJECT_ID} --name=${PROJECT_NAME}
 
# プロジェクト設定の変更
gcloud config set project ${PROJECT_ID}
  
# 請求先アカウントの紐づけ
gcloud beta billing projects link ${PROJECT_ID} \
    --billing-account=${BILLING_ACCOUNT_ID}
  
# API の有効化
gcloud services enable compute.googleapis.com \
    secretmanager.googleapis.com \
    run.googleapis.com \
    artifactregistry.googleapis.com \
    cloudbuild.googleapis.com \
    vpcaccess.googleapis.com

ネットワーク

# VPC ネットワークの作成
gcloud compute networks create vpc1 \
    --subnet-mode=custom \
    --bgp-routing-mode=regional \
    --mtu=1460
  
# サブネットの作成
## VM用
gcloud compute networks subnets create subnet1 --network=vpc1 \
    --range=10.0.0.0/28 --region=asia-northeast1 --enable-flow-logs \
    --logging-flow-sampling=1.0
  
## サーバーレス VPC アクセスコネクタ用サブネット
gcloud compute networks subnets create subnet-for-serverless-vpc-access --network=vpc1 \
    --range=10.8.0.0/28 --region=asia-northeast1 --enable-flow-logs \
    --logging-flow-sampling=1.0
  
# Cloud Router の作成
gcloud compute routers create nat-router \
    --network=vpc1 \
    --region=asia-northeast1
  
# Cloud NAT の作成
gcloud compute routers nats create nat \
    --router=nat-router \
    --auto-allocate-nat-external-ips \
    --nat-all-subnet-ip-ranges \
    --region=asia-northeast1
  
# Firewallの作成
##  IAP 経由で VM に SSH する許可ルール
gcloud compute firewall-rules create iap-rule --network=vpc1 --action=allow \
    --source-ranges=35.235.240.0/20 \
    --rules=tcp:22 --destination-ranges=10.0.0.4
  
# サーバーレス VPC アクセスコネクタの構築
gcloud compute networks vpc-access connectors create vpc-access-connector \
    --region asia-northeast1 \
    --subnet subnet-for-serverless-vpc-access \
    --min-instances 2 \
    --max-instances 3 \
    --machine-type f1-micro

Compute Engine VM

# VM の作成
gcloud compute instances create mysql-server \
    --zone=asia-northeast1-b \
    --machine-type=e2-micro \
    --network-interface=private-network-ip=10.0.0.4,stack-type=IPV4_ONLY,subnet=subnet1,no-address \
    --create-disk=auto-delete=yes,boot=yes,device-name=mysql-server,image=projects/debian-cloud/global/images/debian-11-bullseye-v20230912,mode=rw,size=10

Artifact Registry

# Artifact Registryにコンテナリポジトリを作成
gcloud artifacts repositories create docker-repo \
    --repository-format=docker \
    --location=asia-northeast1 \
    --description="Docker repository"

Secret Manager

# Secret Manager に認証情報を登録
gcloud secrets create secret && \
gcloud secrets versions add secret --data-file="./credential.txt"

BigQuery

# BigQueryにデータセットを作成
bq --location=asia-northeast1 mk \
    --dataset \
    ${PROJECT_ID}:world

サービスアカウント・IAM

# サービスアカウントの作成
gcloud iam service-accounts create job-sa \
    --description="Cloud Run jobsで使用するサービスアカウント"
  
# 各リソースにおけるサービスアカウントへのロール付与
  
## Secret Manager Secret Accessor (シークレットレベル)
gcloud secrets add-iam-policy-binding secret \
    --member="serviceAccount:job-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/secretmanager.secretAccessor"
  
## BigQuery Job User (プロジェクトレベル)
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:job-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/bigquery.jobUser"
  
## BigQuery Data Editor (プロジェクトレベル)
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:job-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"
  
# Cloud Build から Cloud Run jobs へデプロイするため Cloud Build サービスアカウントに
# Cloud Run 管理者ロールとサービスアカウントユーザーロールを付与 (プロジェクトレベル)
export PROJECT_NUMBER=`gcloud projects describe $PROJECT_ID --format="value(projectNumber)"`
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
    --role roles/run.admin
  
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
    --role roles/iam.serviceAccountUser

MySQL のセットアップ

Compute Engine 上の VM に MySQL をインストールする方法は、Google Cloud の公式ドキュメントを参照ください。

次に、インストールした MySQL にサンプルデータをインポートし、Cloud Run jobs から接続するためのユーザを作成します。

サンプルデータは MySQL 公式の world database を使用します。

今回作成したユーザでは、接続元ホストをすべて許可していますが、本番運用の際は許可するホストに制限をかける事を推奨します。

# VM へ SSH 接続
gcloud compute ssh --tunnel-through-iap mysql-server --zone=asia-northeast1-b 
# サンプルデータのダウンロード
sudo apt-get install -y unzip && \
wget https://downloads.mysql.com/docs/world-db.zip && \
unzip world-db.zip
  
# データベースへ接続
mysql -u root -p 
# インポート
SOURCE ./world-db/world.sql;
  
# DB 接続用ユーザ作成
CREATE USER ユーザ名@'%' IDENTIFIED BY '接続時のパスワード';
  
# 権限付与 (world データベースのすべてのテーブルに対して全権限を付与)
GRANT ALL PRIVILEGES ON world.* TO ユーザ名@'%';
  
# 権限反映
flush privileges;

job のデプロイ

# Cloud Build で Docker イメージのビルド・プッシュ、Cloud Run jobs にデプロイ
gcloud builds submit --region asia-northeast1 --config cloudbuild.yaml

動作検証

Cloud Run jobsの実行

以下のコマンドでデプロイ済みの job を実行します。

gcloud run jobs execute job --region=asia-northeast1

Cloud Run jobsのジョブ詳細画面から実行の成否が確認できます。

Cloud Run jobsのジョブ詳細画面

BigQueryのコンソール画面からは、MySQLにあったテーブルデータが転送されていることが確認できます。

BigQueryのコンソール画面

次に、MySQLサーバーへの接続が、サーバーレス VPC アクセス経由でされているか確認します。確認方法として、Cloud Logging に出力される VPC フローログを確認する方法があります。

gcloud コマンドで確認する場合は、次のコマンドを実行します。

gcloud logging read logName="projects/${PROJECT_ID}/logs/compute.googleapis.com%2Fvpc_flows"

出力結果の jsonPayload.connection.dest_ipjsonPayload.connection.src_ip は、接続先・接続元のIPアドレスです。

今回は、接続先 (Compute Engine 上の VM)のIPアドレスに 10.0.0.4 を割り当て、接続元 (サーバーレス VPC アクセスコネクタ)のサブネットの CIDR に 10.8.0.0/28 を割り当てたため、それぞれの値がフローログに出力されていることが確認出来ます。

insertId: r6y9jef2tr757
jsonPayload:
  bytes_sent: '9792'
  connection:
    dest_ip: 10.0.0.4
    dest_port: 3306
    protocol: 6
    src_ip: 10.8.0.5
    src_port: 42755
  dest_instance:
    project_id: xxxxxx
    region: asia-northeast1
    vm_name: mysql-server
    zone: asia-northeast1-b
  dest_vpc:
    project_id: xxxxxx
    subnetwork_name: subnet1
    vpc_name: vpc1
  end_time: '2023-09-30T11:50:20.864864005Z'
  packets_sent: '64'
  reporter: DEST
  rtt_msec: '1'
  src_instance:
    project_id: xxxxxx
    region: asia-northeast1
    vm_name: aet-asianortheast1-vpc--access--connector-7zw0
    zone: asia-northeast1-b
  src_vpc:
    project_id: xxxxxx
    subnetwork_name: subnet-for-serverless-vpc-access
    vpc_name: vpc1
  start_time: '2023-09-30T11:50:20.796486189Z'
logName: projects/xxxxxx/logs/compute.googleapis.com%2Fvpc_flows
receiveTimestamp: '2023-09-30T11:50:33.956154690Z'
resource:
  labels:
    location: asia-northeast1-b
    project_id: xxxxxx
    subnetwork_id: '6548845232760910546'
    subnetwork_name: subnet1
  type: gce_subnetwork
timestamp: '2023-09-30T11:50:33.956154690Z'

リソースの削除

以下のコマンドを実行し、検証で作成したプロジェクトを削除します。

プロジェクトを削除すると、プロジェクト内のリソースは全て削除されるため、実際のご利用時はご注意ください。

gcloud projects delete ${PROJECT_ID}

西島 昌太(記事一覧)

データアナリティクス準備室 データエンジニア
2023年4月に新卒入社。

元はフロントエンド開発を主戦場に、現在はデータエンジニアリングを勉強中。何でも屋さんを目指して、日々邁進。 休日は大体プログラムを書いてる人