Files
rpotter6298 c8e3016fd6 Add new daemons and debug scripts for Sigenergy and Oracle functionalities
- Implement `sigen_daemon.py` to poll Sigenergy plant metrics and store snapshots.
- Create `web_daemon.py` for serving a web interface with various endpoints.
- Add debug scripts:
  - `debug_duplicates.py` to find duplicate target times in forecast data.
  - `debug_energy_forecast.py` to print baseline energy forecast curves.
  - `debug_oracle_evaluations.py` to run the oracle evaluator.
  - `debug_sigen.py` to inspect stored Sigenergy plant snapshots.
  - `debug_weather.py` to trace resolved truth data.
  - `modbus_test.py` for exploring Sigenergy plants or inverters over Modbus TCP.
- Introduce `oracle_evaluator.py` for evaluating stored oracle predictions against actuals.
- Add TCN training scripts in `tcn` directory for training usage sequence models.
2026-04-28 08:14:00 +02:00

384 lines
11 KiB
Python

#!/usr/bin/env python3
"""Explore a Sigenergy plant or inverter over Modbus TCP."""
from __future__ import annotations
import argparse
import json
from dataclasses import asdict
from os import environ
from gibil.classes.sigen.builder import SigenPlantClient
from gibil.classes.env_loader import EnvLoader
from gibil.classes.sigen.modbus import (
ModbusReadError,
ModbusReadResult,
RegisterKind,
SigenModbusClient,
)
from gibil.classes.sigen.registers import (
DEFAULT_INVERTER_REGISTER_NAMES,
DEFAULT_PLANT_REGISTER_NAMES,
INVERTER_REGISTERS,
PLANT_PARAMETER_REGISTERS,
PLANT_REGISTERS,
SigenRegister,
)
# Register tables used by the `scan` and `units` commands when no --kind is given.
DEFAULT_KINDS: tuple[RegisterKind, ...] = ("holding", "input")
# Every register table accepted as a `--kind` / `kind` choice on the command line.
ALL_KINDS: tuple[RegisterKind, ...] = ("holding", "input", "coil", "discrete")
# Unit ids tried by the `units` command when no --candidate is given.
DEFAULT_UNIT_CANDIDATES = (0, 1, 2, 3, 247, 255)
def main() -> None:
    """Load the environment, parse the command line, and run one subcommand.

    ``units``, ``catalog`` and ``snapshot`` are handled first because they do
    not use the connection opened further down: ``units`` delegates to
    ``probe_units``, ``catalog`` only prints the imported register tables, and
    ``snapshot`` builds its client with ``SigenPlantClient.from_env()``.

    The remaining subcommands (``probe``, ``plant``, ``inverter``, ``read``
    and ``scan``) share a single ``SigenModbusClient`` context configured from
    the global command-line options.
    """
    EnvLoader().load()
    args = parse_args()
    command = args.command

    if command == "units":
        print_results(probe_units(args), errors=True)
        return

    if command == "catalog":
        sections = (
            ("plant", "Plant Sensors", PLANT_REGISTERS),
            ("params", "Plant Parameters", PLANT_PARAMETER_REGISTERS),
        )
        for group, heading, table in sections:
            # "all" selects every section; otherwise only the named one.
            if args.group in (group, "all"):
                print_catalog(heading, table)
        return

    if command == "snapshot":
        plant_snapshot = SigenPlantClient.from_env().fetch_snapshot()
        # default=str stringifies any field json cannot encode natively.
        print(json.dumps(asdict(plant_snapshot), indent=2, default=str))
        return

    connection = SigenModbusClient(
        host=args.host,
        port=args.port,
        unit_id=args.unit_id,
        timeout=args.timeout,
        retries=args.retries,
        trace=args.trace,
    )
    with connection as client:
        if command == "probe":
            # Entering the context without an exception is the success signal.
            print(f"Connected to {args.host}:{args.port} with unit id {args.unit_id}")
        elif command == "plant":
            print_known_registers(client, args.register, PLANT_REGISTERS)
        elif command == "inverter":
            print_known_registers(client, args.register, INVERTER_REGISTERS)
        elif command == "read":
            try:
                outcome = client.read(args.kind, args.address, args.count)
                print_results([outcome], errors=True)
            except Exception as exc:
                # Report the failure in the same column layout print_results uses.
                print(f"{args.kind:8} {args.address:5} +{args.count:<3} ERROR {exc}")
        else:
            # Everything else has been dispatched above, so this is ``scan``.
            scanned: list[ModbusReadResult | ModbusReadError] = []
            for kind in args.kind:
                scanned += client.scan(
                    kind=kind,
                    start=args.start,
                    count=args.count,
                    chunk_size=args.chunk_size,
                )
            if args.json:
                print(json.dumps([asdict(item) for item in scanned], indent=2))
            else:
                print_results(scanned, errors=args.errors)
