产品定价 立即试用
PE MQTT Broker
文档 > 集成 > MQTT
入门
安装 架构 API 常见问题
目录

MQTT集成

TBMQ MQTT集成支持将消息转发至外部MQTT代理,实现与第三方平台的实时数据投递。适用于以下场景:

  • 需将TBMQ数据转发至外部MQTT代理或IoT平台。
  • 将TBMQ用作内部源与外部基于MQTT的系统之间的路由中间层。

数据流概述

MQTT集成按以下步骤处理消息并转发至外部MQTT代理或系统:

  1. 设备(客户端)发布MQTT消息到匹配集成主题过滤器的主题。
  2. TBMQ代理接收消息并转发至TBMQ Integration Executor。
  3. TBMQ Integration Executor处理消息,按需格式化并转发至外部MQTT代理或系统。
  4. 外部系统接收消息并按需处理数据。

image

前置条件

设置集成前,请确保:

  • 有正在运行的TBMQ实例
  • 有可发布MQTT消息的客户端(如TBMQ WebSocket Client)。
  • 有可接收MQTT消息的客户端(如TBMQ WebSocket Client)。

创建TBMQ MQTT集成

  1. 进入集成页面,点击 ”+” 创建新集成。
  2. 选择 MQTT 作为集成类型,点击下一步
  3. 主题过滤器中订阅主题 tbmq/mqtt-integration,点击下一步
  4. 配置步骤:
    • 输入 Hostlocalhost);
    • 输入 Port1883);
    • 将“动态主题名”设为 false,“主题名”设为 sensors/mqtt-integration
    • 将“凭证”类型设为 Basic,“用户名”设为 tbmq_websockets_username
  5. 点击添加保存集成。
文档警告图标

重要提示:当配置集成将消息发布回TBMQ时,请确保禁用”Dynamic topic name”,并确保发布主题与集成的主题过滤器不重叠,以防止产生无限消息循环

你可以使用”Check connection”按钮测试与配置的MQTT代理的连接。 这将创建一个MQTT客户端尝试连接到代理,然后立即断开连接。

主题过滤器

Topic filters 定义基于 MQTT 的订阅,并作为 TBMQ HTTP Integration 的触发器。当 broker 收到与 configured topic filters 匹配的消息时,integration 会处理并转发到指定外部系统。

若 integration 配置了如下 topic filter:

1
tbmq/devices/+/status

则匹配该模式的消息都会触发 integration,例如:

1
2
tbmq/devices/device-01/status
tbmq/devices/gateway-01/status

配置

字段 描述
仅发送消息负载 启用时,传入消息的负载按原样转发。禁用时,发送包含负载及其他属性的JSON对象。
Host MQTT代理主机。
Port MQTT代理端口。
Client ID 用于连接外部代理的客户端标识符。
Dynamic topic name 启用时,消息将使用传入消息的主题名进行转发。
Topic name Dynamic topic name禁用时,使用此主题名发送消息。
Credentials 支持的认证选项:
  Anonymous——无认证。
  Basic Authentication——使用UsernamePassword进行认证。
  PEM-based authentication——使用PEM证书进行认证。
  Enable SSL——使用SSL/TLS启用安全连接。
Keep alive (seconds) 表示代理和客户端在会话关闭前可以保持无通信的持续时间。
Connect timeout (seconds) 等待收到”CONNACK”的时间。
Reconnect period (seconds) 定义连接丢失时TBMQ应多久尝试重新连接一次。
Dynamic QoS 启用时,消息将使用传入消息的QoS进行转发。
QoS Dynamic QoS禁用时,可设置所需的QoS。
Dynamic retain 启用时,消息将使用传入消息的Retain标志进行转发。
Retain Dynamic retain禁用时,可将所需的Retain设置为”true”或”false”。
Metadata 可用于额外处理的自定义元数据。

事件

TBMQ 为 integration 相关事件提供日志,便于调试和排查 integration 行为。 以下为三种「Event」类型:

  • Lifecycle Events – 记录 StartedCreatedUpdatedStopped 等事件

  • Statistics – 提供 integration 性能洞察,包括已处理消息数和错误数

  • Errors – 记录与认证、超时、payload 格式或外部服务连通性相关的失败

发送上行消息

要发送消息,请按以下步骤操作:

  1. 进入WebSocket Client页面。
  2. 选择”WebSocket Default Connection”或任何其他可用的工作连接,然后点击Connect。确保”Connection status”显示为Connected
  3. 将”Topic”字段设置为tbmq/mqtt-integration以匹配集成的”Topic Filter”。
  4. 点击Send图标发布消息。
  5. 如果成功,”Messages”表中应出现两条新消息:
  • 一条由WebSocket Client发送。
  • 一条从MQTT Integration接收,消息负载类似于:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
    {
     "payload": "eyJ0ZW1wZXJhdHVyZSI6MjV9",
     "topicName": "tbmq/mqtt-integration",
     "clientId": "tbmq_7QUvZzow",
     "eventType": "PUBLISH_MSG",
     "qos": 1,
     "retain": false,
     "tbmqIeNode": "tbmq_ie_node",
     "tbmqNode": "tbmq_node",
     "ts": 1742554969254,
     "props": {},
     "metadata": {
        "integrationName": "MQTT integration"
     }
    }
    

消息字段说明:

  • payload:MQTT消息的Base64编码内容(例如"eyJ0ZW1wZXJhdHVyZSI6MjV9"是JSON对象{"temperature": 25})。
  • topicName:消息发布到的MQTT主题。
  • clientId:发布消息的MQTT客户端ID。
  • eventType:MQTT事件类型,此处为发布消息(目前唯一支持的类型)。
  • qos:传入消息使用的服务质量等级。
  • retain:指示消息是否为MQTT保留消息。
  • tbmqIeNode:处理该消息的Integration Executor服务的节点ID。
  • tbmqNode:接收该消息的TBMQ代理的节点ID。
  • ts:消息接收时的时间戳(毫秒)。
  • props:MQTT5.0用户属性或其他MQTT属性。
  • metadata:从集成配置添加的额外元数据,例如处理该消息的集成名称,默认添加。