Dataflowを徹底解説!

記事タイトルとURLをコピーする

G-gen の佐々木です。当記事では、Google Cloud(旧称 GCP)のマネージド ETL サービスである Dataflow を解説します。

概要

Dataflow とは

Dataflow は Google Cloud のインフラストラクチャ上でマネージドな Apache Beam 実行環境を提供するサービスです。Java や Python の SDK でパイプラインを記述することで、データをバッチ処理やストリーミング処理することができます。

Dataflow では Apache Beam SDK を使用してデータ処理のパイプライン(データの読み込みから変換、書き込みまでの一連の処理)をコードで記述します。それを Dataflow ジョブ として Google Cloud にデプロイするだけで、処理内容に応じて必要なぶんのコンピューティングリソースが自動でプロビジョニングされます。

したがって、ユーザーはパイプラインを実行するためのインフラストラクチャをほとんど意識することなく、パイプラインの設計のみに集中することができます。

Apache Beam とは

Apache Beam はバッチ処理、ストリーミング処理の両方を定義することができるオープンソースのプログラミングモデル で、大規模なデータ分散処理パイプラインを単純化して定義することができます。

Apache Beam では、パイプラインの実行時に ランナー (Runner) として実行環境を指定します。Dataflow は Apache Beam パイプラインの実行環境Dataflow ランナー」として動作するためのサービス であり、Apache Beam パイプライン自体は Google Cloud 以外の環境 (Dataflow ランナー以外のランナー) でも実行することができます。

Apache Beam は指定された Runner でパイプラインを実行する

ユースケース

Dataflow のユースケース

Apache Beam ではバッチ処理とストリーミング処理の双方が利用できるため、Dataflow のユースケースは多岐にわたります。

単純なデータのバッチ/ストリーミング変換処理だけではなく、Vertex AI 上の機械学習モデルにデータをストリーミングすることで、リアルタイムの予測分析や異常検知を行うこともできます。

例1 : リアルタイムのデータ取り込み

Dataflow では典型的なユースケースとして、メッセージングサービスである Pub/Sub と組み合わせたリアルタイムのデータ取り込み処理があります。

以下の図は、Pub/Sub と Dataflow を使用した クリックストリーム 分析の例です。
アプリケーション上でのエンドユーザーのアクションを Pub/Sub に送信し、Dataflow でデータのチェックや集計などを行ったあと、各種データストアに書き込みます。

Pub/Sub ではバックエンドに対する 最低 1回(at-least-once)の配信が保証されており、データの再配信が行われる場合があるため、データ重複が発生する可能性があります。Dataflow は Pub/Sub からデータを受け取るとき、Pub/Sub のメッセージに含まれるメッセージ ID を使用してデータの重複削除を行います。

Dataflow を使用したリアルタイムのクリックストリーム分析

例2 : データストア間のデータ移行

Dataflow では、各種データベースから BigQuery や Cloud Spanner などにデータを移行するためのテンプレートが提供されており、数クリックでデータ移行パイプラインを構築することができます。

Dataflow を使用したデータのバッチ移行

また、Datastream を Dataflow のデータソースとして使用することで、データベースの変更を検知し、それをリアルタイムで BigQuery などのデータストアに同期することができます。

Datastream と Dataflow によるデータのリアルタイム同期処理

開発

SDK

Apache Beam では、以下の言語の SDK が提供されています。

インストール手順等は以下の Google Cloud 公式ドキュメントをご参照ください。

パイプライン構成

Apache Beam のパイプラインは以下のような要素から構成されます。

要素名 説明
Pipeline Apache Beam によるデータ変換処理の開始から終了、つまり入力データの読み取りから変換、出力データの書き込みまでの一連の処理をカプセル化した概念です。
PCollection パイプラインで使用されるデータセットを抽象化するオブジェクトであり、パイプラインの各ステップにおける入力、出力となります。
PTransform 実際のデータ変換処理を表します。1 つ以上の PCollection を入力とし、その各要素に対して処理を実行した後、1 つ以上の PCollection を出力します。
I/O transforms Apache Beam の I/O コネクタ を使用して、外部ソースからパイプラインへのデータ読み込み、パイプラインから外部シンクへのデータの書き込みを行います。様々なデータ形式を読み書きすることができ、必要に応じて カスタムの I/O コネクタ を開発して使用することもできます。

