G-gen の佐々木です。当記事では Pub/Sub の Single Message Transforms(SMTs)機能によるメッセージの変換処理について解説します。
- 前提知識 : Pub/Sub とは
- Single Message Transforms(SMTs)とは
- 当記事の構成
- 宛先 BigQuery テーブルの作成
- SMTs を設定した Pub/Sub トピックの作成
- SMTs を設定した Pub/Sub サブスクリプションの作成
- メッセージの送信
前提知識 : Pub/Sub とは
Pub/Sub は、Google Cloud のフルマネージドなメッセージングサービスです。
Pub/Sub を始めとしたメッセージングサービスの詳細やユースケースについては、以下の記事をご一読ください。
Single Message Transforms(SMTs)とは
SMTs の基本
Single Message Transforms(以下、SMTs)は、Pub/Sub を使用したストリーミング パイプラインにおける単純なデータ変換を実現する機能です。
SMTs では Pub/Sub 自体にデータの変換処理を実装することで、Pub/Sub の前後で(Dataflow や Cloud Run functions などを使用して)データ変換処理を行うような、パイプラインの複雑化を回避することができます。
SMTs によるデータ変換処理は、Pub/Sub のトピックとサブスクリプションのそれぞれに対して設定することができます。
SMTs の設定箇所 | 変換処理のタイミング | ユースケース |
---|---|---|
トピック | メッセージがトピックに永続化される前に変換が行われる。 | ・サブスクリプションが複数ある場合に、共通の変換処理を行う。 ・後続の処理に渡すメッセージのデータ量を削減する。 ・無効なメッセージを検証してパブリッシュを抑制する。 |
サブスクリプション | メッセージがサブスクリプションに配信される前に変換が行われる。 | ・サブスクリプション特有の変換処理を行う。 ・無効なメッセージをデッドレタートピックに書き込んでアーカイブする。 |

SMTs は対象ごとに最大5個まで設定することができ、上に設定したものから順番に実行されます。

