"""Plant-level Sigenergy reads over Modbus TCP and snapshot construction."""

from __future__ import annotations

from datetime import datetime, timezone
from os import environ
from typing import Any

from gibil.classes.models import SigenPlantSnapshot
from gibil.classes.sigen.modbus import SigenModbusClient
from gibil.classes.sigen.registers import PLANT_REGISTERS, SigenRegister

# Register names read by default when fetching a plant snapshot. Each name is
# used as a key into PLANT_REGISTERS.
CORE_PLANT_REGISTER_NAMES = (
    "plant_system_time",
    "plant_ems_work_mode",
    "plant_grid_sensor_status",
    "plant_grid_sensor_active_power",
    "plant_ess_soc",
    "plant_active_power",
    "plant_sigen_photovoltaic_power",
    "plant_ess_power",
    "plant_running_state",
    "plant_ess_soh",
    "plant_accumulated_pv_energy",
    "plant_daily_consumed_energy",
    "plant_accumulated_consumed_energy",
    "plant_total_load_power",
)


class SigenPlantClient:
    """Fetches plant-level Sigenergy metrics over Modbus TCP."""

    def __init__(self, modbus_client: SigenModbusClient) -> None:
        """Store the Modbus client used for all register reads."""
        self.modbus_client = modbus_client

    @classmethod
    def from_env(cls) -> "SigenPlantClient":
        """Build a client from ``SIGEN_MODBUS_*`` environment variables.

        ``SIGEN_MODBUS_HOST`` is mandatory. The remaining settings fall back
        to defaults: port 502, unit id 247, timeout 20 and 3 retries.

        Raises:
            RuntimeError: If ``SIGEN_MODBUS_HOST`` is unset or empty.
            ValueError: If a numeric setting cannot be parsed.
        """
        host = environ.get("SIGEN_MODBUS_HOST")
        if not host:
            raise RuntimeError("SIGEN_MODBUS_HOST is required for Sigen Modbus reads")

        return cls(
            SigenModbusClient(
                host=host,
                port=int(environ.get("SIGEN_MODBUS_PORT", "502")),
                unit_id=int(environ.get("SIGEN_MODBUS_UNIT_ID", "247")),
                timeout=float(environ.get("SIGEN_MODBUS_TIMEOUT", "20")),
                retries=int(environ.get("SIGEN_MODBUS_RETRIES", "3")),
            )
        )

    def fetch_snapshot(
        self,
        register_names: tuple[str, ...] = CORE_PLANT_REGISTER_NAMES,
    ) -> SigenPlantSnapshot:
        """Read the given registers and return them as a plant snapshot.

        The Modbus client is used as a context manager for the duration of
        the reads; the snapshot is built after the context has exited.

        Raises:
            KeyError: If a name is not present in ``PLANT_REGISTERS``.
        """
        with self.modbus_client as client:
            values = self._read_values(client, register_names)

        return SigenBuilder().build_snapshot(values)

    def _read_values(
        self,
        client: SigenModbusClient,
        register_names: tuple[str, ...],
    ) -> dict[str, int | float | str | bool | None]:
        """Read every named register, tolerating individual read failures.

        A failed read stores ``None`` under the register name and the error
        text under ``"<name>_error"`` so one bad register does not abort the
        whole snapshot.
        """
        values: dict[str, int | float | str | bool | None] = {}

        for name in register_names:
            # An unknown name is a caller error and is allowed to raise.
            register = PLANT_REGISTERS[name]
            try:
                values[name] = self._read_value(client, register)
            except Exception as exc:
                # Deliberate best-effort: record the failure and carry on.
                values[name] = None
                values[f"{name}_error"] = str(exc)

        return values

    def _read_value(
        self,
        client: SigenModbusClient,
        register: SigenRegister,
    ) -> int | float | str | bool | None:
        """Read one register and decode its raw values."""
        result = client.read(register.kind, register.address, register.count)
        return register.decode(result.values)


