MQTT is a lightweight communication platform, ideal for messaging between servers and IoT devices. ThingsBoard is a powerful IoT platform which supports device provisioning and telemetry logging via MQTT. One very useful feature is the ability to mark a device as a gateway. Rather than having to hold credentials for individual devices, a single device acting as an IoT gateway (or a script pulling data from multiple devices) can log to multiple ThingsBoard devices by only communicating to the MQTT gateway node.
MQTT logging to a single device
There are a number of MQTT libraries available for Python, the paho library from Eclipse is very popular. Although there is also a ThingsBoard specific library, the version I looked at had a bug and was not usable at the time.
To post telemetry to a device, the device needs to be created in ThingsBoard, and the access token obtained. The device does not need to be set as public. The client procedure to send a single element of telemetry is:
- Define a client object
- Set up any call back functions
- Set the username / password to be the device access token
- Connect to the server
- Start the client loop
- Publish telemetry
- Stop the client loop
- Disconnect
The callback functions will execute on the specific events, connect, publish and disconnect. By setting a global variable mqtt_connected, to track connection status, we can deal with disconnects. While this is unlikely in a script which sends a single telemetry set, it is useful for long running applications where the connection may be dropped. The callback functions can be expanded for greater error handling.
The following script sends one set of dummy data to a ThingsBoard device:
#!/usr/bin/python
# MQTT test, based on API code at https://thingsboard.io/docs/samples/raspberry/temperature/
import paho.mqtt.client as mqtt
import json
import time
THINGSBOARD_HOST = '<server>'
ACCESS_TOKEN = '<token>'
sensor_data = {'ultimateAnswer':42, 'pi':3.14159}
THINGSBOARD_PORT = 1883
mqtt_connected = False
# Functions to act on connection and check for disconnection
# See https://stackoverflow.com/questions/36093078/mqtt-is-there-a-way-to-check-if-the-client-is-still-connected
def on_connect(client, userdata, flags, rc):
global mqtt_connected
if(rc==0):
mqtt_connected = True
print("MQTT server connected successfully")
else:
print("MQTT server failed to connect rc={}".format(rc))
def on_disconnect(client, userdata, rc):
global mqtt_connected
mqtt_connected = False
print("MQTT server disconnected, rc={}".format(rc))
def on_publish(client, userdata, mid):
print("MQTT on_publish returned {}".format(mid))
def mqttConnect():
global client,mqtt_connected
print("Connecting to ThingsBoard server")
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.username_pw_set(ACCESS_TOKEN)
try:
client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
except:
print("Error: exception when connecting to MQTT server")
client.loop_start()
client = mqtt.Client()
mqttConnect()
print("Publishing data")
client.publish('v1/devices/me/telemetry', json.dumps(sensor_data), 1)
print("Published")
client.loop_stop()
client.disconnect()
print("Test complete")
Logging to multiple devices via a gateway
If we build an object to act as a gateway (e.g. a single device in a LoRaWAN or BLE network), this can dynamically provision ThingsBoard devices and then send telemetry to them. See the ThingsBoard MQTT reference for greater details. We can expand on the above script, but rather than publish once, we need to go through a number of steps to ensure the device exists:
- Connect to the gateway via
v1/gateway/connect
API call. If this does not exist, the device in the name provided is created. By adding a ‘type’ element, the device is created with a matching profile name. If the device already exists, this call does not create another device. However, it can not be used to create the profile. - Optionally, attributes can be set via
v1/gateway/attributes
. Attributes are similar to telemetry, however where as a history of telemetry is stored, only the single value of each key->value pair is stored. In my example this is a note (you can not set a device label), but equally it could be something from the environment such as the GPS location of a fixed sensor. - Telemetry can then be sent. We can send telemetry for multiple devices at one time. The data needs to be in a specific format and include the current timestamp, e.g.
{
"testgw_rcv": [
{
"ts": 1678382122412,
"values": {
"SeeLights": 4,
"OfNine": 7
}
}
]
}
The full listing is below. From this, it should be straight forward to build a sensor loop and add other devices:
#!/usr/bin/python
# MQTT test, based on API code at https://thingsboard.io/docs/reference/python-client-sdk/
import paho.mqtt.client as mqtt
import json
import time
from pprint import pprint
THINGSBOARD_HOST = '<server>'
ACCESS_TOKEN = '<token>'
my_name="TestGateway"
sensor_data = {'ultimateAnswer':42, 'pi':3.14159}
other_device_data = {'SeeLights':4, 'OfNine': 7}
other_device_name = "testgw_rcv"
other_device_profile = "GenericTest"
THINGSBOARD_PORT = 1883
mqtt_connected = False
# Functions to act on connection and check for disconnection
# See https://stackoverflow.com/questions/36093078/mqtt-is-there-a-way-to-check-if-the-client-is-still-connected
def on_connect(client, userdata, flags, rc):
global mqtt_connected
if(rc==0):
mqtt_connected = True
print("MQTT server connected successfully")
else:
print("MQTT server failed to connect rc={}".format(rc))
def on_disconnect(client, userdata, rc):
global mqtt_connected
mqtt_connected = False
print("MQTT server disconnected, rc={}".format(rc))
def on_publish(client, userdata, mid):
print("MQTT on_publish returned {}".format(mid))
def mqttConnect():
global client,mqtt_connected
print("Connecting to ThingsBoard server")
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.username_pw_set(ACCESS_TOKEN)
try:
client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
except:
print("Error: exception when connecting to MQTT server")
client.loop_start()
client = mqtt.Client()
mqttConnect()
print("Publishing teletry data")
client.publish('v1/devices/me/telemetry', json.dumps(sensor_data), 1)
# Create the device
# If the device already exists, this is ignored and the profile does not change (if different)
msg="{{'device':'{}', 'type':'{}'}}".format(other_device_name,other_device_profile)
print("Connecting to gateway to provision device; Message: "+msg)
client.publish('v1/gateway/connect', msg)
# Set device attributes
msg="{{'{}':{{'Note':'Created by MQTT Gateway'}}}}".format(other_device_name)
print("Sending attributes: "+msg)
client.publish('v1/gateway/attributes', msg)
# Build data structure
timeSeries={"ts": int(time.time()*1000), "values": other_device_data}
sendData={other_device_name:[timeSeries]}
#print(json.dumps(sendData, indent=4))
msg=json.dumps(sendData)
print("Publishing teletry: "+msg)
client.publish('v1/gateway/telemetry', msg)
client.loop_stop()
client.disconnect()
print("Test complete")