产品定价 立即试用
IoT网关
文档 > 自定义 > 自定义指南(串行连接器)
入门
安装
目录

自定义IoT Gateway串口连接器

自定义连接器实现

连接器是网关中连接外部系统或直接连接设备的组件。网关内置多种连接器(如MQTT、OPC-UA服务器、Modbus、BLE等)。
连接后,连接器通过轮询或订阅从这些系统获取数据,具体取决于协议能力。

自定义连接器的主要目的是支持以任意协议连接任意设备。连接器使用Python编写。

下面通过示例演示如何创建自定义连接器。
假设我们需要连接器连接设备串口、以字节形式读取数据,且设备也能接收字节形式的命令。
连接器可通过串口向设备推送数据。我们将该连接器命名为SerialConnector
请参阅将SerialConnector添加到网关的逐步指南。
您可基于此示例创建自己的自定义连接器。

注意:网关已包含此连接器,您可以在extensions文件夹中找到它

假设我们的串口设备推送如下所示的UTF-8编码字符串:

1
48\r2430947595\n

其中48是湿度,\r是值之间的分隔符,2430947595是设备序列号,消息之间用\n符号分隔。
为了解析传入数据,我们希望使用两种方式——按字节索引和按分隔符(本示例中两种方式都会使用)。

步骤1.定义SerialConnector配置

首先,我们需要为串口连接器创建配置文件。让我们在config文件夹中创建文件(即tb_gateway.json文件所在的文件夹)。

1
touch custom_serial.json

然后我们需要在文件中为此连接器添加一些配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
{
"name": "Custom serial connector",
"logLevel": "DEBUG",
"uplinkQueueSize": 100000,
"devices": [
{
"name": "SerialDevice1",
"type": "default",
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"converter": "SerialUplinkConverter",
"downlink_converter": "SerialDownlinkConverter",
"telemetry": [
{
"type": "float",
"key": "humidity",
"untilDelimiter": "\r"
}
],
"attributes": [
{
"key": "SerialNumber",
"type": "string",
"fromByte": 4,
"toByte": -1
}
],
"attributeUpdates": [
{
"attributeOnPlatform": "attr1",
"stringToDevice": "value = ${attr1}\n"
}
],
"serverSideRpc": [
{
"method": "setValue",
"type": "int",
"withResponse": true,
"responseType": "string",
"responseUntilDelimiter": "\r",
"responseTimeoutSec": 5
},
{
"method": "getValue",
"type": "string",
"withResponse": false
}
]
}
]
}

在此文件中,我们编写将在连接器代码中使用的配置。

配置中的参数:

  1. “name” -连接器名称,应与tb_gateway.json文件中的连接器名称一致。网关用它来查找已保存设备的正确连接器。
  2. “logLevel” -连接器的日志级别。(TRACE、DEBUG、INFO、WARNING、ERROR、CRITICAL)
  3. “devices” -设备配置数组(可以提供多个设备。)
  4. “uplinkQueueSize” -上行数据队列大小。(即我们将发送到平台实例的数据。)

在”devices”数组配置文件中,包含设备JSON对象及其配置参数。

设备对象中的参数:

  • “name” -平台上的设备名称。
  • “type” -平台上的设备类型。
  • “port” -设备端口。
  • “baudrate” -连接设备的端口波特率。
    注意:您还可以使用串口配置中的其他参数,如校验位、停止位等。
    您可以在此处了解更多关于串口参数的信息,或在代码中查找(SerialDevice类的__init__方法)
  • “converter” -用作此设备串口连接器上行转换器的转换器类名。
  • “downlink_converter” -用作此设备串口连接器下行转换器的转换器类名。
  • “telemetry” -对象数组,包含处理设备数据的配置,此部分配置处理的数据将被解释为设备遥测数据。
  • “attributes” -对象数组,包含处理设备数据的配置,此部分配置处理的数据将被解释为设备属性。
  • “attributesUpdates” -对象数组,包含处理从平台到设备的属性更新请求的配置。
  • “serverSideRpc” -对象数组,包含处理从平台到设备的RPC请求的配置。

步骤2.定位扩展文件夹

连接器文件应放置在extensions文件夹中,具体位置取决于安装类型:

Docker Compose(默认卷):

1
tb-gw-extensions

守护进程:

1
/var/lib/thingsboard_gateway/extensions

Pip安装:

安装命令 路径 描述
sudo pip3 install thingsboard-gateway /usr/lib/python3/site-packages/thingsboard_gateway/extensions 包安装在系统层,适用于所有用户。
pip3 install thingsboard-gateway /usr/local/lib/python3/dist-packages/thingsboard-gateway 包仅为当前用户安装。

