应用场景
假设您的设备正在向ThingsBoard发送温度和湿度数据。我们将使用Rest API调用节点模拟将消息发送到外部服务器。
在本教程中,我们将配置ThingsBoard规则引擎,使用具有“重试失败和超时消息”处理策略的队列。 尽管该场景为虚构示例,您将学习如何使用队列在发生处理失败或超时时允许消息重处理,并在实际应用中运用该知识。
前置条件
我们假定您已完成以下指南并阅读了所列文章:
此外,您需要在环境中至少预置一个设备。
步骤1:创建规则链

我们将添加一个 “生成器” 节点,以1秒延迟模拟6条消息。

所有消息将被放入名为 “HighPriority” 的队列。该队列使用名为 “RETRY_FAILED_AND_TIMED_OUT” 的消息处理策略 ,即失败或超时的消息将被重新处理。

最后,消息将被发送到外部服务器。

步骤2:配置外部服务器
假设我们有一台用于接收消息的服务器。我们已在Spring Boot应用中创建了一个简单的Controller。 此外,我们模拟了每第三条消息处理失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
@RequestMapping("/api/v1/test")
@Slf4j
public class Controller {
private AtomicLong atomicLong = new AtomicLong(0);
@RequestMapping(value = {"/"}, headers = "Content-Type=application/json", method = {RequestMethod.POST})
@ResponseStatus(value = HttpStatus.OK)
public DeferredResult<ResponseEntity> processRequest(@RequestBody JsonNode msg) {
DeferredResult<ResponseEntity> deferredResult = new DeferredResult<>();
log.info("Received message: {}", msg);
long counter = atomicLong.incrementAndGet();
if (counter % 3 == 0) {
log.warn("Bad request: {}", msg);
deferredResult.setResult(new ResponseEntity<>("Bad Request", HttpStatus.BAD_REQUEST));
} else {
log.info("Success: {}", msg);
deferredResult.setResult(new ResponseEntity<>("Ok", HttpStatus.OK));
}
return deferredResult;
}
}
步骤3:验证规则链逻辑
保存规则链并启动外部服务器以验证我们的逻辑是否正确。生成器将开始产生消息:
“Checkpoint” 节点接收了六条消息:

我们可以看到,下一个Rest API调用节点 “Send Request” 处理了八条消息。

每第三条消息(初始六条中的两条)处理失败。

最后两条消息是经过重处理的消息(之前失败的消息)。 这说明我们的逻辑正确无误。
快速总结
下载并导入本教程附带的规则链JSON 文件。 别忘了在生成器节点中填写您的具体设备。