将传入消息数据存储到自定义Cassandra表中。
配置
- Custom table name - 目标自定义表名,不含
cs_tb_前缀。表中必须已存在于Cassandra集群中,完整名称为cs_tb_{tableName}。前缀用于区分自定义表与ThingsBoard标准表。 - Fields mapping - 消息字段名(键)与表列名(值)的映射。每个条目定义消息数据中的字段如何存入对应表列。
- Default TTL - 插入记录的生存时间(秒)。设为0表示禁用TTL。设为正值时,记录将在指定时长后自动过期。
JSON Schema
消息处理算法
- 将传入消息数据解析为JSON并验证为JSON对象。
- 对每个字段映射条目:
- 若消息字段键为
$entityId,使用消息发起者的ID - 否则在消息数据中查找该字段
- 将JSON基本类型映射到对应Cassandra数据类型:
- 带小数点的数字 → Double
- 不带小数点的数字 → Long
- 布尔值 → Boolean
- 字符串 → String
- JSON对象转为字符串表示
- 缺少字段或非基本/非对象值将导致处理失败
- 若消息字段键为
- 构造并执行带映射值的
INSERT语句。 - 若配置了TTL(> 0),则对插入记录应用TTL。
- 插入成功后路由到
Success,发生任何错误时路由到Failure。
输出连接
Success- 数据已成功保存到自定义表
Failure- 目标表不存在
- 消息数据不是有效JSON对象
- 必需的消息字段缺失
- 消息字段值不是JSON基本类型或对象
- 数据库连接或插入错误
示例
示例1 — 基本数据插入
传入消息
Data:
1
2
3
4
5
6
{
"temperature": 23.5,
"humidity": 60,
"location": "Room A",
"active": true
}
节点配置
1
2
3
4
5
6
7
8
9
10
{
"tableName": "sensor_data",
"fieldsMapping": {
"temperature": "temp_value",
"humidity": "humidity_level",
"location": "room_name",
"active": "is_active"
},
"defaultTtl": 0
}
系统状态
- 表
cs_tb_sensor_data存在,列为:temp_value、humidity_level、room_name、is_active。
传出消息
传出消息与传入相同。经由 Success 连接路由。
结果
在 cs_tb_sensor_data 中插入一条新记录:
temp_value: 23.5 (Double)humidity_level: 60 (Long)room_name: “Room A” (String)is_active: true (Boolean)
示例2 — 使用实体ID映射
传入消息
Originator: DEVICE,ID为 a1b2c3d4-e5f6-7g8h-9i0j-k1l2m3n4o5p6
Data:
1
2
3
4
{
"reading": 150.7,
"timestamp": 1640995200
}
节点配置
1
2
3
4
5
6
7
8
9
{
"tableName": "device_readings",
"fieldsMapping": {
"$entityId": "device_id",
"reading": "sensor_value",
"timestamp": "read_time"
},
"defaultTtl": 3600
}
系统状态
- 表
cs_tb_device_readings存在,列为:device_id、sensor_value、read_time。
传出消息
传出消息与传入相同。经由 Success 连接路由。
结果
在 cs_tb_device_readings 中插入一条新记录,TTL为3600秒:
device_id:a1b2c3d4-e5f6-7g8h-9i0j-k1l2m3n4o5p6(发起者设备ID)sensor_value: 150.7 (Double)read_time: 1640995200 (Long)