82 lines
2.8 KiB
Python
82 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from gibil.classes.models import Decision, PowerStage, Snapshot
|
|
|
|
|
|
class GibilAgent:
|
|
"""Stateful decision engine for Astrape."""
|
|
|
|
def __init__(self) -> None:
|
|
self.surplus_latch_set = False
|
|
self.previous_stage: PowerStage | None = None
|
|
|
|
def decide(self, snapshot: Snapshot) -> Decision:
|
|
reasons: list[str] = []
|
|
|
|
self._update_surplus_latch(snapshot, reasons)
|
|
|
|
if self.surplus_latch_set:
|
|
stage = PowerStage.SURPLUS
|
|
reasons.append("surplus latch is set")
|
|
elif snapshot.cheap_window_active:
|
|
stage = PowerStage.CHEAP_GRID
|
|
reasons.append("cheap grid window is active")
|
|
elif self._should_conserve(snapshot):
|
|
stage = PowerStage.CONSERVE
|
|
reasons.append("battery is low and there is no useful solar surplus")
|
|
else:
|
|
stage = PowerStage.STANDARD
|
|
reasons.append("no surplus, cheap window, or conserve condition is active")
|
|
|
|
if self.previous_stage != stage:
|
|
previous = self.previous_stage.value if self.previous_stage else "none"
|
|
reasons.append(f"stage changed from {previous} to {stage.value}")
|
|
|
|
self.previous_stage = stage
|
|
|
|
return Decision(
|
|
created_at=datetime.now(timezone.utc),
|
|
stage=stage,
|
|
reasons=reasons,
|
|
confidence=self._confidence(snapshot),
|
|
)
|
|
|
|
def _update_surplus_latch(
|
|
self, snapshot: Snapshot, reasons: list[str]
|
|
) -> None:
|
|
if snapshot.battery_soc_pct is None or snapshot.solar_power_w is None:
|
|
return
|
|
|
|
home_power_w = snapshot.home_power_w or 0
|
|
has_surplus = snapshot.solar_power_w > home_power_w
|
|
|
|
if not self.surplus_latch_set:
|
|
if snapshot.battery_soc_pct >= 95 and has_surplus:
|
|
self.surplus_latch_set = True
|
|
reasons.append("surplus latch set: battery >= 95% and solar exceeds load")
|
|
return
|
|
|
|
if snapshot.battery_soc_pct < 80 or not has_surplus:
|
|
self.surplus_latch_set = False
|
|
reasons.append("surplus latch cleared: battery < 80% or surplus ended")
|
|
|
|
def _should_conserve(self, snapshot: Snapshot) -> bool:
|
|
if snapshot.battery_soc_pct is None:
|
|
return False
|
|
|
|
solar_power_w = snapshot.solar_power_w or 0
|
|
home_power_w = snapshot.home_power_w or 0
|
|
|
|
return snapshot.battery_soc_pct < 25 and solar_power_w < home_power_w
|
|
|
|
def _confidence(self, snapshot: Snapshot) -> float:
|
|
expected_inputs = [
|
|
snapshot.solar_power_w,
|
|
snapshot.home_power_w,
|
|
snapshot.battery_soc_pct,
|
|
]
|
|
present = sum(value is not None for value in expected_inputs)
|
|
return present / len(expected_inputs)
|