Files
rpotter6298 c8e3016fd6 Add new daemons and debug scripts for Sigenergy and Oracle functionalities
- Implement `sigen_daemon.py` to poll Sigenergy plant metrics and store snapshots.
- Create `web_daemon.py` for serving a web interface with various endpoints.
- Add debug scripts:
  - `debug_duplicates.py` to find duplicate target times in forecast data.
  - `debug_energy_forecast.py` to print baseline energy forecast curves.
  - `debug_oracle_evaluations.py` to run the oracle evaluator.
  - `debug_sigen.py` to inspect stored Sigenergy plant snapshots.
  - `debug_weather.py` to trace resolved truth data.
  - `modbus_test.py` for exploring Sigenergy plants or inverters over Modbus TCP.
- Introduce `oracle_evaluator.py` for evaluating stored oracle predictions against actuals.
- Add TCN training scripts in `tcn` directory for training usage sequence models.
2026-04-28 08:14:00 +02:00

176 lines
6.0 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from os import environ
from typing import Any
from gibil.classes.models import SigenPlantSnapshot
from gibil.classes.sigen.modbus import SigenModbusClient
from gibil.classes.sigen.registers import PLANT_REGISTERS, SigenRegister
# Register names read by default when SigenPlantClient.fetch_snapshot() is
# called without an explicit ``register_names`` argument. Each name is looked
# up in PLANT_REGISTERS at read time, so every entry must be a key of that
# mapping. SigenBuilder.build_snapshot() reads most of these keys from the
# decoded values to populate the snapshot fields.
CORE_PLANT_REGISTER_NAMES = (
"plant_system_time",
"plant_ems_work_mode",
"plant_grid_sensor_status",
"plant_grid_sensor_active_power",
"plant_ess_soc",
"plant_active_power",
"plant_sigen_photovoltaic_power",
"plant_ess_power",
"plant_running_state",
"plant_ess_soh",
"plant_accumulated_pv_energy",
"plant_daily_consumed_energy",
"plant_accumulated_consumed_energy",
"plant_total_load_power",
)
class SigenPlantClient:
    """Reads plant-level Sigenergy metrics through a Modbus TCP client."""

    def __init__(self, modbus_client: SigenModbusClient) -> None:
        """Store the Modbus client that every later read goes through."""
        self.modbus_client = modbus_client

    @classmethod
    def from_env(cls) -> "SigenPlantClient":
        """Create a client configured from ``SIGEN_MODBUS_*`` environment variables.

        ``SIGEN_MODBUS_HOST`` is mandatory. ``SIGEN_MODBUS_PORT`` (502),
        ``SIGEN_MODBUS_UNIT_ID`` (247), ``SIGEN_MODBUS_TIMEOUT`` (20) and
        ``SIGEN_MODBUS_RETRIES`` (3) fall back to the defaults in parentheses.

        Raises:
            RuntimeError: If ``SIGEN_MODBUS_HOST`` is unset or empty.
            ValueError: If a numeric variable cannot be parsed.
        """
        host = environ.get("SIGEN_MODBUS_HOST")
        if not host:
            raise RuntimeError("SIGEN_MODBUS_HOST is required for Sigen Modbus reads")

        port = int(environ.get("SIGEN_MODBUS_PORT", "502"))
        unit_id = int(environ.get("SIGEN_MODBUS_UNIT_ID", "247"))
        timeout = float(environ.get("SIGEN_MODBUS_TIMEOUT", "20"))
        retries = int(environ.get("SIGEN_MODBUS_RETRIES", "3"))

        connection = SigenModbusClient(
            host=host,
            port=port,
            unit_id=unit_id,
            timeout=timeout,
            retries=retries,
        )
        return cls(connection)

    def fetch_snapshot(
        self,
        register_names: tuple[str, ...] = CORE_PLANT_REGISTER_NAMES,
    ) -> SigenPlantSnapshot:
        """Read the requested registers and turn them into a plant snapshot.

        The Modbus client is used as a context manager for the duration of
        the reads; the snapshot itself is assembled after that context exits.

        Args:
            register_names: Keys of ``PLANT_REGISTERS`` to read.

        Returns:
            The snapshot produced by ``SigenBuilder.build_snapshot``.
        """
        with self.modbus_client as session:
            readings = self._read_values(session, register_names)
        builder = SigenBuilder()
        return builder.build_snapshot(readings)

    def _read_values(
        self,
        client: SigenModbusClient,
        register_names: tuple[str, ...],
    ) -> dict[str, int | float | str | bool | None]:
        """Read each named register, recording failures instead of raising.

        A register whose read or decode fails is stored as ``None`` and the
        error text is stored under ``"<name>_error"``. An unknown register
        name is not tolerated: the ``PLANT_REGISTERS`` lookup happens outside
        the guarded section and raises ``KeyError``.
        """
        readings: dict[str, int | float | str | bool | None] = {}
        for register_name in register_names:
            spec = PLANT_REGISTERS[register_name]
            try:
                decoded = self._read_value(client, spec)
            except Exception as exc:
                # Best-effort per register: one failed read must not discard
                # the values that were read successfully.
                readings[register_name] = None
                readings[f"{register_name}_error"] = str(exc)
            else:
                readings[register_name] = decoded
        return readings

    def _read_value(
        self,
        client: SigenModbusClient,
        register: SigenRegister,
    ) -> int | float | str | bool | None:
        """Read one register over Modbus and decode its raw words."""
        response = client.read(register.kind, register.address, register.count)
        raw_words = response.values
        return register.decode(raw_words)
