Cloud Run jobsでGoogle Driveの差分ファイルをCloud Storageに同期してみた

記事タイトルとURLをコピーする

G-gen の山崎です。 当記事では、Cloud Run jobs を使用して Google Drive の差分を検知し、差分ファイルを Cloud Storage にアップロードする方法を解説します。

システム構成

今回構築したシステム構成は、以下のとおりです。

構成図

Cloud Scheduler を使って定期的に Cloud Run jobs を起動します。

起動した Cloud Run jobs は、以下の流れで前回処理を行った以降に発生した Google Drive の差分を検知し、その差分のファイルを Cloud Storage にアップロードします。

  1. ページトークン取得
  2. 差分ファイル情報取得
  3. ファイルアップロード
  4. ページトークン更新

ページトークンは、Google Drive API で変更を追跡する際の起点となる識別子です。 このトークンを保存・更新することで、前回処理を実行した時点からの差分のみを効率的に取得できます。

前提知識

今回使用する Google Cloud サービスの詳細は、以下の記事をご参照ください。

Cloud Storage

blog.g-gen.co.jp

Cloud Run jobs

blog.g-gen.co.jp

Cloud Scheduler

blog.g-gen.co.jp

環境構築

以下の順序で環境を構築しました。

  1. API の有効化
  2. サービスアカウントの構築
  3. Cloud Storage の構築
  4. Cloud Run jobs の構築
  5. Cloud Scheduler の構築

API の有効化

Google Cloud プロジェクトで、今回の構成に必要な Google Cloud サービスの API を有効化します。

gcloud services enable \
    artifactregistry.googleapis.com \
    cloudbuild.googleapis.com \
    run.googleapis.com \
    cloudscheduler.googleapis.com \
    drive.googleapis.com

サービスアカウントの構築

Cloud Run jobs、Cloud Scheduler 用のサービスアカウントを作成します。

Cloud Run jobs 用サービスアカウントの作成

Google Cloud コンソールのサービスアカウント画面で、[サービスアカウントを作成] を選択します。

任意のサービスアカウント名を入力し、[作成して続行] を選択します。

作成したサービスアカウントに「Storage オブジェクト管理者(roles/storage.objectAdmin)」「ログ書き込み(roles/logging.logWriter)」の権限を付与して[完了]を選択します。

Cloud Scheduler 用サービスアカウントの作成

再度[サービスアカウントを作成] を選択し、任意のサービスアカウント名の入力を行い、[作成して続行] を選択します。

作成したサービスアカウントに「Cloud Run ジョブ エグゼキュータ(roles/run.jobsExecutor)」の権限を付与して[完了]を選択します。

Cloud Storage の構築

ページトークン格納用、アップロードファイル格納用のバケットを作成します。

ページトークン格納用バケットの作成

Google Cloud コンソールのバケット画面で、[作成] を選択します。

グローバルに一意の名前を入力し、[続行] を選択します。

データの保存場所を選択し、[作成] を選択します。

「このバケットに対する公開アクセス禁止を適用する」にチェックがついていることを確認し、[確認] を選択します。

アップロードファイル格納用バケットの作成

再度バケット画面で、[作成] を選択し、グローバルに一意の名前を入力してアップロードファイル格納用バケットを作成します。(設定内容はページトークン格納用バケットと同様)

Cloud Run jobs の構築

Artifact Registry の作成、アプリケーションの準備、環境変数の設定、コンテナイメージのビルド、Cloud Run jobs のデプロイを行います。

Artifact Registry の作成

Google Cloud コンソールのリポジトリ画面で、[リポジトリの作成] を選択します。

任意のリポジトリ名とリージョンを指定し、[作成] を選択します。

アプリケーションの準備

今回開発する Python アプリケーションのディレクトリ構成は以下のとおりです。

drive-to-gcs-demo-app
|-- requirements.txt
|-- Dockerfile
|-- main.py

requirements.txt

google-api-python-client
google-auth
google-cloud-storage
google-cloud-logging

Dockerfile

FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["python", "main.py"]

main.py

import base64
import binascii
import io
import os
import sys
  
import google.auth
import google.cloud.logging
import logging
  
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
from google.cloud import storage
from google.api_core import exceptions
  
  
# --- 環境変数から設定を読み込み ---
# アップロード先のGCSバケット名
UPLOAD_BUCKET_NAME = os.environ.get('UPLOAD_BUCKET_NAME')
# pageTokenを保存するためのGCSバケット名(UPLOAD_BUCKET_NAMEと同じでも可)
START_BUCKET_NAME = os.environ.get('START_BUCKET_NAME') 
# ログの出力レベル
LOG_LEVEL = int(os.environ.get('LOG_LEVEL'))  # ログレベル debug=10 / info=20 / warning=30
  