# Subcommands that never read ``args.host``: ``catalog`` only prints the
# imported register tables and ``snapshot`` builds its client through
# ``SigenPlantClient.from_env()``. They must not force the user to pass --host.
_HOST_OPTIONAL_COMMANDS = frozenset({"catalog", "snapshot"})


def parse_args() -> argparse.Namespace:
    """Parse the command line and fill in per-command list defaults.

    Global connection options take their defaults from the ``SIGEN_MODBUS_*``
    environment variables. ``--host`` is mandatory only for subcommands that
    actually connect with it; ``catalog`` and ``snapshot`` run without one.

    Returns:
        The parsed namespace. ``command`` names the chosen subcommand, and the
        repeatable options (``kind``, ``candidate``, ``register``) are always
        lists for the subcommands that define them.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, or when a
            connecting subcommand is used without a host.
    """
    parser = argparse.ArgumentParser(
        description="Minimal Modbus TCP explorer for a Sigenergy plant/inverter."
    )
    _add_connection_options(parser)
    _add_subcommands(parser)

    args = parser.parse_args()
    # Enforce --host here rather than with ``required=`` on the option, because
    # a required top-level option is demanded for every subcommand.
    if args.host is None and args.command not in _HOST_OPTIONAL_COMMANDS:
        parser.error("the following arguments are required: --host")
    _apply_list_defaults(args)
    return args


def _add_connection_options(parser: argparse.ArgumentParser) -> None:
    """Register the global connection options on the top-level parser."""
    parser.add_argument(
        "--host",
        default=environ.get("SIGEN_MODBUS_HOST"),
        help="Modbus TCP host or IP. Can also be set as SIGEN_MODBUS_HOST.",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=int(environ.get("SIGEN_MODBUS_PORT", "502")),
        help="Modbus TCP port. Defaults to 502.",
    )
    parser.add_argument(
        "--unit-id",
        type=int,
        default=int(environ.get("SIGEN_MODBUS_UNIT_ID", "1")),
        help="Modbus unit/slave id. Defaults to 1.",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=float(environ.get("SIGEN_MODBUS_TIMEOUT", "5")),
        help="Socket timeout in seconds. Defaults to 5.",
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=int(environ.get("SIGEN_MODBUS_RETRIES", "3")),
        help="Modbus request retries. Defaults to 3.",
    )
    parser.add_argument(
        "--trace",
        action="store_true",
        help="Print Modbus TCP packet bytes to stderr.",
    )


def _add_kind_option(subparser: argparse.ArgumentParser, verb: str) -> None:
    """Add the repeatable ``--kind`` option; ``verb`` completes the help text."""
    subparser.add_argument(
        "--kind",
        action="append",
        choices=ALL_KINDS,
        default=None,
        help=(
            f"Register table to {verb}. Repeat for multiple kinds. "
            "Defaults to holding and input."
        ),
    )


def _add_register_option(
    subparser: argparse.ArgumentParser,
    registers: dict[str, SigenRegister],
    label: str,
) -> None:
    """Add the repeatable ``--register`` option limited to ``registers`` keys."""
    subparser.add_argument(
        "--register",
        action="append",
        choices=sorted(registers),
        default=None,
        help=f"Known {label} register to read. Repeat for multiple registers.",
    )


