向 RabbitMQ exchange发布消息,入站消息data作为消息体发送。
配置
Exchange and routing configuration(Exchange与路由配置)
定义目标RabbitMQ exchange及路由行为。
- Exchange name pattern — 发布消息的RabbitMQ exchange名称。留空则发布到默认exchange。 支持基于消息data和metadata的模板以动态选择exchange。
- Routing key pattern — 在exchange内路由消息的routing key。对direct和topic类型exchange,决定哪些queue接收消息。 支持基于消息data和metadata的模板以动态路由。若不需要routing key(如fanout exchange)可留空。
Message properties(消息属性)
附加到发布消息的预定义AMQP消息属性。消息属性控制投递模式、内容类型和持久化。
可用选项:
- BASIC — 非持久化,无特定内容类型
- TEXT_PLAIN — 非持久化,text/plain内容类型
- MINIMAL_BASIC — 最小非持久化属性
- MINIMAL_PERSISTENT_BASIC — 最小持久化属性(消息在broker重启后保留)
- PERSISTENT_BASIC — 持久化,无特定内容类型
- PERSISTENT_TEXT_PLAIN — 持久化,text/plain内容类型
Leave empty for no predefined properties.
Connection configuration
Configure the connection to your RabbitMQ broker.
- Host – The hostname or IP address of the RabbitMQ server.
- Port – The port number for the AMQP connection. Default:
5672(standard AMQP port). - Virtual host – The RabbitMQ virtual host to connect to. Default:
/(the default virtual host). - Username – The username for authentication to the RabbitMQ broker. Default:
guest. - Password – The password for authentication to the RabbitMQ broker. Default:
guest.
Advanced settings(高级设置)
配置连接行为及自定义客户端属性。
- Automatic recovery — 网络故障后自动恢复连接和拓扑(exchanges、queues、bindings)。默认:
false。启用后,RabbitMQ客户端在连接丢失时会自动尝试重连并恢复拓扑。 - Connection timeout (ms) — 建立与RabbitMQ broker连接的超时时间(毫秒)。默认:
60000(60秒)。 -
Handshake timeout (ms) — TCP连接建立后完成AMQP握手的超时时间(毫秒)。默认:
10000(10秒)。 - Client properties — 连接时发送给RabbitMQ broker的自定义键值对。键和值均以字符串形式发送。
JSON Schema
输出消息格式
无论成功或失败,节点均保留原始消息data和metadata。失败时会在消息metadata中添加错误详情。
Success case(成功时)
消息成功发布到RabbitMQ时,出站消息原样透传。原始消息data和metadata保持接收时的状态。
Failure case(失败时)
发布失败时,在出站消息metadata中添加以下字段:
error— 以class <ExceptionClass>: <error message>格式包含异常类名和错误信息
示例:
原始消息metadata:
1
2
3
{
"deviceType": "sensor"
}
失败后(如连接超时),metadata变为:
1
2
3
4
{
"deviceType": "sensor",
"error": "class java.net.SocketTimeoutException: connect timed out"
}
消息data保持不变。
消息确认行为
节点的消息确认行为由环境变量 ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK 控制:
- 设为
true— 入站消息在接收后立即被确认并标记为已成功处理。创建包含更新metadata的新消息并加入队列供下一节点处理。 - 设为
false(默认)— 入站消息在整个RabbitMQ发布操作期间保持处理中状态。消息就地转换,若发生错误则更新metadata,发布完成后将修改后的消息传递至下一节点。
消息处理算法
- 节点构建RabbitMQ发布请求:
- If the exchange name pattern is configured, templates are resolved using values from the incoming message data and metadata. If empty, the default exchange is used.
- If the routing key pattern is configured, templates are resolved using values from the incoming message data and metadata.
- If message properties are specified, the corresponding AMQP BasicProperties are retrieved.
- The message data is converted to bytes using UTF-8 encoding.
- The publish operation is executed asynchronously:
- The message is published to the specified exchange with the resolved routing key.
- If message properties are configured, they are included in the publish.
- When RabbitMQ accepts the message successfully:
- The original message passes through unchanged.
- The resulting message is forwarded via the
Successconnection.
- If an error occurs during publishing:
- Error details are added to the outgoing message metadata under the
errorkey. - The message is forwarded via the
Failureconnection.
- Error details are added to the outgoing message metadata under the
输出连接
- Success
- 消息已成功发布到RabbitMQ。
- 原始消息原样透传。
- Failure
- Publishing error:RabbitMQ broker拒绝消息或exchange/路由配置无效。
- Unexpected error:消息处理过程中发生意外错误。
示例
示例1 — 基本telemetry发布
将telemetry数据发布到专用telemetry exchange,按设备类型路由。
入站消息
Data:
1
2
3
4
{
"temperature": 25.5,
"humidity": 60.2
}
Metadata:
1
2
3
4
{
"deviceType": "TH-Sensor",
"deviceName": "Sensor-001"
}
Node configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"exchangeNamePattern": "telemetry",
"routingKeyPattern": "${deviceType}",
"messageProperties": "PERSISTENT_TEXT_PLAIN",
"host": "rabbitmq.example.com",
"port": 5672,
"virtualHost": "/",
"username": "iot-user",
"password": "secure-password",
"automaticRecoveryEnabled": true,
"connectionTimeout": 60000,
"handshakeTimeout": 10000,
"clientProperties": {
"application": "thingsboard",
"environment": "production"
}
}
出站消息
Data和metadata:未更改。
经 Success 连接路由。
结果
telemetry数据已成功发布到 “telemetry” exchange,routing key为 “TH-Sensor”。消息为持久化,broker重启后仍保留。RabbitMQ会将消息路由到与该exchange绑定且routing key匹配的queue。
示例2 — 动态exchange与routing key
根据alarm数据向按严重程度划分的exchange发布alarm消息,使用分层routing key。
入站消息
Data:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"id": {
"entityType": "ALARM",
"id": "bfb13620-7737-400b-9c89-d569a0835de6"
},
"type": "HighTemperature",
"severity": "CRITICAL",
"originator": {
"entityType": "DEVICE",
"id": "b3e86d40-78f5-11f0-8e01-57f51829cedc"
},
"status": "ACTIVE_UNACK",
"details": {
"temperature": 95.5
}
}
Metadata:
1
2
3
4
{
"tenantId": "888e6780-78f5-11f0-8e01-57f51829cedc",
"deviceType": "temperature-sensor"
}
Node configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"exchangeNamePattern": "alarms-$[severity]",
"routingKeyPattern": "alarm.$[type].${deviceType}",
"messageProperties": "PERSISTENT_BASIC",
"host": "rabbitmq.example.com",
"port": 5672,
"virtualHost": "/iot",
"username": "alarm-publisher",
"password": "secure-password",
"automaticRecoveryEnabled": true,
"connectionTimeout": 60000,
"handshakeTimeout": 10000,
"clientProperties": {}
}
出站消息
Data和metadata:未更改。
经 Success 连接路由。
结果
alarm已发布到 “alarms-CRITICAL” exchange,routing key为 “alarm.HighTemperature.temperature-sensor”。消费者可使用topic exchange模式订阅特定alarm严重程度和类型。
示例3 — 使用client properties进行监控
Publish messages with custom client properties that help identify the connection in RabbitMQ management interface.
Incoming message
Data:
1
2
3
{
"value": 42
}
节点配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"exchangeNamePattern": "metrics",
"routingKeyPattern": "performance",
"messageProperties": "TEXT_PLAIN",
"host": "rabbitmq.example.com",
"port": 5672,
"virtualHost": "/monitoring",
"username": "metrics-user",
"password": "secure-password",
"automaticRecoveryEnabled": true,
"connectionTimeout": 60000,
"handshakeTimeout": 10000,
"clientProperties": {
"application": "thingsboard",
"version": "3.6.0",
"environment": "production",
"node_id": "rule-engine-1"
}
}
出站消息
Data和metadata:未更改。
经 Success 连接路由。
结果
消息已成功发布。client properties会显示在RabbitMQ管理界面中,便于识别是哪台ThingsBoard规则引擎节点创建的连接,适用于监控、调试和容量规划。