ThingsBoard规则引擎是对于处理复杂的事件具有灵活配置和高度定制化的特点,使用规则引擎的Filter、Enrichment和Transform节点通过设备和相关资产发出输入消息,使用规则引擎的Action、Externala节点触发各种操作与通信。
概念
规则引擎消息
规则引擎消息可以被被序列化并有着规定的数据结构同时可以表示系统中的各种消息,例如:
- 设备遥测, 属性更新或RPC调用;
- 实体生命周期事件: created、updated、deleted、assigned、unassigned、属性更新;
- 设备状态事件: connected, disconnected, active, inactive,等;
- 其他事件。
规则引擎消息包含以下信息:
- 消息ID:基于时间的通用唯一标识符;
- 消息发起者:Device,Asset或其他实体标识符;
- 消息类型:遥测或不活动的事件等;
- 消息负载:消息payload的JSON字符串;
- 元数据:键值对的列表以及与消息有关的其他数据。
规则节点
规则节点是规则引擎的基本组件每次处理单个输入消息并生成一个或多个输出消息。
规则节点是规则引擎的主要逻辑单元。
规则节点可以是Filter、Enrichment、Transform输入消息或者是执行Action与External节点对外部系统进行通信。
规则节点关系
规则节点之间存在关联性每个节点都有对应关系类型,用于标识关系的逻辑标签。
当规则节点生成输出消息时,它总是将消息路由到下一个指定的节点并通过关系类型进行关联。
表示成功与否的规则节点关系是Success和Failure。
表示逻辑运算的规则节点可以是True或False。
一些特定的规则节点可能使用完全不同的关系类型例如:“Post Telemetry”、“Attributes Updated”、“Entity Created”等。
规则链
规则链是规则节点及其关系的逻辑组;例如:下面的规则链将:
- 将所有遥测消息保存到数据库中;
- 如果消息中的温度字段高于50度,则发出“高温警报”;
- 如果消息中的温度字段低于-40度,则发出“低温警报”;
- 如果在脚本中发生逻辑或语法错误时,则无法执行温度脚本检查控制台记录。
租户管理员可以定义一个Root Rule Chain还可以定义多个其他规则链。根规则链处理所有输入的消息,并将其转发到其他规则链以进行其他处理。
例如:
- 如果消息中的温度字段高于50度,则发出“高温警报”;
- 如果消息中的温度字段小于50度,则清除“高温警报”
- 将有关“已创建”和“已清除”警报的事件转发到外部规则链,该规则链处理向相应用户的通知。
消息处理结果
有三种消息处理结果:成功、失败和超时。
当消息被规则引擎中所有节点处理成功,那么该消息将被标注为”Success”。
当消息被规则引擎中任一节点处理失败,那么此消息将被标记”Failure”。
当处理超过配置的阈值时将消息标记为“Timeout”。
可能存在的情况参见下图:
如果”Transformation”脚本失败则该消息不会标记为”Failed”,因为存在与”Failure”关系连接到”Save to DB”节点。
如果”Transformation”脚本成功则将通过REST API调用将其推送到”External System”。
如果外部系统阻塞则REST API调用可能会“等待”一段时间。
假设消息处理超时为20秒忽略Transformation脚本的执行的小于1毫秒时间。
如果”External System”在20秒内回复则消息将被成功处理。
如果”Save to DB”调用成功则消息将被成功处理。
如果外部系统在20秒内未答复将消息标记为”timed-out”。
如果”Save to DB”调用失败将该消息将标记为失败。
规则引擎队列
查看文档
提交策略
查看文档
处理策略
查看文档
默认队列
查看文档
预定义消息类型
预定义消息类型列表:
类型 | 显示名称 | 描述 | 元数据 | payload |
POST_ATTRIBUTES_REQUEST | 属性发布 | 发布设备客户端属性 (参见属性API) | deviceName - 设备名称, deviceType - 设备类型 |
键/值{ |
POST_TELEMETRY_REQUEST | 遥测发布 | 发布设备遥测数据(参见遥测api) | deviceName - 设备名称, deviceType - 设备类型, ts - 时间戳 (毫秒) |
键/值{ |
TO_SERVER_RPC_REQUEST | RPC Request from Device | 设备RPC请求(参见客户端rpc api) | deviceName - 设备名称, deviceType - 设备类型, requestId - RPC请求Id |
包含方法和参数的json:{ |
RPC_CALL_FROM_SERVER_TO_DEVICE | 服务端RPC响应 | 响应RPC请求(参见服务端rpc api) | requestUUID - sustem表示内部应答的请求id, expirationTime - 请求过期时间, oneway - 指定请求类型: true - 无响应, false - 有响应 |
包含方法和参数的json:{ |
ACTIVITY_EVENT | 活动事件 | 表明设备处于活动状态的事件 | deviceName - 设备名称, deviceType - 设备类型 |
包含设备活动信息的json: { |
INACTIVITY_EVENT | 不活动事件 | 表示设备处理非活动状态的事件 | deviceName - 设备名称, deviceType - 设备类型 |
设备活动信息的json活动事件 payload |
CONNECT_EVENT | 连接事件 | 设备连接时的事件 | deviceName - 设备名称, deviceType - 设备类型 |
设备活动信息的json活动事件 payload |
DISCONNECT_EVENT | 断开事件 | 设备断开连接产生的事件 | deviceName - 设备名称, deviceType - 设备类型 |
设备活动信息的json活动事件 payload |
ENTITY_CREATED | 实体创建 | 实体创建产生的事件 | userName - 实体创建的用户名, userId - 用户Id |
实体详细信息的json: { |
ENTITY_UPDATED | 实体更新 | 更新实体产生的事件 | userName - 更新实体的用户名, userId - 用户Id |
实体详细信息的json:参见实体创建 payload |
ENTITY_DELETED | 实体删除 | 删除实体产生的事件 | userName - 删除实体的用户名, userId - 用户Id |
实体详细信息的json:参见实体创建 payload |
ENTITY_ASSIGNED | 实体分配 | 实体分配给客户时生的事件 | userName - 分配实体的用户名, userId - 用户Id, assignedCustomerName -分配的客户名, assignedCustomerId - 客户Id |
实体详细信息的json:参见实体创建 payload |
ENTITY_UNASSIGNED | 取消实体分配 | 取消实体对客户分配时产生的事件 | userName - 取消分配操作的用户名, userId - 用户Id, unassignedCustomerName - 取消配客户名称, unassignedCustomerId - 取消配客户Id |
实体详细信息的json:参见实体创建 payload |
ADDED_TO_ENTITY_GROUP | 添加分组 | 将实体添加到实体分组时产生的事件。 仅用于ThingsBoard PE。 | userName - 操作的用户名, userId - 用户Id, addedToEntityGroupName - 分组名称, addedToEntityGroupId - 分组Id |
payload为空 |
REMOVED_FROM_ENTITY_GROUP | 移除分组 | 移除分组。仅用于ThingsBoard PE。 | userName - 操作的用户名, userId - 用户Id, removedFromEntityGroupName - 分组名称, removedFromEntityGroupId - 分组Id |
payload为空 |
ATTRIBUTES_UPDATED | 属性更新 | 实体属性更新时产生的事件 | userName - 操作的用户名, userId - 用户Id, scope - 属性更新作用 ( SERVER_SCOPE或SHARED_SCOPE) |
键/值json: { |
ATTRIBUTES_DELETED | 属性删除 | 实体属性删除时产生的事件 | userName - 操作的用户名, userId - 用户Id, scope - 属性删除作用 (SERVER_SCOPE或SHARED_SCOPE) |
已删除的属性的keys列表: { |
ALARM | 警报事件 | 创建、更新或删除警报时产生的事件 |
消息发起者元数据中的所有字段
isNewAlarm - 创建了一个新的Alram,则为true isExistingAlarm - 已存在警报,则为true isClearedAlarm - 清除了警报,则为true |
创建警报的json详细信息:
{
|
REST_API_REQUEST | REST API请求到规则引擎 | 执行REST API调用时产生的事件 | requestUUID - 请求id, expirationTime - 请求过期时间 |
json请求的playload |
规则节点类型
根据其性质将所有可用规则节点分组:
- Filter节点用于消息过滤和路由;
- Enrichment节点用于更新传入消息的元数据;
- Transformation节点用于更改传入的消息字段,例如Originator, Type, Payload, Metadata;
- Action节点根据传入的消息执行各种动作;
- External节点用于与外部系统进行交互。
Configuration
每一个规则节点具有特定的参数配置,例如:Filter节点可以通过自定义JS函数。External节点可以通过参数配置实现外部邮件服务器连接设置
可以通过在“规则链”编辑器中双击节点来打开“规则节点”配置窗口:
Javascript函数
一些规则节点具有特定的UI功能,允许用户测试JS函数。单击Test Filter Function后,你将看到JS编辑器,可使用该编辑器替换输入参数并验证函数的输出。
你可以定义:
- Message Type 左上角.
- Message payload 左侧中间.
- Metadata 右上角.
- JS script 实际脚本.
点击Test按钮将在右侧Output返回值
规则引擎统计
ThingsBoard已经为“规则引擎”统计信息准备了“默认”仪表板。
将为每个租户自动加载此仪表板。统计信息收集默认情况下处于启用状态,并通过配置属性进行控制。
你可能会在下面的仪表板上注意到有关处理错误及其原因的见解:
调试
ThingsBoard提供了查看每个规则节点的传入和传出消息的功能。 要启用调试用户需要确保在主配置窗口中选中“调试模式”复选框 (请参阅配置部分中的第一个图像)。
启用调试后只要相应的关系类型用户就可以查看传入和传出消息的信息。请参阅下图获取示例调试消息视图:
Import/Export
你可以将规则链导出为JSON格式,并将其导入到相同或其他ThingsBoard实例。 为了导出规则链,你应该导航到Rule Chains页面,然后单击位于特定规则链卡上的导出按钮。
要导入规则链应该导航到Rules Chains页面单击屏幕右下角的大“ +”按钮然后单击导入按钮。
架构
要了解有关规则引擎内部的更多信息,请参阅[架构]architecture页面。
自定义REST API调用规则引擎
ThingsBoard提供了将自定义REST API调用发送到规则引擎,处理请求的有效负载并在响应正文中返回处理结果的API。 这对于许多用例很有用。例如:
- 通过自定义API调用扩展平台的现有REST API;
- 利用设备/资产/客户的属性丰富REST API调用,并转发给外部系统以进行复杂处理;
- 为你的自定义部件提供自定义API.
要执行REST API调用可以使用规则引擎控制器REST APIs:
注意:你在呼叫中指定的实体ID将是“规则引擎”消息的始发者。如果你未指定实体ID参数,则你的用户实体将成为消息的发起者。
教程
ThingsBoard的作者准备了一些教程以帮助你通过示例开始设计规则链:
点击此处查看更多教程此处。
故障排查
如果使用Kafka队列来处理消息ThingsBoard提供了监控将消息推送到Kafka的速率是否快于使用和处理消息的速率(在这种情况下消息处理的延迟将不断增加)。 要启用此功能需要确保启用Kafka消费者统计信息(请参阅配置queue.kafka.consumer-stats属性部分))
启用Kafka消费者统计信息后将看到有关消费者组偏移滞后的日志(请参阅故障排查)(有consumer-group logs, tb-core, tb-rule-engine和transport服务)。
日志消息:
1
2021-03-19 15:01:59,794 [kafka-consumer-stats-11-thread-1] INFO o.t.s.q.k.TbKafkaConsumerStatsService - [re-Main-consumer] Topic partitions with lag: [[topic=[tb_rule_engine.main.0], partition=[0], committedOffset=[5413], endOffset=[5418], lag=[5]]].
从此消息中我们可以看到有5(5418-5413=5)条消息推送到main队列(tb_rule_engine.main.0 Kafka主题)但尚未处理。
日志结构:
1
TIME [STATS_PRINTING_THREAD_NAME] INFO o.t.s.q.k.TbKafkaConsumerStatsService - [CONSUMER_GROUP_NAME] Topic partitions with lag: [[topic=[KAFKA_TOPIC], partition=[KAFKA_TOPIC_PARTITION], committedOffset=[LAST_PROCESSED_MESSAGE_OFFSET], endOffset=[LAST_QUEUED_MESSAGE_OFFSET], lag=[LAG]],[topic=[ANOTHER_TOPIC], partition=[], committedOffset=[], endOffset=[], lag=[]],...].
Where:
CONSUMER_GROUP_NAME
- 正在处理消息的消费者组名称(可以是规则引擎队列、核心队列等)KAFKA_TOPIC
- 主题名称KAFKA_TOPIC_PARTITION
- 分区数量LAST_PROCESSED_MESSAGE_OFFSET
- 消费者处理的最后一条消息的序列号(规则引擎中的最后确认消息等)LAST_QUEUED_MESSAGE_OFFSET
- 成功推送到Kafka主题的最后一条消息的序列号
注意:仅消费者组存在滞后时才会打印有关使用者滞后的日志。