步骤3.定义连接器实现

要创建自定义连接器,我们需要创建一个继承自Connector类的类。
预期将包含自定义连接器的文件(例如”serial_connector.py“)放置在extensions文件夹的”serial“文件夹中。
然后,我们在连接器文件中编写连接器类并重写父类的某些方法。
您可以在此处找到Connector接口的完整方法列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
from queue import Queue
from threading import Event, Thread, Lock
from typing import List, TYPE_CHECKING

import serial.tools
import serial.tools.list_ports
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
from time import monotonic, sleep

try:
    import serial
except ImportError:
    print("pyserial library not found - installing...")
    TBUtility.install_package("pyserial")
    import serial

from thingsboard_gateway.connectors.connector import Connector
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.tb_utility.tb_logger import init_logger

if TYPE_CHECKING:
    #  necessary for type checking to avoid circular import
    from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService


class SerialDevice(Thread):
    """
    Serial device class is used to represent a device that is connected to the serial port.
    It is used to read data from the device and send it to the platform.
    """
    def __init__(self, device_config, uplink_converter, stop_event: Event, logger, uplink_queue):
        super().__init__()
        self.__log = logger
        self.uplink_queue = uplink_queue
        self.daemon = True
        self.stopped = True
        self.__connector_stopped = stop_event
        self.config = device_config
        self.name = self.config.get('deviceName', self.config.get('name', 'SerialDevice'))
        self.type = self.config.get('deviceType', self.config.get('type', 'default'))
        self.uplink_converter = uplink_converter
        self.downlink_converter = None
        self.delimiter = self.config.get('delimiter', '\n')
        self.__rpc_in_progress = Event()
        self.__previous_connect = 0

        self.port = self.config.get('port', '/dev/ttyUSB0')
        self.baudrate = self.config.get('baudrate', 9600)
        self.timeout = self.config.get('timeout', 1)
        self.bytesize = self.config.get('bytesize', serial.EIGHTBITS)
        self.stopbits = self.config.get('stopbits', serial.STOPBITS_ONE)
        self.parity = self.config.get('parity', serial.PARITY_NONE)
        self.dsrdtr = self.config.get('dsrdtr', False)
        self.rtscts = self.config.get('rtscts', False)
        self.xonxoff = self.config.get('xonxoff', False)
        self.write_timeout = self.config.get('writeTimeout', None)
        self.inter_byte_timeout = self.config.get('interByteTimeout', None)
        self.exclusive = self.config.get('exclusive', None)
        self.__serial_lock = Lock()
        self.__serial = None

    def get_serial(self):
        """
        Method to get serial connection to the device.
        If connection is not established, it tries to connect to the device.
        """
        with self.__serial_lock:
            if self.__serial is None or not self.__serial.is_open:
                try:
                    self.__serial = serial.Serial(
                        port=self.port,
                        baudrate=self.baudrate,
                        timeout=self.timeout,
                        bytesize=self.bytesize,
                        stopbits=self.stopbits,
                        parity=self.parity,
                        dsrdtr=self.dsrdtr,
                        rtscts=self.rtscts,
                        xonxoff=self.xonxoff,
                        write_timeout=self.write_timeout,
                        inter_byte_timeout=self.inter_byte_timeout,
                        exclusive=self.exclusive
                    )
                    self.__log.info("Connected to device %s", self.name)
                except Exception as e:
                    self.__log.error("Failed to connect to device %s: %s", self.name, e)
                    self.__serial = None
        return self.__serial

    def run(self):
        """
        Main method to read data from the device and send it to the platform.
        """
        self.__log.info("Device %s started", self.name)
        self.stopped = False
        self.get_serial()
        while not self.__connector_stopped.is_set() and not self.stopped:
            try:
                if not self.__rpc_in_progress.is_set():
                    data_from_device = self.__read_data_from_serial()
                    if data_from_device:
                        try:
                            converted_data = self.uplink_converter.convert(None, data_from_device)
                            self.uplink_queue.put(converted_data)
                        except Exception as e:
                            self.__log.error("Failed to convert data from device %s: %s", self.name, e)
            except Exception as e:
                self.__log.exception("Error in device %s: %s", self.name, e)
                self.stop()
        self.__log.info("Device %s stopped", self.name)

    def handle_rpc_request(self, rpc_method, params):
        """
        Method to process RPC requests from the platform.
        """
        result = {"success": True}
        processed = False
        for rpc_config in self.config.get("serverSideRpc", []):
            if rpc_method == rpc_config.get("method"):
                processed = True
                self.__rpc_in_progress.set()
                try:
                    if self.downlink_converter is not None:
                        converted_data = self.downlink_converter.convert(rpc_config, params)
                        if converted_data:
                            with_response = rpc_config.get("withResponse", False)
                            response_timeout = rpc_config.get("responseTimeoutSec", 5)
                            response = self.write(converted_data,
                                                  with_response=with_response,
                                                  response_timeout=response_timeout)
                            if with_response:
                                response_uplink_config = {}
                                if rpc_config.get("responseType"):
                                    response_uplink_config["type"] = rpc_config.get("responseType")
                                if rpc_config.get("responseFromByte"):
                                    response_uplink_config["fromByte"] = rpc_config.get("responseFromByte")
                                if rpc_config.get("responseToByte"):
                                    response_uplink_config["toByte"] = rpc_config.get("responseToByte")
                                if rpc_config.get("responseUntilDelimiter"):
                                    response_uplink_config["delimiter"] = rpc_config.get("responseUntilDelimiter")
                                if response_uplink_config and response:
                                    result = self.uplink_converter.convert(response_uplink_config, response)
                                else:
                                    result = {"error": "Cannot convert response with config: %r and response: %r" % (
                                        response_uplink_config, response), "success": False}
                        else:
                            result = {"error": "No data to send", "success": False}
                    else:
                        result = {"error": "Downlink converter not defined", "success": False}
                except Exception as e:
                    self.__log.error("Failed to process RPC with method: %r, params: %r, config: %r - Error: %s",
                                     rpc_method, params, rpc_config, e)
                    result = {"error": str(e), "success": False}
                finally:
                    self.__rpc_in_progress.clear()
        if not processed:
            result = {"error": "Method not found", "success": False}
        return result

    def write(self, data, with_response=False, response_timeout=5):
        """
        Method to write data to the device.
        If with_response is set to True, it waits for the response from the device.
        """
        try:
            serial_conn = self.get_serial()
            if serial_conn:
                with self.__serial_lock:
                    serial_conn.write(data)
                    self.__log.debug("Written to device %s: %s", self.name, data)
                if with_response:
                    return self.__read_data_from_serial(response_timeout)
        except Exception as e:
            self.__log.exception("Failed to write to device %s: %s", self.name, e)
        return None

    def __read_data_from_serial(self, timeout=1):
        """
        Method to read data from the device.
        It reads data until the delimiter is found.
        """
        data_from_device = b''
        serial_conn = None
        try:
            serial_conn = self.get_serial()
            previous_timeout = serial_conn.timeout
            if serial_conn:
                while not data_from_device.endswith(self.delimiter.encode('utf-8')):
                    serial_conn.timeout = timeout
                    chunk = serial_conn.read(1)
                    if chunk:
                        data_from_device += chunk
                    if self.__connector_stopped.is_set() or not chunk or self.stopped:
                        break
        except Exception as e:
            self.__log.exception("Failed to read from device %s: %s", self.name, e)
        finally:
            if serial_conn:
                serial_conn.timeout = previous_timeout
        return data_from_device

    def stop(self):
        self.stopped = True
        with self.__serial_lock:
            if self.__serial:
                self.__serial.close()
                self.__serial = None

    def is_connected_reconnect_if_needed(self):
        """
        Method to check if the device is connected.
        If the device is not connected, it tries to reconnect.
        """
        if self.__serial is None or not self.__serial.isOpen():
            if monotonic() - self.__previous_connect > 1:
                self.__previous_connect = monotonic()
                self.__log.info("Reconnecting to device %s", self.name)
                self.get_serial()
                return self.__serial is None or not self.__serial.isOpen()
        else:
            return True


