G-gen の佐々木です。当記事では Google Cloud のジョブ自動化サービスである Workflows のユースケースとして、実行周期の異なる2つのジョブを、同一のワークフローに並列配置する方法を解説します。

はじめに
Workflows とは
Workflows(Cloud Workflows)は Google Cloud のジョブ自動化サービスであり、ユーザーが定義したワークフローをフルマネージドかつサーバーレスな環境で実行できるサービスです。
Workflows の詳細については、以下の記事をご一読ください。
当記事の構成
当記事では、1つのワークフローで3つの Cloud Run jobs を実行する構成を想定します。Cloud Run jobs の各ジョブはジョブ A、ジョブ B、ジョブ C とします。
このワークフローでは、まずジョブ A が処理を行い、その後ジョブ B とジョブ C が別々に実行されます。それぞれがジョブ A の処理結果に対して異なる処理を行います。
ポイントとして、ここではジョブ B、ジョブ C の処理にはジョブ A の最新の実行結果が必要という要件を想定しています。例えば、ジョブ A がデータベースから情報を取得し、加工した結果を Cloud Storage バケットに格納するようなケースです。ジョブ B、ジョブ C はそれぞれジョブ A の結果を取得して、何かしらの処理を行います。

そして、ジョブ B は30分周期、ジョブ C は60分周期で実行しなければならないとします。したがって、ワークフロー全体は30分おきに実行されますが、60分周期のジョブ C は2回のワークフロー実行のうち1回だけ処理を行うようにする必要があります。
これを実現するために、実際の構成は以下のようにします。

この構成では、2つの Cloud Scheduler ジョブからワークフローを実行しています。ワークフローは、どちらのスケジューラから実行されたかを区別できる情報を引数として受け取ります。この引数を元に、60分周期のジョブ C を実行するかどうかを判定します。
Cloud Run ジョブの作成
使用するコード
先ほどは Cloud Storage を使用してデータのやり取りをするケースを例に挙げましたが、当記事ではワークフロー定義の方法を解説するため、各 Cloud Run ジョブの具体的な実装は省略します。
ジョブが2つのスケジューラのどちらから実行されたかを明確にするため、スケジューラからワークフローに渡された引数を、ジョブの環境変数として受け取ってログ出力します。
// 環境変数の値をログ出力するジョブ(main.go) package main import ( "log" "os" ) func main() { name := os.Getenv("NAME") // 環境変数からジョブ名を取得 schedule := os.Getenv("SCHEDULE") // 環境変数から実行周期を取得 log.Printf("%s: %s\n", name, schedule) // ログに出力 }
Cloud Run jobs には、ジョブに設定した環境変数をオーバーライドしてからジョブ実行する機能があります。これを利用して、ジョブの実行ごとに、スケジューラからワークフローに渡された引数を環境変数で受け取ります。
Cloud Run ジョブのデプロイ
3つのジョブ(A、B、C)をデプロイします。当記事ではジョブの詳細な実装は行わないため、同じコードを使用してそれぞれのジョブを作成します。
当記事ではコンテナイメージを事前に用意せず、ソースコードからジョブをデプロイします。
コードが存在するディレクトリ上で以下のコマンドを実行します。
# ジョブ A の作成 $ gcloud run jobs deploy wf-job-a \ --region asia-northeast1 \ --source . \ --set-env-vars=NAME="wf-job-a",SCHEDULE=""
# ジョブ B の作成 $ gcloud run jobs deploy wf-job-b \ --region asia-northeast1 \ --source . \ --set-env-vars=NAME="wf-job-b",SCHEDULE=""
# ジョブ C の作成 $ gcloud run jobs deploy wf-job-c \ --region asia-northeast1 \ --source . \ --set-env-vars=NAME="wf-job-c",SCHEDULE=""
各ジョブに設定する SCHEDULE 環境変数は、ワークフローから値をオーバーライドするため空にしておきます。
ワークフローの定義
シンプルな並列実行ワークフローの作成
ワークフロー定義
まずは、周期のズレを考慮せずに、ジョブ A を実行したあとジョブ B とジョブ C を単純に並列実行するワークフローを作成してみます。
wf-parallel.yaml として、以下の内容でワークフロー定義ファイル(YAML 形式)を作成します。
# wf-parallel.yaml # ジョブAの実行後にジョブB、ジョブCを並列実行するワークフロー main: params: [args] steps: - define_common_variables: assign: - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} # プロジェクトIDを取得して共通変数に設定 - location: "asia-northeast1" - results: ["", "", ""] - run_job_A: call: googleapis.run.v1.namespaces.jobs.run args: name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-a"} # ジョブAの名前を指定 location: ${location} result: job_A_result - set_result_a: assign: - results[0]: ${job_A_result.metadata.name} - run_jobs_B_and_C_in_parallel: parallel: # 並列実行するジョブを定義 shared: [results] branches: - run_job_B_branch: # ジョブBを実行するブランチ steps: - call_job_B: call: googleapis.run.v1.namespaces.jobs.run args: name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-b"} # ジョブBの名前を指定 location: ${location} result: job_B_result - set_result_b: assign: - results[1]: ${job_B_result.metadata.name} - run_job_C_branch: # ジョブCを実行するブランチ steps: - call_job_C: call: googleapis.run.v1.namespaces.jobs.run args: name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-c"} # ジョブCの名前を指定 location: ${location} result: job_C_result - set_result_c: assign: - results[2]: ${job_C_result.metadata.name} - finalStep: return: job_A_execution: ${results[0]} job_B_execution: ${results[1]} job_C_execution: ${results[2]}
ワークフローの作成
以下のコマンドでワークフローを作成します。--source には先ほど作成したワークフロー定義ファイルを指定します。
$ gcloud workflows deploy wf-parallel \ --location=asia-northeast1 \ --source=./wf-parallel.yaml
作成したワークフローは以下のような構成になります。

