在本教程中你将学习如何创建自定义节点:
我们假设你已完成以下指南并查看了以下文章:
为了创建新的规则节点,你应该实现TbNode 接口:
package org.thingsboard.rule.engine.api;
...
public interface TbNode {
void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;
void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
void destroy();
}
并使用以下引用运行时的多值注释来注释你的实现:
org.thingsboard.rule.engine.api.RuleNode
每个规则节点也可以具有实现NodeConfiguration接口的配置类。
package org.thingsboard.rule.engine.api;
public interface NodeConfiguration<T extends NodeConfiguration> {
T defaultConfiguration();
}
package org.thingsboard.rule.engine.node.filter;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbKeyFilterNodeConfiguration implements NodeConfiguration<TbKeyFilterNodeConfiguration> {
private String key;
@Override
public TbKeyFilterNodeConfiguration defaultConfiguration() {
TbKeyFilterNodeConfiguration configuration = new TbKeyFilterNodeConfiguration();
configuration.setKey(null);
return configuration;
}
}
package org.thingsboard.rule.engine.node.transform;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbCalculateSumNodeConfiguration implements NodeConfiguration<TbCalculateSumNodeConfiguration> {
private String inputKey;
private String outputKey;
@Override
public TbCalculateSumNodeConfiguration defaultConfiguration() {
TbCalculateSumNodeConfiguration configuration = new TbCalculateSumNodeConfiguration();
configuration.setInputKey("temperature");
configuration.setOutputKey("TemperatureSum");
return configuration;
}
}
package org.thingsboard.rule.engine.node.enrichment;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbGetSumIntoMetadataConfiguration implements NodeConfiguration<TbGetSumIntoMetadataConfiguration> {
private String inputKey;
private String outputKey;
@Override
public TbGetSumIntoMetadataConfiguration defaultConfiguration() {
TbGetSumIntoMetadataConfiguration configuration = new TbGetSumIntoMetadataConfiguration();
configuration.setInputKey("temperature");
configuration.setOutputKey("TemperatureSum");
return configuration;
}
}
配置类在规则节点类中定义。
在本节中介绍TbNode接口实现每个方法的目的:
void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;
创建规则节点后对其进行初始化的方法。上述每个规则节点的init方法的内容全部一样。我们将传入的JSON配置转换为特定的NodeConfiguration实现。
private TbKeyFilterNodeConfiguration config;
private String key;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
key = config.getKey();
}
private TbCalculateSumConfiguration config;
private String inputKey;
private String outputKey;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbCalculateSumConfiguration.class);
inputKey = config.getInputKey();
outputKey = config.getOutputKey();
}
private TbGetSumIntoMetadataConfiguration config;
private String inputKey;
private String outputKey;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetSumIntoMetadataConfiguration.class);
inputKey = config.getInputKey();
outputKey = config.getOutputKey();
}
仅在创建或更新规则节点后才调用它,并接受两个输入参数:
ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
private final JsonNode data;
void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
处理接收到消息的方法。每当消息到达节点时都会调用它。并且还接受两个输入参数:
TbMsg是最终的序列化类,它使你可以访问消息中的字段,并且还允许你复制消息,将消息转换为ByteBuffer以及进行其他操作。
可用字段:
msg.getData();
msg.getMetaData();
msg.getOriginator();
msg.getType();
msg.getId();
msg.getRuleNodeId();
msg.getRuleChainId();
msg.getClusterPartition();
msg.getDataType();
复制消息:
TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
**×注意**×在群集模式下使用。
true & false:
ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
failure:
ctx.tellFailure(msg, e);
ctx.tellNext(msg, FAILURE, new Exception());
tellSelf和updateSelf方法:
ctx.tellSelf(tickMsg, curDelay);
ctx.updateSelf(ruleNode);
***注意***tellSelf()方法在基于特定延迟的规则节点中使用。每个更新规则节点上使用的updateSelf()方法。
此处, TbContext允许创建新消息:
String data = "{temperature:20, humidity:30}"
ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), data);
并转换消息:
ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
***注意***这些方法之间的区别在于:
TbMsg的newMsg方法使用新的messageId创建一条新消息;
TbMsg的transformMsg方法转换已存在的消息;
void destroy();
仅在规则节点停止或更新并且没有输入参数之后才调用的此方法。
git clone git@github.com:thingsboard/rule-node-examples.git
从rule-node-examples文件夹执行以下命令以构建项目:
mvn clean install
将jar文件作为依赖库导入到Thingsboard项目中,应该在这里:
./target/rule-engine-1.0.0-custom-nodes.jar
重新启动ThingsBoard服务器端容器。请参考以下链接以查看如何执行相关操作: 服务器端容器运行.
**一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**
sudo mv rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*
重启Thingsboard服务:
sudo service thingsboard restart
**一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**
ThingsBoard规则节点UI在官方github仓库. 另一个项目。请参阅以下连接 查看相关说明。
要以热重新部署方式运行Rule Node UI容器:
cd ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
其次你需要在热部署模式下运行UI容器。请参考以下链接以了解如何执行此操作:以热重新部署模式运行UI容器。
接下来你需要将server.js文件中的常量forwardPort从8080更改为3000,应在此处:
cd ${TB_RULE_NODE_UI_WORK_DIR}/ui/server.js
最后一步是从本地目录TB_RULE_NODE_UI_WORK_DIR执行如下命令:
npm start
这会将规则节点UI请求转发到在4200端口上侦听的服务器。
入门指南 - 这些指南提供了ThingsBoard主要功能的快速概述。
安装指南 - 了解如何在各种操作系统上安装ThingsBoard。
设备连接 - 了解如何根据您的连接方式或解决方案连接设备。
数据看板 - 这些指南包含有关如何配置复杂的ThingsBoard仪表板的说明。
数据处理 - 了解如何使用ThingsBoard规则引擎。
数据分析 - 了解如何使用规则引擎执行基本的分析任务。
硬件样品 - 了解如何将各种硬件平台连接到ThingsBoard。
高级功能 - 了解高级ThingsBoard功能。