Apache Beam パイプラインの構成

上記の基本的な構成要素の詳細、およびその他の構成要素については以下のドキュメントを参照してください。

Python SDK で記述したパイプラインの例

パイプラインの例として、Python 用の Apache Beam SDK で記述したシンプルな例を以下に示します。
この例では、3 人の人物の名前、年齢、身長を入力データとし、年齢と身長の平均をそれぞれ算出して出力するパイプラインが記述されています。

import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
  
  
# Dataflow ランナーで実行するためのパラメータ
pipeline_args = [
    #'--runner=DirectRunner' # ローカルで実行する場合
    # Dataflow ランナーで実行(以下は Dataflow 用のパラメータ)
    '--runner=DataflowRunner', 
    '--project=myproject',
    '--region=asia-northeast1',
    '--subnetwork=regions/asia-northeast1/subnetworks/mysubnet',
    '--temp_location=gs://mybucket/tmp/'
]
  
  
def run():
    # パイプラインの作成
    with beam.Pipeline(
        options=PipelineOptions(pipeline_args)
    ) as pipeline:   
        # PCollection を作成
        inputdata = (
            pipeline
            | 'Create' >> beam.Create([
                {'name': 'Ichiro', 'age': 32, 'height': 172},
                {'name': 'Jiro', 'age': 27, 'height': 168},
                {'name': 'Saburo', 'age': 25, 'height': 179}
            ])
        )
  
        # 平均年齢を算出する
        mean_age = (
            inputdata
            | 'Extract age' >> beam.Map(lambda x: x['age'])
            | 'Get mean age' >> beam.combiners.Mean.Globally()
            | 'Get KeyValue age' >> beam.Map(lambda x: ('mean age', x))
        )
  
        # 平均身長を算出する
        mean_height = (
            inputdata
            | 'Extract height' >> beam.Map(lambda x: x['height'])
            | 'Get mean height' >> beam.combiners.Mean.Globally()
            | 'Get KeyValue height' >> beam.Map(lambda x: ('mean height', x))
        )
  
        # 平均年齢と平均身長を出力する
        outputdata = (
            (mean_age, mean_height)
            | 'Flatten data' >> beam.Flatten()
            | 'Output' >> beam.Map(lambda x: logging.info(f'{x[0]} = {x[1]}'))
        )
  
  
if __name__ == '__main__':
    run()

このコードを実行すると Dataflow ジョブが作成され、ワーカーとなる Compute Engine VM が起動した後、パイプラインに記述された処理がそれぞれ ステップ として実行されます。一度限りのバッチジョブのため、処理が完了すると VM は削除されます。
このように、コンソールや CLI で直接 Dataflow ジョブを作成して実行するのではなく、Dataflow ランナーを実行環境として指定したコードを実行 するとジョブが作成されます。

以下の図は Dataflow のコンソールから確認することができるジョブのグラフです。

Dataflow のコンソールでグラフ化されたパイプラインを確認できる

コンソールからステップごとのログを確認できます。
「Output」ステップで平均年齢と平均身長をログ出力するように記述していたため、以下のログが確認できます。

ステップのログを確認する

Dataflow テンプレート

Dataflow では、ユーザーが Apache Beam SDK で記述したパイプラインを Dataflow ジョブとしてただ実行するだけでなく、テンプレートとしてパッケージ化することができます。

テンプレートは Google Cloud コンソールや CLI、REST API を使用して Dataflow ジョブとしてデプロイします。

テンプレートを使用することで、パイプライン開発とデプロイのプロセスが分離され、権限を持つユーザーであれば開発者が用意したパイプラインを必要なときにデプロイすることができるほか、テンプレート利用者がテンプレート側に設定されたパラメータを渡して実行することができます。

