立即试用 商务报价
专业版
文档 > 规则引擎 > 队列

本页目录

队列

介绍

ThingsBoard平台使用队列来保证消息处理和削峰值并保持系统在极端负载下正常运行,可以查看架构了解更多有关队列的信息。

ThingsBoard支持消息代理/队列提供商(Kafka,RabbitMQ,AWS SQS,Azure Service Bus,Google Pub/Sub)在以后的版本中将添加新的实现,在3.4版本中通过UI简化设置和管理过提升用户体验简单来讲规则引擎在启动时订阅队列并轮询消息默认采用Main主题作为消息的入口并使用Checkpoint将消息路由到另一个主题并自动确认目标主题对应消息。

配置

只有系统管理员用户才能配置队列并有两个配置选项:公共队列独立租户的队列配置了解有关独立租户的详细信息,请阅读租户配置文档。

通用配置

所有消息(如遥测、连接或生命周期事件等)都将推送到默认的Main或其他主题,当禁用独立处理时会将所有租户的消息放在公共主题中(默认)。

优点:这种方法节约成本无需管理额外的虚拟机或容器。

缺点:单个规则引擎服务在所有租户之间共享。

  • 步骤1. 打开队列菜单
  • 步骤2. 设置轮询
  • 步骤3. 创建的自定义队列

注意:可以调整Main队列设置但无法重命名或删除主题本身。

独立租户

请参见租户文档

设置

队列的定义由以下参数和模块组成:

  • name -用于统计和记录;
  • poll-interval - 如果没有新消息到达则两次轮询之间的持续时间(以毫秒为单位);
  • partitions - 队列关联的分区数用于扩展并行处理的消息数;
  • pack-processing-timeout - 处理消费者返回的特定消息包时间间隔(以毫秒为单位);
  • submit-strategy - 定义向规则引擎提交消息的逻辑和顺序,请参阅下面的段落;
  • processing-strategy - 定义消息确认的逻辑,请参阅下面的段落。
提交策略

规则引擎服务不断轮询主题一旦有返回消息它就会创建TbMsgPackProcessingContext对象,有5种策略控制如何提交TbMsgPackProcessingContex消息到规则链:

  • BURST - 所有消息按到达的先后顺序提交到规则链。
  • BATCH - 使用queue.rule-engine.queues[queue index].batch-size配置参数将消息分组切片在确认之前的切片之前不会提交新切片。
  • SEQUENTIAL_BY_ORIGINATOR - 消息在特定实体(消息发起者)内按顺序提交消息;例如:在确认设备A的上一个消息之前不会提交设备A的新消息。
  • SEQUENTIAL_BY_TENANT - 消息在租户(消息发起者的所有者)内按顺序提交消息;例如:在确认租户A的上一个消息之前不会提交租户A的新消息。
  • SEQUENTIAL - 这是一个较慢的消息处理,消息按顺序提交在确认上一个消息之前不会提交新消息。

有关提交策略用例的示例请参阅本指南](/docs/pe/user-guide/rule-engine-2-5/tutorials/queues-for-synchronization/)。

处理策略

有5种策略控制消息失败或超时的重新处理方式:

  • SKIP_ALL_FAILURES - 如果忽略所有故障和超时会导致失败的消息丢失;例如:如果数据库关闭消息将不会保存但会标记为“acknowledged”并从队列中删除,此策略是为了兼容以前的版本和开发及演示已提交到规则链进行处理的超时消息将不会取消。 意味着超时规则引擎仍将尝试进行处理。
  • SKIP_ALL_FAILURES_AND_TIMED_OUT - 如果忽略所有故障和超时会导致超时的消息丢失; 例如:如果数据库关闭消息将不会保存但会标记为“acknowledged”并从队列中删除,已提交到规则链进行处理的超时消息将被取消规则节点不会已取消的消息。但是在取消之前开始处理消息的规则节点不会中断。
  • RETRY_ALL - :重试处理所有消息;如果100条消息中有1条失败策略将重新处理(重新提交到规则引擎)100条消息。
  • RETRY_FAILED - 重试处理所有失败的消息;如果每100条消息中有1条失败策略将仅重新处理(重新提交到规则引擎)1条消息同时超时的消息将不会重新处理。
  • RETRY_TIMED_OUT - 重试处理所有超时消息;如果每100条消息中有1条超时策略将仅重新处理(重新提交到规则引擎)1条消息同时失败的消息将不会被重新处理。
  • RETRY_FAILED_AND_TIMED_OUT - 重试处理所有失败和超时的消息。

所有“RETRY*”策略都支持配置参数:

  • retries - 重试次数0表示无限;
  • failure-percentage - 如果失败或超时少于消息的X百分比则跳过重试;
  • pause-between-retries - 在重试之前在线程中等待时间(以秒为单位);

有关处理策略用例的示例请参阅本指南

默认队列

基于提交和处理策略的不同队列配置了三个默认:Main、HighPriority和SequentialByOriginator,规则引擎处理主题中的消息并可以选择使用“Checkpoint”规则节点将其放入其他主题,默认情况下主题只忽略失败的消息这样做的目的是为了让以前的版本向后兼容,但是你需要自行承担重新配置带来的风险;请注意:如果由于规则节点脚本中的某些故障而有未处理的消息则可能会阻止处理下一条消息,我们已经设计了特定的仪表板监视规则引擎的处理和故障。

HighPriority主题用于传递警报或处理步骤在发生故障的情况下HighPriority主题中的消息会不断进行重新处理直到消息处理成功为止;如果SMTP服务器或外部系统中断规则引擎将重试消息发送直到处理完成为止。

如果确保正确的消息处理顺序SequentialByOriginator主题很重要对来自同一实体的消息将按照到达队列的先后顺序进行处理,在确认相同实体ID的上一条消息之前规则引擎不会向规则链提交新消息。

下一步

  • 入门指南 - 快速学习ThingsBoard相关功能。

  • 安装指南 - 学习如何在各种操作系统上安装ThingsBoard。

  • 连接设备 - 学习如何根据你的连接方式或解决方案连接设备。

  • 可 视 化 - 学习如何配置复杂的ThingsBoard仪表板说明。

  • 数据分析 - 学习如何使用规则引擎执行基本的分析任务。

  • 硬件样品 - 学习如何将各种硬件平台连接到ThingsBoard。

  • 高级功能 - 学习高级ThingsBoard功能。