Open Data Schema for Energy

Data Engineer Integration Guide

This guide shows how ODS-E fits into a typical energy data pipeline — from SCADA export to analytics-ready Parquet. It’s written for data engineers who need to ingest heterogeneous solar production data from multiple OEMs and normalize it for downstream analytics.

Where ODS-E Fits

┌──────────────┐     ┌─────────────┐     ┌───────────┐     ┌──────────┐     ┌──────────────┐
│  SCADA /     │     │  Extract    │     │  ODS-E    │     │  Store   │     │  Analytics   │
│  OEM Portal  │────▶│  (CSV/JSON) │────▶│  Pipeline │────▶│  (Lake)  │────▶│  (BI / ML)   │
│              │     │             │     │           │     │          │     │              │
│ FusionSolar  │     │ Scheduled   │     │ Transform │     │ Parquet  │     │ PowerBI      │
│ Enlighten    │     │ API pulls   │     │ Validate  │     │ S3/GCS   │     │ Jupyter      │
│ SolarEdge    │     │ FTP drops   │     │ Enrich    │     │ BigQuery │     │ dbt          │
└──────────────┘     └─────────────┘     └───────────┘     └──────────┘     └──────────────┘

ODS-E handles the middle step: normalizing raw OEM data into a single schema, validating it, and writing it to storage. Everything before (extraction) and after (analytics) stays in your existing stack.

Step 1: Read and Transform

Each OEM uses different column names, units, and timestamp formats. ODS-E normalizes all of them.

from pathlib import Path
from odse import transform

# Process a directory of OEM exports
data_dir = Path("/data/incoming/2026-02-09/")

# Each file might come from a different OEM
huawei_rows = transform(data_dir / "site_alpha.csv", source="huawei", asset_id="SITE-ALPHA")
enphase_rows = transform(data_dir / "site_beta.json", source="enphase", asset_id="SITE-BETA", expected_devices=12)

# All rows now have the same schema: timestamp, kWh, error_type, ...
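To make "the same schema" concrete, here is a minimal sketch of the kind of normalization a transform performs. It does not use the odse package: the raw column names (`Collect Time`, `Yield(Wh)`, `end_at`, `enwh`) and the helpers `normalize_a` and `normalize_b` are hypothetical and chosen only for illustration, and the first timestamp is assumed to be in UTC.

```python
from datetime import datetime, timezone

# Hypothetical raw rows; real OEM exports use their own column names and units.
raw_a = {"Collect Time": "2026-02-09 10:00:00", "Yield(Wh)": "1500"}
raw_b = {"end_at": 1770631200, "enwh": 1200}


def normalize_a(row, asset_id):
    # Text timestamp without an offset, assumed to be UTC for this sketch.
    ts = datetime.strptime(row["Collect Time"], "%Y-%m-%d %H:%M:%S")
    ts = ts.replace(tzinfo=timezone.utc)
    # Convert Wh (as text) to kWh (as float).
    return {
        "timestamp": ts.isoformat(),
        "kWh": float(row["Yield(Wh)"]) / 1000.0,
        "asset_id": asset_id,
    }


def normalize_b(row, asset_id):
    # Epoch seconds to an ISO 8601 string in UTC.
    ts = datetime.fromtimestamp(row["end_at"], tz=timezone.utc)
    return {
        "timestamp": ts.isoformat(),
        "kWh": row["enwh"] / 1000.0,
        "asset_id": asset_id,
    }


rows = [normalize_a(raw_a, "SITE-ALPHA"), normalize_b(raw_b, "SITE-BETA")]
```

Both records end up with identical keys, units, and timestamp format, which is what lets the later steps treat rows from different OEMs uniformly.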

For OEMs not in the built-in list, use the generic CSV mapper:

rows = transform(data_dir / "scada_export.csv", source="csv", mapping={
    "timestamp": "Timestamp",
    "kWh": "ActiveEnergy_kWh",
    "asset_id": "SiteTag",
    "extra": {"kVAr": "ReactivePower_kVAr"},
})

See the Python SDK Reference for all transform() parameters.
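For intuition, a mapping like the one above can be read as "target field → source column". The sketch below applies such a mapping with the standard `csv` module only. The helper `apply_mapping` and the sample CSV contents are hypothetical, and the real mapper may handle types and missing columns differently.

```python
import csv
import io

MAPPING = {
    "timestamp": "Timestamp",
    "kWh": "ActiveEnergy_kWh",
    "asset_id": "SiteTag",
    "extra": {"kVAr": "ReactivePower_kVAr"},
}


def apply_mapping(raw_row, mapping):
    """Rename source columns to target field names (hypothetical helper)."""
    out = {}
    for target, source_col in mapping.items():
        if target == "extra":
            # Pass-through numeric fields outside the core schema.
            for extra_name, extra_col in source_col.items():
                out[extra_name] = float(raw_row[extra_col])
        else:
            out[target] = raw_row[source_col]
    # CSV values arrive as text; energy is converted to a number.
    out["kWh"] = float(out["kWh"])
    return out


sample = io.StringIO(
    "Timestamp,ActiveEnergy_kWh,SiteTag,ReactivePower_kVAr\n"
    "2026-02-09T10:00:00Z,1.5,SITE-DELTA,0.2\n"
)
mapped = [apply_mapping(row, MAPPING) for row in csv.DictReader(sample)]
```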

Step 2: Validate

Run schema validation to catch malformed records before they enter your data lake.

from odse import validate_batch

result = validate_batch(huawei_rows)
if result.invalid > 0:
    for idx, error in result.errors:
        print(f"  Row {idx}: {error.code}: {error.message}")
    raise RuntimeError(f"{result.invalid} invalid records")

For physical plausibility checks, add semantic validation:

result = validate_batch(huawei_rows, level="semantic", capacity_kw=10.0)
for idx, warning in result.errors:
    print(f"  Row {idx}: {warning.message}")
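As a rough illustration of a physical plausibility check: an asset rated at `capacity_kw` cannot deliver more than `capacity_kw` multiplied by the interval length in hours within one interval. The sketch below assumes fixed 15-minute intervals and uses a hypothetical helper, `exceeds_capacity`. The rules actually applied by `level="semantic"` are not described in this guide and may differ.

```python
def exceeds_capacity(record, capacity_kw, interval_minutes=15):
    """Return True if the interval energy is higher than the rating allows."""
    max_kwh = capacity_kw * interval_minutes / 60.0
    return record["kWh"] > max_kwh


records = [
    {"timestamp": "2026-02-09T10:00:00Z", "kWh": 2.1},
    {"timestamp": "2026-02-09T10:15:00Z", "kWh": 3.4},  # above 10 kW * 0.25 h = 2.5 kWh
]

# Indices of records that fail the check for a 10 kW asset.
flagged = [i for i, r in enumerate(records) if exceeds_capacity(r, capacity_kw=10.0)]
```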

Step 3: Enrich (Optional)

If records need market context (settlement metadata, tariff info), inject it before writing:

from odse import enrich

enriched = enrich(huawei_rows, {
    "seller_party_id": "nersa:gen:SOLARPK-001",
    "settlement_type": "bilateral",
    "tariff_currency": "ZAR",
})
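Conceptually, enrichment attaches the same context fields to every record. The plain-Python sketch below shows that idea with a hypothetical helper, `add_context`. How `enrich()` treats fields that already exist on a record is not stated here, so letting existing record values win is an assumption.

```python
def add_context(rows, context):
    """Return new records with context fields added; input rows are not modified."""
    # On a key collision the record's own value takes precedence (assumption).
    return [{**context, **row} for row in rows]


rows = [{"timestamp": "2026-02-09T10:00:00Z", "kWh": 1.5, "asset_id": "SITE-ALPHA"}]
context = {"seller_party_id": "nersa:gen:SOLARPK-001", "settlement_type": "bilateral"}
enriched = add_context(rows, context)
```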

Step 4: Write to Storage

from odse import to_parquet

all_rows = huawei_rows + enphase_rows
to_parquet(all_rows, "/data/lake/energy/", partition_by=["asset_id", "year", "month", "day"])

This creates a Hive-partitioned directory structure:

/data/lake/energy/
├── asset_id=SITE-ALPHA/year=2026/month=02/day=09/part-00000.parquet
└── asset_id=SITE-BETA/year=2026/month=02/day=09/part-00000.parquet

The resulting layout is compatible with Spark, BigQuery external tables, AWS Athena, DuckDB, and pandas.
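The directory names in a Hive-style layout are `key=value` pairs derived from each record. The sketch below shows one way those paths could be derived from a record's timestamp and asset ID. The helper `partition_path` is hypothetical and only illustrates the naming convention, not how `to_parquet` is implemented.

```python
from datetime import datetime
from pathlib import PurePosixPath


def partition_path(record, root, partition_by):
    """Build the Hive-style partition directory for one record."""
    ts = datetime.fromisoformat(record["timestamp"])
    values = {
        "asset_id": record["asset_id"],
        "year": f"{ts.year:04d}",
        "month": f"{ts.month:02d}",  # zero-padded, as in the tree above
        "day": f"{ts.day:02d}",
    }
    path = PurePosixPath(root)
    for key in partition_by:
        path = path / f"{key}={values[key]}"
    return path


record = {"timestamp": "2026-02-09T10:00:00+00:00", "asset_id": "SITE-ALPHA", "kWh": 1.5}
path = partition_path(record, "/data/lake/energy", ["asset_id", "year", "month", "day"])
```

Because the partition values are encoded in the path, query engines can skip whole directories when a filter names an asset or a date.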

JSONL (For Traceability)

from odse import to_json

to_json(all_rows, "/data/archive/2026-02-09.jsonl")

CSV (For Legacy Systems)

from odse import to_csv

to_csv(all_rows, "/data/export/2026-02-09.csv")

Complete Batch Pipeline

Here’s a full pipeline that processes a directory of mixed-OEM exports:

from pathlib import Path
from odse import transform, validate_batch, enrich, to_parquet

# Configuration
SITES = {
    "SITE-ALPHA": {"source": "huawei", "seller": "nersa:gen:ALPHA"},
    "SITE-BETA":  {"source": "enphase", "seller": "nersa:gen:BETA", "expected_devices": 12},
    "SITE-GAMMA": {"source": "solaredge", "seller": "nersa:gen:GAMMA"},
}

data_dir = Path("/data/incoming/2026-02-09/")
all_records = []

for site_id, cfg in SITES.items():
    # Find the file for this site
    files = list(data_dir.glob(f"{site_id}.*"))
    if not files:
        print(f"SKIP {site_id}: no file found")
        continue

    # Transform
    kwargs = {k: v for k, v in cfg.items() if k not in ("source", "seller")}
    rows = transform(files[0], source=cfg["source"], asset_id=site_id, **kwargs)

    # Validate
    result = validate_batch(rows, level="semantic", capacity_kw=10.0)
    print(f"{site_id}: {result.summary}")
    if result.invalid > 0:
        print(f"  WARNING: {result.invalid} invalid records skipped")
        # Build the set of failing row indices once, then drop those rows
        bad = {idx for idx, _ in result.errors}
        rows = [r for i, r in enumerate(rows) if i not in bad]

    # Enrich
    rows = enrich(rows, {
        "seller_party_id": cfg["seller"],
        "settlement_type": "bilateral",
    })

    all_records.extend(rows)

# Write to lake
to_parquet(all_records, "/data/lake/energy/", partition_by=["asset_id", "year", "month", "day"])
print(f"Wrote {len(all_records)} records to Parquet")

Streaming Large Files

For files too large to fit in memory, use transform_stream():

from odse import transform_stream, validate
import json

with open("/data/output/large_site.jsonl", "w") as f:
    for record in transform_stream("/data/incoming/large_export.csv", source="huawei"):
        result = validate(record)
        if result.is_valid:
            f.write(json.dumps(record) + "\n")
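The pattern above keeps memory use flat because only one record is held at a time. The self-contained sketch below shows the same generator-based pattern using only the standard library. `stream_rows` and `is_valid` are hypothetical stand-ins for `transform_stream()` and `validate()`, and in-memory buffers replace the real files.

```python
import csv
import io
import json


def stream_rows(fileobj):
    """Yield one record per CSV row without loading the whole file."""
    for row in csv.DictReader(fileobj):
        yield {"timestamp": row["timestamp"], "kWh": float(row["kWh"])}


def is_valid(record):
    # Stand-in for a per-record check: reject negative energy.
    return record["kWh"] >= 0


source = io.StringIO(
    "timestamp,kWh\n"
    "2026-02-09T10:00:00Z,1.5\n"
    "2026-02-09T10:15:00Z,-1\n"
)
sink = io.StringIO()

# Each record is validated and written before the next one is read.
for record in stream_rows(source):
    if is_valid(record):
        sink.write(json.dumps(record) + "\n")

lines = sink.getvalue().splitlines()
```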

CLI for Cron Jobs

The CLI works well in shell scripts and cron jobs:

#!/bin/bash
set -e

DATE=$(date +%Y-%m-%d)
INPUT_DIR="/data/incoming/$DATE"
OUTPUT_DIR="/data/lake/energy"

# Transform each source
odse transform --source huawei \
  --input "$INPUT_DIR/site_alpha.csv" \
  --asset-id SITE-ALPHA \
  --format parquet \
  -o "$OUTPUT_DIR/"

odse transform --source enphase \
  --input "$INPUT_DIR/site_beta.json" \
  --asset-id SITE-BETA \
  --format parquet \
  -o "$OUTPUT_DIR/"

# Validate (assumes the transformed records are also available as JSON at these paths)
odse validate --input "$OUTPUT_DIR/site_alpha.json" --level semantic --capacity-kw 10
odse validate --input "$OUTPUT_DIR/site_beta.json" --level semantic --capacity-kw 10

See the CLI Walkthrough for all commands and flags.

Common Patterns

Incremental Processing

Track the last-processed timestamp to avoid reprocessing:

import json
from pathlib import Path

from odse import transform, to_parquet

state_file = Path("/data/state/last_run.json")

# Load state
if state_file.exists():
    state = json.loads(state_file.read_text())
    last_ts = state.get("last_timestamp")
else:
    last_ts = None

# Transform new data
rows = transform("new_export.csv", source="huawei", asset_id="SITE-001")

# Filter to only new records
if last_ts:
    rows = [r for r in rows if r["timestamp"] > last_ts]

if rows:
    to_parquet(rows, "/data/lake/energy/", partition_by=["asset_id", "year", "month"])
    # Save state; take the newest timestamp rather than assuming rows are sorted
    state_file.parent.mkdir(parents=True, exist_ok=True)
    newest = max(r["timestamp"] for r in rows)
    state_file.write_text(json.dumps({"last_timestamp": newest}))
    print(f"Wrote {len(rows)} new records")
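One caveat with the filter above: it compares timestamps as strings, which only orders correctly when every timestamp uses the same ISO 8601 format and UTC offset. If offsets can vary, parsing to timezone-aware datetimes before comparing is safer. The sketch below illustrates the difference; `parse_ts` and the sample rows are hypothetical.

```python
from datetime import datetime


def parse_ts(value):
    # datetime.fromisoformat() before Python 3.11 rejects a trailing "Z".
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


last_ts = "2026-02-09T10:00:00+00:00"
rows = [
    {"timestamp": "2026-02-09T11:30:00+02:00", "kWh": 1.0},  # 09:30 UTC, already processed
    {"timestamp": "2026-02-09T10:15:00+00:00", "kWh": 1.2},  # genuinely new
]

# String comparison keeps both rows, including the one that is actually older.
by_string = [r for r in rows if r["timestamp"] > last_ts]

# Datetime comparison keeps only the row that is later in absolute time.
by_datetime = [r for r in rows if parse_ts(r["timestamp"]) > parse_ts(last_ts)]
```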

Multi-Format Ingest

When sites deliver data in different formats:

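from pathlib import Path

from odse import transform

SOURCE_MAP = {
    ".csv": "huawei",     # Huawei exports as CSV
    ".json": "enphase",   # Enphase API returns JSON
}

data_dir = Path("/data/incoming/2026-02-09/")
all_rows = []

for file in data_dir.iterdir():
    # Pick the source from the file extension; unknown extensions are skipped
    source = SOURCE_MAP.get(file.suffix)
    if source:
        all_rows.extend(transform(file, source=source, asset_id=file.stem))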