GitHub監査ログをWorkload Identity認証でBigQueryにエクスポートしてみた

記事タイトルとURLをコピーする

G-gen の三浦です。当記事では Workload Identity の仕組みを使うことで、サービスアカウントキーを使わずに GitHub Enterprise の監査ログを BigQuery にエクスポートする仕組みを構築したのでご紹介します。

GitHub Enterprise とは

概要

GitHub Enterprise は、複数組織の一元管理や Microsoft Entra ID などの IdP(Identity Provider)を使用した SSO(シングルサインオン)などの機能を提供する有償プランです。

本プランでは監査ログ API を使用して組織内の操作履歴を取得・管理できます。監査ログを BigQuery にエクスポートすると、長期保存や高度な分析を行い、セキュリティ対策やコンプライアンス強化に役立ちます。

監査ログ

GitHub の監査ログには、組織メンバーのレポジトリ作成、プルリクエストやマージなどの操作が記録され、過去180日分のログを確認できます。git.clone など一部の Git イベントは 7 日間のみ保持されます。

Google Cloud への監査ログエクスポート

GitHub の監査ログは JSON 形式で Cloud Storage にエクスポートできますが、通常はサービスアカウントキーが必要です。

サービスアカウントキーは厳重な管理が必要です。漏洩した場合、第三者による不正利用のリスクがあります。Google Cloud のベストプラクティスでは、サービスアカウントキーを使用せずに認証する方法が推奨されています。

以上のことから、当記事では Google Cloud の Workload Identity 機能を使うことで、サービスアカウントキーを使用せずに GitHub 監査ログを取得する仕組みを実装しました。

アーキテクチャ

構成図

構成は図のとおりです。GitHub Actions を使用して監査ログを取得し、BigQuery にエクスポートします。

構成図

ディレクトリ構成

ディレクトリ構成は以下の通りです。

.
└── app
    └── main.py # 監査ログを BigQuery へエクスポートするスクリプト
.github
└── workflows
    └── github-audit-log-to-bq.yml  # GitHub Actions を定義

環境構築

Workload Identity とサービスアカウントの作成

GitHub Actions と Google Cloud を連携させるための Workload Identity とサービスアカウントを作成します。作成方法は、次の記事を参照してください。

Google CloudとGitHub Actions(Terraform)を連携するWorkload Identityを作成するbashスクリプト blog.g-gen.co.jp

BigQuery データセットの作成とサービスアカウントへの権限付与

BigQuery のデータセットを作成します。テーブルは GitHub Actions で自動的に作成されます。

# 環境変数を設定
PROJECT_ID="gha-demo-prj" # プロジェクトID
SERVICE_ACCOUNT_NAME="gha-demo-sa" # サービスアカウント名
BQ_DATASET="my_dataset" # BigQueryのデータセット名
 
# BigQuery データセットを東京リージョンに作成
bq --project_id=$PROJECT_ID \
   mk --location=asia-northeast1 \
   $BQ_DATASET

サービスアカウントに BigQuery へのデータ書き込み権限とジョブの実行権限を付与します。

# サービスアカウントへ権限付与
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"
 
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/bigquery.jobUser"

GitHub Actions ワークフローの作成

このワークフローは、GitHub の監査ログを1日に1回自動で取得し、BigQuery にエクスポートします。前回の自動取得時刻をアーティファクト(GitHub Actions の成果物保存機能)で管理することで差分のログのみを取得します。過去のログも手動で取得できます。

env: の箇所に、環境に応じたプロジェクト ID などの値を記載してください。

# github-audit-log-to-bq.yml

name: Fetch and Upload GitHub Audit Logs to BigQuery
 
on:
  schedule:  # スケジュール実行
    - cron: '0 15 * * *'  # 毎日 JST 0 時に実行(UTC 15 時)

  workflow_dispatch:  # 手動実行
    inputs:
      start_date:  # ログ取得の開始日
        description: "Start date for fetching logs (ISO 8601 format, e.g., 2024-09-01T00:00:00Z)"
        required: true
        default: "2024-10-01T00:00:00Z"
      end_date:  # ログ取得の終了日
        description: "End date for fetching logs (ISO 8601 format, e.g., 2024-09-30T23:59:59Z)"
        required: true
        default: "2024-10-31T23:59:59Z"
 
