SDK支持:
- 未加密及加密(TLS v1.2)连接;
- QoS 0和1;
- 自动重连;
- 全部Device MQTT APIs
- 全部Gateway MQTT APIs
SDK基于Paho MQTT库。
安装
使用pip安装:
1
pip3 install tb-mqtt-client
快速入门
客户端初始化与遥测发布。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from tb_device_mqtt import TBDeviceMqttClient, TBPublishInfo
telemetry = {"temperature": 41.9, "enabled": False, "currentFirmwareVersion": "v1.2.2"}
client = TBDeviceMqttClient("127.0.0.1", username="A1_TEST_TOKEN")
# Connect to ThingsBoard
client.connect()
# Sending telemetry without checking the delivery status
client.send_telemetry(telemetry)
# Sending telemetry and checking the delivery status (QoS = 1 by default)
result = client.send_telemetry(telemetry)
# get is a blocking call that awaits delivery status
success = result.get() == TBPublishInfo.TB_ERR_SUCCESS
# Disconnect from ThingsBoard
client.disconnect()
使用TLS连接
连接至localhost的TLS。 要通过MQTT over SSL连接ThingsBoard,首先需生成证书,并拥有类似以下代码:
1
2
3
4
5
6
7
8
9
10
from socket import gethostname
from tb_device_mqtt import TBDeviceMqttClient
client = TBDeviceMqttClient(gethostname())
client.connect(tls=True,
ca_certs="mqttserver.pub.pem",
cert_file="mqttclient.nopass.pem")
client.disconnect()
使用Device APIs
TBDeviceMqttClient提供对ThingsBoard平台Device MQTT APIs的访问。 支持发布遥测和属性更新、订阅属性变更、发送和接收RPC命令等。
订阅属性
如需接收共享属性更新,可使用类似以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from time import sleep
from tb_device_mqtt import TBDeviceMqttClient
def callback(result):
print(result)
client = TBDeviceMqttClient("127.0.0.1", username="A1_TEST_TOKEN")
client.connect()
client.subscribe_to_attribute("uploadFrequency", callback)
client.subscribe_to_all_attributes(callback)
while True:
sleep(1)
遥测包发送
为向ThingsBoard发送数据,可使用类似以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from time import time
from tb_device_mqtt import TBDeviceMqttClient, TBPublishInfo
telemetry_with_ts = {"ts": int(round(time() * 1000)), "values": {"temperature": 42.1, "humidity": 70}}
client = TBDeviceMqttClient("127.0.0.1", username="A1_TEST_TOKEN")
# we set maximum amount of messages sent to send them at the same time. it may stress memory but increases performance
client.max_inflight_messages_set(100)
client.connect()
results = []
result = True
for i in range(0, 100):
results.append(client.send_telemetry(telemetry_with_ts))
for tmp_result in results:
result &= tmp_result.get() == TBPublishInfo.TB_ERR_SUCCESS
print("Result", str(result))
client.disconnect()
从服务器请求属性
为从ThingsBoard请求共享属性值,可使用以下示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from time import sleep
from tb_device_mqtt import TBDeviceMqttClient
def on_attributes_change(result, exception):
if exception is not None:
print("Exception:", str(exception))
else:
print(result)
client = TBDeviceMqttClient("127.0.0.1", username="A1_TEST_TOKEN")
client.connect()
client.request_attributes(["configuration","targetFirmwareVersion"], callback=on_attributes_change)
while True:
sleep(1)
响应服务器RPC调用
若需对某RPC请求发送响应,可使用如下代码中的逻辑。 以下示例连接至ThingsBoard本地实例并等待RPC请求。 收到RPC请求时,客户端将向ThingsBoard发送包含来自运行客户端的机器数据的响应,设备名称为Test Device A1。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from psutil import cpu_percent, virtual_memory
from time import sleep
from tb_device_mqtt import TBDeviceMqttClient
# dependently of request method we send different data back
def on_server_side_rpc_request(client, request_id, request_body):
print(request_id, request_body)
if request_body["method"] == "getCPULoad":
client.send_rpc_reply(request_id, {"CPU percent": cpu_percent()})
elif request_body["method"] == "getMemoryUsage":
client.send_rpc_reply(request_id, {"Memory": virtual_memory().percent})
client = TBDeviceMqttClient("127.0.0.1", username="A1_TEST_TOKEN")
client.set_server_side_rpc_request_handler(on_server_side_rpc_request)
client.connect()
while True:
sleep(1)
使用Gateway APIs
TBGatewayMqttClient继承TBDeviceMqttClient,因此可像普通设备一样访问其全部API。 此外,网关可代表与其连接的多个设备。例如,代表其他受限设备发送遥测或属性。更多网关信息见此处。
遥测和属性发送
为向ThingsBoard发送设备Test Device A1的数据,可使用类似以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from time import time
from tb_gateway_mqtt import TBGatewayMqttClient
gateway = TBGatewayMqttClient("127.0.0.1", username="TEST_GATEWAY_TOKEN")
gateway.connect()
gateway.gw_connect_device("Test Device A1")
gateway.gw_send_telemetry("Test Device A1", {"ts": int(round(time() * 1000)), "values": {"temperature": 42.2}})
gateway.gw_send_attributes("Test Device A1", {"firmwareVersion": "2.3.1"})
gateway.gw_disconnect_device("Test Device A1")
gateway.disconnect()
从服务器请求属性
为从ThingsBoard请求设备Test Device A1的共享属性值,可使用以下示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from time import sleep
from tb_gateway_mqtt import TBGatewayMqttClient
def callback(result, exception):
if exception is not None:
print("Exception: " + str(exception))
else:
print(result)
gateway = TBGatewayMqttClient("127.0.0.1", username="TEST_GATEWAY_TOKEN")
gateway.connect()
gateway.gw_request_shared_attributes("Test Device A1", ["temperature"], callback)
while True:
sleep(1)
响应服务器RPC调用
若需对某RPC请求发送响应,可使用如下代码中的逻辑。 以下示例将连接至ThingsBoard本地实例并等待RPC请求。 收到RPC请求时,客户端将向ThingsBoard发送设备Test Device A1的响应数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from time import sleep
from psutil import cpu_percent, virtual_memory
from tb_gateway_mqtt import TBGatewayMqttClient
def rpc_request_response(client, request_id, request_body):
# request body contains id, method and other parameters
print(request_body)
method = request_body["data"]["method"]
device = request_body["device"]
req_id = request_body["data"]["id"]
# dependently of request method we send different data back
if method == 'getCPULoad':
gateway.gw_send_rpc_reply(device, req_id, {"CPU load": cpu_percent()})
elif method == 'getMemoryLoad':
gateway.gw_send_rpc_reply(device, req_id, {"Memory": virtual_memory().percent})
else:
print('Unknown method: ' + method)
gateway = TBGatewayMqttClient("127.0.0.1", username="TEST_GATEWAY_TOKEN")
gateway.connect()
# now rpc_request_response will process rpc requests from servers
gateway.gw_set_server_side_rpc_request_handler(rpc_request_response)
# without device connection it is impossible to get any messages
gateway.gw_connect_device("Test Device A1")
while True:
sleep(1)