产品定价 立即试用
专业版
文档 > 贡献指南 > 规则节点开发
入门
指南 安装 架构 API 常见问题
目录

规则节点开发指南

概述

本教程将介绍如何创建自定义Rule Node并将其添加到ThingsBoard服务器实例。 将涉及三种Rule Node:Filter、Enrichment和Transformation。

前提条件

假设你已完成以下指南并阅读了下列文章:

并已安装以下第三方依赖:

Step 1. 下载并构建示例项目

克隆仓库并进入仓库目录:

1
2
git clone -b release-4.3 https://github.com/thingsboard/rule-node-examples
cd rule-node-examples

默认情况下,示例项目配置为使用ThingsBoard社区版的API。 这使你的Rule Node与平台的Community版和Professional版均兼容。

若需使用Professional Edition专属API(例如操作Entity Groups等), 请在 pom.xml 中修改 “thingsboard.version” 参数:

1
nano pom.xml

例如,以下属性设置为 4.3.0.1PE Professional Edition:

1
2
3
4
5
6
7
...
    <properties>
        ...
        <thingsboard.version>4.3.0.1PE</thingsboard.version>
        ...
    </properties>
...

最后,构建项目:

1
mvn clean install

预期输出:

1
2
3
4
5
6
7
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.431 s
[INFO] Finished at: 2020-08-18T11:01:40+03:00
[INFO] ------------------------------------------------------------------------

Step 2. 将项目导入IDE

确保已安装 Lombok 插件。将项目以Maven项目形式导入IDE。

Step 3. 创建Rule Node

创建新Rule Node时,需实现 TbNode interface and annotate it with the RuleNode annotation.

例如,可参考一个根据消息payload中key是否存在过滤传入消息的简单Rule Node。 此Rule Node是上一步下载项目的一部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@RuleNode(
        type = ComponentType.FILTER,
        name = "check key",
        relationTypes = {"True", "False"},
        configClazz = TbKeyFilterNodeConfiguration.class,
        nodeDescription = "Checks the existence of the selected key in the message payload.",
        nodeDetails = "If the selected key exists - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
        uiResources = {"static/rulenode/custom-nodes-config.js"},
        configDirective = "tbFilterNodeCheckKeyConfig")
public class TbKeyFilterNode implements TbNode {

    private static final ObjectMapper mapper = new ObjectMapper();

    private TbKeyFilterNodeConfiguration config;
    private String key;


    @Override
    public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
        key = config.getKey();
    }

    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        try {
            ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
        } catch (IOException e) {
            ctx.tellFailure(msg, e);
        }
    }

    @Override
    public void destroy() {
    }
}

上述源码需注意以下几点:

@RuleNode注解

@RuleNode注解定义节点类型、名称、描述、UI表单及出站 relations

可用参数说明如下:

  • type:可选 Rule Node 类型 之一,决定Rule Chain Editor中Rule Node所属分组;
  • name:Rule Node名称,用于Rule Chain Editor及调试信息;
  • nodeDescription:节点简要描述,在Rule Chain Editor中展示;
  • nodeDetails:节点完整描述,支持HTML,在Rule Chain Editor中展示;
  • configClazz:描述配置JSON的完整类名;
  • relationTypes:预定义 relation types 字符串数组,应与 TbContext.tellNext 中使用的值一致;
  • customRelations:布尔值,表示是否在 TbContext.tellNext 中使用自定义relations;
  • configDirective:用于编辑Rule Node配置的Angular UI指令名,可选,为空时使用原始JSON编辑器;
  • uiResources:包含配置指令的Angular UI文件路径,可选,为空时使用原始JSON编辑器;
  • icon:angular material包中的图标名;
  • iconUrl:Rule Chain Editor节点列表中显示Rule Node的图标完整URL;
  • docUrl:Rule Node文档页链接,在Rule Chain Editor中可见。

Rule Node生命周期

“init” 方法在创建新Rule Node时由Rule Engine调用, 例如将Rule Node添加到Rule Chain或系统重启时。 此方法主要用于解析JSON配置或获取 TbContext 本地副本。 “TbNodeUtils.convert” 将原始配置解析为指定类的Java对象。