permissions:
  id-token: write
  contents: read
  actions: read
 
jobs:
  fetch-and-upload:
    runs-on: ubuntu-latest  
 
    env:
      # BigQuery 設定
      BQ_GCP_PROJECT_ID: gha-demo-prj  # Google Cloud プロジェクト ID
      BQ_DATASET: my_dataset  # BigQuery データセット名
      BQ_TABLE: my_table  # BigQuery テーブル名

      # GitHub 設定
      GITHUB_ORG: myorg  # GitHub 組織名
      GH_TOKEN: ${{ github.token }}  # GitHub CLI 用のトークン

      # Workload Identity Federation 設定
      PROJECT_NUMBER: 1234567890 # プロジェクト番号 
      WORKLOAD_IDENTITY_POOL: gha-demo-pool  # Workload Identity プール名
      WORKLOAD_IDENTITY_POOL_PROVIDER: gha-demo-provider  # Workload Identity プールプロバイダ名
      SERVICE_ACCOUNT: gha-demo-sa@gha-demo-prj.iam.gserviceaccount.com  # 使用するサービスアカウント名

      # アーティファクト設定
      LAST_RUN_TIMESTAMP_NAME: "last_run_timestamp"  # 前回実行時刻のアーティファクト名
 
    steps:
      - uses: actions/checkout@v4
 
      # Google Cloud 認証 (Workload Identity)
      - id: 'auth'
        uses: 'google-github-actions/auth@v2'
        with:
          workload_identity_provider: 'projects/${{ env.PROJECT_NUMBER }}/locations/global/workloadIdentityPools/${{ env.WORKLOAD_IDENTITY_POOL }}/providers/${{ env.WORKLOAD_IDENTITY_POOL_PROVIDER }}'
          service_account: ${{ env.SERVICE_ACCOUNT }}
 
      # GitHub App トークンを生成
      - name: Generate GitHub App token
        id: generate_token
        uses: actions/create-github-app-token@v1
        with:
          app-id: ${{ secrets.APP_ID }}
          private-key: ${{ secrets.APP_PRIVATE_KEY }}
 
      # GitHub App トークンを環境変数に設定
      - name: Set GitHub App Access Token
        run: echo "ACCESS_TOKEN=${{ steps.generate_token.outputs.token }}" >> $GITHUB_ENV
 
      # Python環境をセットアップ
      - name: Set up Python environment
        run: |
          python3 -m venv venv
          . venv/bin/activate
          pip install --upgrade pip
          pip install google-cloud-bigquery requests jq pandas
 
      # 初回実行時のタイムスタンプを設定
      - name: Set Default Timestamp to Start of Today
        if: ${{ github.event_name == 'schedule' }}
        run: |
          DEFAULT_TIMESTAMP=$(date -u +"%Y-%m-%dT00:00:00Z")
          echo "DEFAULT_TIMESTAMP=$DEFAULT_TIMESTAMP" >> $GITHUB_ENV
 
      # 前回実行時刻のアーティファクトをダウンロード
      - name: Download previous timestamp artifact
        if: ${{ github.event_name == 'schedule' }}
        run: |
          mkdir -p artifacts

          ARTIFACT_URL=$(gh api -X GET "repos/${{ github.repository }}/actions/artifacts" \
            | jq -r '.artifacts[] | select(.name=="'${{ env.LAST_RUN_TIMESTAMP_NAME }}'") | .archive_download_url' | head -n 1)

          if [ -n "$ARTIFACT_URL" ]; then
            curl -L -o artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.zip -H "Authorization: token ${{ github.token }}" "$ARTIFACT_URL"
            unzip -o artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.zip -d artifacts/

            if [ ! -f "artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt" ]; then
              echo "${{ env.DEFAULT_TIMESTAMP }}" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt
            fi
          else
            echo "${{ env.DEFAULT_TIMESTAMP }}" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt
          fi
 
      # 前回実行時刻を読み込み
      - name: Read previous timestamp
        if: ${{ github.event_name == 'schedule' }}
        id: read-timestamp
        run: |
          if [ ! -f "artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt" ]; then
            echo "${{ env.DEFAULT_TIMESTAMP }}" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt
          fi
          last_run=$(cat artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt)
          echo "last_run=$last_run" >> $GITHUB_ENV
 
      # 今回実行時刻を環境変数に保存
      - name: Set execution time for last_run
        if: ${{ github.event_name == 'schedule' }}
        run: |
          execution_time=$(date --utc --iso-8601=seconds)
          echo "execution_time=$execution_time" >> $GITHUB_ENV
 
      # スケジュール実行時の監査ログ取得
      - name: Fetch GitHub Audit Logs for Scheduled Run
        if: ${{ github.event_name == 'schedule' }}
        env:
          last_run: ${{ env.last_run }}
          ACCESS_TOKEN: ${{ env.ACCESS_TOKEN }}
        run: |
          . venv/bin/activate
          rm -rf app/audit_logs
          mkdir -p app/audit_logs

          next_url="https://api.github.com/orgs/${{ env.GITHUB_ORG }}/audit-log?phrase=created:>$last_run&per_page=100&include=all"

          while [[ ! -z "$next_url" ]]; do
            response=$(curl -s -H "Authorization: Bearer $ACCESS_TOKEN" \
                                 -H "Accept: application/vnd.github.v3+json" \
                                 "$next_url")

            if echo "$response" | jq -e 'type == "object" and has("message")' >/dev/null 2>&1; then
              exit 1
            fi

            logs_count=$(echo "$response" | jq '. | length')
            echo "$response" > "app/audit_logs/log_$logs_count.json"

            next_url=$(curl -s -I -H "Authorization: Bearer $ACCESS_TOKEN" "$next_url" | grep -i '^link:' | sed -n 's/.*<\(.*\)>; rel="next".*/\1/p')
            
            sleep 2
          done
 
      # 手動実行時の監査ログ取得
      - name: Fetch GitHub Audit Logs for Manual Run
        if: ${{ github.event_name == 'workflow_dispatch' }}
        env:
          start_date: ${{ github.event.inputs.start_date }}
          end_date: ${{ github.event.inputs.end_date }}
          ACCESS_TOKEN: ${{ env.ACCESS_TOKEN }}
        run: |
          . venv/bin/activate
          rm -rf app/audit_logs
          mkdir -p app/audit_logs

          next_url="https://api.github.com/orgs/${{ env.GITHUB_ORG }}/audit-log?phrase=created%3A>${start_date}%20created%3A<${end_date}&per_page=100&include=all"

          while [[ ! -z "$next_url" ]]; do
            response=$(curl -s -H "Authorization: Bearer $ACCESS_TOKEN" \
                                 -H "Accept: application/vnd.github.v3+json" \
                                 "$next_url")

            if echo "$response" | jq -e 'type == "object" and has("message")' >/dev/null 2>&1; then
              exit 1
            fi

            logs_count=$(echo "$response" | jq '. | length')
            echo "$response" > "app/audit_logs/log_$logs_count.json"

            next_url=$(curl -s -I -H "Authorization: Bearer $ACCESS_TOKEN" "$next_url" | grep -i '^link:' | sed -n 's/.*<\(.*\)>; rel="next".*/\1/p')
            
            sleep 2
          done
 
      # 監査ログを BigQuery にアップロード
      - name: Run script to upload logs to BigQuery
        run: |
          . venv/bin/activate
          python3 app/main.py
 
      # 今回実行時刻をファイルに書き込み
      - name: Write new execution time to file
        if: ${{ success() && github.event_name == 'schedule' }}
        run: |
          mkdir -p artifacts
          echo "$execution_time" > artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt
 
      # 今回実行時刻をアーティファクトとしてアップロード
      - name: Upload updated timestamp artifact
        if: ${{ success() && github.event_name == 'schedule' }}
        uses: actions/upload-artifact@v4
        with:
          name: ${{ env.LAST_RUN_TIMESTAMP_NAME }}
          path: artifacts/${{ env.LAST_RUN_TIMESTAMP_NAME }}.txt