たとえば、テンプレート利用者が任意の入力ファイルやデータ出力先を指定してからパイプラインを実行できるようにテンプレートを作成できます。

Dataflow でテンプレートを使用してパイプラインをデプロイする

テンプレート利用者がテンプレートに設定されたパラメータを渡すことができる

Dataflow のテンプレートには以下の 3種類があります。

テンプレートの種類 説明
Google 提供のテンプレート ユーザーがパイプラインを開発する代わりに、一般的なシナリオ用に事前定義されたテンプレートを使用できます(Google 提供のテンプレート一覧)。
クラシックテンプレート ユーザーが開発したパイプラインを Cloud Storage にステージングして Dataflow から利用します。
Flex テンプレート 開発したパイプラインを Docker イメージとしてパッケージ化し、Container Registry/Artifact Registry に格納します。
クラシックテンプレートと比較して処理内容を柔軟に変更することができ、入力パラメータに基づいて動的に I/O コネクタを選択したり、パイプライン作成中に入力パラメータの検証などの前処理を行ったりできます。

構成要素

Dataflow ワーカー

Dataflow ジョブは、パイプラインのデプロイ時にプロビジョニングされる Compute Engine VM インスタンスで実行されます。この VM を Dataflow ワーカー と呼びます。ワーカー上ではコンテナ化された Apache Beam SDK プロセスが起動されます。

ワーカー VM に使用されるマシンタイプや、起動するワーカーの台数などはデプロイ時に パイプラインオプション で指定することができます。

リージョンエンドポイント

ジョブごとに リージョンエンドポイント (Regional endpoints) を指定します。ジョブの作成時にリージョンを指定すると、そのリージョンのエンドポイント経由で Dataflow ジョブのメタデータが保存されたり、自動スケーリング等の Dataflow ワーカーの動作もこのリージョンから制御されます。

リージョンエンドポイントを作成するリージョンは Dataflow ワーカーを作成するリージョンと別にすることができますが、コンプライアンス上の理由から特定リージョン内にデータを閉じたい場合や、リージョン間のネットワークレイテンシによるパフォーマンス影響を抑えるには、ワーカーと同一リージョンを指定することが推奨されます。

Dataflow ワーカーとリージョンエンドポイント

Dataflow GPU

Dataflow ワーカーは GPU を使用することができます。GPU を使うと、特定の計算や画像処理、機械学習タスクを CPU よりも高速に実行することができます。

GPU は、後述する Dataflow Runner v2 を使用するパイプラインでのみサポートされています。

カスタムコンテナ

デフォルトの設定では、Dataflow ワーカーは Apache Beam イメージ を使用するコンテナを起動して処理を実行します。

カスタムコンテナを使用することで実行環境をカスタマイズし、起動時間を短縮したり、外部ライブラリなどの依存関係をプリインストールしたりできます。

GPU 同様、カスタムコンテナも Dataflow Runner v2 を使用するパイプラインでのみサポートされています。

周辺機能

Dataflow SQL

Dataflow SQL は、SQL を Apache Beam パイプラインに変換して Dataflow ジョブとして実行できる仕組みです。

Dataflow SQL のクエリは Dataflow コンソールの SQL ワークスペース や gcloud コマンドなどから作成し、そのままパイプラインへの変換、実行を行うことができます。

例として、公式ドキュメント のサンプルを実行してみます。
以下のコマンドでは、一般公開されている Pub/Sub トピック taxirides-realtime から 1分ごとにデータを pull し、1 分間のタクシーの乗客数を計算して BigQuery テーブルに書き込む Dataflow ジョブが作成されます。

$ gcloud dataflow sql query \
    --job-name=dataflow-sql-quickstart \
    --region=asia-northeast1 \
    --subnetwork=regions/asia-northeast1/subnetworks/mysubnet \
    --bigquery-dataset=taxirides \
    --bigquery-table=passengers_per_minute \
'SELECT
     TUMBLE_START("INTERVAL 60 SECOND") as period_start,
     SUM(passenger_count) AS pickup_count,