class SerialConnector(Thread, Connector):
    """
    Serial connector class is used to represent a serial connector.
    It is used to manage devices connected to the serial ports.
    """
    def __init__(self, gateway: 'TBGatewayService', config, connector_type):
        super().__init__()
        self._connector_type = connector_type  # required to have for get connector type method
        self.__config = config  # required to have for get config method
        self.__id = self.__config["id"]  # required to have for get id method
        self.__gateway = gateway  # required to have for send data to storage method or to use other gateway methods
        self.name = self.__config["name"]  # required to have for get name method
        self.__connected = False  # required to have for is connected method
        self.__uplink_queue = Queue(self.__config.get('uplinkQueueSize', 100000))
        self._log = init_logger(self.__gateway, self.name, level=self.__config.get('logLevel'),
                                enable_remote_logging=self.__config.get('enableRemoteLogging', False),
                                is_connector_logger=True)
        self._converter_log = init_logger(self.__gateway, self.name, level=self.__config.get('logLevel'),
                                          enable_remote_logging=self.__config.get('enableRemoteLogging', False),
                                          is_converter_logger=True)
        self._log.info("Starting %s connector", self.get_name())
        self.daemon = True
        self.stopped = Event()
        self.stopped.set()
        self.__devices: List[SerialDevice] = []
        self._log.info('Connector %s initialization success.', self.get_name())

    def __start_devices(self):
        failed_to_connect_devices = len(self.__devices)
        for device in self.__devices:
            try:
                device.start()
                failed_to_connect_devices -= 1
            except Exception as e:
                self._log.exception("Failed to start device %s, error: %s", device.name, e)
        self.__connected = failed_to_connect_devices == 0

    def open(self):
        """
        Service method to start the connector.
        """
        self.stopped.clear()
        self.start()

    def get_name(self):
        return self.name

    def get_type(self):
        return self._connector_type

    def is_connected(self):
        return self.__connected

    def is_stopped(self):
        return self.stopped.is_set()

    def get_config(self):
        return self.__config

    def get_id(self):
        return self.__id

    def __load_devices(self):
        """
        Method to create devices objects using configuration file and create converters for them.
        """
        devices_config = self.__config.get('devices')
        try:
            if devices_config is not None:
                for device_config in devices_config:
                    device = None
                    uplink_converter_class_name = device_config.get('converter', device_config.get('uplink_converter'))
                    if uplink_converter_class_name is not None:
                        converter_class = TBModuleLoader.import_module(self._connector_type,
                                                                       uplink_converter_class_name)
                        uplink_converter = converter_class(device_config, self._log)
                        device = SerialDevice(device_config, uplink_converter, self.stopped,
                                              self._log, self.__uplink_queue)
                    else:
                        self._log.error('Converter configuration for the connector %s -- \
                            not found, please check your configuration file.', self.get_name())
                    if device_config.get('downlink_converter') is not None:
                        downlink_converter_class = TBModuleLoader.import_module(self._connector_type,
                                                                                device_config.get('downlink_converter'))
                        if device is not None:
                            device.downlink_converter = downlink_converter_class(device_config, self._converter_log)
                    if device is not None:
                        self.__devices.append(device)
            else:
                self._log.error('Section "devices" in the configuration not found. \
                    A connector %s has being stopped.', self.get_name())
                self.close()
        except Exception as e:
            self._log.error('Failed to load devices, error: %s', e)

    def run(self):
        """
        Main method to manage devices connected to the serial ports and process data from them.
        """
        try:
            self.__load_devices()
            self.__start_devices()
            self._log.info("Devices in configuration file found: %s ",
                           '\n'.join(device.name for device in self.__devices))
            while not self.stopped.is_set():
                try:
                    connected_devices = len(self.__devices)
                    for device in self.__devices:
                        if not device.stopped and not device.is_connected_reconnect_if_needed():
                            connected_devices -= 1
                            self._log.error("Device %s is not connected", device.name)
                            device.stop()
                            device.join()
                            device = SerialDevice(device.config, device.uplink_converter, self.stopped,
                                                  self._log, self.__uplink_queue)
                            device.start()
                    self.__connected = connected_devices == len(self.__devices)
                    if not self.__uplink_queue.empty():
                        data = self.__uplink_queue.get()
                        self.__gateway.send_to_storage(self.name, self.__id, data)
                    else:
                        sleep(0.05)
                except Exception as e:
                    self._log.error("Failed to process data from device %s, error: %s", self.name, e)
        except Exception as e:
            self._log.error("Failed to process data from device %s, error: %s", self.name, e)

    def close(self):
        """
        Service method to stop the connector and all devices connected to it.
        """
        self.stopped.set()
        for device in self.__devices:
            self.__gateway.del_device(device.name)
            device.stop()
        self._log.stop()

    def on_attributes_update(self, content):
        """
        Callback method to process attribute updates from the platform.
        """
        self._log.debug("Received attribute update: %s", content)
        device_name = content.get("device")
        if device_name is not None:
            for device in self.__devices:
                if device_name == device.name:
                    request_config = device.config.get("attributeUpdates")
                    if request_config is not None:
                        attribute_config_found = False
                        for attribute_config in request_config:
                            attribute = attribute_config.get("attributeOnPlatform")
                            if attribute is not None and attribute in content["data"]:
                                attribute_config_found = True
                                try:
                                    value = content["data"][attribute]
                                    str_to_send = str(attribute_config["stringToDevice"]
                                                      .replace("${" + attribute + "}", str(value))
                                                      .replace("${deviceName}", device_name)
                                                      .replace("${deviceType}", device.type)
                                                      ).encode("UTF-8")
                                    device.write(str_to_send)
                                except Exception as e:
                                    self._log.error("Failed to send attribute update to device %s: %s",
                                                    device_name, e)
                        if not attribute_config_found:
                            self._log.error("Attribute update configuration for key %s for device %s not found",
                                            list(content['data'].keys())[0], device_name)
                    else:
                        self._log.error("Attribute update configuration for device %s not found", device_name)
        else:
            self._log.error("Device name is not provided in the attribute update request: %s", content)

    def server_side_rpc_handler(self, content):
        """
        Callback method to process RPC requests from the platform.
        """
        self._log.debug("Received RPC request: %s", content)
        device_name = content.get("device")
        rpc_data = content.get("data", {})
        rpc_method = rpc_data.get("method")
        req_id = rpc_data.get("id")
        params = rpc_data.get("params")
        if device_name is not None:
            for device in self.__devices:
                if device_name == device.name:
                    result = device.handle_rpc_request(rpc_method, params)
                    if "error" in result:
                        self._log.error("Failed to process RPC request for device %s, error: %s",
                                        device_name, result["error"])
                    if result is not None:
                        self.__gateway.send_rpc_reply(device=device_name,
                                                      req_id=req_id,
                                                      content=result,
                                                      wait_for_publish=True,
                                                      quality_of_service=1)
        else:
            self._log.error("Device name is not provided in the RPC request: %s", content)

