G-gen 又吉です。Google Cloud (旧称 GCP) の事前トレーニング済みの API のひとつである Cloud Vision API を用いて車のナンバープレートをマスキングする処理をご紹介します。

はじめに
Vision AI
Vision AI とは、Google Cloud 上で画像や動画から分析情報を取得することができる以下の 3 つのプロダクトの総称です。
- Vertex AI Vision
- Custom ML models (AutoML or Vertex AI 独自モデル)
- Vision API
今回は、その中の Vision API を使用します。
Vision API
Vision API とは、事前トレーニング済み Vision API モデル使用して、オブジェクトの検知や OCR などが行なえます。
Vision API には、主に以下のような機能があります。
No | 機能タイプ | 説明 |
---|---|---|
1 | Text detection | 画像の光学式文字認識 (OCR)。画像内の UTF-8 テキストを識別し抽出できる。 |
2 | Landmark detection | ランドマークの名前、信頼度スコア、および境界ボックス (画像内の位置) が取得できる。 |
3 | Label detection | 「People」や「Car」のような一般化されたラベルが取得でき、各ラベルには説明と信頼度スコアが記載されている。 |
4 | Object localization | 複数のオブジェクトに「People」や「Car」のような一般的なラベルと境界ボックス (画像内の位置) が取得できる。 |
5 | Face detection | 顔を特定し、目、耳、鼻、口などの特定の顔の「ランドマーク」を信頼度スコアとともに取得でき、また表情から感情の尤度評価も取得できる。 |
その他の機能、また詳細については以下をご参照下さい。
また、Cloud Vision API の概要ページ からローカルの画像を Vision API に読み込ませ、どのような出力がでるかすぐに確認できます。
(※ 当記事で扱う車の画像データに含まれるナンバープレートは、個人情報保護の観点からモザイク処理をかけております。)

Vision API は事前トレーニング済み Vision API モデルで解決できる場合に有用ですが、自社の製品をカスタムラベルとして設定したい等、別途トレーニングが必要な場合は Vertex AI Vision や AutoML などを用いる必要があります。
事前確認でわかったこと
事前に 2 枚の画像を、Cloud Vision API の概要ページからどのような出力結果がえられるのか試してみます。


出力結果①では、検出されたオブジェクトの中に Car (車)と License Plate (ナンバープレート) が含まれていることが確認できます。
しかし、出力結果②では Car は検出できているが License Plate が検出できていない
ことがわかります。
そこで、出力結果②の画像から車周辺を切り取りった新たな画像で確認してみます。

車周辺を切り取りった画像の出力 (出力結果③) では、License Plate がオブジェクトとして検出されました。
これらの事前確認結果から、Cloud Vision API へリクエストを送信する際、画像いっぱいに車体を入れることでナンバープレートを検出してくれる可能性が高まることがわかりました。
そこで今回は、画像内の車を検出したら車周辺を切り取ったものを新しい画像とし、その新しい画像からナンバープレートを検出しマスキング処理 (白塗り) を行う構成とします。

構成図
今回の構成図は以下のとおりです。

