Files
rpotter6298 c8e3016fd6 Add new daemons and debug scripts for Sigenergy and Oracle functionalities
- Implement `sigen_daemon.py` to poll Sigenergy plant metrics and store snapshots.
- Create `web_daemon.py` for serving a web interface with various endpoints.
- Add debug scripts:
  - `debug_duplicates.py` to find duplicate target times in forecast data.
  - `debug_energy_forecast.py` to print baseline energy forecast curves.
  - `debug_oracle_evaluations.py` to run the oracle evaluator.
  - `debug_sigen.py` to inspect stored Sigenergy plant snapshots.
  - `debug_weather.py` to trace resolved truth data.
  - `modbus_test.py` for exploring Sigenergy plants or inverters over Modbus TCP.
- Introduce `oracle_evaluator.py` for evaluating stored oracle predictions against actuals.
- Add TCN training scripts in `tcn` directory for training usage sequence models.
2026-04-28 08:14:00 +02:00

509 lines
20 KiB
Python

from __future__ import annotations
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from os import environ
from typing import Iterator
from gibil.classes.models import SigenPlantSnapshot
class SigenStoreConfigurationError(RuntimeError):
    """Raised when the Sigen store is misconfigured or a required dependency is missing."""
@dataclass(frozen=True)
class SigenStoreConfig:
    """Connection settings for the Sigen snapshot store."""

    # TimescaleDB/PostgreSQL connection URL.
    database_url: str

    @classmethod
    def from_env(cls) -> "SigenStoreConfig":
        """Build a config from the ``ASTRAPE_DATABASE_URL`` environment variable.

        Raises:
            SigenStoreConfigurationError: if the variable is unset or empty.
        """
        url = environ.get("ASTRAPE_DATABASE_URL")
        if url:
            return cls(database_url=url)
        raise SigenStoreConfigurationError(
            "ASTRAPE_DATABASE_URL is required for Sigen storage"
        )
class SigenStore:
    """Persists Sigenergy plant snapshots in TimescaleDB."""

    def __init__(self, config: SigenStoreConfig) -> None:
        # Store the settings only; no connection is opened until a query runs.
        self.config = config

    @classmethod
    def from_env(cls) -> "SigenStore":
        """Alternate constructor: build the store from environment variables."""
        return cls(SigenStoreConfig.from_env())