“destroy” 方法在Rule Node销毁时由Rule Engine调用, 例如从Rule Chain中移除Rule Node或系统停止时。

用户修改现有Rule Node配置时,Rule Engine会依次调用 “destroy”“init”

处理传入消息

Rule Node实现必须使用下列方法之一告知Rule Engine消息已成功处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * 表示 Rule Node 已成功处理消息。
 * 将消息发送到 Rule Chain 中通过 "Success" relationType
 * 连接到当前 Rule Node 的所有 Rule Node。
 *
 * @param msg
 */
void tellSuccess(TbMsg msg);

/**
 * 将消息发送到 Rule Chain 中通过指定 relationType
 * 连接到当前 Rule Node 的所有 Rule Node。
 *
 * @param msg
 * @param relationType
 */
void tellNext(TbMsg msg, String relationType);

/**
 * 将消息发送到 Rule Chain 中通过指定 relationTypes 之一
 * 连接到当前 Rule Node 的所有 Rule Node。
 *
 * @param msg
 * @param relationTypes
 */
void tellNext(TbMsg msg, Set<String> relationTypes);

若消息处理失败,Rule Node实现必须调用 “tellFailure” 方法:

1
2
3
4
5
6
7
8
9
/**
 * 向 Rule Engine 通知当前消息处理失败。
 *
 * @param msg - 消息
 * @param th  - 异常
 */
void tellFailure(TbMsg msg, Throwable th);

若Rule Node实现未调用上述任何方法,Rule Engine将等待可配置超时,阻塞其他消息处理,并最终将当前消息标记为失败。

使用ThingsBoard服务

TbContext 包含多种有用服务的 “getter”。 请在IDE中勾选 “Download Sources” 以方便浏览这些服务的接口。 可用服务getter简要列表如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 操作实体 attributes:读取和保存;
AttributesService getAttributesService();

// 对 customer 实体执行 CRUD(Create、Read、Update、Delete)操作;
CustomerService getCustomerService();

// 对用户执行 CRUD 操作;
UserService getUserService();

// 对 assets 执行 CRUD 操作;
AssetService getAssetService();

// 对 devices 执行 CRUD 操作;
DeviceService getDeviceService();

// 对 entity views 执行 CRUD 操作;
EntityViewService getEntityViewService();

// 以编程方式创建和管理 dashboards;
DashboardService getDashboardService();

// 创建和清除 alarms;
RuleEngineAlarmService getAlarmService();

// 以编程方式创建和管理 rule chains;
RuleChainService getRuleChainService();

// 向设备发送 RPC 命令;
RuleEngineRpcService getRpcService();

// 将 telemetry 存储到数据库并通过 WebSocket 推送到 dashboards;
RuleEngineTelemetryService getTelemetryService();

// 查找 telemetry 并保存到数据库,不向 dashboards 发送通知;
TimeseriesService getTimeseriesService();

// 以编程方式查询和管理实体 relations;
RelationService getRelationService();

ThingsBoard PE用户可通过 TbContext.getPeContext() 访问额外服务。TbPeContext提供以下服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 以编程方式创建和管理 integrations;
IntegrationService getIntegrationService();

// 以编程方式创建和管理 entity groups;
EntityGroupService getEntityGroupService();

// 以编程方式创建 reports;
ReportService getReportService();

// 以编程方式管理 blob 实体;
BlobEntityService getBlobEntityService();

// 以编程方式管理 group 权限;
GroupPermissionService getGroupPermissionService();

// 以编程方式管理 roles;
RoleService getRoleService();

// 获取实体 owner(TenantId 或 CustomerId)
EntityId getOwner(TenantId tenantId, EntityId entityId);

// 清除实体 owners 缓存
void clearOwners(EntityId entityId);

// 获取当前实体的所有子 customers
Set<EntityId> getChildOwners(TenantId tenantId, EntityId parentOwnerId);

// 变更实体 owner,targetOwnerId 为 TenantId 或 CustomerId
void changeDashboardOwner(TenantId tenantId, EntityId targetOwnerId, Dashboard dashboard) throws ThingsboardException;

