Cloud Vision APIを用いて車のナンバープレートをマスキングしてみる

記事タイトルとURLをコピーする

G-gen 又吉です。Google Cloud (旧称 GCP) の事前トレーニング済みの API のひとつである Cloud Vision API を用いて車のナンバープレートをマスキングする処理をご紹介します。

Cloud Vision API と Cloud Functions でナンバープレートをマスキング

はじめに

Vision AI

Vision AI とは、Google Cloud 上で画像や動画から分析情報を取得することができる以下の 3 つのプロダクトの総称です。

今回は、その中の Vision API を使用します。

Vision API

Vision API とは、事前トレーニング済み Vision API モデル使用して、オブジェクトの検知や OCR などが行なえます。

Vision API には、主に以下のような機能があります。

No 機能タイプ 説明
1 Text detection 画像の光学式文字認識 (OCR)。画像内の UTF-8 テキストを識別し抽出できる。
2 Landmark detection ランドマークの名前、信頼度スコア、および境界ボックス (画像内の位置) が取得できる。
3 Label detection 「People」や「Car」のような一般化されたラベルが取得でき、各ラベルには説明と信頼度スコアが記載されている。
4 Object localization 複数のオブジェクトに「People」や「Car」のような一般的なラベルと境界ボックス (画像内の位置) が取得できる。
5 Face detection 顔を特定し、目、耳、鼻、口などの特定の顔の「ランドマーク」を信頼度スコアとともに取得でき、また表情から感情の尤度評価も取得できる。

その他の機能、また詳細については以下をご参照下さい。

また、Cloud Vision API の概要ページ からローカルの画像を Vision API に読み込ませ、どのような出力がでるかすぐに確認できます。

(※ 当記事で扱う車の画像データに含まれるナンバープレートは、個人情報保護の観点からモザイク処理をかけております。)

Cloud Vision API 概要ページの出力結果

Vision API は事前トレーニング済み Vision API モデルで解決できる場合に有用ですが、自社の製品をカスタムラベルとして設定したい等、別途トレーニングが必要な場合は Vertex AI Vision や AutoML などを用いる必要があります。

事前確認でわかったこと

事前に 2 枚の画像を、Cloud Vision API の概要ページからどのような出力結果がえられるのか試してみます。

出力結果①

出力結果②

出力結果①では、検出されたオブジェクトの中に Car (車)と License Plate (ナンバープレート) が含まれていることが確認できます。

しかし、出力結果②では Car は検出できているが License Plate が検出できていないことがわかります。

そこで、出力結果②の画像から車周辺を切り取りった新たな画像で確認してみます。

出力結果③

車周辺を切り取りった画像の出力 (出力結果③) では、License Plate がオブジェクトとして検出されました。

これらの事前確認結果から、Cloud Vision API へリクエストを送信する際、画像いっぱいに車体を入れることでナンバープレートを検出してくれる可能性が高まることがわかりました。

そこで今回は、画像内の車を検出したら車周辺を切り取ったものを新しい画像とし、その新しい画像からナンバープレートを検出しマスキング処理 (白塗り) を行う構成とします。

ナンバープレートをマスキングする処理の流れ

構成図

今回の構成図は以下のとおりです。

構成図

ユーザーが Raw Data バケットに画像データをアップロードすると、Cloud Storage トリガー経由で Cloud Functions (車検出&抽出関数) が起動します。画像内の車オブジェクトがあれば車周辺を切り取った新しい画像を Detected Car バケットに格納します。

次に、Detected Car バケットに画像データがアップロードされると、Cloud Storage トリガー経由で Cloud Functions (LP検出&マスキング関数) が起動します。画像内のナンバープレートオブジェクトがあればナンバープレートをマスキングした新しい画像を Detected LP バケットに格納します。

準備

ディレクトリ構成

開発環境は Cloud Shell を用いて行います。ディレクトリ構造は以下のとおりです。

terraform ディレクトリ配下は、以下のとおりです。

terraform
|-- gcf_source_code
|   |-- detect_car
|   |   |-- main.py
|   |   `-- requirements.txt
|   `-- detect_license_plate
|       |-- main.py
|       `-- requirements.txt
`-- main.tf

main.tf

main.tf には Terraform のコードを記述しています。