SMTs のユースケース
SMTs のユースケースを以下に示します。
- 文字列操作、日付の書式変換、数値計算など、単純なデータ変換
- 異なるシステム間で互換性を担保するためのデータ形式の変換
- クレジットカード番号や個人情報(PII)などのデータのマスキング・編集
- 不要なメッセージの破棄(フィルタリング)
メッセージのフィルタリングについては Pub/Sub 組み込みのフィルタリング機能で実現することもできますが、SMTs ではより複雑な条件でフィルタリングを行うことができます。
UDF による定義
SMTs は、JavaScript による User-Defined Function(UDF)を使用して実装します。
SMTs で使用する UDF は、単一のメッセージを message
引数として受け取り、処理の結果を返します。
function <関数名>(message, metadata) { // ここに処理内容を記述 return message; // 戻り値は `null` も可能 }
メッセージのペイロードは message.data
、メッセージ属性の KeyValue ペアは message.attributes
で取得できます。UDF の戻り値を null
とした場合、処理対象のメッセージは破棄されます。
なお、UDF でエラーが発生した場合は、トピックの SMTs であればパブリッシャーにエラーを返し、サブスクリプションの SMTs であればメッセージを否定応答(nack)します。
SMTs における UDF の詳細については以下のドキュメントを参照してください。
SMTs の注意事項
SMTs を使用する際の注意事項を以下に示します。
- SMTs はトピックおよびサブスクリプションのそれぞれに対して最大5つまで設定可能
- SMTs は単一のメッセージに対して動作するものであり、複数のメッセージを集約するような処理はできない
- 順序付けが有効になっているメッセージに対する SMTs の処理がエラーとなった場合、後続のメッセージは配信されない。この場合、デッドレタートピックを使用してエラーが発生したメッセージをバッグログから削除する必要がある
また、UDF の実行には以下の制限があります。
- UDF あたりのコード量は最大20KB
- メッセージごとの UDF の最大実行時間(タイムアウト)は500ミリ秒
- 外部 API の呼び出しは不可
- 外部ライブラリのインポートは不可
これらの制限に抵触するような処理を実行したい場合は、Dataflow や Cloud Run functions を用いた実装を検討するとよいでしょう。
当記事の構成
当記事では BigQuery サブスクリプションを使用し、トピックとサブスクリプションに SMTs を設定して、変換後のメッセージを BigQuery テーブルに書き込む処理を試してみます。
トピックとサブスクリプションの両方で SMTs によるメッセージ変換を行い、2つのテーブル(users, scores)に異なるデータを挿入してみます。

まず、トピックの SMTs では、メッセージのペイロード内の id
フィールドの値を、int 型から string 型に変換します。
そして、トピックで変換したメッセージを各サブスクリプションの SMTs で処理します。 BigQuery サブスクリプションを2つ用意して、一方では score
フィールドを削除して users
テーブルに書き込み、もう一方では name
フィールドを削除して scores
テーブルに書き込みます。
BigQuery サブスクリプションの詳細については、以下の記事をご一読ください。
宛先 BigQuery テーブルの作成
まず、BigQuery サブスクリプションの宛先となるテーブルを2つ作成します。
1つ目の users
テーブルは以下のスキーマで作成します。
フィールド名 | 種類 |
---|---|
id | STRING |
name | STRING |

2つ目の scores
テーブルは以下のスキーマで作成します。
フィールド名 | 種類 |
---|---|
id | STRING |
score | INTEGER |

SMTs を設定した Pub/Sub トピックの作成
パブリッシャーからメッセージを受け取るトピックを作成します。
トピック作成画面の「変換」項目で、トピックで実行される SMTs を設定することができます。
関数名に convertIdToString
と入力し、以下の UDF を記述します。
// convertIdToString 関数 function convertIdToString(message, metadata) { const data = JSON.parse(message.data); data['id'] = String(data["id"]); message.data = JSON.stringify(data); return message; }
この UDF により、メッセージペイロード(data)の id
フィールドが int 型から string 型に変換され、id
フィールド変換後のメッセージ全体が後続のサブスクリプションに送信されます。

SMTs を設定した Pub/Sub サブスクリプションの作成
users テーブルに書き込むサブスクリプション
users
テーブルに id
と name
のデータを書き込む BigQuery サブスクリプションを作成します。
まず、サブスクリプション作成画面の「配信タイプ」項目で「BigQuery への書き込み」にチェックを入れ、users
テーブルを指定します。
そして、メッセージペイロードのフィールド名をテーブルの列名と対応づけるため、「スキーマ構成」項目で「テーブル スキーマを使用する」にチェックを入れます。

「変換」項目で、サブスクリプションで実行される SMTs を設定することができます。
関数名に deleteScore
と入力し、以下の UDF を記述します。
// deleteScore 関数 function deleteScore(message, metadata) { const data = JSON.parse(message.data); delete data['score']; message.data = JSON.stringify(data); return message; }
この UDF により、メッセージペイロードの score
フィールドが削除され、削除後のメッセージが BigQuery の users
テーブルに書き込まれます。

scores テーブルに書き込むサブスクリプション
先ほどと同様に、今度は scores
テーブルに書き込むサブスクリプションを作成します。

「変換」項目にて、関数名に deleteName
と入力し、以下の UDF を記述します。
// deleteName 関数 function deleteName(message, metadata) { const data = JSON.parse(message.data); delete data['name']; message.data = JSON.stringify(data); return message; }
この UDF により、メッセージペイロードの name
フィールドが削除され、削除後のメッセージが BigQuery の scores
テーブルに書き込まれます。

メッセージの送信
SMTs を設定したトピック、サブスクリプションの準備が完了したので、動作確認を行います。
トピックの詳細画面から「メッセージ」タブを開き、「メッセージをパブリッシュ」を押下します。
「メッセージ本文」に以下のメッセージを記述し、「公開」を押下してメッセージをパブリッシュします。
{ "id": 1001, "name":"sasashun", "score":100 }
このメッセージの id
フィールドの値は int 型であり、対応する BigQuery テーブルの id
列は STRING 型であるため、トピックの SMTs で型の変換を行います。
その後、users
テーブルに書き込むサブスクリプションでは scores
フィールドを、scores
テーブルに書き込むサブスクリプションでは name
フィールドを、それぞれ SMTs を使用して削除します。

メッセージのパブリッシュ後、BigQuery の各テーブルを確認してみます。
users
テーブルには、最初にトピックで id
フィールドの値を int 型から string 型に変換し、次にサブスクリプションで score
フィールドが削除された結果のデータが挿入されています。

scores
テーブルには、最初にトピックで id
フィールドの値を int 型から string 型に変換し、次にサブスクリプションで name
フィールドが削除された結果のデータが挿入されています。

佐々木 駿太 (記事一覧)
G-gen最北端、北海道在住のクラウドソリューション部エンジニア
2022年6月にG-genにジョイン。Google Cloud Partner Top Engineer 2025 Fellowに選出。好きなGoogle CloudプロダクトはCloud Run。
趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。
Follow @sasashun0805