Google Cloud Platformに戻る

Pub/Sub

Pub/Subとは

Pub/Sub(Publish/Subscribe)は、Google Cloud Platform (GCP) が提供するフルマネージドのメッセージングサービスです。アプリケーション間で非同期にメッセージを送受信するためのプラットフォームを提供し、分散システムやマイクロサービスアーキテクチャにおけるコンポーネント間の疎結合な通信を実現します。Pub/Subは、高いスケーラビリティと信頼性を備え、グローバルに分散したアプリケーション間でもリアルタイムなメッセージ配信を可能にします。

Pub/Subの主な特徴

Pub/Subのアーキテクチャ

Pub/Subは以下の主要コンポーネントで構成されています:

コンポーネント 説明
トピック メッセージが発行される名前付きのリソース。パブリッシャーはトピックにメッセージを送信する
サブスクリプション トピックからメッセージを受信するための名前付きのリソース。サブスクライバーはサブスクリプションを通じてメッセージを受信する
メッセージ パブリッシャーからサブスクライバーに送信されるデータの単位。メッセージ本文と属性(キーと値のペア)で構成される
パブリッシャー トピックにメッセージを送信するアプリケーションまたはサービス
サブスクライバー サブスクリプションからメッセージを受信するアプリケーションまたはサービス
アクノリッジメント サブスクライバーがメッセージを正常に処理したことを確認する信号

Pub/Sub と他のメッセージングサービスの違い

Pub/Subと他のメッセージングサービスには以下のような違いがあります:

Pub/Subの使用方法

Pub/Subは、Google Cloud Console、gcloud CLIツール、クライアントライブラリ、またはRESTful APIを使用して利用できます。

トピックの作成

gcloud CLIを使用したトピックの作成

gcloud pubsub topics create my-topic

メッセージ保持期間の設定

gcloud pubsub topics update my-topic \
    --message-retention-duration=7d

サブスクリプションの作成

プル型サブスクリプションの作成

gcloud pubsub subscriptions create my-pull-subscription \
    --topic=my-topic \
    --ack-deadline=60 \
    --message-retention-duration=7d

プッシュ型サブスクリプションの作成

gcloud pubsub subscriptions create my-push-subscription \
    --topic=my-topic \
    --push-endpoint=https://example.com/push \
    --ack-deadline=60

メッセージの発行

gcloud CLIを使用したメッセージの発行

gcloud pubsub topics publish my-topic \
    --message="Hello, World!" \
    --attribute="key1=value1,key2=value2"

メッセージの受信(プル型)

gcloud CLIを使用したメッセージの受信

gcloud pubsub subscriptions pull my-pull-subscription \
    --auto-ack \
    --limit=10

クライアントライブラリを使用したPub/Subの操作

Node.jsでのメッセージ発行例

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

async function publishMessage() {
  const topicName = 'my-topic';
  const data = Buffer.from('Hello, World!');
  
  const messageId = await pubsub.topic(topicName).publish(data, {
    key1: 'value1',
    key2: 'value2'
  });
  
  console.log(`Message ${messageId} published.`);
  return messageId;
}

publishMessage();

Node.jsでのメッセージ受信例(プル型)

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

function listenForMessages() {
  const subscriptionName = 'my-pull-subscription';
  const subscription = pubsub.subscription(subscriptionName);
  
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`Data: ${message.data.toString()}`);
    console.log(`Attributes: ${JSON.stringify(message.attributes)}`);
    
    // メッセージを確認(アクノリッジ)
    message.ack();
  };
  
  subscription.on('message', messageHandler);
  
  console.log(`Listening for messages on ${subscriptionName}...`);
}

listenForMessages();

Pythonでのメッセージ発行例

from google.cloud import pubsub_v1

def publish_message():
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path('my-project', 'my-topic')
    
    data = 'Hello, World!'.encode('utf-8')
    attributes = {
        'key1': 'value1',
        'key2': 'value2'
    }
    
    future = publisher.publish(topic_path, data, **attributes)
    message_id = future.result()
    
    print(f'Message {message_id} published.')
    return message_id

publish_message()

Pythonでのメッセージ受信例(プル型)

from google.cloud import pubsub_v1

def listen_for_messages():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path('my-project', 'my-pull-subscription')
    
    def callback(message):
        print(f'Received message {message.message_id}:')
        print(f'Data: {message.data.decode("utf-8")}')
        print(f'Attributes: {message.attributes}')
        
        # メッセージを確認(アクノリッジ)
        message.ack()
    
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f'Listening for messages on {subscription_path}...')
    
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
        subscriber.close()

listen_for_messages()

Pub/Subの高度な機能

メッセージフィルタリング

属性に基づいてメッセージをフィルタリングするサブスクリプションを作成できます:

