概述
AWS Kinesis可轻松实时采集、处理和分析视频与数据流,以便您及时获取洞察并快速响应新信息。 将AWS Kinesis与ThingsBoard集成后,可在ThingsBoard IoT平台中处理和可视化AWS Kinesis流中的数据。
继续前请确保了解AWS Kinesis基础知识以及AWS Kinesis streams的一般概念。
AWS Kinesis设置
安装和配置AWS CLI
第一步是获取AWS账户的AWS Access Keys。您的AWS账户访问密钥必须能够创建AWS Kinesis流、向流中写入数据以及从流中读取数据。请前往Managing Access Keys for Your AWS Account Root User创建AWS访问密钥。
此外,请确保您账户的访问密钥有权访问AWS Kinesis、AWS DynamoDB和AWS CloudWatch服务。
创建Access Key后,请记下AWS Access Key ID和AWS Secret Access Key:
- AWS Access Key ID:XXXXXXXXXXXXXXX
- AWS Secret Access Key:YYYYYYYYYYYYYYY
此外,请确保您知道默认的AWS region名称:
- AWS region name:ZZZZZZZ
在后续配置AWS Kinesis ThingsBoard集成时,我们将分别使用AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY和AWS_REGION来引用这些信息。
第二步是安装和配置AWS CLI,以便能够通过命令行创建流、向流中写入记录以及从流中读取记录。 前往AWS CLI install and configuration页面,将AWS CLI安装到您的机器上。
安装完成后,请确保您可以在命令行中看到帮助信息:
1
aws kinesis help
应该看到如下输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
KINESIS() KINESIS()
NAME
kinesis -
DESCRIPTION
Amazon Kinesis Data Streams is a managed service that scales elasti-
cally for real-time processing of streaming big data.
AVAILABLE COMMANDS
o add-tags-to-stream
o create-stream
o decrease-stream-retention-period
o delete-stream
o deregister-stream-consumer
o describe-limits
o describe-stream
o describe-stream-consumer
o describe-stream-summary
o disable-enhanced-monitoring
....
请按q键关闭帮助信息。
Kinesis流数据格式
Kinesis在流中使用Base64格式存储数据。ThingsBoard AWS Kinesis集成会自动将Base64编码转换为字符负载。
如果您的应用程序以CSV格式发送数据,您将在ThingsBoard转换器中收到相同的CSV负载。如果您的应用程序以JSON字符串发送数据,ThingsBoard转换器将在文本负载中收到JSON字符串。
在本教程中,我们将使用JSON字符串向Kinesis数据流中写入记录,同时也会从ThingsBoard以JSON字符串的形式将数据发送回Kinesis流。 在实际场景中,您可以自行选择用于编码/解码数据的格式。
AWS Kinesis示例流
本示例中我们将使用两个AWS Kinesis流:
- uplink流 - 用于向ThingsBoard传入数据。
- downlink流 - 用于从ThingsBoard传出数据。
使用AWS CLI创建uplink流:
1
aws kinesis create-stream --stream-name tb-test-uplink --shard-count 1
请验证流是否创建成功:
1
aws kinesis describe-stream --stream-name tb-test-uplink
输出应类似如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"StreamDescription": {
"KeyId": null,
"EncryptionType": "NONE",
"StreamStatus": "ACTIVE",
"StreamName": "tb-test-uplink",
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49599874927422395224927130964668873176443726009932447746"
}
}
],
"StreamARN": "arn:aws:kinesis:eu-west-1:XXXXXXXXXX:stream/tb-test-uplink",
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"StreamCreationTimestamp": 1569503664.0,
"RetentionPeriodHours": 24
}
}
下一步创建downlink流:
1
aws kinesis create-stream --stream-name tb-test-downlink --shard-count 1
请验证流是否创建成功:
1
aws kinesis describe-stream --stream-name tb-test-downlink
输出应类似如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"StreamDescription": {
"KeyId": null,
"EncryptionType": "NONE",
"StreamStatus": "ACTIVE",
"StreamName": "tb-test-downlink",
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49599874927422395224927130964668873176443726009932447746"
}
}
],
"StreamARN": "arn:aws:kinesis:eu-west-1:XXXXXXXXXX:stream/tb-test-downlink",
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"StreamCreationTimestamp": 1569503664.0,
"RetentionPeriodHours": 24
}
}
与ThingsBoard集成
AWS Kinesis端的所有必要步骤已完成,现在可以开始配置ThingsBoard。
ThingsBoard上行数据转换器
首先,我们需要创建上行数据转换器,用于转换从AWS Kinesis接收到的消息。转换器应将传入的负载转换为所需的消息格式。 消息必须包含deviceName和deviceType字段,这些字段用于将数据提交到正确的设备。如果未找到设备,则会自动创建新设备。 以下是来自AWS Kinesis的示例负载:
1
2
3
4
5
{
"devName": "kitchen_thermostat",
"devType": "thermostat",
"temperature": 22
}
我们将把devName映射到deviceName,将devType映射到deviceType。您可以在具体的使用场景中采用其他映射方式。 此外,我们将获取temperature字段的值并将其作为设备遥测数据。
前往Data Converters页面,使用以下函数创建新的uplink转换器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var data = decodeToJson(payload);
var deviceName = data.devName;
var deviceType = data.devType;
var result = {
deviceName: deviceName,
deviceType: deviceType,
telemetry: {
temperature: data.temperature
}
};
function decodeToString(payload) {
return String.fromCharCode.apply(String, payload);
}
function decodeToJson(payload) {
var str = decodeToString(payload);
var data = JSON.parse(str);
return data;
}
return result;

