Cloud WorkflowsとDataformを用いたデータ分析パイプラインを構築してみた

記事タイトルとURLをコピーする

G-gen 又吉です。当記事では、Cloud Workflows と Dataform を用いてデータ分析パイプラインを構築してみたいと思います。

前提知識

Cloud Workflows

Cloud Workflows はGoogle Cloud のワークフロー管理サービスです。フルマネージドかつサーバーレスであるためインフラの管理は必要なく、また非常に安価に利用できるのが特徴です。

詳細については、以下の記事をご参照下さい。

blog.g-gen.co.jp

Dataform

Dataform は、BigQuery のための SQL ワークフロー管理サービスです。フルマネージドであり、Dataform の利用自体は無料で利用できる点が特徴です。

詳細については、以下の記事をご参照下さい。

blog.g-gen.co.jp

ETL と ELT

ETL と ELT はどちらもデータを変換処理する際の流れを説明しています。

ETL は、Extract (抽出)、 Transform (変換) 、Load (書き出し) の順序でデータの変換処理を行います。主に、データを利用しやすい形に変換したり、DWH が読み込める形に変換して DWH 等の分析基盤に格納する際に利用します。

ELT は、Extract (抽出)、 Load (書き出し) 、Transform (変換) の順序でデータの変換処理を行います。ETL は変換処理後に DWH 等の分析基盤にデータをロードしますが、ELT の場合、先に DWH 等の分析基盤にデータをロードし、DWH 内でデータの変換処理を行います。

ETL と ELT の違い

概要

今回の構成

BigQuery に分析対象のデータがある場合、 Dataform を用いることで BigQuery 内で複数の SQL の依存関係を管理しつつデータ変換 (ELT) を行うことができます。

しかし、データレイクを Cloud Storage やオンプレミスに構えており、必要に応じ分析対象のデータのみを BigQuery にインポートするケースも多いです。

その際、BigQuery にインポートするデータが、BigQuery のデータの取り込み方式に従っている必要があり、もし対応していない場合はデータ転送時にデータの加工 (ETL) を行う必要があります。

今回は、データレイクと見立てた Cloud Storage のデータを、一部加工を行い BigQuery に格納後、SQL を用いてマートテーブルを作成するデータパイプラインのワークフローを構築します。

今回構築する構成図

Cloud Workflows のスコープ

Cloud Workflows では、疎結合になっている Cloud Functions (ETL 処理用) と、Dataform (ELT 処理用) の依存関係を確立しながらワークフローを管理します。

まずはじめに、parallel_step でそれぞれの Cloud Functions を並列で呼び出しております。Cloud Functions では、BigQuery に格納できる最低限のデータ変換処理を行うため、各 csv に対し以下の変換処理を行います。

  • カラム名を日本語からローマ字に変換
  • 日付のフォーマットを YYYY/MM/DD から YYYY-MM-DD に変換

次に、execute_elt で Dataform の呼び出しとステータス確認を行っております。ポイントは、Dataform の呼び出し (create_workflow_invocation) とステータス確認 (get_workflow_invocation) は別の API 実行として設定します。理由は、Dataform の呼び出し API のレスポンスにもステータスは返ってきますが、API 実行直後のステータスとなるため、SQL ワークフローの完了を待たずすて RUNNING 等のステータスが返却されるケースがあります。ステータス確認後、ステータスの状態によって条件分岐で処理を分けております。

参考:Dataform ワークフロー呼び出し時のステータス一覧

Cloud Workflows で定義したワークフロー図

Dataform のスコープ

Dataform では、BigQuery に格納されたデータの変換処理 (SQL) における依存関係を管理します。今回は検証のため 2 つのテーブルを結合してマートテーブルを作成する簡易的な変換処理を行っております。

Dataform 内の SQL ワークフロー

準備

ディレクトリ構造

開発環境は Cloud Shell を用いて行います。ディレクトリ構造は以下のとおりです。

terraform ディレクトリ配下は、以下のとおりです。