locals {
  terraform_service_account            = ${Terraform 実行に使われるサービスアカウントのメールアドレス}
  project_name                         = ${プロジェクト名}
  project_id                           = ${プロジェクト ID}
  folder_id                            = ${フォルダ ID}
  billing_account_id                   = ${請求先アカウント ID}
}
  
# terraform & provider の設定
terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = ">= 4.0.0"
    }
  }
  required_version = ">= 1.3.0"
  
  backend "gcs" {
    bucket = ${tfstate ファイルを格納する Cloud Storage バケット名}
    impersonate_service_account = ${Terraform 実行に使われるサービスアカウントのメールアドレス}
    }
}
  
# サービスアカウント権限借用の設定
provider "google" {
    alias = "impersonation"
    scopes = [
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/userinfo.email",
    ]
}
  
data "google_service_account_access_token" "default" {
    provider               = google.impersonation
    target_service_account = local.terraform_service_account
    scopes                 = ["userinfo-email", "cloud-platform"]
    lifetime               = "1200s"
}
  
# Google プロバイダの設定
provider "google" {
  project         = local.project_id
  region          = "asia-northeast1"
  access_token    = data.google_service_account_access_token.default.access_token
  request_timeout = "60s"
}
  
  
######################################
### プロジェクトの作成と API の有効化 ###
######################################
  
# プロジェクトの作成
resource "google_project" "poc" {
  name            = local.project_name
  project_id      = local.project_id
  folder_id       = local.folder_id
  billing_account = local.billing_account_id
}
  
# API の有効化
module "tenant_a_project_services" {
  source  = "terraform-google-modules/project-factory/google//modules/project_services"
  version = "14.2.1"
  project_id  = google_project.poc.project_id
  enable_apis = true
  activate_apis = [
    "iam.googleapis.com",
    "cloudbuild.googleapis.com",
    "run.googleapis.com",
    "cloudfunctions.googleapis.com",
    "pubsub.googleapis.com",
    "eventarc.googleapis.com",
    "artifactregistry.googleapis.com",
    "storage.googleapis.com",
    "vision.googleapis.com"
  ]
  disable_services_on_destroy = false
}
  
  
#######################################
### サービスアカウントの作成と権限の付与 ##
#######################################
  
# Cloud Functions 用サービスアカウントの作成と権限付与
resource "google_service_account" "sa_gcf" {
  project      = google_project.poc.project_id
  account_id   = "sa-gcf"
  display_name = "Cloud Functions 用サービスアカウント"
}
  