動作確認
ワークフローを手動で実行してみます。このワークフローではジョブ A の実行後にジョブ B、ジョブ C が並列実行され、それぞれのジョブの ID(Cloud Run jobs の実行 ID)が最終的に出力されます。

周期の異なる並列実行ワークフローの作成
ワークフロー定義
それでは本題として、実行周期の異なる2つのジョブ(B、C)を並列配置するワークフローを定義します。
wf-parallel-diff-cycles.yaml として、以下の内容でワークフロー定義ファイル(YAML 形式)を作成します。
# wf-parallel-diff-cycles.yaml # ジョブAの実行後にジョブB、ジョブCを並列実行するワークフロー # 引数scheduleの値によってジョブCを実行しない場合がある main: params: [args] steps: - define_common_variables: assign: - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} # プロジェクトIDを取得して共通変数に設定 - location: "asia-northeast1" - results: ["", "", ""] - run_job_A: call: googleapis.run.v1.namespaces.jobs.run args: name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-a"} # ジョブAの名前を指定 location: ${location} body: overrides: # オーバーライドするジョブ環境変数 containerOverrides: - env: - name: "SCHEDULE" value: ${args.schedule} # ワークフロー実行時に渡されたschedule引数を受け取る result: job_A_result - set_result_a: assign: - results[0]: ${job_A_result.metadata.name} - run_jobs_B_and_C_in_parallel: parallel: # 並列実行するジョブを定義 shared: [results] branches: - run_job_B_branch: # ジョブBを実行するブランチ steps: - call_job_B: call: googleapis.run.v1.namespaces.jobs.run args: name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-b"} # ジョブBの名前を指定 location: ${location} body: overrides: # オーバーライドするジョブ環境変数 containerOverrides: - env: - name: "SCHEDULE" value: ${args.schedule} # ワークフロー実行時に渡されたschedule引数を受け取る result: job_B_result - set_result_b: assign: - results[1]: ${job_B_result.metadata.name} - run_job_C_branch: # ジョブCを実行するブランチ steps: - conditional_run_of_job_c: switch: # schedule引数の値によってジョブCを実行するかどうかを判定する - condition: ${args.schedule=="00"} # schedule引数の値が"00"の場合のみジョブCを実行する steps: - call_job_C: call: googleapis.run.v1.namespaces.jobs.run args: name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-c"} # ジョブCの名前を指定 location: ${location} body: overrides: # オーバーライドするジョブ環境変数 containerOverrides: - env: - name: "SCHEDULE" value: ${args.schedule} # ワークフロー実行時に渡されたschedule引数を受け取る result: job_C_result - set_result_c: assign: - results[2]: ${job_C_result.metadata.name} - finalStep: return: job_A_execution: ${results[0]} job_B_execution: ${results[1]} job_C_execution: ${if(results[2] == "", "SKIPPED", results[2])} # ジョブCを実行しなかった場合は"SKIPPED"を返す
このワークフローでは、ワークフロー実行時の引数として schedule をキーとする値を受け取り、ジョブ C の実行ブランチで switch 文を使うことで、schedule の値によってジョブ C を実行するかどうかを判定します。ここでは値が 00 のときだけジョブ C を実行するように定義しています。
また、当記事では schedule の値を Cloud Run ジョブの環境変数にオーバーライドし、ジョブ内でその値を処理できるようにしています。
ワークフローの作成
以下のコマンドでワークフローを作成します。--source には新たに作成したワークフロー定義ファイルを指定します。
$ gcloud workflows deploy wf-parallel-diff-cycles \ --location=asia-northeast1 \ --source=./wf-parallel-diff-cycles.yaml
作成したワークフローは以下のような構成になります。ジョブ C 側のブランチで、ジョブ C 実行(call_job_C)の前に switch による分岐ができています。