main.py の作成

このスクリプトは GitHub Actions で取得した監査ログを加工し、BigQuery へエクスポートします。エクスポート先のテーブルが存在しない場合、新規に作成します。

import json
import logging
import os
import sys
import time
  
import pandas as pd
    
from datetime import datetime, timezone
from google.cloud import bigquery
from google.api_core.exceptions import NotFound, GoogleAPIError
import re
 
# ログの設定: ログメッセージの出力形式を指定し、INFOレベル以上のメッセージを記録
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
 
def get_bigquery_client():
    """BigQueryクライアントを初期化して返す"""
    return bigquery.Client()
  
  
def flatten_json(y):
    """ネストされたJSONデータを平坦化"""
    out = {}
    def flatten(x, name=''):
        if isinstance(x, dict):
            for a in x:
                flatten(x[a], name + a + '_')
        elif isinstance(x, list):
            for i, a in enumerate(x):
                flatten(a, name + str(i) + '_')
        else:
            out[name[:-1]] = x
    flatten(y)
    return out
  
  
def clean_field_name(field_name):
    """フィールド名をBigQueryで許可された形式に変換"""
    field_name = re.sub(r'[^a-zA-Z0-9_]', '_', field_name)
    if field_name[0].isdigit():
        field_name = '_' + field_name
    return field_name
  
  
