Subscribe to MQTT Data Stream

How to connect to Haltian IoT MQTT stream and receive real-time sensor data using Python

This guide shows you how to subscribe to Haltian IoT MQTT data stream using Python. You’ll learn to connect, authenticate, and process incoming sensor measurements.

Prerequisites

  • Python 3.7 or later
  • paho-mqtt library
  • MQTT credentials (API key ID and token) from Haltian IoT Studio

Install Dependencies

pip install paho-mqtt

Demo Credentials

For testing purposes, you can use Haltian’s public demo MQTT stream:

ParameterValue
Hostmqtt.eu.haltian.io
Port8883 (TLS)
API Key IDhaltiansalesdemo
Tokenhaltiansalesdemo
Integration IDhaltiansalesdemo

Complete Python Example

#!/usr/bin/env python3
"""
Haltian IoT MQTT Subscriber Example
Connects to Haltian IoT MQTT broker and prints incoming measurements.
"""

import ssl
import json
import paho.mqtt.client as mqtt

# MQTT Configuration
MQTT_HOST = "mqtt.eu.haltian.io"
MQTT_PORT = 8883

# Credentials - replace with your own for production
API_KEY_ID = "haltiansalesdemo"
API_KEY_TOKEN = "haltiansalesdemo"
INTEGRATION_ID = "haltiansalesdemo"

# Topic pattern: haltian-iot/events/{integration-id}/{api-key-id}/#
TOPIC = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/#"


def on_connect(client, userdata, flags, rc):
    """Callback when connected to MQTT broker."""
    if rc == 0:
        print(f"Connected successfully to {MQTT_HOST}")
        print(f"Subscribing to: {TOPIC}")
        client.subscribe(TOPIC)
    else:
        print(f"Connection failed with code: {rc}")
        # Common error codes:
        # 1 = incorrect protocol version
        # 2 = invalid client identifier
        # 3 = server unavailable
        # 4 = bad username or password
        # 5 = not authorized


def on_message(client, userdata, msg):
    """Callback when message received."""
    try:
        # Parse topic to extract metadata
        topic_parts = msg.topic.split("/")
        # Topic structure: haltian-iot/events/{integration}/{apikey}/{event-kind}/{event-type}/{device-id}
        
        if len(topic_parts) >= 7:
            event_kind = topic_parts[4]      # e.g., "measurements"
            event_type = topic_parts[5]      # e.g., "ambient_temperature"
            device_id = topic_parts[6]       # UUID of the device
            
            # Parse JSON payload
            payload = json.loads(msg.payload.decode("utf-8"))
            
            print(f"\n--- {event_kind}/{event_type} ---")
            print(f"Device: {device_id}")
            print(f"Payload: {json.dumps(payload, indent=2)}")
        else:
            print(f"Topic: {msg.topic}")
            print(f"Payload: {msg.payload.decode('utf-8')}")
            
    except json.JSONDecodeError:
        print(f"Non-JSON payload: {msg.payload}")
    except Exception as e:
        print(f"Error processing message: {e}")


def on_disconnect(client, userdata, rc):
    """Callback when disconnected from broker."""
    if rc != 0:
        print(f"Unexpected disconnection (code: {rc})")


def main():
    """Main entry point."""
    # Create MQTT client
    client = mqtt.Client(protocol=mqtt.MQTTv311)
    
    # Set credentials
    client.username_pw_set(API_KEY_ID, API_KEY_TOKEN)
    
    # Configure TLS
    client.tls_set(cert_reqs=ssl.CERT_NONE)
    client.tls_insecure_set(True)
    
    # Set callbacks
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    
    # Connect
    print(f"Connecting to {MQTT_HOST}:{MQTT_PORT}...")
    client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
    
    # Start message loop (blocking)
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        print("\nDisconnecting...")
        client.disconnect()


if __name__ == "__main__":
    main()

Understanding the Topic Structure

Haltian IoT MQTT topics follow this structure:

haltian-iot/events/{integration-id}/{api-key-id}/{event-kind}/{event-type}/{device-id}
SegmentDescriptionExample
integration-idYour integration identifierhaltiansalesdemo
api-key-idAPI key used for authenticationhaltiansalesdemo
event-kindCategory of eventmeasurements, messages
event-typeSpecific measurement typeambient_temperature, humidity
device-idUUID of the device550e8400-e29b-41d4-a716-446655440000

Subscription Patterns

# Subscribe to all events
"haltian-iot/events/{integration}/{apikey}/#"

# Subscribe to all measurements
"haltian-iot/events/{integration}/{apikey}/measurements/#"

# Subscribe to specific measurement type
"haltian-iot/events/{integration}/{apikey}/measurements/ambient_temperature/#"

# Subscribe to specific device
"haltian-iot/events/{integration}/{apikey}/measurements/+/{device-id}"

Message Payload Format

Haltian IoT MQTT messages use a streamlined JSON format:

{
  "measured_at": "2025-01-28T10:30:00.000Z",
  "value": 23.5
}

For complex measurements like position:

{
  "measured_at": "2025-01-28T10:30:00.000Z",
  "value": {
    "position_local": {
      "x": 12.5,
      "y": 8.3,
      "z": 0
    },
    "position_global": {
      "type": "Point",
      "coordinates": [24.9384, 60.1699]
    },
    "accuracy": 2.5
  }
}

Filtering by Measurement Type

To process only specific measurement types:

def on_message(client, userdata, msg):
    topic_parts = msg.topic.split("/")
    
    if len(topic_parts) >= 6:
        event_type = topic_parts[5]
        
        # Only process temperature and humidity
        if event_type in ["ambient_temperature", "humidity"]:
            payload = json.loads(msg.payload.decode("utf-8"))
            print(f"{event_type}: {payload['value']}")

Error Handling and Reconnection

For production use, implement automatic reconnection:

import time

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"Disconnected (code: {rc}). Reconnecting...")
        while True:
            try:
                client.reconnect()
                break
            except Exception as e:
                print(f"Reconnection failed: {e}")
                time.sleep(5)

Next Steps