terraform
|-- gcf_source_code
|   |-- etl_raw_data
|   |   |-- main.py
|   |   `-- requirements.txt
|   `-- etl_weather_data
|       |-- main.py
|       `-- requirements.txt
|-- main.tf
`-- source_data
    |-- raw_data.csv
    `-- weather_data.csv

main.tf

main.tf には Terraform のコードを記述しています。

locals {
  terraform_service_account            = ${Terraform 実行に使われるサービスアカウントのメールアドレス}
  project_name                         = ${プロジェクト名}
  project_id                           = ${プロジェクト ID}
  folder_id                            = ${フォルダ ID}
  billing_account_id                   = ${請求先アカウント ID}
}
  
# terraform & provider の設定
terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = ">= 4.0.0"
    }
  }
  required_version = ">= 1.3.0"
  
  backend "gcs" {
    bucket = ${tfstate ファイルを格納する Cloud Storage バケット名}
    impersonate_service_account = ${Terraform 実行に使われるサービスアカウントのメールアドレス}
    }
}
  
# サービスアカウント権限借用の設定
provider "google" {
    alias = "impersonation"
    scopes = [
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/userinfo.email",
    ]
}
  
data "google_service_account_access_token" "default" {
    provider               = google.impersonation
    target_service_account = local.terraform_service_account
    scopes                 = ["userinfo-email", "cloud-platform"]
    lifetime               = "1200s"
}
  
# Google プロバイダの設定
provider "google" {
  project         = local.project_id
  region          = "asia-northeast1"
  access_token    = data.google_service_account_access_token.default.access_token
  request_timeout = "60s"
}
  
provider "google-beta" {
  project         = local.project_id
  region          = "asia-northeast1"
  access_token    = data.google_service_account_access_token.default.access_token
  request_timeout = "60s"
}
  
  
######################################
### プロジェクトの作成と API の有効化 ###
######################################
  
# プロジェクトの作成
resource "google_project" "poc" {
  name            = local.project_name
  project_id      = local.project_id
  folder_id       = local.folder_id
  billing_account = local.billing_account_id
}
  
# API の有効化
module "tenant_a_project_services" {
  source  = "terraform-google-modules/project-factory/google//modules/project_services"
  version = "14.2.1"
  # version = "~> 13.0"
  
  project_id  = google_project.poc.project_id
  enable_apis = true
  activate_apis = [
    "iam.googleapis.com",
    "cloudbuild.googleapis.com",
    "run.googleapis.com",
    "cloudfunctions.googleapis.com",
    "cloudscheduler.googleapis.com",
    "artifactregistry.googleapis.com",
    "workflows.googleapis.com",
    "bigquery.googleapis.com",
    "dataform.googleapis.com",
  ]
  disable_services_on_destroy = false
}
  
#######################################
### サービスアカウントの作成と権限の付与 ##
#######################################
  
# Dataform サービスアカウントに権限付与
resource "google_project_iam_member" "bq_jobuser" {
  depends_on = [ 
    module.tenant_a_project_services,
    google_dataform_repository.dataform_repository
  ]
  project = google_project.poc.project_id
  role    = "roles/bigquery.jobUser"
  member  = "serviceAccount:service-${google_project.poc.number}@gcp-sa-dataform.iam.gserviceaccount.com"
}
  
resource "google_project_iam_member" "bq_data_editer" {
  depends_on = [ 
    module.tenant_a_project_services,
    google_dataform_repository.dataform_repository
  ]
  project = google_project.poc.project_id
  role    = "roles/bigquery.dataEditor"
  member  = "serviceAccount:service-${google_project.poc.number}@gcp-sa-dataform.iam.gserviceaccount.com"
}
  
# Cloud Functions 用サービスアカウントの作成と権限付与
resource "google_service_account" "sa_gcf" {
  project      = google_project.poc.project_id
  account_id   = "sa-gcf"
  display_name = "Cloud Functions 用サービスアカウント"
}
  
resource "google_project_iam_member" "invoke_gcf" {
  project = google_project.poc.project_id
  role    = "roles/run.invoker"
  member  = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
resource "google_project_iam_member" "bq_jobuser2" {
  project = google_project.poc.project_id
  role    = "roles/bigquery.jobUser"
  member  = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
resource "google_bigquery_dataset_iam_member" "source_dataset_viewer2" {
  project = google_project.poc.project_id
  dataset_id = google_bigquery_dataset.source_dataset.dataset_id
  role       = "roles/bigquery.dataEditor"
  member     = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
resource "google_bigquery_dataset_iam_member" "mart_dataset_viewer2" {
  project = google_project.poc.project_id
  dataset_id = google_bigquery_dataset.mart_dataset.dataset_id
  role       = "roles/bigquery.dataEditor"
  member     = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
resource "google_storage_bucket_iam_member" "source_data" {
  bucket = google_storage_bucket.source_data.name
  role = "roles/storage.admin"
  member     = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
resource "google_storage_bucket_iam_member" "source_gcf" {
  bucket = google_storage_bucket.source_gcf.name
  role = "roles/storage.admin"
  member     = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
# Cloud Workflows 用サービスアカウント作成と権限付与
resource "google_service_account" "sa_wf" {
  project      = google_project.poc.project_id
  account_id   = "sa-cloud-wf"
  display_name = "Cloud Workflows 用サービスアカウント"
}
  
resource "google_project_iam_member" "invoke_gcf2" {
  project = google_project.poc.project_id
  role    = "roles/run.invoker"
  member  = "serviceAccount:${google_service_account.sa_wf.email}"
}
  
resource "google_project_iam_member" "invoke_dataform" {
  project = google_project.poc.project_id
  role    = "roles/dataform.editor"
  member  = "serviceAccount:${google_service_account.sa_wf.email}"
}
  
# Cloud Scheduler 用サービスアカウント作成と権限付与
resource "google_service_account" "sa_scheduler" {
  project      = google_project.poc.project_id
  account_id   = "sa-scheduler"
  display_name = "Cloud Scheduler 用サービスアカウント"
}
  
resource "google_project_iam_member" "workflow_invoker" {
  project      = google_project.poc.project_id
  role   = "roles/workflows.invoker"
  member = "serviceAccount:${google_service_account.sa_scheduler.email}"
}
  
################################
### バケットとオブジェクトの作成 ###
################################
  
# ソースデータ格納用バケットの作成
resource "google_storage_bucket" "source_data" {
  project       = google_project.poc.project_id
  location      = "ASIA-NORTHEAST1"
  name          = "${google_project.poc.project_id}-source-data"
  force_destroy = true
}
  
# ソースデータをバケットに追加
resource "google_storage_bucket_object" "source_raw_data" {
  name   = "raw_data.csv"
  bucket = google_storage_bucket.source_data.name
  source = "source_data/raw_data.csv"
  content_type = "text/csv"
}
  
resource "google_storage_bucket_object" "source_weather_data" {
  name   = "weather_data.csv"
  bucket = google_storage_bucket.source_data.name
  source = "source_data/weather_data.csv"
  content_type = "text/csv"
}
  
# Cloud Functions のソースコード格納用バケットの作成
resource "google_storage_bucket" "source_gcf" {
  project       = google_project.poc.project_id
  location      = "ASIA-NORTHEAST1"
  name          = "${google_project.poc.project_id}-source-gcf"
  force_destroy = true
}
  
# Cloud Functions で使うソースコードを ZIP 化
data "archive_file" "etl_raw_data" {
  type        = "zip"
  source_dir  = "./gcf_source_code/etl_raw_data"
  output_path = "./zip_source_code/etl_raw_data.zip"
}
  
data "archive_file" "etl_weather_data" {
  type        = "zip"
  source_dir  = "./gcf_source_code/etl_weather_data"
  output_path = "./zip_source_code/etl_weather_data.zip"
}
  
# ZIP 化したソースコードをバケットに追加
resource "google_storage_bucket_object" "etl_raw_data" {
  name   = "etl-raw-data.${data.archive_file.etl_raw_data.output_md5}.zip"
  bucket = google_storage_bucket.source_gcf.name
  source = data.archive_file.etl_raw_data.output_path
}
  
resource "google_storage_bucket_object" "etl_weather_data" {
  name   = "etl-weather-data.${data.archive_file.etl_weather_data.output_md5}.zip"
  bucket = google_storage_bucket.source_gcf.name
  source = data.archive_file.etl_weather_data.output_path
}
  
################################
### BigQuery データセット作成 ###
################################
  
# データセットの作成
resource "google_bigquery_dataset" "source_dataset" {
  project    = google_project.poc.project_id
  dataset_id = "source_dataset"
  location   = "asia-northeast1"
  delete_contents_on_destroy = true
}
  
resource "google_bigquery_dataset" "mart_dataset" {
  project    = google_project.poc.project_id
  dataset_id = "mart_dataset"
  location   = "asia-northeast1"
  delete_contents_on_destroy = true
}
  
##############################
### Dataform リポジトリ作成 ###
##############################
  
# Dataform リポジトリ作成
resource "google_dataform_repository" "dataform_repository" {
  provider = google-beta
  name = "dataform_repository"
}
  
############################
### Cloud Functions 作成 ###
############################
  
# etl_raw_data 関数の作成
resource "google_cloudfunctions2_function" "etl_raw_data" {
  name     = "etl-raw-data"
  location = "asia-northeast1"
  build_config {
    runtime     = "python310"
    entry_point = "excute_etl" # Set the entry point
    source {
      storage_source {
        bucket = google_storage_bucket.source_gcf.name
        object = google_storage_bucket_object.etl_raw_data.name
      }
    }
  }
  service_config {
    max_instance_count    = 3
    available_memory      = "256M"
    timeout_seconds       = 60
    service_account_email = google_service_account.sa_gcf.email
    environment_variables = {
      BUCKET_NAME         = google_storage_bucket.source_data.name
      FILE_PATH           = google_storage_bucket_object.source_raw_data.name
      DATASET_ID          = google_bigquery_dataset.source_dataset.dataset_id
      TABLE_ID            = "raw_data"
    }
  }
}
  
# etl_weather_data 関数の作成
resource "google_cloudfunctions2_function" "etl_weather_data" {
  name     = "etl-weather-data"
  location = "asia-northeast1"
  build_config {
    runtime     = "python310"
    entry_point = "excute_etl" # Set the entry point
    source {
      storage_source {
        bucket = google_storage_bucket.source_gcf.name
        object = google_storage_bucket_object.etl_weather_data.name
      }
    }
  }
  service_config {
    max_instance_count    = 3
    available_memory      = "256M"
    timeout_seconds       = 60
    service_account_email = google_service_account.sa_gcf.email
    environment_variables = {
      BUCKET_NAME         = google_storage_bucket.source_data.name
      FILE_PATH           = google_storage_bucket_object.source_weather_data.name
      DATASET_ID          = google_bigquery_dataset.source_dataset.dataset_id
      TABLE_ID            = "weather_data"
    }
  }
}
  
############################
### Cloud Scheduler 作成 ###
############################
  
# Cloud Scheduler の作成
resource "google_cloud_scheduler_job" "cron" {
  name             = "cron"
  region   = "asia-northeast1"
  schedule         = "0 10 * * MON-FRI"  # Run the job every weekday at 10:00 AM
  time_zone        = "Asia/Tokyo"
  
  http_target {
    http_method = "POST"
    uri         = "https://workflowexecutions.googleapis.com/v1/projects/${local.project_id}/locations/${google_workflows_workflow.etl_and_elt.region}/workflows/${google_workflows_workflow.etl_and_elt.name}/executions"
    oauth_token {
      service_account_email = google_service_account.sa_scheduler.email
    }
    body = base64encode(jsonencode({
      "argument": jsonencode({
        "raw_data_gcf_url" = "${google_cloudfunctions2_function.etl_raw_data.service_config[0].uri}",
        "weather_data_dcf_url" = "${google_cloudfunctions2_function.etl_weather_data.service_config[0].uri}",
        "project_id" = "${google_project.poc.project_id}",
        "repository" = "projects/${google_project.poc.project_id}/locations/asia-northeast1/repositories/${google_dataform_repository.dataform_repository.name}"
      })
    }))
    headers = {
      "Content-Type" = "application/json"
    }
  }
  retry_config {
    retry_count = 3
  }
}
  
############################
### Cloud Workflows 作成 ###
############################
  
# Cloud Workflows の作成
resource "google_workflows_workflow" "etl_and_elt" {
  name          = "daily-workflows"
  region        = "asia-northeast1"
  service_account = google_service_account.sa_wf.email
  source_contents = <<-EOF
main:
    params: [args]
    steps:
    - init:
        assign:
        - repository: $${args.repository}
        - raw_data_gcf_url: $${args.raw_data_gcf_url}
        - weather_data_dcf_url: $${args.weather_data_dcf_url}
    - parallel_step:
        parallel:
          branches:
            - etl_01:
                steps:
                  - etl_raw_data:
                        call: http.post
                        args:
                            url: $${raw_data_gcf_url}
                            auth:
                                type: OIDC
                        result: run_name
            - etl_02:
                steps:
                  - etl_weather_data:
                        call: http.post
                        args:
                            url: $${weather_data_dcf_url}
                            auth:
                                type: OIDC
                        result: run_name
    - execute_elt:
        steps:
            - create_compilation_result:
                call: http.post
                args:
                    url: $${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                    auth:
                        type: OAuth2
                    body:
                        gitCommitish: main
                result: compilationResult
            - create_workflow_invocation:
                call: http.post
                args:
                    url: $${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                    auth:
                        type: OAuth2
                    body:
                        compilation_result: $${compilationResult.body.name}
                result: workflowInvocation
            - complete:
                return: $${workflowInvocation.body.name}

EOF
}

gcf_source_code/etl_raw_data

main.py

gcf_source_code/etl_raw_data には、Cloud Storage バケットに格納された raw_data.cev の ETL 処理を行う Cloud Functions のソースコードを格納しています。

import os
  
import functions_framework
from io import BytesIO
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery
  
  
BUCKET_NAME = os.environ.get("BUCKET_NAME")
FILE_PATH = os.environ.get("FILE_PATH")
DATASET_ID = os.environ.get("DATASET_ID")
TABLE_ID = os.environ.get("TABLE_ID")
  
try:
    # クライアントをインスタンス化
    storage_client = storage.Client()
    bigquery_client = bigquery.Client()
except Exception as e:
    print(f"An error occurred: {e}")
    raise e
  
def extract():
    try:
        # バケットを取得
        bucket = storage_client.get_bucket(BUCKET_NAME)
  
        # BLOB を構成
        blob = bucket.blob(FILE_PATH)
  
        # オブジェクトのデータを取得
        content = blob.download_as_bytes()
    except Exception as e:
        print(f"An error occurred: {e}")
        raise e
  
    # データフレームを作成
    df = pd.read_csv(BytesIO(content))
  
    return df
  
def transform(df):
    # カラム名を変更
    df = df.rename(columns = {
        "日付" : "date",
        "デバイスID" : "device_id",
        "発電量" : "electric_generating_capacity",
        "都道府県" : "prefectures"
    })
  
    # 日付のフォーマットを変更
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
  
    return df
  
def load(df):
    try:
        # テーブル情報を取得
        table_ref = bigquery_client.dataset(DATASET_ID).table(TABLE_ID)
  
        # BigQueryテーブルへデータを挿入
        job = bigquery_client.load_table_from_dataframe(df, table_ref)
  
        # 結果を確認
        job.result()
    except Exception as e:
            print(f"An error occurred: {e}")
            raise e
  
    print("Loaded dataframe to {}".format(table_ref.path))
    return table_ref.path
  
@functions_framework.http
def excute_etl(request):
    df = extract()
    df_after_dransform = transform(df)
    path = load(df_after_dransform)
    return path

requirements.txt

functions-framework==3.*
bytesbufio==1.0.3
pandas==2.0.3
google-cloud-storage==2.10.0
google-cloud-bigquery==3.11.3
pyarrow==12.0.1

gcf_source_code/etl_weather_data

main.py

gcf_source_code/etl_weather_data には、Cloud Storage バケットに格納された weather_data.cev の ETL 処理を行う Cloud Functions のソースコードを格納しています。

import os
  
import functions_framework
from io import BytesIO
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery
  
  
BUCKET_NAME = os.environ.get("BUCKET_NAME")
FILE_PATH = os.environ.get("FILE_PATH")
DATASET_ID = os.environ.get("DATASET_ID")
TABLE_ID = os.environ.get("TABLE_ID")
  
try:
    # クライアントをインスタンス化
    storage_client = storage.Client()
    bigquery_client = bigquery.Client()
except Exception as e:
    print(f"An error occurred: {e}")
    raise e
  
def extract():
    try:
        # バケットを取得
        bucket = storage_client.get_bucket(BUCKET_NAME)
  
        # BLOB を構成
        blob = bucket.blob(FILE_PATH)
  
        # オブジェクトのデータを取得
        content = blob.download_as_bytes()
    except Exception as e:
        print(f"An error occurred: {e}")
        raise e
  
    # データフレームを作成
    df = pd.read_csv(BytesIO(content))
  
    return df
  
def transform(df):
    # カラム名を変更
    df = df.rename(columns = {
        "日付" : "date",
        "都道府県" : "prefectures",
        "気温" : "temperature",
        "降水量" : "precipitation"
    })
  
    # 日付のフォーマットを変更
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
  
    return df
  
def load(df):
    try:
        # テーブル情報を取得
        table_ref = bigquery_client.dataset(DATASET_ID).table(TABLE_ID)
  
        # BigQueryテーブルへデータを挿入
        job = bigquery_client.load_table_from_dataframe(df, table_ref)
  
        # 結果を確認
        job.result()
    except Exception as e:
            print(f"An error occurred: {e}")
            raise e
  
    print("Loaded dataframe to {}".format(table_ref.path))
    return table_ref.path
  
@functions_framework.http
def excute_etl(request):
    df = extract()
    df_after_dransform = transform(df)
    path = load(df_after_dransform)
    return path

requirements.txt

functions-framework==3.*
bytesbufio==1.0.3
pandas==2.0.3
google-cloud-storage==2.10.0
google-cloud-bigquery==3.11.3
pyarrow==12.0.1

source_data

source_data には、Cloud Storage に格納する raw_data.csv と weather_data.csv をそれぞれ配置します。

対象の csv は、以下のスプレッドシートからダウンロードが可能です。

ブログ用ダミーデータ

実行

Terraform 実行

Cloud Shell のターミナルで terraform ディレクトリに移動し、terraform init で初期化を行い、 terraform plan 問題なければ terraform apply でデプロイを行います。

Dataform の設定

概要

今回、Terraform では Dataform のリポジトリ作成までを行いましたが sqlx ファイルの作成はできていないため、コンソールから以下の手順で sqlx ファイルを作成しリポジトリを完成させていきます。

  1. Dataform > 作成したリポジトリ をクリック > 開発ワークスペースを作成 をクリック
  2. 任意のワークスペース名を入力して 作成 をクリック
  3. 2 で作成したワークスペースを選択して ワークスペースを初期化 をクリック

ワークスペースの初期化を行うと以下のようなファイル群が自動生成されます。

開発ワークスペースの初期化後ファイル群

definitions ディレクトリ内に sqlx ファイルを作成し、SQL ワークフローを定義していきます。definitions ディレクトリ配下を以下の構成に変更してください。

修正後の開発ワークスペースファイル群

各ファイルの中身を以下のように記述します。

difinitions/source/raw_data.sqlx

config {
  type: "declaration",
  database: ${BigQuery が属するプロジェクト ID},  
  schema: "source_dataset",
  name: "raw_data",
}

参考:Dataform 徹底解説 データソースの宣言

difinitions/source/weather_data.sqlx

config {
  type: "declaration",
  database: ${BigQuery が属するプロジェクト ID},  
  schema: "source_dataset",
  name: "weather_data",
}

difinitions/transform/mart_data.sqlx

config {
  type: "table", 
  schema: "mart_dataset",
}

SELECT
  A.date,
  A.device_id,
  A.electric_generating_capacity,
  A.prefectures,
  B.temperature,
  B.Precipitation
FROM
  ${ref("raw_data")} as A
LEFT JOIN
  ${ref("weather_data")} as B
ON A.date = B.date AND A.prefectures = B.prefectures

ファイル修正後は、COMMIT とフォルトブランチへの PUSH を行ってください。

動作検証

検証 1

検証 1 は、そのまま実行してみます。尚、今回は、Cloud Scheduler を手動で強制実行していきたいと思います。

コンソールにて、Cloud Scheduler > 作成したジョブの 操作 から 強制実行 をクリックします。

Cloud Scheduler のコンソール画面

すると、Cloud Workflows がトリガーされ実行されます。以下が Cloud Workflows の実行詳細画面です。 Cloud Workflows のワークフローが成功したことが確認できました。

[検証1] Cloud Workflows の実行詳細画面

また、以下が Dataform の実行詳細画面です。Dataform も無事成功したことが確認できました。

Dataform の実行詳細画面

一連のワークフローが完了したことが確認できたため、最後に BigQuery 上でマートテーブルが作成できているか確認します。

BigQuery のコンソール画面

無事マートテーブルも作成できていました。

[検証1] Cloud Workflows のワークフロー図

検証 2

検証 2 では、Cloud Storage バケット上の raw_data.csv ファイルを削除してから、Cloud Scheduler を強制実行してみます。

以下が Cloud Workflows の実行詳細画面です。Cloud Functions を実行している parallel_step でエラーが発生したので、想定通りの挙動を確認できました。

[検証2] Cloud Workflows 実行詳細画面

[検証2] Cloud Workflows のワークフロー図

検証 3

検証 3 では、Cloud Storage バケット上のファイルを検証 1 の状態に戻し、Dataform 上の definitions/source/raw_data.sqlx ファイルを以下のように存在しない schema に書き換えてみます。

config {
  type: "declaration",
  database: "matayuuu-etl-elt",  
  schema: "hoge_dataset",
  name: "raw_data",
}

ファイル修正後は、COMMIT とフォルトブランチへの PUSH を行い、Cloud Scheduler を強制実行してみます。

以下が Cloud Workflows の実行詳細画面です。Dataform のステータス確認を行っている check_if_complete でエラーが発生したので、想定通りの挙動を確認できました。

[検証3] Cloud Workflows の実行詳細画面

[検証3] Cloud Workflows のワークフロー図

クリーンアップ

Dataform の開発ワークスペースを削除し、作成した Terraform を destroy コマンドで削除します。

  1. BigQuery > Dataform > リポジトリを選択し、手動で作成した開発ワークスペース を削除
  2. Cloud Shell にて terraform destroy を実行

本番運用時の考慮事項

エラー発生時の通知機能

今回は検証のため Cloud Workflows がエラーになってもメールや Slack 通知等で管理者へ通知する仕組みは作っておりませんが、本番運用時にはエラー発生時に即座に対応できるようにしておくとよいでしょう。

例えば、Cloud Workflows のログは Cloud Logging と連携しているため、ログベースのアラート を構成することで容易にエラー検知ができます。

再試行時の考慮

何らかの理由で Cloud Workflows のワークフローや Dataform の SQL ワークフローが途中で失敗した際に、それぞれのワークフローを再実行しても同じ結果を得るように設計しておくことが重要となります。

そこで使われるのが 冪等性 の担保です。何度繰り返しても、いつ実行しても、特定の入力セットに対して同じ動作をすることを冪等性が保たれている状態です。

その他にも、可能であればチェックポイントを設定することで、ワークフローが再開された際に、最初からジョブを再開するのではなく、中断したところから再開できるようにする方法も検討してみると良いでしょう。

参考:Jobs retries and checkpoints best practices

又吉 佑樹(記事一覧)

クラウドソリューション部

はいさい、沖縄出身のクラウドエンジニア!

セールスからエンジニアへ転身。Google Cloud 全 11 資格保有。Google Cloud Champion Innovator (AI/ML)。Google Cloud Partner Top Engineer 2024。Google Cloud 公式ユーザー会 Jagu'e'r でエバンジェリスト。好きな分野は生成 AI。