void changeUserOwner(TenantId tenantId, EntityId targetOwnerId, User user) throws ThingsboardException;

void changeCustomerOwner(TenantId tenantId, EntityId targetOwnerId, Customer customer) throws ThingsboardException;

void changeEntityViewOwner(TenantId tenantId, EntityId targetOwnerId, EntityView entityView) throws ThingsboardException;

void changeAssetOwner(TenantId tenantId, EntityId targetOwnerId, Asset asset) throws ThingsboardException;

void changeDeviceOwner(TenantId tenantId, EntityId targetOwnerId, Device device) throws ThingsboardException;

void changeEntityOwner(TenantId tenantId, EntityId targetOwnerId, EntityId entityId, EntityType entityType) throws ThingsboardException;

// 向 integration 推送自定义 downlink 消息
void pushToIntegration(IntegrationId integrationId, TbMsg tbMsg, FutureCallback<Void> callback);

从Rule Node创建新消息

有时需要基于当前消息创建并推送派生消息到Rule Engine。 例如,编写一个将当前customer的消息复制到其所有devices的自定义Rule Node:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
    EntityId msgOriginator = msg.getOriginator();
    // 确认消息来源为 Customer;
    if (EntityType.CUSTOMER.equals(msgOriginator.getEntityType())) {
        CustomerId customerId = new CustomerId(msgOriginator.getId());
        boolean hasNext = true;
        // 创建 page link 以遍历 devices;
        PageLink pageLink = new PageLink(1024);
        while (hasNext) {
            // 使用 Device Service 从数据库获取 devices;
            PageData<Device> devices = ctx.getDeviceService().findDevicesByTenantIdAndCustomerId(ctx.getTenantId(), customerId, pageLink);
            hasNext = devices.hasNext();
            pageLink = pageLink.nextPageLink();
            for (Device device : devices.getData()) {
                // 创建具有不同 originator 的新消息
                TbMsg newMsg = TbMsg.newMsg(msg.getQueueName(), msg.getType(), device.getId(), msg.getMetaData(), msg.getData());
                // 使用 enqueueForTellNext 而非 tellNext 推送到队列,确保消息被持久化;
                ctx.enqueueForTellNext(newMsg, "Success");
            }
        }
        // 不要忘记确认原始消息或调用 ctx.tellSuccess(msg);
        ctx.ack(msg);
    } else {
        ctx.tellFailure(msg, new IllegalArgumentException("Msg originator is not Customer!"));
    }
}

我们使用 TbContext.enqueueForTellNext 将新消息推送到Rule Engine。 消息将根据relation type推送到相关Rule Nodes。另一种方式是将消息置于处理起点,即root rule chain。

1
2
3
void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);

也可使用另一方法,以接收新消息成功入队的确认:

1
2
3
void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);

多线程

Rule Engine采用 actor model 实现,对Rule Node邮箱中的每条新消息 依次调用 TbNode.onMsg。 因此,若在同一线程中处理消息,实现是线程安全的。

但出于性能考虑,大部分API调用在单独线程中执行。 例如,以下为保存传入消息中telemetry的正确方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
    try {
        // 解析传入消息;
        ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
        // 将温度从 °F 转为 °C
        double temperatureF = json.get("temperature").asDouble();
        double temperatureC = (temperatureF - 32) * 5 / 9;
        // 创建 telemetry 数据点
        TsKvEntry tsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", temperatureC));
        // 使用异步 API 并通过 callback 保存 telemetry
        ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), Collections.singletonList(tsKvEntry), new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable Void aVoid) {
                // telemetry 已保存,可确认消息;
                ctx.tellSuccess(msg);
            }

            @Override
            public void onFailure(Throwable throwable) {
                // telemetry 未保存,需由 Rule Engine 重新处理消息;
                ctx.tellFailure(msg, throwable);
            }
        });
    } catch (JsonProcessingException e) {
        ctx.tellFailure(msg, e);
    }
}

注意,我们是在callback线程中通过TbContext.tellSuccess “确认”或”转发”消息,而非主线程。

集群模式

