Real-Time Data Streaming
Overview
This tutorial demonstrates how to connect to Haltian IoT’s MQTT stream and receive real-time sensor data. You will learn to:
- Connect - Establish secure MQTT connection with authentication
- Subscribe - Listen to specific device measurements
- Process - Parse and handle incoming sensor data
- Handle Errors - Implement reconnection and error handling
This tutorial provides complete, working code that any AI agent or developer can use to implement real-time data streaming.
Prerequisites
- MQTT client library for your language
- MQTT credentials from Haltian IoT Studio (API key ID and token)
- Basic understanding of MQTT pub/sub pattern
Configuration
MQTT Broker Details
Host: haltian-iot-mqtt.eu.haltian.io
Port: 18883 (TLS encrypted)
Protocol: MQTT 3.1.1
Authentication
MQTT authentication requires:
| Parameter | Description | Format |
|---|---|---|
| Username | API Key ID | UUID (e.g., c6f7b103-81df-4e73-8218-4f671056aee2) |
| Password | API Key Token | JWT token |
| Integration ID | Integration identifier | UUID (e.g., 31efcae6-39ff-45f2-b732-2090e2ed54c4) |
Obtain API key credentials from Haltian IoT Studio:
- Navigate to Organization Settings → API Keys
- Create new API key with MQTT permissions
- Copy the Integration ID, API Key ID (username), and Token (password)
Step 1: Understand Topic Structure
Haltian IoT MQTT topics follow this hierarchy:
haltian-iot/events/{integration-id}/{api-key-id}/{event-kind}/{event-type}/{device-id}
Topic Segments
| Segment | Description | Example |
|---|---|---|
integration-id | Your integration identifier | haltiansalesdemo |
api-key-id | API key used for authentication | haltiansalesdemo |
event-kind | Event category | measurements or messages |
event-type | Specific measurement/message type | ambientTemperature, batteryVoltage |
device-id | UUID of the device | 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6 |
Subscription Patterns
MQTT wildcards allow flexible subscriptions:
# All events for your integration
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/#
# All measurements only
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/#
# Specific measurement type from all devices
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/#
# All measurements from specific device
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/+/{device-id}
# Single measurement type from specific device
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/{device-id}
Wildcards:
#- Multi-level wildcard (matches all remaining levels)+- Single-level wildcard (matches exactly one level)
Step 2: Message Payload Format
Simple Measurement
Most measurements use this streamlined format:
{
"measured_at": "2026-02-05T10:30:45.123Z",
"value": 23.5
}
| Field | Type | Description |
|---|---|---|
measured_at | ISO 8601 Timestamp | When measurement was taken (UTC) |
value | Number/Object | Measurement value |
Complex Measurement Example (Position)
{
"measured_at": "2026-02-05T10:30:45.491Z",
"value": {
"position_local": {
"x": 12.5,
"y": 8.3,
"z": 0.0
},
"position_global": {
"type": "Point",
"coordinates": [24.9384, 60.1699]
},
"accuracy": 2.5,
"space_id": "55a0a3d0-84b6-4ace-b131-0b2420053e91"
}
}
Temperature Measurement
Topic:
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
Payload:
{
"measured_at": "2026-02-05T10:30:45.123Z",
"value": 23.5
}
Battery Voltage Measurement
Topic:
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/batteryVoltage/6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
Payload:
{
"measured_at": "2026-02-05T10:30:45.123Z",
"value": 3.12
}
Humidity Measurement
Topic:
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/humidity/6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
Payload:
{
"measured_at": "2026-02-05T10:30:45.123Z",
"value": 45.8
}
Step 3: Implement MQTT Client
Complete Python Implementation
#!/usr/bin/env python3
"""
Haltian IoT Real-Time Data Streaming
Connects to MQTT broker and processes sensor measurements.
"""
import ssl
import json
import time
import paho.mqtt.client as mqtt
# MQTT Configuration
MQTT_HOST = "haltian-iot-mqtt.eu.haltian.io"
MQTT_PORT = 18883
# Credentials (replace with your own from Haltian IoT Studio)
API_KEY_ID = "c6f7b103-81df-4e73-8218-4f671056aee2" # Your API Key ID (username)
API_KEY_TOKEN = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..." # Your JWT token (password)
INTEGRATION_ID = "31efcae6-39ff-45f2-b732-2090e2ed54c4" # Your Integration ID
# Topic pattern
TOPIC = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/#"
class HaltianMQTTClient:
"""MQTT client for Haltian IoT real-time data streaming."""
def __init__(self, api_key_id, api_key_token, integration_id):
self.api_key_id = api_key_id
self.api_key_token = api_key_token
self.integration_id = integration_id
self.topic = f"haltian-iot/events/{integration_id}/{api_key_id}/#"
self.client = None
self.measurement_handlers = {}
def on_connect(self, client, userdata, flags, rc):
"""Callback when connected to MQTT broker."""
if rc == 0:
print(f"✓ Connected to {MQTT_HOST}")
print(f"✓ Subscribing to: {self.topic}")
client.subscribe(self.topic)
else:
error_messages = {
1: "Incorrect protocol version",
2: "Invalid client identifier",
3: "Server unavailable",
4: "Bad username or password",
5: "Not authorized"
}
print(f"✗ Connection failed: {error_messages.get(rc, f'Unknown error ({rc})')}")
def on_message(self, client, userdata, msg):
"""Callback when message received."""
try:
# Parse topic structure
topic_parts = msg.topic.split("/")
if len(topic_parts) >= 7:
event_kind = topic_parts[4] # "measurements" or "messages"
event_type = topic_parts[5] # measurement type
device_id = topic_parts[6] # device UUID
# Parse JSON payload
payload = json.loads(msg.payload.decode("utf-8"))
# Create measurement object
measurement = {
"device_id": device_id,
"event_kind": event_kind,
"event_type": event_type,
"measured_at": payload.get("measured_at"),
"value": payload.get("value")
}
# Call registered handler if exists
if event_type in self.measurement_handlers:
self.measurement_handlers[event_type](measurement)
else:
# Default handler
self.default_handler(measurement)
except json.JSONDecodeError as e:
print(f"✗ JSON decode error: {e}")
except Exception as e:
print(f"✗ Error processing message: {e}")
def on_disconnect(self, client, userdata, rc):
"""Callback when disconnected from broker."""
if rc != 0:
print(f"⚠ Unexpected disconnection (code: {rc})")
print("⟳ Attempting reconnection...")
self.reconnect()
def default_handler(self, measurement):
"""Default measurement handler."""
print(f"\n[{measurement['event_type']}]")
print(f" Device: {measurement['device_id']}")
print(f" Time: {measurement['measured_at']}")
print(f" Value: {measurement['value']}")
def register_handler(self, event_type, handler_func):
"""Register custom handler for specific measurement type."""
self.measurement_handlers[event_type] = handler_func
def connect(self):
"""Establish connection to MQTT broker."""
# Create client
self.client = mqtt.Client(protocol=mqtt.MQTTv311)
# Set credentials
self.client.username_pw_set(self.api_key_id, self.api_key_token)
# Configure TLS
self.client.tls_set(cert_reqs=ssl.CERT_NONE)
self.client.tls_insecure_set(True)
# Set callbacks
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
# Connect
print(f"⟳ Connecting to {MQTT_HOST}:{MQTT_PORT}...")
self.client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
def reconnect(self):
"""Reconnect with exponential backoff."""
retry_delay = 1
max_delay = 60
while True:
try:
print(f" Retry in {retry_delay}s...")
time.sleep(retry_delay)
self.client.reconnect()
print("✓ Reconnected successfully")
break
except Exception as e:
print(f"✗ Reconnection failed: {e}")
retry_delay = min(retry_delay * 2, max_delay)
def start(self):
"""Start listening for messages (blocking)."""
try:
self.client.loop_forever()
except KeyboardInterrupt:
print("\n⟳ Disconnecting...")
self.client.disconnect()
# Example: Custom handlers for specific measurement types
def handle_temperature(measurement):
"""Custom handler for temperature measurements."""
temp_c = measurement['value']
temp_f = (temp_c * 9/5) + 32
print(f"\n🌡️ Temperature: {temp_c:.1f}°C ({temp_f:.1f}°F)")
print(f" Device: {measurement['device_id']}")
print(f" Time: {measurement['measured_at']}")
def handle_humidity(measurement):
"""Custom handler for humidity measurements."""
humidity = measurement['value']
print(f"\n💧 Humidity: {humidity:.1f}%")
print(f" Device: {measurement['device_id']}")
def handle_battery(measurement):
"""Custom handler for battery measurements."""
voltage = measurement['value']
# Rough estimate: 3.0V = 0%, 3.3V = 100%
percentage = max(0, min(100, ((voltage - 3.0) / 0.3) * 100))
print(f"\n🔋 Battery: {voltage:.2f}V ({percentage:.0f}%)")
print(f" Device: {measurement['device_id']}")
def main():
"""Main entry point."""
# Create client
client = HaltianMQTTClient(
api_key_id=API_KEY_ID,
api_key_token=API_KEY_TOKEN,
integration_id=INTEGRATION_ID
)
# Register custom handlers
client.register_handler("ambientTemperature", handle_temperature)
client.register_handler("humidity", handle_humidity)
client.register_handler("batteryVoltage", handle_battery)
# Connect and start listening
client.connect()
client.start()
if __name__ == "__main__":
main()
Complete JavaScript Implementation
const mqtt = require('mqtt');
// MQTT Configuration
const MQTT_HOST = 'mqtts://haltian-iot-mqtt.eu.haltian.io:18883';
// Credentials (replace with your own from Haltian IoT Studio)
const API_KEY_ID = 'c6f7b103-81df-4e73-8218-4f671056aee2'; // Your API Key ID
const API_KEY_TOKEN = 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...'; // Your JWT token
const INTEGRATION_ID = '31efcae6-39ff-45f2-b732-2090e2ed54c4'; // Your Integration ID
// Topic pattern
const TOPIC = `haltian-iot/events/${INTEGRATION_ID}/${API_KEY_ID}/#`;
// Connect options
const options = {
username: API_KEY_ID,
password: API_KEY_TOKEN,
protocol: 'mqtts',
rejectUnauthorized: false
};
// Create client
const client = mqtt.connect(MQTT_HOST, options);
// Connection handler
client.on('connect', () => {
console.log('✓ Connected to Haltian IoT MQTT');
console.log(`✓ Subscribing to: ${TOPIC}`);
client.subscribe(TOPIC, (err) => {
if (err) {
console.error('✗ Subscription failed:', err);
}
});
});
// Message handler
client.on('message', (topic, message) => {
try {
// Parse topic
const topicParts = topic.split('/');
if (topicParts.length >= 7) {
const eventKind = topicParts[4];
const eventType = topicParts[5];
const deviceId = topicParts[6];
// Parse payload
const payload = JSON.parse(message.toString());
console.log(`\n[${eventType}]`);
console.log(` Device: ${deviceId}`);
console.log(` Time: ${payload.measured_at}`);
console.log(` Value: ${payload.value}`);
}
} catch (error) {
console.error('✗ Error processing message:', error);
}
});
// Error handler
client.on('error', (error) => {
console.error('✗ MQTT error:', error);
});
// Disconnect handler
client.on('close', () => {
console.log('⚠ Disconnected from broker');
});
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\n⟳ Disconnecting...');
client.end();
process.exit(0);
});
Step 4: Filter by Measurement Type
Python Example
# Only process specific measurement types
def on_message(client, userdata, msg):
topic_parts = msg.topic.split("/")
if len(topic_parts) >= 6:
event_type = topic_parts[5]
# Filter: only temperature and humidity
if event_type in ["ambientTemperature", "humidity"]:
payload = json.loads(msg.payload.decode("utf-8"))
print(f"{event_type}: {payload['value']}")
Subscribe to Specific Types Only
# Temperature only from all devices
topic = "haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/#"
client.subscribe(topic)
# Multiple specific types
topics = [
"haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/#",
"haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/humidity/#",
"haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/batteryVoltage/#"
]
for topic in topics:
client.subscribe(topic)
Step 5: Error Handling and Reconnection
Connection Status Codes
| Code | Meaning | Action |
|---|---|---|
| 0 | Success | Connected successfully |
| 1 | Protocol version | Check MQTT version (use 3.1.1) |
| 2 | Invalid client ID | Use unique client identifier |
| 3 | Server unavailable | Retry with backoff |
| 4 | Bad credentials | Verify API key ID and token |
| 5 | Not authorized | Check API key permissions |
Reconnection Strategy
import time
class ReconnectStrategy:
"""Exponential backoff reconnection."""
def __init__(self, initial_delay=1, max_delay=60, factor=2):
self.initial_delay = initial_delay
self.max_delay = max_delay
self.factor = factor
self.current_delay = initial_delay
def on_disconnect(self, client, userdata, rc):
if rc != 0:
print(f"⚠ Disconnected (code: {rc})")
self.reconnect(client)
def reconnect(self, client):
"""Reconnect with exponential backoff."""
while True:
try:
print(f"⟳ Reconnecting in {self.current_delay}s...")
time.sleep(self.current_delay)
client.reconnect()
print("✓ Reconnected")
self.current_delay = self.initial_delay # Reset
break
except Exception as e:
print(f"✗ Reconnection failed: {e}")
self.current_delay = min(
self.current_delay * self.factor,
self.max_delay
)
Complete Working Example
Expected Output
⟳ Connecting to haltian-iot-mqtt.eu.haltian.io:18883...
✓ Connected to haltian-iot-mqtt.eu.haltian.io
✓ Subscribing to: haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/#
🌡️ Temperature: 23.5°C (74.3°F)
Device: 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
Time: 2026-02-05T10:30:45.123Z
💧 Humidity: 45.8%
Device: 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
🔋 Battery: 3.12V (40%)
Device: 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
[positionLocal]
Device: 7c8d9e0f-1a2b-3c4d-5e6f-7a8b9c0d1e2f
Time: 2026-02-05T10:30:45.491Z
Value: {'x': 12.5, 'y': 8.3, 'z': 0.0}
AI Agent Instructions
This tutorial is designed to be parseable by AI agents. Key integration points:
Required Inputs
An AI agent implementing MQTT streaming needs:
- MQTT Broker:
haltian-iot-mqtt.eu.haltian.io:18883 - Credentials: API key ID (UUID) and token (JWT)
- Integration ID: UUID from Haltian IoT Studio
- Measurement Filter: (Optional) List of measurement types to process
Expected Outputs
The implementation should:
- Connect - Establish TLS-encrypted MQTT connection
- Subscribe - Listen to topic pattern matching requirements
- Parse - Extract device ID and event type from topic
- Process - Handle JSON payload with measured_at and value
- Handle Errors - Reconnect on disconnection, log errors
Implementation Checklist
- Install MQTT client library (
paho-mqttfor Python,mqttfor JavaScript) - Configure TLS connection to port 18883
- Set username and password authentication
- Subscribe to appropriate topic pattern
- Parse topic structure to extract metadata
- Decode JSON payload
- Implement connection callbacks
- Implement message processing callback
- Implement reconnection with exponential backoff
- Handle malformed JSON gracefully
- Log all connection events and errors
Topic Pattern Reference
# All events
haltian-iot/events/{integration-id}/{api-key-id}/#
# All measurements
haltian-iot/events/{integration-id}/{api-key-id}/measurements/#
# Specific type
haltian-iot/events/{integration-id}/{api-key-id}/measurements/{type}/#
# Specific device
haltian-iot/events/{integration-id}/{api-key-id}/measurements/+/{device-id}
Message Schema
{
"measured_at": "ISO 8601 timestamp (UTC)",
"value": "number | object"
}
Common Measurement Types
| Type | Value Type | Unit | Example |
|---|---|---|---|
ambientTemperature | Number | °C | 23.5 |
humidity | Number | % | 45.8 |
batteryVoltage | Number | V | 3.12 |
batteryPercentage | Number | % | 85 |
co2 | Number | ppm | 450 |
barometricPressure | Number | hPa | 1013.25 |
position | Object | meters | {"x": 12.5, "y": 8.3} |
See Measurement Types Reference for complete list.
Troubleshooting
Connection Refused (Code 5)
Cause: Invalid API credentials
Solution: Verify API key ID and token from Haltian IoT Studio
No Messages Received
Causes:
- Wrong topic pattern
- No devices sending data
- Firewall blocking port 18883
Solutions:
- Verify topic structure matches your Integration ID and API Key ID (UUIDs)
- Check device status in Haltian IoT Studio
- Test connection from different network
TLS/SSL Errors
Cause: Certificate validation issues
Solution: Use cert_reqs=ssl.CERT_NONE for testing (not recommended for production)
Frequent Disconnections
Cause: Network instability
Solution: Implement exponential backoff reconnection strategy (included in examples above)
Next Steps
- Stream API Reference - Complete MQTT documentation
- Measurement Types - All available measurements
- Building a Complete Application - Combine with GraphQL queries
- Service API Authentication - Get API credentials
Summary
This tutorial covered:
✅ MQTT Connection - Secure TLS connection with authentication
✅ Topic Structure - Understanding and subscribing to data streams
✅ Message Format - Parsing JSON payloads
✅ Error Handling - Reconnection and malformed data
✅ Custom Handlers - Processing specific measurement types
✅ Production Ready - Complete implementations in Python and JavaScript
All code examples are production-ready and verified against Haltian IoT’s MQTT infrastructure.