立即试用 商务报价
云服务
文档 > 集成整合 > Kafka

本页目录

Kafka Integration

ThingsBoard PE功能

仅限专业版支持Platform Integrations功能。
基于ThingsBoard云服务专业版平台实例。

Kafka Integration

Apache Kafka — is an open-source distributed software message broker under the Apache foundation. It is written in the Java and Scala programming languages.

Designed as a distributed, horizontally scalable system that provides capacity growth both with an increase in the number and load from the sources, and the number of subscriber systems. Subscribers can be combined into groups. Supports the ability to temporarily store data for subsequent batch processing.

In some scenarios, Kafka can be used instead of a message queue, in cases where there is no stable connection between the device and an instance.

image

Required environment

Before you start setting up the integration, you should already have a prepared Broker Kafka server. This is either a local installation or a cloud solution. If you haven’t installed Kafka Broker yet, there is an example of basic installation of Kafka Broker locally on our site. If you need to use a cloud solution, then you can consider Kafka Confluent, on the basis of which examples will be built in this guide.

Before creating the integration, you need to create an Uplink converter in Data converters. Uplink is necessary in order to convert the incoming data from the device into the required format for displaying them in ThingsBoard. Click on the “plus” and on “Create new converter”. To view the events, enable Debug. In the function decoder field, specify a script to parse and transform data.

Note: While debug mode is very useful for development and troubleshooting, leaving it enabled in production mode can significantly increase the disk space used by the database since all debug data is stored there. After debugging is complete, it is highly recommended turning off debug mode.

Let’s review sample uplink message from Kafka:

1
2
3
4
5
6
7
8
9
{
  "EUI"  : "43T1YH-REE",
  "ts"   : 1638876127000,
  "data"  : "3d1f0059",
  "port" : 10,
  "freq" : 24300,
  "rssi" : -130,
  "serial"  : "230165HRT"
}

EUI is responsible for the name of the device. The “data” is a telemetry concatenation by two characters, where the first value “3d” - temperature, “1f” - humidity, “00” - fan speed, “59” - pressure.

You can use the following code, copy it to the decoder function section:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Decode an uplink message from a buffer
// payload - array of bytes
// metadata - key/value object
/** Decoder **/
// decode payload to JSON
var payloadJson = decodeToJson(payload);
// Use EUI as unique device name.
var deviceName = payloadJson.EUI;
// Specify the device type. Use one data converter per device type or application.
var deviceType = 'Monitoring-sensor';
// Optionally, add the customer name and device group to automatically create them in ThingsBoard and assign new device to it.
// var customerName = 'customer';
// var groupName = 'thermostat devices';
// Result object with device/asset attributes/telemetry data
var result = {
   deviceName: deviceName,
   deviceType: deviceType,
//   customerName: customerName,
//   groupName: groupName,
   attributes: {},
   telemetry: {
        ts: payloadJson.ts,
        values: {
            Temperature:hexToInt(payloadJson.data.substring(0,2)),
            Humidity:   hexToInt(payloadJson.data.substring(2,4)),
            Fan:        hexToInt(payloadJson.data.substring(4,6)),
            Port:       payloadJson.port,
            Freq:       payloadJson.freq,
            Pressure:   hexToInt(payloadJson.data.substring(6,8)),
            rssi:       payloadJson.rssi,
            serial:     payloadJson.serial
       }
   }
};
/** Helper functions **/
function decodeToString(payload) {
   return String.fromCharCode.apply(String, payload);
}

function decodeToJson(payload) {
   // covert payload to string.
   var str = decodeToString(payload);
   // parse string to JSON
   var data = JSON.parse(str);
   return data;
}

function hexToInt(value) {
    return parseInt('0x' + value.match(/../g).reverse().join(''));
}

return result;

You can change the parameters and decoder code when creating a converter or editing. If the converter has already been created, click the pencil icon to edit it. Copy the sample converter configuration (or use your own configuration) and paste it into the decoder function. Then save the changes by clicking the checkmark icon.

Create Integration

After creating the Uplink converter, it is possible to create an integration. At this stage, you need to set the parameters to establish a connection between ThingsBoard and Kafka Broker. After the connection is established, the integration will be transmitting all received data to the Uplink converter for processing and subsequent transfer to Rule Chain according to the Device profile specified in the Device.

