Batch で重い CSV を ETL する

記事タイトルとURLをコピーする

G-gen の神谷です。本記事では、Batch を使って、大容量の CSV ファイルを BigQuery に ETL してみます。 Cloud Functions や Cloud Run といった類似サービスに比べて Batch の使いどころはどこかを検証します。

基本的な情報

Batch とは

Batch は、Compute Engine 仮想マシン(VM)インスタンスでバッチ処理ワークロードのスケジューリング、キューイング、実行を行えるフルマネージド サービスです。 Batch は、ユーザーに代わってリソースをプロビジョニングして容量を管理し、バッチ ワークロードを大規模に実行できるようにします。

Batch の利点

筆者が考える Batch の利点は以下です。

  • サーバレスのコンピューティング基盤であり、処理の負荷に応じて、CPU やメモリを自由に選択できる
  • コンテナが使えるため再利用性が高く、既存の資産を活用しやすい
  • ジョブ実行時のみ Spot VM(プリエンプティブル VM の最新バージョン) を起動するため、費用を抑えることができる

ユースケース

Google がユースケースとして紹介しているのは以下です。

  • ゲノム配列決定、創薬などに使用される再現可能なパイプラインのハイスループット処理
  • モンテカルロ シミュレーションを実行し、市場での取引に必要な結果を迅速に分析する
  • さまざまな入力に基づいて検証テストとシミュレーションを自動化し、設計を最適化する

料金

Batch そのものは無料です。しかし、バッチジョブの実行に使用される Google Cloud リソース( GCE の VM 料金。CPU、メモリ、GPU の利用料)に対しては、料金が発生します。

類似プロダクトとの比較

類似プロダクトとの比較です。 Batch は Cloud Run Jobs に近いですが、インスタンスサイズにほぼ制限がないことがポイントです。

Batch Cloud Functions(第 1 世代) Cloud Functions(第 2 世代) Cloud Run Jobs
言語対応 任意(コンテナのため) Python、Go、Java、Node.js、.Net Core、Ruby、PHP 第 1 世代 と同じ 任意(コンテナのため)
イメージレジストリ Artifact Registry(Container Registryは未確認) Container Registry または Artifact Registry Artifact Registry のみ Container Registry または Artifact Registry
インスタンスサイズ 事前定義されたマシンタイプまたはカスタム マシンタイプ 最大 8 GB の RAM(2 vCPU) 最大 16 GiB RAM(4 vCPU) 最大 32 GiB RAM(8 vCPU)
イベントタイプ - gcloud コマンド
- HTTP トリガー
- Cloud Scheduler
- Workflows
7 つのソースからのイベントの直接サポート

- HTTP トリガー
- イベント トリガー:
- Pub/Sub トリガー
- Cloud Storage トリガー
- Firestore トリガー
- Firebase 向け Google アナリティクス トリガー
- Firebase Realtime Database トリガー
- Firebase Authentication トリガー
- Firebase Remote Config トリガー
- HTTP トリガー
- イベント トリガー:
- Pub/Sub トリガー
- Cloud Storage トリガー
- 汎用の Eventarc トリガー
Eventarc でサポートされているすべてのイベントタイプをサポート(Cloud Audit Logs を介した 90 以上のイベントソースを含む)

※ Firebase、Firestore 系は一部未対応のため、第1世代を使う
- gcloud コマンド
- HTTP トリガー
- Cloud Scheduler
- Workflows
リクエストタイムアウト なし 最大 9 分 HTTP でトリガーされる関数の場合は最大 60 分
イベントによってトリガーされる関数の場合は最大 9 分
- ジョブ:明示的なタイムアウトなし
- タスク:最大 24 時間
同時リクエスト リージョンごとの1分あたりのエージェントリクエスト 最大 30,000 件 関数インスタンスごとに 1 件の同時リクエスト 関数インスタンスあたり最大 1,000 件の同時リクエスト コンテナ インスタンスあたり最大 1,000 件の同時リクエスト
並列処理 サポート対象 非対応 非対応 サポート対象
トラフィック分割(異なる関数のリビジョン間でトラフィックを分割するか、関数を以前のバージョンにロールバックする機能) 非対応(デプロイのたびにジョブが新規作成される) 非対応 サポート対象 サポート対象
リトライ あり(タスク単位で 0〜10 回) イベントドリブン関数ではあり。失敗後に正常完了 or 7日経過するまでリトライし続ける 第 1 世代 と同じ あり(0〜10回)

