G-gen の杉村です。 Google Cloud(旧称 GCP)には Cloud Workflows という簡易的なワークフローツールがあります。今回の記事は Cloud Workflows を使った簡易的なデータパイプラインの構築方法をご紹介します。
はじめに
Cloud Workflows とは
Cloud Workflows(単に Workflows とも呼ばれる)は、Google Cloud で提供されるワークフローツールです。事前に定義したワークフロー(いわゆるジョブネット)に沿って、自動化ジョブを実行できます。ワークフローからは HTTP リクエストを実行したり、各種 Google Cloud API を呼び出すことができます。
フルマネージドサービスなのでインフラの管理・運用は一切不要であり、安価で無料枠も大きいのが特徴です。
ワークフローではジョブを「順次」「分岐」「繰り返し」を利用して実行可能です。ただし、簡易的な可視化機能はあるものの、ワークフローを GUI で編集することはできず、フローを YAML または JSON 形式で記述します。
またワークフローは、Cloud Scheduler と組み合わせて、定期的な自動実行が可能です。
関連記事
Cloud Workflows と Cloud Scheduler については、以下の記事も参照してください。
この記事で作るもの
概要
当記事では Cloud Workflows を使って、BigQuery のデータに対して SQL を実行し、ELT 処理を行うワークフローを構築してみます。
当記事で紹介する手法では、BigQuery Data Transfer API を利用するため、少し改修すれば「Amazon S3 からデータを BigQuery に取り込み。その後、 ELT 処理を実行」というパイプラインも実現可能です。

SQL 実行方法 1 - BigQuery API Connector
Cloud Workflows には Google Cloud の API を簡単に呼び出すための各種コネクタが存在します。BigQuery API Connector はその1つであり、簡単に BigQuery に対して SQL を実行できます。
ただし、このコネクタを使うとワークフローの yaml に直接 SQL を記述する必要があり、保守性が悪いため、今回はこの方法は採用しません。
SQL 実行方法 2 - Scheduled Query
BigQuery の Scheduled query(スケジュールされたクエリ)機能では、BigQuery に保存した SQL を任意のタイミングで呼び出すことができます。
- 参考 : クエリのスケジューリング
Scheduled query は本来、定期的に SQL を自動実行するための BigQuery 備え付けの機能です。しかし呼び出されたときだけ SQL を実行する「オンデマンド」としての保存もできます。今回のワークフローでは、Scheduled query を呼び出す方法を採用します。

Scheduled Query の呼び出し方
Scheduled Query は BigQuery Data Transfer API 経由で呼び出すことができます。
Cloud Workflows には BigQuery Data Transfer API をノーコードで呼び出せる BigQuery Data Transfer API Connector も存在しますが、記事を執筆した2022年6月現在で Preview 段階でした。
当記事では本番利用を想定して、ワークフローから Cloud Run functions の関数を呼び出し、その関数から BigQuery Data Transfer API へリクエストする形でデータパイプラインを実現していきます。
なお BigQuery Data Transfer API では、他にも Amazon S3 から BigQuery へデータを転送するジョブ等も呼び出せるため、当記事のワークフローで「Amazon S3 からデータを BigQuery に取り込み。その後、 ELT 処理を実行」といった処理も可能です。
ワークフローの作成
フロー図
この記事では、以下の図のようなワークフローを構築します。
以下の図では実行されるジョブが query_01
の一つだけですが、数珠つなぎにジョブを繋げることで、クエリの完了を待って次のクエリを実行することが可能です。