FROM pubsub.topic.`pubsub-public-data`.`taxirides-realtime`
WHERE
    ride_status = "pickup"
GROUP BY
    TUMBLE(event_timestamp, "INTERVAL 60 SECOND")'

コマンドを実行すると Dataflow ジョブが作成され、処理が開始されます。

Dataflow SQL で作成されたジョブのグラフ

BigQuery のテーブルを確認すると、1分ごとのタクシー乗客数のデータが記録されています。このサンプルのジョブはストリーミング処理のため、ジョブを停止するまで動作し続けます。実際に試す場合はジョブの止め忘れに気をつけましょう。

Dataflow ジョブの出力先となる BigQuery テーブル

Dataflow Prime

Dataflow Prime は「Dataflow のサーバーレス版」とも言うべき仕組みです。サーバーレスなプラットフォーム上で Apache Beam パイプラインを実行することができます。

Dataflow Prime ではワーカーのリソース最適化に役立つ機能が提供されており、たとえば先述の水平自動スケーリングに加え、垂直方向の自動スケーリング を利用することができます。Dataflow ワーカーで使用可能なメモリは、パイプライン実行時に必要に応じて自動で調整され、メモリ不足エラー(OOM)によるジョブの失敗を防止し、パイプラインのリソース効率を最適化することができます。

Dataflow ML

Dataflow ML では Dataflow ジョブ内で機械学習 (AI/ML) モデルによる推論を実行することができます。

Apache Beam の RunInference API を使用して機械学習モデルをパイプラインに追加することでこれを実現します。以下は、Pub/Sub で受信したデータを Dataflow のパイプライン内で処理し、機械学習モデルに送信して推論を行わせたあと、その結果を BigQuery に書き込むストリーミング処理の例です。

機械学習モデルに推論を実行させる処理をパイプラインに組み込む

ノートブックでの開発

Dataflow ではパイプラインの開発、実行、解析を JupyterLab ノートブック上でインタラクティブに行うことができます。

Apache Beam ノートブックは Vertex AI Workbench ユーザー管理ノートブック として提供されており、JupyterLab ノートブックと Apache Beam インタラクティブランナー がプリインストールされています。

パイプライン最適化の仕組み

Dataflow Runner v2

Dataflow Runner v2 は Dataflow ランナー (Apache Beam 実行環境) の新しいバージョンで、今後 Dataflow に実装される新機能は Dataflow Runner v2 でのみ使用することができます。

従来のランナーを使用することもできますが、Dataflow Runner v2 は高パフォーマンス、低コストで利用できる傾向があるため、基本的にはこちらを使用すると良いでしょう。SDK によっては Dataflow Runner v2 で使用できない機能があるなど、いくつかの 制限事項 があります。

自動スケーリング

水平自動スケーリング

Dataflow ジョブで水平自動スケーリングを有効化することで、ワーカー VM をジョブの実行に必要な数だけ自動的にスケールアウト/スケールインするようにできます。

また、ストリーミング処理を実行するワーカーは、ジョブの実行中に手動でスケーリングすることも可能です。

動的スレッドスケーリング

Dataflow では、パイプラインの実行時に複数のワーカーに処理を分散します。ワーカーは複数のスレッドを起動してタスクを並列実行します。

動的スレッドスケーリング (Dynamic Thread Scaling) を有効化すると、Dataflow は各ワーカーのリソース使用率に基づき、ワーカーで実行するスレッドの数を自動的に最適化します。

スレッド数の自動スケーリングはワーカーごとに行われ、ワーカーのメモリ使用率が 50% 未満かつ CPU 使用率が 65% 未満の場合はスレッド数を増やし、ワーカーのメモリ使用率が 70% を超えた場合にスレッド数を減らします。

動的作業再調整

動的作業再調整 (Dynamic work rebalancing) 機能を使用すると、Dataflow はワーカーごとのタスク割り当ての不均衡や、終了に時間がかかるタスクが割り当たっているワーカー、タスクが早期に終了するワーカーを自動的に検出し、余裕のあるワーカーに対して動的にタスクを割り当てることでジョブ全体の処理時間を短縮します。

