产品定价 立即试用
PE MQTT Broker
API > REST APIs > Application Shared Subscriptions Management
入门 文档 安装 架构
常见问题
目录

应用共享订阅实体

Application Shared Subscription实体提供为 APPLICATION 客户端使用共享订阅的能力。该功能允许多个客户端订阅并接收来自共享订阅的消息。 创建Application Shared Subscription实体后,会自动创建对应的Kafka主题。 该Kafka主题作为该共享订阅所有消息的存储库。

要在系统中创建新的Application Shared Subscription实体,需要以Admin用户身份进行认证。

认证

要对 broker 执行管理操作,必须先登录系统并获取 Access Token。 该 Access Token 用于对管理操作进行认证和授权。

要获取 Access Token,可执行以下命令:

1
2
3
4
5
6
curl --location --request POST 'http://localhost:8083/api/auth/login' \
--header 'Content-Type: application/json' \
--data-raw '{
    "username":"sysadmin@thingsboard.org",
    "password":"sysadmin"
}'

请注意:若 broker 安装在远程服务器上,需将命令中的 “localhost” 替换为服务器公网 IP 或指定域名。 此外,请确保端口 8083 可从公网访问以建立连接。 同时,请将命令中的 “username” 和 “password” 替换为您环境中正确且有效的凭据。

授权成功后,响应中将包含一个名为 token 的字段。 后续所有 TBMQ 管理请求均需使用该 token。 为简化操作,您可以将 token 字段的值赋给名为 ACCESS_TOKEN 的环境变量, 或直接在本教程中涉及的请求里替换 $ACCESS_TOKEN 字符串。

1
export ACCESS_TOKEN=PLACE_YOUR_TOKEN_HERE

创建/更新Application Shared Subscription

1
2
3
4
5
6
7
8
curl --location --request POST 'http://localhost:8083/api/app/shared/subs' \
--header "X-Authorization: Bearer $ACCESS_TOKEN" \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "test",
    "partitions": 1,
    "topicFilter": "test/topic"
}'

执行上述请求后,将在PostgreSQL数据库中创建一个实体,并添加名为 tbmq.msg.app.shared.test.topic 的Kafka主题。 该Kafka主题将包含单个分区。 主题名称的构建方式是将 MQTT 主题过滤器映射到对应的 Kafka 主题。 通过遵循特定的命名约定(MQTT 主题过滤器 -> Kafka 主题)实现该映射。

1
2
3
test/topic -> tbmq.msg.app.shared.test.topic
test/# -> tbmq.msg.app.shared.test.mlw
test/+ -> tbmq.msg.app.shared.test.slw

其中

  • tbmq.msg.app.shared. 作为前缀添加
  • / 替换为 .
  • # 替换为 mlw(多级通配符)
  • + 替换为 slw(单级通配符)

若 MQTT 主题过滤器包含上述未提及的任何特殊字符(除字母数字外), 将使用主题过滤器派生的哈希创建 Kafka 主题。 此方式可确保生成的 Kafka 主题有效且符合必要的命名约定。

1
tbmq.msg.app.shared.$TOPIC_FILTER_HASH

上述行为可通过 TB_APP_PERSISTED_MSG_SHARED_TOPIC_VALIDATION 属性进行调控。 默认情况下该变量已启用,表示验证流程处于激活状态,确保正确创建主题。 但若将该变量设为 false 以禁用验证, 系统将不再为包含特殊字符的共享订阅创建 Kafka 主题, 导致无法创建对应主题。 配置环境和处理包含特殊字符的主题过滤器时请务必注意这一点。

请注意:实体创建后,无法更新 partitionstopicFilter 字段。 因此,在创建实体前,务必仔细考虑所需的主题过滤器和分区数量。 建议使用较多而非较少的分区数量创建实体。 这样可在未来添加新客户端到共享订阅时实现水平扩展。 若实体已使用不当的值或配置创建,则需删除该实体并重新创建具有正确值的新实体。 因此,在初次创建过程中务必谨慎并做出明智决策,以避免后续修改或重建。

举例而言,若预计有5个客户端将订阅某一主题过滤器, 建议将分区数量配置为5的倍数,例如5、10或15。 这样可确保负载在订阅者之间均匀分布,促进均衡处理并提升性能。

获取所有Application Shared Subscription实体

1
2
curl --location --request GET 'http://localhost:8083/api/app/shared/subs?pageSize=100&page=0' \
--header "X-Authorization: Bearer $ACCESS_TOKEN"

注意pageSize 参数为100,page 参数为0,因此上述请求将获取前100个Application Shared Subscription实体。

删除Application Shared Subscription实体

删除Application Shared Subscription实体后,对应的Kafka主题也将被删除。 注意:为此需将 TB_KAFKA_ENABLE_TOPIC_DELETION 环境变量设置为 true

1
2
curl --location --request DELETE 'http://localhost:8083/api/app/shared/subs/$APP_SHARED_SUBS_ID' \
--header "X-Authorization: Bearer $ACCESS_TOKEN"

请将 $APP_SHARED_SUBS_ID 替换为要删除的Application Shared Subscription实体的实际ID。