動作確認
こちらのワークフローも手動で実行してみます。
まず、ジョブ A の実行後にジョブ B とジョブ C を並列実行するパターンを試します。以下の入力をワークフローに渡して処理を実行します。
{"schedule": "00"}

ジョブA の実行後にジョブB とジョブ C が並列で実行され、それぞれのジョブの ID(Cloud Run jobs の実行 ID)が最終的に出力されています。

次に、以下の入力でワークフローを実行してみます。schedule 引数の値が 00 ではないため、ジョブ C の実行がスキップされる想定です。
{"schedule": "30"}
ワークフローを実行すると、ジョブ A の実行後にジョブ B が実行されますが、ジョブ C はスキップされていることがわかります。ジョブ C はスキップされたため、ワークフロー出力にはジョブ ID の代わりに SKIPPED という文字列を出力しています。

Cloud Scheduler ジョブの作成
作成するジョブ(スケジューラ)について
Cloud Scheduler を使用して、ワークフローを定期的にトリガーするジョブ(Cloud Run ジョブと区別するため以降はスケジューラと記載)を作成します。
想定している要件ではジョブ B を30分周期、ジョブ C を60分周期で実行しなければならないため、ワークフローをトリガーする周期を調整した2つのスケジューラを作成します。
| スケジューラ名 | トリガー周期 | ワークフローに渡す引数 | ワークフローで実行される Cloud Run ジョブ |
|---|---|---|---|
| wf-trigger-every-00m | 毎時0分 | {"schedule": "00"} |
ジョブA、ジョブB、ジョブC |
| wf-trigger-every-30m | 毎時30分 | {"schedule": "30"} |
ジョブA、ジョブB |
この2つのスケジューラは1時間おきに実行されますが、それぞれ毎時0分、毎時30分に実行されます。したがって、ワークフローは30分に1回実行されることになります。
ジョブ C の実行要否を判断する引数をスケジューラからワークフローに渡すことで、毎時0分のワークフローではジョブ C を実行し、毎時30分のワークフローではジョブ C をスキップします。これにより、ジョブ B を30分周期で実行しつつ、同じワークフロー上のジョブ C を60分周期で実行することができます。


スケジューラの作成
以下のコマンドで、ワークフローを呼び出すスケジューラを2つ作成します。当記事ではスケジューラに紐づけるサービスアカウントとして Compute Engine のデフォルトのサービス アカウントを使用していますが、本番運用する場合はワークフローの実行権限のみ付与したカスタムサービスアカウントの使用を推奨します。
# 毎時0分に実行されるスケジューラ $ gcloud scheduler jobs create http wf-trigger-every-00m \ --location=asia-northeast1 \ --schedule="0 * * * *" \ --uri="https://workflowexecutions.googleapis.com/v1/projects/<プロジェクトID>/locations/asia-northeast1/workflows/wf-parallel-diff-cycles/executions" \ --message-body="{\"argument\": \"{\\\"schedule\\\": \\\"00\\\"}\"}" \ --time-zone="Asia/Tokyo" \ --oauth-service-account-email="<プロジェクト番号>-compute@developer.gserviceaccount.com"
# 毎時30分に実行されるスケジューラ $ gcloud scheduler jobs create http wf-trigger-every-30m \ --location=asia-northeast1 \ --schedule="30 * * * *" \ --uri="https://workflowexecutions.googleapis.com/v1/projects/<プロジェクトID>/locations/asia-northeast1/workflows/wf-parallel-diff-cycles/executions" \ --message-body="{\"argument\": \"{\\\"schedule\\\": \\\"30\\\"}\"}" \ --time-zone="Asia/Tokyo" \ --oauth-service-account-email="<プロジェクト番号>-compute@developer.gserviceaccount.com"
- 参考 : Schedule a workflow using Cloud Scheduler
- 参考 : gcloud scheduler jobs create
- 参考 : Service accounts - Compute Engine default service account
動作確認
Cloud Run ジョブのコードは、以下のようにジョブの名前とスケジューラからワークフローに渡された schedule 引数をログ出力するように実装されています。
// main.go 抜粋 func main() { name := os.Getenv("NAME") // 環境変数からジョブ名を取得 schedule := os.Getenv("SCHEDULE") // 環境変数から実行周期を取得 log.Printf("%s: %s\n", name, schedule) // ログに出力 }
Cloud Logging でジョブ B、ジョブ C のログ出力を確認することで、30分周期(毎時0分と毎時30分)でジョブ B が、60分周期(毎時0分)でジョブ C が実行されていることを確認できます。


佐々木 駿太 (記事一覧)
G-gen 最北端、北海道在住のクラウドソリューション部エンジニア
2022年6月に G-gen にジョイン。Google Cloud Partner Top Engineer に選出(2024 / 2025 Fellow / 2026)。好きな Google Cloud プロダクトは Cloud Run。
趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。
Follow @sasashun0805