10 March 2023
Posted by: Dave Hartburn
ThingsBoard, Python, MQTT and Gateways

MQTT is a lightweight publish/subscribe messaging protocol, ideal for messaging between servers and IoT devices. ThingsBoard is a powerful IoT platform which supports device provisioning and telemetry logging via MQTT. One very useful feature is the ability to mark a device as a gateway. Rather than having to hold credentials for every individual device, a single device acting as an IoT gateway (or a script pulling data from multiple devices) can log to multiple ThingsBoard devices while communicating only with the gateway device, using only its credentials.

MQTT logging to a single device

There are a number of MQTT libraries available for Python; the paho library from Eclipse is very popular. There is also a ThingsBoard-specific library, but the version I looked at had a bug and was not usable at the time.

To post telemetry to a device, the device needs to be created in ThingsBoard, and the access token obtained. The device does not need to be set as public. The client procedure to send a single element of telemetry is:

  1. Define a client object
  2. Set up any callback functions
  3. Set the username / password to be the device access token
  4. Connect to the server
  5. Start the client loop
  6. Publish telemetry
  7. Stop the client loop
  8. Disconnect

The callback functions execute on specific events: connect, publish and disconnect. By setting a global variable, mqtt_connected, to track connection status, we can deal with disconnects. While a disconnect is unlikely in a script which sends a single telemetry set, tracking it is useful for long-running applications where the connection may be dropped. The callback functions can be expanded for greater error handling.
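As one example of expanding the callbacks, the rc value passed to on_connect can be turned into a readable message. The sketch below uses the CONNACK return codes from the MQTT 3.1.1 specification; the names CONNACK_MEANINGS and describe_connack are my own for illustration and are not part of paho or ThingsBoard (paho also ships a connack_string() helper that does a similar job).

```python
# CONNACK return codes as defined by the MQTT 3.1.1 specification
CONNACK_MEANINGS = {
    0: "connection accepted",
    1: "refused: unacceptable protocol version",
    2: "refused: client identifier rejected",
    3: "refused: server unavailable",
    4: "refused: bad username or password",
    5: "refused: not authorised",
}

def describe_connack(rc):
    """Return a readable description of a CONNACK return code."""
    return CONNACK_MEANINGS.get(rc, "unknown return code {}".format(rc))

def on_connect(client, userdata, flags, rc):
    # Same signature as the paho 1.x callback; only the message differs
    print("MQTT connect result: {}".format(describe_connack(rc)))
```

A refused connection then reports the reason rather than a bare number, which helps when the access token is wrong.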

The following script sends one set of dummy data to a ThingsBoard device:

#!/usr/bin/python

# MQTT test, based on API code at https://thingsboard.io/docs/samples/raspberry/temperature/
import paho.mqtt.client as mqtt
import json
import time

THINGSBOARD_HOST = '<server>'
ACCESS_TOKEN = '<token>'

sensor_data = {'ultimateAnswer':42, 'pi':3.14159}
THINGSBOARD_PORT = 1883
mqtt_connected = False

# Functions to act on connection and check for disconnection
# See https://stackoverflow.com/questions/36093078/mqtt-is-there-a-way-to-check-if-the-client-is-still-connected
def on_connect(client, userdata, flags, rc):
    global mqtt_connected
    if(rc==0):
        mqtt_connected = True
        print("MQTT server connected successfully")
    else:
        print("MQTT server failed to connect rc={}".format(rc))

def on_disconnect(client, userdata, rc):
    global mqtt_connected
    mqtt_connected = False
    print("MQTT server disconnected, rc={}".format(rc))

def on_publish(client, userdata, mid):
    print("MQTT on_publish returned {}".format(mid))

def mqttConnect():
    global client,mqtt_connected
    print("Connecting to ThingsBoard server")
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_publish = on_publish
    client.username_pw_set(ACCESS_TOKEN)
    try:
        client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
    except Exception as e:
        print("Error: exception when connecting to MQTT server: {}".format(e))
    client.loop_start()

# Written for paho-mqtt 1.x; with 2.x use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client = mqtt.Client()
mqttConnect()

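# Wait (up to 10 seconds) for on_connect to confirm the connection
deadline = time.time() + 10
while not mqtt_connected and time.time() < deadline:
    time.sleep(0.1)
if not mqtt_connected:
    raise SystemExit("Error: could not connect to MQTT server")

print("Publishing data")
info = client.publish('v1/devices/me/telemetry', json.dumps(sensor_data), 1)
# Block until the QoS 1 message has been delivered before shutting down
info.wait_for_publish()
print("Published")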

client.loop_stop()
client.disconnect()
print("Test complete")
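For a long-running application, it can be convenient to block until the connection is up rather than check a global flag. The following is a minimal sketch of that idea using threading.Event; the ConnectionTracker class and its method names are hypothetical, and the callback signatures assume the paho 1.x style used in the script.

```python
import threading

class ConnectionTracker:
    """Tracks MQTT connection state using paho 1.x style callbacks."""

    def __init__(self):
        self._connected = threading.Event()

    def on_connect(self, client, userdata, flags, rc):
        # rc == 0 means the broker accepted the connection
        if rc == 0:
            self._connected.set()

    def on_disconnect(self, client, userdata, rc):
        # Any disconnect, expected or not, clears the flag
        self._connected.clear()

    def is_connected(self):
        return self._connected.is_set()

    def wait_until_connected(self, timeout=10.0):
        """Block until connected; return False if the timeout expires first."""
        return self._connected.wait(timeout)
```

The bound methods can be assigned directly, e.g. client.on_connect = tracker.on_connect, and the main code can call tracker.wait_until_connected() after client.loop_start() before publishing.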

Logging to multiple devices via a gateway

If we build an object to act as a gateway (e.g. a single device in a LoRaWAN or BLE network), it can dynamically provision ThingsBoard devices and then send telemetry to them. See the ThingsBoard MQTT reference for more detail. We can expand on the above script, but rather than publishing once, we need to go through a number of steps to ensure the device exists:

  1. Announce the device to the gateway by publishing to the v1/gateway/connect topic. If a device with the name provided does not exist, it is created. By adding a 'type' element, the device is created with a matching profile name. If the device already exists, this call does not create another device, and it cannot be used to change the profile of the existing device.
  2. Optionally, attributes can be set via v1/gateway/attributes. Attributes are similar to telemetry; however, whereas a history of telemetry is stored, only the latest value of each key/value pair is stored for an attribute. In my example this is a note (you cannot set a device label), but equally it could be something from the environment such as the GPS location of a fixed sensor.
  3. Telemetry can then be sent. We can send telemetry for multiple devices at one time. The data needs to be in a specific format and include the current timestamp in milliseconds, e.g.
{
    "testgw_rcv": [
        {
            "ts": 1678382122412,
            "values": {
                "SeeLights": 4,
                "OfNine": 7
            }
        }
    ]
}
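The three message bodies described above can be built with json.dumps, which guarantees valid, double-quoted JSON. This is a sketch only: the function names are hypothetical, and the payload shapes are taken from the steps and the example above.

```python
import json
import time

def connect_payload(device_name, device_type=None):
    """Body for v1/gateway/connect; 'type' names the device profile."""
    payload = {"device": device_name}
    if device_type is not None:
        payload["type"] = device_type
    return json.dumps(payload)

def attributes_payload(device_name, attributes):
    """Body for v1/gateway/attributes: {device: {key: value, ...}}."""
    return json.dumps({device_name: attributes})

def telemetry_payload(readings, ts_ms=None):
    """Body for v1/gateway/telemetry.

    readings maps each device name to a dict of values. Every device gets
    one entry stamped with ts_ms (milliseconds since the epoch).
    """
    if ts_ms is None:
        ts_ms = int(time.time() * 1000)
    return json.dumps(
        {name: [{"ts": ts_ms, "values": values}]
         for name, values in readings.items()}
    )
```

Calling telemetry_payload({"testgw_rcv": {"SeeLights": 4, "OfNine": 7}}, ts_ms=1678382122412) produces the same structure as the example above. Adding more device names to the dictionary sends telemetry for several devices in one message.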

The full listing is below. From this, it should be straightforward to build a sensor loop and add other devices:

#!/usr/bin/python

# MQTT test, based on API code at https://thingsboard.io/docs/reference/python-client-sdk/
import paho.mqtt.client as mqtt
import json
import time
from pprint import pprint

THINGSBOARD_HOST = '<server>'
ACCESS_TOKEN = '<token>'

my_name="TestGateway"
sensor_data = {'ultimateAnswer':42, 'pi':3.14159}
other_device_data = {'SeeLights':4, 'OfNine': 7}
other_device_name = "testgw_rcv"
other_device_profile = "GenericTest"

THINGSBOARD_PORT = 1883
mqtt_connected = False

# Functions to act on connection and check for disconnection
# See https://stackoverflow.com/questions/36093078/mqtt-is-there-a-way-to-check-if-the-client-is-still-connected
def on_connect(client, userdata, flags, rc):
    global mqtt_connected
    if(rc==0):
        mqtt_connected = True
        print("MQTT server connected successfully")
    else:
        print("MQTT server failed to connect rc={}".format(rc))

def on_disconnect(client, userdata, rc):
    global mqtt_connected
    mqtt_connected = False
    print("MQTT server disconnected, rc={}".format(rc))

def on_publish(client, userdata, mid):
    print("MQTT on_publish returned {}".format(mid))

def mqttConnect():
    global client,mqtt_connected
    print("Connecting to ThingsBoard server")
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_publish = on_publish
    client.username_pw_set(ACCESS_TOKEN)
    try:
        client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
    except Exception as e:
        print("Error: exception when connecting to MQTT server: {}".format(e))
    client.loop_start()

# Written for paho-mqtt 1.x; with 2.x use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client = mqtt.Client()
mqttConnect()

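# Wait (up to 10 seconds) for on_connect to confirm the connection
deadline = time.time() + 10
while not mqtt_connected and time.time() < deadline:
    time.sleep(0.1)
if not mqtt_connected:
    raise SystemExit("Error: could not connect to MQTT server")

print("Publishing telemetry data")
client.publish('v1/devices/me/telemetry', json.dumps(sensor_data), 1)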

# Create the device
# If the device already exists, this is ignored and the profile does not change (if different)
# json.dumps produces valid (double-quoted) JSON
msg=json.dumps({'device': other_device_name, 'type': other_device_profile})
print("Connecting to gateway to provision device; Message: "+msg)
client.publish('v1/gateway/connect', msg)

# Set device attributes
msg=json.dumps({other_device_name: {'Note': 'Created by MQTT Gateway'}})
print("Sending attributes: "+msg)
client.publish('v1/gateway/attributes', msg)

# Build data structure
timeSeries={"ts": int(time.time()*1000), "values": other_device_data}
sendData={other_device_name:[timeSeries]}
#print(json.dumps(sendData, indent=4))
msg=json.dumps(sendData)
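print("Publishing telemetry: "+msg)
info = client.publish('v1/gateway/telemetry', msg)
# Wait until the last message has been sent before stopping the loop
info.wait_for_publish()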

client.loop_stop()
client.disconnect()
print("Test complete")
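A sensor loop built on this listing might look like the sketch below. It is written against a generic publish callable so it can be tried without a broker; read_sensors is a hypothetical stand-in for real sensor reads, and sensor_loop and its parameters are my own names rather than anything from paho or ThingsBoard.

```python
import json
import time

def read_sensors():
    """Hypothetical stand-in for real sensor reads: device name -> values."""
    return {"testgw_rcv": {"SeeLights": 4, "OfNine": 7}}

def sensor_loop(publish, read=read_sensors, iterations=3, interval=1.0):
    """Publish one gateway telemetry message per iteration.

    publish is any callable taking (topic, payload), e.g. client.publish.
    Returns the number of messages handed to publish.
    """
    sent = 0
    for i in range(iterations):
        ts = int(time.time() * 1000)
        data = {name: [{"ts": ts, "values": values}]
                for name, values in read().items()}
        publish('v1/gateway/telemetry', json.dumps(data))
        sent += 1
        # Sleep between readings, but not after the final one
        if i < iterations - 1:
            time.sleep(interval)
    return sent
```

In the listing above, this could be called as sensor_loop(client.publish, interval=60) after the devices have been announced via v1/gateway/connect and before client.loop_stop(). Returning more device names from read_sensors adds further devices to each message.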
