产品定价 立即试用
云平台
欧洲地区
文档 > 集成 > MQTT
入门
指南 API 常见问题
目录

MQTT集成

文档信息图标
ThingsBoard PE 功能

专业版支持Platform Integrations功能。
请使用ThingsBoard Cloud自行安装平台实例。

MQTT集成用于连接外部MQTT broker,订阅其数据流并将设备各类负载转换为ThingsBoard消息格式。典型场景是设备已接入外部MQTT broker或采用MQTT后端的物联网平台/连接提供商。

请参阅集成示意图了解更多。

ThingsBoard MQTT集成作为MQTT客户端运行。 订阅主题并将数据转换为遥测与属性更新。 对于下行消息,MQTT集成会转换为设备适用格式并推送到外部MQTT broker。 注意:MQTT broker需与ThingsBoard实例同机部署或部署在云端并具有有效DNS名或静态IP。运行在云端的ThingsBoard实例无法连接到部署在局域网中的MQTT broker。

MQTT集成配置

本教程将配置MQTT集成,使设备可连接平台并能向设备发送RPC命令。

前置条件

本教程将使用:

  • ThingsBoard专业版实例 — eu.thingsboard.cloud

  • ThingsBoard PE可访问的MQTT broker — broker.hivemq.com(端口1883);
  • mosquitto_pub和mosquitto_sub客户端用于收发消息;
  • 用于RPC仿真的高级设备模拟器

假设我们有一台正在发送当前温度读数的传感器。 传感器设备SN-001将温度读数发布到’tb/mqtt-integration-tutorial/sensors/SN-001/temperature‘,并订阅’tb/mqtt-integration-tutorial/sensors/SN-001/rx‘接收RPC调用。

ThingsBoard配置

配置MQTT集成前,需创建上行和下行转换器。

上行转换器用于解析和转换MQTT集成接收的数据。

下行转换器解析并转换ThingsBoard发送的数据,使之符合现有设备的格式。

上行转换器

解码函数用于将入站数据和metadata解析为ThingsBoard可处理的格式。 deviceNamedeviceType为必填,attributestelemetry可选。 Attributestelemetry为扁平键值对象,不支持嵌套。

创建上行转换器:进入Integrations center -> Data converters页面,点击“plus”。 命名为“MQTT Uplink Converter”,选择类型Uplink。暂时启用debug mode

调试模式

启用调试模式可追踪与integrations执行相关的事件、状态及潜在错误,便于开发和排障。

文档信息图标

注意:调试模式可能迅速增加磁盘占用,因为所有调试事件都会存入数据库。 自ThingsBoard 3.9起,平台仅在integrations创建后的前15分钟内存储完整调试事件,之后仅保留错误事件。

调试模式设置可组合使用或完全关闭。

可使用 TBEL(TBEL)或 JavaScript 开发用户自定义函数。 建议使用 TBEL,其在ThingsBoard 中的执行效率远高于 JS。

请将以下脚本复制并粘贴到 Decoder function 区域:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** Decoder **/

// decode payload to string
var payloadStr = decodeToString(payload);
var data = JSON.parse(payloadStr);

var deviceName =  metadata.topic.split("/")[3];
// decode payload to JSON
var deviceType = 'sensor';

// Result object with device attributes/telemetry data
var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: {
        temperature: data.value,
    }
};

/** Helper functions 'decodeToString' and 'decodeToJson' are already built-in **/

return result;

image

请将以下脚本复制并粘贴到 Decoder function 区域:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/** Decoder **/

// decode payload to string
var payloadStr = decodeToString(payload);
var data = JSON.parse(payloadStr);

var deviceName =  metadata.topic.split("/")[3];
// decode payload to JSON
var deviceType = 'sensor';

// Result object with device attributes/telemetry data
var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: {
        temperature: data.value,
    }
};

/** Helper functions **/

function decodeToString(payload) {
    return String.fromCharCode.apply(String, payload);
}

function decodeToJson(payload) {
    // convert payload to string.
    var str = decodeToString(payload);

    // parse string to JSON
    var data = JSON.parse(str);
    return data;
}

return result;

image

下行转换器

下行转换器转换出站RPC消息,随后由集成发送到外部MQTT broker。

文档信息图标

注意:
即使不发送下行RPC,也需创建占位下行转换器。

