Pub/Sub(Publish/Subscribe)は、Google Cloud Platform (GCP) が提供するフルマネージドのメッセージングサービスです。アプリケーション間で非同期にメッセージを送受信するためのプラットフォームを提供し、分散システムやマイクロサービスアーキテクチャにおけるコンポーネント間の疎結合な通信を実現します。Pub/Subは、高いスケーラビリティと信頼性を備え、グローバルに分散したアプリケーション間でもリアルタイムなメッセージ配信を可能にします。
Pub/Subは以下の主要コンポーネントで構成されています:
コンポーネント | 説明 |
---|---|
トピック | メッセージが発行される名前付きのリソース。パブリッシャーはトピックにメッセージを送信する |
サブスクリプション | トピックからメッセージを受信するための名前付きのリソース。サブスクライバーはサブスクリプションを通じてメッセージを受信する |
メッセージ | パブリッシャーからサブスクライバーに送信されるデータの単位。メッセージ本文と属性(キーと値のペア)で構成される |
パブリッシャー | トピックにメッセージを送信するアプリケーションまたはサービス |
サブスクライバー | サブスクリプションからメッセージを受信するアプリケーションまたはサービス |
アクノリッジメント | サブスクライバーがメッセージを正常に処理したことを確認する信号 |
Pub/Subと他のメッセージングサービスには以下のような違いがあります:
Pub/Subは、Google Cloud Console、gcloud CLIツール、クライアントライブラリ、またはRESTful APIを使用して利用できます。
gcloud pubsub topics create my-topic
gcloud pubsub topics update my-topic \
--message-retention-duration=7d
gcloud pubsub subscriptions create my-pull-subscription \
--topic=my-topic \
--ack-deadline=60 \
--message-retention-duration=7d
gcloud pubsub subscriptions create my-push-subscription \
--topic=my-topic \
--push-endpoint=https://example.com/push \
--ack-deadline=60
gcloud pubsub topics publish my-topic \
--message="Hello, World!" \
--attribute="key1=value1,key2=value2"
gcloud pubsub subscriptions pull my-pull-subscription \
--auto-ack \
--limit=10
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
async function publishMessage() {
const topicName = 'my-topic';
const data = Buffer.from('Hello, World!');
const messageId = await pubsub.topic(topicName).publish(data, {
key1: 'value1',
key2: 'value2'
});
console.log(`Message ${messageId} published.`);
return messageId;
}
publishMessage();
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
function listenForMessages() {
const subscriptionName = 'my-pull-subscription';
const subscription = pubsub.subscription(subscriptionName);
const messageHandler = message => {
console.log(`Received message ${message.id}:`);
console.log(`Data: ${message.data.toString()}`);
console.log(`Attributes: ${JSON.stringify(message.attributes)}`);
// メッセージを確認(アクノリッジ)
message.ack();
};
subscription.on('message', messageHandler);
console.log(`Listening for messages on ${subscriptionName}...`);
}
listenForMessages();
from google.cloud import pubsub_v1
def publish_message():
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')
data = 'Hello, World!'.encode('utf-8')
attributes = {
'key1': 'value1',
'key2': 'value2'
}
future = publisher.publish(topic_path, data, **attributes)
message_id = future.result()
print(f'Message {message_id} published.')
return message_id
publish_message()
from google.cloud import pubsub_v1
def listen_for_messages():
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-pull-subscription')
def callback(message):
print(f'Received message {message.message_id}:')
print(f'Data: {message.data.decode("utf-8")}')
print(f'Attributes: {message.attributes}')
# メッセージを確認(アクノリッジ)
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f'Listening for messages on {subscription_path}...')
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
subscriber.close()
listen_for_messages()
属性に基づいてメッセージをフィルタリングするサブスクリプションを作成できます:
gcloud pubsub subscriptions create my-filtered-subscription \
--topic=my-topic \
--filter="attributes.key1 = \"value1\" AND attributes.key2 = \"value2\""
処理できないメッセージを特別なトピックに転送するデッドレターポリシーを設定できます:
gcloud pubsub subscriptions create my-subscription \
--topic=my-topic \
--dead-letter-topic=my-dead-letter-topic \
--max-delivery-attempts=5
メッセージの順序を保証するには、順序キーを使用します:
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
async function publishOrderedMessages() {
const topicName = 'my-topic';
const topic = pubsub.topic(topicName, {
messageOrdering: true
});
// 同じ順序キーを持つメッセージは順序通りに配信される
const orderingKey = 'user-123';
for (let i = 1; i <= 10; i++) {
const data = Buffer.from(`Message ${i}`);
const messageId = await topic.publish(data, {}, orderingKey);
console.log(`Message ${messageId} published with ordering key: ${orderingKey}`);
}
}
publishOrderedMessages();
メッセージの構造を定義し、検証するためのスキーマを使用できます:
# Avroスキーマの作成
gcloud pubsub schemas create my-schema \
--type=AVRO \
--definition="{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"int\"}]}"
# スキーマをトピックに関連付け
gcloud pubsub topics create my-schema-topic \
--schema=my-schema \
--message-encoding=JSON
Pub/Subは、メッセージングシステムのセキュリティを確保するための様々な機能を提供しています:
IAMを使用して、トピックとサブスクリプションへのアクセスを制御できます:
# トピックへの発行権限を付与
gcloud pubsub topics add-iam-policy-binding my-topic \
--member=serviceAccount:my-service-account@my-project.iam.gserviceaccount.com \
--role=roles/pubsub.publisher
# サブスクリプションへのサブスクライブ権限を付与
gcloud pubsub subscriptions add-iam-policy-binding my-subscription \
--member=serviceAccount:my-service-account@my-project.iam.gserviceaccount.com \
--role=roles/pubsub.subscriber
Pub/Subは、保存時と転送時のデータを自動的に暗号化します。さらに、カスタマー管理の暗号化キー(CMEK)を使用することもできます:
gcloud pubsub topics create my-cmek-topic \
--topic-encryption-key=projects/my-project/locations/global/keyRings/my-keyring/cryptoKeys/my-key
VPCサービスコントロールを使用して、Pub/Subへのアクセスをセキュリティ境界内に制限できます。
Pub/SubはGoogle Cloud Monitoringと統合されており、トピックとサブスクリプションの状態を監視できます:
Cloud Loggingを使用して、Pub/Subの操作ログを分析できます:
gcloud logging read 'resource.type="pubsub_topic" AND resource.labels.topic_id="my-topic"'
Pub/Subは以下のようなユースケースに適しています:
Pub/Subは他のGoogle Cloudサービスと統合して、より強力なソリューションを構築できます:
サービス | 統合の利点 |
---|---|
Cloud Functions | Pub/Subメッセージをトリガーとしてサーバーレス関数を実行 |
Cloud Run | Pub/Subメッセージに応じてサーバーレスコンテナを実行 |
Dataflow | Pub/Subからのストリームデータを処理するパイプラインを構築 |
BigQuery | Pub/Subからのデータを直接BigQueryに取り込み |
Cloud Storage | Cloud Storageのイベント通知をPub/Subトピックに送信 |
Cloud Logging | ログエントリをPub/Subトピックにエクスポート |
Cloud Tasks | Pub/Subメッセージに基づいてタスクを生成 |
gcloud functions deploy process-pubsub-message \
--runtime nodejs14 \
--trigger-topic=my-topic \
--entry-point=processMessage
メッセージが複数回処理されても安全なように、べき等性を確保する方法:
Pub/Sub は、Google Cloud Platform上での非同期メッセージングを簡素化するフルマネージドサービスです。高いスケーラビリティと信頼性を備え、分散システムやマイクロサービスアーキテクチャにおけるコンポーネント間の疎結合な通信を実現します。
Pub/Subの主な利点は以下の通りです:
Pub/Subを効果的に活用するには、適切なトピックとサブスクリプションの設計、べき等性の確保、バッチ処理の最適化、モニタリング戦略の実装が重要です。また、他のGCPサービスと組み合わせることで、より強力なイベント駆動型アーキテクチャを構築することができます。