Examples
Code examples for connecting to the MQTT Stream API
Need MQTT credentials?
See the MQTT API Key Management Guide to create integration credentials via GraphQL before using these examples.
Python Example
Using the paho-mqtt library:
import paho.mqtt.client as mqtt
import json
import ssl
# Configuration
MQTT_BROKER = "mqtt.haltian-iot.example.com"
MQTT_PORT = 8883
INTEGRATION_ID = "your-integration-uuid"
API_KEY_ID = "your-api-key-uuid"
API_KEY_SECRET = "your-api-key-secret"
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT broker")
# Subscribe to all measurements
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/#"
client.subscribe(topic)
print(f"Subscribed to: {topic}")
else:
print(f"Connection failed with code {rc}")
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
print(f"Topic: {msg.topic}")
print(f"Payload: {json.dumps(payload, indent=2)}")
print("-" * 40)
except json.JSONDecodeError as e:
print(f"Error decoding message: {e}")
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Unexpected disconnection (rc={rc}). Reconnecting...")
# Create client
client = mqtt.Client(client_id=f"python-client-{API_KEY_ID[:8]}")
client.username_pw_set(API_KEY_ID, API_KEY_SECRET)
# TLS configuration
client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS)
# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# Connect and run
try:
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
client.loop_forever()
except KeyboardInterrupt:
print("Disconnecting...")
client.disconnect()
except Exception as e:
print(f"Error: {e}")
Installation
pip install paho-mqtt
Node.js Example
Using the mqtt package:
const mqtt = require('mqtt');
// Configuration
const MQTT_BROKER = 'mqtts://mqtt.haltian-iot.example.com:8883';
const INTEGRATION_ID = 'your-integration-uuid';
const API_KEY_ID = 'your-api-key-uuid';
const API_KEY_SECRET = 'your-api-key-secret';
// Connection options
const options = {
clientId: `nodejs-client-${API_KEY_ID.substring(0, 8)}`,
username: API_KEY_ID,
password: API_KEY_SECRET,
clean: true,
reconnectPeriod: 5000,
rejectUnauthorized: true
};
// Connect
const client = mqtt.connect(MQTT_BROKER, options);
client.on('connect', () => {
console.log('Connected to MQTT broker');
// Subscribe to all measurements
const topic = `haltian-iot/events/${INTEGRATION_ID}/${API_KEY_ID}/measurements/#`;
client.subscribe(topic, (err) => {
if (err) {
console.error('Subscription error:', err);
} else {
console.log(`Subscribed to: ${topic}`);
}
});
});
client.on('message', (topic, message) => {
try {
const payload = JSON.parse(message.toString());
console.log(`Topic: ${topic}`);
console.log('Payload:', JSON.stringify(payload, null, 2));
console.log('-'.repeat(40));
} catch (e) {
console.error('Error parsing message:', e);
}
});
client.on('error', (err) => {
console.error('MQTT error:', err);
});
client.on('close', () => {
console.log('Connection closed');
});
client.on('reconnect', () => {
console.log('Reconnecting...');
});
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log('Disconnecting...');
client.end();
process.exit();
});
Installation
npm install mqtt
Demo Credentials
For testing purposes, you can use the demo environment:
| Parameter | Value |
|---|---|
| Broker | Contact Haltian support |
| Organization | demo |
demo@haltian.com |
Note
Demo credentials provide read-only access to sample data. Contact Haltian support for production credentials.
Filtering by Device
Subscribe to a specific device’s measurements:
# Single device, all measurements
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/+/{device_id}"
client.subscribe(topic)
Filtering by Measurement Type
Subscribe to a specific measurement type from all devices:
# All devices, specific measurement
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/ambientTemperature/+"
client.subscribe(topic)
Handling Reconnection
Implement reconnection logic for production:
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Unexpected disconnection. Attempting reconnect...")
while True:
try:
client.reconnect()
break
except Exception as e:
print(f"Reconnect failed: {e}. Retrying in 5 seconds...")
time.sleep(5)
Best Practices
- Use QoS 1 for reliable message delivery
- Implement reconnection logic for production systems
- Buffer messages locally if persistent history is required
- Handle exceptions gracefully in message callbacks
- Use TLS for secure connections
Troubleshooting
Connection Issues
- Verify API key is active and has MQTT permissions
- Check firewall allows port 8883 (TLS)
- Ensure correct broker hostname
No Messages Received
- Verify topic subscription pattern
- Check API key has access to selected spaces
- Confirm devices are online and sending data
Message Loss
- Increase QoS level to 1 or 2
- Implement persistent session
- Add local buffering
Next Steps
- Topics - Topic structure reference
- Service API - Query device metadata