# pageTokenを保存するファイル名
PAGE_TOKEN_FILE = 'drive_page_token.txt'
  
# Cloud Logging クライアントを初期化し、ロギングを設定
client = google.cloud.logging.Client()
client.setup_logging()
  
# 標準の logger を取得し、ログレベルを設定
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
  
# Google Drive APIのスコープ(読み取り専用)
DRIVE_SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
  
  
def get_drive_service():
    """
    実行環境のデフォルト認証情報を取得する
    """
    credentials, _ = google.auth.default(scopes=DRIVE_SCOPES)
    return build('drive', 'v3', credentials=credentials)
  
  
def read_page_token_from_gcs(storage_client: storage.Client) -> str | None:
    """GCSから前回のpageTokenを読み込む"""
    try:
        bucket = storage_client.bucket(START_BUCKET_NAME)
        blob = bucket.blob(PAGE_TOKEN_FILE)
        return blob.download_as_text()
    except exceptions.NotFound:
        # ファイルが存在しない場合は初回実行とみなし、Noneを返す
        logger.info(f"'{PAGE_TOKEN_FILE}' not found in GCS. Assuming first run.")
        return None
    except Exception as e:
        logger.critical(
            f"CRITICAL: Failed to read page token from GCS due to an unexpected error. "
            f"This will cause the job to fail. Error: {e}"
        )
        raise
  
  
def write_page_token_to_gcs(storage_client: storage.Client, token: str):
    """次回の実行のために新しいpageTokenをGCSに保存する"""
    try:
        bucket = storage_client.bucket(START_BUCKET_NAME)
        blob = bucket.blob(PAGE_TOKEN_FILE)
        blob.upload_from_string(token)
        logger.info(f"Successfully saved new page token to GCS: {token}")
    except Exception as e:
        logger.critical(f"CRITICAL: Failed to save new page token to GCS. {e}")
        # ここで失敗すると次回実行時に変更が重複する可能性があるため、エラーを明示
        raise
  
  
def stream_drive_file_to_gcs(drive_service, storage_client, drive_file_id, drive_file_name, bucket_name):
    """
    特定のファイルをDriveからGCSへ、一時ファイルを使わずに直接ストリーミングコピーする
    """
    # GCSのオブジェクトパスとして、Driveのファイル名をそのまま使用する
    object_path = drive_file_name
    logger.info(f"--- Starting copy for '{drive_file_name}' ({drive_file_id}) to gs://{bucket_name}/{object_path} ---")
  
    try:
        # 1. Google Driveからファイルのメタデータを取得 (サイズ、MIMEタイプ、チェックサム)
        file_metadata = drive_service.files().get(
            fileId=drive_file_id,
            supportsTeamDrives=True,
            fields='size, mimeType, md5Checksum'
        ).execute()
  
        total_size = int(file_metadata.get('size', 0))
        content_type = file_metadata.get('mimeType', 'application/octet-stream')
        source_md5_checksum = file_metadata.get('md5Checksum')
  
        if not source_md5_checksum:
            logger.warning(f"Warning: MD5 checksum not available for '{drive_file_name}'. Skipping verification.")
  
        # 2. GCSのアップロードストリームを開く
        bucket = storage_client.bucket(bucket_name)
        destination_blob = bucket.blob(object_path)
  
        with destination_blob.open('wb', content_type=content_type) as gcs_stream:
            # 3. Driveのダウンローダーを初期化し、GCSストリームに直接接続
            request = drive_service.files().get_media(fileId=drive_file_id)
            downloader = MediaIoBaseDownload(fd=gcs_stream, request=request, chunksize=10 * 1024 * 1024)
  
            # 4. ダウンロード(=アップロード)を開始
            done = False
            while not done:
                status, done = downloader.next_chunk(num_retries=3)
                if status:
                    logger.info(f"  Copy in progress: {int(status.progress() * 100)}%")
          
        logger.info("  Streaming finished.")
  
        # 5. 整合性チェック (MD5チェックサムが取得できている場合のみ)
        if source_md5_checksum:
            destination_blob.reload() # GCSから最新のメタデータを取得
            gcs_md5_b64 = destination_blob.md5_hash
            gcs_md5_hex = binascii.hexlify(base64.b64decode(gcs_md5_b64)).decode('utf-8')
              
            if source_md5_checksum == gcs_md5_hex:
                logger.info(f"  Success! MD5 checksums match for '{drive_file_name}'.")
            else:
                logger.error(f"  ERROR: MD5 CHECKSUM MISMATCH for '{drive_file_name}'!")
                raise
          
        logger.info(f"--- Successfully copied '{drive_file_name}' ---")
        return True
  
    except HttpError as e:
        # 特にダウンロード権限がない(403)やファイルが見つからない(404)などの場合に発生
        logger.error(f"ERROR: An HTTP error occurred while copying '{drive_file_name}': {e}")
        raise
    except Exception as e:
        logger.error(f"ERROR: An unexpected error occurred while copying '{drive_file_name}': {e}")
        raise
  
  
