Examples

Code examples for connecting to the MQTT Stream API

Python Example

Using the paho-mqtt library:

import paho.mqtt.client as mqtt
import json
import ssl

# Configuration
MQTT_BROKER = "mqtt.haltian-iot.example.com"
MQTT_PORT = 8883
INTEGRATION_ID = "your-integration-uuid"
API_KEY_ID = "your-api-key-uuid"
API_KEY_SECRET = "your-api-key-secret"

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker")
        # Subscribe to all measurements
        topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/#"
        client.subscribe(topic)
        print(f"Subscribed to: {topic}")
    else:
        print(f"Connection failed with code {rc}")

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode())
        print(f"Topic: {msg.topic}")
        print(f"Payload: {json.dumps(payload, indent=2)}")
        print("-" * 40)
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        print(f"Error decoding message: {e}")

def on_disconnect(client, userdata, rc):
    if rc != 0:
        # loop_forever() retries the connection automatically after an unexpected drop
        print(f"Unexpected disconnection (rc={rc}). Reconnecting...")

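# Create client
# Note: this example uses the paho-mqtt 1.x callback signatures. On paho-mqtt 2.x,
# pass mqtt.CallbackAPIVersion.VERSION1 as the first argument to mqtt.Client().
client = mqtt.Client(client_id=f"python-client-{API_KEY_ID[:8]}")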
client.username_pw_set(API_KEY_ID, API_KEY_SECRET)

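# TLS configuration: verify the broker certificate against the system CA store.
# The TLS version is left at the library default (ssl.PROTOCOL_TLS is deprecated since Python 3.10).
client.tls_set(cert_reqs=ssl.CERT_REQUIRED)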

# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# Connect and run
try:
    client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
    client.loop_forever()
except KeyboardInterrupt:
    print("Disconnecting...")
    client.disconnect()
except Exception as e:
    print(f"Error: {e}")

Installation

pip install paho-mqtt

Node.js Example

Using the mqtt package:

const mqtt = require('mqtt');

// Configuration
const MQTT_BROKER = 'mqtts://mqtt.haltian-iot.example.com:8883';
const INTEGRATION_ID = 'your-integration-uuid';
const API_KEY_ID = 'your-api-key-uuid';
const API_KEY_SECRET = 'your-api-key-secret';

// Connection options
const options = {
  clientId: `nodejs-client-${API_KEY_ID.substring(0, 8)}`,
  username: API_KEY_ID,
  password: API_KEY_SECRET,
  clean: true,
  reconnectPeriod: 5000,
  rejectUnauthorized: true
};

// Connect
const client = mqtt.connect(MQTT_BROKER, options);

client.on('connect', () => {
  console.log('Connected to MQTT broker');
  
  // Subscribe to all measurements
  const topic = `haltian-iot/events/${INTEGRATION_ID}/${API_KEY_ID}/measurements/#`;
  client.subscribe(topic, (err) => {
    if (err) {
      console.error('Subscription error:', err);
    } else {
      console.log(`Subscribed to: ${topic}`);
    }
  });
});

client.on('message', (topic, message) => {
  try {
    const payload = JSON.parse(message.toString());
    console.log(`Topic: ${topic}`);
    console.log('Payload:', JSON.stringify(payload, null, 2));
    console.log('-'.repeat(40));
  } catch (e) {
    console.error('Error parsing message:', e);
  }
});

client.on('error', (err) => {
  console.error('MQTT error:', err);
});

client.on('close', () => {
  console.log('Connection closed');
});

client.on('reconnect', () => {
  console.log('Reconnecting...');
});

// Handle graceful shutdown
process.on('SIGINT', () => {
  console.log('Disconnecting...');
  // Exit only after the client has closed the connection
  client.end(false, {}, () => process.exit(0));
});

Installation

npm install mqtt

Demo Credentials

For testing purposes, you can use the demo environment:

Parameter      Value
Broker         Contact Haltian support
Organization   demo
Email          demo@haltian.com

Filtering by Device

Subscribe to a specific device’s measurements:

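# Single device, all measurements ("+" matches any measurement type)
device_id = "your-device-id"  # placeholder: replace with the ID of the device to follow
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/+/{device_id}"
client.subscribe(topic)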

Filtering by Measurement Type

Subscribe to a specific measurement type from all devices:

# All devices, specific measurement
topic = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/measurements/ambientTemperature/+"
client.subscribe(topic)
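
The two filters above suggest a topic layout of the form haltian-iot/events/<integration>/<api key>/measurements/<measurement type>/<device id>. Assuming that layout holds (it is inferred from the examples, not from a published topic specification), a minimal sketch of helpers for building filters and splitting received topics could look like this. The names build_filter, parse_topic and MeasurementTopic are hypothetical.

```python
from typing import NamedTuple, Optional

TOPIC_ROOT = "haltian-iot/events"


class MeasurementTopic(NamedTuple):
    integration_id: str
    api_key_id: str
    measurement_type: str
    device_id: str


def build_filter(integration_id: str, api_key_id: str,
                 measurement_type: str = "+", device_id: str = "+") -> str:
    """Build a subscription filter; "+" matches any single topic level."""
    return (f"{TOPIC_ROOT}/{integration_id}/{api_key_id}"
            f"/measurements/{measurement_type}/{device_id}")


def parse_topic(topic: str) -> Optional[MeasurementTopic]:
    """Split a received topic into its parts, or return None if it does not fit."""
    parts = topic.split("/")
    # Assumed layout:
    # haltian-iot/events/<integration>/<key>/measurements/<type>/<device>
    if (len(parts) != 7
            or parts[:2] != ["haltian-iot", "events"]
            or parts[4] != "measurements"):
        return None
    return MeasurementTopic(parts[2], parts[3], parts[5], parts[6])
```

Inside on_message, parse_topic(msg.topic) then gives the measurement type and device ID without string slicing at every call site.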

Handling Reconnection

Implement reconnection logic for production:

import time

def on_disconnect(client, userdata, rc):
    # rc == 0 means the client called disconnect() itself; anything else is unexpected
    if rc != 0:
        print("Unexpected disconnection. Attempting reconnect...")
        while True:
            try:
                client.reconnect()
                break
            except Exception as e:
                print(f"Reconnect failed: {e}. Retrying in 5 seconds...")
                time.sleep(5)
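
A fixed 5-second retry can cause many clients to reconnect at the same moment after a broker restart. One common alternative is exponential backoff with random jitter. The sketch below is not part of the API; backoff_delays and reconnect_with_backoff are hypothetical helpers, and the base and cap values are arbitrary assumptions.

```python
import random
import time
from typing import Callable, Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield random delays whose upper bound doubles each time, up to `cap`."""
    ceiling = base
    while True:
        yield random.uniform(0, ceiling)
        ceiling = min(cap, ceiling * 2)


def reconnect_with_backoff(reconnect: Callable[[], object],
                           max_attempts: int = 10,
                           sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call `reconnect` until it succeeds or `max_attempts` is reached."""
    delays = backoff_delays()
    for _ in range(max_attempts):
        try:
            reconnect()
            return True
        except OSError as exc:  # network-level failures, e.g. connection refused
            delay = next(delays)
            print(f"Reconnect failed: {exc}. Retrying in {delay:.1f} s...")
            sleep(delay)
    return False
```

With paho-mqtt this could be called as reconnect_with_backoff(client.reconnect) from the disconnect callback. The sleep parameter exists only so the function can be exercised without real waiting.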

Best Practices

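  1. Use QoS 1 for reliable (at-least-once) message delivery. The examples above subscribe at the default QoS 0; pass qos=1 (Python) or { qos: 1 } (Node.js) to subscribe()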
  2. Implement reconnection logic for production systems
  3. Buffer messages locally if persistent history is required
  4. Handle exceptions gracefully in message callbacks
  5. Use TLS for secure connections
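
For point 4, one possible pattern is to wrap the application logic so that neither a malformed payload nor a bug in the handler can raise into the MQTT network loop. The sketch below assumes the paho-mqtt on_message signature used earlier; safe_handler and the logger name are hypothetical.

```python
import json
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("mqtt-consumer")


def safe_handler(process: Callable[[str, Dict[str, Any]], None]):
    """Wrap `process` in an on_message-style callback that never raises."""
    def on_message(client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode("utf-8"))
        except ValueError as exc:
            # ValueError covers both UnicodeDecodeError and json.JSONDecodeError
            logger.warning("Skipping undecodable message on %s: %s", msg.topic, exc)
            return
        try:
            process(msg.topic, payload)
        except Exception:
            # Log with traceback and keep consuming; one bad message
            # should not stop the stream.
            logger.exception("Handler failed for topic %s", msg.topic)
    return on_message
```

It would be attached with client.on_message = safe_handler(my_function), where my_function takes the topic and the decoded payload.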

Troubleshooting

Connection Issues

  • Verify that the API key is active and has MQTT permissions
  • Check that the firewall allows outbound connections on port 8883 (MQTT over TLS)
  • Ensure the broker hostname is correct

No Messages Received

  • Verify topic subscription pattern
  • Check API key has access to selected spaces
  • Confirm devices are online and sending data
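
To check a subscription pattern offline, it can help to test it against a topic you expect to receive. The function below is a small, self-contained sketch of standard MQTT wildcard matching ("+" for one level, "#" for all remaining levels); it ignores the special rules for topics starting with "$". paho-mqtt ships a comparable helper, topic_matches_sub.

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT subscription `filter_`."""
    filter_levels = filter_.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # everything below the parent, and the parent level itself.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without "#", filter and topic must have the same number of levels
    return len(filter_levels) == len(topic_levels)
```

If the function returns False for a topic you expect, the usual causes are a missing level or a wildcard in the wrong position.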

Message Loss

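  • Increase the subscription QoS level to 1 or 2
  • Use a persistent session (a fixed client ID with clean session disabled) so the broker can queue QoS 1 and 2 messages while the client is offline
  • Add local buffering so received messages survive restarts of your application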
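
As an illustration of local buffering, the sketch below stores each received message in a SQLite file using only the standard library. MessageBuffer, the table layout, and the default file name are assumptions made for this example, not part of the API.

```python
import json
import sqlite3
import time
from typing import Any, List, Tuple


class MessageBuffer:
    """Append-only SQLite store for received messages (hypothetical helper)."""

    def __init__(self, path: str = "mqtt_buffer.db") -> None:
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "received_at REAL NOT NULL, "
            "topic TEXT NOT NULL, "
            "payload TEXT NOT NULL)"
        )
        self._db.commit()

    def append(self, topic: str, payload: Any) -> None:
        """Store one decoded payload and commit so it survives a process crash."""
        self._db.execute(
            "INSERT INTO messages (received_at, topic, payload) VALUES (?, ?, ?)",
            (time.time(), topic, json.dumps(payload)),
        )
        self._db.commit()

    def drain(self, limit: int = 100) -> List[Tuple[str, Any]]:
        """Return and delete up to `limit` of the oldest messages."""
        rows = self._db.execute(
            "SELECT id, topic, payload FROM messages ORDER BY id LIMIT ?",
            (limit,),
        ).fetchall()
        if rows:
            self._db.execute("DELETE FROM messages WHERE id <= ?", (rows[-1][0],))
            self._db.commit()
        return [(topic, json.loads(payload)) for _, topic, payload in rows]
```

A message callback would call buffer.append(msg.topic, payload), and a separate step would call drain() to forward the stored messages. A sqlite3 connection is bound by default to the thread that created it, so create the buffer in the thread that runs the MQTT loop.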

Next Steps