Workflowsで実行周期の異なるジョブを並列に配置する

記事タイトルとURLをコピーする

G-gen の佐々木です。当記事では Google Cloud のジョブ自動化サービスである Workflows のユースケースとして、実行周期の異なる2つのジョブを、同一のワークフローに並列配置する方法を解説します。

はじめに

Workflows とは

Workflows(Cloud Workflows)は Google Cloud のジョブ自動化サービスであり、ユーザーが定義したワークフローをフルマネージドかつサーバーレスな環境で実行できるサービスです。

Workflows の詳細については、以下の記事をご一読ください。

blog.g-gen.co.jp

当記事の構成

当記事では、1つのワークフローで3つの Cloud Run jobs を実行する構成を想定します。Cloud Run jobs の各ジョブはジョブ A、ジョブ B、ジョブ C とします。

このワークフローでは、まずジョブ A が処理を行い、その後ジョブ B とジョブ C が別々に実行されます。それぞれがジョブ A の処理結果に対して異なる処理を行います。

ポイントとして、ここではジョブ B、ジョブ C の処理にはジョブ A の最新の実行結果が必要という要件を想定しています。例えば、ジョブ A がデータベースから情報を取得し、加工した結果を Cloud Storage バケットに格納するようなケースです。ジョブ B、ジョブ C はそれぞれジョブ A の結果を取得して、何かしらの処理を行います。

当記事で想定している構成の例

そして、ジョブ B は30分周期、ジョブ C は60分周期で実行しなければならないとします。したがって、ワークフロー全体は30分おきに実行されますが、60分周期のジョブ C は2回のワークフロー実行のうち1回だけ処理を行うようにする必要があります。

これを実現するために、実際の構成は以下のようにします。

2つのスケジューラからワークフローを実行し、スケジューラから渡される引数でジョブ C の実行要否を判定する

この構成では、2つの Cloud Scheduler ジョブからワークフローを実行しています。ワークフローは、どちらのスケジューラから実行されたかを区別できる情報を引数として受け取ります。この引数を元に、60分周期のジョブ C を実行するかどうかを判定します。

Cloud Run ジョブの作成

使用するコード

先ほどは Cloud Storage を使用してデータのやり取りをするケースを例に挙げましたが、当記事ではワークフロー定義の方法を解説するため、各 Cloud Run ジョブの具体的な実装は省略します。

ジョブが2つのスケジューラのどちらから実行されたかを明確にするため、スケジューラからワークフローに渡された引数を、ジョブの環境変数として受け取ってログ出力します。

// 環境変数の値をログ出力するジョブ(main.go)
package main
  
import (
    "log"
    "os"
)
  
func main() {
    name := os.Getenv("NAME")  // 環境変数からジョブ名を取得
    schedule := os.Getenv("SCHEDULE")  // 環境変数から実行周期を取得
    log.Printf("%s: %s\n", name, schedule)  // ログに出力
}

Cloud Run jobs には、ジョブに設定した環境変数をオーバーライドしてからジョブ実行する機能があります。これを利用して、ジョブの実行ごとに、スケジューラからワークフローに渡された引数を環境変数で受け取ります。

Cloud Run ジョブのデプロイ

3つのジョブ(A、B、C)をデプロイします。当記事ではジョブの詳細な実装は行わないため、同じコードを使用してそれぞれのジョブを作成します。

当記事ではコンテナイメージを事前に用意せず、ソースコードからジョブをデプロイします。

コードが存在するディレクトリ上で以下のコマンドを実行します。

# ジョブ A の作成
$ gcloud run jobs deploy wf-job-a \
    --region asia-northeast1 \
    --source . \
    --set-env-vars=NAME="wf-job-a",SCHEDULE=""
# ジョブ B の作成
$ gcloud run jobs deploy wf-job-b \
    --region asia-northeast1 \
    --source . \
    --set-env-vars=NAME="wf-job-b",SCHEDULE=""