def main():
    """
    Google Driveの変更を検知してGCSにファイルを同期するメインの処理
    """
    if not UPLOAD_BUCKET_NAME or not START_BUCKET_NAME or not LOG_LEVEL:
        logger.error("Error: Environment variables 'UPLOAD_BUCKET_NAME' and 'START_BUCKET_NAME' and 'LOG_LEVEL' must be set.")
        raise ValueError("Required environment variables are not set.")
  
    drive_service = get_drive_service()
    storage_client = storage.Client()
  
    # 処理対象と対象外のファイル名を記録するためのリストを初期化
    processed_files = []
    skipped_files = []
  
    page_token = read_page_token_from_gcs(storage_client)
    if page_token is None:
        # 初回実行時は、全ての変更を検知するための最初のトークンを取得する
        response = drive_service.changes().getStartPageToken().execute()
        page_token = response.get('startPageToken')
        logger.info(f"First run detected. Starting with token: {page_token}")
  
    logger.info(f"Processing changes starting from page token: {page_token}")
    while page_token is not None:
        response = drive_service.changes().list(
            pageToken=page_token,
            spaces='drive',
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
            fields='nextPageToken, newStartPageToken, changes(fileId, file(id, name, trashed, mimeType))'
        ).execute()
  
        # 取得した差分情報の出力(debug)
        logger.debug(f"response info: {response}")
  
        for change in response.get('changes', []):
            drive_file_id = change.get('fileId')
            drive_file_info = change.get('file')
  
            # ファイルが存在し、ゴミ箱になく、かつMIMEタイプが'application/pdf'である場合のみ処理を続行
            if drive_file_info and not drive_file_info.get('trashed') and drive_file_info.get('mimeType') == 'application/pdf':
                drive_file_name = drive_file_info.get('name')
                logger.info(f"\nPDF change detected, will process: '{drive_file_name}' (ID: {drive_file_id})")
  
                # 処理対象リストにファイル名を追加
                processed_files.append(drive_file_name)
  
                # ストリーミングコピー関数を呼び出す
                stream_drive_file_to_gcs(
                    drive_service=drive_service,
                    storage_client=storage_client,
                    drive_file_id=drive_file_id,
                    drive_file_name=drive_file_name,
                    bucket_name=UPLOAD_BUCKET_NAME
                )
            else:
                # 処理対象外のファイルとして記録
                if drive_file_info:
                    file_name_to_skip = drive_file_info.get('name', drive_file_id)
                    skipped_files.append(file_name_to_skip)
                      
                    # スキップ理由をログに記録
                    reason = ""
                    if drive_file_info.get('trashed'):
                        reason = "it is in trash"
                    elif drive_file_info.get('mimeType') == 'application/vnd.google-apps.folder':
                        reason = "it is a folder"
                    else:
                        reason = f"its MIME type is not PDF ('{drive_file_info.get('mimeType')}')"
                    logger.info(f"Skipping '{file_name_to_skip}' because {reason}.")
                else:
                    # ファイル情報自体がない場合
                    logger.info(f"Skipping change for file ID {drive_file_id} because file information could not be retrieved.")
  
        # 次のページのトークンがあればループを継続、なければ終了
        if 'nextPageToken' in response:
            page_token = response.get('nextPageToken')
        else:
            # ループ終了。次回の実行のために新しいStartPageTokenを保存
            new_start_page_token = response.get('newStartPageToken')
            write_page_token_to_gcs(storage_client, new_start_page_token)
            logger.info("\nFinished processing all changes.")
            page_token = None
  
    # すべての処理ループが終わった後で、最終的な結果をINFOレベルでロギング
    logger.info("--- Execution Summary ---")
    summary_processed = f"処理対象ファイル:{', '.join(processed_files)}" if processed_files else "処理対象ファイル:なし"
    summary_skipped = f"処理対象外ファイル:{', '.join(skipped_files)}" if skipped_files else "処理対象外ファイル:なし"
    logger.info(summary_processed)
    logger.info(summary_skipped)
  
  