検証要件と設計

要件

今回の検証におけるデータ基盤の要件は以下です。

  • データソース

    • BigQuery のサンプルテーブル「wikipedia(313,797,035件、35.69 GB)
    • Zip で圧縮された大容量の CSV ファイル(1 GB、10 GB、35.7 GB)が GCS に連携されてくる
    • CSV は Shift-JIS エンコードになっている(日本語含む)
  • DWH

    • BigQuery にスキーマ(文字列型、数値型、日付型等)を持った状態で保持したい

設計ポイント

上記要件に対して、技術選定、アーキテクチャ設計を行う上でのポイントは以下です。

  • BigQuery の機能制約で、Zip をそのまま取り込むことができないので、解凍が必要になる
  • BigQuery の機能制約で、Shift-JIS をそのまま取り込むと日本語が文字化けするので、UTF8 変換を行う必要がある
  • 大容量のファイルを扱えるスケーラブルな処理基盤を選択する必要がある
  • 型変換などの簡単な前処理を行える必要がある

実装方式

これらを踏まえて、今回は Batch を使います。実装方式は以下です。

  • 言語は Python
  • Pandas を使って、Zip の解凍、型変換、エンコードを実現する
  • 再利用性を意識して、コンテナを使う
  • Batch の並列化機能を使って、データ分割しながら処理を行う

実装

大まかな実装の流れはこちらを参考にしました。

ディレクトリ構成

ディレクトリ構成は以下の通りです。

.
├── config
│   ├── batch-test-case1.json
            ・
            ・
│   └── batch-test-case4_1.json
│   └── batch-test-case4_2.json
└── src
    ├── main.py
    └── requirements.txt

main 処理

メインとなる Python の処理は以下です。 並列化する場合としない場合とで、pd.read_csv での読み込み方法を変えています。

  • 並列化する場合( parallelism==True ):BATCH_TASK_INDEX と chunksize によって読み込み対象行を指定し、分割した単位ごとに並列処理
  • 並列化しない場合( parallelism==False ):chunksize ごとの iterator を取得し、逐次処理
import os
import random
import sys
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import pandas as pd
from google.cloud import bigquery
import psutil
from distutils.util import strtobool
from string import Template
  
# Batch の環境変数として定義されている。並列化を行う際に活用する  
BATCH_TASK_COUNT = int(os.environ.get('BATCH_TASK_COUNT'))
BATCH_TASK_INDEX = int(os.environ.get('BATCH_TASK_INDEX'))
  
  
def display_resource_usage(message):
  
    # メモリ情報を取得
    mem = psutil.virtual_memory()
    # CPU 情報を取得
    cpu = psutil.cpu_percent(interval=1, percpu=True)
    print(
        f"memory_total: {round(mem.total / 1024 / 1024 / 1024, 2)} GB, memory_used: {round(mem.used / 1024 / 1024 / 1024, 2)} GB, memory_available: {round(mem.available / 1024 / 1024 / 1024, 2)} GB, cpu: {cpu}, BATCH_TASK_INDEX: {BATCH_TASK_INDEX}, message: {message}")
  
  
def is_first_task_done(dest_table_path):
  
    client = bigquery.Client()
    now = datetime.now(ZoneInfo("Asia/Tokyo"))
  
    # 直近 60 min 以内で最大 300 件のジョブ一覧を取得
    for job in client.list_jobs(max_results=300, min_creation_time=(now + timedelta(minutes=-60))):
  
        # 宛先テーブルに書き込む初回のジョブが完了していた場合
        if int(job.labels.get("task_index")) == 0 and job.labels.get("dest_table_path") == dest_table_path and job.state == "DONE":
            print(
                f"First batch job is done {dest_table_path}, end_time is {job.ended.astimezone(ZoneInfo('Asia/Tokyo'))}")
            return True
  
    return False
  
  
