G-gen の三浦です。当記事では Workload Identity の仕組みを使うことで、サービスアカウントキーを使わずに GitHub Enterprise の監査ログを BigQuery にエクスポートする仕組みを構築したのでご紹介します。
GitHub Enterprise とは
概要
GitHub Enterprise は、複数組織の一元管理や Microsoft Entra ID などの IdP(Identity Provider)を使用した SSO(シングルサインオン)などの機能を提供する有償プランです。
本プランでは監査ログ API を使用して組織内の操作履歴を取得・管理できます。監査ログを BigQuery にエクスポートすると、長期保存や高度な分析を行い、セキュリティ対策やコンプライアンス強化に役立ちます。
- 参考 : エンタープライズ向け GitHub について
- 参考 : プランご紹介
監査ログ
GitHub の監査ログには、組織メンバーのレポジトリ作成、プルリクエストやマージなどの操作が記録され、過去180日分のログを確認できます。git.clone
など一部の Git イベントは 7 日間のみ保持されます。
- 参考 : 企業の監査ログにアクセスする
- 参考 : エンタープライズの監査ログ イベント
Google Cloud への監査ログエクスポート
GitHub の監査ログは JSON 形式で Cloud Storage にエクスポートできますが、通常はサービスアカウントキーが必要です。
サービスアカウントキーは厳重な管理が必要です。漏洩した場合、第三者による不正利用のリスクがあります。Google Cloud のベストプラクティスでは、サービスアカウントキーを使用せずに認証する方法が推奨されています。
以上のことから、当記事では Google Cloud の Workload Identity 機能を使うことで、サービスアカウントキーを使用せずに GitHub 監査ログを取得する仕組みを実装しました。
アーキテクチャ
構成図
構成は図のとおりです。GitHub Actions を使用して監査ログを取得し、BigQuery にエクスポートします。
ディレクトリ構成
ディレクトリ構成は以下の通りです。
. └── app └── main.py # 監査ログを BigQuery へエクスポートするスクリプト .github └── workflows └── github-audit-log-to-bq.yml # GitHub Actions を定義
環境構築
Workload Identity とサービスアカウントの作成
GitHub Actions と Google Cloud を連携させるための Workload Identity とサービスアカウントを作成します。作成方法は、次の記事を参照してください。
Google CloudとGitHub Actions(Terraform)を連携するWorkload Identityを作成するbashスクリプト blog.g-gen.co.jp
BigQuery データセットの作成とサービスアカウントへの権限付与
BigQuery のデータセットを作成します。テーブルは GitHub Actions で自動的に作成されます。
# 環境変数を設定 PROJECT_ID="gha-demo-prj" # プロジェクトID SERVICE_ACCOUNT_NAME="gha-demo-sa" # サービスアカウント名 BQ_DATASET="my_dataset" # BigQueryのデータセット名 # BigQuery データセットを東京リージョンに作成 bq --project_id=$PROJECT_ID \ mk --location=asia-northeast1 \ $BQ_DATASET
サービスアカウントに BigQuery へのデータ書き込み権限とジョブの実行権限を付与します。
# サービスアカウントへ権限付与 gcloud projects add-iam-policy-binding $PROJECT_ID \ --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com" \ --role="roles/bigquery.dataEditor" gcloud projects add-iam-policy-binding $PROJECT_ID \ --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com" \ --role="roles/bigquery.jobUser"
GitHub Actions ワークフローの作成
このワークフローは、GitHub の監査ログを1日に1回自動で取得し、BigQuery にエクスポートします。前回の自動取得時刻をアーティファクト(GitHub Actions の成果物保存機能)で管理することで差分のログのみを取得します。過去のログも手動で取得できます。
- 参考 : ワークフローからのデータの格納と共有
env:
の箇所に、環境に応じたプロジェクト ID などの値を記載してください。
# github-audit-log-to-bq.yml name: Fetch and Upload GitHub Audit Logs to BigQuery on: schedule: # スケジュール実行 - cron: '0 15 * * *' # 毎日 JST 0 時に実行(UTC 15 時) workflow_dispatch: # 手動実行 inputs: start_date: # ログ取得の開始日 description: "Start date for fetching logs (ISO 8601 format, e.g., 2024-09-01T00:00:00Z)" required: true default: "2024-10-01T00:00:00Z" end_date: # ログ取得の終了日 description: "End date for fetching logs (ISO 8601 format, e.g., 2024-09-30T23:59:59Z)" required: true default: "2024-10-31T23:59:59Z" permissions: id-token: write contents: read actions: read jobs: fetch-and-upload: runs-on: ubuntu-latest env: # BigQuery 設定 BQ_GCP_PROJECT_ID: gha-demo-prj # Google Cloud プロジェクト ID BQ_DATASET: my_dataset # BigQuery データセット名 BQ_TABLE: my_table # BigQuery テーブル名 # GitHub 設定 GITHUB_ORG: myorg # GitHub 組織名 GH_TOKEN: ${{ github.token }} # GitHub CLI 用のトークン # Workload Identity Federation 設定 PROJECT_NUMBER: 1234567890 # プロジェクト番号 WORKLOAD_IDENTITY_POOL: gha-demo-pool # Workload Identity プール名 WORKLOAD_IDENTITY_POOL_PROVIDER: gha-demo-provider # Workload Identity プールプロバイダ名 SERVICE_ACCOUNT: gha-demo-sa@gha-demo-prj.iam.gserviceaccount.com # 使用するサービスアカウント名 # アーティファクト設定 LAST_RUN_TIMESTAMP_NAME: "last_run_timestamp" # 前回実行時刻のアーティファクト名 steps: - uses: actions/checkout@v4 # Google Cloud 認証 (Workload Identity) - id: 'auth' uses: 'google-github-actions/auth@v2' with: workload_identity_provider: 'projects/${{ env.PROJECT_NUMBER }}/locations/global/workloadIdentityPools/${{ env.WORKLOAD_IDENTITY_POOL }}/providers/${{ env.WORKLOAD_IDENTITY_POOL_PROVIDER }}' service_account: ${{ env.SERVICE_ACCOUNT }} # GitHub App トークンを生成 - name: Generate GitHub App token id: generate_token uses: actions/create-github-app-token@v1 with: app-id: ${{ secrets.APP_ID }} private-key: ${{ secrets.APP_PRIVATE_KEY }} # GitHub App トークンを環境変数に設定 - name: Set GitHub App Access Token run: echo "ACCESS_TOKEN=${{ steps.generate_token.outputs.token }}" >> $GITHUB_ENV # Python環境をセットアップ - name: Set up Python environment run: | python3 -m venv venv . venv/bin/activate pip install --upgrade pip pip install google-cloud-bigquery requests jq pandas # 初回実行時のタイムスタンプを設定 - name: Set Default Timestamp to Start of Today if: ${{ github.event_name == 'schedule' }} run: | DEFAULT_TIMESTAMP=$(date -u +"%Y-%m-%dT00:00:00Z") echo "DEFAULT_TIMESTAMP=$DEFAULT_TIMESTAMP" >> $GITHUB_ENV # 前回実行時刻のアーティファクトをダウンロード - name: Download previous timestamp artifact if: ${{ github.event_name == 'schedule' }} run: | mkdir -p artifacts ARTIFACT_URL=$(gh api -X GET "repos/${{ github.repository }}/actions/artifacts" \ | jq -r '.artifacts[] | select(.name=="'${{ env.LAST_RUN_TIMESTAMP_NAME }}'") | .archive_download_url' | head -n 1) if [ -n "$ARTIFACT_URL" ]; then curl -L -o artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.zip -H "Authorization: token ${{ github.token }}" "$ARTIFACT_URL" unzip -o artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.zip -d artifacts/ if [ ! -f "artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt" ]; then echo "${{ env.DEFAULT_TIMESTAMP }}" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt fi else echo "${{ env.DEFAULT_TIMESTAMP }}" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt fi # 前回実行時刻を読み込み - name: Read previous timestamp if: ${{ github.event_name == 'schedule' }} id: read-timestamp run: | if [ ! -f "artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt" ]; then echo "${{ env.DEFAULT_TIMESTAMP }}" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt fi last_run=$(cat artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt) echo "last_run=$last_run" >> $GITHUB_ENV # 今回実行時刻を環境変数に保存 - name: Set execution time for last_run if: ${{ github.event_name == 'schedule' }} run: | execution_time=$(date --utc --iso-8601=seconds) echo "execution_time=$execution_time" >> $GITHUB_ENV # スケジュール実行時の監査ログ取得 - name: Fetch GitHub Audit Logs for Scheduled Run if: ${{ github.event_name == 'schedule' }} env: last_run: ${{ env.last_run }} ACCESS_TOKEN: ${{ env.ACCESS_TOKEN }} run: | . venv/bin/activate rm -rf app/audit_logs mkdir -p app/audit_logs next_url="https://api.github.com/orgs/${{ env.GITHUB_ORG }}/audit-log?phrase=created:>$last_run&per_page=100&include=all" while [[ ! -z "$next_url" ]]; do response=$(curl -s -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Accept: application/vnd.github.v3+json" \ "$next_url") if echo "$response" | jq -e 'type == "object" and has("message")' >/dev/null 2>&1; then exit 1 fi logs_count=$(echo "$response" | jq '. | length') echo "$response" > "app/audit_logs/log_$logs_count.json" next_url=$(curl -s -I -H "Authorization: Bearer $ACCESS_TOKEN" "$next_url" | grep -i '^link:' | sed -n 's/.*<\(.*\)>; rel="next".*/\1/p') sleep 2 done # 手動実行時の監査ログ取得 - name: Fetch GitHub Audit Logs for Manual Run if: ${{ github.event_name == 'workflow_dispatch' }} env: start_date: ${{ github.event.inputs.start_date }} end_date: ${{ github.event.inputs.end_date }} ACCESS_TOKEN: ${{ env.ACCESS_TOKEN }} run: | . venv/bin/activate rm -rf app/audit_logs mkdir -p app/audit_logs next_url="https://api.github.com/orgs/${{ env.GITHUB_ORG }}/audit-log?phrase=created%3A>${start_date}%20created%3A<${end_date}&per_page=100&include=all" while [[ ! -z "$next_url" ]]; do response=$(curl -s -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Accept: application/vnd.github.v3+json" \ "$next_url") if echo "$response" | jq -e 'type == "object" and has("message")' >/dev/null 2>&1; then exit 1 fi logs_count=$(echo "$response" | jq '. | length') echo "$response" > "app/audit_logs/log_$logs_count.json" next_url=$(curl -s -I -H "Authorization: Bearer $ACCESS_TOKEN" "$next_url" | grep -i '^link:' | sed -n 's/.*<\(.*\)>; rel="next".*/\1/p') sleep 2 done # 監査ログを BigQuery にアップロード - name: Run script to upload logs to BigQuery run: | . venv/bin/activate python3 app/main.py # 今回実行時刻をファイルに書き込み - name: Write new execution time to file if: ${{ success() && github.event_name == 'schedule' }} run: | mkdir -p artifacts echo "$execution_time" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt # 今回実行時刻をアーティファクトとしてアップロード - name: Upload updated timestamp artifact if: ${{ success() && github.event_name == 'schedule' }} uses: actions/upload-artifact@v4 with: name: ${{ env.LAST_RUN_TIMESTAMP_NAME }} path: artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt
main.py の作成
このスクリプトは GitHub Actions で取得した監査ログを加工し、BigQuery へエクスポートします。エクスポート先のテーブルが存在しない場合、新規に作成します。
import json import logging import os import sys import time import pandas as pd from datetime import datetime, timezone from google.cloud import bigquery from google.api_core.exceptions import NotFound, GoogleAPIError import re # ログの設定: ログメッセージの出力形式を指定し、INFOレベル以上のメッセージを記録 logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') def get_bigquery_client(): """BigQueryクライアントを初期化して返す""" return bigquery.Client() def flatten_json(y): """ネストされたJSONデータを平坦化""" out = {} def flatten(x, name=''): if isinstance(x, dict): for a in x: flatten(x[a], name + a + '_') elif isinstance(x, list): for i, a in enumerate(x): flatten(a, name + str(i) + '_') else: out[name[:-1]] = x flatten(y) return out def clean_field_name(field_name): """フィールド名をBigQueryで許可された形式に変換""" field_name = re.sub(r'[^a-zA-Z0-9_]', '_', field_name) if field_name[0].isdigit(): field_name = '_' + field_name return field_name def infer_schema_from_logs(logs_df): """DataFrameからBigQuery用のスキーマを推測して生成""" schema = [] for column in logs_df.columns: clean_column = clean_field_name(column) dtype = logs_df[column].dtype if clean_column == "_timestamp" and pd.api.types.is_integer_dtype(dtype): schema.append(bigquery.SchemaField("timestamp", "TIMESTAMP")) elif pd.api.types.is_integer_dtype(dtype): schema.append(bigquery.SchemaField(clean_column, "INTEGER")) elif pd.api.types.is_float_dtype(dtype): schema.append(bigquery.SchemaField(clean_column, "FLOAT")) elif pd.api.types.is_bool_dtype(dtype): schema.append(bigquery.SchemaField(clean_column, "BOOLEAN")) elif pd.api.types.is_datetime64_any_dtype(dtype): schema.append(bigquery.SchemaField(clean_column, "TIMESTAMP")) else: schema.append(bigquery.SchemaField(clean_column, "STRING")) return schema def create_table_if_not_exists(client, table_ref, logs_df): """テーブルが存在しない場合、新規作成""" try: table = client.get_table(table_ref) logging.info(f"Table {table_ref} already exists.") return table except NotFound: schema = infer_schema_from_logs(logs_df) table = bigquery.Table(table_ref, schema=schema) table = client.create_table(table) logging.info(f"Table {table_ref} created with schema.") return table def load_audit_logs(logs_dir='app/audit_logs'): """監査ログをJSONファイルから読み込みDataFrameに統合""" logs = [] for filename in os.listdir(logs_dir): if filename.endswith(".json"): file_path = os.path.join(logs_dir, filename) try: with open(file_path, 'r') as f: file_logs = json.load(f) for log in file_logs: logs.append(flatten_json(log)) except json.JSONDecodeError as e: logging.error(f"Failed to load {file_path}: {e}") return pd.DataFrame(logs) def transform_audit_logs(logs_df, schema): """監査ログをBigQuery用の形式に変換""" transformed_logs = [] schema_field_names = {field.name for field in schema} logs_df.columns = [clean_field_name(col) for col in logs_df.columns] if "_timestamp" in logs_df.columns: logs_df["_timestamp"] = pd.to_datetime(logs_df["_timestamp"], unit='ms', utc=True) logs_df = logs_df.rename(columns={"_timestamp": "timestamp"}) for _, log in logs_df.iterrows(): transformed_log = {} for key, value in log.items(): if key in schema_field_names and not pd.isnull(value): if isinstance(value, pd.Timestamp): value = value.isoformat() elif isinstance(value, bool): value = int(value) transformed_log[key] = value if transformed_log: transformed_logs.append(transformed_log) return transformed_logs def insert_rows_with_retry(client, table_ref, rows, retries=3, delay=10): """BigQueryにデータをリトライ付きで挿入""" for attempt in range(retries): try: errors = client.insert_rows_json(table_ref, rows) if errors: logging.error(f"Errors occurred during insertion: {errors}") time.sleep(delay) else: logging.info("Data uploaded successfully.") return except GoogleAPIError as e: logging.error(f"Failed to upload data to BigQuery: {e}") time.sleep(delay) logging.error(f"Failed to insert rows into {table_ref} after {retries} attempts.") sys.exit(1) def main(): project_id = os.getenv('BQ_GCP_PROJECT_ID') dataset_id = os.getenv('BQ_DATASET') table_id = os.getenv('BQ_TABLE') table_ref = f"{project_id}.{dataset_id}.{table_id}" client = get_bigquery_client() logs_df = load_audit_logs() if not logs_df.empty: table = create_table_if_not_exists(client, table_ref, logs_df) rows_to_insert = transform_audit_logs(logs_df, table.schema) insert_rows_with_retry(client, table_ref, rows_to_insert) else: logging.error("No logs to process. Exiting.") sys.exit(1) if __name__ == "__main__": main()
GitHub App の作成
監査ログを取得するための GitHub App を作成します。詳細な手順は以下を参照してください。
GitHub App の Permissions は以下のとおりに設定します。
- Repository permissions
- Administration:
Read-only
- Metadata:
Read-only
- Administration:
Organization permissions
- Administration:
Read-only
- Administration:
参考 : GitHub Appの権限について
(General)から App ID を確認し、控えておきます。(後で GitHub の secret に登録します)
同画面から(Private keys)を作成し、ダウンロードします。(後で GitHub の secret に登録します)
(Install App)から対象の組織を確認し、(Install)を選択します。
(Only select repositories)から GitHub Actions を構築するレポジトリを選択し、(Install)を選択します。
GitHub Actions のシークレット登録
作成した GitHub App を GitHub Actions で使用するため、リポジトリの Secrets(シークレット)に登録します。登録対象は以下の 2 つです。
App ID
- 名前:
APP_ID
- 値: 前手順で確認した GitHub App の App ID
- 名前:
Private keys
- 名前:
APP_PRIVATE_KEY
- 値: PEM キーの値をコピーして貼り付ける
- 名前:
- 参考 : リポジトリのシークレットの作成
動作確認
手動実行
GitHub の(Actions)タブから対象ワークフローを選択し、取得期間のパラメータ(Start date
、End date
)を入力して(Run workflow)で実行します。
※ 一度に180日分を取得可能ですが、ログ数が多い場合、BigQuery へエクスポートする際に以下エラーで失敗することがあります。発生した場合、取得期間を短くして再実行してください。
Error: -06 08:40:51,290 [ERROR] Failed to upload data to BigQuery: Timeout of 600.0s exceeded, last exception: HTTPSConnectionPool(host='bigquery.googleapis.com', port=443): Max retries exceeded with url: /bigquery/v2/projects/xxxxx/datasets/xxxxx/tables/xxxxx/insertAll?prettyPrint=false (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:2426)')))
また、監査ログ API にはレート上限があり、1 時間に最大 1,750 クエリ(1 クエリで最大 100 件のログ取得が可能)の制限があります。この制限を超えた場合、403 または 429 エラー応答で失敗するため、ご注意ください。
- 参考 : レート上限
実行が成功したら、BigQuery を確認し、指定した期間の監査ログが格納されたテーブルがあることを確認します。
スケジュール実行
JST 0時にスケジュール実行され、ログが BigQuery にエクスポートされます。GitHub 側の負荷により実行までに遅延が発生する場合もありますので、ご注意ください。
- 参考 : schedule
GitHub の(Actions)から対象のワークフローが成功していることを確認します。(Event)から(schedule)を選択することで、スケジュール実行されたワークフローのみが表示されます。
処理が成功し、Artifacts にファイル(実行時間が記録された txt ファイル)があることを確認します。次回の実行時はこの時間以降のログを取得し、BigQuery へエクスポートします。
BigQuery を確認し、監査ログが出力されていることを確認します。