Cloud Workflowsで簡易的なデータパイプラインを構築してみる

記事タイトルとURLをコピーする

G-gen の杉村です。 Google Cloud (旧称 GCP) には Cloud Workflows という簡易的なワークフローツールがあります。今回の記事は Cloud Workflows を使った簡易的なデータパイプラインの構築方法をご紹介します。

はじめに

Cloud Workflows とは

Cloud Workflows は Google Cloud (旧称 GCP) で提供されるワークフローツールです。

事前に定義したワークフロー (いわゆるジョブネット) に沿って、自動化ジョブを実行できます。ワークフローからは HTTP リクエスト (GET/POST) を発行したり、各種 Google Cloud API を呼び出すことができます。

フルマネージドサービスなのでインフラの管理・運用は一切不要であり、 安価で無料枠も大きい のが特徴です。

ワークフローではジョブを「順次」「分岐」「繰り返し」を利用して実行可能です。

ただしワークフローを GUI で編集することは現状できず、フローを yaml 形式 で記述します (簡易的な可視化機能はあります) 。

また、ワークフローは Cloud Scheduler と組み合わせて定期的な自動実行が可能です。

関連記事

Cloud Workflows と Cloud Scheduler については、以下の記事も参照してください。

blog.g-gen.co.jp

blog.g-gen.co.jp

この記事で作るもの

概要

当記事では Cloud Workflows を使って「 BigQuery のデータに対して SQL を実行して ELT 処理を行うワークフロー」を構築してみます。

当記事で紹介する手法では BigQuery Data Transfer API を利用するため「 Amazon S3 からデータを BigQuery に取り込み。その後、 ELT 処理を実行」というパイプラインも実現可能です。

イメージ

SQL 実行方法 1 - BigQuery API Connector

Cloud Workflows には Google Cloud の API を簡単に呼び出すための各種コネクタが存在します。

BigQuery API Connector もその一つで、簡単に BigQuery に対して SQL を実行できます。

ただし、このコネクタを使うとワークフローの yaml に直接 SQL を記述する必要があり、保守性が悪いため、今回はこの方法は 採りません

SQL 実行方法 2 - Scheduled Query

BigQuery の Scheduled Query (スケジュールされたクエリ) 機能では、 SQL を保存しておいて後から呼び出すことができます。

Scheduled Query は本来、定期的に SQL を自動実行するための BigQuery 備え付けの機能です。しかし呼び出されたときだけ SQL を実行する「オンデマンド」としての保存もできるため、今回はこれを呼び出す形を採ります。

保存されたオンデマンドの Scheduled Query を呼び出し

Scheduled Query の呼び出し方

Scheduled Query は BigQuery Data Transfer API 経由で呼び出すことができます。

Cloud Workflows には BigQuery Data Transfer API をノーコードで呼び出せる BigQuery Data Transfer API Connector も存在するのですが 2022 年 6 月現在で Preview 段階です。

本番利用を想定して、当記事ではワークフローから Cloud Functions の関数を呼び出し、その関数から BigQuery Data Transfer API をコールする形でデータパイプラインを実現していきます。

なお BigQuery Data Transfer API では他にも Amazon S3 から BigQuery へデータを転送するジョブ等も呼び出せるため、当記事のワークフローで「 Amazon S3 からデータを BigQuery に取り込み。その後、 ELT 処理を実行 」といった処理も可能です。

ワークフローの作成

フロー図

この記事では、以下の図のようなワークフローを構築します。

以下の図では実行されるジョブが query_01 の一つだけですが、数珠つなぎにジョブを繋げることで、クエリの完了を待って次のクエリを実行することが可能です。

ワークフローの例

Cloud Functions 関数

今回作成する Cloud Functions 関数については当記事では詳細は紹介しませんが、以下のような関数を想定しています。

No 関数名 処理内容 戻り値
1 execute-sql BigQuery Data Transfer API 経由で、与えられたリソース名の Scheduled Query を実行する 実行したジョブ名
例: projects/1234567890/locations/asia-northeast1/transferConfigs/11111111-0000-1111-1111-111111111111/runs/99999999-0000-9999-9999-999999999999
2 check-status BigQuery Data Transfer API 経由で、与えられたジョブ名のジョブのステータスを確認する ジョブのステータス
例: SUCEEDED / RUNNING / PENDING / FAILED 等

当記事ではこれらの関数が HTTP トリガ関数としてデプロイ済みである前提で解説します。

以下の資料をご参照ください。

yaml ファイル

以下のような yaml ファイルを作成します。