resource "google_project_iam_member" "invoke_gcf" {
  project = google_project.poc.project_id
  role    = "roles/run.invoker"
  member  = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
resource "google_project_iam_member" "storage_admin" {
  project = google_project.poc.project_id
  role    = "roles/storage.admin"
  member  = "serviceAccount:${google_service_account.sa_gcf.email}"
}
  
resource "google_project_iam_member" "event_receiving" {
  project = google_project.poc.project_id
  role    = "roles/eventarc.eventReceiver"
  member  = "serviceAccount:${google_service_account.sa_gcf.email}"
  depends_on = [google_project_iam_member.invoke_gcf]
}
  
resource "google_project_iam_member" "artifactregistry_reader" {
  project = google_project.poc.project_id
  role     = "roles/artifactregistry.reader"
  member   = "serviceAccount:${google_service_account.sa_gcf.email}"
  depends_on = [google_project_iam_member.event_receiving]
}
  
# Eventarc のサービスアカウントに権限付与
resource "google_project_iam_member" "serviceAccount_token_creator" {
  project = google_project.poc.project_id
  role     = "roles/iam.serviceAccountTokenCreator"
  member   = "serviceAccount:service-${google_project.poc.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
  depends_on = [module.tenant_a_project_services]
}
  
# Cloud Storage のサービスアカウントに権限付与
data "google_storage_project_service_account" "gcs_account" {
  project = google_project.poc.project_id
  depends_on = [ module.tenant_a_project_services ]
}
  
resource "google_project_iam_member" "gcs_pubsub_publishing" {
  project = google_project.poc.project_id
  role    = "roles/pubsub.publisher"
  member  = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"
}
  
  
################################
### バケットとオブジェクトの作成 ###
################################
  
# Cloud Functions のソースコード格納用バケットの作成
resource "google_storage_bucket" "source_gcf" {
  project       = google_project.poc.project_id
  location      = "asia-northeast1"
  name          = "${google_project.poc.project_id}-source-gcf"
  force_destroy = true
}
  
# Cloud Functions で使うソースコードを ZIP 化
data "archive_file" "detect_car" {
  type        = "zip"
  source_dir  = "./gcf_source_code/detect_car"
  output_path = "./zip_source_code/detect_car.zip"
}
  
data "archive_file" "detect_license_plate" {
  type        = "zip"
  source_dir  = "./gcf_source_code/detect_license_plate"
  output_path = "./zip_source_code/detect_license_plate.zip"
}
  
# ZIP 化したソースコードをバケットに追加
resource "google_storage_bucket_object" "detect_car" {
  name   = "detect-car.${data.archive_file.detect_car.output_md5}.zip"
  bucket = google_storage_bucket.source_gcf.name
  source = data.archive_file.detect_car.output_path
}
  
resource "google_storage_bucket_object" "detect_license_plate" {
  name   = "detect-license-plate.${data.archive_file.detect_license_plate.output_md5}.zip"
  bucket = google_storage_bucket.source_gcf.name
  source = data.archive_file.detect_license_plate.output_path
}
  
# raw_data バケットの作成
resource "google_storage_bucket" "raw_data" {
  project       = google_project.poc.project_id
  location      = "asia-northeast1"
  name          = "${google_project.poc.project_id}-raw-data"
  force_destroy = true
}
  
# detected_car バケットの作成
resource "google_storage_bucket" "detected_car" {
  project       = google_project.poc.project_id
  location      = "asia-northeast1"
  name          = "${google_project.poc.project_id}-detected-car"
  force_destroy = true
}
  
# not_detected_car バケットの作成
resource "google_storage_bucket" "not_detected_car" {
  project       = google_project.poc.project_id
  location      = "asia-northeast1"
  name          = "${google_project.poc.project_id}-not-detected-car"
  force_destroy = true
}
  
# detected_license_plate バケットの作成
resource "google_storage_bucket" "detected_license_plate" {
  project       = google_project.poc.project_id
  location      = "asia-northeast1"
  name          = "${google_project.poc.project_id}-detected-license-plate"
  force_destroy = true
}
  
# not_detected_license_plate バケットの作成
resource "google_storage_bucket" "not_detected_license_plate" {
  project       = google_project.poc.project_id
  location      = "asia-northeast1"
  name          = "${google_project.poc.project_id}-not-detected-license-plate"
  force_destroy = true
}
  
  
############################
### Cloud Functions 作成 ###
############################
  
# detect_car 関数
resource "google_cloudfunctions2_function" "detect_car" {
  depends_on = [
    google_project_iam_member.event_receiving,
    google_project_iam_member.artifactregistry_reader,
    google_project_iam_member.serviceAccount_token_creator
  ]
  name = "detect-car"
  location = "asia-northeast1"
  description = "画像から車を切り取って detect_car バケットに格納する関数"
  build_config {
    runtime     = "python310"
    entry_point = "main" # Set the entry point in the code
    source {
      storage_source {
        bucket = google_storage_bucket.source_gcf.name
        object = google_storage_bucket_object.detect_car.name
      }
    }
  }
  service_config {
    max_instance_count  = 3
    min_instance_count = 1
    available_memory    = "256M"
    timeout_seconds     = 60
    environment_variables = {
      DETECTED_CAR_BUCHET_NAME = google_storage_bucket.detected_car.name
      NOT_DETECTED_CAR_BUCHET_NAME = google_storage_bucket.not_detected_car.name
      KEY = "Car"
    }
    service_account_email = google_service_account.sa_gcf.email
  }
  event_trigger {
    trigger_region = "asia-northeast1"
    event_type = "google.cloud.storage.object.v1.finalized"
    retry_policy = "RETRY_POLICY_RETRY"
    service_account_email = google_service_account.sa_gcf.email
    event_filters {
      attribute = "bucket"
      value = google_storage_bucket.raw_data.name
    }
  }
}
  
# detected_license_plate 関数
resource "google_cloudfunctions2_function" "detected_license_plate" {
  depends_on = [
    google_project_iam_member.event_receiving,
    google_project_iam_member.artifactregistry_reader,
    google_project_iam_member.serviceAccount_token_creator
  ]
  name = "detected-license-plate"
  location = "asia-northeast1"
  description = "画像からナンバープレートをマスキングして detected_license_plate バケットに格納する関数"
  build_config {
    runtime     = "python310"
    entry_point = "main" # Set the entry point in the code
    source {
      storage_source {
        bucket = google_storage_bucket.source_gcf.name
        object = google_storage_bucket_object.detect_license_plate.name
      }
    }
  }
  service_config {
    max_instance_count  = 3
    min_instance_count = 1
    available_memory    = "256M"
    timeout_seconds     = 60
    environment_variables = {
      DETECTED_LICENSE_PLATE = google_storage_bucket.detected_license_plate.name
      NOT_DETECTED_LICENSE_PLATE = google_storage_bucket.not_detected_license_plate.name
      KEY = "License plate"
    }
    service_account_email = google_service_account.sa_gcf.email
  }
  event_trigger {
    trigger_region = "asia-northeast1"
    event_type = "google.cloud.storage.object.v1.finalized"
    retry_policy = "RETRY_POLICY_RETRY"
    service_account_email = google_service_account.sa_gcf.email
    event_filters {
      attribute = "bucket"
      value = google_storage_bucket.detected_car.name
    }
  }
}

gcf_source_code/detect_car

main.py

gcf_source_code/detect_car には、画像から車を切り取ってバケットに格納する処理を行う Cloud Functions のソースコードを格納しています。

from io import BytesIO
import os
  
from cloudevents.http import CloudEvent
import functions_framework
from google.cloud import vision
from google.cloud import storage
from PIL import Image
  
  
DETECTED_CAR_BUCHET_NAME = os.environ.get("DETECTED_CAR_BUCHET_NAME")
NOT_DETECTED_CAR_BUCHET_NAME = os.environ.get("NOT_DETECTED_CAR_BUCHET_NAME")
KEY = os.environ.get("KEY")
  
# クライアントを初期化
vision_image_annotator_client = vision.ImageAnnotatorClient()
storage_client = storage.Client()
  
  
def download_blob(bucket_name, blob_name):
    # バケットを取得
    bucket = storage_client.get_bucket(bucket_name)
  
    # オブジェクト(画像)を取得
    blob = bucket.blob(blob_name)
    image_data = blob.download_as_bytes()
  
    return image_data
  
  
def localize_objects(image_data):
  
    # Vision API 実行
    image = vision.Image(content=image_data)
    objects = vision_image_annotator_client.object_localization(image=image).localized_object_annotations
  
    # 辞書型に整形
    localize_object_di = {}
    for object_ in objects:
        vertex_li = []
        for vertex in object_.bounding_poly.normalized_vertices:
            vertex_li.append({"x": vertex.x, "y": vertex.y})
        localize_object_di[object_.name] = {
            "score" : object_.score,
            "vertex" : vertex_li
        }
  
    return localize_object_di
  
  
def cropped_image_upload_detected_car_bucket(
        image_data, key_di, 
        detected_car_bucket_name, 
        blob_name):
  
    # BytesIOを使って画像データを読み込み
    image = Image.open(BytesIO(image_data))
  
    # 画像のサイズを取得
    width, height = image.size
  
    # 左下と右上の座標に margin を追加
    margin = 0.05
    key_di['vertex'][0]["x"] = key_di['vertex'][0]["x"] - margin # 左下 x 軸
    key_di['vertex'][0]["y"] = key_di['vertex'][0]["y"] - margin # 左下 y 軸
    key_di['vertex'][2]["x"] = key_di['vertex'][2]["x"] + margin # 右上 x 軸
    key_di['vertex'][2]["y"] = key_di['vertex'][2]["y"] + margin # 右上 y 軸
  
    x_coords = [v["x"] * width for v in key_di["vertex"]]
    y_coords = [v["y"] * height for v in key_di["vertex"]]
  
    # 切り抜きの領域を設定
    left = min(x_coords)
    upper = min(y_coords)
    right = max(x_coords)
    lower = max(y_coords)
  
    # 画像を切り抜く
    cropped_image = image.crop((left, upper, right, lower))
  
    # ローカルに一時的に保存
    tmp_file = f"/tmp/{blob_name}"
    cropped_image.save(tmp_file)
  
    # Cloud Storage へアップロード
    bucket = storage_client.get_bucket(detected_car_bucket_name)
    blob_cropped = bucket.blob(blob_name)
    blob_cropped.upload_from_filename(tmp_file)
  
    # ローカルから削除
    os.remove(tmp_file)
  
    return "ok"
  
  
def upload_not_detected_car_bucket(image_data, not_detected_car_bucket_name, blob_name):
    # BytesIOを使って画像データを読み込み
    image = Image.open(BytesIO(image_data))
  
    # ローカルに一時的に保存
    tmp_file = f"/tmp/{blob_name}"
    image.save(tmp_file)
  
    # Cloud Storage へアップロード
    bucket = storage_client.get_bucket(not_detected_car_bucket_name)
    blob_cropped = bucket.blob(blob_name)
    blob_cropped.upload_from_filename(tmp_file)
  
    return "ok"
  
  
def is_check_key(localize_object_di, key):
    # 辞書のキーの中に特定の文字列が1つだけあるかどうかをチェック
    key_to_check = key
    key_count = list(localize_object_di.keys()).count(key_to_check)
    if key_count == 1:
        return True
    else:
        return False
  
  
@functions_framework.cloud_event
def main(cloud_event: CloudEvent):
  
    # CloudEvent から渡されたデータを取得
    data = cloud_event.data
  
    bucket_name = data["bucket"]
    file_name = data["name"]
  
    image_data = download_blob(bucket_name=bucket_name, blob_name=file_name)
  
    localize_object_di = localize_objects(image_data=image_data)
  
    if is_check_key(localize_object_di=localize_object_di, key=KEY):
        cropped_image_upload_detected_car_bucket(
                image_data=image_data, 
                key_di=localize_object_di[KEY], 
                detected_car_bucket_name=DETECTED_CAR_BUCHET_NAME,
                blob_name=file_name)
    else:
        upload_not_detected_car_bucket(
                image_data=image_data, 
                not_detected_car_bucket_name=NOT_DETECTED_CAR_BUCHET_NAME, 
                blob_name=file_name)
    
    return "ok"

requirements.txt

functions-framework==3.*
cloudevents==1.9.0
google-cloud-vision==3.4.4
google-cloud-storage==2.10.0
numpy==1.25.1
Pillow==10.0.0
opencv-python==4.8.0.74

gcf_source_code/detect_license_plate

main.py

gcf_source_code/detect_license_plate には、ナンバープレートをマスキングしてバケットに格納する処理を行う Cloud Functions のソースコードを格納しています。

from io import BytesIO
import os
  
from cloudevents.http import CloudEvent
import functions_framework
from google.cloud import vision
from google.cloud import storage
from PIL import Image
from PIL import ImageDraw
  
  
DETECTED_LICENSE_PLATE = os.environ.get("DETECTED_LICENSE_PLATE")
NOT_DETECTED_LICENSE_PLATE = os.environ.get("NOT_DETECTED_LICENSE_PLATE")
KEY = os.environ.get("KEY")
  
# クライアントを初期化
vision_image_annotator_client = vision.ImageAnnotatorClient()
storage_client = storage.Client()
  
  
def download_blob(bucket_name, blob_name):
    # バケットを取得
    bucket = storage_client.get_bucket(bucket_name)
  
    # オブジェクト(画像)を取得
    blob = bucket.blob(blob_name)
    image_data = blob.download_as_bytes()
    
    return image_data
  
  
def localize_objects(image_data):
  
    # Vision API 実行
    image = vision.Image(content=image_data)
    objects = vision_image_annotator_client.object_localization(image=image).localized_object_annotations
  
    # 辞書型に整形
    localize_object_di = {}
    for object_ in objects:
        vertex_li = []
        for vertex in object_.bounding_poly.normalized_vertices:
            vertex_li.append({"x": vertex.x, "y": vertex.y})
        localize_object_di[object_.name] = {
            "score" : object_.score,
            "vertex" : vertex_li
        }
  
    return localize_object_di
  
  
def masked_image_upload_detected_license_plate_bucket(
        image_data, key_di, 
        detected_license_plate_bucket, 
        blob_name):
  
    # BytesIOを使って画像データを読み込み
    image = Image.open(BytesIO(image_data))
  
    # 画像のサイズを取得
    width, height = image.size
  
    x_coords = [v["x"] * width for v in key_di["vertex"]]
    y_coords = [v["y"] * height for v in key_di["vertex"]]
  
    # 切り抜きの領域を設定
    left = min(x_coords)
    upper = min(y_coords)
    right = max(x_coords)
    lower = max(y_coords)
  
    # 画像に白い矩形を描画
    ImageDraw.Draw(image).rectangle(((left, upper), (right, lower)), fill="white")
  
    # ローカルに一時的に保存
    tmp_file = f"/tmp/{blob_name}"
    image.save(tmp_file)
  
    # Cloud Storage へアップロード
    bucket = storage_client.get_bucket(detected_license_plate_bucket)
    blob_cropped = bucket.blob(blob_name)
    blob_cropped.upload_from_filename(tmp_file)
  
    # ローカルから削除
    os.remove(tmp_file)
  
    return "ok"
  
  
def upload_not_detected_license_plate_bucket(image_data, not_detected_license_plate_bucket, blob_name):
    # BytesIOを使って画像データを読み込み
    image = Image.open(BytesIO(image_data))
  
    # ローカルに一時的に保存
    tmp_file = f"/tmp/{blob_name}"
    image.save(tmp_file)
  
    # Cloud Storage にアップロード
    bucket = storage_client.get_bucket(not_detected_license_plate_bucket)
    blob_cropped = bucket.blob(blob_name)
    blob_cropped.upload_from_filename(tmp_file)
  
    return "ok"
  
  
def is_check_key(localize_object_di, key):
    # 辞書のキーの中に特定の文字列が1つだけあるかどうかをチェック
    key_to_check = key
    key_count = list(localize_object_di.keys()).count(key_to_check)
    if key_count == 1:
        return True
    else:
        return False
  
  
@functions_framework.cloud_event
def main(cloud_event: CloudEvent):
  
    # Cloud Storage から渡されたデータを取得
    data = cloud_event.data
  
    bucket_name = data["bucket"]
    file_name = data["name"]
  
    image_data = download_blob(bucket_name=bucket_name, blob_name=file_name)
  
    localize_object_di = localize_objects(image_data=image_data)
  
    if is_check_key(localize_object_di=localize_object_di, key=KEY):
        masked_image_upload_detected_license_plate_bucket(
                image_data=image_data, 
                key_di=localize_object_di[KEY], 
                detected_license_plate_bucket=DETECTED_LICENSE_PLATE,
                blob_name=file_name)
    else:
        upload_not_detected_license_plate_bucket(
                image_data=image_data, 
                not_detected_license_plate_bucket=NOT_DETECTED_LICENSE_PLATE, 
                blob_name=file_name)
    
    return "ok"

requirements.txt

functions-framework==3.*
cloudevents==1.9.0
google-cloud-vision==3.4.4
google-cloud-storage==2.10.0
numpy==1.25.1
Pillow==10.0.0
opencv-python==4.8.0.74

動作検証

検証データ

以下の 3 枚の画像データで動作検証を行います。

検証前データ

実行

生データ格納用バケットに検証データをアップロードします。

生データ格納用バケットコンソール画面

直後に Cloud Storage トリガー経由で Cloud Functions が起動し、最終的にナンバープレートマスキング加工後格納バケットにオブジェクトが生成されました。

ナンバープレートマスキング加工後格納バケットコンソール画面

中身は以下のようになってました。

検証後データ

検証前データと比較すると、車周辺部分で切り取られ、ナンバープレートがマスキングできていることを確認できました。

又吉 佑樹(記事一覧)

クラウドソリューション部

はいさい、沖縄出身のクラウドエンジニア!

セールスからエンジニアへ転身。Google Cloud 全 11 資格保有。Google Cloud Champion Innovator (AI/ML)。Google Cloud Partner Top Engineer 2024。Google Cloud 公式ユーザー会 Jagu'e'r でエバンジェリストとして活動中。好きな分野は AI/ML。