产品定价 立即试用
PE MQTT Broker
文档 > 其他功能 > 背压
入门
安装 架构 API 常见问题
目录

背压

在TBMQ这类高吞吐消息系统中,背压(backpressure) 处理对保证稳定性、维持性能、避免高负载下OOM至关重要。 背压可发生在两个方向:入站(数据从发布者流入broker)和出站(broker向订阅者投递数据)。 TBMQ针对两者均做了有效处理。

TBMQ使用Netty作为MQTT通信的骨干,处理所有底层网络和I/O操作。 虽然Netty提供高性能和可扩展性,但也需要仔细控制入站和出站消息流。

入站背压

TBMQ的架构设计能够处理来自发布者的几乎无限的负载。 传入消息不会无限期存储在内存中——而是立即持久化到Kafka,Kafka作为进一步处理和路由的骨干。 这种设计确保即使在极端的发布者吞吐量下,内存使用仍然稳定且可预测。

为支持不断增长的工作负载,TBMQ可通过部署多个broker实例进行水平扩展,分配负载并增加吞吐容量。 然而,当用户不希望水平扩展或避免大量投资基础设施或高级配置调优时,TBMQ提供了额外的机制来有效管理传入流量。

这些机制包括通过Netty的socket接收缓冲区控制的TCP级别背压,以及允许执行按客户端和集群范围消息速率策略的应用级别速率限制。 这些选项共同提供了灵活、经济的方式来保护broker免受过载,确保在不同流量条件下的稳定性能。

TCP级别背压

关键机制之一是通过Netty socket接收缓冲区实现的TCP级别背压。可使用so_receive_buffer参数配置此缓冲区:

1
2
3
4
# Socket receive buffer size for Netty in KB.
# If the buffer limit is reached, TCP will trigger backpressure and notify the sender to slow down.
# If set to 0 (default), the system's default buffer size will be used.
so_receive_buffer: "${NETTY_SO_RECEIVE_BUFFER:0}"

当接收缓冲区已满且未能足够快地排空(例如由于高负载或下游处理缓慢)时,TCP将通知远端发送者施加背压。 这使broker能够自然地减缓入站流量,而不会立即断开连接或使内存过载。

在大多数情况下,建议将此值保持为0,让操作系统应用优化的默认值。 仅在性能分析后的低延迟或高吞吐场景中,或当你需要更严格地控制内存使用和背压行为时,才考虑调整此参数。

速率限制

虽然不是反应式背压机制,但TBMQ中的速率限制通过主动控制传入消息量作为额外的保护层。 它通过在系统过载之前执行流量约束来补充真正的背压机制。 TBMQ支持集群范围的速率限制(控制总传入流量)和按客户端的速率限制(防止单个发布者压垮broker)。 这些设置允许运维人员定义消息速率策略,帮助维护系统稳定性、客户端之间的公平性,并防范流量峰值。

1
2
3
4
5
6
7
8
9
10
11
12
rate-limits:
  total:
    # Enable/disable total incoming and outgoing messages rate limits for the broker (per whole cluster)
    enabled: "${MQTT_TOTAL_RATE_LIMITS_ENABLED:false}"
    # Limit the total message rate across the cluster (e.g., 1000 messages per second, 50000 per minute)
    config: "${MQTT_TOTAL_RATE_LIMITS_CONFIG:1000:1,50000:60}"

  incoming-publish:
    # Enable/disable per-client publish rate limits
    enabled: "${MQTT_INCOMING_RATE_LIMITS_ENABLED:false}"
    # Limit how many messages each client can send over time (e.g., 10 messages per second, 300 per minute)
    client-config: "${MQTT_INCOMING_RATE_LIMITS_CLIENT_CONFIG:10:1,300:60}"

TCP背压和可配置的速率限制共同使TBMQ具有高度弹性,能够在任何内部处理瓶颈或内存压力出现之前自我调节流量。

出站背压

如果订阅者跟不上,broker的出站通道缓冲区可能会不堪重负。 没有背压控制,这可能导致不受控的内存增长,最终导致broker内存耗尽。 为解决此问题,TBMQ引入了背压感知投递机制,当Netty通道变为不可写时检测到并暂时暂停消息投递。 一旦通道再次变为可写,投递会自动恢复。 这确保了即使在高负载下也能高效使用内存和稳定运行。

Netty通道可写性监控

TBMQ使用Netty作为底层网络框架,Netty内置了对每个通道可写性channelWritabilityChanged事件)的监控支持。 这使TBMQ能够检测订阅者的连接何时被出站数据压垮,并通过暂停向该通道的进一步写入来施加背压。

Netty根据写缓冲区水位线(一对阈值)来确定可写性:

  • 高水位线:如果出站缓冲区大小超过此阈值,通道被标记为不可写。TBMQ将停止向该客户端发送消息,直到缓冲区排空。
  • 低水位线:当缓冲区大小降至此值以下时,通道再次变为可写,TBMQ恢复消息投递。

这些阈值可通过环境变量配置:

  • NETTY_WRITE_BUFFER_LOW_WATER_MARK——定义低水位线(字节),默认:32768,即32 KB
  • NETTY_WRITE_BUFFER_HIGH_WATER_MARK——定义高水位线(字节),默认:65536,即64 KB

这些值在Netty服务器启动时通过WRITE_BUFFER_WATER_MARK通道选项应用。

通过利用此机制,TBMQ确保没有客户端连接因未检查的消息投递而消耗过多内存。 相反,投递会根据通道健康状况动态暂停和恢复,在负载下保持broker稳定性。

非持久和持久客户端的处理

TBMQ根据订阅客户端是持久的还是非持久的来区分背压行为,确保内存和存储资源的高效使用。