def infer_schema_from_logs(logs_df):
    """DataFrameからBigQuery用のスキーマを推測して生成"""
    schema = []
    for column in logs_df.columns:
        clean_column = clean_field_name(column)
        dtype = logs_df[column].dtype
        if clean_column == "_timestamp" and pd.api.types.is_integer_dtype(dtype):
            schema.append(bigquery.SchemaField("timestamp", "TIMESTAMP"))
        elif pd.api.types.is_integer_dtype(dtype):
            schema.append(bigquery.SchemaField(clean_column, "INTEGER"))
        elif pd.api.types.is_float_dtype(dtype):
            schema.append(bigquery.SchemaField(clean_column, "FLOAT"))
        elif pd.api.types.is_bool_dtype(dtype):
            schema.append(bigquery.SchemaField(clean_column, "BOOLEAN"))
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            schema.append(bigquery.SchemaField(clean_column, "TIMESTAMP"))
        else:
            schema.append(bigquery.SchemaField(clean_column, "STRING"))
    return schema
  
  
def create_table_if_not_exists(client, table_ref, logs_df):
    """テーブルが存在しない場合、新規作成"""
    try:
        table = client.get_table(table_ref)
        logging.info(f"Table {table_ref} already exists.")
        return table
    except NotFound:
        schema = infer_schema_from_logs(logs_df)
        table = bigquery.Table(table_ref, schema=schema)
        table = client.create_table(table)
        logging.info(f"Table {table_ref} created with schema.")
        return table
  
  
def load_audit_logs(logs_dir='app/audit_logs'):
    """監査ログをJSONファイルから読み込みDataFrameに統合"""
    logs = []
    for filename in os.listdir(logs_dir):
        if filename.endswith(".json"):
            file_path = os.path.join(logs_dir, filename)
            try:
                with open(file_path, 'r') as f:
                    file_logs = json.load(f)
                    for log in file_logs:
                        logs.append(flatten_json(log))
            except json.JSONDecodeError as e:
                logging.error(f"Failed to load {file_path}: {e}")
    return pd.DataFrame(logs)
  
  
def transform_audit_logs(logs_df, schema):
    """監査ログをBigQuery用の形式に変換"""
    transformed_logs = []
    schema_field_names = {field.name for field in schema}
    logs_df.columns = [clean_field_name(col) for col in logs_df.columns]
    if "_timestamp" in logs_df.columns:
        logs_df["_timestamp"] = pd.to_datetime(logs_df["_timestamp"], unit='ms', utc=True)
        logs_df = logs_df.rename(columns={"_timestamp": "timestamp"})
    for _, log in logs_df.iterrows():
        transformed_log = {}
        for key, value in log.items():
            if key in schema_field_names and not pd.isnull(value):
                if isinstance(value, pd.Timestamp):
                    value = value.isoformat()
                elif isinstance(value, bool):
                    value = int(value)
                transformed_log[key] = value
        if transformed_log:
            transformed_logs.append(transformed_log)
    return transformed_logs
  
  
def insert_rows_with_retry(client, table_ref, rows, retries=3, delay=10):
    """BigQueryにデータをリトライ付きで挿入"""
    for attempt in range(retries):
        try:
            errors = client.insert_rows_json(table_ref, rows)
            if errors:
                logging.error(f"Errors occurred during insertion: {errors}")
                time.sleep(delay)
            else:
                logging.info("Data uploaded successfully.")
                return
        except GoogleAPIError as e:
            logging.error(f"Failed to upload data to BigQuery: {e}")
            time.sleep(delay)
    logging.error(f"Failed to insert rows into {table_ref} after {retries} attempts.")
    sys.exit(1)
  
  
