TBMQ MQTT集成支持将消息转发至外部MQTT代理,实现与第三方平台的实时数据投递。适用于以下场景:
- 需将TBMQ数据转发至外部MQTT代理或IoT平台。
- 将TBMQ用作内部源与外部基于MQTT的系统之间的路由中间层。
数据流概述
MQTT集成按以下步骤处理消息并转发至外部MQTT代理或系统:
- 设备(客户端)发布MQTT消息到匹配集成主题过滤器的主题。
- TBMQ代理接收消息并转发至TBMQ Integration Executor。
- TBMQ Integration Executor处理消息,按需格式化并转发至外部MQTT代理或系统。
- 外部系统接收消息并按需处理数据。

前置条件
设置集成前,请确保:
- 有正在运行的TBMQ实例。
- 有可发布MQTT消息的客户端(如TBMQ WebSocket Client)。
- 有可接收MQTT消息的客户端(如TBMQ WebSocket Client)。
创建TBMQ MQTT集成
- 进入集成页面,点击 ”+” 创建新集成。
- 选择 MQTT 作为集成类型,点击下一步。
- 在主题过滤器中订阅主题
tbmq/mqtt-integration,点击下一步。 - 在配置步骤:
- 输入 Host(
localhost); - 输入 Port(
1883); - 将“动态主题名”设为
false,“主题名”设为sensors/mqtt-integration; - 将“凭证”类型设为
Basic,“用户名”设为tbmq_websockets_username;
- 输入 Host(
- 点击添加保存集成。
你可以使用”Check connection”按钮测试与配置的MQTT代理的连接。 这将创建一个MQTT客户端尝试连接到代理,然后立即断开连接。
主题过滤器
Topic filters 定义基于 MQTT 的订阅,并作为 TBMQ HTTP Integration 的触发器。当 broker 收到与 configured topic filters 匹配的消息时,integration 会处理并转发到指定外部系统。
若 integration 配置了如下 topic filter:
1
tbmq/devices/+/status
则匹配该模式的消息都会触发 integration,例如:
1
2
tbmq/devices/device-01/status
tbmq/devices/gateway-01/status
配置
| 字段 | 描述 |
|---|---|
| 仅发送消息负载 | 启用时,传入消息的负载按原样转发。禁用时,发送包含负载及其他属性的JSON对象。 |
| Host | MQTT代理主机。 |
| Port | MQTT代理端口。 |
| Client ID | 用于连接外部代理的客户端标识符。 |
| Dynamic topic name | 启用时,消息将使用传入消息的主题名进行转发。 |
| Topic name | 当Dynamic topic name禁用时,使用此主题名发送消息。 |
| Credentials | 支持的认证选项: |
| Anonymous——无认证。 | |
Basic Authentication——使用Username和Password进行认证。 |
|
| PEM-based authentication——使用PEM证书进行认证。 | |
| Enable SSL——使用SSL/TLS启用安全连接。 | |
| Keep alive (seconds) | 表示代理和客户端在会话关闭前可以保持无通信的持续时间。 |
| Connect timeout (seconds) | 等待收到”CONNACK”的时间。 |
| Reconnect period (seconds) | 定义连接丢失时TBMQ应多久尝试重新连接一次。 |
| Dynamic QoS | 启用时,消息将使用传入消息的QoS进行转发。 |
| QoS | 当Dynamic QoS禁用时,可设置所需的QoS。 |
| Dynamic retain | 启用时,消息将使用传入消息的Retain标志进行转发。 |
| Retain | 当Dynamic retain禁用时,可将所需的Retain设置为”true”或”false”。 |
| Metadata | 可用于额外处理的自定义元数据。 |
事件
TBMQ 为 integration 相关事件提供日志,便于调试和排查 integration 行为。 以下为三种「Event」类型:
-
Lifecycle Events – 记录
Started、Created、Updated、Stopped等事件 -
Statistics – 提供 integration 性能洞察,包括已处理消息数和错误数
-
Errors – 记录与认证、超时、payload 格式或外部服务连通性相关的失败
Lifecycle Events – 记录 Started、Created、Updated、Stopped 等事件
Statistics – 提供 integration 性能洞察,包括已处理消息数和错误数
Errors – 记录与认证、超时、payload 格式或外部服务连通性相关的失败
发送上行消息
要发送消息,请按以下步骤操作:
- 进入WebSocket Client页面。
- 选择”WebSocket Default Connection”或任何其他可用的工作连接,然后点击Connect。确保”Connection status”显示为
Connected。 - 将”Topic”字段设置为
tbmq/mqtt-integration以匹配集成的”Topic Filter”。 - 点击Send图标发布消息。
- 如果成功,”Messages”表中应出现两条新消息:
- 一条由WebSocket Client发送。
- 一条从MQTT Integration接收,消息负载类似于:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
{ "payload": "eyJ0ZW1wZXJhdHVyZSI6MjV9", "topicName": "tbmq/mqtt-integration", "clientId": "tbmq_7QUvZzow", "eventType": "PUBLISH_MSG", "qos": 1, "retain": false, "tbmqIeNode": "tbmq_ie_node", "tbmqNode": "tbmq_node", "ts": 1742554969254, "props": {}, "metadata": { "integrationName": "MQTT integration" } }
消息字段说明:
- payload:MQTT消息的Base64编码内容(例如
"eyJ0ZW1wZXJhdHVyZSI6MjV9"是JSON对象{"temperature": 25})。 - topicName:消息发布到的MQTT主题。
- clientId:发布消息的MQTT客户端ID。
- eventType:MQTT事件类型,此处为发布消息(目前唯一支持的类型)。
- qos:传入消息使用的服务质量等级。
- retain:指示消息是否为MQTT保留消息。
- tbmqIeNode:处理该消息的Integration Executor服务的节点ID。
- tbmqNode:接收该消息的TBMQ代理的节点ID。
- ts:消息接收时的时间戳(毫秒)。
- props:MQTT5.0用户属性或其他MQTT属性。
- metadata:从集成配置添加的额外元数据,例如处理该消息的集成名称,默认添加。