产品定价 立即试用
MQTT Broker
文档 > MQTT功能 > 共享订阅
入门
安装 架构 API 常见问题
目录

共享订阅

共享订阅是MQTT v5引入的高级能力,用户期待已久。 TBMQ不限定仅MQTT v5客户端使用,任意协议版本的客户端均可利用此特性。 官方文档对共享订阅有详细说明, 本教程聚焦该功能的核心要点。 理解并掌握共享订阅后,用户可在MQTT交互中充分发挥这一能力。

什么是共享订阅?

在传统或标准订阅中,每个已订阅客户端都会收到匹配订阅主题的每条发布消息的一份副本。 该方式确保所有客户端收到同一组消息。 共享订阅则引入更高效的机制,在指定组内的多个客户端间分配订阅负载。 这一称为客户端负载均衡的技术,使MQTT客户端能更高效地共同处理入站消息流, 优化网络带宽并降低整体处理开销。

共享订阅通过特定格式的主题过滤器标识。 共享订阅主题过滤器格式如下:

1
$share/{ShareName}/{TopicFilter}

其中:

  • $share —— 表示该订阅为共享订阅的常量。
  • {ShareName} —— 共享订阅的标识符,用于区分其他共享订阅。
  • {TopicFilter} —— 表示用于订阅的主题过滤器,与普通订阅类似。 可包含通配符如 #+ 以匹配多个主题。

例如,以下为共享订阅:

1
$share/group1/country/+/city/+/home/#

共享订阅用例

共享订阅可根据适用性和优势应用于多种场景。 以下是共享订阅特别有益的几个常见场景:

  • 后端应用的水平扩展。 当特定后端应用需要订阅MQTT消息流并处理大量消息时,共享订阅支持水平扩展。多个应用实例可加入共享订阅组,分配工作负载并提高可扩展性。
  • 高消息速率主题。 当某些主题的消息速率过高,单个客户端难以处理时,可使用共享订阅。通过在共享订阅组内的多个客户端之间分配负载,消息处理更高效,避免单个客户端过载。
  • 均衡系统资源。 共享订阅有助于均匀分配系统资源,降低瓶颈风险。通过利用共享订阅组内的多个客户端,可以优化消息处理和资源利用,确保系统均衡高效运行。

总之,共享订阅为涉及后端应用、高消息速率和资源优化的场景提供了灵活性和可扩展性,实现更好的MQTT消息流管理。

消息顺序保证

根据MQTT5.0规范,共享订阅不保证消息顺序。 协议允许broker向共享组中的任何客户端投递消息,而不保留消息发布的顺序。 这意味着虽然单个客户端可能按分配顺序接收消息,但整个组的全局顺序不会维护。 如果严格的消息顺序对你的应用至关重要,请考虑使用单个订阅者而非共享订阅组。

订阅共享订阅

在本教程中,我们将连接DEVICE非持久客户端并使用Mosquitto客户端库。 Ubuntu用户可使用以下命令安装:

1
sudo apt-get install mosquitto-clients

要启动共享订阅,请在两个独立终端中执行以下命令:

1
mosquitto_sub -d -h "YOUR_MQTT_BROKER_HOST" -p 1883 -t '$share/group/home/temp' -q 1 -V mqttv5 -i client1
1
mosquitto_sub -d -h "YOUR_MQTT_BROKER_HOST" -p 1883 -t '$share/group/home/temp' -q 1 -V mqttv5 -i client2

注意: 请将YOUR_MQTT_BROKER_HOST替换为你的主机名。 确保认证已禁用,否则请相应调整本指南中的命令。

例如,要使用系统默认的MQTT客户端凭据连接到本地部署的TBMQ,请运行以下命令:

1
mosquitto_sub -d -h "localhost" -p 1883 -t '$share/group/home/temp' -q 1 -V mqttv5 -i client1 -u tbmq_websockets_username
1
mosquitto_sub -d -h "localhost" -p 1883 -t '$share/group/home/temp' -q 1 -V mqttv5 -i client2 -u tbmq_websockets_username

结果,一个新的共享订阅被创建(ShareName为group),两个客户端(client1client2)订阅了home/temp主题。 两个客户端将均匀接收该主题上发布的消息。

为确保共享订阅中的客户端接收消息,你可以向broker发布一些指向home/temp主题的消息。 执行以下命令:

1
mosquitto_pub -d -h "YOUR_MQTT_BROKER_HOST" -p 1883 -t 'home/temp' -m 32 -q 1

使用以下命令通过默认凭据连接到本地部署的TBMQ:

1
mosquitto_pub -d -h "localhost" -p 1883 -t 'home/temp' -m 32 -q 1 -u tbmq_websockets_username

共享订阅组内的订阅数可通过将新客户端订阅到组中来增加, 或通过取消现有客户端的订阅来减少。 这种动态调整为管理共享订阅的负载分配和扩展提供了灵活性。

让我们看看共享订阅处理的实际效果:

image

共享订阅负载均衡策略