步骤4.创建上行转换器

上行转换器的目的是将设备数据转换为平台期望的格式。
上行转换器应位于我们在步骤2中使用的同一扩展文件夹中(在我们的例子中是”extensions“中的”serial“文件夹)。
我们的上行转换器文件名为”uplink_serial_converter.py“。它应包含一个继承自Converter类的类,并重写convert方法。
您可以在此处找到Converter接口的完整方法列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from typing import Any, Tuple
from simplejson import loads

from thingsboard_gateway.connectors.converter import Converter
from thingsboard_gateway.gateway.constants import REPORT_STRATEGY_PARAMETER, TELEMETRY_PARAMETER, TIMESERIES_PARAMETER
from thingsboard_gateway.gateway.entities.converted_data import ConvertedData
from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey
from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig
from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry
from thingsboard_gateway.tb_utility.tb_utility import TBUtility


class SerialUplinkConverter(Converter):
    """
    Uplink converter is used to convert incoming data to the format that platform expects.
    Such as we create uplink converter for each configured device,
    this converter is used to convert incoming data from only one device.
    Because config, that we passed to init method, is device specific.
    If your connector can handle multiple devices, you can create one converter for all devices.
    """

    def __init__(self, config, logger):
        self._log = logger
        self.__config = config
        self.__device_report_strategy = None
        self.__device_name = self.__config.get('deviceName', self.__config.get('name', 'SerialDevice'))
        self.__device_type = self.__config.get('deviceType', self.__config.get('type', 'default'))
        try:
            self.__device_report_strategy = ReportStrategyConfig(self.__config.get(REPORT_STRATEGY_PARAMETER))
        except ValueError as e:
            self._log.trace("Report strategy config is not specified for device %s: %s", self.__device_name, e)

    def convert(self, config, data: bytes):
        """Converts incoming data to the format that platform expects. Config is specified only for RPC responses."""
        self._log.debug("Data to convert: %s", data)
        if config is not None:
            converted_data = {"result": self.__convert_value_to_type(data, config)}
            return converted_data
        else:
            converted_data = ConvertedData(self.__device_name, self.__device_type)
            for datapoint_config in self.__config.get(TIMESERIES_PARAMETER, self.__config.get(TELEMETRY_PARAMETER, [])):
                try:
                    telemetry_entry = self.__convert_telemetry_datapoint(data, datapoint_config)
                    if telemetry_entry:
                        converted_data.add_to_telemetry(telemetry_entry)
                except Exception as e:
                    self._log.error("Error converting telemetry datapoint: %s", e)
            for datapoint_config in self.__config.get('attributes', []):
                try:
                    attribute_data = self.__convert_attributes_datapoint(data, datapoint_config)
                    if attribute_data:
                        converted_data.add_to_attributes(*attribute_data)
                except Exception as e:
                    self._log.error("Error converting attribute datapoint: %s", e)
            self._log.debug("Converted data: %s", converted_data)
        return converted_data

    def __convert_telemetry_datapoint(self, data, dp_config) -> TelemetryEntry:
        key = dp_config.get('key')
        datapoint_key = self.__convert_datapoint_key(key, dp_config, self.__device_report_strategy, self._log)
        value = self.__convert_value_to_type(data, dp_config)
        if not datapoint_key or not value:
            self._log.trace("Datapoint %s - not found in incoming data: %s", key, data.hex())
            return None
        return TelemetryEntry({datapoint_key: value})

    def __convert_attributes_datapoint(self, data, dp_config) -> Tuple[DatapointKey, Any]:
        key = dp_config.get('key')
        datapoint_key = self.__convert_datapoint_key(key, dp_config, self.__device_report_strategy, self._log)
        value = self.__convert_value_to_type(data, dp_config)
        if not datapoint_key or not value:
            self._log.trace("Datapoint %s - not found in incoming data: %s", key, data.hex())
            return None
        return (datapoint_key, value)

    @staticmethod
    def __convert_value_to_type(data, dp_config):
        type_ = dp_config.get('type')
        data_for_conversion = data
        if dp_config.get("untilDelimiter") or dp_config.get("fromDelimiter"):
            fromDelimiter = dp_config.get("fromDelimiter")
            untilDelimiter = dp_config.get("untilDelimiter")
            fromDelimiterPosition = data_for_conversion.find(
                fromDelimiter.encode('UTF-8')) if fromDelimiter else 0
            untilDelimiterPosition = data_for_conversion.find(
                untilDelimiter.encode('UTF-8')) if untilDelimiter else -1
            if fromDelimiterPosition != -1 \
                    and untilDelimiterPosition != -1 \
                    and fromDelimiterPosition < untilDelimiterPosition:
                data_for_conversion = data_for_conversion[fromDelimiterPosition:untilDelimiterPosition]
            elif fromDelimiterPosition != -1 and fromDelimiterPosition < len(data_for_conversion):
                data_for_conversion = data_for_conversion[fromDelimiterPosition:]
            elif untilDelimiterPosition != -1 and untilDelimiterPosition < len(data_for_conversion):
                data_for_conversion = data_for_conversion[:untilDelimiterPosition]
        elif dp_config.get("fromByte") or dp_config.get("toByte"):
            if dp_config.get("fromByte") and dp_config.get("toByte") \
                    and dp_config["fromByte"] < dp_config["toByte"] \
                    and len(data_for_conversion) > dp_config["toByte"]:
                data_for_conversion = data_for_conversion[dp_config["toByte"]:dp_config["fromByte"]]
            else:
                if dp_config.get("fromByte") and len(data_for_conversion) > dp_config.get("fromByte", 0):
                    data_for_conversion = data_for_conversion[dp_config["fromByte"]:]
                if dp_config.get("toByte") and \
                        (len(data_for_conversion) > dp_config.get("toByte", 0) or dp_config["toByte"] == -1):
                    data_for_conversion = data_for_conversion[:dp_config["toByte"]]

        if type_ == 'string':
            value = data_for_conversion.decode('UTF-8').strip()
        elif type_ == 'json':
            value = loads(data_for_conversion.decode('UTF-8'))
        elif type_ == 'int':
            value = int(data_for_conversion)
        elif type_ == 'float' or type_ == 'double':
            value = float(data_for_conversion)
        elif type_ == 'bool':
            try:
                value = bool(int(data_for_conversion))
            except ValueError:
                return data_for_conversion.decode('UTF-8').strip().lower() == 'true'
        else:
            value = data_for_conversion.hex()
        return value

    @staticmethod
    def __convert_datapoint_key(key, dp_config, device_report_strategy, logger):
        return TBUtility.convert_key_to_datapoint_key(key, device_report_strategy, dp_config, logger)