创建另一转换器,命名为“MQTT Downlink Converter”,类型Downlink。保留默认脚本,点击Add

可使用 TBEL(TBEL)或 JavaScript 开发用户自定义函数。 建议使用 TBEL,其在ThingsBoard 中的执行效率远高于 JS。

请将以下脚本复制并粘贴到 Encoder function 区域:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Encode downlink data from incoming Rule Engine message

// msg - JSON message payload downlink message json
// msgType - type of message, for ex. 'ATTRIBUTES_UPDATED', 'POST_TELEMETRY_REQUEST', etc.
// metadata - list of key-value pairs with additional data about the message
// integrationMetadata - list of key-value pairs with additional data defined in Integration executing this converter

/** Encoder **/

var data = {};

// Process data from incoming message and metadata

data.tempFreq = msg.temperatureUploadFrequency;
data.humFreq = msg.humidityUploadFrequency;

data.devSerialNumber = metadata['ss_serialNumber'];

// Result object with encoded downlink payload
var result = {

    // downlink data content type: JSON, TEXT or BINARY (base64 format)
    contentType: "JSON",

    // downlink data
    data: JSON.stringify(data),

    // Optional metadata object presented in key/value format
    metadata: {
        topic: metadata['deviceType']+'/'+metadata['deviceName']+'/upload'
    }

};

return result;

image

请将以下脚本复制并粘贴到 Encoder function 区域:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Encode downlink data from incoming Rule Engine message

// msg - JSON message payload downlink message json
// msgType - type of message, for ex. 'ATTRIBUTES_UPDATED', 'POST_TELEMETRY_REQUEST', etc.
// metadata - list of key-value pairs with additional data about the message
// integrationMetadata - list of key-value pairs with additional data defined in Integration executing this converter

/** Encoder **/

var data = {};

// Process data from incoming message and metadata

data.tempFreq = msg.temperatureUploadFrequency;
data.humFreq = msg.humidityUploadFrequency;

data.devSerialNumber = metadata['ss_serialNumber'];

// Result object with encoded downlink payload
var result = {

    // downlink data content type: JSON, TEXT or BINARY (base64 format)
    contentType: "JSON",

    // downlink data
    data: JSON.stringify(data),

    // Optional metadata object presented in key/value format
    metadata: {
        topic: metadata['deviceType']+'/'+metadata['deviceName']+'/upload'
    }

};

return result;

image

MQTT集成配置

  • 进入Integrations center -> Integrations页面,点击“plus”添加新集成。命名为“MQTT Integration”,选择类型MQTT

image

  • 下一步添加刚创建的uplinkdownlink转换器;

image

image

  • 在连接步骤中指定hostbroker.hivemq.comport1883

  • 添加topic filter

1
tb/mqtt-integration-tutorial/sensors/+/temperature
  • 也可选择MQTT QoS级别。默认使用MQTT QoS 0(At most once);

image

  • 进入advanced settings。建议取消勾选Clean session。多数broker不支持粘性会话,启用该选项可能导致静默断开;

  • 保持Downlink topic pattern为默认,即集成将使用metadata.topic作为下行topic;

image

  • [可选] 点击Check connection检查与Service Bus topic的连接。点击Add创建集成。

image

发送上行消息

模拟设备向集成发送温度读数。

在终端发送简单格式的温度读数消息{"value":25.1}

1
mosquitto_pub -h broker.hivemq.com -p 1883 -t "tb/mqtt-integration-tutorial/sensors/SN-001/temperature" -m '{"value":25.1}'

image

进入Device Groups -> All后,应能看到由集成配置的SN-001设备。 点击设备,进入Latest Telemetry标签,可看到”temperature”键及其值(25.1)。

image

返回Integrations页面,进入Events标签,可看到MQTT Integration消费的消息。

image

MQTT Uplink ConverterEvents标签中有“In”“Out”“Metadata”列。 “In”和“Metadata”为数据转换器输入,“Out”为输出。

image


总结:上行数据转换器定义设备配置及输入数据解读方式。 本示例从topic中提取设备名(SN-001),设置默认设备类型(sensor)并填入遥测值。 更复杂场景可编写脚本,从data或metadata任意部分获取数据。

向设备发送单向RPC