目前,TBMQ支持共享订阅的ROUND_ROBIN负载均衡策略类型。 这意味着共享订阅的入站消息以轮询方式均匀分配给已订阅的客户端。 组中的每个客户端按顺序接收消息,轮流处理消息负载。 我们正在持续增强TBMQ,计划在不久的将来引入更多负载均衡策略类型, 可能包括随机和基于哈希的负载均衡策略。请关注更新,了解TBMQ能力的扩展。

共享订阅与客户端类型

TBMQ中的DEVICEAPPLICATION客户端实现方式不同,这影响了共享订阅功能的使用方式以及每种客户端类型的消息处理方式。

如果你创建具有相同结构的共享订阅,并同时使用DEVICE和APPLICATION客户端订阅,TBMQ会将它们视为独立的共享订阅组。 这意味着发布到共享订阅主题的消息仅在同类型客户端之间分配。 DEVICE客户端在DEVICE共享订阅组内接收消息,而APPLICATION客户端在APPLICATION共享订阅组内接收消息。

因此,在TBMQ中使用共享订阅时,考虑客户端类型非常重要, 因为消息将根据客户端类型在各自的共享订阅组内进行相应的处理和分配。

DEVICE客户端类型

从用户角度来看,在TBMQ中为DEVICE客户端使用共享订阅功能是无缝的。 只需将客户端订阅到共享订阅即可,该功能将按预期工作。

但是,当共享订阅组中涉及持久客户端时有一些注意事项:

  1. 如果共享订阅组包含一些持久客户端,它们也会共享订阅主题的消息负载,直到它们离线。一旦离线,消息将不再分配给它们。
  2. 另一方面,如果共享订阅组完全由持久客户端组成,且它们全部离线,新接收的消息将按共享订阅组存储在Redis数据库中。一旦组中的第一个客户端重新连接到broker,它将接收与共享订阅相关的已存储持久消息。

这些注意事项确保包含持久客户端的共享订阅组的消息分配和持久化得到适当处理。

建议: 避免在同一共享订阅组内混合使用持久和非持久客户端。 这种设置可能导致不一致的投递行为,并使判断哪些客户端接收哪些消息变得更加困难。 为了可预测的消息流和更好的调试,建议每个组使用全部持久客户端或全部非持久客户端。

APPLICATION客户端类型

要在TBMQ中为APPLICATION客户端使用共享订阅功能,需要执行额外步骤。 首先,需要在PostgreSQL数据库中创建Application Shared Subscription实体。 请按照以下指南中的说明操作。 也可通过REST API完成,详细说明请参阅文档。 实体创建过程包括自动创建相应的Kafka主题。

文档信息图标

If this step is missed, the persistent APPLICATION client will be prevented from subscribing to the shared subscription.

创建的Kafka主题命名规则遵循以下格式:

1
tbmq.msg.app.shared.$TOPIC_FILTER

其中$TOPIC_FILTER表示共享订阅的主题过滤器。

重要提示: 如果主题过滤器包含非字母数字字符(以及’/’、’+’、’#’以外的字符),将使用从主题过滤器派生的哈希值来构建Kafka主题。 这确保了与Kafka命名规范的兼容性,因为某些特殊字符可能不允许出现在主题名称中。 通过生成哈希值,任何不支持的字符都能得到安全处理,确保正确的主题创建和功能。

1
tbmq.msg.app.shared.$TOPIC_FILTER_HASH

实体创建后,即可开始使用共享订阅功能。 当客户端连接到broker并发起共享订阅时,会创建一个新的Kafka消费者并添加到Consumer Group(CG)中。 随着更多客户端订阅,它们的消费者也会被添加到CG中,使它们能够共享消息负载。 同样,当客户端取消订阅时,其消费者将从CG中移除,组会自动重新平衡。

重要的是,即使所有APPLICATION客户端都离线,发布到共享订阅的消息仍会继续写入Kafka。 这些消息会被保留,并在组中任何客户端重新连接时投递,确保不会丢失数据。

对Kafka能力的利用为TBMQ中APPLICATION客户端的共享订阅提供了增强的性能、可扩展性和可靠性。 通过利用Kafka的特性,系统能够有效管理和分配已订阅客户端之间的工作负载,确保最佳性能和容错能力。

共享订阅中的服务质量(QoS)

MQTT中的共享订阅遵循每个订阅者请求的服务质量(QoS)级别。 broker对共享订阅应用与常规订阅相同的QoS处理规则,但消息投递在共享组内的客户端之间分配。

要点:

  • 有效QoS是发布者和订阅者QoS的最小值。
  • 如果共享组中的订阅者使用不同的QoS级别,每个客户端按其请求的QoS接收消息。
  • 无论QoS如何,组中只有一个客户端接收每条匹配的消息。
  • QoS不影响负载均衡行为——它仅定义每个客户端的投递保证。

最佳实践:

建议共享组内的所有订阅者使用相同的QoS级别。 这种一致性简化了消息流,有助于确保可预测的行为,并使跨客户端调试和监控消息投递更加容易。