gcloud pubsub subscriptions create my-filtered-subscription \
    --topic=my-topic \
    --filter="attributes.key1 = \"value1\" AND attributes.key2 = \"value2\""

デッドレターポリシー

処理できないメッセージを特別なトピックに転送するデッドレターポリシーを設定できます:

gcloud pubsub subscriptions create my-subscription \
    --topic=my-topic \
    --dead-letter-topic=my-dead-letter-topic \
    --max-delivery-attempts=5

順序保証

メッセージの順序を保証するには、順序キーを使用します:

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

async function publishOrderedMessages() {
  const topicName = 'my-topic';
  const topic = pubsub.topic(topicName, {
    messageOrdering: true
  });
  
  // 同じ順序キーを持つメッセージは順序通りに配信される
  const orderingKey = 'user-123';
  
  for (let i = 1; i <= 10; i++) {
    const data = Buffer.from(`Message ${i}`);
    const messageId = await topic.publish(data, {}, orderingKey);
    console.log(`Message ${messageId} published with ordering key: ${orderingKey}`);
  }
}

publishOrderedMessages();

スキーマ

メッセージの構造を定義し、検証するためのスキーマを使用できます:

# Avroスキーマの作成
gcloud pubsub schemas create my-schema \
    --type=AVRO \
    --definition="{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"int\"}]}"

# スキーマをトピックに関連付け
gcloud pubsub topics create my-schema-topic \
    --schema=my-schema \
    --message-encoding=JSON

Pub/Subのセキュリティ

Pub/Subは、メッセージングシステムのセキュリティを確保するための様々な機能を提供しています:

アクセス制御

IAMを使用して、トピックとサブスクリプションへのアクセスを制御できます:

# トピックへの発行権限を付与
gcloud pubsub topics add-iam-policy-binding my-topic \
    --member=serviceAccount:my-service-account@my-project.iam.gserviceaccount.com \
    --role=roles/pubsub.publisher

# サブスクリプションへのサブスクライブ権限を付与
gcloud pubsub subscriptions add-iam-policy-binding my-subscription \
    --member=serviceAccount:my-service-account@my-project.iam.gserviceaccount.com \
    --role=roles/pubsub.subscriber

暗号化

Pub/Subは、保存時と転送時のデータを自動的に暗号化します。さらに、カスタマー管理の暗号化キー(CMEK)を使用することもできます:

gcloud pubsub topics create my-cmek-topic \
    --topic-encryption-key=projects/my-project/locations/global/keyRings/my-keyring/cryptoKeys/my-key

VPCサービスコントロール

VPCサービスコントロールを使用して、Pub/Subへのアクセスをセキュリティ境界内に制限できます。

Pub/Subのモニタリングと可観測性

Pub/SubはGoogle Cloud Monitoringと統合されており、トピックとサブスクリプションの状態を監視できます:

主なモニタリングメトリクス

ログの分析

Cloud Loggingを使用して、Pub/Subの操作ログを分析できます:

gcloud logging read 'resource.type="pubsub_topic" AND resource.labels.topic_id="my-topic"'

Pub/Subのユースケース

Pub/Subは以下のようなユースケースに適しています:

Pub/Subとの統合サービス

Pub/Subは他のGoogle Cloudサービスと統合して、より強力なソリューションを構築できます:

サービス 統合の利点
Cloud Functions Pub/Subメッセージをトリガーとしてサーバーレス関数を実行
Cloud Run Pub/Subメッセージに応じてサーバーレスコンテナを実行
Dataflow Pub/Subからのストリームデータを処理するパイプラインを構築
BigQuery Pub/Subからのデータを直接BigQueryに取り込み
Cloud Storage Cloud Storageのイベント通知をPub/Subトピックに送信
Cloud Logging ログエントリをPub/Subトピックにエクスポート
Cloud Tasks Pub/Subメッセージに基づいてタスクを生成

Cloud Functionsとの統合例

gcloud functions deploy process-pubsub-message \
    --runtime nodejs14 \
    --trigger-topic=my-topic \
    --entry-point=processMessage

Pub/Subのベストプラクティス

べき等性の実装例

メッセージが複数回処理されても安全なように、べき等性を確保する方法:

まとめ

Pub/Sub は、Google Cloud Platform上での非同期メッセージングを簡素化するフルマネージドサービスです。高いスケーラビリティと信頼性を備え、分散システムやマイクロサービスアーキテクチャにおけるコンポーネント間の疎結合な通信を実現します。

Pub/Subの主な利点は以下の通りです:

Pub/Subを効果的に活用するには、適切なトピックとサブスクリプションの設計、べき等性の確保、バッチ処理の最適化、モニタリング戦略の実装が重要です。また、他のGCPサービスと組み合わせることで、より強力なイベント駆動型アーキテクチャを構築することができます。