本节说明如何使用控制部件向设备发送单向RPC请求。

  • 进入Dashboards页面,创建名为MQTT RPC的新仪表板并打开,点击Entity aliases图标添加别名;
  • 命名别名(如Sensor),选择筛选类型Single Entity、类型Device,选择SN-001传感器。点击AddSave
  • 添加新部件,从下拉菜单选择Control Widgets包,选择Knob Control部件;
  • Data字段选择已创建的别名(Sensor)。将Number of digits after floating point设为0;
  • 进入Advanced标签,将Minimum value设为15、Maximum value设为45。其余保持默认。点击Add创建部件;
  • 保存更改。

进入Rule Chains页面,打开Root Rule Chain。双击message type switch节点并启用其Debug mode

返回仪表板,旋转旋钮几次。

image

message type switch节点的Events标签中应能看到消息类型为RPC_CALL_FROM_SERVER_TO_DEVICE、关系类型为RPC Request to Device的入站消息。可查看Knob Control发送给规则引擎的data和metadata。

要使传感器执行该命令,需将RPC Request to Device类型消息转发到下行数据转换器。 在Root Rule Chain编辑器中找到integration downlink节点,拖放到规则链。命名为MQTT Integration Downlink,选择MQTT Integration,点击Add。 从Message Type Switch拖拽连线到MQTT Integration Downlink,标签为”RPC Request to Device”,点击add。

进入Data converters页面,打开“MQTT Downlink Converter”,将默认脚本替换为以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** 编码器 **/

var value = parseInt(msg.params.replaceAll("[\"]",""));
var data = {value: value};
// 包含编码下行负载的结果对象
var result = {

    // 下行数据内容类型:JSON、TEXT 或 BINARY(base64 格式)
    contentType: "JSON",

    // 下行数据
    data: JSON.stringify(data),

    // 以键/值格式呈现的可选元数据对象
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx'
    }

};

return result;

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** 编码器 **/

var value = parseInt(msg.params.replace(/"/g,""));
var data = {value: value};
// 包含编码下行负载的结果对象
var result = {

    // 下行数据内容类型:JSON、TEXT 或 BINARY(base64 格式)
    contentType: "JSON",

    // 下行数据
    data: JSON.stringify(data),

    // 以键/值格式呈现的可选元数据对象
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx'
    }

};

return result;

image

上述脚本从msg.params值中移除引号(原为带引号的字符串)并解析为整数,然后构建传给集成的结果对象。 结果对象结构需满足:data(原样发送到外部MQTT broker的消息负载)和metadata(供集成使用)。如前所述,集成下行topic配置为${topic},即集成将使用metadata.topic作为下行topic。

打开终端执行:

1
mosquitto_sub -h broker.hivemq.com -p 1883 -t "tb/mqtt-integration-tutorial/sensors/+/rx"

image

返回仪表板再次旋转旋钮。终端中应能看到旋钮控制部件发送的一系列入站消息:

1
2
{"value":33}
{"value":42}

image

image

模拟双向RPC

模拟向设备发送RPC请求并接收响应。

首先修改转换器,使下行消息发送到topic ‘tb/mqtt-integration-tutorial/sensors/+/rx/twoway‘,设备响应接收自topic tb/mqtt-integration-tutorial/sensors/+/rx/response

修改下行转换器,使消息发送到topic ‘tb/mqtt-integration-tutorial/sensors/+/rx/twoway‘。打开“MQTT Downlink Converter”,将第16行改为

1
topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway'

或将下列代码粘贴到编码器窗口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** 编码器 **/

var value = parseInt(msg.params.replaceAll("[\"]",""));
var data = {value: value};
// 包含编码下行负载的结果对象
var result = {

    // 下行数据内容类型:JSON、TEXT 或 BINARY(base64 格式)
    contentType: "JSON",

    // 下行数据
    data: JSON.stringify(data),

    // 以键/值格式呈现的可选元数据对象
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway'
    }

};

return result;

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** 编码器 **/