if __name__ == "__main__":
    logger.info("Cloud Run Job started.")
    try:
        main()
        logger.info("Cloud Run Job finished successfully.")
    except Exception as e:
        logger.critical(f"Job failed with a critical error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        if 'client' in locals() and client:
            client.close()

環境変数の設定

以降の手順で繰り返し使用する Google Cloud プロジェクト ID やリージョンなどの情報を環境変数に設定します。

ご自身の環境に合わせて、以下のコマンドで環境変数を設定してください。

# Google Cloud プロジェクト ID
export PROJECT_ID="your-project-id"
  
# プロジェクトのデフォルトリージョン
export LOCATION="asia-northeast1"
  
# Artifact Registry の名前
export REPO="my-repo"
  
# Image の名前
export IMAGE="my-image"
  
# Cloud Run jobs の名前
export JOB_NAME="my-job"
  
# Cloud Run jobs 用サービスアカウントの名前
export SERVICE_ACCOUNT_NAME="cloud-run-jobs-sa"
  
# アップロードファイル格納用のバケットの名前
export UPLOAD_BUCKET="upload-drive-pdf"
  
# ページトークン格納用のバケットの名前
export START_BUCKET="pagetoken"

コンテナイメージのビルド

Dockerfile が存在するディレクトリで、以下の gcloud コマンドを実行します。

gcloud builds submit --tag ${LOCATION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${IMAGE}:latest

Cloud Run jobs のデプロイ

作成したコンテナイメージを使用して、Cloud Run jobs をデプロイします。

gcloud run jobs deploy ${JOB_NAME} \
    --image ${LOCATION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${IMAGE}:latest \
    --region=${LOCATION} \
    --service-account="${SERVICE_ACCOUNT_NAME}@$PROJECT_ID.iam.gserviceaccount.com" \
    --set-env-vars="UPLOAD_BUCKET_NAME=${UPLOAD_BUCKET}" \
    --set-env-vars="START_BUCKET_NAME=${START_BUCKET}" \
    --set-env-vars="LOG_LEVEL=20" \
    --tasks=1 \
    --max-retries=3\
    --task-timeout=1440m

Cloud Scheduler の構築

Google Cloud コンソールのジョブ画面で、作成した Cloud Run jobs を選択します。

ジョブの詳細画面で、[トリガー]を選択し、[スケジューラトリガーを追加]を選択します。

任意の名前、リージョン、頻度、タイムゾーンを指定し、[続行] を選択します。

サービスアカウントに Cloud Scheduler 用サービスアカウントを設定し、[作成] を選択します。

動作確認

以下の順番で動作確認を行います。

  1. Google Drive のフォルダに対して Cloud Run jobs 用サービスアカウントに参照権限を付与
  2. Cloud Run jobs を実行し、現在のページトークンを取得
  3. Google Drive のフォルダに対してファイルを配置し、Cloud Run jobs を実行

Google Drive のフォルダに対して Cloud Run jobs 用サービスアカウントに参照権限を付与

差分検知を行う Google Drive のフォルダ上で、共有アイコンを選択します。

Cloud Run jobs 用サービスアカウントに閲覧者権限を付与し、[共有]を選択します。

Cloud Run jobs を実行し、現在のページトークンを取得

今回は動作確認のため、Cloud Schduler による時刻起動は行わず、Google Cloud コンソールのジョブの詳細画面で、[実行] を選択します。

ページトークン格納用バケットにページトークンを保持するテキストファイルが配置されています。

Google Drive のフォルダに対してファイルを配置し、Cloud Run jobs を実行

差分検知を行う Google Drive のフォルダ上に PDF ファイルを配置します。

再度、Google Cloud コンソールのジョブの詳細画面で、[実行] を選択します。

アップロードファイル格納用のバケットに、Google Drive のフォルダに配置した PDF ファイルが配置されています。

補足 : Google Drive Events API

2025年7月7日に Google Drive Events API が Preview 公開されました。

Google ドライブのファイル変更通知を CloudEvent 仕様に従ってフォーマットされた Pub/Sub メッセージで受け取ることが可能となるため、より活用の幅が広がることが期待されます。

山崎 曜(記事一覧)

クラウドソリューション部

元は日系大手SIerにて金融の決済領域のお客様に対して、PM/APエンジニアとして、要件定義〜保守運用まで全工程に従事。
Google Cloud Partner Top Engineer 2025 選出。
Google Cloud 全 13 資格保有。
フルスタックな人材を目指し、日々邁進。