def main():
    project_id = os.getenv('BQ_GCP_PROJECT_ID')
    dataset_id = os.getenv('BQ_DATASET')
    table_id = os.getenv('BQ_TABLE')
    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    client = get_bigquery_client()
    logs_df = load_audit_logs()
    if not logs_df.empty:
        table = create_table_if_not_exists(client, table_ref, logs_df)
        rows_to_insert = transform_audit_logs(logs_df, table.schema)
        insert_rows_with_retry(client, table_ref, rows_to_insert)
    else:
        logging.error("No logs to process. Exiting.")
        sys.exit(1)
  
  
if __name__ == "__main__":
    main()

GitHub App の作成

監査ログを取得するための GitHub App を作成します。詳細な手順は以下を参照してください。

GitHub App の Permissions は以下のとおりに設定します。

  • Repository permissions
    • Administration: Read-only
    • Metadata: Read-only
  • Organization permissions

    • Administration: Read-only
  • 参考 : GitHub Appの権限について

(General)から App ID を確認し、控えておきます。(後で GitHub の secret に登録します)

App ID の確認

同画面から(Private keys)を作成し、ダウンロードします。(後で GitHub の secret に登録します)

Private keys の作成

(Install App)から対象の組織を確認し、(Install)を選択します。

組織へのインストール

(Only select repositories)から GitHub Actions を構築するレポジトリを選択し、(Install)を選択します。

GitHub App をインストールするレポジトリの選択

GitHub Actions のシークレット登録

作成した GitHub App を GitHub Actions で使用するため、リポジトリの Secrets(シークレット)に登録します。登録対象は以下の 2 つです。

  1. App ID

    • 名前: APP_ID
    • : 前手順で確認した GitHub App の App ID
  2. Private keys

    • 名前: APP_PRIVATE_KEY
    • : PEM キーの値をコピーして貼り付ける

Secrets の登録

動作確認

手動実行

GitHub の(Actions)タブから対象ワークフローを選択し、取得期間のパラメータ(Start dateEnd date)を入力して(Run workflow)で実行します。

手動実行

※ 一度に180日分を取得可能ですが、ログ数が多い場合、BigQuery へエクスポートする際に以下エラーで失敗することがあります。発生した場合、取得期間を短くして再実行してください。

Error: -06 08:40:51,290 [ERROR] Failed to upload data to BigQuery: Timeout of 600.0s exceeded, last exception: HTTPSConnectionPool(host='bigquery.googleapis.com', port=443): Max retries exceeded with url: /bigquery/v2/projects/xxxxx/datasets/xxxxx/tables/xxxxx/insertAll?prettyPrint=false (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:2426)')))

また、監査ログ API にはレート上限があり、1 時間に最大 1,750 クエリ(1 クエリで最大 100 件のログ取得が可能)の制限があります。この制限を超えた場合、403 または 429 エラー応答で失敗するため、ご注意ください。

実行が成功したら、BigQuery を確認し、指定した期間の監査ログが格納されたテーブルがあることを確認します。

手動実行成功

監査ログの確認

スケジュール実行

JST 0時にスケジュール実行され、ログが BigQuery にエクスポートされます。GitHub 側の負荷により実行までに遅延が発生する場合もありますので、ご注意ください。

GitHub の(Actions)から対象のワークフローが成功していることを確認します。(Event)から(schedule)を選択することで、スケジュール実行されたワークフローのみが表示されます。

フィルタ方法

処理が成功し、Artifacts にファイル(実行時間が記録された txt ファイル)があることを確認します。次回の実行時はこの時間以降のログを取得し、BigQuery へエクスポートします。

スケジュール実行成功
last_run_timestamp の中身

BigQuery を確認し、監査ログが出力されていることを確認します。

監査ログの確認

三浦 健斗(記事一覧)

クラウドソリューション部

2023年10月よりG-genにジョイン。元オンプレ中心のネットワークエンジニア。ネットワーク・セキュリティ・唐揚げ・辛いものが好き。