Cloud Binary Passthrough

Raw binary data passthrough via Haltian IoT Cloud MQTT for third-party Wirepas devices

Audience: Technical architects, developers, and AI coding agents
Prerequisite: MQTT subscription already configured via Haltian GraphQL API
Scope: Understanding and parsing Wirepas mesh network binary passthrough data via Haltian IoT Cloud


Overview

This guide explains how to process binary passthrough data from Wirepas mesh networks received via cloud-based MQTT. The data flows from wireless sensors through Wirepas gateways to Haltian IoT Cloud, which forwards it to your MQTT broker.

Note: Selected Haltian IoT gateways also support direct Gateway Binary Passthrough via local MQTT. This document covers only the cloud-based approach where data is routed through Haltian IoT Cloud.

Data Flow (Cloud-Based):

Sensor → Wirepas Gateway → Haltian IoT Cloud → MQTT Broker → Your Application

MQTT Topic Structure

Topic Pattern

haltian-iot/events/{subscriptionId}/{keyId}/payloads/wirepas/+
SegmentDescription
haltian-iot/eventsFixed prefix for all Haltian IoT events
{subscriptionId}Your GraphQL subscription UUID
{keyId}MQTT authentication key identifier
payloads/wirepasIndicates Wirepas mesh network data
+Wildcard matching any device identifier

Note: The device identifier is the Haltian IoT device UUID (e.g., 550e8400-e29b-41d4-a716-446655440000), not the Wirepas node address.

Example Topic

haltian-iot/events/abc123-def456/key789/payloads/wirepas/device-uuid-001

Subscription Pattern

Subscribe to all devices under your subscription using:

topic = f"haltian-iot/events/{subscription_id}/{key_id}/payloads/wirepas/+"
client.subscribe(topic)

MQTT Payload Structure (JSON)

The MQTT message payload is a JSON object containing metadata and the binary sensor data encoded as base64.

JSON Schema

{
  "received_at": "2026-01-29T12:34:56.789Z",
  "value": {
    "mqtt_topic": "haltian-iot/wirepas/gw-event/received_data/BS225000789/sink1/7156763/21/21",
    "payload": "CkpCTgofCgtCUzIyNTAwMDc4ORIFc2luazEY65DNj9fX+JPTARj..."
  }
}

Example breakdown (endpoint 21/21 - Thingsee CBOR):

  • Device: Thingsee sensor (node address 7156763)
  • Gateway: BS225000789 / sink1
  • Inner payload (after protobuf decode): CBOR array containing temperature, humidity, battery level

After full decoding, this example yields:

{
  "tsmId": 12345678,
  "tsmEv": 10,
  "tsmTs": 1706528096,
  "temp": 23.45,
  "humd": 45.2,
  "batl": 95
}

Field Descriptions

FieldTypeDescription
received_atstringISO 8601 timestamp when message was received by Haltian IoT Cloud
value.mqtt_topicstringOriginal Wirepas gateway MQTT topic (contains routing info)
value.payloadstringBase64-encoded Wirepas protobuf message

Origin Topic Structure

The value.mqtt_topic field contains the original gateway topic with embedded endpoint information:

haltian-iot/wirepas/gw-event/received_data/{gateway}/{sink}/{node_address}/{source_ep}/{dest_ep}
SegmentExampleDescription
{gateway}BS225000789Wirepas gateway identifier
{sink}sink1Gateway sink name
{node_address}7156763Wirepas node address (sensor)
{source_ep}11Source endpoint (data type)
{dest_ep}11Destination endpoint

Extracting the Binary Payload

import base64
import json

def extract_payload(mqtt_message: bytes) -> tuple[bytes, str, str]:
    """
    Extract binary payload and endpoints from MQTT JSON message.
    
    Returns:
        Tuple of (decoded_payload_bytes, source_endpoint, dest_endpoint)
    """
    data = json.loads(mqtt_message.decode("utf-8"))
    value = data.get("value", {})
    
    # Decode base64 payload
    payload_b64 = value.get("payload", "")
    payload_bytes = base64.b64decode(payload_b64) if payload_b64 else b""
    
    # Extract endpoints from origin topic
    origin_topic = value.get("mqtt_topic", "")
    parts = origin_topic.split("/")
    source_ep = parts[-2] if len(parts) >= 2 else "unknown"
    dest_ep = parts[-1] if len(parts) >= 1 else "unknown"
    
    return payload_bytes, source_ep, dest_ep

Wirepas Protobuf Message

