G-gen の山崎です。 当記事では、Cloud Run jobs を使用して Google Drive の差分を検知し、差分ファイルを Cloud Storage にアップロードする方法を解説します。

システム構成
今回構築したシステム構成は、以下のとおりです。

Cloud Scheduler を使って定期的に Cloud Run jobs を起動します。
起動した Cloud Run jobs は、以下の流れで前回処理を行った以降に発生した Google Drive の差分を検知し、その差分のファイルを Cloud Storage にアップロードします。
- ページトークン取得
- 差分ファイル情報取得
- ファイルアップロード
- ページトークン更新
ページトークンは、Google Drive API で変更を追跡する際の起点となる識別子です。 このトークンを保存・更新することで、前回処理を実行した時点からの差分のみを効率的に取得できます。
- 参考 : 変更を取得する
前提知識
今回使用する Google Cloud サービスの詳細は、以下の記事をご参照ください。
Cloud Storage
Cloud Run jobs
Cloud Scheduler
環境構築
以下の順序で環境を構築しました。
- API の有効化
- サービスアカウントの構築
- Cloud Storage の構築
- Cloud Run jobs の構築
- Cloud Scheduler の構築
API の有効化
Google Cloud プロジェクトで、今回の構成に必要な Google Cloud サービスの API を有効化します。
gcloud services enable \ artifactregistry.googleapis.com \ cloudbuild.googleapis.com \ run.googleapis.com \ cloudscheduler.googleapis.com \ drive.googleapis.com
サービスアカウントの構築
Cloud Run jobs、Cloud Scheduler 用のサービスアカウントを作成します。
Cloud Run jobs 用サービスアカウントの作成
Google Cloud コンソールのサービスアカウント画面で、[サービスアカウントを作成] を選択します。

任意のサービスアカウント名を入力し、[作成して続行] を選択します。

作成したサービスアカウントに「Storage オブジェクト管理者(roles/storage.objectAdmin)」「ログ書き込み(roles/logging.logWriter)」の権限を付与して[完了]を選択します。

Cloud Scheduler 用サービスアカウントの作成
再度[サービスアカウントを作成] を選択し、任意のサービスアカウント名の入力を行い、[作成して続行] を選択します。

作成したサービスアカウントに「Cloud Run ジョブ エグゼキュータ(roles/run.jobsExecutor)」の権限を付与して[完了]を選択します。

Cloud Storage の構築
ページトークン格納用、アップロードファイル格納用のバケットを作成します。
ページトークン格納用バケットの作成
Google Cloud コンソールのバケット画面で、[作成] を選択します。

グローバルに一意の名前を入力し、[続行] を選択します。

データの保存場所を選択し、[作成] を選択します。

「このバケットに対する公開アクセス禁止を適用する」にチェックがついていることを確認し、[確認] を選択します。

アップロードファイル格納用バケットの作成
再度バケット画面で、[作成] を選択し、グローバルに一意の名前を入力してアップロードファイル格納用バケットを作成します。(設定内容はページトークン格納用バケットと同様)

Cloud Run jobs の構築
Artifact Registry の作成、アプリケーションの準備、環境変数の設定、コンテナイメージのビルド、Cloud Run jobs のデプロイを行います。
Artifact Registry の作成
Google Cloud コンソールのリポジトリ画面で、[リポジトリの作成] を選択します。

任意のリポジトリ名とリージョンを指定し、[作成] を選択します。

