Pub/SubのSingle Message Transforms (SMTs) を使ってみた

記事タイトルとURLをコピーする

G-gen の佐々木です。当記事では Pub/SubSingle Message Transforms(SMTs)機能によるメッセージの変換処理について解説します。

前提知識 : Pub/Sub とは

Pub/Sub は、Google Cloud のフルマネージドなメッセージングサービスです。

Pub/Sub を始めとしたメッセージングサービスの詳細やユースケースについては、以下の記事をご一読ください。

blog.g-gen.co.jp

Single Message Transforms(SMTs)とは

SMTs の基本

Single Message Transforms(以下、SMTs)は、Pub/Sub を使用したストリーミング パイプラインにおける単純なデータ変換を実現する機能です。

SMTs では Pub/Sub 自体にデータの変換処理を実装することで、Pub/Sub の前後で(Dataflow や Cloud Run functions などを使用して)データ変換処理を行うような、パイプラインの複雑化を回避することができます。

SMTs によるデータ変換処理は、Pub/Sub のトピックサブスクリプションのそれぞれに対して設定することができます。

SMTs の設定箇所 変換処理のタイミング ユースケース
トピック メッセージがトピックに永続化される前に変換が行われる。 ・サブスクリプションが複数ある場合に、共通の変換処理を行う。
・後続の処理に渡すメッセージのデータ量を削減する。
・無効なメッセージを検証してパブリッシュを抑制する。
サブスクリプション メッセージがサブスクリプションに配信される前に変換が行われる。 ・サブスクリプション特有の変換処理を行う。
・無効なメッセージをデッドレタートピックに書き込んでアーカイブする。

SMTs はトピックとサブスクリプションに設定することができる

SMTs は対象ごとに最大5個まで設定することができ、上に設定したものから順番に実行されます。

SMTs を複数設定すると上から順に実行される(順番は変更できる)

SMTs のユースケース

SMTs のユースケースを以下に示します。

  • 文字列操作、日付の書式変換、数値計算など、単純なデータ変換
  • 異なるシステム間で互換性を担保するためのデータ形式の変換
  • クレジットカード番号や個人情報(PII)などのデータのマスキング・編集
  • 不要なメッセージの破棄(フィルタリング)

メッセージのフィルタリングについては Pub/Sub 組み込みのフィルタリング機能で実現することもできますが、SMTs ではより複雑な条件でフィルタリングを行うことができます。

UDF による定義

SMTs は、JavaScript による User-Defined Function(UDF)を使用して実装します。

SMTs で使用する UDF は、単一のメッセージを message 引数として受け取り、処理の結果を返します。

function <関数名>(message, metadata) {
  // ここに処理内容を記述
  return message; // 戻り値は `null` も可能
}

メッセージのペイロードは message.data、メッセージ属性の KeyValue ペアは message.attributes で取得できます。UDF の戻り値を null とした場合、処理対象のメッセージは破棄されます。

なお、UDF でエラーが発生した場合は、トピックの SMTs であればパブリッシャーにエラーを返し、サブスクリプションの SMTs であればメッセージを否定応答(nack)します。

SMTs における UDF の詳細については以下のドキュメントを参照してください。

SMTs の注意事項

SMTs を使用する際の注意事項を以下に示します。

  • SMTs はトピックおよびサブスクリプションのそれぞれに対して最大5つまで設定可能
  • SMTs は単一のメッセージに対して動作するものであり、複数のメッセージを集約するような処理はできない
  • 順序付けが有効になっているメッセージに対する SMTs の処理がエラーとなった場合、後続のメッセージは配信されない。この場合、デッドレタートピックを使用してエラーが発生したメッセージをバッグログから削除する必要がある

また、UDF の実行には以下の制限があります。

  • UDF あたりのコード量は最大20KB
  • メッセージごとの UDF の最大実行時間(タイムアウト)は500ミリ秒
  • 外部 API の呼び出しは不可
  • 外部ライブラリのインポートは不可

これらの制限に抵触するような処理を実行したい場合は、Dataflow や Cloud Run functions を用いた実装を検討するとよいでしょう。

当記事の構成

当記事では BigQuery サブスクリプションを使用し、トピックとサブスクリプションに SMTs を設定して、変換後のメッセージを BigQuery テーブルに書き込む処理を試してみます。

トピックとサブスクリプションの両方で SMTs によるメッセージ変換を行い、2つのテーブル(users, scores)に異なるデータを挿入してみます。

トピックとサブスクリプションで SMTs を使用するサンプル構成

まず、トピックの SMTs では、メッセージのペイロード内の id フィールドの値を、int 型から string 型に変換します。