# ジョブ C の作成
$ gcloud run jobs deploy wf-job-c \
    --region asia-northeast1 \
    --source . \
    --set-env-vars=NAME="wf-job-c",SCHEDULE=""

各ジョブに設定する SCHEDULE 環境変数は、ワークフローから値をオーバーライドするため空にしておきます。

ワークフローの定義

シンプルな並列実行ワークフローの作成

ワークフロー定義

まずは、周期のズレを考慮せずに、ジョブ A を実行したあとジョブ B とジョブ C を単純に並列実行するワークフローを作成してみます。

wf-parallel.yaml として、以下の内容でワークフロー定義ファイル(YAML 形式)を作成します。

# wf-parallel.yaml
# ジョブAの実行後にジョブB、ジョブCを並列実行するワークフロー
main:
  params: [args]
  steps:
    - define_common_variables:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}  # プロジェクトIDを取得して共通変数に設定
          - location: "asia-northeast1"
          - results: ["", "", ""]
  
    - run_job_A:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-a"}  # ジョブAの名前を指定
          location: ${location}
        result: job_A_result
    - set_result_a:
        assign:
          - results[0]: ${job_A_result.metadata.name}
  
    - run_jobs_B_and_C_in_parallel:
        parallel:  # 並列実行するジョブを定義
          shared: [results]
          branches:
            - run_job_B_branch:  # ジョブBを実行するブランチ
                steps:
                  - call_job_B:
                      call: googleapis.run.v1.namespaces.jobs.run
                      args:
                        name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-b"}  # ジョブBの名前を指定
                        location: ${location}
                      result: job_B_result
                  - set_result_b:
                      assign:
                        - results[1]: ${job_B_result.metadata.name}
            - run_job_C_branch:  # ジョブCを実行するブランチ
                steps:
                  - call_job_C:
                      call: googleapis.run.v1.namespaces.jobs.run
                      args:
                        name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-c"}  # ジョブCの名前を指定
                        location: ${location}
                      result: job_C_result
                  - set_result_c:
                      assign:
                        - results[2]: ${job_C_result.metadata.name}
  
    - finalStep:
        return:
          job_A_execution: ${results[0]}
          job_B_execution: ${results[1]}
          job_C_execution: ${results[2]}

ワークフローの作成

以下のコマンドでワークフローを作成します。--source には先ほど作成したワークフロー定義ファイルを指定します。

$ gcloud workflows deploy wf-parallel \
    --location=asia-northeast1 \
    --source=./wf-parallel.yaml

作成したワークフローは以下のような構成になります。

ジョブB、ジョブCを同じ周期で並列実行するワークフロー

動作確認

ワークフローを手動で実行してみます。このワークフローではジョブ A の実行後にジョブ B、ジョブ C が並列実行され、それぞれのジョブの ID(Cloud Run jobs の実行 ID)が最終的に出力されます。

ワークフローを手動実行した結果

周期の異なる並列実行ワークフローの作成

ワークフロー定義

それでは本題として、実行周期の異なる2つのジョブ(B、C)を並列配置するワークフローを定義します。

wf-parallel-diff-cycles.yaml として、以下の内容でワークフロー定義ファイル(YAML 形式)を作成します。

