产品定价 立即试用
社区版
入门 文档 指南 安装 架构 API 常见问题

保存 time series

将传入消息数据存储为消息发起者的时序数据。

前置条件

节点接受 POST_TELEMETRY_REQUEST 类型的消息,支持以下三种消息数据格式

  1. 键值对:对象中每个属性名表示时序key,对应值为时序值。
    1
    2
    3
    4
    
     {
       "temperature": 42.2,
       "humidity": 70
     }
    
  2. 带时间戳的键值对:对象包含时间戳的 ts 属性和键值对的 values 属性(格式1定义)。
    1
    2
    3
    4
    5
    6
    7
    
     {
       "ts": 1737963587742,
       "values": {
         "temperature": 42.2,
         "humidity": 70
       }
     }
    
  3. 多条带时间戳的键值对:带时间戳的键值对对象数组(格式2定义)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    
     [
       {
         "ts": 1737963595638,
         "values": {
           "temperature": 42.2,
           "humidity": 70
         }
       },
       {
         "ts": 1737963601607,
         "values": {
           "pressure": 2.56,
           "velocity": 0.553
         }
       }
     ]
    

配置

处理设置

保存时序节点可执行四项操作,每项由可配置的处理策略控制:

  • Time series:将时序数据保存到数据库的 ts_kv 表。
  • Latest values:若新数据较新,则更新数据库 ts_kv_latest 表中的时序数据。
  • WebSockets:通知WebSocket订阅关于时序数据的更新。
  • Calculated fields:通知字段计算关于时序数据的更新。

对每项操作,可从以下处理策略中选择:

  • 每条消息均执行:对每条入站消息执行操作。
  • 去重:将来自同一来源实体的消息按时间间隔分组,仅在每个间隔内的第一条消息上执行操作。 间隔时长由去重间隔设置指定。 系统使用以下公式计算消息所属的去重间隔编号:
    1
    
    long intervalNumber = ts / deduplicationIntervalMillis;
    

    其中:

    • ts 为用于去重的时间戳(毫秒)。
    • deduplicationIntervalMillis 为配置的去重间隔(自动转换为毫秒)。
    • intervalNumber 决定消息所属的逻辑时间桶。

    时间戳 ts 按以下优先级确定:

    1. 若消息元数据中包含 ts 属性(UNIX毫秒),则使用该值。
    2. 否则,使用消息创建时间。

    所有时间戳均为UNIX毫秒(UTC)。

    示例

    60秒去重间隔下:

    • 设备在10:00:15、10:00:45、10:01:10发送消息
    • 前两条消息(10:00:15和10:00:45)落在同一间隔内,仅处理10:00:15的消息
    • 10:01:10的消息落在下一间隔,会被处理
  • 跳过:永不执行操作。
文档信息图标

注意:处理策略自TB 4.0起可用。早期TB版本的“Skip latest persistence”开关对应Latest values的“Skip”策略。

处理策略可通过基础高级处理设置配置。

image

  • Basic processing settings - 为所有操作提供预定义策略:
    • On every message:对所有操作应用每条消息策略。所有消息都会执行所有操作。
    • Deduplicate:对所有操作应用去重策略(指定间隔)。
    • WebSockets only:对Time series和Latest values应用 Skip 策略,对WebSockets应用 每条消息 策略。 实际上不会写入数据库;数据仅通过WebSocket订阅实时可用。

image

  • Advanced processing settings - allow you to configure each action’s processing strategy independently.

image

