产品定价 立即试用
社区版
入门 文档 指南 安装 架构 API 常见问题

mqtt

向外部MQTT broker发布消息,使用QoS 1(至少一次)及可配置的保留设置。支持基于入站消息数据动态构建topic模式,及包括TLS/SSL加密在内的多种认证方式。

配置

Topic pattern

指定发布消息的MQTT主题。该字段为必填,支持 templatization 以根据入站消息data或metadata动态构造主题名。

Connection settings(连接设置)

  • Host — MQTT broker的主机名或IP地址。该字段为必填。
  • Port — MQTT broker的端口号。该字段为必填。
  • Connection timeout (sec) — 建立与MQTT broker连接时的最大等待时间(秒),超时则连接失败。该字段为必填。

Client ID

MQTT客户端的可选标识符。若留空,节点将自动生成Client ID,格式为 netty-mqtt/ 后接8位随机字母数字字符。

自动生成Client ID示例netty-mqtt/aB3dEf7G

Singleton mode and cluster deployment(单例模式与集群部署)

平台以集群(微服务)模式运行时,规则节点可在两种模式下运行:

  • Singleton mode enabled(默认)— 无论集群中有多少规则引擎实例,规则节点仅在其中一个实例上启动,即只有一个MQTT客户端连接。
  • Singleton mode disabled — 规则节点在所有规则引擎实例上启动。若有三个规则引擎实例,则会启动三个规则节点,建立三个MQTT客户端连接。

可通过节点配置界面中的 Singleton/Disabled 开关切换单例模式。

Multiple connections issue(多连接问题)

大多数MQTT broker不允许同一Client ID建立多个并发连接。当集群中多个规则节点实例使用相同Client ID尝试连接时,broker会拒绝重复的客户端连接,导致失败。

Solving the multiple connections issue(解决多连接问题)

有两种解决方案:

Option 1: Enable Singleton mode — 确保集群中仅启动一个规则节点实例,只建立一个broker客户端连接。由于仅有一个客户端,不会发生Client ID冲突。

Option 2: Enable Add Service ID as suffix to Client ID — 允许多个规则节点实例同时连接。启用时,Service ID会自动附加到指定的Client ID作为后缀。因每个规则引擎实例具有唯一Service ID,每个规则节点实例会创建具有唯一标识符的客户端。

示例:若Client ID为 my-client,三个规则引擎实例的Service ID分别为 tb-rule-engine-1tb-rule-engine-2tb-rule-engine-3,最终Client ID为:

  • my-client_tb-rule-engine-1
  • my-client_tb-rule-engine-2
  • my-client_tb-rule-engine-3

Client ID length restrictions(Client ID长度限制)

允许的最大Client ID长度取决于协议版本:

  • MQTT 3.1:最大23字符
  • MQTT 3.1.1和MQTT 5:最大256字符

使用 Add Service ID as suffix 时,请确保Client ID与Service ID后缀的总长度不超过上述限制。若生成的Client ID超出最大长度,节点将报错失败。

Message settings(消息设置)

Parse to plain text

控制发布前如何处理消息data:

  • Disabled(默认)— 消息data原样发布。
  • Enabled — 若消息data为JSON编码字符串(用双引号包裹),则移除外层JSON编码。

示例

消息data:"\"Temperature is 25.5°C\""

  • Parse to plain text OFF:发布 "\"Temperature is 25.5°C\""(带JSON编码)
  • Parse to plain text ON:发布 "Temperature is 25.5°C"(无外层引号)
文档信息图标

注意:此设置仅影响JSON编码的字符串(用双引号包裹的字符串)。普通JSON对象如 {"temperature":25.5} 无论如何设置均原样发布。

Quality of Service(服务质量)

所有消息均以 QoS 1 (AT_LEAST_ONCE) 发布。这保证消息至少送达broker一次,网络异常时可能多次送达。QoS等级不可配置。

Retained(保留)

  • Disabled(默认)— broker不保留消息。
  • Enabled — broker保留消息并向topic的后续订阅者投递,即使他们在消息发布后才订阅。

保留消息适用于发布设备状态或配置,以便新订阅者订阅时立即收到。

Protocol version(协议版本)

选择连接使用的MQTT协议版本。可用选项:

  • MQTT 3.1 — 旧版,Client ID限制23字符
  • MQTT 3.1.1 — 最广泛支持的版本(默认)
  • MQTT 5 — 最新版本,功能增强
文档信息图标

注意:请确认MQTT broker支持所选协议版本。大多数现代broker支持MQTT 3.1.1和MQTT 5。