# wf-parallel-diff-cycles.yaml
# ジョブAの実行後にジョブB、ジョブCを並列実行するワークフロー
# 引数scheduleの値によってジョブCを実行しない場合がある
main:
  params: [args]
  steps:
    - define_common_variables:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}  # プロジェクトIDを取得して共通変数に設定
          - location: "asia-northeast1"
          - results: ["", "", ""]
  
    - run_job_A:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-a"}  # ジョブAの名前を指定
          location: ${location}
          body:
            overrides:  # オーバーライドするジョブ環境変数
              containerOverrides:
                - env:
                    - name: "SCHEDULE"
                      value: ${args.schedule}  # ワークフロー実行時に渡されたschedule引数を受け取る
        result: job_A_result
    - set_result_a:
        assign:
          - results[0]: ${job_A_result.metadata.name}
  
    - run_jobs_B_and_C_in_parallel:
        parallel:  # 並列実行するジョブを定義
          shared: [results]
          branches:
            - run_job_B_branch:  # ジョブBを実行するブランチ
                steps:
                  - call_job_B:
                      call: googleapis.run.v1.namespaces.jobs.run
                      args:
                        name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-b"}  # ジョブBの名前を指定
                        location: ${location}
                        body:
                          overrides:  # オーバーライドするジョブ環境変数
                            containerOverrides:
                              - env:
                                  - name: "SCHEDULE"
                                    value: ${args.schedule}  # ワークフロー実行時に渡されたschedule引数を受け取る
                      result: job_B_result
                  - set_result_b:
                      assign:
                        - results[1]: ${job_B_result.metadata.name}
            - run_job_C_branch:  # ジョブCを実行するブランチ
                steps:
                  - conditional_run_of_job_c:
                      switch:  # schedule引数の値によってジョブCを実行するかどうかを判定する
                        - condition: ${args.schedule=="00"}  # schedule引数の値が"00"の場合のみジョブCを実行する
                          steps:
                            - call_job_C:
                                call: googleapis.run.v1.namespaces.jobs.run
                                args:
                                  name: ${"namespaces/" + project_id + "/jobs/" + "wf-job-c"}  # ジョブCの名前を指定
                                  location: ${location}
                                  body:
                                    overrides:  # オーバーライドするジョブ環境変数
                                      containerOverrides:
                                        - env:
                                            - name: "SCHEDULE"
                                              value: ${args.schedule}  # ワークフロー実行時に渡されたschedule引数を受け取る
                                result: job_C_result
                            - set_result_c:
                                assign:
                                  - results[2]: ${job_C_result.metadata.name}
  
    - finalStep:
        return:
          job_A_execution: ${results[0]}
          job_B_execution: ${results[1]}
          job_C_execution: ${if(results[2] == "", "SKIPPED", results[2])}  # ジョブCを実行しなかった場合は"SKIPPED"を返す

このワークフローでは、ワークフロー実行時の引数として schedule をキーとする値を受け取り、ジョブ C の実行ブランチで switch 文を使うことで、schedule の値によってジョブ C を実行するかどうかを判定します。ここでは値が 00 のときだけジョブ C を実行するように定義しています。

また、当記事では schedule の値を Cloud Run ジョブの環境変数にオーバーライドし、ジョブ内でその値を処理できるようにしています。

ワークフローの作成

以下のコマンドでワークフローを作成します。--source には新たに作成したワークフロー定義ファイルを指定します。

$ gcloud workflows deploy wf-parallel-diff-cycles \
    --location=asia-northeast1 \
    --source=./wf-parallel-diff-cycles.yaml

作成したワークフローは以下のような構成になります。ジョブ C 側のブランチで、ジョブ C 実行(call_job_C)の前に switch による分岐ができています。

並列配置されたジョブB、ジョブCを異なる周期で実行できるワークフロー

動作確認

こちらのワークフローも手動で実行してみます。

まず、ジョブ A の実行後にジョブ B とジョブ C を並列実行するパターンを試します。以下の入力をワークフローに渡して処理を実行します。

{"schedule": "00"}

ジョブ B とジョブ C を並列実行するパターンのワークフロー実行

ジョブA の実行後にジョブB とジョブ C が並列で実行され、それぞれのジョブの ID(Cloud Run jobs の実行 ID)が最終的に出力されています。

ジョブ B とジョブ C が並列実行されている

次に、以下の入力でワークフローを実行してみます。schedule 引数の値が 00 ではないため、ジョブ C の実行がスキップされる想定です。

{"schedule": "30"}

ワークフローを実行すると、ジョブ A の実行後にジョブ B が実行されますが、ジョブ C はスキップされていることがわかります。ジョブ C はスキップされたため、ワークフロー出力にはジョブ ID の代わりに SKIPPED という文字列を出力しています。

ジョブ B は実行されるが、ジョブ C がスキップされている

Cloud Scheduler ジョブの作成