处理48\r2430947595\n后,我们获得包含以下数据的ConvertedData对象:

设备名称:”SerialDevice1”
设备类型:”default”
遥测数据:[{“humidity”: 48}]
属性:[{“SerialNumber”: “2430947595”}]

此数据将传递给网关的send_to_storage方法,经过存储后将发送到平台。

步骤5.创建下行转换器

下行转换器的目的是将平台数据转换为设备格式。
下行转换器应位于我们在步骤2中使用的同一扩展文件夹中(在我们的例子中是”extensions“中的”serial“文件夹)。
我们的下行转换器文件名为”downlink_serial_converter.py“。它应包含一个继承自Converter类的类,并重写convert方法。
您可以在此处找到Converter接口的完整方法列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from math import ceil
from struct import pack, unpack

from thingsboard_gateway.connectors.converter import Converter


class SerialDownlinkConverter(Converter):
    """
    Downlink converter is used to convert data that can be sent to device.
    Such as we create downlink converter for each configured device,
    this converter is used to convert data that can be sent to only one device.
    Because config, that we passed to init method, is device specific.
    If your connector can handle multiple devices, you can create one converter for all devices.
    """

    def __init__(self, config, logger):
        self._log = logger
        self.__config = config

    def convert(self, config, data) -> bytes:
        """Method to convert data that can be send to serial port."""
        self._log.debug("Data to convert: %s", data)
        byteorder = self.__config.get('byteorder', 'big').lower()
        if data is None:
            return None
        type_ = config.get("type")
        if type_ == "int":
            length = ceil(data.bit_length() / 8)
            return data.to_bytes(length, byteorder=byteorder)
        elif type_ == "float" or type_ == "double":
            fmt_single_precision = ('>' if byteorder == 'big' else '<') + 'f'
            single_precision_bytes = pack(fmt_single_precision, data)
            if unpack(fmt_single_precision, single_precision_bytes)[0] == data:
                return single_precision_bytes
            fmt_double_precision = ('>' if byteorder == 'big' else '<') + 'd'
            return pack(fmt_double_precision, data)
        return data.encode("UTF-8")