動的作業再調整によるタスクの動的割り当て

Dataflow Shuffle / Streaming Engine

Dataflow ShuffleStreaming Engine は、それぞれ Dataflow ワーカー間のデータ移動(シャッフル)処理をワーカー外にオフロードする機能です。バッチ処理では Dataflow Shuffle、ストリーミング処理では Streaming Engine と呼ばれます。

パイプラインの中で GroupByKey のようなデータのグループ化処理を行う場合、特定のキーを持つすべてのデータを 1 つのワーカーに集める必要があります。この場合、それぞれのワーカー同士がメッシュ状にデータの移動が行うため、ワーカーの負荷や通信のオーバーヘッドが発生します。また、特定のワーカーで障害が発生した場合にデータが失われてしまい、ジョブ全体が失敗してしまう可能性があります。

Dataflow Shuffle / Streaming Engine を使用しない場合

Dataflow Shuffle および Streaming Engine では、処理の中間状態をワーカー外に保存し、次の処理ではここからデータを読み出します。

これにより、ワーカー間でメッシュ状の通信を行うことなくデータの移動を行うことができます。そして特定のワーカーで障害が発生した場合でも、正常なワーカーが中間に保存されているデータを再度読み出すことでジョブを続行できます。

Dataflow Shuffle / Streaming Engine を使用する場合

Flexible Resource Scheduling(FlexRS)

Flexible Resource Scheduling (FlexRS) は「遅延したスケジューリング」と「プリエンプティブル VM インスタンス」の活用により バッチ処理のコストを削減する機能 です。FlexRS を使用するジョブは FlexRS ジョブ といいます。

Flex ワーカー(FlexRS で使用される Dataflow ワーカー)として、プリエンプティブル VM と通常の VM の組み合わせが使用され、通常の Dataflow ワーカーよりもリソース使用料が安く設定されています。

FlexRS ジョブではリソース使用量が安い代わりに「遅延したスケジューリング」により、ジョブの作成後すぐに実行されず、Google Cloud が所有するリソースに余裕があるタイミングで実行されます(作成から 6時間以内には実行される)。したがって、時間の制約が厳しくないジョブを実行する際に有効な機能となります。

FlexRS ジョブは開始される前に 早期検証 が行われ、Dataflow ジョブの実行パラメータ、IAM ロール、ネットワーク構成などの設定が事前に検証され、潜在的なエラーがあれば報告されます。

FlexRS を使用するには Dataflow Shuffle が有効化されている必要があります。

モニタリング

Dataflow モニタリングインターフェースは、ウェブベースのパフォーマンスモニタリング用の画面です。Google Cloud コンソールから閲覧できます。