class SigenBuilder:
    """Builds database-ready Sigenergy plant snapshots from decoded registers."""

    # Largest accepted gap, in seconds, between the plant clock and the
    # receive time before the plant clock is ignored.
    max_plant_clock_drift_seconds = 300

    def build_snapshot(
        self,
        values: dict[str, Any],
        received_at: datetime | None = None,
    ) -> SigenPlantSnapshot:
        """Convert decoded register values into a ``SigenPlantSnapshot``.

        Args:
            values: Decoded values keyed by register name. Missing or
                non-numeric entries become ``None`` in the snapshot.
            received_at: When the values were received; defaults to the
                current UTC time.

        Returns:
            A snapshot whose power fields are the register values multiplied
            by 1000, with the original mapping copied into ``raw_values``.
        """
        if received_at is None:
            received_at = datetime.now(timezone.utc)

        plant_epoch_seconds = self._int_or_none(values.get("plant_system_time"))
        observed_at = self._observed_at(plant_epoch_seconds, received_at)
        grid_power_w = self._kw_to_w(values.get("plant_grid_sensor_active_power"))

        return SigenPlantSnapshot(
            observed_at=observed_at,
            received_at=received_at,
            plant_epoch_seconds=plant_epoch_seconds,
            plant_ems_work_mode=self._int_or_none(values.get("plant_ems_work_mode")),
            plant_running_state=self._int_or_none(values.get("plant_running_state")),
            grid_sensor_status=self._int_or_none(
                values.get("plant_grid_sensor_status")
            ),
            solar_power_w=self._kw_to_w(
                values.get("plant_sigen_photovoltaic_power")
            ),
            battery_soc_pct=self._float_or_none(values.get("plant_ess_soc")),
            battery_soh_pct=self._float_or_none(values.get("plant_ess_soh")),
            battery_power_w=self._kw_to_w(values.get("plant_ess_power")),
            grid_power_w=grid_power_w,
            # The signed grid reading is split into two non-negative parts:
            # the positive side is reported as import, the negative as export.
            grid_import_w=max(grid_power_w, 0.0) if grid_power_w is not None else None,
            grid_export_w=abs(min(grid_power_w, 0.0))
            if grid_power_w is not None
            else None,
            load_power_w=self._kw_to_w(values.get("plant_total_load_power")),
            plant_active_power_w=self._kw_to_w(values.get("plant_active_power")),
            accumulated_pv_energy_kwh=self._float_or_none(
                values.get("plant_accumulated_pv_energy")
            ),
            daily_consumed_energy_kwh=self._float_or_none(
                values.get("plant_daily_consumed_energy")
            ),
            accumulated_consumed_energy_kwh=self._float_or_none(
                values.get("plant_accumulated_consumed_energy")
            ),
            raw_values=dict(values),
        )

    def _observed_at(
        self,
        plant_epoch_seconds: int | None,
        fallback: datetime,
    ) -> datetime:
        """Return the plant clock time when it is usable, else ``fallback``.

        The plant time is rejected when it is missing, cannot be converted to
        a datetime, or differs from ``fallback`` by more than
        ``max_plant_clock_drift_seconds``.
        """
        if plant_epoch_seconds is None:
            return fallback

        try:
            plant_time = datetime.fromtimestamp(plant_epoch_seconds, timezone.utc)
        except (OverflowError, OSError, ValueError):
            return fallback

        drift_seconds = abs((fallback - plant_time).total_seconds())
        if drift_seconds > self.max_plant_clock_drift_seconds:
            return fallback

        return plant_time

    def _kw_to_w(self, value: Any) -> float | None:
        """Return ``value`` multiplied by 1000, or ``None`` if not numeric."""
        numeric = self._float_or_none(value)
        if numeric is None:
            return None
        return numeric * 1000

    def _float_or_none(self, value: Any) -> float | None:
        """Coerce ``value`` to ``float``, returning ``None`` when impossible.

        Booleans convert like any other number (``True`` becomes ``1.0``).
        """
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: an integer too large to represent as a float.
            return None

    def _int_or_none(self, value: Any) -> int | None:
        """Coerce ``value`` to ``int``, returning ``None`` when impossible.

        Non-integer numbers are truncated toward zero. NaN and infinities
        have no integer form and yield ``None`` instead of raising.
        """
        if isinstance(value, int):
            # Skip the float round-trip so large integers keep full precision.
            return int(value)

        numeric = self._float_or_none(value)
        if numeric is None:
            return None

        try:
            return int(numeric)
        except (OverflowError, ValueError):
            # int() raises OverflowError for infinities, ValueError for NaN.
            return None