def _add_subcommands(parser: argparse.ArgumentParser) -> None:
    """Register every subcommand and its arguments under ``dest="command"``."""
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("probe", help="Open a connection and report success.")
    subparsers.add_parser(
        "snapshot",
        help="Read core plant metrics and print the builder snapshot as JSON.",
    )

    catalog = subparsers.add_parser(
        "catalog",
        help="List known Sigenergy plant sensors and settable parameters.",
    )
    catalog.add_argument(
        "group",
        choices=("plant", "params", "all"),
        nargs="?",
        default="all",
        help="Catalog group to list. Defaults to all.",
    )

    units = subparsers.add_parser(
        "units",
        help="Try small reads against likely unit ids.",
    )
    units.add_argument(
        "--candidate",
        action="append",
        type=int,
        default=None,
        help=(
            "Unit id candidate to try. Repeat for multiple ids. "
            "Defaults to 0, 1, 2, 3, 247, and 255."
        ),
    )
    _add_kind_option(units, "test")
    units.add_argument(
        "--address",
        type=int,
        default=30000,
        help="Address to test. Defaults to 30000.",
    )
    units.add_argument(
        "--count",
        type=int,
        default=1,
        help="Number of values to request. Defaults to 1.",
    )

    plant = subparsers.add_parser(
        "plant",
        help="Read a small set of known Sigenergy plant registers.",
    )
    _add_register_option(plant, PLANT_REGISTERS, "plant")

    inverter = subparsers.add_parser(
        "inverter",
        help="Read a small set of known Sigenergy inverter registers.",
    )
    _add_register_option(inverter, INVERTER_REGISTERS, "inverter")

    read = subparsers.add_parser(
        "read",
        help="Read one raw Modbus register range.",
    )
    read.add_argument(
        "kind",
        choices=ALL_KINDS,
        help="Register table to read.",
    )
    read.add_argument(
        "address",
        type=int,
        help="Modbus address to read.",
    )
    read.add_argument(
        "count",
        type=int,
        nargs="?",
        default=1,
        help="Number of values to read. Defaults to 1.",
    )

    scan = subparsers.add_parser("scan", help="Scan register ranges in chunks.")
    _add_kind_option(scan, "scan")
    scan.add_argument(
        "--start",
        type=int,
        default=0,
        help="Starting zero-based Modbus address. Defaults to 0.",
    )
    scan.add_argument(
        "--count",
        type=int,
        default=100,
        help="Number of addresses to scan. Defaults to 100.",
    )
    scan.add_argument(
        "--chunk-size",
        type=int,
        default=10,
        help="Addresses per Modbus request. Defaults to 10.",
    )
    scan.add_argument(
        "--errors",
        action="store_true",
        help="Show failed chunks as well as successful reads.",
    )
    scan.add_argument(
        "--json",
        action="store_true",
        help="Print raw result objects as JSON.",
    )


def _apply_list_defaults(args: argparse.Namespace) -> None:
    """Replace unset repeatable options with a fresh copy of their default list.

    The defaults are applied here instead of through ``default=`` because the
    ``append`` action adds command-line values to a non-empty default rather
    than replacing it.
    """
    command = args.command
    if command in ("scan", "units") and args.kind is None:
        args.kind = list(DEFAULT_KINDS)
    if command == "units" and args.candidate is None:
        args.candidate = list(DEFAULT_UNIT_CANDIDATES)
    if command == "plant" and args.register is None:
        args.register = list(DEFAULT_PLANT_REGISTER_NAMES)
    if command == "inverter" and args.register is None:
        args.register = list(DEFAULT_INVERTER_REGISTER_NAMES)