var value = parseInt(msg.params.replace(/"/g,""));
var data = {value: value};
// 包含编码下行负载的结果对象
var result = {

    // 下行数据内容类型:JSON、TEXT 或 BINARY(base64 格式)
    contentType: "JSON",

    // 下行数据
    data: JSON.stringify(data),

    // 以键/值格式呈现的可选元数据对象
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway'
    }

};

return result;

image

然后配置上行转换器以接收响应消息。打开“MQTT Uplink Converter”,将下列代码粘贴到解码器窗口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** 解码器 **/

// 将 payload 解码为字符串
var payloadStr = decodeToString(payload);
var data = JSON.parse(payloadStr);
var deviceName =  metadata.topic.split("/")[3];
// 将 payload 解码为 JSON
var deviceType = 'sensor';

// 包含设备属性/遥测数据的结果对象
var telemetry;
if (metadata.topic.endsWith('/temperature')) {
    // 按之前的方式转换传入数据
    telemetry = getTemperatureTelemetry(data);
} else if (metadata.topic.endsWith('/rx/response')) {
    // 按原样获取输入值
    telemetry = data;
}

var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: telemetry
};

/** 辅助函数 'decodeToString' 和 'decodeToJson' 已内置 **/

return result;

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/** 解码器 **/

// 将 payload 解码为字符串
var payloadStr = decodeToString(payload);
var data = JSON.parse(payloadStr);
var deviceName =  metadata.topic.split("/")[3];
// 将 payload 解码为 JSON
var deviceType = 'sensor';

// 包含设备属性/遥测数据的结果对象
var telemetry;
if (metadata.topic.endsWith('/temperature')) {
    // 按之前的方式转换传入数据
    telemetry = getTemperatureTelemetry(data);
} else if (metadata.topic.endsWith('/rx/response')) {
    // 按原样获取输入值
    telemetry = data;
}

var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: telemetry
};

/** 辅助函数 **/

function getTemperatureTelemetry(data) {
    return {temperature: data.value}
}

function decodeToString(payload) {
    return String.fromCharCode.apply(String, payload);
}

function decodeToJson(payload) {
    // 将 payload 转换为字符串
    var str = decodeToString(payload);

    // 将字符串解析为 JSON
    var data = JSON.parse(str);
    return data;
}

return result;

image

上述脚本与初始版本略有不同,可区分Post Telemetry请求和RPC调用响应,向规则引擎发布不同类型输出。

还需在集成中添加topic filter以接收RPC响应消息:MQTT Integration -> Topic filters -> Add topic filter。 使用默认QoS添加该topic:

1
tb/mqtt-integration-tutorial/sensors/+/rx/response

应用更改。

image

运行设备模拟器。mosquitto_pubmosquitto_sub不足以完成本示例,请启动高级模拟器

1
python mqtt-client.py

image

在仪表板上旋转旋钮。终端中应有类似输出:

1
2
3
4
5
Incoming message
Topic: tb/mqtt-integration-tutorial/sensors/SN-001/rx/twoway
Message: {"value":40}
This is a Two-way RPC call. Going to reply now!
Sending a response message: {"rpcReceived":"OK"}

image

进入Devices页面,在SN-001设备的Latest telemetry标签中可看到rpcReceived遥测值为”OK“。

image

MQTT重传机制

MQTT集成使用ThingsBoard内置MQTT客户端。

ThingsBoard内置MQTT客户端包含重传机制,用于提高需要确认的消息类型的可靠性。 该机制适用于以下MQTT消息类型:

  • PUBLISH(仅QoS1或2)
  • SUBSCRIBE
  • UNSUBSCRIBE
  • PUBREL(仅QoS2)

发送上述任一类消息后,客户端会在可配置的延迟内等待确认。 若未收到确认,将重传消息。重传间隔采用指数退避策略,从初始延迟开始,每次翻倍。 抖动因子会在延迟中加入随机波动,以减少集中重试。

例如,若将重传配置为三次尝试初始延迟5,000ms抖动因子0.15,则重传大约会在5,000ms10,000ms20,000ms触发,每次延迟会因±15%抖动而略有变化。 若最后一次尝试后仍未收到确认,系统会按指数退避加抖动计算下一次延迟,在等待完成后将消息视为无法送达并丢弃。

MQTT消息被丢弃时,对应规则引擎消息将通过Failure链路由,并带有相应异常信息。

视频教程

配置MQTT集成

本视频为MQTT集成的分步配置教程。


配置向设备发送RPC请求

可通过本视频了解如何配置向设备发送RPC请求,并通过MQTT集成接收模拟响应。


下一步