Integration Examples
Code samples for importing Parquet data into common analytics platforms
Two active path formats
Both the old and new Parquet exporters are currently writing to the same S3 bucket under different prefixes. The examples on this page use the new path format (parquet/v1/...) and new column names. See Migration Guide and Folder Structure for the differences.
Python with Pandas
Read Parquet files directly into pandas DataFrames:
# Load exported Parquet files into pandas DataFrames.
import pandas as pd
# Read a single file
df = pd.read_parquet('devices/2026/03/2026_03_20_0000_2026_03_20_0000_devices.parquet')
# Read all files in a directory
import glob
files = glob.glob('measurementOccupancyStatus/2026/03/*.parquet')
# NOTE: pd.concat raises ValueError if the glob matched no files —
# check `files` is non-empty before concatenating in production code.
df = pd.concat([pd.read_parquet(f) for f in files])
# Filter by time range
# (assumes `measuredAt` is read back as a datetime column so the string
# comparison works — confirm against the exported schema)
df_filtered = df[(df['measuredAt'] >= '2026-03-01') & (df['measuredAt'] < '2026-04-01')]
Python with PyArrow
For better performance with large datasets:
# Read Parquet with PyArrow to get column pruning and predicate pushdown.
import pyarrow.parquet as pq
# Read with column selection — only the listed columns are decoded
table = pq.read_table(
'measurementAmbientTemperature/2026/03/',
columns=['deviceId', 'measuredAt', 'ambientTemperature']
)
# Convert to pandas
df = table.to_pandas()
# Read with row filtering (predicate pushdown): rows are filtered while
# reading, before anything is materialized in memory
table = pq.read_table(
'devices/2026/03/',
filters=[('installationStatus', '=', 'installed')]
)
DuckDB
Query Parquet files directly with SQL:
# Query Parquet files in place with DuckDB — no import/load step needed.
import duckdb
# Hourly average temperature per device for March 2026
result = duckdb.query("""
SELECT
deviceId,
DATE_TRUNC('hour', measuredAt) as hour,
AVG(ambientTemperature) as avg_temp
FROM 'measurementAmbientTemperature/2026/03/*.parquet'
GROUP BY deviceId, DATE_TRUNC('hour', measuredAt)
ORDER BY hour
""").df()
# Join multiple entity types — DuckDB treats each glob as a table
result = duckdb.query("""
SELECT
d.name as device_name,
m.measuredAt,
m.isOccupied
FROM 'measurementOccupancyStatus/2026/03/*.parquet' m
JOIN 'devices/2026/03/*.parquet' d ON m.deviceId = d.id
WHERE m.isOccupied = true
""").df()
AWS S3 Direct Access
Python with boto3
# Fetch a single Parquet object from S3 with boto3 and read it in memory.
import boto3
import pandas as pd
from io import BytesIO
# SECURITY: avoid hard-coding credentials in real code — prefer IAM roles,
# environment variables, or a shared credentials profile.
s3 = boto3.client('s3',
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY'
)
bucket = '<BUCKET>'
# Key follows the new path format: parquet/v1/{organizationId}/<entity>/YYYY/MM/<file>
key = 'parquet/v1/{organizationId}/devices/2026/03/2026_03_20_0000_2026_03_20_0000_devices.parquet'
response = s3.get_object(Bucket=bucket, Key=key)
# The whole object is buffered in memory — fine for individual export files
df = pd.read_parquet(BytesIO(response['Body'].read()))
PyArrow with S3 filesystem
# Read Parquet straight from S3 using PyArrow's native S3 filesystem.
import pyarrow.parquet as pq
from pyarrow import fs
# SECURITY: prefer ambient credentials (IAM role / env vars) over literals
s3 = fs.S3FileSystem(
access_key='YOUR_ACCESS_KEY',
secret_key='YOUR_SECRET_KEY',
region='eu-west-1'
)
# Read directly from S3 — all files under the month prefix are combined
table = pq.read_table(
'<BUCKET>/parquet/v1/{organizationId}/devices/2026/03/',
filesystem=s3
)
Databricks
SQL
-- Create external table pointing to S3
-- (the table reads every Parquet file under the devices/ prefix in place)
CREATE TABLE IF NOT EXISTS haltian_devices
USING PARQUET
LOCATION 's3://<BUCKET>/parquet/v1/{organizationId}/devices/';
-- Query the data
SELECT * FROM haltian_devices
WHERE installationStatus = 'installed';
Python (PySpark)
# Read from S3 (requires an active SparkSession — available as `spark`
# in Databricks notebooks)
df = spark.read.parquet(
"s3://<BUCKET>/parquet/v1/{organizationId}/measurementOccupancyStatus/"
)
# Filter and aggregate
from pyspark.sql.functions import col, date_trunc, avg
# Cast the boolean occupancy flag to int so avg() yields an occupancy rate
hourly_occupancy = df \
.withColumn("isOccupied", col("isOccupied").cast("int")) \
.groupBy("deviceId", date_trunc("hour", "measuredAt").alias("hour")) \
.agg(avg("isOccupied").alias("occupancy_rate"))
Snowflake
Create external stage
-- External stage over the export bucket; declaring FILE_FORMAT here lets
-- queries and COPY statements against the stage omit a per-statement format
CREATE OR REPLACE STAGE haltian_parquet
URL = 's3://<BUCKET>/parquet/v1/{organizationId}/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = PARQUET);
Query staged files
-- Query staged Parquet files; $1 is the whole row as a VARIANT.
-- The stage already declares FILE_FORMAT = (TYPE = PARQUET), so no
-- per-query override is needed. (A FILE_FORMAT => '...' clause would
-- require a *named* file format object, not the type keyword 'PARQUET'.)
SELECT
$1:deviceId::STRING as device_id,
$1:measuredAt::TIMESTAMP as measurement_time,
$1:ambientTemperature::FLOAT as temperature
FROM @haltian_parquet/measurementAmbientTemperature/2026/03/;
Load into table
-- Target table for loaded temperature measurements
CREATE TABLE measurement_temperature (
device_id STRING,
ts TIMESTAMP_NTZ,
ambient_temperature FLOAT,
organization_id STRING
);
-- NOTE(review): MATCH_BY_COLUMN_NAME matches by name ignoring case only —
-- the exported Parquet columns are camelCase (deviceId, measuredAt,
-- ambientTemperature) while the table columns are snake_case, so these may
-- not line up; confirm the column naming before relying on this load.
COPY INTO measurement_temperature
FROM @haltian_parquet/measurementAmbientTemperature/
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
Google BigQuery
Load from Cloud Storage
-- Batch-load March 2026 device files into a native table
-- (assumes the exported files were first copied from S3 to a GCS bucket)
LOAD DATA INTO mydataset.devices
FROM FILES (
format = 'PARQUET',
uris = ['gs://<BUCKET>/parquet/v1/{organizationId}/devices/2026/03/*.parquet']
);
External table
-- External table: queries read the Parquet files in place, so newly
-- exported files appear without reloading
CREATE EXTERNAL TABLE mydataset.measurements
OPTIONS (
format = 'PARQUET',
uris = ['gs://<BUCKET>/parquet/v1/{organizationId}/measurementOccupancyStatus/*/*.parquet']
);
Apache Spark
// Scala
// Read entity and measurement exports from S3 (s3a:// scheme for Hadoop)
val devices = spark.read.parquet(
"s3a://<BUCKET>/parquet/v1/{organizationId}/devices/"
)
val measurements = spark.read.parquet(
"s3a://<BUCKET>/parquet/v1/{organizationId}/measurementOccupancyStatus/"
)
// Join and analyze: cast the boolean flag to int so avg() yields a rate,
// then compute a per-device daily occupancy rate
val result = measurements
.withColumn("isOccupied", col("isOccupied").cast("int"))
.join(devices, measurements("deviceId") === devices("id"))
.groupBy(devices("name"), date_trunc("day", measurements("measuredAt")))
.agg(avg("isOccupied").as("daily_occupancy_rate"))
ClickHouse
-- Query S3 directly (credentials are passed inline to the s3() table
-- function; prefer named collections or IAM where available)
SELECT
deviceId,
toStartOfHour(measuredAt) AS hour,
avg(ambientTemperature) AS avg_temp
FROM s3(
'https://<BUCKET>.s3.amazonaws.com/parquet/v1/{organizationId}/measurementAmbientTemperature/2026/03/*.parquet',
'AWS_ACCESS_KEY',
'AWS_SECRET_KEY',
'Parquet'
)
GROUP BY deviceId, hour
ORDER BY hour;
-- Create table from Parquet: materializes a one-off snapshot into
-- MergeTree; rerun (or INSERT ... SELECT) to pick up newer exports
CREATE TABLE devices
ENGINE = MergeTree()
ORDER BY id
AS SELECT * FROM s3(
'https://<BUCKET>.s3.amazonaws.com/parquet/v1/{organizationId}/devices/2026/03/*.parquet',
'AWS_ACCESS_KEY',
'AWS_SECRET_KEY',
'Parquet'
);
Power BI
Using Python script
- In Power BI Desktop, select Get Data → Python script
- Use this script:
# Power BI "Python script" source: the script must leave one or more
# DataFrames in scope (here `df`); Power BI imports each as a table.
import pandas as pd
import glob
# Update path to your downloaded files
path = "C:/data/haltian/measurementOccupancyStatus/2026/03/*.parquet"
df = pd.concat([pd.read_parquet(f) for f in glob.glob(path)])
Using Azure Synapse connector
If you’ve imported data into Azure Synapse, use the native Synapse connector for best performance.
Common Patterns
Incremental Loading
Track the latest processed file to avoid reprocessing:
import os
from datetime import datetime
def get_new_files(base_path, last_processed_time):
    """Return paths of Parquet files whose start timestamp is after *last_processed_time*.

    Walks *base_path* recursively and parses the export-start timestamp
    embedded in each filename
    (``YYYY_MM_DD_HHMM_YYYY_MM_DD_HHMM_entity.parquet``).

    Args:
        base_path: Root directory containing downloaded Parquet files.
        last_processed_time: ``datetime`` of the last processed export.

    Returns:
        Sorted list of file paths newer than ``last_processed_time``.
        ``.parquet`` files whose names do not follow the export naming
        convention are skipped instead of raising.
    """
    new_files = []
    for root, _dirs, files in os.walk(base_path):
        for file in files:
            if not file.endswith('.parquet'):
                continue
            parts = file.split('_')
            try:
                # parts[0..2] = start date (YYYY, MM, DD); parts[3] = HHMM.
                file_time = datetime(
                    int(parts[0]), int(parts[1]),
                    int(parts[2]), int(parts[3][:2]), int(parts[3][2:])
                )
            except (IndexError, ValueError):
                # Not an export-named file (e.g. a stray download) — skip it
                # rather than crash the whole incremental run.
                continue
            if file_time > last_processed_time:
                new_files.append(os.path.join(root, file))
    return sorted(new_files)
Joining Entity and Measurement Data
# Join device metadata with each device's most recent temperature reading;
# the CTE ranks readings per device (rn = 1 is the newest).
import duckdb
# Get device details with their latest temperature readings
result = duckdb.query("""
WITH latest_temps AS (
SELECT
deviceId,
measuredAt,
ambientTemperature,
ROW_NUMBER() OVER (PARTITION BY deviceId ORDER BY measuredAt DESC) as rn
FROM 'measurementAmbientTemperature/2026/03/*.parquet'
)
SELECT
d.id,
d.name,
d.modelType,
t.measuredAt as last_reading_time,
t.ambientTemperature as last_temperature
FROM 'devices/2026/03/*.parquet' d
LEFT JOIN latest_temps t ON d.id = t.deviceId AND t.rn = 1
WHERE d.installationStatus = 'installed'
""").df()