Integration Examples

Code samples for importing Parquet data into common analytics platforms

Python with Pandas

Read Parquet files directly into pandas DataFrames:

import pandas as pd

# Read a single file
df = pd.read_parquet('device/2026/01/2026_01_28_08_device.parquet')

# Read all files in a directory
import glob
files = glob.glob('measurementOccupancyStatus/2026/01/*.parquet')
df = pd.concat([pd.read_parquet(f) for f in files])

# Filter by time range
df_filtered = df[(df['ts'] >= '2026-01-01') & (df['ts'] < '2026-02-01')]
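
With the pyarrow engine installed, pandas can also read an entire directory in one call and push column selection and simple row filters down to the reader. A minimal sketch, assuming the same directory layout and column names used above:

import pandas as pd

# Directory reads and filters are delegated to pyarrow (requires the pyarrow engine)
df = pd.read_parquet(
    'measurementOccupancyStatus/2026/01/',
    engine='pyarrow',
    columns=['deviceId', 'ts', 'status'],
    filters=[('status', '=', 1)]
)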

Python with PyArrow

For better performance with large datasets:

import pyarrow.parquet as pq

# Read with column selection
table = pq.read_table(
    'measurementAmbientTemperature/2026/01/',
    columns=['deviceId', 'ts', 'ambientTemperature']
)

# Convert to pandas
df = table.to_pandas()

# Read with row filtering (predicate pushdown)
table = pq.read_table(
    'device/2026/01/',
    filters=[('installationStatus', '=', 'installed')]
)

DuckDB

Query Parquet files directly with SQL:

import duckdb

# Query files with SQL
result = duckdb.query("""
    SELECT 
        deviceId,
        DATE_TRUNC('hour', ts) as hour,
        AVG(ambientTemperature) as avg_temp
    FROM 'measurementAmbientTemperature/2026/01/*.parquet'
    GROUP BY deviceId, DATE_TRUNC('hour', ts)
    ORDER BY hour
""").df()

# Join multiple entity types
result = duckdb.query("""
    SELECT 
        d.name as device_name,
        m.ts,
        m.status
    FROM 'measurementOccupancyStatus/2026/01/*.parquet' m
    JOIN 'device/2026/01/*.parquet' d ON m.deviceId = d.id
    WHERE m.status = 1
""").df()

AWS S3 Direct Access

Python with boto3

import boto3
import pandas as pd
from io import BytesIO

s3 = boto3.client('s3',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY'
)

bucket = 'your-bucket'
key = 'downloads/parquet/{org-id}/device/2026/01/2026_01_28_08_device.parquet'

response = s3.get_object(Bucket=bucket, Key=key)
df = pd.read_parquet(BytesIO(response['Body'].read()))
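
To pull a whole month rather than a single object, list the keys under the prefix first. A minimal sketch continuing the snippet above and using the standard boto3 paginator (bucket name and prefix are placeholders, as before):

# List every Parquet file under a month prefix and concatenate them
prefix = 'downloads/parquet/{org-id}/measurementAmbientTemperature/2026/01/'
frames = []
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        if obj['Key'].endswith('.parquet'):
            body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
            frames.append(pd.read_parquet(BytesIO(body)))

df_month = pd.concat(frames, ignore_index=True)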

PyArrow with S3 filesystem

import pyarrow.parquet as pq
from pyarrow import fs

s3 = fs.S3FileSystem(
    access_key='YOUR_ACCESS_KEY',
    secret_key='YOUR_SECRET_KEY',
    region='eu-west-1'
)

# Read directly from S3
table = pq.read_table(
    'your-bucket/downloads/parquet/{org-id}/device/2026/01/',
    filesystem=s3
)
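
Column selection and predicate pushdown work the same way when reading from S3, so only the needed columns and row groups are fetched. A minimal sketch, assuming the same bucket layout and credentials as above:

# Combine column selection and row filtering with the S3 filesystem
table = pq.read_table(
    'your-bucket/downloads/parquet/{org-id}/measurementAmbientTemperature/2026/01/',
    filesystem=s3,
    columns=['deviceId', 'ts', 'ambientTemperature'],
    filters=[('ambientTemperature', '>', 25.0)]
)
df = table.to_pandas()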

Databricks

SQL

-- Create external table pointing to S3
CREATE TABLE IF NOT EXISTS haltian_devices
USING PARQUET
LOCATION 's3://your-bucket/downloads/parquet/{org-id}/device/';

-- Query the data
SELECT * FROM haltian_devices
WHERE installationStatus = 'installed';

Python (PySpark)

# Read from S3
df = spark.read.parquet(
    "s3://your-bucket/downloads/parquet/{org-id}/measurementOccupancyStatus/"
)

# Filter and aggregate
from pyspark.sql.functions import date_trunc, avg

hourly_occupancy = df \
    .groupBy("deviceId", date_trunc("hour", "ts").alias("hour")) \
    .agg(avg("status").alias("occupancy_rate"))

Snowflake

Create external stage

CREATE OR REPLACE STAGE haltian_parquet
  URL = 's3://your-bucket/downloads/parquet/{org-id}/'
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
  FILE_FORMAT = (TYPE = PARQUET);

Query staged files

SELECT 
  $1:deviceId::STRING as device_id,
  $1:ts::TIMESTAMP as measurement_time,
  $1:ambientTemperature::FLOAT as temperature
FROM @haltian_parquet/measurementAmbientTemperature/2026/01/;

Load into table

-- Column names must match the Parquet field names for MATCH_BY_COLUMN_NAME to map them
CREATE TABLE measurement_temperature (
  deviceId STRING,
  ts TIMESTAMP_NTZ,
  ambientTemperature FLOAT,
  organizationId STRING
);

COPY INTO measurement_temperature
FROM @haltian_parquet/measurementAmbientTemperature/
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

Google BigQuery

Load from Cloud Storage

LOAD DATA INTO mydataset.devices
FROM FILES (
  format = 'PARQUET',
  uris = ['gs://your-bucket/downloads/parquet/{org-id}/device/2026/01/*.parquet']
);

External table

CREATE EXTERNAL TABLE mydataset.measurements
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://your-bucket/downloads/parquet/{org-id}/measurementOccupancyStatus/*']
);

Apache Spark

// Scala
import org.apache.spark.sql.functions.{avg, date_trunc}

val devices = spark.read.parquet(
  "s3a://your-bucket/downloads/parquet/{org-id}/device/"
)

val measurements = spark.read.parquet(
  "s3a://your-bucket/downloads/parquet/{org-id}/measurementOccupancyStatus/"
)

// Join and analyze
val result = measurements
  .join(devices, measurements("deviceId") === devices("id"))
  .groupBy(devices("name"), date_trunc("day", measurements("ts")))
  .agg(avg("status").as("daily_occupancy_rate"))

ClickHouse

-- Query S3 directly
SELECT 
    deviceId,
    toStartOfHour(ts) AS hour,
    avg(ambientTemperature) AS avg_temp
FROM s3(
    'https://your-bucket.s3.amazonaws.com/downloads/parquet/{org-id}/measurementAmbientTemperature/2026/01/*.parquet',
    'AWS_ACCESS_KEY',
    'AWS_SECRET_KEY',
    'Parquet'
)
GROUP BY deviceId, hour
ORDER BY hour;

-- Create table from Parquet
CREATE TABLE devices
ENGINE = MergeTree()
ORDER BY id
AS SELECT * FROM s3(
    'https://your-bucket.s3.amazonaws.com/downloads/parquet/{org-id}/device/2026/01/*.parquet',
    'AWS_ACCESS_KEY',
    'AWS_SECRET_KEY',
    'Parquet'
);

Power BI

Using Python script

  1. In Power BI Desktop, select Get Data → Python script
  2. Use this script:
import pandas as pd
import glob

# Update path to your downloaded files
path = "C:/data/haltian/measurementOccupancyStatus/2026/01/*.parquet"
df = pd.concat([pd.read_parquet(f) for f in glob.glob(path)])

Using Azure Synapse connector

If you’ve imported data into Azure Synapse, use the native Synapse connector for best performance.

Common Patterns

Incremental Loading

Track the latest processed file to avoid reprocessing:

import os
from datetime import datetime

def get_new_files(base_path, last_processed_time):
    """Find files newer than last processed timestamp."""
    new_files = []
    for root, dirs, files in os.walk(base_path):
        for file in files:
            if file.endswith('.parquet'):
                filepath = os.path.join(root, file)
                # Parse timestamp from filename: YYYY_MM_DD_HH_entity.parquet
                parts = file.split('_')
                file_time = datetime(
                    int(parts[0]), int(parts[1]), 
                    int(parts[2]), int(parts[3])
                )
                if file_time > last_processed_time:
                    new_files.append(filepath)
    return sorted(new_files)
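
The watermark itself can be stored anywhere durable; a small JSON state file is often enough. A minimal sketch built on the helper above (the state file name and target paths are illustrative, not part of the export format):

import json

STATE_FILE = 'parquet_sync_state.json'  # illustrative location for the watermark

def load_watermark():
    """Return the last processed timestamp, or a very old default."""
    try:
        with open(STATE_FILE) as f:
            return datetime.fromisoformat(json.load(f)['last_processed'])
    except FileNotFoundError:
        return datetime(1970, 1, 1)

def save_watermark(ts):
    """Persist the timestamp of the newest file handled so far."""
    with open(STATE_FILE, 'w') as f:
        json.dump({'last_processed': ts.isoformat()}, f)

# Example run: process only files newer than the stored watermark
last = load_watermark()
new_files = get_new_files('measurementOccupancyStatus', last)
if new_files:
    # ... load new_files into your target system here ...
    save_watermark(max(
        datetime(*map(int, os.path.basename(f).split('_')[:4]))
        for f in new_files
    ))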

Joining Entity and Measurement Data

import duckdb

# Get device details with their latest temperature readings
result = duckdb.query("""
    WITH latest_temps AS (
        SELECT 
            deviceId,
            ts,
            ambientTemperature,
            ROW_NUMBER() OVER (PARTITION BY deviceId ORDER BY ts DESC) as rn
        FROM 'measurementAmbientTemperature/2026/01/*.parquet'
    )
    SELECT 
        d.id,
        d.name,
        d.model,
        t.ts as last_reading_time,
        t.ambientTemperature as last_temperature
    FROM 'device/2026/01/*.parquet' d
    LEFT JOIN latest_temps t ON d.id = t.deviceId AND t.rn = 1
    WHERE d.installationStatus = 'installed'
""").df()