def transfer(df, dest_table_path, index):
  
    # Dataframeによる前処理
    print(f"TASK_INDEX: {index}, {df[:1]}")
  
    # Dataframのメモリ使用量を見る
    print(f"TASK_INDEX: {index}, {df.info()}")
  
    # UNIX時間をTimestamp型に変換
    df['timestamp'] = pd.to_datetime(
        df['timestamp'], unit='s').dt.tz_localize('UTC')
  
    # BQのクライアントライブラリ
    client = bigquery.Client()
  
    # スキーマ定義
    schema_definition = [
        bigquery.SchemaField(
            "title", bigquery.enums.SqlTypeNames.STRING, 'NULLABLE'),
        bigquery.SchemaField(
            "id", bigquery.enums.SqlTypeNames.INTEGER, 'NULLABLE'),
        bigquery.SchemaField(
            "language", bigquery.enums.SqlTypeNames.STRING, 'NULLABLE'),
        bigquery.SchemaField(
            "wp_namespace", bigquery.enums.SqlTypeNames.INTEGER, 'REQUIRED'),
        bigquery.SchemaField(
            "is_redirect", bigquery.enums.SqlTypeNames.BOOLEAN, 'NULLABLE'),
        bigquery.SchemaField(
            "revision_id", bigquery.enums.SqlTypeNames.INTEGER, 'NULLABLE'),
        bigquery.SchemaField(
            "contributor_ip", bigquery.enums.SqlTypeNames.STRING, 'NULLABLE'),
        bigquery.SchemaField(
            "contributor_id", bigquery.enums.SqlTypeNames.INTEGER, 'NULLABLE'),
        bigquery.SchemaField("contributor_username",
                             bigquery.enums.SqlTypeNames.STRING, 'NULLABLE'),
        bigquery.SchemaField("timestamp",
                             bigquery.enums.SqlTypeNames.TIMESTAMP, 'REQUIRED'),
        bigquery.SchemaField(
            "is_minor", bigquery.enums.SqlTypeNames.BOOLEAN, 'NULLABLE'),
        bigquery.SchemaField(
            "is_bot", bigquery.enums.SqlTypeNames.BOOLEAN, 'NULLABLE'),
        bigquery.SchemaField(
            "reversion_id", bigquery.enums.SqlTypeNames.INTEGER, 'NULLABLE'),
        bigquery.SchemaField(
            "comment", bigquery.enums.SqlTypeNames.STRING, 'NULLABLE'),
        bigquery.SchemaField(
            "num_characters", bigquery.enums.SqlTypeNames.INTEGER, 'REQUIRED')
    ]
  
    # ジョブを識別するためのラベル用
    dest_table_path_ = dest_table_path.replace(".", "-")
  
    # DataframeをBQにロードするジョブ生成
    # 最初の1回は洗い替えをし、残りは追加を行う.
    if index == 0:
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # 上書き
            schema=schema_definition,
            labels={"dest_table_path": dest_table_path_,
                    "task_index": index}
        )
        print(
            f"TASK_INDEX : {index}, WRITE_TRUNCATE"
        )
    else:
        total_sec = 0
  
        # 「TASK_INDEX = 0」のジョブが完了したかチェック
        while is_first_task_done(dest_table_path_) is False:
  
            # 未完了の場合、スリープ
            sec = random.uniform(10, 20)
            time.sleep(sec)
            total_sec += sec
  
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # 追加
            schema=schema_definition,
            labels={"dest_table_path": dest_table_path_,
                    "task_index": index}
        )
        print(
            f"TASK_INDEX : {index}, WRITE_APPEND, SLEEP_TIME: {total_sec} sec"
        )
  
    # 書き込み先のテーブルpathを指定
    # dest_table_path = "jkamiya.batch_test.sample1"
    job = client.load_table_from_dataframe(
        df,
        dest_table_path,
        job_config=job_config,
    )
    print(f"TASK_INDEX: {index}, Job is {job}")
    display_resource_usage("after client.load_table_from_dataframe")
  
    # ジョブの完了を待つ
    job.result()
  
    # ジョブが完了すると、書き込み件数などが取得できる
    table = client.get_table(dest_table_path)  # Make an API request.
  
    print(
        f"TASK_INDEX: {index}, Loaded {table.num_rows} rows and {len(table.schema)} columns to {dest_table_path}"
    )
  
  
