产品定价 立即试用
云平台
北美地区
指南 > Tutorials > Basics > Using queues for message reprocessing
入门 文档
API 常见问题
目录

使用队列进行消息重处理

应用场景

假设您的设备正在向ThingsBoard发送温度和湿度数据。我们将使用Rest API调用节点模拟将消息发送到外部服务器。

在本教程中,我们将配置ThingsBoard规则引擎,使用具有“重试失败和超时消息”处理策略的队列。 尽管该场景为虚构示例,您将学习如何使用队列在发生处理失败或超时时允许消息重处理,并在实际应用中运用该知识。

前置条件

我们假定您已完成以下指南并阅读了所列文章:

此外,您需要在环境中至少预置一个设备。

步骤1:创建规则链

image

我们将添加一个 “生成器” 节点,以1秒延迟模拟6条消息。

image

所有消息将被放入名为 “HighPriority” 的队列。该队列使用名为 “RETRY_FAILED_AND_TIMED_OUT” 的消息处理策略 ,即失败或超时的消息将被重新处理。

image

最后,消息将被发送到外部服务器。

image

步骤2:配置外部服务器

假设我们有一台用于接收消息的服务器。我们已在Spring Boot应用中创建了一个简单的Controller。 此外,我们模拟了每第三条消息处理失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
@RequestMapping("/api/v1/test")
@Slf4j
public class Controller {

    private AtomicLong atomicLong = new AtomicLong(0);

    @RequestMapping(value = {"/"}, headers = "Content-Type=application/json", method = {RequestMethod.POST})
    @ResponseStatus(value = HttpStatus.OK)
    public DeferredResult<ResponseEntity> processRequest(@RequestBody JsonNode msg) {
        DeferredResult<ResponseEntity> deferredResult = new DeferredResult<>();

        log.info("Received message: {}", msg);

        long counter = atomicLong.incrementAndGet();
        if (counter % 3 == 0) {
            log.warn("Bad request: {}", msg);
            deferredResult.setResult(new ResponseEntity<>("Bad Request", HttpStatus.BAD_REQUEST));
        } else {
            log.info("Success: {}", msg);
            deferredResult.setResult(new ResponseEntity<>("Ok", HttpStatus.OK));
        }

        return deferredResult;
    }
}

步骤3:验证规则链逻辑

保存规则链并启动外部服务器以验证我们的逻辑是否正确。生成器将开始产生消息:

“Checkpoint” 节点接收了六条消息:

image

我们可以看到,下一个Rest API调用节点 “Send Request” 处理了八条消息。

image

每第三条消息(初始六条中的两条)处理失败。

image

最后两条消息是经过重处理的消息(之前失败的消息)。 这说明我们的逻辑正确无误。

快速总结

下载并导入本教程附带的规则链JSON 文件。 别忘了在生成器节点中填写您的具体设备。

下一步