在高级模式下配置处理策略时,某些组合可能导致意外行为。请考虑以下场景:

  • 跳过数据库存储

    选择禁用一个或多个持久化操作(例如跳过Time series或Latest values的数据库存储,同时保持WebSocket更新启用)会带来仅部分数据可用的风险:

    • 若消息仅用于实时通知(WebSocket)且未存入数据库,历史查询可能与仪表板上的数据不一致。
    • 当Time series和Latest values的处理策略不同步时,遥测数据可能只存入一个表(如Time series),而另一个表(如Latest values)中缺失相同数据。
  • 禁用WebSocket (WS) 更新

    若禁用WS更新,对时序数据的任何更改不会推送到仪表板(或其他WS订阅)。 这意味着即使数据库已更新,仪表板可能不会显示更新后的数据,直到重新加载浏览器页面。

  • 跳过字段计算重新计算

    若遥测数据保存到数据库但绕过了字段计算重新计算,汇总值可能不会更新以反映最新数据。 反之,若字段计算用新数据重新计算但对应的遥测值未持久化到数据库,字段计算的值可能包含未存储的数据。

  • 跨操作的不同去重间隔

    为各操作配置不同的去重间隔时,同一传入消息可能对不同操作进行不同处理。 例如,消息可能立即存入Time series表(若设为 On every message),而因Latest values表的去重间隔未到而不存入。 若WebSocket更新配置了不同间隔,仪表板显示的更新可能与数据库中的存储内容不一致。

  • 去重缓存清理

    去重机制使用内存缓存按间隔跟踪已处理消息。此缓存最多保留100个间隔,最长2天,但因其使用软引用,条目可能随时被清理。 因此,即使负载较轻,去重也不保证。例如,去重间隔为一天且每小时收到一条消息,若在两次到达之间缓存被清理,每条消息仍可能被处理。 去重应视为性能优化,而非每间隔单次处理的严格保证。

  • 整条消息去重

    需注意的是,去重应用于整条传入消息,而非单个时序key。 例如,若第一条消息包含key A并已处理,第二条消息(在去重间隔内收到)包含key B,第二条消息将被跳过——尽管包含新key。 要安全利用去重,请确保消息保持一致结构,使所有必需key出现在同一条消息中,避免意外数据丢失。

鉴于上述场景,独立配置每项持久化操作(包括不同去重间隔)应视为性能优化,而非严格的处理保证。

高级设置

image

  • Use server timestamp - 启用时,当时序数据无显式时间戳时(使用数据格式1),规则节点将使用当前服务器时间。

    在顺序处理场景中,多条消息可能来自不同源且时间戳乱序,使用服务器时间尤为重要。 数据库层有优化:若新记录时间戳早于前一条,会忽略属性和最新值的更新。 因此,为确保所有消息被正确处理,在顺序消息处理场景中应启用此参数。

  • Default TTL - 决定存储数据在数据库中的保留时长。

文档信息图标

注意:TTL为0表示数据永不过期。

