Subscribe to MQTT Data Stream
How to connect to Haltian IoT MQTT stream and receive real-time sensor data using Python
This guide shows you how to subscribe to Haltian IoT MQTT data stream using Python. You’ll learn to connect, authenticate, and process incoming sensor measurements.
Prerequisites
- Python 3.7 or later
paho-mqttlibrary- MQTT credentials (API key ID and token) from Haltian IoT Studio
Install Dependencies
pip install paho-mqtt
Demo Credentials
For testing purposes, you can use Haltian’s public demo MQTT stream:
| Parameter | Value |
|---|---|
| Host | mqtt.eu.haltian.io |
| Port | 8883 (TLS) |
| API Key ID | haltiansalesdemo |
| Token | haltiansalesdemo |
| Integration ID | haltiansalesdemo |
Note
Demo credentials provide read-only access to Haltian’s sales demo environment. For production use, obtain credentials from your Haltian IoT Studio organization.
Complete Python Example
#!/usr/bin/env python3
"""
Haltian IoT MQTT Subscriber Example
Connects to Haltian IoT MQTT broker and prints incoming measurements.
"""
import ssl
import json
import paho.mqtt.client as mqtt
# MQTT Configuration
MQTT_HOST = "mqtt.eu.haltian.io"
MQTT_PORT = 8883
# Credentials - replace with your own for production
API_KEY_ID = "haltiansalesdemo"
API_KEY_TOKEN = "haltiansalesdemo"
INTEGRATION_ID = "haltiansalesdemo"
# Topic pattern: haltian-iot/events/{integration-id}/{api-key-id}/#
TOPIC = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/#"
def on_connect(client, userdata, flags, rc):
"""Callback when connected to MQTT broker."""
if rc == 0:
print(f"Connected successfully to {MQTT_HOST}")
print(f"Subscribing to: {TOPIC}")
client.subscribe(TOPIC)
else:
print(f"Connection failed with code: {rc}")
# Common error codes:
# 1 = incorrect protocol version
# 2 = invalid client identifier
# 3 = server unavailable
# 4 = bad username or password
# 5 = not authorized
def on_message(client, userdata, msg):
"""Callback when message received."""
try:
# Parse topic to extract metadata
topic_parts = msg.topic.split("/")
# Topic structure: haltian-iot/events/{integration}/{apikey}/{event-kind}/{event-type}/{device-id}
if len(topic_parts) >= 7:
event_kind = topic_parts[4] # e.g., "measurements"
event_type = topic_parts[5] # e.g., "ambient_temperature"
device_id = topic_parts[6] # UUID of the device
# Parse JSON payload
payload = json.loads(msg.payload.decode("utf-8"))
print(f"\n--- {event_kind}/{event_type} ---")
print(f"Device: {device_id}")
print(f"Payload: {json.dumps(payload, indent=2)}")
else:
print(f"Topic: {msg.topic}")
print(f"Payload: {msg.payload.decode('utf-8')}")
except json.JSONDecodeError:
print(f"Non-JSON payload: {msg.payload}")
except Exception as e:
print(f"Error processing message: {e}")
def on_disconnect(client, userdata, rc):
"""Callback when disconnected from broker."""
if rc != 0:
print(f"Unexpected disconnection (code: {rc})")
def main():
"""Main entry point."""
# Create MQTT client
client = mqtt.Client(protocol=mqtt.MQTTv311)
# Set credentials
client.username_pw_set(API_KEY_ID, API_KEY_TOKEN)
# Configure TLS
client.tls_set(cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# Connect
print(f"Connecting to {MQTT_HOST}:{MQTT_PORT}...")
client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
# Start message loop (blocking)
try:
client.loop_forever()
except KeyboardInterrupt:
print("\nDisconnecting...")
client.disconnect()
if __name__ == "__main__":
main()
Understanding the Topic Structure
Haltian IoT MQTT topics follow this structure:
haltian-iot/events/{integration-id}/{api-key-id}/{event-kind}/{event-type}/{device-id}
| Segment | Description | Example |
|---|---|---|
integration-id | Your integration identifier | haltiansalesdemo |
api-key-id | API key used for authentication | haltiansalesdemo |
event-kind | Category of event | measurements, messages |
event-type | Specific measurement type | ambient_temperature, humidity |
device-id | UUID of the device | 550e8400-e29b-41d4-a716-446655440000 |
Subscription Patterns
# Subscribe to all events
"haltian-iot/events/{integration}/{apikey}/#"
# Subscribe to all measurements
"haltian-iot/events/{integration}/{apikey}/measurements/#"
# Subscribe to specific measurement type
"haltian-iot/events/{integration}/{apikey}/measurements/ambient_temperature/#"
# Subscribe to specific device
"haltian-iot/events/{integration}/{apikey}/measurements/+/{device-id}"
Message Payload Format
Haltian IoT MQTT messages use a streamlined JSON format:
{
"measured_at": "2025-01-28T10:30:00.000Z",
"value": 23.5
}
For complex measurements like position:
{
"measured_at": "2025-01-28T10:30:00.000Z",
"value": {
"position_local": {
"x": 12.5,
"y": 8.3,
"z": 0
},
"position_global": {
"type": "Point",
"coordinates": [24.9384, 60.1699]
},
"accuracy": 2.5
}
}
Filtering by Measurement Type
To process only specific measurement types:
def on_message(client, userdata, msg):
topic_parts = msg.topic.split("/")
if len(topic_parts) >= 6:
event_type = topic_parts[5]
# Only process temperature and humidity
if event_type in ["ambient_temperature", "humidity"]:
payload = json.loads(msg.payload.decode("utf-8"))
print(f"{event_type}: {payload['value']}")
Error Handling and Reconnection
For production use, implement automatic reconnection:
import time
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Disconnected (code: {rc}). Reconnecting...")
while True:
try:
client.reconnect()
break
except Exception as e:
print(f"Reconnection failed: {e}")
time.sleep(5)
Next Steps
- Stream API Reference - Complete MQTT topic documentation
- Measurement Types - All available measurement types
- Service API - Query historical data via GraphQL