main:
  steps:
    - initialize:
        assign:
          - query_01_name: "projects/1234567890/locations/asia-northeast1/transferConfigs/11111111-0000-1111-1111-111111111111"
          - query_01_timeout: 300
        next: query_01
    - query_01:
        steps:
          - execute_01:
              call: http.post
              args:
                url: https://asia-northeast1-your-project-name.cloudfunctions.net/execute-sql
                body:
                  scheduled_query_name: ${query_01_name}
                auth:
                  type: OIDC
              result: run_name
          - check_01:
              steps:
                - setup_01:
                    assign:
                      - elapsed_time: 0
                      - timeout: ${query_01_timeout}
                - check_status_01:
                    call: http.post
                    args:
                      url: https://asia-northeast1-your-project-name.cloudfunctions.net/check-status
                      body:
                        run_name: ${run_name.body}
                      auth:
                        type: OIDC
                    result: status
                - check_if_complete_01:
                    switch:
                      - condition: ${status.body == "SUCCEEDED"}
                        next: finalize
                      - condition: ${elapsed_time >= timeout}
                        raise: "Job timed out"
                      - condition: ${status.body == "RUNNING" OR status.body == "PENDING"}
                        assign:
                          - elapsed_time: ${elapsed_time+10}
                        next: wait_01
                      - condition: True
                        raise: "Job error"
                - wait_01:
                    call: sys.sleep
                    args:
                      seconds: 10
                    next: check_status_01
    - finalize:
        return: "SUCEEDED"

7 行目の以下の行は、実際の Scheduled Query として保存されたオンデマンドクエリのリソース名で置き換えてください。

  • query_01_name: "projects/1234567890/locations/asia-northeast1/transferConfigs/11111111-0000-1111-1111-111111111111"

15 行目および 31 行目の以下の 2 行は Cloud Fucntions 関数 (HTTP トリガ) の呼び出し URL ですので、実際に構築した Cloud Functions のものに置き換えてください。

  • url: https://asia-northeast1-your-project-name.cloudfunctions.net/execute-sql
  • url: https://asia-northeast1-your-project-name.cloudfunctions.net/check-status

実行の流れ

冒頭のフロー図に示したとおり、これにより以下のワークフローが定義されます。

  • execute-sql 関数が実行されジョブが投入される
  • check-status 関数が実行されジョブのステータスが確認される
  • ステータスにより分岐 (以下の順で評価)
    • ステータスが SUCEEDED であればジョブが正常終了
    • 設定したタイムアウト秒数を超えていればワークフローが異常終了
    • ステータスが PENDINGRUNNING であれば 10 秒待機
    • 10 秒間待機
  • check-status 関数の実行まで戻る

ワークフローのデプロイ

yaml ファイルができたら、以下のコマンドを実行します。

gcloud workflows deploy example-workflow \
--location=asia-northeast1 \
--source=example-workflow.yaml \
--service-account=hogehoge@hogehoge.gserviceaccount.com

4 行目の --service-account=hogehoge@hogehoge.gserviceaccount.com は、 Cloud Functions を起動する権限のあるサービスアカウントに置き換えてください。

ワークフローの実行

以下のコマンドにより、デプロイしたワークフローが実行可能です。

gcloud workflows run example-workflow --location=asia-northeast1 --call-log-level=log-errors-only

ログレベルの設定など詳細は ドキュメント をご確認ください。このコマンドでは、各ステップの実行履歴はログ出力されず、エラーが起きた場合のみログが Cloud Logging に出力されます。

なお、もちろん Cloud Workflows のコンソール画面からもワークフローのデプロイや実行が可能なほか、実行状態を確認することができます。

gcloud コマンドで実行した場合はワークフロー全体が完了するまでプロンプトが戻らないので、適宜コンソールで状態を確認したり、別ターミナルからステータスを確認する コマンド を実行します。

留意事項

データパイプラインをはじめ自動化ジョブではエラー時の検知やリトライを検討する必要があります。

当記事でご紹介したワークフローはあくまで簡易的なものです。実際の業務では以下を考慮してください。

  • 冪等性 (リトライ性)
  • エラーの検知

ワークフローが「コケた」際に、シンプルにワークフローを再実行すれば良いように、冪等性 (何度実行しても同じ結果になる性質) が担保されるように変換処理を設計します。

またエラー検知に関しては、ワークフローのエラーを Cloud Logging に出力させて Cloud Monitoring で検知してメールや Slack へ通知する等が考えられます。

当記事のワークフローでは関数のエラー/例外をワークフロー内でハンドリングしていません (関数がコケても戻り値が返ると次のステップが実行されてしまう) 。 Cloud Workflows にはエラー/例外をハンドリングする 仕組み もありますので、適宜利用します。

杉村 勇馬 (記事一覧)

執行役員 CTO / クラウドソリューション部 部長

元警察官という経歴を持つ現 IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 12資格、Google Cloud認定資格11資格。X (旧 Twitter) では Google Cloud や AWS のアップデート情報をつぶやいています。