def probe_units(args: argparse.Namespace) -> list[ModbusReadResult | ModbusReadError]:
    """Try one small read per register kind against each candidate unit id.

    A separate ``SigenModbusClient`` context is opened for every id in
    ``args.candidate``. A read that raises is recorded as a ``ModbusReadError``
    whose message is prefixed with the unit id, so one failing id or kind does
    not stop the remaining attempts.

    Returns:
        Read results and errors in the order they were attempted.
    """
    # NOTE(review): successful reads are stored as returned by client.read and
    # print_results shows only kind/address/count/values for them, so the
    # answering unit id is not visible in that output -- confirm this is intended.
    connection_options = dict(
        host=args.host,
        port=args.port,
        timeout=args.timeout,
        retries=args.retries,
        trace=args.trace,
    )
    outcomes: list[ModbusReadResult | ModbusReadError] = []
    for unit_id in args.candidate:
        with SigenModbusClient(unit_id=unit_id, **connection_options) as client:
            for kind in args.kind:
                try:
                    outcome = client.read(kind, args.address, args.count)
                except Exception as exc:
                    outcome = ModbusReadError(
                        kind=kind,
                        address=args.address,
                        count=args.count,
                        error=f"unit {unit_id}: {exc}",
                    )
                outcomes.append(outcome)
    return outcomes
def print_known_registers(
    client: SigenModbusClient,
    register_names: list[str],
    registers: dict[str, SigenRegister],
) -> None:
    """Read each named register and print its decoded value on one line.

    Every line starts with the register name padded to 32 columns. A
    successful read shows the decoded value, the unit when the register has
    one, and the raw values in parentheses. Any exception raised while
    reading, decoding or formatting is printed as an ``ERROR`` line instead,
    and the loop moves on to the next register.

    Args:
        client: Client used to perform the reads.
        register_names: Keys of ``registers`` to read, in output order.
        registers: Mapping from register name to its definition.
    """
    for name in register_names:
        spec = registers[name]
        try:
            reading = client.read(spec.kind, spec.address, spec.count)
            decoded = spec.decode(reading.values)
            suffix = f" {spec.unit}" if spec.unit else ""
            raw = " ".join(map(str, reading.values))
            print(
                f"{spec.name:32} {decoded}{suffix:4} "
                f"({spec.kind} {spec.address} +{spec.count}: {raw})"
            )
        except Exception as exc:
            print(
                f"{spec.name:32} ERROR "
                f"({spec.kind} {spec.address} +{spec.count}: {exc})"
            )
def print_catalog(title: str, registers: dict[str, SigenRegister]) -> None:
    """Print an underlined heading, one aligned row per register, and a blank line.

    Each row lists the register's name, kind, address, count, data type, gain,
    unit and description in space-separated, fixed-width columns. A missing
    unit or description is rendered as an empty string.

    Args:
        title: Heading printed above the rows and underlined with dashes.
        registers: Register definitions to list, in mapping order.
    """
    underline = "-" * len(title)
    print(title)
    print(underline)
    for entry in registers.values():
        unit = entry.unit or ""
        description = entry.description or ""
        columns = (
            f"{entry.name:48}",
            f"{entry.kind:7}",
            f"{entry.address:5}",
            f"+{entry.count:<2}",
            f"{entry.data_type:6}",
            f"gain={entry.gain:<7g}",
            f"{unit:5}",
            f"{description}",
        )
        print(" ".join(columns))
    # Blank separator so consecutive catalogs do not run together.
    print()
def print_results(
    results: list[ModbusReadResult | ModbusReadError],
    errors: bool,
) -> None:
    """Print one aligned line per read result.

    Successful reads show their values separated by spaces. Entries that are
    ``ModbusReadError`` instances are printed as ``ERROR`` lines only when
    ``errors`` is true; otherwise they are skipped.

    Args:
        results: Read outcomes to print, in order.
        errors: Whether failed reads should be printed as well.
    """
    for entry in results:
        failed = isinstance(entry, ModbusReadError)
        if failed and not errors:
            continue
        # Both line types share the same kind/address/count prefix.
        location = f"{entry.kind:8} {entry.address:5} +{entry.count:<3}"
        if failed:
            detail = f"ERROR {entry.error}"
        else:
            detail = " ".join(map(str, entry.values))
        print(f"{location} {detail}")
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()