Examples
Code examples for connecting to the MQTT Stream API
Python Example
Using the paho-mqtt library:
import paho.mqtt.client as mqtt
import json
import ssl
# Configuration
MQTT_BROKER = "mqtt.haltian-iot.example.com"
MQTT_PORT = 8883
INTEGRATION_ID = "your-integration-uuid"
API_KEY_ID = "your-api-key-uuid"
API_KEY_SECRET = "your-api-key-secret"
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT broker")
# Subscribe to all measurements
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/#"
client.subscribe(topic)
print(f"Subscribed to: {topic}")
else:
print(f"Connection failed with code {rc}")
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
print(f"Topic: {msg.topic}")
print(f"Payload: {json.dumps(payload, indent=2)}")
print("-" * 40)
except json.JSONDecodeError as e:
print(f"Error decoding message: {e}")
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Unexpected disconnection (rc={rc}). Reconnecting...")
# Create client
client = mqtt.Client(client_id=f"python-client-{API_KEY_ID[:8]}")
client.username_pw_set(API_KEY_ID, API_KEY_SECRET)
# TLS configuration
client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS)
# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# Connect and run
try:
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
client.loop_forever()
except KeyboardInterrupt:
print("Disconnecting...")
client.disconnect()
except Exception as e:
print(f"Error: {e}")
Installation
pip install paho-mqtt
Node.js Example
Using the mqtt package:
const mqtt = require('mqtt');
// Configuration
const MQTT_BROKER = 'mqtts://mqtt.haltian-iot.example.com:8883';
const INTEGRATION_ID = 'your-integration-uuid';
const API_KEY_ID = 'your-api-key-uuid';
const API_KEY_SECRET = 'your-api-key-secret';
// Connection options
const options = {
clientId: `nodejs-client-${API_KEY_ID.substring(0, 8)}`,
username: API_KEY_ID,
password: API_KEY_SECRET,
clean: true,
reconnectPeriod: 5000,
rejectUnauthorized: true
};
// Connect
const client = mqtt.connect(MQTT_BROKER, options);
client.on('connect', () => {
console.log('Connected to MQTT broker');
// Subscribe to all measurements
const topic = `haltian-iot/events/${INTEGRATION_ID}/${API_KEY_ID}/measurements/#`;
client.subscribe(topic, (err) => {
if (err) {
console.error('Subscription error:', err);
} else {
console.log(`Subscribed to: ${topic}`);
}
});
});
client.on('message', (topic, message) => {
try {
const payload = JSON.parse(message.toString());
console.log(`Topic: ${topic}`);
console.log('Payload:', JSON.stringify(payload, null, 2));
console.log('-'.repeat(40));
} catch (e) {
console.error('Error parsing message:', e);
}
});
client.on('error', (err) => {
console.error('MQTT error:', err);
});
client.on('close', () => {
console.log('Connection closed');
});
client.on('reconnect', () => {
console.log('Reconnecting...');
});
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log('Disconnecting...');
client.end();
process.exit();
});
Installation
npm install mqtt
Demo Credentials
For testing purposes, you can use the demo environment:
| Parameter | Value |
|---|---|
| Broker | Contact Haltian support |
| Organization | demo |
demo@haltian.com |
Note
Demo credentials provide read-only access to sample data. Contact Haltian support for production credentials.
Filtering by Device
Subscribe to a specific device’s measurements:
# Single device, all measurements
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/+/{device_id}"
client.subscribe(topic)
Filtering by Measurement Type
Subscribe to a specific measurement type from all devices:
# All devices, specific measurement
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/ambientTemperature/+"
client.subscribe(topic)
Handling Reconnection
Implement reconnection logic for production:
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Unexpected disconnection. Attempting reconnect...")
while True:
try:
client.reconnect()
break
except Exception as e:
print(f"Reconnect failed: {e}. Retrying in 5 seconds...")
time.sleep(5)
Best Practices
- Use QoS 1 for reliable message delivery
- Implement reconnection logic for production systems
- Buffer messages locally if persistent history is required
- Handle exceptions gracefully in message callbacks
- Use TLS for secure connections
Troubleshooting
Connection Issues
- Verify API key is active and has MQTT permissions
- Check firewall allows port 8883 (TLS)
- Ensure correct broker hostname
No Messages Received
- Verify topic subscription pattern
- Check API key has access to selected spaces
- Confirm devices are online and sending data
Message Loss
- Increase QoS level to 1 or 2
- Implement persistent session
- Add local buffering
Next Steps
- Topics - Topic structure reference
- Service API - Query device metadata