G-genの杉村です。Google Cloud (旧称 GCP) のジョブ自動化サービスである Cloud Workflows(または単に Workflows)を解説します。
概要
Cloud Workflows とは
Cloud Workflows(または単に Workflows)はGoogle Cloud のジョブ自動化サービスです。フルマネージドかつサーバーレスであるためインフラの管理は必要なく、また非常に安価に利用できるのが特徴です。
Cloud Workflows のワークフローからは、定義した順番通りに Cloud Functions や Cloud Run などで定義したプログラムを実行したり、BigQuery へ SQL を発行したり、任意の HTTP エンドポイントへリクエストを送信することができます。
ワークフローは YAML または JSON フォーマットで定義され、順次実行のほか、条件分岐、繰り返し、並列実行などを定義できます。
実行契機としては Cloud Scheduler による定期実行のほか API 経由のオンデマンド実行も可能です。また実行時にはワークフローに引数を渡すことができます。
以下は、ワークフローの例です。
![](https://cdn-ak.f.st-hatena.com/images/fotolife/g/ggen-sugimura/20230322/20230322101105.png)
ユースケース
Cloud Workflows は以下のようなユースケースで利用されます。
No | ユースケース名 | 例 |
---|---|---|
1 | データパイプライン | 業務システムからデータ抽出・BigQuery へロード・SQL で変換 (ELT) |
2 | バッチジョブ | 日次でファイルを転送する / 機械学習ジョブを実行する |
3 | イベントドリブンジョブ | Cloud Storage にアップロードされた画像ファイルを変換し別バケットに格納後、メタデータを Firestore に書き込み |
- 参考 : 主なユースケース
料金
課金の仕組み
Cloud Workflows の料金は、実際に実行されたワークフローの ステップ数 に応じた従量課金です。
ステップ とは、ワークフローに定義された個別の手順です。例えば「Cloud Functions 関数 A を実行する」「A の処理が完了後、Cloud Functions 関数 B を実行する」というワークフローがあれば、これは計 2 ステップとなります。
後述しますが、ステップには内部ステップと外部ステップが存在します。内部ステップは 5,000回/月、外部ステップは 2,000回/月の無料枠があります。1ヶ月間に実行されたステップ数が無料枠内であれば、Cloud Workflows を完全無料で利用することができます。
無料枠を超過した後は、内部ステップは 1,000 ステップあたり $0.01、外部ステップは 1,000 ステップあたり $0.025 が課金されます。
なお上記は 2023 年 3 月現在の料金ですので、最新の料金は公式の料金表をご参照ください。
- 参考 : ワークフローの料金
内部ステップと外部ステップ
ステップは内部ステップと外部ステップに分けられます。
内部ステップ は *.googleapis.com
API や *.appspot.com
ドメインに送信されるリクエスト、Cloud Functions や Cloud Run の実行、またワークフロー内の変数割当や評価など、Google Cloud の世界の中で完結するステップを指します。
反対に 外部ステップ は Google Cloud の外部への HTTP API のリクエストや、Google Cloud リソースであってもカスタムドメインを使っているエンドポイントへのリクエストです。また コールバック待機 のステップも外部ステップとして扱われます。
前述の料金表の通り、外部ステップのほうが単価が高く設定されています。
- 参考 : 内部および外部のステップ
ワークフロー定義
定義の基本
ワークフローは YAML または JSON で定義します。文法のリファレンスは以下に用意されています。
- 参考 : Syntax overview
ワークフローの基本的な概念として以下があります。
No | 名称 | 説明 |
---|---|---|
1 | Step (ステップ/手順) | ステップで変数を定義したり、Cloud Functions をコールしたり、BigQuery へ SQL を投入する |
2 | Condition (条件) | switch 条件式によりワークフローを分岐 |
3 | Iteration (繰り返し) | for により繰り返し処理を実行 |
4 | Parallel step (並列ステップ) | parallel によりステップを並列実行 |
5 | Subworkflow (サブワークフロー) | プログラミング言語の関数のように、再利用可能なステップ群を定義して実行できる |
これらを組み合わせることで、順次・分岐・繰り返しを含む複雑なワークフローを構築することができます。
ランタイム引数
ワークフローの実行時に JSON 形式で引数を渡すことができます。
例えば、以下の 2 ステップのみのシンプルなワークフローに {"myName": "John Doe"}
という引数を渡して実行します。
main: params: [arguments] steps: - getName: assign: - result: ${"Hello, " + arguments.myName + "!"} - finalize: return: ${result}
すると、返り値は以下の通りとなります。
"Hello, John Doe!"
サービス呼び出し
各ステップでは以下のような呼び出しが可能です。
- HTTP API
- コネクタ
- 標準ライブラリ
- 環境変数
HTTP API は、ステップから HTTP リクエスト (GET/POST/PUT 等) を実行する方法です。Cloud Functions や Cloud Run の呼び出しも、これにより行います。
コネクタ は、Cloud Workflows が用意する各種 Google Cloud サービスへのコネクタです。BigQuery、Cloud Storage、Cloud Build、Dataflow、Compute Engine など多くの Google Cloud サービス用コネクタが用意されており、API コールを実行することができます。認証はワークフローに紐付けられたサービスアカウントの IAM 権限により行われます。
標準ライブラリ はワークフロー内で利用可能な処理です。数値を計算したり、テキストを結合・分解したり、base64 エンコード/デコードや wait を入れることも可能です。
環境変数 は Cloud Workflows によって用意された組み込みの変数です。プロジェクト ID やサービスアカウント名、ワークフローの実行 ID を得ることができます。
エラー処理
ワークフローではある程度のエラー処理が可能です。 raise
try
except
などをワークフロー内で利用できます。
参考 : Workflow errors
またエラーを Cloud Logging に出力させることで、そのログから文字列を検知してメールや Slack に発報するなどの監視も可能です。
ログの監視についての詳細は以下の記事もご参照ください。
待機
コールバック
ワークフロー内で何らかの処理の完了を待ってから次のステップへ進みたい場合、コールバックエンドポイント を利用することができます。
実行したサービスによる処理が完了したらワークフローのコールバックエンドポイントへリクエストを投げてもらうようにしておくことで、一時停止していたワークフローを再開できます。もちろん、タイムアウトを設定することが可能です。
スリープ
標準ライブラリに sys.sleep
が用意されており、一定時間待機することができます。
処理終了をポーリングして確認する繰り返し処理を組み合わせることで ポーリングによる待機 を実現することが可能です。ただしこの場合は、タイムアウト処理は自前で実装する必要があります。
なお後ほど紹介する 当社記事 では sys.sleep
と switch
による評価を組み合わせたポーリング待機処理を実装しています。
ワークフローの実行
実行契機 (トリガ)
Cloud Workflows のワークフローは以下をトリガにして実行することができます。
- 自動
- Cloud Scheduler による定期実行
- 手動
- Google Cloud コンソール
- Google Cloud CLI (gcloud コマンド)
- Cloud SDK クライアントライブラリ (Java, Node.js, Python...)
- REST API
Cloud Scheduler による定期実行
最も一般的な実行方法は、Cloud Scheduler のスケジュールによる定期的な実行です。
スケジュールにはサービスアカウントを設定し、roles/workflows.invoker
( ワークフロー起動元
) ロールを付与することで、ワークフローを起動する権限を与える必要があります。
またスケジュール実行時に、ワークフローに引数を渡すことができます。
詳細な手順は以下をご参照ください。
また Cloud Scheduler の詳細は、以下の当社記事もご参照ください。
イベントドリブン実行
Eventarc での実行
例えば Cloud Storage へのオブジェクト Put をトリガにワークフローを開始したい場合等は、Eventarc を利用することでイベンドリブンでワークフローを起動することが可能です。
Eventarc からは CloudEvents 形式 でイベントの内容がワークフローに引数として渡されます。
ワークフローに渡す引数を特定の形式に変更したい場合は後述の Cloud Functions を用いた起動方法を検討します。
![](https://cdn-ak.f.st-hatena.com/images/fotolife/g/ggen-sugimura/20230322/20230322101238.png)
Cloud Storage の例のほかにも、特定の監査ログ (Cloud Audit Logs) の出力をトリガにしてワークフローを起動するなど、Eventarc が対応している様々なイベントをトリガにしてワークフローを実行できます。
Cloud Functions での実行
Eventarc を使う方法のほかに、Cloud Functions からワークフローを起動することもできます。
以下は Cloud Storage へのオブジェクト Put をトリガとして Cloud Functions 関数が起動し、Cloud SDK クライアントライブラリを使って Cloud Workflows を実行する場合の例です。
![](https://cdn-ak.f.st-hatena.com/images/fotolife/g/ggen-sugimura/20230322/20230322101215.png)
実行履歴
ワークフローの実行履歴は、Cloud Workflows の Web UI や、REST API 経由で確認できます。
![](https://cdn-ak.f.st-hatena.com/images/fotolife/g/ggen-sugimura/20240120/20240120112118.png)
ワークフローのサンプル
サンプルコード
以下は YAML で定義したワークフローのサンプルです。2つの Cloud Functions 関数を順番にコールするシンプルなものです。最初の関数の戻り値を次の関数に渡しています。
main: steps: - initialize: assign: - my_parameter_01: "hogehoge" - execute_01: call: http.post args: url: https://asia-northeast1-your-project-name.cloudfunctions.net/my-function-01 body: my_parameter: ${my_parameter_01} auth: type: OIDC result: my_parameter_02 - execute_02: call: http.post args: url: https://asia-northeast1-your-project-name.cloudfunctions.net/my-function-02 body: my_parameter: ${my_parameter_02} auth: type: OIDC result: result - finalize: return: ${result}
公式のサンプルコード
様々なユースケースに応じたサンプルコードが公式に用意されていますのでご参照ください。
データパイプラインのサンプル
以下の記事で、Cloud Workflows + BigQuery (Scheduled Query / Data Transfer Service) を組み合わせた簡易的なデータパイプラインをご紹介しています。
Cloud Workflows の YAML も公開していますので、ご参照ください。
杉村 勇馬 (記事一覧)
執行役員 CTO / クラウドソリューション部 部長
元警察官という経歴を持つ現 IT エンジニア。クラウド管理・運用やネットワークに知見。AWS 12資格、Google Cloud認定資格11資格。X (旧 Twitter) では Google Cloud や AWS のアップデート情報をつぶやいています。
Follow @y_sugi_it