ThingsBoard文档

ThingsBoard平台使用文档。

规则节点开发指南

概述

在本教程中你将学习如何创建自定义节点:

  • 过滤节点检查入站消息的遥测数据中是否存在key。如果所选密钥存在-通过True链发送消息,否则使用False链。

image

image

  • 富集节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将结果添加到消息元数据中。

image

image

  • 转换节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将总和添加到新的消息payload中。

image

image

先决条件

我们假设你已完成以下指南并查看了以下文章:

定制

为了创建新的规则节点,你应该实现TbNode 接口:

package org.thingsboard.rule.engine.api;

...

public interface TbNode {

    void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;

    void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;

    void destroy();

}

并使用以下引用运行时的多值注释来注释你的实现:

org.thingsboard.rule.engine.api.RuleNode 

每个规则节点也可以具有实现NodeConfiguration接口的配置类。

package org.thingsboard.rule.engine.api;

public interface NodeConfiguration<T extends NodeConfiguration> {

    T defaultConfiguration();

}
    package org.thingsboard.rule.engine.node.filter;
    
    import lombok.Data;
    import org.thingsboard.rule.engine.api.NodeConfiguration;
    
    @Data
    public class TbKeyFilterNodeConfiguration implements NodeConfiguration<TbKeyFilterNodeConfiguration> {
    
        private String key;
    
        @Override
        public TbKeyFilterNodeConfiguration defaultConfiguration() {
            TbKeyFilterNodeConfiguration configuration = new TbKeyFilterNodeConfiguration();
            configuration.setKey(null);
            return configuration;
        }
    }
    package org.thingsboard.rule.engine.node.transform;
    
    import lombok.Data;
    import org.thingsboard.rule.engine.api.NodeConfiguration;
    
    
    @Data
    public class TbCalculateSumNodeConfiguration implements NodeConfiguration<TbCalculateSumNodeConfiguration> {
    
        private String inputKey;
        private String outputKey;
    
        @Override
        public TbCalculateSumNodeConfiguration defaultConfiguration() {
            TbCalculateSumNodeConfiguration configuration = new TbCalculateSumNodeConfiguration();
            configuration.setInputKey("temperature");
            configuration.setOutputKey("TemperatureSum");
            return configuration;
        }
    }
    package org.thingsboard.rule.engine.node.enrichment;
    
    import lombok.Data;
    import org.thingsboard.rule.engine.api.NodeConfiguration;
    
    @Data
    public class TbGetSumIntoMetadataConfiguration implements NodeConfiguration<TbGetSumIntoMetadataConfiguration> {
    
        private String inputKey;
        private String outputKey;
    
    
        @Override
        public TbGetSumIntoMetadataConfiguration defaultConfiguration() {
            TbGetSumIntoMetadataConfiguration configuration = new TbGetSumIntoMetadataConfiguration();
            configuration.setInputKey("temperature");
            configuration.setOutputKey("TemperatureSum");
            return configuration;
        }
    }

配置类在规则节点类中定义。

方法流程

在本节中介绍TbNode接口实现每个方法的目的:

init方法

void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;

创建规则节点后对其进行初始化的方法。上述每个规则节点的init方法的内容全部一样。我们将传入的JSON配置转换为特定的NodeConfiguration实现。

    private TbKeyFilterNodeConfiguration config;
    private String key;
            
    @Override
    public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
        key = config.getKey();
    }
    private TbCalculateSumConfiguration config;
    private String inputKey;
    private String outputKey;
    
    @Override
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = TbNodeUtils.convert(configuration, TbCalculateSumConfiguration.class);
        inputKey = config.getInputKey();
        outputKey = config.getOutputKey();
    }
    private TbGetSumIntoMetadataConfiguration config;
    private String inputKey;
    private String outputKey;
    
    @Override
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = TbNodeUtils.convert(configuration, TbGetSumIntoMetadataConfiguration.class);
        inputKey = config.getInputKey();
        outputKey = config.getOutputKey();
    }

仅在创建或更新规则节点后才调用它,并接受两个输入参数:

  • TbContext是一个接口,可让规则节点访问大多数服务,例如,将遥测保存到数据库,并通过WebSockets通知实体数据更改的所有订阅:
ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
  • TbNodeConfiguration TbNodeConfiguration是一个简单的类,只有一个在规则节点Web UI上处理的字段:
