Real-Time Data Streaming

Connect to the Haltian IoT MQTT stream to receive real-time sensor measurements from devices.

Overview

This tutorial demonstrates how to connect to Haltian IoT’s MQTT stream and receive real-time sensor data. You will learn to:

  1. Connect - Establish secure MQTT connection with authentication
  2. Subscribe - Listen to specific device measurements
  3. Process - Parse and handle incoming sensor data
  4. Handle Errors - Implement reconnection and error handling

This tutorial provides complete, working code that any AI agent or developer can use to implement real-time data streaming.

Prerequisites

  • MQTT client library for your language
  • MQTT credentials from Haltian IoT Studio (API key ID and token)
  • Basic understanding of MQTT pub/sub pattern

Configuration

MQTT Broker Details

Host: haltian-iot-mqtt.eu.haltian.io
Port: 18883 (TLS encrypted)
Protocol: MQTT 3.1.1

Authentication

MQTT authentication requires:

Parameter | Description | Format
Username | API Key ID | UUID (e.g., c6f7b103-81df-4e73-8218-4f671056aee2)
Password | API Key Token | JWT token
Integration ID | Integration identifier | UUID (e.g., 31efcae6-39ff-45f2-b732-2090e2ed54c4)
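
To keep these values out of source code, one option is to read them from environment variables. The following is a minimal sketch rather than an official pattern; the variable names HALTIAN_API_KEY_ID, HALTIAN_API_KEY_TOKEN, and HALTIAN_INTEGRATION_ID and the helper load_credentials are hypothetical.

```python
import os
import uuid
from typing import Mapping, NamedTuple


class Credentials(NamedTuple):
    api_key_id: str      # used as the MQTT username
    api_key_token: str   # used as the MQTT password (JWT)
    integration_id: str  # used in the topic path


def load_credentials(env: Mapping[str, str] = os.environ) -> Credentials:
    """Read credentials from environment variables (names are hypothetical)."""
    try:
        creds = Credentials(
            api_key_id=env["HALTIAN_API_KEY_ID"],
            api_key_token=env["HALTIAN_API_KEY_TOKEN"],
            integration_id=env["HALTIAN_INTEGRATION_ID"],
        )
    except KeyError as missing:
        raise RuntimeError(f"Missing environment variable: {missing}") from None
    # Both IDs are documented as UUIDs, so reject malformed values early.
    for field in ("api_key_id", "integration_id"):
        uuid.UUID(getattr(creds, field))  # raises ValueError if not a UUID
    return creds
```

Failing early on a missing or malformed value tends to be easier to diagnose than a refused connection later.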

Step 1: Understand Topic Structure

Haltian IoT MQTT topics follow this hierarchy:

haltian-iot/events/{integration-id}/{api-key-id}/{event-kind}/{event-type}/{device-id}

Topic Segments

Segment | Description | Example
integration-id | Your integration identifier (UUID) | 31efcae6-39ff-45f2-b732-2090e2ed54c4
api-key-id | API key used for authentication (UUID) | c6f7b103-81df-4e73-8218-4f671056aee2
event-kind | Event category | measurements or messages
event-type | Specific measurement/message type | ambientTemperature, batteryVoltage
device-id | UUID of the device | 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
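
The topic hierarchy can be built and taken apart with two small helpers. This is an illustrative sketch; the names TopicInfo, build_topic, and parse_topic are hypothetical and not part of any Haltian library.

```python
from typing import NamedTuple, Optional

TOPIC_PREFIX = ["haltian-iot", "events"]


class TopicInfo(NamedTuple):
    integration_id: str
    api_key_id: str
    event_kind: str
    event_type: str
    device_id: str


def build_topic(integration_id: str, api_key_id: str, event_kind: str,
                event_type: str, device_id: str) -> str:
    """Join the seven topic levels in the documented order."""
    return "/".join(TOPIC_PREFIX + [integration_id, api_key_id,
                                    event_kind, event_type, device_id])


def parse_topic(topic: str) -> Optional[TopicInfo]:
    """Split a concrete topic into its segments, or return None if it does not fit."""
    parts = topic.split("/")
    if len(parts) < 7 or parts[:2] != TOPIC_PREFIX:
        return None
    return TopicInfo(*parts[2:7])
```

Returning None for unexpected topics lets a message callback skip them without raising.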

Subscription Patterns

MQTT wildcards allow flexible subscriptions:

# All events for your integration
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/#

# All measurements only
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/#

# Specific measurement type from all devices
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/#

# All measurements from specific device
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/+/{device-id}

# Single measurement type from specific device
haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/{device-id}

Wildcards:

  • # - Multi-level wildcard (matches all remaining levels)
  • + - Single-level wildcard (matches exactly one level)
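
To check which topics a subscription pattern would match before connecting, the wildcard rules can be reproduced in a few lines. This is a simplified sketch of MQTT topic matching (it ignores special cases such as topics starting with $); the function name topic_matches is hypothetical.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT subscription pattern matches a concrete topic."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level, and it
            # matches the parent level plus everything below it.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            return False  # pattern is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # Without a trailing '#', both must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)
```

For example, with base set to the haltian-iot/events/{integration-id}/{api-key-id} prefix, the pattern base + "/measurements/+/{device-id}" matches every measurement type from that one device but nothing from other devices.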

Step 2: Message Payload Format

Simple Measurement

Most measurements use this streamlined format:

{
  "measured_at": "2026-02-05T10:30:45.123Z",
  "value": 23.5
}

Field | Type | Description
measured_at | ISO 8601 Timestamp | When measurement was taken (UTC)
value | Number/Object | Measurement value
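
A payload in this format can be decoded into a typed object with a timezone-aware timestamp. The sketch below is illustrative; the Measurement class and parse_measurement function are hypothetical names.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class Measurement:
    measured_at: datetime  # timezone-aware, UTC
    value: Any             # number or object, depending on the type


def parse_measurement(raw: bytes) -> Measurement:
    """Decode a JSON payload with 'measured_at' and 'value' fields."""
    data = json.loads(raw.decode("utf-8"))
    # Python versions before 3.11 do not accept a trailing 'Z' in
    # datetime.fromisoformat, so rewrite it as an explicit UTC offset.
    timestamp = datetime.fromisoformat(data["measured_at"].replace("Z", "+00:00"))
    return Measurement(measured_at=timestamp, value=data["value"])
```

A missing field raises KeyError and malformed JSON raises json.JSONDecodeError, so callers can handle both in the message callback.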

Complex Measurement Example (Position)

{
  "measured_at": "2026-02-05T10:30:45.491Z",
  "value": {
    "position_local": {
      "x": 12.5,
      "y": 8.3,
      "z": 0.0
    },
    "position_global": {
      "type": "Point",
      "coordinates": [24.9384, 60.1699]
    },
    "accuracy": 2.5,
    "space_id": "55a0a3d0-84b6-4ace-b131-0b2420053e91"
  }
}
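
Nested values like this one can be flattened before further processing. The following sketch assumes that position_global follows the GeoJSON convention of [longitude, latitude] ordering, which the example coordinates suggest but the text above does not state; the function name extract_position is hypothetical.

```python
from typing import Any, Dict


def extract_position(value: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a position value into a single-level dictionary."""
    local = value.get("position_local") or {}
    point = value.get("position_global") or {}
    coords = point.get("coordinates") or [None, None]
    return {
        "x": local.get("x"),
        "y": local.get("y"),
        "z": local.get("z"),
        # Assumption: GeoJSON order, i.e. [longitude, latitude].
        "longitude": coords[0],
        "latitude": coords[1],
        "accuracy": value.get("accuracy"),
        "space_id": value.get("space_id"),
    }
```

Missing fields come back as None rather than raising, which is convenient when not every device reports both local and global positions.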

Temperature Measurement

Topic:

haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6

Payload:

{
  "measured_at": "2026-02-05T10:30:45.123Z",
  "value": 23.5
}

Battery Voltage Measurement

Topic:

haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/batteryVoltage/6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6

Payload:

{
  "measured_at": "2026-02-05T10:30:45.123Z",
  "value": 3.12
}

Humidity Measurement

Topic:

haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/humidity/6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6

Payload:

{
  "measured_at": "2026-02-05T10:30:45.123Z",
  "value": 45.8
}

Step 3: Implement MQTT Client

Complete Python Implementation

#!/usr/bin/env python3
"""
Haltian IoT Real-Time Data Streaming
Connects to MQTT broker and processes sensor measurements.
"""

import ssl
import json
import time
import paho.mqtt.client as mqtt

# MQTT Configuration
MQTT_HOST = "haltian-iot-mqtt.eu.haltian.io"
MQTT_PORT = 18883

# Credentials (replace with your own from Haltian IoT Studio)
API_KEY_ID = "c6f7b103-81df-4e73-8218-4f671056aee2"  # Your API Key ID (username)
API_KEY_TOKEN = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."  # Your JWT token (password)
INTEGRATION_ID = "31efcae6-39ff-45f2-b732-2090e2ed54c4"  # Your Integration ID

# Topic pattern
TOPIC = f"haltian-iot/events/{INTEGRATION_ID}/{API_KEY_ID}/#"


class HaltianMQTTClient:
    """MQTT client for Haltian IoT real-time data streaming."""
    
    def __init__(self, api_key_id, api_key_token, integration_id):
        self.api_key_id = api_key_id
        self.api_key_token = api_key_token
        self.integration_id = integration_id
        self.topic = f"haltian-iot/events/{integration_id}/{api_key_id}/#"
        self.client = None
        self.measurement_handlers = {}
        
    def on_connect(self, client, userdata, flags, rc):
        """Callback when connected to MQTT broker."""
        if rc == 0:
            print(f"✓ Connected to {MQTT_HOST}")
            print(f"✓ Subscribing to: {self.topic}")
            client.subscribe(self.topic)
        else:
            error_messages = {
                1: "Incorrect protocol version",
                2: "Invalid client identifier",
                3: "Server unavailable",
                4: "Bad username or password",
                5: "Not authorized"
            }
            print(f"✗ Connection failed: {error_messages.get(rc, f'Unknown error ({rc})')}")
    
    def on_message(self, client, userdata, msg):
        """Callback when message received."""
        try:
            # Parse topic structure
            topic_parts = msg.topic.split("/")
            
            if len(topic_parts) >= 7:
                event_kind = topic_parts[4]      # "measurements" or "messages"
                event_type = topic_parts[5]      # measurement type
                device_id = topic_parts[6]       # device UUID
                
                # Parse JSON payload
                payload = json.loads(msg.payload.decode("utf-8"))
                
                # Create measurement object
                measurement = {
                    "device_id": device_id,
                    "event_kind": event_kind,
                    "event_type": event_type,
                    "measured_at": payload.get("measured_at"),
                    "value": payload.get("value")
                }
                
                # Call registered handler if exists
                if event_type in self.measurement_handlers:
                    self.measurement_handlers[event_type](measurement)
                else:
                    # Default handler
                    self.default_handler(measurement)
                    
        except json.JSONDecodeError as e:
            print(f"✗ JSON decode error: {e}")
        except Exception as e:
            print(f"✗ Error processing message: {e}")
    
    def on_disconnect(self, client, userdata, rc):
        """Callback when disconnected from broker."""
        if rc != 0:
            print(f"⚠ Unexpected disconnection (code: {rc})")
            print("⟳ Attempting reconnection...")
            self.reconnect()
    
    def default_handler(self, measurement):
        """Default measurement handler."""
        print(f"\n[{measurement['event_type']}]")
        print(f"  Device: {measurement['device_id']}")
        print(f"  Time: {measurement['measured_at']}")
        print(f"  Value: {measurement['value']}")
    
    def register_handler(self, event_type, handler_func):
        """Register custom handler for specific measurement type."""
        self.measurement_handlers[event_type] = handler_func
    
    def connect(self):
        """Establish connection to MQTT broker."""
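        # Create client. The callbacks in this class use the paho-mqtt 1.x
        # signatures; on paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1
        # as the first argument to mqtt.Client.
        self.client = mqtt.Client(protocol=mqtt.MQTTv311)
        
        # Set credentials
        self.client.username_pw_set(self.api_key_id, self.api_key_token)
        
        # Configure TLS. These two settings disable certificate and hostname
        # verification: use them for testing only, not in production.
        self.client.tls_set(cert_reqs=ssl.CERT_NONE)
        self.client.tls_insecure_set(True)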
        
        # Set callbacks
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        
        # Connect
        print(f"⟳ Connecting to {MQTT_HOST}:{MQTT_PORT}...")
        self.client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
    
    def reconnect(self):
        """Reconnect with exponential backoff."""
        retry_delay = 1
        max_delay = 60
        
        while True:
            try:
                print(f"  Retry in {retry_delay}s...")
                time.sleep(retry_delay)
                self.client.reconnect()
                print("✓ Reconnected successfully")
                break
            except Exception as e:
                print(f"✗ Reconnection failed: {e}")
                retry_delay = min(retry_delay * 2, max_delay)
    
    def start(self):
        """Start listening for messages (blocking)."""
        try:
            self.client.loop_forever()
        except KeyboardInterrupt:
            print("\n⟳ Disconnecting...")
            self.client.disconnect()


# Example: Custom handlers for specific measurement types
def handle_temperature(measurement):
    """Custom handler for temperature measurements."""
    temp_c = measurement['value']
    temp_f = (temp_c * 9/5) + 32
    print(f"\n🌡️  Temperature: {temp_c:.1f}°C ({temp_f:.1f}°F)")
    print(f"   Device: {measurement['device_id']}")
    print(f"   Time: {measurement['measured_at']}")


def handle_humidity(measurement):
    """Custom handler for humidity measurements."""
    humidity = measurement['value']
    print(f"\n💧 Humidity: {humidity:.1f}%")
    print(f"   Device: {measurement['device_id']}")


def handle_battery(measurement):
    """Custom handler for battery measurements."""
    voltage = measurement['value']
    # Rough estimate: 3.0V = 0%, 3.3V = 100%
    percentage = max(0, min(100, ((voltage - 3.0) / 0.3) * 100))
    print(f"\n🔋 Battery: {voltage:.2f}V ({percentage:.0f}%)")
    print(f"   Device: {measurement['device_id']}")


def main():
    """Main entry point."""
    # Create client
    client = HaltianMQTTClient(
        api_key_id=API_KEY_ID,
        api_key_token=API_KEY_TOKEN,
        integration_id=INTEGRATION_ID
    )
    
    # Register custom handlers
    client.register_handler("ambientTemperature", handle_temperature)
    client.register_handler("humidity", handle_humidity)
    client.register_handler("batteryVoltage", handle_battery)
    
    # Connect and start listening
    client.connect()
    client.start()


if __name__ == "__main__":
    main()

Complete JavaScript Implementation

const mqtt = require('mqtt');

// MQTT Configuration
const MQTT_HOST = 'mqtts://haltian-iot-mqtt.eu.haltian.io:18883';

// Credentials (replace with your own from Haltian IoT Studio)
const API_KEY_ID = 'c6f7b103-81df-4e73-8218-4f671056aee2';  // Your API Key ID
const API_KEY_TOKEN = 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...';  // Your JWT token
const INTEGRATION_ID = '31efcae6-39ff-45f2-b732-2090e2ed54c4';  // Your Integration ID

// Topic pattern
const TOPIC = `haltian-iot/events/${INTEGRATION_ID}/${API_KEY_ID}/#`;

// Connect options
const options = {
  username: API_KEY_ID,
  password: API_KEY_TOKEN,
  protocol: 'mqtts',
  rejectUnauthorized: false  // Disables certificate verification; for testing only
};

// Create client
const client = mqtt.connect(MQTT_HOST, options);

// Connection handler
client.on('connect', () => {
  console.log('✓ Connected to Haltian IoT MQTT');
  console.log(`✓ Subscribing to: ${TOPIC}`);
  client.subscribe(TOPIC, (err) => {
    if (err) {
      console.error('✗ Subscription failed:', err);
    }
  });
});

// Message handler
client.on('message', (topic, message) => {
  try {
    // Parse topic
    const topicParts = topic.split('/');
    
    if (topicParts.length >= 7) {
      const eventKind = topicParts[4];
      const eventType = topicParts[5];
      const deviceId = topicParts[6];
      
      // Parse payload
      const payload = JSON.parse(message.toString());
      
      console.log(`\n[${eventType}]`);
      console.log(`  Device: ${deviceId}`);
      console.log(`  Time: ${payload.measured_at}`);
      // Stringify so that object values (e.g. positions) print readably
      console.log(`  Value: ${JSON.stringify(payload.value)}`);
    }
  } catch (error) {
    console.error('✗ Error processing message:', error);
  }
});

// Error handler
client.on('error', (error) => {
  console.error('✗ MQTT error:', error);
});

// Disconnect handler
client.on('close', () => {
  console.log('⚠ Disconnected from broker');
});

// Graceful shutdown: exit only after the client has finished disconnecting
process.on('SIGINT', () => {
  console.log('\n⟳ Disconnecting...');
  client.end(false, {}, () => process.exit(0));
});

Step 4: Filter by Measurement Type

Python Example

import json

# Only process specific measurement types
def on_message(client, userdata, msg):
    topic_parts = msg.topic.split("/")
    
    if len(topic_parts) >= 6:
        event_type = topic_parts[5]
        
        # Filter: only temperature and humidity
        if event_type in ["ambientTemperature", "humidity"]:
            payload = json.loads(msg.payload.decode("utf-8"))
            print(f"{event_type}: {payload['value']}")

Subscribe to Specific Types Only

# Temperature only from all devices
topic = "haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/#"
client.subscribe(topic)

# Multiple specific types
topics = [
    "haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/ambientTemperature/#",
    "haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/humidity/#",
    "haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/measurements/batteryVoltage/#"
]

for topic in topics:
    client.subscribe(topic)
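
The topic list above can also be generated from the measurement type names. This is a sketch with a hypothetical helper name, build_subscriptions; it returns (topic, qos) tuples, which is the list form accepted by paho-mqtt's subscribe. The QoS level of 0 is an assumption, not something this tutorial specifies.

```python
from typing import Iterable, List, Tuple


def build_subscriptions(integration_id: str, api_key_id: str,
                        measurement_types: Iterable[str],
                        qos: int = 0) -> List[Tuple[str, int]]:
    """Build one (topic, qos) entry per measurement type, for all devices."""
    base = f"haltian-iot/events/{integration_id}/{api_key_id}/measurements"
    return [(f"{base}/{m_type}/#", qos) for m_type in measurement_types]


subscriptions = build_subscriptions(
    "31efcae6-39ff-45f2-b732-2090e2ed54c4",
    "c6f7b103-81df-4e73-8218-4f671056aee2",
    ["ambientTemperature", "humidity", "batteryVoltage"],
)
# With a connected paho-mqtt client, one call subscribes to all of them:
#     client.subscribe(subscriptions)
```

Subscribing with a single list keeps the filter definition in one place and avoids repeating the UUIDs in every string.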

Step 5: Error Handling and Reconnection

Connection Status Codes

Code | Meaning | Action
0 | Success | Connected successfully
1 | Protocol version | Check MQTT version (use 3.1.1)
2 | Invalid client ID | Use unique client identifier
3 | Server unavailable | Retry with backoff
4 | Bad credentials | Verify API key ID and token
5 | Not authorized | Check API key permissions
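
The table above suggests that only code 3 is worth retrying automatically, while the other failures need a configuration change. A minimal sketch of that decision follows; the function name classify_connect_result and the returned labels are hypothetical.

```python
RETRYABLE_CODES = {3}               # server unavailable: likely transient
CONFIGURATION_CODES = {1, 2, 4, 5}  # retrying will not help until fixed


def classify_connect_result(rc: int) -> str:
    """Map an MQTT 3.1.1 connection return code to a suggested next step."""
    if rc == 0:
        return "connected"
    if rc in RETRYABLE_CODES:
        return "retry"
    if rc in CONFIGURATION_CODES:
        return "fix_configuration"
    return "unknown"
```

An on_connect callback could use this to avoid looping forever on bad credentials.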

Reconnection Strategy

import time

class ReconnectStrategy:
    """Exponential backoff reconnection."""
    
    def __init__(self, initial_delay=1, max_delay=60, factor=2):
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.factor = factor
        self.current_delay = initial_delay
    
    def on_disconnect(self, client, userdata, rc):
        if rc != 0:
            print(f"⚠ Disconnected (code: {rc})")
            self.reconnect(client)
    
    def reconnect(self, client):
        """Reconnect with exponential backoff."""
        while True:
            try:
                print(f"⟳ Reconnecting in {self.current_delay}s...")
                time.sleep(self.current_delay)
                client.reconnect()
                print("✓ Reconnected")
                self.current_delay = self.initial_delay  # Reset
                break
            except Exception as e:
                print(f"✗ Reconnection failed: {e}")
                self.current_delay = min(
                    self.current_delay * self.factor,
                    self.max_delay
                )
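
To see the delays this strategy produces without connecting to a broker, the same arithmetic can be written as a generator. This is an illustrative sketch; the name backoff_delays is hypothetical.

```python
import itertools
from typing import Iterator


def backoff_delays(initial: float = 1, max_delay: float = 60,
                   factor: float = 2) -> Iterator[float]:
    """Yield reconnection delays that grow by 'factor' up to 'max_delay'."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, max_delay)


# First eight delays with the defaults used above.
first_delays = list(itertools.islice(backoff_delays(), 8))
print(first_delays)  # [1, 2, 4, 8, 16, 32, 60, 60]
```

paho-mqtt also offers a built-in alternative, client.reconnect_delay_set(min_delay=1, max_delay=60), which applies a similar backoff when the network loop reconnects on its own.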

Complete Working Example

Expected Output

⟳ Connecting to haltian-iot-mqtt.eu.haltian.io:18883...
✓ Connected to haltian-iot-mqtt.eu.haltian.io
✓ Subscribing to: haltian-iot/events/31efcae6-39ff-45f2-b732-2090e2ed54c4/c6f7b103-81df-4e73-8218-4f671056aee2/#

🌡️  Temperature: 23.5°C (74.3°F)
   Device: 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6
   Time: 2026-02-05T10:30:45.123Z

💧 Humidity: 45.8%
   Device: 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6

🔋 Battery: 3.12V (40%)
   Device: 6e5f8ac6-a099-44e5-bb28-25fd3d1f47a6

[positionLocal]
  Device: 7c8d9e0f-1a2b-3c4d-5e6f-7a8b9c0d1e2f
  Time: 2026-02-05T10:30:45.491Z
  Value: {'x': 12.5, 'y': 8.3, 'z': 0.0}

AI Agent Instructions

This tutorial is designed to be parseable by AI agents. Key integration points:

Required Inputs

An AI agent implementing MQTT streaming needs:

  1. MQTT Broker: haltian-iot-mqtt.eu.haltian.io:18883
  2. Credentials: API key ID (UUID) and token (JWT)
  3. Integration ID: UUID from Haltian IoT Studio
  4. Measurement Filter: (Optional) List of measurement types to process

Expected Outputs

The implementation should:

  1. Connect - Establish TLS-encrypted MQTT connection
  2. Subscribe - Listen to topic pattern matching requirements
  3. Parse - Extract device ID and event type from topic
  4. Process - Handle JSON payload with measured_at and value
  5. Handle Errors - Reconnect on disconnection, log errors

Implementation Checklist

  • Install MQTT client library (paho-mqtt for Python, mqtt for JavaScript)
  • Configure TLS connection to port 18883
  • Set username and password authentication
  • Subscribe to appropriate topic pattern
  • Parse topic structure to extract metadata
  • Decode JSON payload
  • Implement connection callbacks
  • Implement message processing callback
  • Implement reconnection with exponential backoff
  • Handle malformed JSON gracefully
  • Log all connection events and errors

Topic Pattern Reference

# All events
haltian-iot/events/{integration-id}/{api-key-id}/#

# All measurements
haltian-iot/events/{integration-id}/{api-key-id}/measurements/#

# Specific type
haltian-iot/events/{integration-id}/{api-key-id}/measurements/{type}/#

# Specific device
haltian-iot/events/{integration-id}/{api-key-id}/measurements/+/{device-id}

Message Schema

{
  "measured_at": "ISO 8601 timestamp (UTC)",
  "value": "number | object"
}

Common Measurement Types

Type | Value Type | Unit | Example
ambientTemperature | Number | °C | 23.5
humidity | Number | % | 45.8
batteryVoltage | Number | V | 3.12
batteryPercentage | Number | % | 85
co2 | Number | ppm | 450
barometricPressure | Number | hPa | 1013.25
position | Object | meters | {"x": 12.5, "y": 8.3}

See Measurement Types Reference for complete list.
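
The value types in the table above can be used for a light sanity check before a measurement reaches a handler. The sketch below covers only the types listed in that table and lets unknown types through; the names EXPECTED_VALUE_TYPES and value_matches_type are hypothetical.

```python
from numbers import Real
from typing import Any

# Value types taken from the table of common measurement types.
EXPECTED_VALUE_TYPES = {
    "ambientTemperature": Real,
    "humidity": Real,
    "batteryVoltage": Real,
    "batteryPercentage": Real,
    "co2": Real,
    "barometricPressure": Real,
    "position": dict,
}


def value_matches_type(event_type: str, value: Any) -> bool:
    """Return True if 'value' has the type expected for 'event_type'."""
    expected = EXPECTED_VALUE_TYPES.get(event_type)
    if expected is None:
        return True   # unknown measurement type: nothing to check against
    if isinstance(value, bool):
        return False  # bool is a subclass of int, but is not a measurement
    return isinstance(value, expected)
```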

Troubleshooting

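Connection Refused (Code 4 or 5)

Cause: Code 4 indicates a bad username or password; code 5 indicates that the API key is not authorized
Solution: Verify the API key ID and token from Haltian IoT Studio, and check the API key permissions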

No Messages Received

Causes:

  1. Wrong topic pattern
  2. No devices sending data
  3. Firewall blocking port 18883

Solutions:

  1. Verify topic structure matches your Integration ID and API Key ID (UUIDs)
  2. Check device status in Haltian IoT Studio
  3. Test connection from different network
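
To separate a firewall problem from a topic or device problem, it can help to check whether a plain TCP connection to the broker port succeeds at all. The following is a minimal sketch using only the standard library; the function name can_reach is hypothetical, and a successful result says nothing about TLS or credentials.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


# Example call against the broker documented above:
#     can_reach("haltian-iot-mqtt.eu.haltian.io", 18883)
```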

TLS/SSL Errors

Cause: Certificate validation issues
Solution: Use cert_reqs=ssl.CERT_NONE for testing (not recommended for production)
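
For production use, a verifying TLS context is the safer choice. The sketch below builds one with Python's standard ssl module; the function name make_tls_context and the optional ca_file parameter are hypothetical, and whether the broker's certificate chains to a CA in the system trust store is an assumption that should be checked.

```python
import ssl
from typing import Optional


def make_tls_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Create a TLS context that verifies the server certificate and hostname."""
    # With ca_file=None, the system's default CA certificates are loaded.
    context = ssl.create_default_context(cafile=ca_file)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


# With paho-mqtt, this would replace the tls_set(...) and
# tls_insecure_set(True) calls, before connecting:
#     client.tls_set_context(make_tls_context())
```

If verification fails with the default trust store, passing the broker's CA certificate file as ca_file keeps verification enabled instead of turning it off.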

Frequent Disconnections

Cause: Network instability
Solution: Implement exponential backoff reconnection strategy (included in examples above)

Next Steps

Summary

This tutorial covered:

  • MQTT Connection - Secure TLS connection with authentication
  • Topic Structure - Understanding and subscribing to data streams
  • Message Format - Parsing JSON payloads
  • Error Handling - Reconnection and malformed data
  • Custom Handlers - Processing specific measurement types
  • Production Ready - Complete implementations in Python and JavaScript

All code examples are verified against Haltian IoT’s MQTT infrastructure. They disable TLS certificate verification for ease of testing, so enable verification before production use.