向外部MQTT broker发布消息,使用QoS 1(至少一次)及可配置的保留设置。支持基于入站消息数据动态构建topic模式,及包括TLS/SSL加密在内的多种认证方式。
配置
Topic pattern
指定发布消息的MQTT主题。该字段为必填,支持 templatization 以根据入站消息data或metadata动态构造主题名。
Connection settings(连接设置)
- Host — MQTT broker的主机名或IP地址。该字段为必填。
- Port — MQTT broker的端口号。该字段为必填。
- Connection timeout (sec) — 建立与MQTT broker连接时的最大等待时间(秒),超时则连接失败。该字段为必填。
Client ID
MQTT客户端的可选标识符。若留空,节点将自动生成Client ID,格式为 netty-mqtt/ 后接8位随机字母数字字符。
自动生成Client ID示例:netty-mqtt/aB3dEf7G
Singleton mode and cluster deployment(单例模式与集群部署)
平台以集群(微服务)模式运行时,规则节点可在两种模式下运行:
- Singleton mode enabled(默认)— 无论集群中有多少规则引擎实例,规则节点仅在其中一个实例上启动,即只有一个MQTT客户端连接。
- Singleton mode disabled — 规则节点在所有规则引擎实例上启动。若有三个规则引擎实例,则会启动三个规则节点,建立三个MQTT客户端连接。
可通过节点配置界面中的 Singleton/Disabled 开关切换单例模式。
Multiple connections issue(多连接问题)
大多数MQTT broker不允许同一Client ID建立多个并发连接。当集群中多个规则节点实例使用相同Client ID尝试连接时,broker会拒绝重复的客户端连接,导致失败。
Solving the multiple connections issue(解决多连接问题)
有两种解决方案:
Option 1: Enable Singleton mode — 确保集群中仅启动一个规则节点实例,只建立一个broker客户端连接。由于仅有一个客户端,不会发生Client ID冲突。
Option 2: Enable Add Service ID as suffix to Client ID — 允许多个规则节点实例同时连接。启用时,Service ID会自动附加到指定的Client ID作为后缀。因每个规则引擎实例具有唯一Service ID,每个规则节点实例会创建具有唯一标识符的客户端。
示例:若Client ID为 my-client,三个规则引擎实例的Service ID分别为 tb-rule-engine-1、tb-rule-engine-2、tb-rule-engine-3,最终Client ID为:
my-client_tb-rule-engine-1my-client_tb-rule-engine-2my-client_tb-rule-engine-3
Client ID length restrictions(Client ID长度限制)
允许的最大Client ID长度取决于协议版本:
- MQTT 3.1:最大23字符
- MQTT 3.1.1和MQTT 5:最大256字符
使用 Add Service ID as suffix 时,请确保Client ID与Service ID后缀的总长度不超过上述限制。若生成的Client ID超出最大长度,节点将报错失败。
Message settings(消息设置)
Parse to plain text
控制发布前如何处理消息data:
- Disabled(默认)— 消息data原样发布。
- Enabled — 若消息data为JSON编码字符串(用双引号包裹),则移除外层JSON编码。
示例:
消息data:"\"Temperature is 25.5°C\""
- Parse to plain text OFF:发布
"\"Temperature is 25.5°C\""(带JSON编码) - Parse to plain text ON:发布
"Temperature is 25.5°C"(无外层引号)
Quality of Service(服务质量)
所有消息均以 QoS 1 (AT_LEAST_ONCE) 发布。这保证消息至少送达broker一次,网络异常时可能多次送达。QoS等级不可配置。
Retained(保留)
- Disabled(默认)— broker不保留消息。
- Enabled — broker保留消息并向topic的后续订阅者投递,即使他们在消息发布后才订阅。
保留消息适用于发布设备状态或配置,以便新订阅者订阅时立即收到。
Protocol version(协议版本)
选择连接使用的MQTT协议版本。可用选项:
- MQTT 3.1 — 旧版,Client ID限制23字符
- MQTT 3.1.1 — 最广泛支持的版本(默认)
- MQTT 5 — 最新版本,功能增强
Session settings(会话设置)
Clean session
- Enabled(默认)— broker不为客户端存储任何会话状态。客户端重连时从干净状态开始。
- Disabled — broker为客户端存储会话状态及未投递的QoS 1、QoS 2消息。客户端重连时可接收断开期间队列中的消息。
因本规则节点仅发布消息且不订阅任何topic,通常应保持clean session启用。
Security settings(安全设置)
Enable SSL
- Disabled(默认)— 连接使用明文TCP,无加密。
- Enabled — 连接使用TLS/SSL加密。启用时需配置凭据。
Credentials(凭据)
连接MQTT broker的认证凭据。可用凭据类型:
Anonymous(匿名)
无需认证。当MQTT broker允许匿名连接时使用。
Basic(基本)
用户名与密码认证。
配置:
- Username — MQTT broker认证用户名
- Password — MQTT broker认证密码
PEM Certificate(PEM证书)
使用PEM编码文件的证书认证。通过双向TLS提供最高安全级别。
配置:
- Server CA certificate file — 签署broker证书的CA证书,用于验证broker身份。
- Client certificate file — 客户端公钥证书,发送给broker用于客户端认证。
- Client private key file — 与客户端证书对应的客户端私钥。
- Private key password — 私钥文件加密时的可选密码。
Advanced settings(高级设置)
Force acknowledgement(强制确认)
强制确认机制由环境变量 ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK 控制。该变量设为 true 时,适用于包括本MQTT节点在内的所有外部节点。
启用强制确认时的行为:
- 入站消息立即被确认并创建副本
- 执行MQTT发布操作
- 发布完成后,消息副本加入队列供下一节点处理
- 可避免慢速MQTT broker导致的消息处理超时
禁用强制确认时(默认):
- 原始入站消息在MQTT发布完成前保持占用
- 随后将消息传递至下一节点
MQTT retransmission(MQTT重传)
节点使用平台内部MQTT客户端,包含重传机制以提高QoS 1消息的可靠性。发送PUBLISH消息后,客户端等待broker确认。若在可配置的延迟时间内未收到确认,则重传消息。
重传间隔采用带抖动的指数退避策略:
- 延迟从初始值开始,每次重试翻倍
- 抖动因子引入随机偏差(±百分比)以避免多客户端同步重试
示例:最大尝试3次、初始延迟5,000 ms、抖动因子0.15时,重传大致发生在:
- 5,000 ms(±15%)
- 10,000 ms(±15%)
- 20,000 ms(±15%)
若所有重试后仍未收到确认,消息将被丢弃并经 Failure 连接路由并附带适当错误信息。
配置:
重传参数在 thingsboard.yml 中全局配置,适用于平台上所有MQTT客户端:
1
2
3
4
5
6
mqtt:
client:
retransmission:
max_attempts: "${TB_MQTT_CLIENT_RETRANSMISSION_MAX_ATTEMPTS:3}"
initial_delay_millis: "${TB_MQTT_CLIENT_RETRANSMISSION_INITIAL_DELAY_MILLIS:5000}"
jitter_factor: "${TB_MQTT_CLIENT_RETRANSMISSION_JITTER_FACTOR:0.15}"
JSON Schema
规则节点初始化
规则节点初始化时会建立与MQTT broker的连接:
- 节点使用配置的 Credentials 在 Host 和 Port 处连接broker。
- 连接使用配置的 Protocol version 和 Clean session 设置。
- 若启用 SSL,建立TLS安全连接。
- 连接须在 Connection timeout 规定时间内完成,否则初始化失败。
连接建立后,在规则节点生命周期内保持打开,随时可发布消息。
消息处理
对每条入站消息,节点执行以下步骤:
- 若启用 Force acknowledgement,入站消息立即被确认并创建副本。
- 节点处理 Topic pattern,用入站消息data和metadata中的值替换模板,构建最终MQTT topic。
- 若启用 Parse to plain text,处理消息data以移除外层JSON编码。
- 节点将消息payload发布到MQTT broker:
- 消息以 QoS 1 (AT_LEAST_ONCE) 发布到构建的topic。
- 按配置应用 Retained 标志。
- 发布操作完成时:
- 成功:原始消息(或启用强制确认时的消息副本)经
Success连接转发。 - 失败:在消息metadata的
error键下添加错误详情,消息经Failure连接转发。
- 成功:原始消息(或启用强制确认时的消息副本)经
规则节点关闭
规则节点关闭时,会断开与MQTT broker的连接并释放相关资源。
关闭发生在以下场景:
- Rule node configuration is updated — 节点被销毁并使用新配置重新初始化。
- Rule node is deleted — 节点被销毁且不重新初始化。
出站消息格式
出站消息格式取决于 Force acknowledgement 设置:
禁用强制确认时(默认):
- 出站消息为原始入站消息
- 消息data、metadata、originator和type保持不变
启用强制确认时:
- 出站消息为原始入站消息的副本
- 副本为独立消息实例,但包含相同的data、metadata、originator和type
失败时:
- 在消息metadata的
error键下添加错误详情,格式为:ExceptionClass: error message - 其他消息属性保持不变
输出连接
- Success
- 消息已成功发布到MQTT broker。
- Failure
- 因网络问题或broker错误导致发布失败。
- 处理过程中发生意外错误。
示例
示例1 — 向云MQTT broker发布telemetry
温度传感器发送需转发到云MQTT broker的telemetry数据。消息按设备类型和名称发布到对应topic。强制确认已禁用。
入站消息
Originator:DEVICE(Temperature Sensor)
Metadata:
1
2
3
4
5
{
"deviceType": "temperature",
"deviceName": "sensor-warehouse-01",
"ts": 1672531200000
}
Data:
1
2
3
4
{
"temperature": 22.5,
"humidity": 65
}
节点配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"topicPattern": "factory/sensors/${deviceType}/${deviceName}",
"host": "mqtt.cloud-provider.com",
"port": 8883,
"connectTimeoutSec": 10,
"clientId": "thingsboard-publisher",
"appendClientIdSuffix": true,
"retainedMessage": false,
"cleanSession": true,
"ssl": true,
"parseToPlainText": false,
"protocolVersion": "MQTT_3_1_1",
"credentials": {
"type": "basic",
"username": "factory-user",
"password": "secure-password"
}
}
出站消息
出站消息与入站消息相同。因强制确认已禁用,发布成功后原始入站消息传递至下一节点。
结果
消息通过TLS安全连接和用户名/密码认证,发布到云MQTT broker的topic factory/sensors/temperature/sensor-warehouse-01。JSON payload原样发布。随后消息经 Success 连接路由。