Hugo Future Imperfect Slim

Blackdiz's Garage

心得、筆記、雜記

3 分鐘

Pub/Sub 是 GCP 上類似 Message Queue 的服務,為了測試方便,Google 提供了可在本機端啟動的模擬程式讓開發者可以不用連上 GCP 直接使用本機的 Pub/Sub service。

Pub/Sub 的基本概念是我們在 Pub/Sub 上建立一個 TopicTopic 算是訊息通道的概念,接著我們要在程式中建立 Subscription 去訂閱這個 Topic,每當有訊息進入 Topic 時,有訂閱的程式就接收這個訊息進行處理。

Subscription 分為 Pull SubscriptionPush SubscriptionPull Subscription 是程式必須主動去跟 Topic 取回訊息來處理,而 Push Subscription 則是我們在建立 Subscription 時預先指定一個 HTTPS 的 URL,當有訊息進到 Topic 時,Pub/Sub 會將訊息推送到這個 URL,更詳細的說明可以參照 Subscriber overview

除了 TopicSubscription,最後我們需要 Publisher 負責產生訊息並傳送到 Topic 中,整個架構圖為:

Picture from Google Cloud

這篇算是記錄一下整個安裝過程和使用 Java 程式連接本機 Pub/Sub 的程式片段。

安裝 Google Cloud SDK

首先需要安裝 Google Cloud SDK,目前只有在 MAC 上安裝,所以下面只簡單記錄一下 MAC 下的安裝步驟,其他環境可以到 Google Cloud SDK Guide 有詳細的說明。

如果 Mac 是 M1 版本下載 google-cloud-sdk-354.0.0-darwin-arm.tar.gz,非 M1 則下載 google-cloud-sdk-354.0.0-darwin-x86_64.tar.gz,解壓縮後執行:

./google-cloud-sdk/install.sh && ./google-cloud-sdk/bin/gcloud init

安裝 Pub/Sub emulator

執行:

gcloud components install pubsub-emulator
gcloud components update

啟動 Pub/Sub emulator

執行:

gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=PUBSUB_PROJECT_ID

PUBSUB_PROJECT_ID 可以自由命名,在後面連線到 Pub/Sub 時會用到。另外 Pub/Sub 每次啟動時的 listen port 是隨機的,為了方便起見,我們用 --host-port=0.0.0.0:8085 指定 listen port 為 8085

接著設定環境變數 PUBSUB_EMULATOR_HOST,執行:

$(gcloud beta emulators pubsub env-init)

會自動取得 Pub/Sub 啟動時的 listen port 設為環境變數。

這邊為了方便寫了簡單的 script 一次做完並將 Pub/Sub 的 log 輸出到檔案中:

#!/bin/bash
# 將 log 輸出到 ~/pubsub.log,可以依需要自行修改路徑
gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=PUBSUB_PROJECT_ID >> ~/pubsub.log 2>&1 &

echo "Set PUBSUB_EMULATOR_HOST"

# 這裡不確定原因但用 $(gcloud beta emulators pubsub env-init) 無法正常設置環境變數。因為我們已經指定了 listen port,所以直接用 export 指定即可
export PUBSUB_EMULATOR_HOST=0.0.0.0:8085

使用 Java 程式連線

建立 TopicSubscription

 // 從環境變數 PUBSUB_EMULATOR_HOST 取得 Pub/Sub 的 listen port
String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
TransportChannelProvider channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
TopicAdminClient topicClient =
        TopicAdminClient.create(
                TopicAdminSettings.newBuilder()
                        .setTransportChannelProvider(channelProvider)
                        .setCredentialsProvider(credentialsProvider)
                        .build());

// 這裡的 PUBSUB_PROJECT_ID 即之前啟動 Pub/Sub 時設定的 PUBSUB_PROJECT_ID,而 PUBSUB_TOPIC_ID 則是 Topic 的 id 可以自行設定不同名稱
TopicName topicName = TopicName.of(PUBSUB_PROJECT_ID, PUBSUB_TOPIC_ID);

// 先確定同樣 id 的 Topic 未建立過再建立新的 Topic,否則就重用之前建立 Topic 就好
Topic topic = topicClient.getTopic(topicName);
if (topic == null) {
    topic = topicClient.createTopic(topicName);
}

SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
    .setTransportChannelProvider(channelProvider)
    .setCredentialsProvider(credentialsProvider)
    .build();

SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
        .setCredentialsProvider(credentialsProvider)
        .setTransportChannelProvider(channelProvider)
        .build();
SubscriptionAdminClient client = SubscriptionAdminClient.create(subscriptionAdminSettings);

// 因為我們要建立的是 Push Subscription,所以要指定 endpoint,這裡的 https://localhost:8080/endpoint 依實際需要修改
PushConfig pushConfig = PushConfig.newBuilder().setPushEndpoint("https://localhost:8080/endpoint").build();

// 同樣的,PUBSUB_PROJECT_ID 為啟動時設定的值,而每個 Subscription 需要指定名稱,這裡用 subscription-name 可依需要自行修改
ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(PUBSUB_PROJECT_ID, "subscription-name");

client.createSubscription(subscriptionName, topicName, pushConfig, 0);

建立 Publisher 並傳送訊息:

 // 從環境變數 PUBSUB_EMULATOR_HOST 取得 Pub/Sub 的 listen port
String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();

// 這裡的 PUBSUB_PROJECT_ID 即之前啟動 Pub/Sub 時設定的 PUBSUB_PROJECT_ID,而 PUBSUB_TOPIC_ID 則要使用剛剛建立 Subscription 時的相同 Topic id
TopicName topicName = ProjectTopicName.of(PUBSUB_PROJECT_ID, PUBSUB_TOPIC_ID);
Publisher publisher = null;

try {
    TransportChannelProvider channelProvider =
            FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
    CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

    TopicAdminClient topicClient =
            TopicAdminClient.create(
                    TopicAdminSettings.newBuilder()
                            .setTransportChannelProvider(channelProvider)
                            .setCredentialsProvider(credentialsProvider)
                            .build());

    publisher = Publisher.newBuilder(topicName).setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider).build();

    // data 即為我們要傳送的訊息
    String data = "test message"
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    ApiFuture<String> future = publisher.publish(pubsubMessage);

} finally {
    if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
    }
}

以上即是用 Java 連線 Pub/Sub 傳送訊息的程式片段。


參考資料:

如果有什麼想法或需要指正的地方,歡迎您留言或來信 😄

說些甚麼

留言

最新文章

分類

關於

I'm so weak, so I learn from every master for things that I don't know