步骤5.将连接器添加到网关主配置文件

要将串口连接器添加到网关,我们需要在tb_gateway.json文件的connectors部分添加以下内容。
注意:如果使用远程配置功能,在网关版3.7.0之前的版本中,您应在配置中使用”custom”而不是”serial”作为文件夹和type字段。

1
2
3
4
5
6
{
  "name": "Serial Connector",
  "type": "serial",
  "configuration": "custom_serial.json",
  "class": "SerialConnector"
}

其中:
name -连接器名称
type - extensions中包含连接器文件的文件夹名称
configuration - tb_gateway.json文件所在文件夹中的连接器配置文件
class - extensions中连接器文件里的连接器类名

步骤6.运行IoT网关

要运行网关,您应执行以下命令,具体取决于安装类型:

-如果您将IoT网关安装为守护进程,应使用以下命令重启以应用配置更改:

1
sudo systemctl restart thingsboard-gateway

-如果您将IoT网关安装为Python模块,应从tb_gateway.json所在文件夹运行(或更改tb_gateway.json文件的路径),使用以下命令应用配置更改:

1
sudo python3 -c 'from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService; TBGatewayService("./tb_gateway.json")'

您可以通过查看您在logs.json文件中指定的文件夹中的日志来检查IoT网关的状态。
根据安装选项,日志文件夹的默认位置:
Docker Compose - “tb-gw-logs”卷
Daemon - “/var/log/thingsboard-gateway/”
Python模块(pip)- “./logs/”