每个rule-engine微服务会启动一个Rule Node实例。例如有3个rule engine实例时,每个都会启动一个RuleNode实例。 Rule Engine messages 按消息originator id(device或asset id)分区。 因此,来自同一device的消息会始终发送到特定rule engine微服务上的同一Rule Node实例。 仅当Rule Nodes被添加或移除时会出现 “repartition” 事件。

作为Rule Node开发者,可重写 TbNode.onPartitionChangeMsg 以响应集群拓扑变化。对根据消息originator(device/asset)id缓存信息的有状态节点很有用。 要判断当前entity id是否属于当前分配的partition列表,可使用 TbContext.isLocalEntity。 完整示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package org.thingsboard.rule.engine.node.filter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;


@Slf4j
@RuleNode(
        type = ComponentType.FILTER,
        name = "Cache example",
        relationTypes = {"True", "False"},
        configClazz = EmptyNodeConfiguration.class,
        nodeDescription = "Checks that the incoming value exceeds certain threshold",
        nodeDetails = "If temperature is too high - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
        uiResources = {"static/rulenode/rulenode-core-config.js"},
        configDirective = "tbNodeEmptyConfig")
public class TbCacheExampleNode implements TbNode {

    private static final ObjectMapper mapper = new ObjectMapper();
    private ConcurrentMap<EntityId, Double> cache;

    @Override
    public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
        this.cache = new ConcurrentHashMap<>();
    }

    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        try {
            // 解析传入消息;
            ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
            double temperature = json.get("temperature").asDouble();
            // 从缓存或数据库获取 temperatureThreshold attribute
            Double temperatureThreshold = getCacheValue(ctx, msg.getOriginator(), "temperatureThreshold", 42);
            // 比较并对比较结果进行处理;
            ctx.tellNext(msg, temperature > temperatureThreshold ? "True" : "False");
        } catch (JsonProcessingException e) {
            ctx.tellFailure(msg, e);
        }
    }

    @Override
    public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
        // 清除已不再分配给当前服务器 partition 的实体缓存
        cache.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
    }

    private double getCacheValue(TbContext ctx, EntityId entityId, String attributeKey, double defaultValue) {
        // 从缓存或数据库获取值。
        return cache.computeIfAbsent(entityId, id -> {
            try {
                Optional<AttributeKvEntry> attr = ctx.getAttributesService().find(ctx.getTenantId(), entityId, DataConstants.SERVER_SCOPE, attributeKey).get();
                if (attr.isPresent()) {
                    return attr.get().getDoubleValue().orElse(defaultValue);
                } else {
                    return defaultValue;
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }


    @Override
    public void destroy() {
        // 若配置已变更,建议清空整个缓存。
        cache.clear();
    }
}

Step 4. 导入自定义Rule Nodes实例

完成Rule Node编码后,再次执行构建命令:

1
mvn clean install

然后,将jar文件作为依赖库放入ThingsBoard项目。构建产物位于:

1
target/rule-engine-1.0.0-custom-nodes.jar

现在可将包含Rule Nodes的jar文件添加到ThingsBoard实例:

  • 首先,执行以下命令将jar文件复制到ThingsBoard extensions:
1
sudo cp rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
  • 接着执行以下命令将owner改为ThingsBoard:
1
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*

重启ThingsBoard服务:

1
sudo service thingsboard restart

ThingsBoard重启后需清除浏览器缓存并刷新页面以重新加载Rule Nodes的UI

Step 5. 在thingsboard.yml中添加自定义包名

注意:若将包名从 org.thingsboard.rule.engine 改为公司包名(如 com.example.rule.engine), 需在 thingsboard.yml 的plugins部分添加该包名:

1
2
3
4
5
# 插件配置参数
plugins:
  # 插件 classpath 扫描时使用的逗号分隔包列表
  scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine,com.example.rule.engine}"

Step 6. 排查Rule Node问题

验证自定义Rule Node的简便方式是创建 generator Rule Node 并连接到自定义Rule Node,以生成可配置的传入消息流。 完成后,为自定义Rule Node开启 debug 以验证节点输出并检查错误。

下一步