Integration Examples
Code samples for importing Parquet data into common analytics platforms
Python with Pandas
Read Parquet files directly into pandas DataFrames:
import pandas as pd
# Read a single file
df = pd.read_parquet('device/2026/01/2026_01_28_08_device.parquet')
# Read all files in a directory
import glob
files = glob.glob('measurementOccupancyStatus/2026/01/*.parquet')
df = pd.concat([pd.read_parquet(f) for f in files])
# Filter by time range
df_filtered = df[(df['ts'] >= '2026-01-01') & (df['ts'] < '2026-02-01')]
Python with PyArrow
For better performance with large datasets:
import pyarrow.parquet as pq
# Read with column selection
table = pq.read_table(
'measurementAmbientTemperature/2026/01/',
columns=['deviceId', 'ts', 'ambientTemperature']
)
# Convert to pandas
df = table.to_pandas()
# Read with row filtering (predicate pushdown)
table = pq.read_table(
'device/2026/01/',
filters=[('installationStatus', '=', 'installed')]
)
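For reads that span many files, the pyarrow.dataset API can treat a whole directory tree as one logical dataset and push both column projection and filters down to the files. A minimal sketch, assuming the same directory layout as above (the filter value is illustrative):
import pyarrow.dataset as ds
# Treat the whole entity directory as one dataset (path mirrors the examples above)
dataset = ds.dataset('measurementAmbientTemperature/2026/', format='parquet')
# Project columns and push the filter down to each Parquet file
table = dataset.to_table(
    columns=['deviceId', 'ts', 'ambientTemperature'],
    filter=ds.field('ambientTemperature') > 25.0
)
df = table.to_pandas()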
DuckDB
Query Parquet files directly with SQL:
import duckdb
# Query files with SQL
result = duckdb.query("""
SELECT
deviceId,
DATE_TRUNC('hour', ts) as hour,
AVG(ambientTemperature) as avg_temp
FROM 'measurementAmbientTemperature/2026/01/*.parquet'
GROUP BY deviceId, DATE_TRUNC('hour', ts)
ORDER BY hour
""").df()
# Join multiple entity types
result = duckdb.query("""
SELECT
d.name as device_name,
m.ts,
m.status
FROM 'measurementOccupancyStatus/2026/01/*.parquet' m
JOIN 'device/2026/01/*.parquet' d ON m.deviceId = d.id
WHERE m.status = 1
""").df()
AWS S3 Direct Access
Python with boto3
import boto3
import pandas as pd
from io import BytesIO
s3 = boto3.client('s3',
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY'
)
bucket = 'your-bucket'
key = 'downloads/parquet/{org-id}/device/2026/01/2026_01_28_08_device.parquet'
response = s3.get_object(Bucket=bucket, Key=key)
df = pd.read_parquet(BytesIO(response['Body'].read()))
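To pull a whole month instead of a single object, list the prefix first and concatenate the results. A sketch continuing from the snippet above; the prefix is illustrative:
# List every Parquet object under a month prefix and combine them
prefix = 'downloads/parquet/{org-id}/device/2026/01/'
paginator = s3.get_paginator('list_objects_v2')
frames = []
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        if obj['Key'].endswith('.parquet'):
            body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
            frames.append(pd.read_parquet(BytesIO(body)))
df = pd.concat(frames, ignore_index=True)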
PyArrow with S3 filesystem
import pyarrow.parquet as pq
from pyarrow import fs
s3 = fs.S3FileSystem(
access_key='YOUR_ACCESS_KEY',
secret_key='YOUR_SECRET_KEY',
region='eu-west-1'
)
# Read directly from S3
table = pq.read_table(
'your-bucket/downloads/parquet/{org-id}/device/2026/01/',
filesystem=s3
)
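The pyarrow.dataset API also accepts this S3 filesystem object, so filters can be applied while reading straight from the bucket. A sketch; the path and filter value are illustrative:
import pyarrow.dataset as ds
# Scan the device directory in S3 and filter while reading
dataset = ds.dataset(
    'your-bucket/downloads/parquet/{org-id}/device/2026/01/',
    format='parquet',
    filesystem=s3
)
table = dataset.to_table(filter=ds.field('installationStatus') == 'installed')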
Databricks
SQL
-- Create external table pointing to S3
CREATE TABLE IF NOT EXISTS haltian_devices
USING PARQUET
LOCATION 's3://your-bucket/downloads/parquet/{org-id}/device/';
-- Query the data
SELECT * FROM haltian_devices
WHERE installationStatus = 'installed';
Python (PySpark)
# Read from S3
df = spark.read.parquet(
"s3://your-bucket/downloads/parquet/{org-id}/measurementOccupancyStatus/"
)
# Filter and aggregate
from pyspark.sql.functions import date_trunc, avg
hourly_occupancy = df \
.groupBy("deviceId", date_trunc("hour", "ts").alias("hour")) \
.agg(avg("status").alias("occupancy_rate"))
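The aggregate can then be persisted as a table for dashboards or further SQL. A minimal sketch; the table name is illustrative:
# Save the hourly aggregate as a managed table (table name is an example)
hourly_occupancy.write \
    .mode("overwrite") \
    .saveAsTable("haltian_hourly_occupancy")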
Snowflake
Create external stage
CREATE OR REPLACE STAGE haltian_parquet
URL = 's3://your-bucket/downloads/parquet/{org-id}/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
FILE_FORMAT = (TYPE = PARQUET);
Query staged files
SELECT
$1:deviceId::STRING as device_id,
$1:ts::TIMESTAMP as measurement_time,
$1:ambientTemperature::FLOAT as temperature
-- The stage's Parquet file format is applied automatically
FROM @haltian_parquet/measurementAmbientTemperature/2026/01/;
Load into table
CREATE TABLE measurement_temperature (
device_id STRING,
ts TIMESTAMP_NTZ,
ambient_temperature FLOAT,
organization_id STRING
);
COPY INTO measurement_temperature
FROM @haltian_parquet/measurementAmbientTemperature/
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
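The same statements can be driven from Python with the Snowflake connector, for example in a scheduled job. A minimal sketch, assuming the stage and table above already exist; connection parameters are placeholders:
import snowflake.connector
# Connection parameters are placeholders
conn = snowflake.connector.connect(
    user='YOUR_USER',
    password='YOUR_PASSWORD',
    account='YOUR_ACCOUNT',
    warehouse='YOUR_WAREHOUSE',
    database='YOUR_DATABASE',
    schema='PUBLIC'
)
cur = conn.cursor()
# Run the COPY INTO statement shown above
cur.execute("""
    COPY INTO measurement_temperature
    FROM @haltian_parquet/measurementAmbientTemperature/
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
""")
conn.close()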
Google BigQuery
Load from Cloud Storage
LOAD DATA INTO mydataset.devices
FROM FILES (
format = 'PARQUET',
uris = ['gs://your-bucket/downloads/parquet/{org-id}/device/2026/01/*.parquet']
);
External table
CREATE EXTERNAL TABLE mydataset.measurements
OPTIONS (
format = 'PARQUET',
uris = ['gs://your-bucket/downloads/parquet/{org-id}/measurementOccupancyStatus/*']
);
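The batch load can also be scripted with the BigQuery Python client. A minimal sketch; the dataset, table, and bucket names are placeholders:
from google.cloud import bigquery
client = bigquery.Client()
# Load one month of device files into an existing dataset (names are examples)
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
load_job = client.load_table_from_uri(
    'gs://your-bucket/downloads/parquet/{org-id}/device/2026/01/*.parquet',
    'mydataset.devices',
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish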
Apache Spark
// Scala
import org.apache.spark.sql.functions.{avg, date_trunc}
val devices = spark.read.parquet(
"s3a://your-bucket/downloads/parquet/{org-id}/device/"
)
val measurements = spark.read.parquet(
"s3a://your-bucket/downloads/parquet/{org-id}/measurementOccupancyStatus/"
)
// Join and analyze
val result = measurements
.join(devices, measurements("deviceId") === devices("id"))
.groupBy(devices("name"), date_trunc("day", measurements("ts")))
.agg(avg("status").as("daily_occupancy_rate"))
ClickHouse
-- Query S3 directly
SELECT
deviceId,
toStartOfHour(ts) AS hour,
avg(ambientTemperature) AS avg_temp
FROM s3(
'https://your-bucket.s3.amazonaws.com/downloads/parquet/{org-id}/measurementAmbientTemperature/2026/01/*.parquet',
'AWS_ACCESS_KEY',
'AWS_SECRET_KEY',
'Parquet'
)
GROUP BY deviceId, hour
ORDER BY hour;
-- Create table from Parquet
CREATE TABLE devices
ENGINE = MergeTree()
ORDER BY id
AS SELECT * FROM s3(
'https://your-bucket.s3.amazonaws.com/downloads/parquet/{org-id}/device/2026/01/*.parquet',
'AWS_ACCESS_KEY',
'AWS_SECRET_KEY',
'Parquet'
);
Power BI
Using a Python script
- In Power BI Desktop, select Get Data → Python script
- Use this script:
import pandas as pd
import glob
# Update path to your downloaded files
path = "C:/data/haltian/measurementOccupancyStatus/2026/01/*.parquet"
df = pd.concat([pd.read_parquet(f) for f in glob.glob(path)])
Using the Azure Synapse connector
If you’ve imported data into Azure Synapse, use the native Synapse connector for best performance.
Common Patterns
Incremental Loading
Track the latest processed file to avoid reprocessing:
import os
from datetime import datetime
def get_new_files(base_path, last_processed_time):
"""Find files newer than last processed timestamp."""
new_files = []
for root, dirs, files in os.walk(base_path):
for file in files:
if file.endswith('.parquet'):
filepath = os.path.join(root, file)
# Parse timestamp from filename: YYYY_MM_DD_HH_entity.parquet
parts = file.split('_')
file_time = datetime(
int(parts[0]), int(parts[1]),
int(parts[2]), int(parts[3])
)
if file_time > last_processed_time:
new_files.append(filepath)
return sorted(new_files)
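A sketch of how this helper might be used in a scheduled job; the checkpoint handling is illustrative:
import pandas as pd
# Normally the checkpoint would be read from a small state store or file
last_run = datetime(2026, 1, 28, 8)
new_files = get_new_files('measurementOccupancyStatus/', last_run)
if new_files:
    df = pd.concat([pd.read_parquet(f) for f in new_files], ignore_index=True)
    # ...load df into the target system, then persist the newest file time as the checkpoint...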
Joining Entity and Measurement Data
import duckdb
# Get device details with their latest temperature readings
result = duckdb.query("""
WITH latest_temps AS (
SELECT
deviceId,
ts,
ambientTemperature,
ROW_NUMBER() OVER (PARTITION BY deviceId ORDER BY ts DESC) as rn
FROM 'measurementAmbientTemperature/2026/01/*.parquet'
)
SELECT
d.id,
d.name,
d.model,
t.ts as last_reading_time,
t.ambientTemperature as last_temperature
FROM 'device/2026/01/*.parquet' d
LEFT JOIN latest_temps t ON d.id = t.deviceId AND t.rn = 1
WHERE d.installationStatus = 'installed'
""").df()