Session settings(会话设置)

Clean session

  • Enabled(默认)— broker不为客户端存储任何会话状态。客户端重连时从干净状态开始。
  • Disabled — broker为客户端存储会话状态及未投递的QoS 1、QoS 2消息。客户端重连时可接收断开期间队列中的消息。

因本规则节点仅发布消息且不订阅任何topic,通常应保持clean session启用。

Security settings(安全设置)

Enable SSL

  • Disabled(默认)— 连接使用明文TCP,无加密。
  • Enabled — 连接使用TLS/SSL加密。启用时需配置凭据。

Credentials(凭据)

连接MQTT broker的认证凭据。可用凭据类型:

Anonymous(匿名)

无需认证。当MQTT broker允许匿名连接时使用。

Basic(基本)

用户名与密码认证。

配置

  • Username — MQTT broker认证用户名
  • Password — MQTT broker认证密码

PEM Certificate(PEM证书)

使用PEM编码文件的证书认证。通过双向TLS提供最高安全级别。

配置

  • Server CA certificate file — 签署broker证书的CA证书,用于验证broker身份。
  • Client certificate file — 客户端公钥证书,发送给broker用于客户端认证。
  • Client private key file — 与客户端证书对应的客户端私钥。
  • Private key password — 私钥文件加密时的可选密码。
文档信息图标

注意:使用PEM凭据时,至少需提供Server CA证书文件,或同时提供Client证书与Client私钥文件。

文档信息图标

注意:证书和密钥文件可直接上传,或从 Secrets storage 引用以增强安全性。

Advanced settings(高级设置)

Force acknowledgement(强制确认)

强制确认机制由环境变量 ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK 控制。该变量设为 true 时,适用于包括本MQTT节点在内的所有外部节点。

启用强制确认时的行为

  • 入站消息立即被确认并创建副本
  • 执行MQTT发布操作
  • 发布完成后,消息副本加入队列供下一节点处理
  • 可避免慢速MQTT broker导致的消息处理超时

禁用强制确认时(默认):

  • 原始入站消息在MQTT发布完成前保持占用
  • 随后将消息传递至下一节点

MQTT retransmission(MQTT重传)

节点使用平台内部MQTT客户端,包含重传机制以提高QoS 1消息的可靠性。发送PUBLISH消息后,客户端等待broker确认。若在可配置的延迟时间内未收到确认,则重传消息。

重传间隔采用带抖动的指数退避策略:

  • 延迟从初始值开始,每次重试翻倍
  • 抖动因子引入随机偏差(±百分比)以避免多客户端同步重试

示例:最大尝试3次、初始延迟5,000 ms、抖动因子0.15时,重传大致发生在:

  • 5,000 ms(±15%)
  • 10,000 ms(±15%)
  • 20,000 ms(±15%)

若所有重试后仍未收到确认,消息将被丢弃并经 Failure 连接路由并附带适当错误信息。

配置

重传参数在 thingsboard.yml 中全局配置,适用于平台上所有MQTT客户端:

1
2
3
4
5
6
mqtt:
  client:
    retransmission:
      max_attempts: "${TB_MQTT_CLIENT_RETRANSMISSION_MAX_ATTEMPTS:3}"
      initial_delay_millis: "${TB_MQTT_CLIENT_RETRANSMISSION_INITIAL_DELAY_MILLIS:5000}"
      jitter_factor: "${TB_MQTT_CLIENT_RETRANSMISSION_JITTER_FACTOR:0.15}"

JSON Schema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TbMqttNodeConfiguration",
  "type": "object",
  "properties": {
    "topicPattern": {
      "type": "string",
      "minLength": 1,
      "description": "MQTT topic where messages will be published (supports templatization)."
    },
    "host": {
      "type": "string",
      "minLength": 1,
      "description": "Hostname or IP address of the MQTT broker."
    },
    "port": {
      "type": "integer",
      "minimum": 1,
      "maximum": 65535,
      "description": "Port number of the MQTT broker."
    },
    "connectTimeoutSec": {
      "type": "integer",
      "minimum": 1,
      "description": "Maximum time to wait for connection establishment (seconds)."
    },
    "clientId": {
      "type": "string",
      "description": "Optional MQTT client identifier. Leave empty for auto-generated ID."
    },
    "appendClientIdSuffix": {
      "type": "boolean",
      "description": "Whether to append Service ID as suffix to Client ID."
    },
    "retainedMessage": {
      "type": "boolean",
      "description": "Whether the broker should retain the message for future subscribers."
    },
    "cleanSession": {
      "type": "boolean",
      "description": "Whether to start with a clean session (no stored state)."
    },
    "ssl": {
      "type": "boolean",
      "description": "Whether to use TLS/SSL encryption."
    },
    "parseToPlainText": {
      "type": "boolean",
      "description": "Whether to convert JSON message data to plain text."
    },
    "protocolVersion": {
      "type": "string",
      "enum": [
        "MQTT_3_1",
        "MQTT_3_1_1",
        "MQTT_5"
      ],
      "description": "MQTT protocol version to use."
    },
    "credentials": {
      "type": "object",
      "description": "Authentication credentials for the MQTT broker."
    }
  },
  "required": [
    "topicPattern",
    "host",
    "port",
    "connectTimeoutSec",
    "cleanSession",
    "ssl",
    "retainedMessage",
    "parseToPlainText",
    "protocolVersion",
    "credentials"
  ],
  "additionalProperties": false
}