def get_df_nrows(source_file_path, encoding, chunksize):
  
    try:
        display_resource_usage("before pd.read_csv")
  
        # GCSにあるzipのCSVを直接読み込む(日本語が文字化けしないように、encoding="shift_jis" or "cp932"を指定)
        # skiprowsとnrowsで読み込み対象範囲を指定する
        df = pd.read_csv(
            source_file_path,
            low_memory=False,
            encoding=encoding,
            names=['title', 'id', 'language', 'wp_namespace', 'is_redirect', 'revision_id', 'contributor_ip', 'contributor_id',
                   'contributor_username', 'timestamp', 'is_minor', 'is_bot', 'reversion_id', 'comment', 'num_characters'],
            header=0,
  
            # skiprows:先頭から数えて「BATCH_TASK_INDEX * chunksize」行をスキップする
            skiprows=BATCH_TASK_INDEX * chunksize,
  
            # nrows:chunksize行分読み込む
            nrows=chunksize
  
        )
  
        display_resource_usage("after pd.read_csv")
  
        return df
  
    except Exception as e:
        print(f"Exception: {e}, TASK_INDEX: {BATCH_TASK_INDEX}")
        return
  
  
def get_df_iterator(source_file_path, encoding, chunksize):
  
    try:
        display_resource_usage("before pd.read_csv")
  
        # chunksizeで指定した件数分を分割して読み込む
        df_iterator = pd.read_csv(source_file_path,
                                  low_memory=False,
                                  encoding=encoding,
                                  chunksize=chunksize)
  
        display_resource_usage("after pd.read_csv")
  
    except Exception as e:
        print(f"Exception: {e}")
        return
  
    return df_iterator
  
  
def main_proc(source_file_path, dest_table_path,
              encoding, parallelism, chunksize):
  
    # 時間計測開始
    time_sta = time.perf_counter()
  
    # 並列化するかどうか
    if parallelism:
        # する場合は、BATCH_TASK_INDEX と chunksize によって読み込み対象行を指定し、分割した単位ごとに並列処理
        df = get_df_nrows(source_file_path, encoding, chunksize)
        transfer(df, dest_table_path, BATCH_TASK_INDEX)
        t = Template(
            'TASK_INDEX: ${BATCH_TASK_INDEX}, processing_time: ${processing_time} sec')
    else:
        # しない場合は、chunksize ごとの iterator を取得し、逐次処理
        df_iterator = get_df_iterator(source_file_path, encoding, chunksize)
        index = 0
        for df in df_iterator:
            transfer(df, dest_table_path, index)
            index += 1
        t = Template('processing_time: ${processing_time} sec')
  
    # 時間計測終了
    time_end = time.perf_counter()
    # 経過時間(秒)
    tim = time_end - time_sta
    print(t.substitute(processing_time=round(
        tim, 3), BATCH_TASK_INDEX=BATCH_TASK_INDEX))
  
  
if __name__ == '__main__':
  
    args = sys.argv
    source_file_path = args[1]
    dest_table_path = args[2]
    encoding = str(args[3])
    parallelism = strtobool(args[4])
    chunksize = int(args[5])
  
    print(
        f"source_file_path: {source_file_path}, encoding: {encoding}, parallelism: {args[4]}, chunksize: {chunksize}")
  
    main_proc(source_file_path, dest_table_path,
              encoding, parallelism, chunksize)

main.py と同一階層に「requirements.txt」を置き、必要なライブラリを定義します。

pandas==1.4.1
google-cloud-storage==2.2.1
gcsfs==2022.11.0
google-cloud-bigquery==3.4.1
pyarrow==10.0.1
psutil==5.9.4

コンテナ作成

次に、buildpacks を使って、Dockerfile を用意することなくコンテナを作ります。 ローカルからコンテナを作る場合には以下を実行します。

sudo pack build --builder=gcr.io/buildpacks/builder batch-sample --env GOOGLE_ENTRYPOINT="python"

Cloud Build 経由で作る場合には以下を実行します。あらかじめ、Artifact Registry にコンテナ格納場所を作っておきます。 (筆者の環境だと作成までに20-25分程度かかりました)

# 「containers」というリポジトリを作成
gcloud artifacts repositories create containers --repository-format=docker --location=asia-northeast1

# 「asia-northeast1-docker.pkg.dev/jkamiya/containers」というパスでリポジトリが生成されるので、そこに「batch-sample」というコンテナを配置
gcloud builds submit ./src/ --pack image=asia-northeast1-docker.pkg.dev/jkamiya/containers/batch-sample,env=GOOGLE_ENTRYPOINT=python

Batch の設定

次に、今回の検証の肝となる Batch の設定を書いていきます(参考)。 以下は並列化する場合の設定例です。