Cloud Run functions 関数
今回作成する Cloud Run functions 関数については、当記事では詳細は紹介しませんが、以下のような関数を想定しています。
No | 関数名 | 処理内容 | 戻り値 |
---|---|---|---|
1 | execute-sql | BigQuery Data Transfer API 経由で、与えられたリソース名の Scheduled Query を実行する | 実行したジョブ名 例: projects/1234567890/locations/asia-northeast1/transferConfigs/11111111-0000-1111-1111-111111111111/runs/99999999-0000-9999-9999-999999999999 |
2 | check-status | BigQuery Data Transfer API 経由で、与えられたジョブ名のジョブのステータスを確認する | ジョブのステータス 例: SUCEEDED / RUNNING / PENDING / FAILED 等 |
当記事ではこれらの関数が HTTP トリガ関数としてデプロイ済みである前提で解説します。
以下の資料も参考にしてください。
yaml ファイル
以下のような yaml ファイルを作成します。
main: steps: - initialize: assign: - query_01_name: "projects/1234567890/locations/asia-northeast1/transferConfigs/11111111-0000-1111-1111-111111111111" - query_01_timeout: 300 next: query_01 - query_01: steps: - execute_01: call: http.post args: url: https://asia-northeast1-your-project-name.cloudfunctions.net/execute-sql body: scheduled_query_name: ${query_01_name} auth: type: OIDC result: run_name - check_01: steps: - setup_01: assign: - elapsed_time: 0 - timeout: ${query_01_timeout} - check_status_01: call: http.post args: url: https://asia-northeast1-your-project-name.cloudfunctions.net/check-status body: run_name: ${run_name.body} auth: type: OIDC result: status - check_if_complete_01: switch: - condition: ${status.body == "SUCCEEDED"} next: finalize - condition: ${elapsed_time >= timeout} raise: "Job timed out" - condition: ${status.body == "RUNNING" OR status.body == "PENDING"} assign: - elapsed_time: ${elapsed_time+10} next: wait_01 - condition: True raise: "Job error" - wait_01: call: sys.sleep args: seconds: 10 next: check_status_01 - finalize: return: "SUCEEDED"
7 行目の以下の行は、実際の Scheduled Query として保存されたオンデマンドクエリのリソース名で置き換えてください。
query_01_name: "projects/1234567890/locations/asia-northeast1/transferConfigs/11111111-0000-1111-1111-111111111111"
15 行目および 31 行目の以下の 2 行は Cloud Fucntions 関数 (HTTP トリガ) の呼び出し URL ですので、実際に構築した Cloud Run functions のものに置き換えてください。
url: https://asia-northeast1-your-project-name.cloudfunctions.net/execute-sql
url: https://asia-northeast1-your-project-name.cloudfunctions.net/check-status
実行の流れ
冒頭のフロー図に示したとおり、これにより以下のワークフローが定義されます。
- execute-sql 関数が実行されジョブが投入される
- check-status 関数が実行されジョブのステータスが確認される
- ステータスにより分岐 (以下の順で評価)
- ステータスが
SUCEEDED
であればジョブが正常終了 - 設定したタイムアウト秒数を超えていればワークフローが異常終了
- ステータスが
PENDING
やRUNNING
であれば 10 秒待機 - 10 秒間待機
- ステータスが
- check-status 関数の実行まで戻る
ワークフローのデプロイ
yaml ファイルができたら、以下のコマンドを実行します。
gcloud workflows deploy example-workflow \ --location=asia-northeast1 \ --source=example-workflow.yaml \ --service-account=hogehoge@hogehoge.gserviceaccount.com
4 行目の --service-account=hogehoge@hogehoge.gserviceaccount.com
は、Cloud Run functions を起動する権限のあるサービスアカウントに置き換えてください。
ワークフローの実行
以下のコマンドにより、デプロイしたワークフローが実行可能です。
gcloud workflows run example-workflow --location=asia-northeast1 --call-log-level=log-errors-only
ログレベルの設定など詳細は、以下のドキュメントをご確認ください。このコマンドでは、各ステップの実行履歴はログ出力されず、エラーが起きた場合のみログが Cloud Logging に出力されます。
- 参考 : ワークフローを実行する - gcloud
なお、Cloud Workflows のコンソール画面からもワークフローのデプロイや実行が可能なほか、実行状態を確認することができます。
gcloud コマンドで実行した場合は、ワークフロー全体が完了するまでプロンプトが戻らないので、適宜コンソールで状態を確認したり、別ターミナルからステータスを確認するコマンドを実行してください。
留意事項
データパイプラインをはじめとする自動化ジョブでは、エラー時の検知やリトライを検討する必要があります。
当記事でご紹介したワークフローはあくまで簡易的なものです。実際の業務では以下を考慮してください。
- 冪等性(リトライ性)
- エラーの検知
ワークフローが失敗した際に、シンプルにワークフローを再実行すれば良いように、冪等性(何度実行しても同じ結果になる性質)が担保されるように変換処理を設計することが望ましいといえます。
また、エラー検知に関しては、ワークフローのエラーを Cloud Logging に出力させて、Cloud Monitoring で検知してメールや Slack へ通知する等が考えられます。
当記事のワークフローでは、関数のエラーや例外をワークフロー内でハンドリングしていません。つまり、関数が失敗しても、戻り値が返ると次のステップが実行されてしまいます。Cloud Workflows にはエラーや例外をハンドリングする仕組みもありますので、これの利用を検討してください。
- 参考 : Workflow errors
杉村 勇馬 (記事一覧)
執行役員 CTO / クラウドソリューション部 部長
元警察官という経歴を持つ現 IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 認定資格および Google Cloud 認定資格はすべて取得。X(旧 Twitter)では Google Cloud や Google Workspace のアップデート情報をつぶやいています。
Follow @y_sugi_it