def initialize(self) -> None:
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute("CREATE EXTENSION IF NOT EXISTS timescaledb")
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS sigen_plant_snapshots (
observed_at TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL,
source TEXT NOT NULL,
plant_epoch_seconds BIGINT,
plant_ems_work_mode INTEGER,
plant_running_state INTEGER,
grid_sensor_status INTEGER,
solar_power_w DOUBLE PRECISION,
battery_soc_pct DOUBLE PRECISION,
battery_soh_pct DOUBLE PRECISION,
battery_power_w DOUBLE PRECISION,
grid_power_w DOUBLE PRECISION,
grid_import_w DOUBLE PRECISION,
grid_export_w DOUBLE PRECISION,
load_power_w DOUBLE PRECISION,
plant_active_power_w DOUBLE PRECISION,
accumulated_pv_energy_kwh DOUBLE PRECISION,
daily_consumed_energy_kwh DOUBLE PRECISION,
accumulated_consumed_energy_kwh DOUBLE PRECISION,
raw_values JSONB NOT NULL DEFAULT '{}'::jsonb,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (observed_at, source)
)
"""
)
cursor.execute(
"""
SELECT create_hypertable(
'sigen_plant_snapshots',
'observed_at',
if_not_exists => TRUE
)
"""
)
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS sigen_plant_snapshots_received_at_idx
ON sigen_plant_snapshots (received_at DESC)
"""
)
self._create_rollup_view(
cursor,
view_name="sigen_plant_snapshots_1m",
bucket="1 minute",
)
self._create_rollup_view(
cursor,
view_name="sigen_plant_snapshots_15m",
bucket="15 minutes",
)
self._create_rollup_view(
cursor,
view_name="sigen_plant_snapshots_1h",
bucket="1 hour",
)
connection.commit()
def save_snapshot(self, snapshot: SigenPlantSnapshot) -> int:
with self._connection() as connection:
with connection.cursor() as cursor:
try:
from psycopg.types.json import Jsonb
except ImportError as error:
raise SigenStoreConfigurationError(
"Install dependencies with `python3 -m pip install -r requirements.txt`"
) from error
cursor.execute(
"""
INSERT INTO sigen_plant_snapshots (
observed_at,
received_at,
source,
plant_epoch_seconds,
plant_ems_work_mode,
plant_running_state,
grid_sensor_status,
solar_power_w,
battery_soc_pct,
battery_soh_pct,
battery_power_w,
grid_power_w,
grid_import_w,
grid_export_w,
load_power_w,
plant_active_power_w,
accumulated_pv_energy_kwh,
daily_consumed_energy_kwh,
accumulated_consumed_energy_kwh,
raw_values
)
VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
ON CONFLICT (observed_at, source)
DO UPDATE SET
received_at = EXCLUDED.received_at,
plant_epoch_seconds = EXCLUDED.plant_epoch_seconds,
plant_ems_work_mode = EXCLUDED.plant_ems_work_mode,
plant_running_state = EXCLUDED.plant_running_state,
grid_sensor_status = EXCLUDED.grid_sensor_status,
solar_power_w = EXCLUDED.solar_power_w,
battery_soc_pct = EXCLUDED.battery_soc_pct,
battery_soh_pct = EXCLUDED.battery_soh_pct,
battery_power_w = EXCLUDED.battery_power_w,
grid_power_w = EXCLUDED.grid_power_w,
grid_import_w = EXCLUDED.grid_import_w,
grid_export_w = EXCLUDED.grid_export_w,
load_power_w = EXCLUDED.load_power_w,
plant_active_power_w = EXCLUDED.plant_active_power_w,
accumulated_pv_energy_kwh = EXCLUDED.accumulated_pv_energy_kwh,
daily_consumed_energy_kwh = EXCLUDED.daily_consumed_energy_kwh,
accumulated_consumed_energy_kwh = EXCLUDED.accumulated_consumed_energy_kwh,
raw_values = EXCLUDED.raw_values,
inserted_at = now()
""",
(
snapshot.observed_at,
snapshot.received_at,
snapshot.source,
snapshot.plant_epoch_seconds,
snapshot.plant_ems_work_mode,
snapshot.plant_running_state,
snapshot.grid_sensor_status,
snapshot.solar_power_w,
snapshot.battery_soc_pct,
snapshot.battery_soh_pct,
snapshot.battery_power_w,
snapshot.grid_power_w,
snapshot.grid_import_w,
snapshot.grid_export_w,
snapshot.load_power_w,
snapshot.plant_active_power_w,
snapshot.accumulated_pv_energy_kwh,
snapshot.daily_consumed_energy_kwh,
snapshot.accumulated_consumed_energy_kwh,
Jsonb(snapshot.raw_values),
),
)
connection.commit()
return 1
def load_latest_snapshot(self) -> SigenPlantSnapshot | None:
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT
observed_at,
received_at,
source,
plant_epoch_seconds,
plant_ems_work_mode,
plant_running_state,
grid_sensor_status,
solar_power_w,
battery_soc_pct,
battery_soh_pct,
battery_power_w,
grid_power_w,
grid_import_w,
grid_export_w,
load_power_w,
plant_active_power_w,
accumulated_pv_energy_kwh,
daily_consumed_energy_kwh,
accumulated_consumed_energy_kwh,
raw_values
FROM sigen_plant_snapshots
ORDER BY observed_at DESC
LIMIT 1
"""
)
row = cursor.fetchone()
if row is None:
return None
return SigenPlantSnapshot(
observed_at=row[0],
received_at=row[1],
source=row[2],
plant_epoch_seconds=row[3],
plant_ems_work_mode=row[4],
plant_running_state=row[5],
grid_sensor_status=row[6],
solar_power_w=row[7],
battery_soc_pct=row[8],
battery_soh_pct=row[9],
battery_power_w=row[10],
grid_power_w=row[11],
grid_import_w=row[12],
grid_export_w=row[13],
load_power_w=row[14],
plant_active_power_w=row[15],
accumulated_pv_energy_kwh=row[16],
daily_consumed_energy_kwh=row[17],
accumulated_consumed_energy_kwh=row[18],
raw_values=row[19] or {},
)
def load_recent_power_summary(
self,
lookback: timedelta = timedelta(minutes=30),
) -> dict[str, float | None]:
start_at = datetime.now(timezone.utc) - lookback
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT
avg(load_power_w),
percentile_cont(0.10) WITHIN GROUP (ORDER BY load_power_w),
percentile_cont(0.50) WITHIN GROUP (ORDER BY load_power_w),
percentile_cont(0.90) WITHIN GROUP (ORDER BY load_power_w),
max(load_power_w),
max(solar_power_w)
FROM sigen_plant_snapshots
WHERE observed_at >= %s
""",
(start_at,),
)
row = cursor.fetchone()
return {
"load_avg_w": row[0],
"load_p10_w": row[1],
"load_p50_w": row[2],
"load_p90_w": row[3],
"load_max_w": row[4],
"solar_max_w": row[5],
}
def load_load_profile(
self,
lookback: timedelta = timedelta(days=30),
bucket_minutes: int = 15,
min_samples: int = 5,
timezone_name: str = "UTC",
) -> dict[tuple[int, int], dict[str, float | int]]:
if bucket_minutes <= 0:
raise ValueError("bucket_minutes must be greater than zero")
start_at = datetime.now(timezone.utc) - lookback
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
"""
WITH localized AS (
SELECT
observed_at AT TIME ZONE %s AS local_observed_at,
load_power_w
FROM sigen_plant_snapshots
WHERE observed_at >= %s
AND observed_at <= now()
AND load_power_w IS NOT NULL
)
SELECT
EXTRACT(ISODOW FROM local_observed_at)::int AS iso_dow,
(
EXTRACT(HOUR FROM local_observed_at)::int * 60
+ FLOOR(EXTRACT(MINUTE FROM local_observed_at)::int / %s)::int * %s
) AS minute_bucket,
percentile_cont(0.10) WITHIN GROUP (ORDER BY load_power_w) AS p10,
percentile_cont(0.50) WITHIN GROUP (ORDER BY load_power_w) AS p50,
percentile_cont(0.90) WITHIN GROUP (ORDER BY load_power_w) AS p90,
avg(load_power_w) AS avg_load_power_w,
max(load_power_w) AS max_load_power_w,
count(*) AS sample_count
FROM localized
GROUP BY iso_dow, minute_bucket
HAVING count(*) >= %s
""",
(
timezone_name,
start_at,
bucket_minutes,
bucket_minutes,
min_samples,
),
)
rows = cursor.fetchall()
return {
(int(row[0]), int(row[1])): {
"p10": float(row[2]),
"p50": float(row[3]),
"p90": float(row[4]),
"avg_load_power_w": float(row[5]),
"max_load_power_w": float(row[6]),
"sample_count": int(row[7]),
}
for row in rows
}
def load_recent_actual_points(
self,
lookback: timedelta = timedelta(hours=24),
bucket: str = "5 minutes",
) -> list[dict[str, object]]:
start_at = datetime.now(timezone.utc) - lookback
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
f"""
SELECT
time_bucket('{bucket}', observed_at) AS bucket,
avg(solar_power_w) AS solar_power_w,
avg(load_power_w) AS load_power_w,
avg(solar_power_w - load_power_w) AS net_power_w,
avg(grid_import_w) AS grid_import_w,
avg(grid_export_w) AS grid_export_w,
count(*) AS sample_count
FROM sigen_plant_snapshots
WHERE observed_at >= %s
AND observed_at <= now()
GROUP BY bucket
ORDER BY bucket
LIMIT 10000
""",
(start_at,),
)
rows = cursor.fetchall()
return [
{
"target_at": row[0],
"solar_power_w": row[1],
"load_power_w": row[2],
"net_power_w": row[3],
"grid_import_w": row[4],
"grid_export_w": row[5],
"sample_count": row[6],
}
for row in rows
]
def load_recent_solar_peak_w(
self,
lookback: timedelta = timedelta(days=14),
) -> float | None:
start_at = datetime.now(timezone.utc) - lookback
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT max(solar_power_w)
FROM sigen_plant_snapshots
WHERE observed_at >= %s
""",
(start_at,),
)
row = cursor.fetchone()
return row[0] if row else None
def load_solar_training_samples(
self,
lookback: timedelta = timedelta(days=30),
min_samples_per_hour: int = 3,
) -> list[dict[str, float | int | object]]:
start_at = datetime.now(timezone.utc) - lookback
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
"""
WITH hourly_solar AS (
SELECT
time_bucket('1 hour', observed_at) AS target_at,
avg(solar_power_w) AS avg_solar_power_w,
count(*) AS sample_count
FROM sigen_plant_snapshots
WHERE observed_at >= %s
AND solar_power_w IS NOT NULL
GROUP BY target_at
),
latest_weather AS (
SELECT
target_at,
shortwave_radiation_w_m2,
cloud_cover_pct,
ROW_NUMBER() OVER (
PARTITION BY target_at
ORDER BY issued_at DESC
) AS rn
FROM weather_forecast_points
WHERE target_at >= %s
)
SELECT
h.target_at,
h.avg_solar_power_w,
h.sample_count,
w.shortwave_radiation_w_m2,
w.cloud_cover_pct
FROM hourly_solar h
JOIN latest_weather w
ON w.target_at = h.target_at
AND w.rn = 1
WHERE h.sample_count >= %s
AND w.shortwave_radiation_w_m2 IS NOT NULL
ORDER BY h.target_at
""",
(start_at, start_at, min_samples_per_hour),
)
rows = cursor.fetchall()
return [
{
"target_at": row[0],
"solar_power_w": float(row[1]),
"sample_count": int(row[2]),
"shortwave_radiation_w_m2": float(row[3]),
"cloud_cover_pct": float(row[4]) if row[4] is not None else 0.0,
}
for row in rows
]
def _create_rollup_view(self, cursor: object, view_name: str, bucket: str) -> None:
cursor.execute(
f"""
CREATE OR REPLACE VIEW {view_name} AS
SELECT
time_bucket('{bucket}', observed_at) AS bucket,
source,
avg(solar_power_w) AS avg_solar_power_w,
min(solar_power_w) AS min_solar_power_w,
max(solar_power_w) AS max_solar_power_w,
avg(load_power_w) AS avg_load_power_w,
min(load_power_w) AS min_load_power_w,
max(load_power_w) AS max_load_power_w,
avg(grid_import_w) AS avg_grid_import_w,
max(grid_import_w) AS max_grid_import_w,
avg(grid_export_w) AS avg_grid_export_w,
max(grid_export_w) AS max_grid_export_w,
avg(battery_power_w) AS avg_battery_power_w,
min(battery_power_w) AS min_battery_power_w,
max(battery_power_w) AS max_battery_power_w,
avg(battery_soc_pct) AS avg_battery_soc_pct,
min(battery_soc_pct) AS min_battery_soc_pct,
max(battery_soc_pct) AS max_battery_soc_pct,
min(accumulated_pv_energy_kwh) AS start_accumulated_pv_energy_kwh,
max(accumulated_pv_energy_kwh) AS end_accumulated_pv_energy_kwh,
count(*) AS sample_count
FROM sigen_plant_snapshots
GROUP BY bucket, source
"""
)
@contextmanager
def _connection(self) -> Iterator[object]:
try:
import psycopg
except ImportError as error:
raise SigenStoreConfigurationError(
"Install dependencies with `python3 -m pip install -r requirements.txt`"
) from error
with psycopg.connect(self.config.database_url) as connection:
yield connection