{
    "taskGroups": [
        {
            "taskSpec": {
                "runnables": [
                    {
                        "container": {
                            "imageUri": "asia-northeast1-docker.pkg.dev/jkamiya/containers/batch-sample",
                            "commands": [
                                "main.py",
                                "gs://kamiya-batch-test/export/case4/test_000000000000_shiftjis.zip",
                                "jkamiya.batch_test.sample",
                                "cp932",
                                "true",
                                "1000000"
                            ]
                        }
                    }
                ],
                "computeResource": {
                    "cpuMilli": 64,
                    "memoryMib": 128
                },
                "maxRetryCount": 10
            },
            "taskCount": 300,
            "parallelism": 100
        }
    ],
    "allocationPolicy": {
        "instances": [
            {
                "policy": {
                    "machineType": "n2d-highmem-64"
                }
            }
        ]
    },
    "logsPolicy": {
        "destination": "CLOUD_LOGGING"
    }
}

設定のポイントは以下です。

  • imageUri:ビルドしたイメージのパス(Artifact Registry)
  • commands:コンテナに渡す引数。ビルド時のエンドポイントで「GOOGLE_ENTRYPOINT=python」を指定したため、pythonコマンド実行のあとに続くファイル名や実行時引数をセット
  • computeResource:1 タスクあたりに確保するコンピュータリソース。並列数を上げたい場合、多く取りすぎないほうが良い
    • cpuMilli:CPU はそんなに使い切れないので、低めで良い(machineType に対して cpuMilli が大きすぎると、parallelism で指定した並列数を割り当てられない)
    • memoryMib:タスク単位でデータがギリギリ乗るサイズをセット。仮に、1 タスクで 扱う データ(Dataframe の 1 chunk) が 110 MBであれば、 memoryMib は 128 MB でも事足りる
  • maxRetryCount:タスクレベルでの最大リトライ数。未指定の場合は0となる。ジョブを並列化したときにタスクが1つでもエラーになっていればジョブ全体がエラーとなってしまうため、ここは指定していたほうが良い(筆者の場合だと、タスク100個のうち2,3個はエラーになっていた)
  • taskCount:タスク数。computeResource と machineType に応じた設定を行う
  • parallelism:並列数。computeResource と machineType に応じた設定を行う
  • machineType:VM のマシンタイプ(バッチの裏側は Compute Engine の VM)
  • destination:ログ出力先指定(Cloud Logging)

ジョブの実行

以下のコマンドでジョブを実行します。 現時点だと、「asia-northeast1(東京)」は Batch のロケーションとして、許可されていないようです。

gcloud batch jobs submit batch-sample-`date +"%Y%m%d%H%M%S"` --location us-central1 --config ./config/batch-test-case1.json

無事デプロイされジョブがトリガーされると以下のようになります。

ジョブ実行

ジョブの詳細画面です。ジョブのマシンタイプやタスク数、並列数、実行時にセットした引数等を確認できます。 並列化の場合、画面右上にランニングとなっているタスク数や、失敗したタスク数が表示されます。 ここでエラーが多ければ、並列化が上手く行われていなかったり、VM としてジョブに必要なリソースが不足している可能性があります。 machineType を上げたり、parallelism や computeResource を下げたりといった調整します。 1 タスクあたりに必要十分な computeResource を確定した後、ジョブ全体として処理すべきデータ量が多い場合には、 machineType を上げて十分なメモリを確保すると良いでしょう。

ジョブ詳細

Compute Engine の画面です。ジョブごとに VM が作成され、ジョブが完了したら削除されます。 並列化が効いている場合には、グループインスタンスによって複数の VM が起動します。 parallelism を大きくすると、そのぶん起動する VM が多くなります。

Compute Engine の VM

CPU やメモリ等のリソース利用状況を細かく見たければ、Compute Engine のダッシュボードではなく、Cloud Monitoring を使います。 よく見るのは以下のような指標です。

  • compute.googleapis.com/instance/cpu/utilization
  • compute.googleapis.com/instance/memory/balloon/ram_used

Cloud Monitoring のメトリクス

ジョブが完了したので、BigQuery を見てみましょう。 想定通りデータが書き込まれているようです。

BigQuery への書き込み結果

性能検証と結果

大容量データを使った性能検証を行います。今回試すのは 3 種類の CSV ファイル(1 GB、10.7 GB、35.7 GB)です。

検証結果

結果は以下の通りです。

1 GB の csv