步骤6.在ThingsBoard实例上检查结果

要检查结果,您应连接设备,然后进入ThingsBoard UI的”设备”选项卡。
如果设备正确连接并已发送一些数据,您将看到名为”SerialDevice1”的设备。
要检查数据,请打开设备并进入遥测数据选项卡。
您应能看到配置中的遥测数据(humidity),值为48(示例中的值,您的值可能不同)。

自定义连接器方法参考

自定义连接器的必需方法,可能被网关调用:
__init__ –创建对象时调用(在示例中用于加载转换器、将配置数据保存到对象变量和创建串口对象)。
open –用于启动连接器。
get_name –用于获取连接器名称。
get_type –用于获取连接器类型,在我们的例子中是serial
is_connected –用于检查与设备的连接状态。
is_stopped –用于检查连接器的状态。
get_config –用于获取连接器配置,预期返回传递给构造函数的配置数据字典。
get_id –用于获取连接器ID,预期返回传递给构造函数的配置中的ID字符串。
run –线程的主方法,必须包含无限循环和所有数据接收/处理函数的调用。
close –网关停止时调用的方法,应包含关闭连接/端口等的处理。
on_attributes_update –网关从平台接收到设备的属性更新时调用。
server_side_rpc_handler –网关从平台接收到设备和连接器的RPC请求时调用。

重写的Connector类方法

__init__方法

参数:

1
def __init__(self, gateway, config, connector_type):

self –当前对象
gateway –网关对象(将用于保存数据)
config –包含连接器配置文件数据的字典
connector_type –连接器类型(用于从全局配置中加载此连接器类型的转换器)

注意:配置包含生成的ID、名称和配置文件中的其他数据。
在上面的示例中,我们使用此方法来初始化我们将要处理的数据。

open方法
1
def open(self):

self –当前对象

此方法由核心用于启动连接器。在上面的示例中,我们使用此方法启动连接器主循环的线程。