非持久客户端

对于非持久客户端,如果通道变为不可写,TBMQ不会存储消息。相反,当检测到背压时:

  • broker跳过向该客户端的消息投递。
  • 这些丢弃的消息不会保留或重试,这符合MQTT对非持久会话的预期。
  • 维护一个全局丢弃消息计数器,以跟踪因背压而跳过了多少消息。此指标提供了高负载下系统行为的可见性,并有助于识别瓶颈。

这种方法避免了不期望维护状态的短生命周期或不可靠客户端的内存堆积。

持久客户端

持久客户端具有消息投递保证,因此跳过消息是不可接受的。 TBMQ通过使用持久存储进行消息排队并根据通道可写性控制投递,确保即使在背压下也能保持持久性。

对于Device客户端,消息在投递前存储在Redis中。如果通道变为不可写,消息发送暂停。 一旦通道再次变为可写,TBMQ通过从Redis读取待处理消息恢复投递。

  • Redis有按客户端的消息队列限制(如10,000条消息)。如果在客户端变为可写之前超过此限制,较旧的消息可能会被丢弃。
  • 此限制可通过环境变量MQTT_PERSISTENT_SESSION_DEVICE_PERSISTED_MESSAGES_LIMIT配置。
  • 此外,Redis中存储的每条消息都有生存时间(TTL),以确保过期消息最终被清理。 TTL可通过环境变量MQTT_PERSISTENT_SESSION_DEVICE_PERSISTED_MESSAGES_TTL配置。 对于指定了message expiry interval的MQTT5.0客户端,TBMQ会尊重客户端定义的值并使用它来替代配置的默认值。

对于Application客户端,消息存储在Kafka中。如果到客户端的通道变为不可写, TBMQ会临时暂停该客户端的Kafka消费者以避免轮询和缓冲不必要的消息。 一旦通道变为可写,消费者恢复并继续消息投递。

  • Kafka’s retention policy ensures that even when consumers are paused, messages remain available for a defined period:
    1
    2
    
    retention.ms=604800000 (7 days)
    retention.bytes=1048576000 (1 GB)
    
  • 这些设置可通过环境变量TB_KAFKA_APP_PERSISTED_MSG_TOPIC_PROPERTIES自定义。

此机制确保持久客户端即使在背压下也能可靠地接收消息,而不会使broker过载或丢失数据。

共享订阅与背压处理

TBMQ还在共享订阅的上下文中应用背压处理逻辑,确保所有订阅类型的可靠高效消息投递。 共享订阅组可包含一个或多个订阅者,消息根据MQTT5.0规则在它们之间分配。 当检测到背压时,broker根据共享组的类型和持久化级别调整投递。

非持久共享订阅组

如果组中的订阅者变为不可写,TBMQ跳过该订阅者并尝试将消息投递给组中另一个可写的订阅者。 如果组中所有订阅者都是不可写的,消息将被完全丢弃,不会排队或保留。 此行为符合对非持久客户端的预期,在过载条件下消息丢失是可接受的。

持久Device共享订阅组

如果订阅者不可写,会被跳过,消息路由到同一组中另一个可写的订阅者。 如果所有订阅者均不可写,消息被保存到Redis,使用与共享订阅键关联的按组队列。 Redis确保一旦组中任何订阅者再次变为可写,投递将从存储的消息恢复。 队列大小和TTL通过与单个持久Device客户端相同的配置控制。

持久Application共享订阅组

当组中的订阅者变为不可写时,TBMQ将其从与共享订阅关联的Kafka消费者组中移除。 其他可写的订阅者继续正常从Kafka轮询消息。 如果组中所有订阅者都变为不可写,消费者组暂时变为空,不再轮询消息。 Kafka根据主题配置的保留策略(环境变量TB_KAFKA_APP_PERSISTED_MSG_SHARED_TOPIC_PROPERTIES)保留未投递的消息, 确保一旦任何订阅者变为可写并重新加入组,消息投递即恢复。

此方法确保TBMQ即使在压力下处理共享订阅时也能维持性能、可靠性和资源效率。每种策略都针对组中客户端的持久化级别进行了定制。

建议

为最大化TBMQ背压处理的有效性并确保在可变负载下的系统弹性,我们建议如下:

  • 监控不可写客户端数量:使用nonWritableClients计数器跟踪当前处于出站背压下的客户端数量。此指标在日志和监控系统(如Prometheus)中均可用。对于生产环境,建议在该值意外增加或持续升高时设置告警。
  • 从默认背压设置开始:对于大多数部署,默认Netty缓冲区阈值——32 KB低水位线和64 KB高水位线——提供了强健的性能。这些设置已经过测试,在典型条件下可支持每个订阅者约10,000条消息/秒
  • 确保充足的Redis和Kafka容量:持久客户端缓冲依赖Redis和Kafka。监控它们的内存、磁盘和吞吐量以避免次级瓶颈。
  • 使用水平扩展:对于持续的高吞吐量,水平扩展broker节点。背压不能替代充足的计算和I/O资源。
  • 在负载下测试:使用模拟的慢速和快速消费者进行负载测试,以验证你的配置在实际场景中如何处理背压。

通过遵循这些实践,你可以充分利用TBMQ的背压处理机制,确保即使在要求苛刻的MQTT工作负载中也能可靠运行、高效使用资源和高性能。

总结

TBMQ中的背压处理机制在处理不同客户端消费速率时显著增强了broker的弹性和效率。 通过动态监控通道可写性、智能控制消息投递,以及集成传输层和应用层流量控制, TBMQ确保了可靠的性能和最优的资源利用——即使在持续或突发的高负载条件下也是如此。 这使TBMQ非常适合大规模的高要求MQTT工作负载。