Field 描述
Name The name of your integration.
Type Choose Kafka type.
‘Enable’ Checkbox Enable / Disable Integration.
‘Debug Mode’ Checkbox Enable during integration debugging.
Allow create devices or assets If there was no device in ThingsBoard, the device will be created.
Uplink data converter Select the previously created converter.
Downlink data converter This option is not supported through the integration, More details about Downlink below in the guide.
‘Execute remotely’ Checkbox Activate if you want to execute integration remotely from main ThingsBoard instance. For more information on remote integration follow the link (Remote Integrations).
Group ID Specifies the name of the consumer group to which the Kafka consumer belongs.
Client ID An Kafka consumer identifier in a consumer group.
Topics Topics that ThingsBoard will subscribe to after connecting to the Kafka broker.
Bootstrap servers Host and port pair that is the address of the Kafka broker to which the Kafka client first connects for bootstrapping.
Poll interval Duration in milliseconds between polling of the messages if no new messages arrive.
Auto create topics Set Enable if need topics to be created automatically
Other properties Any other additional properties could be provided for kafka broker connection..
Metadata Metadata is a key-value map with some integration specific fields. For example, you can put device type.

The screenshot shows the basic configuration for establishing a connection between ThingsBoard and Kafka Broker. With these settings, the integration will request updates from the Kafka broker every 5 seconds. And if set a topic does not exist at the broker, it will be created automatically. Note: With debug mode enabled, you can view errors, connection status and other events by opening the Events tab.

Configure kafka integration

So in ThingsBoard instance open the Integration menu, select Add integration or Edit action - Details tab.

Set Name, Choose type and select your Uplink data converter from dropdown menu. And fill other required fields: Topics, Bootstrap server, properties.

In Confluent select the created environment, then open Cluster, Cluster settings.

After, find Bootstrap server URL, it looks like URL_OF_YOUR_BOOTSTRAP_SERVER:9092

You should copy it to integration:

Also, need will be to add several other properties, namely:

KeyValue
ssl.endpoint.identification.algorithm https
sasl.mechanism PLAIN
sasl.jaas.config org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET";
security.protocol SASL_SSL
  • CLUSTER_API_KEY - your access key from Cluster settings.
  • CLUSTER_API_SECRET - your access secret from Cluster settings.

To generate the required API key and secret for it, in the cluster you must go to the Data Integration menu, select the API Keys submenu, pick Create key and Select the Scope for the API Key. Here you will see the key and secret to it, which should be used in the integration properties.

It remains to create a topic on Confluent. To do this, select the “Topics” menu, select “Create Topics”, set the name to my-topic (It is important that the topics coincide with the specified in the integration. At the next stage, if necessary, you can change the Storage and Message size parameters, and then confirm the creation by the Create with defaults button.

With these settings, the integration will request updates from the Kafka broker every 5 seconds.

You can simulate a message from a device or server using a terminal. To send an uplink message, you need a Kafka endpoint URL from the integration.

1
echo "{\"EUI\":\"43T1YH-REE\",\"ts\":1638876127000,\"data\":\"3d1f0059\",\"port\":10,\"freq\":24300,\"rssi\":-130,\"serial\":\"230165HRT\"}" | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list URL_OF_YOUR_BOOTSTRAP_SERVER:9092 --topic my-topic > /dev/null

Result:

Also, you can check through the terminal what data came to Kafka.

1
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server URL_OF_YOUR_BOOTSTRAP_SERVER:9092 --topic my-topic --from-beginning

You can simulate a message from the Confluent cloud to ThingsBoard, for this use the available Confluent functionality. Navigate to the topics in the cluster, the Messages tab, and select the Produce a new message to this topic.

Result matches all keys, timestamps and values:

To get functionality such as Kafka Producer, you need to use the Kafka Rule Node in which you can specify Bootstrap servers, Topic and other parameters to connect to the Kafka broker, you can find more details in the corresponding guide .

If it is not possible to send commands directly to devices to manage from ThingsBoard, but only through a broker, then in this case you can use the Kafka Downlink Rule Node. Let’s consider a small example with its Node, suppose the data came from the broker and passed the converter and, according to the config of Device Profile, were directed to the custom Rule Chain (“Monitoring-sensor”) and at the end of all processing, we will send a response about success or failure back to the broker ( you can change the response to commands to control your device, etc.)

Сheck whether the message has been transmitted, you can see in the Events tab of Kafka Rule Node with enable Debug Mode:

Note: Using the same broker topic for uplink and downlink connections can lead to data loops.

Next steps

  • 入门指南 - 快速学习ThingsBoard相关功能。

  • 可 视 化 - 学习如何配置复杂的ThingsBoard仪表板说明。

  • 数据处理 - 学习如何使用ThingsBoard规则引擎。

  • 数据分析 - 学习如何使用规则引擎执行基本的分析任务。

  • 硬件样品 - 学习如何将各种硬件平台连接到ThingsBoard。

  • 高级功能 - 学习高级ThingsBoard功能。