- 概述
- 必备条件
- 步骤 1. 下载项目
- 步骤 2. 导入IDE
- 步骤 3. 创建节点
- 步骤 4. 节点导入
- 步骤 5. 添加包名至thingsboard.yml
- 步骤 6. 问题排查
- 步骤 7. 自定义自定义(可选)
- 下一步
概述
在本教程中你将学习如何创建自定义规则节点并将它们添加到你的ThingsBoard服务器实例。 我们将回顾三种不同类型的规则节点:Filter、Enrichment和Transformation。
必备条件
我们假设你已完成以下指南并查看了以下文章:
安装以下第三方工具:
- OpenJDK 11
- Maven 3.6.0+
- IntelliJ IDEA
- [可选]安装Lombok插件
步骤 1. 下载项目
克隆代码到本地文件夹:
1
2
git clone https://github.com/thingsboard/rule-node-examples
cd rule-node-examples
示例项目默认配置的社区版的API规则节点这样可以兼容社区版和专业版。
如果使用独立部署的专业版API必须更改thingsboard.yml中的”thingsboard.version”参数:
1
nano pom.xml
For example, the property below is set to 3.5.1PE Professional Edition:
1
2
3
4
5
6
7
...
<properties>
...
<thingsboard.version>3.5.1PE</thingsboard.version>
...
</properties>
...
构建项目:
1
mvn clean install
执行输出结果:
1
2
3
4
5
6
7
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.431 s
[INFO] Finished at: 2020-08-18T11:01:40+03:00
[INFO] ------------------------------------------------------------------------
步骤 2. 导入IDE
将项目能过Maven导入并确保Lombok已经安装到IDEA中。
步骤 3. 创建节点
例如你可以看到一个非常简单的规则节点根据消息payload中是否存在键来过滤消息,本规则节点是在上一步中下载的项目的一部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@RuleNode(
type = ComponentType.FILTER,
name = "check key",
relationTypes = {"True", "False"},
configClazz = TbKeyFilterNodeConfiguration.class,
nodeDescription = "Checks the existence of the selected key in the message payload.",
nodeDetails = "If the selected key exists - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
uiResources = {"static/rulenode/custom-nodes-config.js"},
configDirective = "tbFilterNodeCheckKeyConfig")
public class TbKeyFilterNode implements TbNode {
private static final ObjectMapper mapper = new ObjectMapper();
private TbKeyFilterNodeConfiguration config;
private String key;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
key = config.getKey();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
} catch (IOException e) {
ctx.tellFailure(msg, e);
}
}
@Override
public void destroy() {
}
}
在上面列出的源代码中需要注意的几点:
@RuleNode注解
@RuleNode注解定义了类型、名称、描述, UI和输出[关系]。
参数说明:
- type - 规则节点类型;
- name - 规则节点名称;
- nodeDescription - 节点简短描述;
- nodeDetails - 节点带有HTML的详细描述;
- configClazz - 配置文件完整类名;
- relationTypes - 预定义关系类型参数值对应TbContext.tellNext方法中的值;
- customRelations - 定义TbContext.tellNext方法中的关系;
- configDirective - Angular中UI指令的名称如果为空参看到配置的JSON信息;
- uiResources - Angular中UI指令的文件路径;
- icon - icon图标;
- iconUrl - icon图标的完整地址;
- docUrl - 规则节点的使用文档地址。
生命周期
“init”方法用于解析当前节点的JSON的配置和获取TbContext对象”TbNodeUtils.convert”方法将配置转换库java对象。
“destroy”方法用于节点销毁时调用如果在次编辑节点配置将依次调用“destroy”和“init”方法。
处理消息
规则节点实现must使用以下方法通知规则引擎已成功处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Indicates that message was successfully processed by the rule node.
* Sends message to all Rule Nodes in the Rule Chain
* that are connected to the current Rule Node using "Success" relationType.
*
* @param msg
*/
void tellSuccess(TbMsg msg);
/**
* Sends message to all Rule Nodes in the Rule Chain
* that are connected to the current Rule Node using specified relationType.
*
* @param msg
* @param relationType
*/
void tellNext(TbMsg msg, String relationType);
/**
* Sends message to all Rule Nodes in the Rule Chain
* that are connected to the current Rule Node using one of specified relationTypes.
*
* @param msg
* @param relationTypes
*/
void tellNext(TbMsg msg, Set<String> relationTypes);
如果消息处理失败规则节点实现必须调用”tellFailure”方法:
1
2
3
4
5
6
7
8
9
/**
* Notifies Rule Engine about failure to process current message.
*
* @param msg - message
* @param th - exception
*/
void tellFailure(TbMsg msg, Throwable th);
如果规则节点实现不调用上面列出的任何方法则规则引擎将等待可配置的超时并阻塞其他消息处理并最终将当前消息标记为失败。
服务
TbContext包含许多有的”getters”用服务下面列出可用的服务简短列表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Allows to work with entity attributes: get and save them;
AttributesService getAttributesService();
// Allows CRUD (Create, Read, Updat, Delete) operations over the customer entities;
CustomerService getCustomerService();
// Allows CRUD operations over users;
UserService getUserService();
// Allows CRUD operations over assets;
AssetService getAssetService();
// Allows CRUD operations over devices;
DeviceService getDeviceService();
// Allows CRUD operations over entity views;
EntityViewService getEntityViewService();
// Allows to programmatically create and manage dashboards;
DashboardService getDashboardService();
// Allows to create and clear alarms;
RuleEngineAlarmService getAlarmService();
// Allows to programmatically create and manage rule chains;
RuleChainService getRuleChainService();
// Allows to send RPC commands to devices;
RuleEngineRpcService getRpcService();
// Allows to store telemetry to the database and push notifications to the dashbaords via WebSockets;
RuleEngineTelemetryService getTelemetryService();
// Allows to find telemetry and save it to the database without notifications to the dashboards;
TimeseriesService getTimeseriesService();
// Allows to programmatically query and manage entity relations;
RelationService getRelationService();
ThingsBoard专业版通过TbContext.getPeContext()提供可以访问的以下服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Allows to programmatically create and manage integrations;
IntegrationService getIntegrationService();
// Allows to programmatically create and manage entity groups;
EntityGroupService getEntityGroupService();
// Allows to programmatically create reports;
ReportService getReportService();
// Allows to programmatically manage blob entities;
BlobEntityService getBlobEntityService();
// Allows to programmatically manage group permissions;
GroupPermissionService getGroupPermissionService();
// Allows to programmatically manage roles;
RoleService getRoleService();
// Get entity owner (TenantId or CustomerId)
EntityId getOwner(TenantId tenantId, EntityId entityId);
// Clear entity owners cache
void clearOwners(EntityId entityId);
// Get all sub-customers of the current entity
Set<EntityId> getChildOwners(TenantId tenantId, EntityId parentOwnerId);
// Allows to change entity owner. Expects TenantId or CustomerId as targetOwnerId
void changeDashboardOwner(TenantId tenantId, EntityId targetOwnerId, Dashboard dashboard) throws ThingsboardException;
void changeUserOwner(TenantId tenantId, EntityId targetOwnerId, User user) throws ThingsboardException;
void changeCustomerOwner(TenantId tenantId, EntityId targetOwnerId, Customer customer) throws ThingsboardException;
void changeEntityViewOwner(TenantId tenantId, EntityId targetOwnerId, EntityView entityView) throws ThingsboardException;
void changeAssetOwner(TenantId tenantId, EntityId targetOwnerId, Asset asset) throws ThingsboardException;
void changeDeviceOwner(TenantId tenantId, EntityId targetOwnerId, Device device) throws ThingsboardException;
void changeEntityOwner(TenantId tenantId, EntityId targetOwnerId, EntityId entityId, EntityType entityType) throws ThingsboardException;
// Allows to push custom downlink message to the integration
void pushToIntegration(IntegrationId integrationId, TbMsg tbMsg, FutureCallback<Void> callback);
创建节点
需要创建从当前消息派生的消息并将其推送到规则引擎;例如:编写一个自定义规则节点将消息从当前客户复制到所有客户设备:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
EntityId msgOriginator = msg.getOriginator();
// Checking that the message originator is a Customer;
if (EntityType.CUSTOMER.equals(msgOriginator.getEntityType())) {
CustomerId customerId = new CustomerId(msgOriginator.getId());
boolean hasNext = true;
// Creating the page link to iterate through the devices;
PageLink pageLink = new PageLink(1024);
while (hasNext) {
// Using the Device Service to get devices from the database;
PageData<Device> devices = ctx.getDeviceService().findDevicesByTenantIdAndCustomerId(ctx.getTenantId(), customerId, pageLink);
hasNext = devices.hasNext();
pageLink = pageLink.nextPageLink();
for (Device device : devices.getData()) {
// Creating new message with different originator
TbMsg newMsg = TbMsg.newMsg(msg.getQueueName(), msg.getType(), device.getId(), msg.getMetaData(), msg.getData());
// Pushing new message to the queue instead of tellNext to make sure that the message will be persisted;
ctx.enqueueForTellNext(newMsg, "Success");
}
}
// Don't forget to acknowledge original message or use ctx.tellSuccess(msg);
ctx.ack(msg);
} else {
ctx.tellFailure(msg, new IllegalArgumentException("Msg originator is not Customer!"));
}
}
通过TbContext.enqueueForTellNext方法将新消息推送到规则引擎,消息将根据关系类型推送到相关的规则节点另一种选择是将消息放在处理根规则链。
1
2
3
void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
该方法将新消息推送到队列:
1
2
3
void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);
多线程
规则引擎是按顺序调用TbNode.onMsg方法的actor模型实现节点消息流转因此在同一线程中处理消息是的线程安全的。
但是出于性能原因大多数API调用都在单独的线程中执行,例如如何保存传入消息中的遥测数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
// Parsing the incoming message;
ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
// Converting temperature from °F to °C
double temperatureF = json.get("temperature").asDouble();
double temperatureC = (temperatureF - 32) * 5 / 9;
// Creating the telemetry data point
TsKvEntry tsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", temperatureC));
// Using async API call to save telemetry with the callback
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), Collections.singletonList(tsKvEntry), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void aVoid) {
// Telemetry is saved, now we can acknowledge the message;
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable throwable) {
// Telemetry is not saved, we need rule engine to reprocess the message;
ctx.tellFailure(msg, throwable);
}
});
} catch (JsonProcessingException e) {
ctx.tellFailure(msg, e);
}
}
通过回调线程中的TbContext.tellSuccess方法处理”acknowledge”或”forward”消息不是在主线程中。
群集
规则引擎微服务为每个规则节点启动单个实例如果有三个规则引擎实例则每个实例将启动RuleNode的一个实例,规则引擎消息根据消息的发起者ID(设备或资产)进行分区;因此设备的消息将始终转到特定的规则引擎微服务上的同一规则节点实例。 只有在添加或删除规则节点情况下将重新执行”repartition”事件进行分区。
规则节点开发人员可以覆盖TbNode.onPartitionChangeMsg默认方法更改群集的分区这对于决定根据消息的发起者(设备/资产)ID缓存信息的有状态节点十分有用,如果需要确定当前实体ID是否属于当前分区通过TbContext.isLocalEntity方法处理。
请参阅下面的完整示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package org.thingsboard.rule.engine.node.filter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@Slf4j
@RuleNode(
type = ComponentType.FILTER,
name = "Cache example",
relationTypes = {"True", "False"},
configClazz = EmptyNodeConfiguration.class,
nodeDescription = "Checks that the incoming value exceeds certain threshold",
nodeDetails = "If temperature is too high - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbNodeEmptyConfig")
public class TbCacheExampleNode implements TbNode {
private static final ObjectMapper mapper = new ObjectMapper();
private ConcurrentMap<EntityId, Double> cache;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.cache = new ConcurrentHashMap<>();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
// Parsing the incoming message;
ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
double temperature = json.get("temperature").asDouble();
// Fetching temperatureThreshold attribute from cache or from the database
Double temperatureThreshold = getCacheValue(ctx, msg.getOriginator(), "temperatureThreshold", 42);
// Compare and do something with the result of comparison;
ctx.tellNext(msg, temperature > temperatureThreshold ? "True" : "False");
} catch (JsonProcessingException e) {
ctx.tellFailure(msg, e);
}
}
@Override
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
// Cleanup the cache for all entities that are no longer assigned to current server partitions
cache.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
}
private double getCacheValue(TbContext ctx, EntityId entityId, String attributeKey, double defaultValue) {
// Get value from cache or from the database.
return cache.computeIfAbsent(entityId, id -> {
try {
Optional<AttributeKvEntry> attr = ctx.getAttributesService().find(ctx.getTenantId(), entityId, DataConstants.SERVER_SCOPE, attributeKey).get();
if (attr.isPresent()) {
return attr.get().getDoubleValue().orElse(defaultValue);
} else {
return defaultValue;
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void destroy() {
// In case you have changed the configuration, it is good idea to clear the entire cache.
cache.clear();
}
}
步骤 4. 节点导入
完成规则节点的编码后再次执行构建命令:
1
mvn clean install
在ThingsBoard项目找到jar文件作:
1
target/rule-engine-1.0.0-custom-nodes.jar
现在将准备好的规则节点jar文件添加到ThingsBoard实例中:
- 如果是ThingsBoard服务安装参见步骤4.1
- 如果是ThingsBoard源码运行参见步骤4.2
步骤 4.1 添加JAR文件到服务
- 需要执行以下命令将jar文件复制到extensions目录:
1
sudo cp rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
- 执行以下命令将所有者更改为ThingsBoard:
1
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*
重启服务:
1
sudo service thingsboard restart
重启ThingsBoard服务后需要清除浏览器缓存重新加载规则节点的UI
步骤 4.2 添加JAR文件到本地
重启ThingsBoard服务器端容器请参考以下链接了解如何执行此操作:运行服务器端容器。
重启ThingsBoard服务后需要清除浏览器缓存重新加载规则节点的UI
步骤 5. 添加包名至thingsboard.yml
注意:如果你已将软件包名称从org.thingsboard.rule.engine更改为公司软件包名称例如com.example.rule.engine还需要在插件部分的thingsboard.yml文件中添加你的软件包名称:
1
2
3
4
5
# Plugins configuration parameters
plugins:
# Comma separated package list used during classpath scanning for plugins
scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine,com.example.rule.engine}"
步骤 6. 问题排查
验证自定义规则节点的最简单方法是创建generator规则节点并将其连接到自定义规则节点将消息流分发至自定义规则节点启用debug验证节点输出并检查它们是否存在错误。
步骤 7. 自定义自定义(可选)
ThingsBoard规则节点UI在官方github仓库中可以找到,请参考下以连接查看构建说明。
通过热部署模式方式运行请执行以下操作:
- 修改proxy.conf.js文件中的ruleNodeUiforwardPort常量将8080修改5000端口。
1
nano ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
-
需要在热部署模式下重新运行UI容器请参考以下链接了解如何运行热部署模式。
-
在本地目录TB_RULE_NODE_UI_WORK_DIR执行以下命令:
1
npm start
下一步
-
入门指南 - 快速学习ThingsBoard相关功能。
-
安装指南 - 学习如何在各种操作系统上安装ThingsBoard。
-
连接设备 - 学习如何根据你的连接方式或解决方案连接设备。
-
可 视 化 - 学习如何配置复杂的ThingsBoard仪表板说明。
-
数据处理 - 学习如何使用ThingsBoard规则引擎。
-
数据分析 - 学习如何使用规则引擎执行基本的分析任务。
-
硬件样品 - 学习如何将各种硬件平台连接到ThingsBoard。
-
高级功能 - 学习高级ThingsBoard功能。