そして、トピックで変換したメッセージを各サブスクリプションの SMTs で処理します。 BigQuery サブスクリプションを2つ用意して、一方では score フィールドを削除して users テーブルに書き込み、もう一方では name フィールドを削除して scores テーブルに書き込みます。

BigQuery サブスクリプションの詳細については、以下の記事をご一読ください。

blog.g-gen.co.jp

宛先 BigQuery テーブルの作成

まず、BigQuery サブスクリプションの宛先となるテーブルを2つ作成します。

1つ目の users テーブルは以下のスキーマで作成します。

フィールド名 種類
id STRING
name STRING

users テーブルのスキーマ

2つ目の scores テーブルは以下のスキーマで作成します。

フィールド名 種類
id STRING
score INTEGER

scores テーブルのスキーマ

SMTs を設定した Pub/Sub トピックの作成

パブリッシャーからメッセージを受け取るトピックを作成します。

トピック作成画面の「変換」項目で、トピックで実行される SMTs を設定することができます。

関数名に convertIdToString と入力し、以下の UDF を記述します。

// convertIdToString 関数
function convertIdToString(message, metadata) {
  const data = JSON.parse(message.data);
  data['id'] = String(data["id"]);
  message.data = JSON.stringify(data);
  return message;
}

この UDF により、メッセージペイロード(data)の id フィールドが int 型から string 型に変換され、id フィールド変換後のメッセージ全体が後続のサブスクリプションに送信されます。

SMTs によるメッセージ変換を行うトピックを作成する

SMTs を設定した Pub/Sub サブスクリプションの作成

users テーブルに書き込むサブスクリプション

users テーブルに idname のデータを書き込む BigQuery サブスクリプションを作成します。

まず、サブスクリプション作成画面の「配信タイプ」項目で「BigQuery への書き込み」にチェックを入れ、users テーブルを指定します。

そして、メッセージペイロードのフィールド名をテーブルの列名と対応づけるため、「スキーマ構成」項目で「テーブル スキーマを使用する」にチェックを入れます。

BigQuery の users テーブルへの書き込み設定

「変換」項目で、サブスクリプションで実行される SMTs を設定することができます。

関数名に deleteScore と入力し、以下の UDF を記述します。

// deleteScore 関数
function deleteScore(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['score'];
  message.data = JSON.stringify(data);
  return message;
}

この UDF により、メッセージペイロードの score フィールドが削除され、削除後のメッセージが BigQuery の users テーブルに書き込まれます。

users テーブルに書き込むサブスクリプションの SMTs を設定する

scores テーブルに書き込むサブスクリプション

先ほどと同様に、今度は scores テーブルに書き込むサブスクリプションを作成します。

BigQuery の scores テーブルへの書き込み設定

「変換」項目にて、関数名に deleteName と入力し、以下の UDF を記述します。

// deleteName 関数
function deleteName(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['name'];
  message.data = JSON.stringify(data);
  return message;
}

この UDF により、メッセージペイロードの name フィールドが削除され、削除後のメッセージが BigQuery の scores テーブルに書き込まれます。

scores テーブルに書き込むサブスクリプションの SMTs を設定する

メッセージの送信

SMTs を設定したトピック、サブスクリプションの準備が完了したので、動作確認を行います。

トピックの詳細画面から「メッセージ」タブを開き、「メッセージをパブリッシュ」を押下します。

「メッセージ本文」に以下のメッセージを記述し、「公開」を押下してメッセージをパブリッシュします。

{
  "id": 1001,
  "name":"sasashun",
  "score":100
}

このメッセージの id フィールドの値は int 型であり、対応する BigQuery テーブルの id 列は STRING 型であるため、トピックの SMTs で型の変換を行います。

その後、users テーブルに書き込むサブスクリプションでは scores フィールドを、scores テーブルに書き込むサブスクリプションでは name フィールドを、それぞれ SMTs を使用して削除します。

テストメッセージのパブリッシュ

メッセージのパブリッシュ後、BigQuery の各テーブルを確認してみます。

users テーブルには、最初にトピックで id フィールドの値を int 型から string 型に変換し、次にサブスクリプションで score フィールドが削除された結果のデータが挿入されています。

users テーブルの SELECT クエリ結果

scores テーブルには、最初にトピックで id フィールドの値を int 型から string 型に変換し、次にサブスクリプションで name フィールドが削除された結果のデータが挿入されています。

scores テーブルの SELECT クエリ結果

佐々木 駿太 (記事一覧)

G-gen最北端、北海道在住のクラウドソリューション部エンジニア

2022年6月にG-genにジョイン。Google Cloud Partner Top Engineer 2025 Fellowに選出。好きなGoogle CloudプロダクトはCloud Run。

趣味はコーヒー、小説(SF、ミステリ)、カラオケなど。