No 元ファイルサイズ zipファイルサイズ 件数 ジョブ実行時間 chunk件数(1chunkあたりの想定サイズ) 1chunkあたりの想定サイズ computeResource.cpuMilli computeResource.memoryMib taskCount parallelism マシンタイプ vCPUs Memory 料金単価 VM 数 料金 備考
1 1GB 451.8MB 10,000,000 0:03:19 1,000,000 0.1GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-4 4 vCPUs 16GB $0.04021 hourly 1 $0.00222

10.7 GB の csv

No 元ファイルサイズ zipファイルサイズ 件数 ジョブ実行時間 chunk件数(1chunkあたりの想定サイズ) 1chunkあたりの想定サイズ computeResource.cpuMilli computeResource.memoryMib taskCount parallelism マシンタイプ vCPUs Memory 料金単価 VM 数 料金 備考
2 10.7GB 4.5GB 100,000,000 0:39:49 1,000,000 0.1GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-4 4 vCPUs 16GB $0.04021 hourly 1 $0.02668
3 10.7GB 4.5GB 100,000,000 0:26:19 10,000,000 1.1GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-4 4 vCPUs 16GB $0.04021 hourly 1 $0.01764
4 10.7GB 4.5GB 100,000,000 0:05:59 1,000,000 0.1GB 64 128 100 100 e2-standard-32 32 vCPUs 128GB $0.34368 hourly 5 $0.17136

35.7 GB の csv

No 元ファイルサイズ zipファイルサイズ 件数 ジョブ実行時間 chunk件数(1chunkあたりの想定サイズ) 1chunkあたりの想定サイズ computeResource.cpuMilli computeResource.memoryMib taskCount parallelism マシンタイプ vCPUs Memory 料金単価 VM 数 料金 備考
5 35.7GB 14.2GB 313,797,035 1:25:00 10,000,000 1.1GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-4 4 vCPUs 16GB $0.04021 hourly 1 $0.05696
6 35.7GB 14.2GB 313,797,035 -(エラー) 30,000,000 3.3GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-4 4 vCPUs 16GB $0.04021 hourly 1 - CSVを読み込めない(メモリ不足)
7 35.7GB 14.2GB 313,797,035 1:19:00 30,000,000 3.3GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-8 8 vCPUs 32GB $0.08041 hourly 1 $0.10587
8 35.7GB 14.2GB 313,797,035 1:14:00 30,000,000 3.3GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-32 32 vCPUs 128GB $0.34368 hourly 1 $0.42387
9 35.7GB 14.2GB 313,797,035 -(エラー) 35,000,000 3.85GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-8 8 vCPUs 32GB $0.08041 hourly 1 - BQ書き込みエラー(3.9+GB)
10 35.7GB 14.2GB 313,797,035 -(エラー) 40,000,000 4.4GB デフォルト(2000) デフォルト(2000) 1 1 e2-standard-4 4 vCPUs 16GB $0.04021 hourly 1 - BQ書き込みエラー(4.6+GB)
11 35.7GB 14.2GB 313,797,035 0:32:10 1,000,000 0.1GB 64 128 314 100 n2d-highmem-64 64 vCPUs 512GB $2.91789 hourly 5 $7.82157

計 11 回実行して、8 回成功、3 回 はエラーになりました。マシンタイプは、「e2-standard-4」、「e2-standard-8」、「e2-standard-32」、「n2d-highmem-64」の 4 つを使っています。

  • No.6 と No.7 を比較

    • 1 chunk あたりの想定サイズが 3.3 GB であり、これが「e2-standard-4」のマシンタイプではメモリエラーになっている
    • マシンタイプを「e2-standard-8」にスケールアップすることで、無事ジョブが成功している
  • No.9 と No.10 のエラー

    • BigQuery へのロード時に BigQuery 側でメモリエラーになっている(原因については後述)
  • No.7、No.8、No.11 を比較

    • 35.7GB のデータをシングルタスクでやった場合(No.7、No.8)と、並列化した場合(No.11)の違い
    • 並列化によって倍以上のスピードで終わっているが、料金は 10 倍以上かかっている
  • No.3、No.4 を比較

    • 10.7GB のデータをシングルタスクでやった場合(No.3)と、並列化した場合(No.4)の違い
    • 並列化によって 5 倍以上のスピードで終わっているが、料金は 17 倍以上かかっている

わかったこと