private final JsonNode data; 

onMsg方法

void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;

处理接收到消息的方法。每当消息到达节点时都会调用它。并且还接受两个输入参数:

  • TbMsg是最终的序列化类,它使你可以访问消息中的字段,并且还允许你复制消息,将消息转换为ByteBuffer以及进行其他操作。

    可用字段:

          msg.getData();
          msg.getMetaData();
          msg.getOriginator();
          msg.getType();
          msg.getId();
          msg.getRuleNodeId();
          msg.getRuleChainId();
          msg.getClusterPartition();
          msg.getDataType();

复制消息:

        TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
        
    **×注意**×在群集模式下使用。 
  • TbContext接口还具有在链上路由出站消息的方法,例如

image image

true & false:

        ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");

failure:

        ctx.tellFailure(msg, e);
        
        ctx.tellNext(msg, FAILURE, new Exception());

tellSelf和updateSelf方法:

        ctx.tellSelf(tickMsg, curDelay);
        
        ctx.updateSelf(ruleNode);
***注意***tellSelf()方法在基于特定延迟的规则节点中使用。每个更新规则节点上使用的updateSelf()方法。

此处, TbContext允许创建新消息:

        String data = "{temperature:20, humidity:30}"
     
        ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), data);

并转换消息:

     ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
***注意***这些方法之间的区别在于:
    
       TbMsgnewMsg方法使用新的messageId创建一条新消息;
    
       TbMsgtransformMsg方法转换已存在的消息;

destroy方法

void destroy();

仅在规则节点停止或更新并且没有输入参数之后才调用的此方法。

构建

  • 克隆规则节点示例项目:
git clone git@github.com:thingsboard/rule-node-examples.git
  • 从rule-node-examples文件夹执行以下命令以构建项目:

    • 注意 你需要先从Thingsboard文件夹执行此命令 。
mvn clean install

将可执行的jar文件导入到ThingsBoard实例中

将jar文件作为依赖库导入到Thingsboard项目中,应该在这里:

./target/rule-engine-1.0.0-custom-nodes.jar

使用IDE的Thingsboard:

重新启动ThingsBoard服务器端容器。请参考以下链接以查看如何执行相关操作: 服务器端容器运行.

 **一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**

Thingsboard服务:

  • 你需要先执行以下命令将jar文件移动到Thingsboard扩展:
sudo mv rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
  • 接下来执行以下操作以将权限更改为Thingsboard:
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*

重启Thingsboard服务:

sudo service thingsboard restart

**一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**

UI 配置

ThingsBoard规则节点UI在官方github仓库. 另一个项目。请参阅以下连接 查看相关说明。

在热部署模式下运行Rule Node UI容器

要以热重新部署方式运行Rule Node UI容器:

  • 你需要先在server.js文件中将常量ruleNodeUiforwardPort从8080更改为5000,该常量应位于此处:
cd ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
  • 其次你需要在热部署模式下运行UI容器。请参考以下链接以了解如何执行此操作:以热重新部署模式运行UI容器

  • 接下来你需要将server.js文件中的常量forwardPort8080更改为3000,应在此处:

cd ${TB_RULE_NODE_UI_WORK_DIR}/ui/server.js
  • 最后一步是从本地目录TB_RULE_NODE_UI_WORK_DIR执行如下命令:

    npm start
    

这会将规则节点UI请求转发到在4200端口上侦听的服务器。

下一步

  • 入门指南 - 这些指南提供了ThingsBoard主要功能的快速概述。

  • 安装指南 - 了解如何在各种操作系统上安装ThingsBoard。

  • 设备连接 - 了解如何根据您的连接方式或解决方案连接设备。

  • 数据看板 - 这些指南包含有关如何配置复杂的ThingsBoard仪表板的说明。

  • 数据处理 - 了解如何使用ThingsBoard规则引擎。

  • 数据分析 - 了解如何使用规则引擎执行基本的分析任务。

  • 硬件样品 - 了解如何将各种硬件平台连接到ThingsBoard。

  • 高级功能 - 了解高级ThingsBoard功能。