- 概述
- 前提条件
- Step 1. 下载并构建示例项目
- Step 2. 将项目导入IDE
- Step 3. 创建Rule Node
- Step 4. 导入自定义Rule Nodes实例
- Step 5. 在thingsboard.yml中添加自定义包名
- Step 6. 排查Rule Node问题
- 下一步
概述
本教程将介绍如何创建自定义Rule Node并将其添加到ThingsBoard服务器实例。 将涉及三种Rule Node:Filter、Enrichment和Transformation。
前提条件
假设你已完成以下指南并阅读了下列文章:
- 快速入门 指南
- Rule Engine 概述 文档
并已安装以下第三方依赖:
Step 1. 下载并构建示例项目
克隆仓库并进入仓库目录:
1
2
git clone -b release-4.3 https://github.com/thingsboard/rule-node-examples
cd rule-node-examples
默认情况下,示例项目配置为使用ThingsBoard社区版的API。 这使你的Rule Node与平台的Community版和Professional版均兼容。
若需使用Professional Edition专属API(例如操作Entity Groups等),
请在 pom.xml 中修改 “thingsboard.version” 参数:
1
nano pom.xml
例如,以下属性设置为 4.3.0.1PE Professional Edition:
1
2
3
4
5
6
7
...
<properties>
...
<thingsboard.version>4.3.0.1PE</thingsboard.version>
...
</properties>
...
最后,构建项目:
1
mvn clean install
预期输出:
1
2
3
4
5
6
7
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.431 s
[INFO] Finished at: 2020-08-18T11:01:40+03:00
[INFO] ------------------------------------------------------------------------
Step 2. 将项目导入IDE
确保已安装 Lombok 插件。将项目以Maven项目形式导入IDE。
Step 3. 创建Rule Node
创建新Rule Node时,需实现 TbNode interface and annotate it with the RuleNode annotation.
例如,可参考一个根据消息payload中key是否存在过滤传入消息的简单Rule Node。 此Rule Node是上一步下载项目的一部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@RuleNode(
type = ComponentType.FILTER,
name = "check key",
relationTypes = {"True", "False"},
configClazz = TbKeyFilterNodeConfiguration.class,
nodeDescription = "Checks the existence of the selected key in the message payload.",
nodeDetails = "If the selected key exists - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
uiResources = {"static/rulenode/custom-nodes-config.js"},
configDirective = "tbFilterNodeCheckKeyConfig")
public class TbKeyFilterNode implements TbNode {
private static final ObjectMapper mapper = new ObjectMapper();
private TbKeyFilterNodeConfiguration config;
private String key;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
key = config.getKey();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
} catch (IOException e) {
ctx.tellFailure(msg, e);
}
}
@Override
public void destroy() {
}
}
上述源码需注意以下几点:
@RuleNode注解
@RuleNode注解定义节点类型、名称、描述、UI表单及出站 relations。
可用参数说明如下:
- type:可选 Rule Node 类型 之一,决定Rule Chain Editor中Rule Node所属分组;
- name:Rule Node名称,用于Rule Chain Editor及调试信息;
- nodeDescription:节点简要描述,在Rule Chain Editor中展示;
- nodeDetails:节点完整描述,支持HTML,在Rule Chain Editor中展示;
- configClazz:描述配置JSON的完整类名;
- relationTypes:预定义 relation types 字符串数组,应与 TbContext.tellNext 中使用的值一致;
- customRelations:布尔值,表示是否在 TbContext.tellNext 中使用自定义relations;
- configDirective:用于编辑Rule Node配置的Angular UI指令名,可选,为空时使用原始JSON编辑器;
- uiResources:包含配置指令的Angular UI文件路径,可选,为空时使用原始JSON编辑器;
- icon:angular material包中的图标名;
- iconUrl:Rule Chain Editor节点列表中显示Rule Node的图标完整URL;
- docUrl:Rule Node文档页链接,在Rule Chain Editor中可见。
Rule Node生命周期
“init” 方法在创建新Rule Node时由Rule Engine调用, 例如将Rule Node添加到Rule Chain或系统重启时。 此方法主要用于解析JSON配置或获取 TbContext 本地副本。 “TbNodeUtils.convert” 将原始配置解析为指定类的Java对象。
“destroy” 方法在Rule Node销毁时由Rule Engine调用, 例如从Rule Chain中移除Rule Node或系统停止时。
用户修改现有Rule Node配置时,Rule Engine会依次调用 “destroy” 和 “init”。
处理传入消息
Rule Node实现必须使用下列方法之一告知Rule Engine消息已成功处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 表示 Rule Node 已成功处理消息。
* 将消息发送到 Rule Chain 中通过 "Success" relationType
* 连接到当前 Rule Node 的所有 Rule Node。
*
* @param msg
*/
void tellSuccess(TbMsg msg);
/**
* 将消息发送到 Rule Chain 中通过指定 relationType
* 连接到当前 Rule Node 的所有 Rule Node。
*
* @param msg
* @param relationType
*/
void tellNext(TbMsg msg, String relationType);
/**
* 将消息发送到 Rule Chain 中通过指定 relationTypes 之一
* 连接到当前 Rule Node 的所有 Rule Node。
*
* @param msg
* @param relationTypes
*/
void tellNext(TbMsg msg, Set<String> relationTypes);
若消息处理失败,Rule Node实现必须调用 “tellFailure” 方法:
1
2
3
4
5
6
7
8
9
/**
* 向 Rule Engine 通知当前消息处理失败。
*
* @param msg - 消息
* @param th - 异常
*/
void tellFailure(TbMsg msg, Throwable th);
若Rule Node实现未调用上述任何方法,Rule Engine将等待可配置超时,阻塞其他消息处理,并最终将当前消息标记为失败。
使用ThingsBoard服务
TbContext 包含多种有用服务的 “getter”。 请在IDE中勾选 “Download Sources” 以方便浏览这些服务的接口。 可用服务getter简要列表如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 操作实体 attributes:读取和保存;
AttributesService getAttributesService();
// 对 customer 实体执行 CRUD(Create、Read、Update、Delete)操作;
CustomerService getCustomerService();
// 对用户执行 CRUD 操作;
UserService getUserService();
// 对 assets 执行 CRUD 操作;
AssetService getAssetService();
// 对 devices 执行 CRUD 操作;
DeviceService getDeviceService();
// 对 entity views 执行 CRUD 操作;
EntityViewService getEntityViewService();
// 以编程方式创建和管理 dashboards;
DashboardService getDashboardService();
// 创建和清除 alarms;
RuleEngineAlarmService getAlarmService();
// 以编程方式创建和管理 rule chains;
RuleChainService getRuleChainService();
// 向设备发送 RPC 命令;
RuleEngineRpcService getRpcService();
// 将 telemetry 存储到数据库并通过 WebSocket 推送到 dashboards;
RuleEngineTelemetryService getTelemetryService();
// 查找 telemetry 并保存到数据库,不向 dashboards 发送通知;
TimeseriesService getTimeseriesService();
// 以编程方式查询和管理实体 relations;
RelationService getRelationService();
ThingsBoard PE用户可通过 TbContext.getPeContext() 访问额外服务。TbPeContext提供以下服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 以编程方式创建和管理 integrations;
IntegrationService getIntegrationService();
// 以编程方式创建和管理 entity groups;
EntityGroupService getEntityGroupService();
// 以编程方式创建 reports;
ReportService getReportService();
// 以编程方式管理 blob 实体;
BlobEntityService getBlobEntityService();
// 以编程方式管理 group 权限;
GroupPermissionService getGroupPermissionService();
// 以编程方式管理 roles;
RoleService getRoleService();
// 获取实体 owner(TenantId 或 CustomerId)
EntityId getOwner(TenantId tenantId, EntityId entityId);
// 清除实体 owners 缓存
void clearOwners(EntityId entityId);
// 获取当前实体的所有子 customers
Set<EntityId> getChildOwners(TenantId tenantId, EntityId parentOwnerId);
// 变更实体 owner,targetOwnerId 为 TenantId 或 CustomerId
void changeDashboardOwner(TenantId tenantId, EntityId targetOwnerId, Dashboard dashboard) throws ThingsboardException;
void changeUserOwner(TenantId tenantId, EntityId targetOwnerId, User user) throws ThingsboardException;
void changeCustomerOwner(TenantId tenantId, EntityId targetOwnerId, Customer customer) throws ThingsboardException;
void changeEntityViewOwner(TenantId tenantId, EntityId targetOwnerId, EntityView entityView) throws ThingsboardException;
void changeAssetOwner(TenantId tenantId, EntityId targetOwnerId, Asset asset) throws ThingsboardException;
void changeDeviceOwner(TenantId tenantId, EntityId targetOwnerId, Device device) throws ThingsboardException;
void changeEntityOwner(TenantId tenantId, EntityId targetOwnerId, EntityId entityId, EntityType entityType) throws ThingsboardException;
// 向 integration 推送自定义 downlink 消息
void pushToIntegration(IntegrationId integrationId, TbMsg tbMsg, FutureCallback<Void> callback);
从Rule Node创建新消息
有时需要基于当前消息创建并推送派生消息到Rule Engine。 例如,编写一个将当前customer的消息复制到其所有devices的自定义Rule Node:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
EntityId msgOriginator = msg.getOriginator();
// 确认消息来源为 Customer;
if (EntityType.CUSTOMER.equals(msgOriginator.getEntityType())) {
CustomerId customerId = new CustomerId(msgOriginator.getId());
boolean hasNext = true;
// 创建 page link 以遍历 devices;
PageLink pageLink = new PageLink(1024);
while (hasNext) {
// 使用 Device Service 从数据库获取 devices;
PageData<Device> devices = ctx.getDeviceService().findDevicesByTenantIdAndCustomerId(ctx.getTenantId(), customerId, pageLink);
hasNext = devices.hasNext();
pageLink = pageLink.nextPageLink();
for (Device device : devices.getData()) {
// 创建具有不同 originator 的新消息
TbMsg newMsg = TbMsg.newMsg(msg.getQueueName(), msg.getType(), device.getId(), msg.getMetaData(), msg.getData());
// 使用 enqueueForTellNext 而非 tellNext 推送到队列,确保消息被持久化;
ctx.enqueueForTellNext(newMsg, "Success");
}
}
// 不要忘记确认原始消息或调用 ctx.tellSuccess(msg);
ctx.ack(msg);
} else {
ctx.tellFailure(msg, new IllegalArgumentException("Msg originator is not Customer!"));
}
}
我们使用 TbContext.enqueueForTellNext 将新消息推送到Rule Engine。 消息将根据relation type推送到相关Rule Nodes。另一种方式是将消息置于处理起点,即root rule chain。
1
2
3
void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
也可使用另一方法,以接收新消息成功入队的确认:
1
2
3
void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);
多线程
Rule Engine采用 actor model 实现,对Rule Node邮箱中的每条新消息 依次调用 TbNode.onMsg。 因此,若在同一线程中处理消息,实现是线程安全的。
但出于性能考虑,大部分API调用在单独线程中执行。 例如,以下为保存传入消息中telemetry的正确方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
// 解析传入消息;
ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
// 将温度从 °F 转为 °C
double temperatureF = json.get("temperature").asDouble();
double temperatureC = (temperatureF - 32) * 5 / 9;
// 创建 telemetry 数据点
TsKvEntry tsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", temperatureC));
// 使用异步 API 并通过 callback 保存 telemetry
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), Collections.singletonList(tsKvEntry), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void aVoid) {
// telemetry 已保存,可确认消息;
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable throwable) {
// telemetry 未保存,需由 Rule Engine 重新处理消息;
ctx.tellFailure(msg, throwable);
}
});
} catch (JsonProcessingException e) {
ctx.tellFailure(msg, e);
}
}
注意,我们是在callback线程中通过TbContext.tellSuccess “确认”或”转发”消息,而非主线程。
集群模式
每个rule-engine微服务会启动一个Rule Node实例。例如有3个rule engine实例时,每个都会启动一个RuleNode实例。 Rule Engine messages 按消息originator id(device或asset id)分区。 因此,来自同一device的消息会始终发送到特定rule engine微服务上的同一Rule Node实例。 仅当Rule Nodes被添加或移除时会出现 “repartition” 事件。
作为Rule Node开发者,可重写 TbNode.onPartitionChangeMsg 以响应集群拓扑变化。对根据消息originator(device/asset)id缓存信息的有状态节点很有用。 要判断当前entity id是否属于当前分配的partition列表,可使用 TbContext.isLocalEntity。 完整示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package org.thingsboard.rule.engine.node.filter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@Slf4j
@RuleNode(
type = ComponentType.FILTER,
name = "Cache example",
relationTypes = {"True", "False"},
configClazz = EmptyNodeConfiguration.class,
nodeDescription = "Checks that the incoming value exceeds certain threshold",
nodeDetails = "If temperature is too high - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbNodeEmptyConfig")
public class TbCacheExampleNode implements TbNode {
private static final ObjectMapper mapper = new ObjectMapper();
private ConcurrentMap<EntityId, Double> cache;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.cache = new ConcurrentHashMap<>();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
// 解析传入消息;
ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
double temperature = json.get("temperature").asDouble();
// 从缓存或数据库获取 temperatureThreshold attribute
Double temperatureThreshold = getCacheValue(ctx, msg.getOriginator(), "temperatureThreshold", 42);
// 比较并对比较结果进行处理;
ctx.tellNext(msg, temperature > temperatureThreshold ? "True" : "False");
} catch (JsonProcessingException e) {
ctx.tellFailure(msg, e);
}
}
@Override
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
// 清除已不再分配给当前服务器 partition 的实体缓存
cache.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
}
private double getCacheValue(TbContext ctx, EntityId entityId, String attributeKey, double defaultValue) {
// 从缓存或数据库获取值。
return cache.computeIfAbsent(entityId, id -> {
try {
Optional<AttributeKvEntry> attr = ctx.getAttributesService().find(ctx.getTenantId(), entityId, DataConstants.SERVER_SCOPE, attributeKey).get();
if (attr.isPresent()) {
return attr.get().getDoubleValue().orElse(defaultValue);
} else {
return defaultValue;
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void destroy() {
// 若配置已变更,建议清空整个缓存。
cache.clear();
}
}
Step 4. 导入自定义Rule Nodes实例
完成Rule Node编码后,再次执行构建命令:
1
mvn clean install
然后,将jar文件作为依赖库放入ThingsBoard项目。构建产物位于:
1
target/rule-engine-1.0.0-custom-nodes.jar
现在可将包含Rule Nodes的jar文件添加到ThingsBoard实例:
- 首先,执行以下命令将jar文件复制到ThingsBoard extensions:
1
sudo cp rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
- 接着执行以下命令将owner改为ThingsBoard:
1
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*
重启ThingsBoard服务:
1
sudo service thingsboard restart
ThingsBoard重启后需清除浏览器缓存并刷新页面以重新加载Rule Nodes的UI
Step 5. 在thingsboard.yml中添加自定义包名
注意:若将包名从 org.thingsboard.rule.engine 改为公司包名(如 com.example.rule.engine), 需在 thingsboard.yml 的plugins部分添加该包名:
1
2
3
4
5
# 插件配置参数
plugins:
# 插件 classpath 扫描时使用的逗号分隔包列表
scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine,com.example.rule.engine}"
Step 6. 排查Rule Node问题
验证自定义Rule Node的简便方式是创建 generator Rule Node 并连接到自定义Rule Node,以生成可配置的传入消息流。 完成后,为自定义Rule Node开启 debug 以验证节点输出并检查错误。