作成するジョブ(スケジューラ)について

Cloud Scheduler を使用して、ワークフローを定期的にトリガーするジョブ(Cloud Run ジョブと区別するため以降はスケジューラと記載)を作成します。

想定している要件ではジョブ B を30分周期、ジョブ C を60分周期で実行しなければならないため、ワークフローをトリガーする周期を調整した2つのスケジューラを作成します。

スケジューラ名 トリガー周期 ワークフローに渡す引数 ワークフローで実行される Cloud Run ジョブ
wf-trigger-every-00m 毎時0分 {"schedule": "00"} ジョブA、ジョブB、ジョブC
wf-trigger-every-30m 毎時30分 {"schedule": "30"} ジョブA、ジョブB

この2つのスケジューラは1時間おきに実行されますが、それぞれ毎時0分、毎時30分に実行されます。したがって、ワークフローは30分に1回実行されることになります。

ジョブ C の実行要否を判断する引数をスケジューラからワークフローに渡すことで、毎時0分のワークフローではジョブ C を実行し、毎時30分のワークフローではジョブ C をスキップします。これにより、ジョブ B を30分周期で実行しつつ、同じワークフロー上のジョブ C を60分周期で実行することができます。

毎時0分のワークフロー実行ではジョブ B、ジョブ C ともに実行される

毎時30分のワークフロー実行ではジョブ C が実行されない

スケジューラの作成

以下のコマンドで、ワークフローを呼び出すスケジューラを2つ作成します。当記事ではスケジューラに紐づけるサービスアカウントとして Compute Engine のデフォルトのサービス アカウントを使用していますが、本番運用する場合はワークフローの実行権限のみ付与したカスタムサービスアカウントの使用を推奨します。

# 毎時0分に実行されるスケジューラ
$ gcloud scheduler jobs create http wf-trigger-every-00m \
    --location=asia-northeast1 \
    --schedule="0 * * * *" \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/<プロジェクトID>/locations/asia-northeast1/workflows/wf-parallel-diff-cycles/executions" \
    --message-body="{\"argument\": \"{\\\"schedule\\\": \\\"00\\\"}\"}" \
    --time-zone="Asia/Tokyo" \
    --oauth-service-account-email="<プロジェクト番号>-compute@developer.gserviceaccount.com"
# 毎時30分に実行されるスケジューラ
$ gcloud scheduler jobs create http wf-trigger-every-30m \
    --location=asia-northeast1 \
    --schedule="30 * * * *" \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/<プロジェクトID>/locations/asia-northeast1/workflows/wf-parallel-diff-cycles/executions" \
    --message-body="{\"argument\": \"{\\\"schedule\\\": \\\"30\\\"}\"}" \
    --time-zone="Asia/Tokyo" \
    --oauth-service-account-email="<プロジェクト番号>-compute@developer.gserviceaccount.com"

動作確認

Cloud Run ジョブのコードは、以下のようにジョブの名前とスケジューラからワークフローに渡された schedule 引数をログ出力するように実装されています。

// main.go 抜粋
func main() {
    name := os.Getenv("NAME")  // 環境変数からジョブ名を取得
    schedule := os.Getenv("SCHEDULE")  // 環境変数から実行周期を取得
    log.Printf("%s: %s\n", name, schedule)  // ログに出力
}

Cloud Logging でジョブ B、ジョブ C のログ出力を確認することで、30分周期(毎時0分と毎時30分)でジョブ B が、60分周期(毎時0分)でジョブ C が実行されていることを確認できます。

ジョブ B が30分周期で実行されている(schedule 引数が "00" と "30" のとき)

ジョブ C が60分周期で実行されている(schedule 引数が "00" のとき)

佐々木 駿太 (記事一覧)

G-gen 最北端、北海道在住のクラウドソリューション部エンジニア

2022年6月に G-gen にジョイン。Google Cloud Partner Top Engineer に選出(2024 / 2025 Fellow / 2026)。好きな Google Cloud プロダクトは Cloud Run。

趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。