ThingsBoard下行数据转换器
要从ThingsBoard向Kinesis流发送下行消息,需要定义下行转换器。 通常,下行转换器的输出应具有以下结构:
1
2
3
4
5
6
7
8
{
"contentType": "JSON",
"data": "{\"devName\":\"kitchen_thermostat\",\"version\":\"0.11\"}",
"metadata": {
"streamName": "tb-test-downlink",
"partitionKey": "1234"
}
}
- contentType - 定义数据的编码方式 {TEXT | JSON | BINARY}
- data - 将发送到AWS Kinesis流的实际数据。
- metadata - 在此对象中,您必须填写正确的streamName值和partitionKey,用于在AWS Kinesis中标识正确的流和分区键
前往Data Converters页面,使用以下函数创建新的downlink转换器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var data = {
devName: "kitchen_thermostat",
version: msg.version
};
var result = {
contentType: "JSON",
data: JSON.stringify(data),
metadata: {
streamName: "tb-test-downlink",
partitionKey: "123",
}
};
return result;
此转换器将从传入消息中获取version字段,并将其作为出站消息的负载字段。此外,我们还会在出站消息中加入设备名称。这样,业务流程中从下行流消费消息的后续应用程序就能够识别该负载属于kitchen_thermostat设备。

AWS Kinesis集成
接下来,我们将在ThingsBoard中创建与AWS Kinesis的集成。打开Integrations部分,添加类型为AWS Kinesis的新集成。
- Name:kinesis_integration
- Type:AWS Kinesis
- Uplink data converter:kinesis_converter
- Downlink data converter:kinesis_downlink_version
- Stream name:tb-test-uplink
- Region:AWS_REGION
- Access Key Id:AWS_ACCESS_KEY_ID
- Access Key:SECRET_ACCESS_KEY
- Use credentials from the Amazon EC2 Instance Metadata Service:false(详情请参阅IAM Roles for Amazon EC2)
- Use Consumers with Enhanced Fan-Out:false(详情请参阅Using Consumers with Enhanced Fan-Out)