ユーザーが Raw Data バケットに画像データをアップロードすると、Cloud Storage トリガー経由で Cloud Functions (車検出&抽出関数) が起動します。画像内の車オブジェクトがあれば車周辺を切り取った新しい画像を Detected Car バケットに格納します。
次に、Detected Car バケットに画像データがアップロードされると、Cloud Storage トリガー経由で Cloud Functions (LP検出&マスキング関数) が起動します。画像内のナンバープレートオブジェクトがあればナンバープレートをマスキングした新しい画像を Detected LP バケットに格納します。
準備
ディレクトリ構成
開発環境は Cloud Shell を用いて行います。ディレクトリ構造は以下のとおりです。
terraform
ディレクトリ配下は、以下のとおりです。
terraform |-- gcf_source_code | |-- detect_car | | |-- main.py | | `-- requirements.txt | `-- detect_license_plate | |-- main.py | `-- requirements.txt `-- main.tf
main.tf
main.tf には Terraform のコードを記述しています。
locals { terraform_service_account = ${Terraform 実行に使われるサービスアカウントのメールアドレス} project_name = ${プロジェクト名} project_id = ${プロジェクト ID} folder_id = ${フォルダ ID} billing_account_id = ${請求先アカウント ID} } # terraform & provider の設定 terraform { required_providers { google = { source = "hashicorp/google" version = ">= 4.0.0" } } required_version = ">= 1.3.0" backend "gcs" { bucket = ${tfstate ファイルを格納する Cloud Storage バケット名} impersonate_service_account = ${Terraform 実行に使われるサービスアカウントのメールアドレス} } } # サービスアカウント権限借用の設定 provider "google" { alias = "impersonation" scopes = [ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", ] } data "google_service_account_access_token" "default" { provider = google.impersonation target_service_account = local.terraform_service_account scopes = ["userinfo-email", "cloud-platform"] lifetime = "1200s" } # Google プロバイダの設定 provider "google" { project = local.project_id region = "asia-northeast1" access_token = data.google_service_account_access_token.default.access_token request_timeout = "60s" } ###################################### ### プロジェクトの作成と API の有効化 ### ###################################### # プロジェクトの作成 resource "google_project" "poc" { name = local.project_name project_id = local.project_id folder_id = local.folder_id billing_account = local.billing_account_id } # API の有効化 module "tenant_a_project_services" { source = "terraform-google-modules/project-factory/google//modules/project_services" version = "14.2.1" project_id = google_project.poc.project_id enable_apis = true activate_apis = [ "iam.googleapis.com", "cloudbuild.googleapis.com", "run.googleapis.com", "cloudfunctions.googleapis.com", "pubsub.googleapis.com", "eventarc.googleapis.com", "artifactregistry.googleapis.com", "storage.googleapis.com", "vision.googleapis.com" ] disable_services_on_destroy = false } ####################################### ### サービスアカウントの作成と権限の付与 ## ####################################### # Cloud Functions 用サービスアカウントの作成と権限付与 resource "google_service_account" "sa_gcf" { project = google_project.poc.project_id account_id = "sa-gcf" display_name = "Cloud Functions 用サービスアカウント" } resource "google_project_iam_member" "invoke_gcf" { project = google_project.poc.project_id role = "roles/run.invoker" member = "serviceAccount:${google_service_account.sa_gcf.email}" } resource "google_project_iam_member" "storage_admin" { project = google_project.poc.project_id role = "roles/storage.admin" member = "serviceAccount:${google_service_account.sa_gcf.email}" } resource "google_project_iam_member" "event_receiving" { project = google_project.poc.project_id role = "roles/eventarc.eventReceiver" member = "serviceAccount:${google_service_account.sa_gcf.email}" depends_on = [google_project_iam_member.invoke_gcf] } resource "google_project_iam_member" "artifactregistry_reader" { project = google_project.poc.project_id role = "roles/artifactregistry.reader" member = "serviceAccount:${google_service_account.sa_gcf.email}" depends_on = [google_project_iam_member.event_receiving] } # Eventarc のサービスアカウントに権限付与 resource "google_project_iam_member" "serviceAccount_token_creator" { project = google_project.poc.project_id role = "roles/iam.serviceAccountTokenCreator" member = "serviceAccount:service-${google_project.poc.number}@gcp-sa-pubsub.iam.gserviceaccount.com" depends_on = [module.tenant_a_project_services] } # Cloud Storage のサービスアカウントに権限付与 data "google_storage_project_service_account" "gcs_account" { project = google_project.poc.project_id depends_on = [ module.tenant_a_project_services ] } resource "google_project_iam_member" "gcs_pubsub_publishing" { project = google_project.poc.project_id role = "roles/pubsub.publisher" member = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}" } ################################ ### バケットとオブジェクトの作成 ### ################################ # Cloud Functions のソースコード格納用バケットの作成 resource "google_storage_bucket" "source_gcf" { project = google_project.poc.project_id location = "asia-northeast1" name = "${google_project.poc.project_id}-source-gcf" force_destroy = true } # Cloud Functions で使うソースコードを ZIP 化 data "archive_file" "detect_car" { type = "zip" source_dir = "./gcf_source_code/detect_car" output_path = "./zip_source_code/detect_car.zip" } data "archive_file" "detect_license_plate" { type = "zip" source_dir = "./gcf_source_code/detect_license_plate" output_path = "./zip_source_code/detect_license_plate.zip" } # ZIP 化したソースコードをバケットに追加 resource "google_storage_bucket_object" "detect_car" { name = "detect-car.${data.archive_file.detect_car.output_md5}.zip" bucket = google_storage_bucket.source_gcf.name source = data.archive_file.detect_car.output_path } resource "google_storage_bucket_object" "detect_license_plate" { name = "detect-license-plate.${data.archive_file.detect_license_plate.output_md5}.zip" bucket = google_storage_bucket.source_gcf.name source = data.archive_file.detect_license_plate.output_path } # raw_data バケットの作成 resource "google_storage_bucket" "raw_data" { project = google_project.poc.project_id location = "asia-northeast1" name = "${google_project.poc.project_id}-raw-data" force_destroy = true } # detected_car バケットの作成 resource "google_storage_bucket" "detected_car" { project = google_project.poc.project_id location = "asia-northeast1" name = "${google_project.poc.project_id}-detected-car" force_destroy = true } # not_detected_car バケットの作成 resource "google_storage_bucket" "not_detected_car" { project = google_project.poc.project_id location = "asia-northeast1" name = "${google_project.poc.project_id}-not-detected-car" force_destroy = true } # detected_license_plate バケットの作成 resource "google_storage_bucket" "detected_license_plate" { project = google_project.poc.project_id location = "asia-northeast1" name = "${google_project.poc.project_id}-detected-license-plate" force_destroy = true } # not_detected_license_plate バケットの作成 resource "google_storage_bucket" "not_detected_license_plate" { project = google_project.poc.project_id location = "asia-northeast1" name = "${google_project.poc.project_id}-not-detected-license-plate" force_destroy = true } ############################ ### Cloud Functions 作成 ### ############################ # detect_car 関数 resource "google_cloudfunctions2_function" "detect_car" { depends_on = [ google_project_iam_member.event_receiving, google_project_iam_member.artifactregistry_reader, google_project_iam_member.serviceAccount_token_creator ] name = "detect-car" location = "asia-northeast1" description = "画像から車を切り取って detect_car バケットに格納する関数" build_config { runtime = "python310" entry_point = "main" # Set the entry point in the code source { storage_source { bucket = google_storage_bucket.source_gcf.name object = google_storage_bucket_object.detect_car.name } } } service_config { max_instance_count = 3 min_instance_count = 1 available_memory = "256M" timeout_seconds = 60 environment_variables = { DETECTED_CAR_BUCHET_NAME = google_storage_bucket.detected_car.name NOT_DETECTED_CAR_BUCHET_NAME = google_storage_bucket.not_detected_car.name KEY = "Car" } service_account_email = google_service_account.sa_gcf.email } event_trigger { trigger_region = "asia-northeast1" event_type = "google.cloud.storage.object.v1.finalized" retry_policy = "RETRY_POLICY_RETRY" service_account_email = google_service_account.sa_gcf.email event_filters { attribute = "bucket" value = google_storage_bucket.raw_data.name } } } # detected_license_plate 関数 resource "google_cloudfunctions2_function" "detected_license_plate" { depends_on = [ google_project_iam_member.event_receiving, google_project_iam_member.artifactregistry_reader, google_project_iam_member.serviceAccount_token_creator ] name = "detected-license-plate" location = "asia-northeast1" description = "画像からナンバープレートをマスキングして detected_license_plate バケットに格納する関数" build_config { runtime = "python310" entry_point = "main" # Set the entry point in the code source { storage_source { bucket = google_storage_bucket.source_gcf.name object = google_storage_bucket_object.detect_license_plate.name } } } service_config { max_instance_count = 3 min_instance_count = 1 available_memory = "256M" timeout_seconds = 60 environment_variables = { DETECTED_LICENSE_PLATE = google_storage_bucket.detected_license_plate.name NOT_DETECTED_LICENSE_PLATE = google_storage_bucket.not_detected_license_plate.name KEY = "License plate" } service_account_email = google_service_account.sa_gcf.email } event_trigger { trigger_region = "asia-northeast1" event_type = "google.cloud.storage.object.v1.finalized" retry_policy = "RETRY_POLICY_RETRY" service_account_email = google_service_account.sa_gcf.email event_filters { attribute = "bucket" value = google_storage_bucket.detected_car.name } } }
gcf_source_code/detect_car
main.py
gcf_source_code/detect_car には、画像から車を切り取ってバケットに格納する処理を行う Cloud Functions のソースコードを格納しています。
from io import BytesIO import os from cloudevents.http import CloudEvent import functions_framework from google.cloud import vision from google.cloud import storage from PIL import Image DETECTED_CAR_BUCHET_NAME = os.environ.get("DETECTED_CAR_BUCHET_NAME") NOT_DETECTED_CAR_BUCHET_NAME = os.environ.get("NOT_DETECTED_CAR_BUCHET_NAME") KEY = os.environ.get("KEY") # クライアントを初期化 vision_image_annotator_client = vision.ImageAnnotatorClient() storage_client = storage.Client() def download_blob(bucket_name, blob_name): # バケットを取得 bucket = storage_client.get_bucket(bucket_name) # オブジェクト(画像)を取得 blob = bucket.blob(blob_name) image_data = blob.download_as_bytes() return image_data def localize_objects(image_data): # Vision API 実行 image = vision.Image(content=image_data) objects = vision_image_annotator_client.object_localization(image=image).localized_object_annotations # 辞書型に整形 localize_object_di = {} for object_ in objects: vertex_li = [] for vertex in object_.bounding_poly.normalized_vertices: vertex_li.append({"x": vertex.x, "y": vertex.y}) localize_object_di[object_.name] = { "score" : object_.score, "vertex" : vertex_li } return localize_object_di def cropped_image_upload_detected_car_bucket( image_data, key_di, detected_car_bucket_name, blob_name): # BytesIOを使って画像データを読み込み image = Image.open(BytesIO(image_data)) # 画像のサイズを取得 width, height = image.size # 左下と右上の座標に margin を追加 margin = 0.05 key_di['vertex'][0]["x"] = key_di['vertex'][0]["x"] - margin # 左下 x 軸 key_di['vertex'][0]["y"] = key_di['vertex'][0]["y"] - margin # 左下 y 軸 key_di['vertex'][2]["x"] = key_di['vertex'][2]["x"] + margin # 右上 x 軸 key_di['vertex'][2]["y"] = key_di['vertex'][2]["y"] + margin # 右上 y 軸 x_coords = [v["x"] * width for v in key_di["vertex"]] y_coords = [v["y"] * height for v in key_di["vertex"]] # 切り抜きの領域を設定 left = min(x_coords) upper = min(y_coords) right = max(x_coords) lower = max(y_coords) # 画像を切り抜く cropped_image = image.crop((left, upper, right, lower)) # ローカルに一時的に保存 tmp_file = f"/tmp/{blob_name}" cropped_image.save(tmp_file) # Cloud Storage へアップロード bucket = storage_client.get_bucket(detected_car_bucket_name) blob_cropped = bucket.blob(blob_name) blob_cropped.upload_from_filename(tmp_file) # ローカルから削除 os.remove(tmp_file) return "ok" def upload_not_detected_car_bucket(image_data, not_detected_car_bucket_name, blob_name): # BytesIOを使って画像データを読み込み image = Image.open(BytesIO(image_data)) # ローカルに一時的に保存 tmp_file = f"/tmp/{blob_name}" image.save(tmp_file) # Cloud Storage へアップロード bucket = storage_client.get_bucket(not_detected_car_bucket_name) blob_cropped = bucket.blob(blob_name) blob_cropped.upload_from_filename(tmp_file) return "ok" def is_check_key(localize_object_di, key): # 辞書のキーの中に特定の文字列が1つだけあるかどうかをチェック key_to_check = key key_count = list(localize_object_di.keys()).count(key_to_check) if key_count == 1: return True else: return False @functions_framework.cloud_event def main(cloud_event: CloudEvent): # CloudEvent から渡されたデータを取得 data = cloud_event.data bucket_name = data["bucket"] file_name = data["name"] image_data = download_blob(bucket_name=bucket_name, blob_name=file_name) localize_object_di = localize_objects(image_data=image_data) if is_check_key(localize_object_di=localize_object_di, key=KEY): cropped_image_upload_detected_car_bucket( image_data=image_data, key_di=localize_object_di[KEY], detected_car_bucket_name=DETECTED_CAR_BUCHET_NAME, blob_name=file_name) else: upload_not_detected_car_bucket( image_data=image_data, not_detected_car_bucket_name=NOT_DETECTED_CAR_BUCHET_NAME, blob_name=file_name) return "ok"
requirements.txt
functions-framework==3.* cloudevents==1.9.0 google-cloud-vision==3.4.4 google-cloud-storage==2.10.0 numpy==1.25.1 Pillow==10.0.0 opencv-python==4.8.0.74
gcf_source_code/detect_license_plate
main.py
gcf_source_code/detect_license_plate には、ナンバープレートをマスキングしてバケットに格納する処理を行う Cloud Functions のソースコードを格納しています。
from io import BytesIO import os from cloudevents.http import CloudEvent import functions_framework from google.cloud import vision from google.cloud import storage from PIL import Image from PIL import ImageDraw DETECTED_LICENSE_PLATE = os.environ.get("DETECTED_LICENSE_PLATE") NOT_DETECTED_LICENSE_PLATE = os.environ.get("NOT_DETECTED_LICENSE_PLATE") KEY = os.environ.get("KEY") # クライアントを初期化 vision_image_annotator_client = vision.ImageAnnotatorClient() storage_client = storage.Client() def download_blob(bucket_name, blob_name): # バケットを取得 bucket = storage_client.get_bucket(bucket_name) # オブジェクト(画像)を取得 blob = bucket.blob(blob_name) image_data = blob.download_as_bytes() return image_data def localize_objects(image_data): # Vision API 実行 image = vision.Image(content=image_data) objects = vision_image_annotator_client.object_localization(image=image).localized_object_annotations # 辞書型に整形 localize_object_di = {} for object_ in objects: vertex_li = [] for vertex in object_.bounding_poly.normalized_vertices: vertex_li.append({"x": vertex.x, "y": vertex.y}) localize_object_di[object_.name] = { "score" : object_.score, "vertex" : vertex_li } return localize_object_di def masked_image_upload_detected_license_plate_bucket( image_data, key_di, detected_license_plate_bucket, blob_name): # BytesIOを使って画像データを読み込み image = Image.open(BytesIO(image_data)) # 画像のサイズを取得 width, height = image.size x_coords = [v["x"] * width for v in key_di["vertex"]] y_coords = [v["y"] * height for v in key_di["vertex"]] # 切り抜きの領域を設定 left = min(x_coords) upper = min(y_coords) right = max(x_coords) lower = max(y_coords) # 画像に白い矩形を描画 ImageDraw.Draw(image).rectangle(((left, upper), (right, lower)), fill="white") # ローカルに一時的に保存 tmp_file = f"/tmp/{blob_name}" image.save(tmp_file) # Cloud Storage へアップロード bucket = storage_client.get_bucket(detected_license_plate_bucket) blob_cropped = bucket.blob(blob_name) blob_cropped.upload_from_filename(tmp_file) # ローカルから削除 os.remove(tmp_file) return "ok" def upload_not_detected_license_plate_bucket(image_data, not_detected_license_plate_bucket, blob_name): # BytesIOを使って画像データを読み込み image = Image.open(BytesIO(image_data)) # ローカルに一時的に保存 tmp_file = f"/tmp/{blob_name}" image.save(tmp_file) # Cloud Storage にアップロード bucket = storage_client.get_bucket(not_detected_license_plate_bucket) blob_cropped = bucket.blob(blob_name) blob_cropped.upload_from_filename(tmp_file) return "ok" def is_check_key(localize_object_di, key): # 辞書のキーの中に特定の文字列が1つだけあるかどうかをチェック key_to_check = key key_count = list(localize_object_di.keys()).count(key_to_check) if key_count == 1: return True else: return False @functions_framework.cloud_event def main(cloud_event: CloudEvent): # Cloud Storage から渡されたデータを取得 data = cloud_event.data bucket_name = data["bucket"] file_name = data["name"] image_data = download_blob(bucket_name=bucket_name, blob_name=file_name) localize_object_di = localize_objects(image_data=image_data) if is_check_key(localize_object_di=localize_object_di, key=KEY): masked_image_upload_detected_license_plate_bucket( image_data=image_data, key_di=localize_object_di[KEY], detected_license_plate_bucket=DETECTED_LICENSE_PLATE, blob_name=file_name) else: upload_not_detected_license_plate_bucket( image_data=image_data, not_detected_license_plate_bucket=NOT_DETECTED_LICENSE_PLATE, blob_name=file_name) return "ok"
requirements.txt
functions-framework==3.* cloudevents==1.9.0 google-cloud-vision==3.4.4 google-cloud-storage==2.10.0 numpy==1.25.1 Pillow==10.0.0 opencv-python==4.8.0.74
動作検証
検証データ
以下の 3 枚の画像データで動作検証を行います。

実行
生データ格納用バケットに検証データをアップロードします。

直後に Cloud Storage トリガー経由で Cloud Functions が起動し、最終的にナンバープレートマスキング加工後格納バケットにオブジェクトが生成されました。

中身は以下のようになってました。

検証前データと比較すると、車周辺部分で切り取られ、ナンバープレートがマスキングできていることを確認できました。
又吉 佑樹(記事一覧)
クラウドソリューション部
はいさーい!沖縄出身のクラウドエンジニアです!!
前職は SIer テクニカルセールス。Google Cloud の魅力に惚れ、技術を磨きたくセールスからエンジニアへ転身。Google Cloud 認定資格は全 11 資格保有。Google Cloud 公式ユーザー会 Jagu'e'r でエバンジェリストとして活動中。好きな分野は AI/ML。
Follow @matayuuuu