class SigenBuilder:
    """Builds database-ready Sigenergy plant snapshots from decoded registers.

    Incoming values are coerced defensively: anything missing or not
    convertible to the expected numeric type becomes ``None`` rather than
    raising, so a single bad register cannot abort the whole snapshot.
    """

    # Largest accepted gap, in seconds, between the plant's own clock and the
    # receive time. Beyond this the plant clock is ignored by _observed_at().
    max_plant_clock_drift_seconds = 300

    def build_snapshot(
        self,
        values: dict[str, Any],
        received_at: datetime | None = None,
    ) -> SigenPlantSnapshot:
        """Convert decoded register values into a ``SigenPlantSnapshot``.

        Args:
            values: Mapping of register name to decoded value. Missing keys
                are treated like ``None``. A shallow copy of the whole
                mapping (including any extra keys) is kept in ``raw_values``.
            received_at: Time the values were received. Defaults to the
                current UTC time. It is compared against a timezone-aware
                UTC plant time, so it must be timezone-aware whenever
                ``plant_system_time`` is present.

        Returns:
            The populated snapshot. Power fields are the register values
            multiplied by 1000 (kW to W, per the helper's name).
        """
        if received_at is None:
            received_at = datetime.now(timezone.utc)

        plant_epoch_seconds = self._int_or_none(values.get("plant_system_time"))
        observed_at = self._observed_at(plant_epoch_seconds, received_at)

        # Split the signed grid reading into two non-negative channels:
        # positive readings count as import, negative readings as export.
        grid_power_w = self._kw_to_w(values.get("plant_grid_sensor_active_power"))
        if grid_power_w is None:
            grid_import_w = None
            grid_export_w = None
        else:
            grid_import_w = max(grid_power_w, 0.0)
            grid_export_w = abs(min(grid_power_w, 0.0))

        return SigenPlantSnapshot(
            observed_at=observed_at,
            received_at=received_at,
            plant_epoch_seconds=plant_epoch_seconds,
            plant_ems_work_mode=self._int_or_none(values.get("plant_ems_work_mode")),
            plant_running_state=self._int_or_none(values.get("plant_running_state")),
            grid_sensor_status=self._int_or_none(
                values.get("plant_grid_sensor_status")
            ),
            solar_power_w=self._kw_to_w(
                values.get("plant_sigen_photovoltaic_power")
            ),
            battery_soc_pct=self._float_or_none(values.get("plant_ess_soc")),
            battery_soh_pct=self._float_or_none(values.get("plant_ess_soh")),
            battery_power_w=self._kw_to_w(values.get("plant_ess_power")),
            grid_power_w=grid_power_w,
            grid_import_w=grid_import_w,
            grid_export_w=grid_export_w,
            load_power_w=self._kw_to_w(values.get("plant_total_load_power")),
            plant_active_power_w=self._kw_to_w(values.get("plant_active_power")),
            accumulated_pv_energy_kwh=self._float_or_none(
                values.get("plant_accumulated_pv_energy")
            ),
            daily_consumed_energy_kwh=self._float_or_none(
                values.get("plant_daily_consumed_energy")
            ),
            accumulated_consumed_energy_kwh=self._float_or_none(
                values.get("plant_accumulated_consumed_energy")
            ),
            raw_values=dict(values),
        )

    def _observed_at(
        self,
        plant_epoch_seconds: int | None,
        fallback: datetime,
    ) -> datetime:
        """Pick the observation time: the plant clock if plausible, else ``fallback``.

        The plant clock is used only when it is present, converts to a valid
        UTC datetime, and lies within ``max_plant_clock_drift_seconds`` of
        ``fallback`` in either direction.
        """
        if plant_epoch_seconds is None:
            return fallback
        try:
            plant_time = datetime.fromtimestamp(plant_epoch_seconds, timezone.utc)
        except (OverflowError, OSError, ValueError):
            # Timestamp outside the range the platform can represent.
            return fallback
        drift_seconds = abs((fallback - plant_time).total_seconds())
        if drift_seconds > self.max_plant_clock_drift_seconds:
            return fallback
        return plant_time

    def _kw_to_w(self, value: Any) -> float | None:
        """Return ``value`` multiplied by 1000 as a float, or ``None`` if not numeric."""
        numeric = self._float_or_none(value)
        if numeric is None:
            return None
        return numeric * 1000

    def _float_or_none(self, value: Any) -> float | None:
        """Return ``float(value)``, or ``None`` when the conversion is impossible.

        Booleans convert to ``0.0``/``1.0``. NaN and infinity are passed
        through unchanged because they are valid floats.
        """
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: integers too large to be represented as a float.
            return None

    def _int_or_none(self, value: Any) -> int | None:
        """Return ``value`` truncated toward zero as an int, or ``None``.

        ``None`` is returned for missing, non-numeric, NaN and infinite
        inputs, so callers never see an exception from a bad register value.
        """
        numeric = self._float_or_none(value)
        if numeric is None:
            return None
        try:
            return int(numeric)
        except (ValueError, OverflowError):
            # int() raises ValueError for NaN and OverflowError for +/-inf.
            return None