The base64-decoded value.payload is always a Protocol Buffer encoded WirepasMessage, regardless of the device type (Thingsee, etc.). This is the standard Wirepas mesh network transport format.

Important: All devices in the Wirepas mesh use the same protobuf wrapper. The device-specific format (CBOR, binary, TLV) is contained within the payload_data field of the protobuf. The endpoint pair indicates which format to expect inside.

Protobuf Schema (Simplified)

message WirepasMessage {
    uint32 source_address = 1;      // Sensor node address (unique per device)
    uint32 destination_address = 2;  // Target address (usually gateway)
    uint32 source_endpoint = 3;      // Identifies data type/protocol
    uint32 destination_endpoint = 4; // Usually matches source_endpoint
    uint64 travel_time_ms = 5;       // Network transit time
    uint32 qos = 6;                  // Quality of service level
    bytes  payload_data = 7;         // The actual sensor data (inner payload)
    uint64 rx_time_ms_epoch = 8;     // Receive timestamp (milliseconds since epoch)
}

Key Fields Explained

FieldPurposeTypical Values
source_addressUnique sensor identifier32-bit integer (e.g., 12345678)
source_endpointData protocol identifier1, 11, 21, 120, 160
destination_endpointUsually matches sourceSame as source_endpoint
payload_dataInner binary payloadFormat depends on endpoint
rx_time_ms_epochMessage receive timeUnix timestamp in milliseconds

Endpoint Routing

The endpoint pair (source_endpoint/destination_endpoint) determines how to parse the inner payload_data. Each endpoint type uses a different encoding format.

Known Endpoint Types

Source EPDest EPProtocolInner FormatDescription
2121Thingsee CBORCBOR arrayThingsee multi-sensor devices

Decoding the Wirepas Protobuf Wrapper

Python Implementation

import struct
from dataclasses import dataclass
from typing import Optional

@dataclass
class WirepasMessage:
    source_address: int
    destination_address: int
    source_endpoint: int
    destination_endpoint: int
    travel_time_ms: int
    qos: int
    payload_data: bytes
    rx_time_ms_epoch: int

def decode_wirepas_message(raw_data: bytes) -> WirepasMessage:
    """
    Decode Wirepas protobuf message.
    
    Protobuf wire format:
    - Field number and wire type packed in varint
    - Wire type 0 = varint, type 2 = length-delimited (bytes)
    """
    pos = 0
    fields = {}
    
    while pos < len(raw_data):
        # Read field tag (varint)
        tag, pos = _read_varint(raw_data, pos)
        field_number = tag >> 3
        wire_type = tag & 0x07
        
        if wire_type == 0:  # Varint
            value, pos = _read_varint(raw_data, pos)
        elif wire_type == 2:  # Length-delimited
            length, pos = _read_varint(raw_data, pos)
            value = raw_data[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"Unsupported wire type: {wire_type}")
        
        fields[field_number] = value
    
    return WirepasMessage(
        source_address=fields.get(1, 0),
        destination_address=fields.get(2, 0),
        source_endpoint=fields.get(3, 0),
        destination_endpoint=fields.get(4, 0),
        travel_time_ms=fields.get(5, 0),
        qos=fields.get(6, 0),
        payload_data=fields.get(7, b''),
        rx_time_ms_epoch=fields.get(8, 0),
    )

def _read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Read a protobuf varint from data at position pos."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            break
        shift += 7
    return result, pos

Implementation Checklist

For AI Agents / Developers

Prerequisites

  • MQTT broker connection established
  • Subscription ID and Key ID available from GraphQL setup
  • Python environment with paho-mqtt installed

Core Implementation

  • Subscribe to topic pattern: haltian-iot/events/{subId}/{keyId}/payloads/wirepas/+
  • Implement protobuf decoder
  • Implement WirepasMessage protobuf decoder
  • Extract fields: source_endpoint, destination_endpoint, payload_data

Endpoint Routing

  • Create endpoint handler mapping dictionary
  • Implement routing function with (source_ep, dest_ep) tuple keys
  • Add fallback handler for unknown endpoints

Known Handlers to Implement

Error Handling

  • Handle malformed protobuf messages
  • Log unknown endpoint pairs for analysis
  • Implement graceful degradation (return raw hex on parse failure)

Protobuf Field Quick Reference

Field #NameWire TypeDescription
1source_addressvarintSensor node ID
2destination_addressvarintGateway ID
3source_endpointvarintData type identifier
4destination_endpointvarintUsually = source
5travel_time_msvarintNetwork latency
6qosvarintQuality of service
7payload_databytesInner sensor data
8rx_time_ms_epochvarintReceive timestamp