JSON Schema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TbMsgTimeseriesNodeConfiguration",
  "type": "object",
  "properties": {
    "defaultTTL": {
      "type": "integer",
      "description": "Default time series data time-to-live in seconds"
    },
    "useServerTs": {
      "type": "boolean",
      "description": "Whether to use server timestamp when time series data lacks explicit timestamp"
    },
    "processingSettings": {
      "$ref": "#/$defs/ProcessingSettings"
    }
  },
  "required": [
    "defaultTTL",
    "useServerTs",
    "processingSettings"
  ],
  "additionalProperties": false,
  "$defs": {
    "ProcessingSettings": {
      "description": "Polymorphic processing settings for timeseries data",
      "discriminator": {
        "propertyName": "type"
      },
      "oneOf": [
        {
          "$ref": "#/$defs/OnEveryMessage"
        },
        {
          "$ref": "#/$defs/WebSocketsOnly"
        },
        {
          "$ref": "#/$defs/Deduplicate"
        },
        {
          "$ref": "#/$defs/Advanced"
        }
      ]
    },
    "OnEveryMessage": {
      "type": "object",
      "properties": {
        "type": {
          "const": "ON_EVERY_MESSAGE"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "WebSocketsOnly": {
      "type": "object",
      "properties": {
        "type": {
          "const": "WEBSOCKETS_ONLY"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "Deduplicate": {
      "type": "object",
      "properties": {
        "type": {
          "const": "DEDUPLICATE"
        },
        "deduplicationIntervalSecs": {
          "type": "integer",
          "minimum": 1,
          "maximum": 86400,
          "description": "Deduplication interval in seconds (1 second to 1 day)"
        }
      },
      "required": [
        "type",
        "deduplicationIntervalSecs"
      ],
      "additionalProperties": false
    },
    "Advanced": {
      "type": "object",
      "properties": {
        "type": {
          "const": "ADVANCED"
        },
        "timeseries": {
          "$ref": "#/$defs/ProcessingStrategy"
        },
        "latest": {
          "$ref": "#/$defs/ProcessingStrategy"
        },
        "webSockets": {
          "$ref": "#/$defs/ProcessingStrategy"
        },
        "calculatedFields": {
          "$ref": "#/$defs/ProcessingStrategy"
        }
      },
      "required": [
        "type",
        "timeseries",
        "latest",
        "webSockets",
        "calculatedFields"
      ],
      "additionalProperties": false
    },
    "ProcessingStrategy": {
      "description": "Polymorphic processing strategy",
      "discriminator": {
        "propertyName": "type"
      },
      "oneOf": [
        {
          "$ref": "#/$defs/OnEveryMessageStrategy"
        },
        {
          "$ref": "#/$defs/DeduplicateStrategy"
        },
        {
          "$ref": "#/$defs/SkipStrategy"
        }
      ]
    },
    "OnEveryMessageStrategy": {
      "type": "object",
      "properties": {
        "type": {
          "const": "ON_EVERY_MESSAGE"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "DeduplicateStrategy": {
      "type": "object",
      "properties": {
        "type": {
          "const": "DEDUPLICATE"
        },
        "deduplicationIntervalSecs": {
          "type": "integer",
          "minimum": 1,
          "maximum": 86400,
          "description": "Deduplication interval in seconds (1 second to 1 day)"
        }
      },
      "required": [
        "type",
        "deduplicationIntervalSecs"
      ],
      "additionalProperties": false
    },
    "SkipStrategy": {
      "type": "object",
      "properties": {
        "type": {
          "const": "SKIP"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    }
  }
}

消息处理算法

  1. 节点验证消息类型为 POST_TELEMETRY_REQUEST。若非此类型,处理失败,消息路由到 Failure
  2. 对无显式时间戳的时序数据(数据格式1),节点按以下优先级确定时间戳:
    • 若配置中启用了 Use server timestamp,则使用当前服务器时间
    • 若存在,使用消息元数据的 ts 属性(UNIX毫秒)
    • 否则使用消息创建时间戳
  3. 节点按以下优先级计算数据TTL:
    • 若存在,使用消息元数据的 TTL 属性(秒)
    • 若节点配置的 Default TTL 非0,则使用该值
    • 否则使用租户配置的默认存储TTL
  4. 按配置的处理策略将时序数据保存到数据库。
    • 数据保存后,消息路由到 Success
    • 处理过程中发生任何错误时,消息路由到 Failure

输出连接

  • Success
    • 消息已成功处理。
  • Failure
    • 消息类型不是 POST_TELEMETRY_REQUEST
    • 消息数据为空(如 {}[][{}, {}, {}])。
    • 消息处理过程中发生意外错误。

Examples

Example 1 — On every message strategy

Incoming message

Type: POST_TELEMETRY_REQUEST

Originator: DEVICE

Data:

1
2
3
4
5
{
  "temperature": 23.5,
  "humidity": 65.2,
  "pressure": 1013.25
}

Node configuration

1
2
3
4
5
6
7
{
  "defaultTTL": 86400,
  "useServerTs": true,
  "processingSettings": {
    "type": "ON_EVERY_MESSAGE"
  }
}

Outgoing message

The outgoing message is identical to the incoming one. Routed via the Success connection.

Result

Three time series values are saved:

  • temperature: 23.5
  • humidity: 65.2
  • pressure: 1013.25

All values use the current server timestamp and will expire after 24 hours (86400 seconds). Data is saved to both ts_kv and ts_kv_latest tables, WebSocket subscriptions and calculated fields are notified.

Example 2 — Timestamped data with Deduplicate strategy

Incoming message

Type: POST_TELEMETRY_REQUEST

Originator: DEVICE

Data:

1
2
3
4
5
6
7
{
  "ts": 1737963587742,
  "values": {
    "batteryLevel": 85,
    "signalStrength": -65
  }
}

Node configuration

1
2
3
4
5
6
7
8
{
  "defaultTTL": 0,
  "useServerTs": false,
  "processingSettings": {
    "type": "DEDUPLICATE",
    "deduplicationIntervalSecs": 60
  }
}

State of the system

Message for this device that falls within current interval was already processed.

Outgoing message

The outgoing message is identical to the incoming one. Routed via the Success connection.

Result

Since deduplication is enabled with a 60-second interval and the message was already processed, the data is not persisted to the database. WebSocket notifications and calculated fields are also not triggered.

Example 3 — Multiple timestamped entries

Incoming message

Type: POST_TELEMETRY_REQUEST

Originator: DEVICE

Data:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[
  {
    "ts": 1737963595638,
    "values": {
      "temperature": 22.1,
      "humidity": 60
    }
  },
  {
    "ts": 1737963601607,
    "values": {
      "temperature": 22.3,
      "humidity": 61
    }
  },
  {
    "ts": 1737963607542,
    "values": {
      "temperature": 22.5,
      "humidity": 62
    }
  }
]

Node configuration

1
2
3
4
5
6
7
{
  "defaultTTL": 604800,
  "useServerTs": false,
  "processingSettings": {
    "type": "ON_EVERY_MESSAGE"
  }
}

Outgoing message

The outgoing message is identical to the incoming one. Routed via the Success connection.

Result

Six time series entries are saved (3 timestamps × 2 keys):

  • At timestamp 1737963595638: temperature = 22.1, humidity = 60
  • At timestamp 1737963601607: temperature = 22.3, humidity = 61
  • At timestamp 1737963607542: temperature = 22.5, humidity = 62

All entries will expire after 7 days (604800 seconds).

Example 4 — Advanced processing with mixed strategies

Incoming message

Type: POST_TELEMETRY_REQUEST

Originator: DEVICE

Data:

1
2
3
4
5
{
  "cpuUsage": 45.7,
  "memoryUsage": 78.2,
  "diskUsage": 62.5
}

Node configuration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
  "defaultTTL": 2592000,
  "useServerTs": true,
  "processingSettings": {
    "type": "ADVANCED",
    "timeseries": {
      "type": "DEDUPLICATE",
      "deduplicationIntervalSecs": 300
    },
    "latest": {
      "type": "ON_EVERY_MESSAGE"
    },
    "webSockets": {
      "type": "ON_EVERY_MESSAGE"
    },
    "calculatedFields": {
      "type": "SKIP"
    }
  }
}

State of the system

Message for this device that falls within current deduplication interval was already processed.

Outgoing message

The outgoing message is identical to the incoming one. Routed via the Success connection.

Result

  • Time series data (ts_kv) is NOT saved due to deduplication
  • Latest values (ts_kv_latest) ARE updated with new values
  • WebSocket subscriptions ARE notified
  • Calculated fields are NOT triggered (SKIP strategy)
  • Data has TTL of 30 days (2592000 seconds)

Example 6 — TTL override from message metadata

Incoming message

Type: POST_TELEMETRY_REQUEST

Originator: DEVICE

Data:

1
2
3
4
{
  "waterLevel": 3.45,
  "flowRate": 125.8
}

Message metadata:

1
2
3
{
  "TTL": 3600
}

Node configuration

1
2
3
4
5
6
7
{
  "defaultTTL": 86400,
  "useServerTs": false,
  "processingSettings": {
    "type": "ON_EVERY_MESSAGE"
  }
}

Outgoing message

The outgoing message is identical to the incoming one. Routed via the Success connection.

Result

Time series values are saved with TTL of 3600 seconds (1 hour) from metadata, overriding the node’s default TTL of 86400 seconds.