Dataflow モニタリングインターフェースでは以下のような項目が表示されます。

  • 現在実行中のジョブを含む、過去 30日間に実行された Dataflow ジョブのリスト
  • 各パイプラインの図(ジョブグラフ
  • 自動スケーリング、スループット、リソース使用率などの指標(参考
  • ジョブ実行中に発生したエラーや警告
  • ジョブのパフォーマンス向上、コスト削減などに関する推奨事項

詳細は以下のドキュメントもご参照ください。

セキュリティ

データの暗号化

Dataflow では、データパイプライン全体で処理中のデータ、保存されるデータの両方に対して、デフォルトで暗号化が行われています。デフォルトの暗号化は Google が管理する暗号鍵によって行われます。

また、Google 管理の暗号鍵の代わりに Cloud Key Management Service(KMS)による顧客管理の暗号鍵(CMEK)や Cloud HSM を使用することもできます。

プライベート IP アドレスの使用

Dataflow では ワーカー VM のパブリック IP を無効にし、すべてのネットワーク通信にプライベート IP アドレスを使用するように指定することができます。パブリック IP を無効化する場合、ワーカーがリージョンエンドポイントにアクセスするために、ワーカー VM が使用するサブネットで 限定公開の Google アクセス を有効化する必要があります。

ネットワークタグの使用

通常の Compute Engine VM 同様に、Dataflow ワーカー VM にはネットワークタグを付与し、ファイアウォールルールでトラフィックの制限を行うことができます。

Dataflow ジョブで Dataflow Shuffle または Streaming Engine を使用しない場合(シャッフル処理を VM 外にオフロードしない場合)、VM 間でデータのやり取りを行うために Dataflow 用のファイアウォールルール で TCP ポート 1234512346 の通信を許可する必要があります。

料金

Dataflow の最新の料金については以下のドキュメントを参照してください。

Dataflow の料金

Dataflow の料金はジョブごとに秒単位で発生します。 Dataflow ワーカー VM のリソース料金を基本として、GPU、Dataflow Shuffle を使用する場合に追加で料金が発生します。
東京リージョン(asia-northeast1)で利用する場合の料金は以下のようになります。

ジョブの種類 vCPU(1 vCPU、1 時間あたり) メモリ(1 GB、1時間あたり)  シャッフル時に処理されたデータ(GB 単位)
※Dataflow Shuffle、Streaming Engine 使用時
バッチ $0.0728 $0.0046241 $0.0143
FlexRS $0.0437 $0.0027745 $0.0143
ストリーミング $0.0897 $0.0046241 $0.0234

バッチ処理で Dataflow Shuffle を使用する場合、上記の シャッフル時に処理されたデータ の料金はデータ量に応じて割引されます。シャッフル処理されたデータの最初の 250 GB までは 75%、次の 4,870 GB~5,120 GB は 50%、シャッフル時の処理料金が割引されます。5,120 GB 以降は上記の表の料金がそのまま適用されます。

例として、合計 10,240 GB(10 TB)のデータをシャッフル処理した場合、課金対象のデータ量は 7,617.5 GB(= 250 GB × 25% + 4,870 GB × 50% + 5,120 GB)となります。

ワーカー VM が使用するストレージ、GPU の料金は以下のようになっており、ジョブの種類ごとの違いはありません。

ストレージ - 標準永続ディスク(1 GB 1時間あたり) ストレージ - SSD 永続ディスク(1 GB 1時間あたり) NVIDIA® Tesla® K80 GPU(1 GPU 1 時間あたり) NVIDIA® Tesla® P100 GPU(1 GPU 1 時間あたり) NVIDIA® Tesla® V100 GPU(1 GPU 1 時間あたり) NVIDIA® Tesla® T4 GPU(1 GPU 1 時間あたり) NVIDIA® Tesla® P4 GPU(1 GPU 1 時間あたり)
$0.0000702 $0.0003874 東京リージョンで使用不可 東京リージョンで使用不可 東京リージョンで使用不可 $0.4440 東京リージョンで使用不可

Dataflow Prime の料金

Dataflow Prime は課金単位が異なり、Dataflow Processing Unit(DPU) という概念によって料金が決定されます。ジョブのリソース使用状況から DPU 使用量が測定され、リソース消費量に比例して DPU 使用量が多くなります。 1 DPU は 「1 vCPUメモリ 4 GB250 GB の永続ディスク を使用するワーカー VM が 1時間処理を行った場合に使用されるリソース」に相当します。

Dataflow Prime を東京リージョン(asia-northeast1)で使用する場合の料金は以下のようになります。

ジョブの種類 1 DPU、1 時間あたりの料金
バッチ $0.091
ストリーミング $0.105

Dataflow Prime では使用する DPU 数を直接指定することができないため、コスト節約にはメモリ消費量の削減、シャッフルの際に処理されるデータ量の削減などをパイプラインの実装時に工夫する必要があります。先述した Dataflow モニタリングインターフェースはこれらの最適化に役立つ情報を提供してくれます。

佐々木 駿太 (記事一覧)

G-gen最北端、北海道在住のクラウドソリューション部エンジニア

2022年6月にG-genにジョイン。Google Cloud Partner Top Engineer 2024に選出。好きなGoogle CloudプロダクトはCloud Run。

趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。