验证
验证上行消息
让我们验证集成是否正常工作。首先,向uplink流中写入一条消息,ThingsBoard将获取该消息。 在控制台中输入:
1
aws kinesis put-record --stream-name tb-test-uplink --partition-key 123 --data '{"devName": "kitchen_thermostat", "devType": "thermostat", "temperature": 22}'
前往Device Group -> All -> kitchen_thermostat,可以看到:
- 新设备已在ThingsBoard中注册
- 在Latest Telemetry部分,可以看到最近提交的temperature = 22。

验证下行消息
为了测试下行消息,我们将更新根规则链,使其在设备属性变更时发送下行消息。 打开并编辑Root Rule Chain。添加Integration Downlink动作节点,并使用Attributes Updated关系将其与Message Type Switch节点连接。


保存更改。
前往Device Group -> All -> kitchen_thermostat -> attributes部分。添加名为version、值为v.0.11的Shared attribute。

通过此操作,我们触发了向下行流tb-test-downlink发送的下行消息,该消息应包含设备名称和version字段的值。 接下来使用AWS CLI命令行工具验证我们是否已在下行流中收到消息。 首先,我们需要获取下行流的分片迭代器,以便从流中获取记录:
1
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name tb-test-downlink
输出中将包含迭代器的ID:
1
2
3
{
"ShardIterator": "AAAAAAAAAAHHpat8cWmSZ1LSNg2oOra2yaRN/75ZhdwXFNZjnznXU1BQVjo9B+IVi3Pk6ltKN3V8Vhihi74MtXvG9rKcL9VRP1xRebo/NVjaMWd5FmAcp1rvs/DOahY+0FVm6UC1fKFwUXmgnsCzxkU9V8G9cFyGU/4Vvpf2PcrGsHG/JtNouCYT4wwdoXZ6FIFTuWdW7/Qs48PAZCkSjluwA7K1pld4"
}
现在,使用上面JSON中的ShardIterator执行get-records命令:
1
aws kinesis get-records --shard-iterator **ShardIterator**
在我们的示例中:
1
aws kinesis get-records --shard-iterator AAAAAAAAAAFQtL3oAo74irn+ccC3vghADqqmh2MH+HKI9qYTi1NP957vDe8KyV6VdQ+I4shIP0HIRRVYyTZs0W9v6jaai9LevlJayMw6TgdPkVIGmV5SYZF8sGWgtd0wJuRqB+6QwCAUHQ52dgT4m+lypNSzJJw4Mo6h+9Wdk5fpwQxu/GlM8J+Uqblnq4EEr17FkWLahikaSZXktfLq5dh23+LEIc22
输出中应收到下行流中JSON格式的记录:
1
2
3
4
5
6
7
8
9
10
11
12
{
"Records": [
{
"Data": "eyJkZXZOYW1lIjoia2l0Y2hlbl90aGVybW9zdGF0IiwidmVyc2lvbiI6InYuMC4xMSJ9",
"PartitionKey": "1234",
"ApproximateArrivalTimestamp": 1569609612.27,
"SequenceNumber": "49599912710236940383450082324919185009278025474345271298"
}
],
"NextShardIterator": "AAAAAAAAAAFQlgSyxBdpKxlRrocJCYT9YDrCi/vxl0sstJgg4CM+pttVsK4AjjQwJ/QJsags5vdpQdopaqk9aKefAUOWobgwHVaZvhI4tdkmHBr45uO0Hq9AxUlKDxfiYbM0qgN33+5SvGxU8gJBUihYFY4ydPWOWdVTf2lOxp0a9X6DFrjsUqwMXR9skLw8/lQkBmHVFBlFURPy+z/AMuYHga5mDch/",
"MillisBehindLatest": 0
}
由于Kinesis使用Base64表示数据,我们需要使用在线Base64转换工具将Base64解码为JSON格式。最终将得到以下JSON负载:
1
2
3
4
{
"devName":"kitchen_thermostat",
"version":"v.0.11"
}
现在,其他应用程序可以监听此下行流,并根据您的业务逻辑做出相应响应。
另请参阅
通过此集成,您还可以配置下行转换器,并使用规则引擎节点触发所需的操作。