get_name方法
1
def get_name(self):    # Function used for logging, sending data and statistic

self –当前对象

获取连接器名称的方法。

get_type方法
1
def get_type(self):    # Function used for logging, sending data and statistic

self –当前对象

获取连接器类型的方法。

is_connected方法
1
def is_connected(self):    # Function for checking connection state

self –当前对象

检查当前连接状态的方法。

is_stopped方法
1
def is_stopped(self):    # Function for checking state of connector

self –当前对象

检查连接器当前状态的方法。

get_config方法
1
def get_config(self):    # Function for getting configuration of connector

self –当前对象

获取连接器配置的方法。

get_id方法
1
def get_id(self):    # Function for getting id of connector

self –当前对象

获取连接器ID的方法。ID由网关生成并作为配置的一部分传递,键为”id”。

run方法

来自threading模块的方法,在网关初始化后调用。

参数:

1
def run(self):

self –当前对象。

在示例中,我们使用此方法连接设备、从设备读取数据并运行转换器。

close方法
1
def close(self):

self –当前对象。

此方法由核心用于停止连接器。在示例中,我们使用此方法停止所有设备并关闭所有连接。

on_attributes_update方法

当网关从平台接收到属性更新时,核心会调用此方法。

1
def on_attributes_update_method(self, content):

self –当前对象。
content –来自平台的数据字典。

内容示例:

1
{"device": "SerialDevice1", "data": {"attr1": 25}}

如果attributesUpdates部分的配置如下,连接器将向设备发送字符串”value = 25\n”。

1
2
3
4
5
6
      "attributeUpdates": [
        {
          "attributeOnThingsBoard": "attr1",
          "stringToDevice": "value = ${attr1}\n"
        }
      ]
server_side_rpc_handler

当网关从平台接收到RPC请求时,核心会调用此方法。

1
def server_side_rpc_handler(self, content):

self –当前对象。
content –来自平台的数据字典。

内容示例:

1
{"device": "SerialDevice1", "data": {"id": 1, "method": "toggle_gpio", "params": {"pin":1}}}

RPC请求处理有两种类型——有响应和无响应。

处理请求后,您只需使用以下网关方法:

1
self.__gateway.send_rpc_reply(device, req_id, content, wait_for_publish, quality_of_service)

其中:
device -设备名称字符串。
req_id -来自平台的RPC请求ID。
contentsuccess -取决于RPC类型:
-如果无响应: success = True -如果有响应,content应为任意字典,包含您希望作为响应发送到平台的内容。 wait_for_publish -(可选)布尔值,如果为True,当服务质量 >= 1时,网关将等待平台的MQTT消息确认。
quality_of_service -(可选)整数,发送到平台的MQTT消息的服务质量。

其他内部串口连接器方法

__load_devices方法

使用配置文件创建设备对象并为其创建转换器的方法。

1
def __load_devices(self):

self –当前对象。

在上面的示例中,我们使用此方法创建SerialDevice对象并为其加载转换器。
它允许我们轻松管理设备、其连接和数据处理。

__start_devices方法

启动设备的方法。

1
def __start_devices(self):

self –当前对象。

在上面的示例中,我们使用此方法启动在__load_devices方法中创建的所有设备。
管理连接器与设备连接的初始状态。

自定义转换器方法参考

您应实现以下方法:
__init__ –创建对象时调用。
convert –用于将数据从设备格式转换为平台格式或反之亦然的方法,取决于转换器类型。

__init__方法

参数:

1
def __init__(self, config, logger):

self –当前对象。
config –包含连接器配置文件数据的字典。
logger –日志记录器对象,可用于日志记录,由连接器创建。

在上面的示例中,我们使用此方法加载配置数据并创建用于日志记录的日志记录器对象。

convert方法

此方法用于将数据从设备格式转换为平台格式或反之亦然,取决于转换器类型。

参数:

1
def convert(self, config, data):

self –当前对象。
config –连接器配置文件中此设备的配置部分。
data –来自设备或平台的需要转换的数据。

上行转换器

此方法应将数据从设备格式转换为平台格式。
建议返回ConvertedData对象,包含设备名称、设备类型、遥测和属性数据。

上行转换器应返回的数据示例:
ConvertedData对象包含以下数据:
设备名称:”SerialDevice1”
设备类型:”default”
遥测数据:[{“humidity”: 48}]
属性:{“SerialNumber”: “2430947595”}

下行转换器

此方法应将数据从平台格式转换为设备格式。
在当前情况下,它应返回包含应发送到设备的数据的bytes对象。

串口连接器下行转换器应返回的数据示例:

1
b'downlink data'