规则节点初始化

规则节点初始化时会建立与MQTT broker的连接:

  1. 节点使用配置的 CredentialsHostPort 处连接broker。
  2. 连接使用配置的 Protocol versionClean session 设置。
  3. 若启用 SSL,建立TLS安全连接。
  4. 连接须在 Connection timeout 规定时间内完成,否则初始化失败。

连接建立后,在规则节点生命周期内保持打开,随时可发布消息。

消息处理

对每条入站消息,节点执行以下步骤:

  1. 若启用 Force acknowledgement,入站消息立即被确认并创建副本。
  2. 节点处理 Topic pattern,用入站消息data和metadata中的值替换模板,构建最终MQTT topic。
  3. 若启用 Parse to plain text,处理消息data以移除外层JSON编码。
  4. 节点将消息payload发布到MQTT broker:
    • 消息以 QoS 1 (AT_LEAST_ONCE) 发布到构建的topic。
    • 按配置应用 Retained 标志。
  5. 发布操作完成时:
    • 成功:原始消息(或启用强制确认时的消息副本)经 Success 连接转发。
    • 失败:在消息metadata的 error 键下添加错误详情,消息经 Failure 连接转发。

规则节点关闭

规则节点关闭时,会断开与MQTT broker的连接并释放相关资源。

关闭发生在以下场景:

  • Rule node configuration is updated — 节点被销毁并使用新配置重新初始化。
  • Rule node is deleted — 节点被销毁且不重新初始化。
文档信息图标

注意:若规则引擎实例崩溃或被强制终止(如SIGTERM、SIGKILL),不会执行关闭流程。

出站消息格式

出站消息格式取决于 Force acknowledgement 设置:

禁用强制确认时(默认):

  • 出站消息为原始入站消息
  • 消息data、metadata、originator和type保持不变

启用强制确认时

  • 出站消息为原始入站消息的副本
  • 副本为独立消息实例,但包含相同的data、metadata、originator和type

失败时

  • 在消息metadata的 error 键下添加错误详情,格式为:ExceptionClass: error message
  • 其他消息属性保持不变

输出连接

  • Success
    • 消息已成功发布到MQTT broker。
  • Failure
    • 因网络问题或broker错误导致发布失败。
    • 处理过程中发生意外错误。

示例

示例1 — 向云MQTT broker发布telemetry

温度传感器发送需转发到云MQTT broker的telemetry数据。消息按设备类型和名称发布到对应topic。强制确认已禁用。

入站消息

Originator:DEVICE(Temperature Sensor)

Metadata:

1
2
3
4
5
{
  "deviceType": "temperature",
  "deviceName": "sensor-warehouse-01",
  "ts": 1672531200000
}

Data:

1
2
3
4
{
  "temperature": 22.5,
  "humidity": 65
}

节点配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
  "topicPattern": "factory/sensors/${deviceType}/${deviceName}",
  "host": "mqtt.cloud-provider.com",
  "port": 8883,
  "connectTimeoutSec": 10,
  "clientId": "thingsboard-publisher",
  "appendClientIdSuffix": true,
  "retainedMessage": false,
  "cleanSession": true,
  "ssl": true,
  "parseToPlainText": false,
  "protocolVersion": "MQTT_3_1_1",
  "credentials": {
    "type": "basic",
    "username": "factory-user",
    "password": "secure-password"
  }
}

出站消息

出站消息与入站消息相同。因强制确认已禁用,发布成功后原始入站消息传递至下一节点。

结果

消息通过TLS安全连接和用户名/密码认证,发布到云MQTT broker的topic factory/sensors/temperature/sensor-warehouse-01。JSON payload原样发布。随后消息经 Success 连接路由。