アプリケーションの準備
今回開発する Python アプリケーションのディレクトリ構成は以下のとおりです。
drive-to-gcs-demo-app |-- requirements.txt |-- Dockerfile |-- main.py
requirements.txt
google-api-python-client google-auth google-cloud-storage google-cloud-logging
Dockerfile
FROM python:3.12-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY main.py . CMD ["python", "main.py"]
main.py
import base64 import binascii import io import os import sys import google.auth import google.cloud.logging import logging from googleapiclient.discovery import build from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseDownload from google.cloud import storage from google.api_core import exceptions # --- 環境変数から設定を読み込み --- # アップロード先のGCSバケット名 UPLOAD_BUCKET_NAME = os.environ.get('UPLOAD_BUCKET_NAME') # pageTokenを保存するためのGCSバケット名(UPLOAD_BUCKET_NAMEと同じでも可) START_BUCKET_NAME = os.environ.get('START_BUCKET_NAME') # ログの出力レベル LOG_LEVEL = int(os.environ.get('LOG_LEVEL')) # ログレベル debug=10 / info=20 / warning=30 # pageTokenを保存するファイル名 PAGE_TOKEN_FILE = 'drive_page_token.txt' # Cloud Logging クライアントを初期化し、ロギングを設定 client = google.cloud.logging.Client() client.setup_logging() # 標準の logger を取得し、ログレベルを設定 logger = logging.getLogger() logger.setLevel(LOG_LEVEL) # Google Drive APIのスコープ(読み取り専用) DRIVE_SCOPES = ['https://www.googleapis.com/auth/drive.readonly'] def get_drive_service(): """ 実行環境のデフォルト認証情報を取得する """ credentials, _ = google.auth.default(scopes=DRIVE_SCOPES) return build('drive', 'v3', credentials=credentials) def read_page_token_from_gcs(storage_client: storage.Client) -> str | None: """GCSから前回のpageTokenを読み込む""" try: bucket = storage_client.bucket(START_BUCKET_NAME) blob = bucket.blob(PAGE_TOKEN_FILE) return blob.download_as_text() except exceptions.NotFound: # ファイルが存在しない場合は初回実行とみなし、Noneを返す logger.info(f"'{PAGE_TOKEN_FILE}' not found in GCS. Assuming first run.") return None except Exception as e: logger.critical( f"CRITICAL: Failed to read page token from GCS due to an unexpected error. " f"This will cause the job to fail. Error: {e}" ) raise def write_page_token_to_gcs(storage_client: storage.Client, token: str): """次回の実行のために新しいpageTokenをGCSに保存する""" try: bucket = storage_client.bucket(START_BUCKET_NAME) blob = bucket.blob(PAGE_TOKEN_FILE) blob.upload_from_string(token) logger.info(f"Successfully saved new page token to GCS: {token}") except Exception as e: logger.critical(f"CRITICAL: Failed to save new page token to GCS. {e}") # ここで失敗すると次回実行時に変更が重複する可能性があるため、エラーを明示 raise def stream_drive_file_to_gcs(drive_service, storage_client, drive_file_id, drive_file_name, bucket_name): """ 特定のファイルをDriveからGCSへ、一時ファイルを使わずに直接ストリーミングコピーする """ # GCSのオブジェクトパスとして、Driveのファイル名をそのまま使用する object_path = drive_file_name logger.info(f"--- Starting copy for '{drive_file_name}' ({drive_file_id}) to gs://{bucket_name}/{object_path} ---") try: # 1. Google Driveからファイルのメタデータを取得 (サイズ、MIMEタイプ、チェックサム) file_metadata = drive_service.files().get( fileId=drive_file_id, supportsTeamDrives=True, fields='size, mimeType, md5Checksum' ).execute() total_size = int(file_metadata.get('size', 0)) content_type = file_metadata.get('mimeType', 'application/octet-stream') source_md5_checksum = file_metadata.get('md5Checksum') if not source_md5_checksum: logger.warning(f"Warning: MD5 checksum not available for '{drive_file_name}'. Skipping verification.") # 2. GCSのアップロードストリームを開く bucket = storage_client.bucket(bucket_name) destination_blob = bucket.blob(object_path) with destination_blob.open('wb', content_type=content_type) as gcs_stream: # 3. Driveのダウンローダーを初期化し、GCSストリームに直接接続 request = drive_service.files().get_media(fileId=drive_file_id) downloader = MediaIoBaseDownload(fd=gcs_stream, request=request, chunksize=10 * 1024 * 1024) # 4. ダウンロード(=アップロード)を開始 done = False while not done: status, done = downloader.next_chunk(num_retries=3) if status: logger.info(f" Copy in progress: {int(status.progress() * 100)}%") logger.info(" Streaming finished.") # 5. 整合性チェック (MD5チェックサムが取得できている場合のみ) if source_md5_checksum: destination_blob.reload() # GCSから最新のメタデータを取得 gcs_md5_b64 = destination_blob.md5_hash gcs_md5_hex = binascii.hexlify(base64.b64decode(gcs_md5_b64)).decode('utf-8') if source_md5_checksum == gcs_md5_hex: logger.info(f" Success! MD5 checksums match for '{drive_file_name}'.") else: logger.error(f" ERROR: MD5 CHECKSUM MISMATCH for '{drive_file_name}'!") raise logger.info(f"--- Successfully copied '{drive_file_name}' ---") return True except HttpError as e: # 特にダウンロード権限がない(403)やファイルが見つからない(404)などの場合に発生 logger.error(f"ERROR: An HTTP error occurred while copying '{drive_file_name}': {e}") raise except Exception as e: logger.error(f"ERROR: An unexpected error occurred while copying '{drive_file_name}': {e}") raise def main(): """ Google Driveの変更を検知してGCSにファイルを同期するメインの処理 """ if not UPLOAD_BUCKET_NAME or not START_BUCKET_NAME or not LOG_LEVEL: logger.error("Error: Environment variables 'UPLOAD_BUCKET_NAME' and 'START_BUCKET_NAME' and 'LOG_LEVEL' must be set.") raise ValueError("Required environment variables are not set.") drive_service = get_drive_service() storage_client = storage.Client() # 処理対象と対象外のファイル名を記録するためのリストを初期化 processed_files = [] skipped_files = [] page_token = read_page_token_from_gcs(storage_client) if page_token is None: # 初回実行時は、全ての変更を検知するための最初のトークンを取得する response = drive_service.changes().getStartPageToken().execute() page_token = response.get('startPageToken') logger.info(f"First run detected. Starting with token: {page_token}") logger.info(f"Processing changes starting from page token: {page_token}") while page_token is not None: response = drive_service.changes().list( pageToken=page_token, spaces='drive', includeItemsFromAllDrives=True, supportsAllDrives=True, fields='nextPageToken, newStartPageToken, changes(fileId, file(id, name, trashed, mimeType))' ).execute() # 取得した差分情報の出力(debug) logger.debug(f"response info: {response}") for change in response.get('changes', []): drive_file_id = change.get('fileId') drive_file_info = change.get('file') # ファイルが存在し、ゴミ箱になく、かつMIMEタイプが'application/pdf'である場合のみ処理を続行 if drive_file_info and not drive_file_info.get('trashed') and drive_file_info.get('mimeType') == 'application/pdf': drive_file_name = drive_file_info.get('name') logger.info(f"\nPDF change detected, will process: '{drive_file_name}' (ID: {drive_file_id})") # 処理対象リストにファイル名を追加 processed_files.append(drive_file_name) # ストリーミングコピー関数を呼び出す stream_drive_file_to_gcs( drive_service=drive_service, storage_client=storage_client, drive_file_id=drive_file_id, drive_file_name=drive_file_name, bucket_name=UPLOAD_BUCKET_NAME ) else: # 処理対象外のファイルとして記録 if drive_file_info: file_name_to_skip = drive_file_info.get('name', drive_file_id) skipped_files.append(file_name_to_skip) # スキップ理由をログに記録 reason = "" if drive_file_info.get('trashed'): reason = "it is in trash" elif drive_file_info.get('mimeType') == 'application/vnd.google-apps.folder': reason = "it is a folder" else: reason = f"its MIME type is not PDF ('{drive_file_info.get('mimeType')}')" logger.info(f"Skipping '{file_name_to_skip}' because {reason}.") else: # ファイル情報自体がない場合 logger.info(f"Skipping change for file ID {drive_file_id} because file information could not be retrieved.") # 次のページのトークンがあればループを継続、なければ終了 if 'nextPageToken' in response: page_token = response.get('nextPageToken') else: # ループ終了。次回の実行のために新しいStartPageTokenを保存 new_start_page_token = response.get('newStartPageToken') write_page_token_to_gcs(storage_client, new_start_page_token) logger.info("\nFinished processing all changes.") page_token = None # すべての処理ループが終わった後で、最終的な結果をINFOレベルでロギング logger.info("--- Execution Summary ---") summary_processed = f"処理対象ファイル:{', '.join(processed_files)}" if processed_files else "処理対象ファイル:なし" summary_skipped = f"処理対象外ファイル:{', '.join(skipped_files)}" if skipped_files else "処理対象外ファイル:なし" logger.info(summary_processed) logger.info(summary_skipped) if __name__ == "__main__": logger.info("Cloud Run Job started.") try: main() logger.info("Cloud Run Job finished successfully.") except Exception as e: logger.critical(f"Job failed with a critical error: {e}", exc_info=True) sys.exit(1) finally: if 'client' in locals() and client: client.close()
環境変数の設定
以降の手順で繰り返し使用する Google Cloud プロジェクト ID やリージョンなどの情報を環境変数に設定します。
ご自身の環境に合わせて、以下のコマンドで環境変数を設定してください。
# Google Cloud プロジェクト ID export PROJECT_ID="your-project-id" # プロジェクトのデフォルトリージョン export LOCATION="asia-northeast1" # Artifact Registry の名前 export REPO="my-repo" # Image の名前 export IMAGE="my-image" # Cloud Run jobs の名前 export JOB_NAME="my-job" # Cloud Run jobs 用サービスアカウントの名前 export SERVICE_ACCOUNT_NAME="cloud-run-jobs-sa" # アップロードファイル格納用のバケットの名前 export UPLOAD_BUCKET="upload-drive-pdf" # ページトークン格納用のバケットの名前 export START_BUCKET="pagetoken"
コンテナイメージのビルド
Dockerfile が存在するディレクトリで、以下の gcloud コマンドを実行します。
gcloud builds submit --tag ${LOCATION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${IMAGE}:latest
Cloud Run jobs のデプロイ
作成したコンテナイメージを使用して、Cloud Run jobs をデプロイします。
gcloud run jobs deploy ${JOB_NAME} \ --image ${LOCATION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${IMAGE}:latest \ --region=${LOCATION} \ --service-account="${SERVICE_ACCOUNT_NAME}@$PROJECT_ID.iam.gserviceaccount.com" \ --set-env-vars="UPLOAD_BUCKET_NAME=${UPLOAD_BUCKET}" \ --set-env-vars="START_BUCKET_NAME=${START_BUCKET}" \ --set-env-vars="LOG_LEVEL=20" \ --tasks=1 \ --max-retries=3\ --task-timeout=1440m
Cloud Scheduler の構築
Google Cloud コンソールのジョブ画面で、作成した Cloud Run jobs を選択します。

ジョブの詳細画面で、[トリガー]を選択し、[スケジューラトリガーを追加]を選択します。

任意の名前、リージョン、頻度、タイムゾーンを指定し、[続行] を選択します。

サービスアカウントに Cloud Scheduler 用サービスアカウントを設定し、[作成] を選択します。

動作確認
以下の順番で動作確認を行います。
- Google Drive のフォルダに対して Cloud Run jobs 用サービスアカウントに参照権限を付与
- Cloud Run jobs を実行し、現在のページトークンを取得
- Google Drive のフォルダに対してファイルを配置し、Cloud Run jobs を実行
Google Drive のフォルダに対して Cloud Run jobs 用サービスアカウントに参照権限を付与
差分検知を行う Google Drive のフォルダ上で、共有アイコンを選択します。

Cloud Run jobs 用サービスアカウントに閲覧者権限を付与し、[共有]を選択します。

Cloud Run jobs を実行し、現在のページトークンを取得
今回は動作確認のため、Cloud Schduler による時刻起動は行わず、Google Cloud コンソールのジョブの詳細画面で、[実行] を選択します。

ページトークン格納用バケットにページトークンを保持するテキストファイルが配置されています。

Google Drive のフォルダに対してファイルを配置し、Cloud Run jobs を実行
差分検知を行う Google Drive のフォルダ上に PDF ファイルを配置します。

再度、Google Cloud コンソールのジョブの詳細画面で、[実行] を選択します。
アップロードファイル格納用のバケットに、Google Drive のフォルダに配置した PDF ファイルが配置されています。

補足 : Google Drive Events API
2025年7月7日に Google Drive Events API が Preview 公開されました。
Google ドライブのファイル変更通知を CloudEvent 仕様に従ってフォーマットされた Pub/Sub メッセージで受け取ることが可能となるため、より活用の幅が広がることが期待されます。
山崎 曜(記事一覧)
クラウドソリューション部
元は日系大手SIerにて金融の決済領域のお客様に対して、PM/APエンジニアとして、要件定義〜保守運用まで全工程に従事。
Google Cloud Partner Top Engineer 2025 選出。
Google Cloud 全 13 資格保有。
フルスタックな人材を目指し、日々邁進。