今回の検証で分かったことは以下です。

  • タスクの並列化
    • タスクの並列実行機能を使って、chunk ごとに処理を行うことで効率的にマシンリソースを活用できる
    • CSV の読み込み時に chunksize で指定した件数ごとに分割して処理を実行しているため、全データをメモリに乗せてしまって(Pandas のデフォルトの挙動)、メモリエラーになることはない
    • chunksize で処理を分割することで BigQuery ロード時のメモリエラー(後述)にも対処できる
  • マシンリソース
    • CPU 使用率を 100 % に近づけつつ、メモリ使用量が溢れないよう設定を調整する必要がある
    • memoryMib は 1 タスクで扱うデータサイズよりちょっと大きい値をセットする
    • cpuMilli はそんなに取らなくて良い(Cloud Monitoring で使用率を見ると、余らせているケースが多い)
    • GCE の Cloud Logging を観察し、task task/j-a1f61f46-a24e-49b3-b013-8b9d2cf3231b-group0-230/2/0 runnable 0 wait error: exit status 137 のようなタスクレベルのエラーが出ている場合には、VM のメモリ不足が原因のことが多い
    • Python の psutil 等を使ってメモリ使用率を確認し、machineType のメモリ値に近い場合には、machineType 自体のスペックを上げて十分にメモリを確保する
  • 速度と料金
    • 並列化によってかなり速くなるが、その分料金がかかる
  • 実装方式
    • Dask や Modin、Vaex といった Pandas 高速化ライブラリを使わずに余裕を持ってスケールできる点はメリットだと考えられる(コンテナなので、これらを利用することもできる)

Batch の使いどころ

今回の検証を通じてわかった Batch の使いどころは以下です。

  • Cloud Run Jobs で手に負えない重い処理
    • マシンリソースのスケールアップ(machineType のスペックアップ)やスケールアウト(並列数に応じてグループインスタンス内のVM数を増やす)の制限がないため、現実的な時間で実行できる
  • 既存のワークロードの変更を最小限にしつつ、大容量のデータを扱いたいケース
    • データ分割のロジックを入れるだけなので、分散処理用のライブラリやフレームワークで書き換える必要がない
  • データ基盤の初期移行のようなショットで発生し、定常的に行うわけではないケース
    • 瞬間的にクラウド料金がかかるが、作り込みの工数よりは費用が抑えられる

上記以外にも、MPI(Message Passing Interface)ライブラリや GPU を使いたいケースも向きそうですが、これらは今後の検証課題にしたいと思います。

補足(BigQuery ロード時のメモリエラー)

事象

Pandas の Dataframe を BigQuery にアップロードする際に、一回のデータ量が多すぎると(約 4GB 以上)、VM側ではなく BQ 側でメモリーエラーが発生します。 以下のようなエラーログが出ます。

Resources exceeded during query execution: The query could not be executed in the allotted memory. Peak usage: 126% of limit. Top memory consumer(s): input table/file scan: 100%
Error while reading data, error message: Failed to read a column from Parquet file gs://bigquery-prod-upload-asia-northeast1/prod-scotty-191e6c74-86b6-4251-9a29-37b393e64ebb: row_group_index = 0, column = 9. Exception message: Unknown error: CANCELLED: . Detail: CANCELLED:

事象を整理すると、

となります。仕様なのか、実装方法によっては解決できるのかまだ不明で、根本原因は引き続き調査していきます。

暫定対応と課題

暫定対応としては、Pandas の Dataframe を分割しながら BQ にロードすると良いです(参考)。 Batch の設定によって並列数を増やせば、1 chunk あたり 4 GB 以下しか処理できなくても、問題なく大容量データに対応できます。 ただし、このロード方法はトランザクションが管理されません。 同じテーブルに書き込む他のジョブとタイミングが重なった場合には、データ不整合が起きる可能性があります。 一旦中間テーブルに書き込んで、その後にターゲットテーブルに Merge するといったパイプライン設計を行うと良いでしょう。

神谷 乗治 (記事一覧)

データアナリティクス準備室 室長

クラウドエンジニア。2017 年頃から Google Cloud を用いたデータ基盤構築や ML エンジニアリング、データ分析に従事。クラウドエース株式会社システム開発部マネージャーを経て現職。Google Cloud Partner Top Engineer 2023、Google Cloud Champion Innovators(Database)、著書:「GCPの教科書III【Cloud AIプロダクト編】」