#!/usr/bin/env python3 """Explore a Sigenergy plant or inverter over Modbus TCP.""" from __future__ import annotations import argparse import json from dataclasses import asdict from os import environ from gibil.classes.sigen.builder import SigenPlantClient from gibil.classes.env_loader import EnvLoader from gibil.classes.sigen.modbus import ( ModbusReadError, ModbusReadResult, RegisterKind, SigenModbusClient, ) from gibil.classes.sigen.registers import ( DEFAULT_INVERTER_REGISTER_NAMES, DEFAULT_PLANT_REGISTER_NAMES, INVERTER_REGISTERS, PLANT_PARAMETER_REGISTERS, PLANT_REGISTERS, SigenRegister, ) DEFAULT_KINDS: tuple[RegisterKind, ...] = ("holding", "input") ALL_KINDS: tuple[RegisterKind, ...] = ("holding", "input", "coil", "discrete") DEFAULT_UNIT_CANDIDATES = (0, 1, 2, 3, 247, 255) def main() -> None: EnvLoader().load() args = parse_args() if args.command == "units": results = probe_units(args) print_results(results, errors=True) return if args.command == "catalog": if args.group in {"plant", "all"}: print_catalog("Plant Sensors", PLANT_REGISTERS) if args.group in {"params", "all"}: print_catalog("Plant Parameters", PLANT_PARAMETER_REGISTERS) return if args.command == "snapshot": snapshot = SigenPlantClient.from_env().fetch_snapshot() print(json.dumps(asdict(snapshot), indent=2, default=str)) return with SigenModbusClient( host=args.host, port=args.port, unit_id=args.unit_id, timeout=args.timeout, retries=args.retries, trace=args.trace, ) as client: if args.command == "probe": print( f"Connected to {args.host}:{args.port} " f"with unit id {args.unit_id}" ) return if args.command == "plant": print_known_registers(client, args.register, PLANT_REGISTERS) return if args.command == "inverter": print_known_registers(client, args.register, INVERTER_REGISTERS) return if args.command == "read": try: result = client.read(args.kind, args.address, args.count) print_results([result], errors=True) except Exception as exc: print( f"{args.kind:8} {args.address:5} " f"+{args.count:<3} ERROR {exc}" ) return results: list[ModbusReadResult | ModbusReadError] = [] for kind in args.kind: results.extend( client.scan( kind=kind, start=args.start, count=args.count, chunk_size=args.chunk_size, ) ) if args.json: print(json.dumps([asdict(result) for result in results], indent=2)) else: print_results(results, errors=args.errors) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Minimal Modbus TCP explorer for a Sigenergy plant/inverter." ) parser.add_argument( "--host", default=environ.get("SIGEN_MODBUS_HOST"), required="SIGEN_MODBUS_HOST" not in environ, help="Modbus TCP host or IP. Can also be set as SIGEN_MODBUS_HOST.", ) parser.add_argument( "--port", type=int, default=int(environ.get("SIGEN_MODBUS_PORT", "502")), help="Modbus TCP port. Defaults to 502.", ) parser.add_argument( "--unit-id", type=int, default=int(environ.get("SIGEN_MODBUS_UNIT_ID", "1")), help="Modbus unit/slave id. Defaults to 1.", ) parser.add_argument( "--timeout", type=float, default=float(environ.get("SIGEN_MODBUS_TIMEOUT", "5")), help="Socket timeout in seconds. Defaults to 5.", ) parser.add_argument( "--retries", type=int, default=int(environ.get("SIGEN_MODBUS_RETRIES", "3")), help="Modbus request retries. Defaults to 3.", ) parser.add_argument( "--trace", action="store_true", help="Print Modbus TCP packet bytes to stderr.", ) subparsers = parser.add_subparsers(dest="command", required=True) subparsers.add_parser("probe", help="Open a connection and report success.") subparsers.add_parser( "snapshot", help="Read core plant metrics and print the builder snapshot as JSON.", ) catalog = subparsers.add_parser( "catalog", help="List known Sigenergy plant sensors and settable parameters.", ) catalog.add_argument( "group", choices=("plant", "params", "all"), nargs="?", default="all", help="Catalog group to list. Defaults to all.", ) units = subparsers.add_parser( "units", help="Try small reads against likely unit ids.", ) units.add_argument( "--candidate", action="append", type=int, default=None, help=( "Unit id candidate to try. Repeat for multiple ids. " "Defaults to 0, 1, 2, 3, 247, and 255." ), ) units.add_argument( "--kind", action="append", choices=ALL_KINDS, default=None, help=( "Register table to test. Repeat for multiple kinds. " "Defaults to holding and input." ), ) units.add_argument( "--address", type=int, default=30000, help="Address to test. Defaults to 30000.", ) units.add_argument( "--count", type=int, default=1, help="Number of values to request. Defaults to 1.", ) plant = subparsers.add_parser( "plant", help="Read a small set of known Sigenergy plant registers.", ) plant.add_argument( "--register", action="append", choices=sorted(PLANT_REGISTERS), default=None, help="Known plant register to read. Repeat for multiple registers.", ) inverter = subparsers.add_parser( "inverter", help="Read a small set of known Sigenergy inverter registers.", ) inverter.add_argument( "--register", action="append", choices=sorted(INVERTER_REGISTERS), default=None, help="Known inverter register to read. Repeat for multiple registers.", ) read = subparsers.add_parser( "read", help="Read one raw Modbus register range.", ) read.add_argument( "kind", choices=ALL_KINDS, help="Register table to read.", ) read.add_argument( "address", type=int, help="Modbus address to read.", ) read.add_argument( "count", type=int, nargs="?", default=1, help="Number of values to read. Defaults to 1.", ) scan = subparsers.add_parser("scan", help="Scan register ranges in chunks.") scan.add_argument( "--kind", action="append", choices=ALL_KINDS, default=None, help=( "Register table to scan. Repeat for multiple kinds. " "Defaults to holding and input." ), ) scan.add_argument( "--start", type=int, default=0, help="Starting zero-based Modbus address. Defaults to 0.", ) scan.add_argument( "--count", type=int, default=100, help="Number of addresses to scan. Defaults to 100.", ) scan.add_argument( "--chunk-size", type=int, default=10, help="Addresses per Modbus request. Defaults to 10.", ) scan.add_argument( "--errors", action="store_true", help="Show failed chunks as well as successful reads.", ) scan.add_argument( "--json", action="store_true", help="Print raw result objects as JSON.", ) args = parser.parse_args() if args.command == "scan" and args.kind is None: args.kind = list(DEFAULT_KINDS) if args.command == "units": if args.kind is None: args.kind = list(DEFAULT_KINDS) if args.candidate is None: args.candidate = list(DEFAULT_UNIT_CANDIDATES) if args.command == "plant" and args.register is None: args.register = list(DEFAULT_PLANT_REGISTER_NAMES) if args.command == "inverter" and args.register is None: args.register = list(DEFAULT_INVERTER_REGISTER_NAMES) return args def probe_units(args: argparse.Namespace) -> list[ModbusReadResult | ModbusReadError]: results: list[ModbusReadResult | ModbusReadError] = [] for unit_id in args.candidate: with SigenModbusClient( host=args.host, port=args.port, unit_id=unit_id, timeout=args.timeout, retries=args.retries, trace=args.trace, ) as client: for kind in args.kind: try: result = client.read(kind, args.address, args.count) results.append(result) except Exception as exc: results.append( ModbusReadError( kind=kind, address=args.address, count=args.count, error=f"unit {unit_id}: {exc}", ) ) return results def print_known_registers( client: SigenModbusClient, register_names: list[str], registers: dict[str, SigenRegister], ) -> None: for register_name in register_names: register = registers[register_name] try: result = client.read(register.kind, register.address, register.count) value = register.decode(result.values) unit = f" {register.unit}" if register.unit else "" raw_values = " ".join(str(value) for value in result.values) print( f"{register.name:32} {value}{unit:4} " f"({register.kind} {register.address} +{register.count}: {raw_values})" ) except Exception as exc: print( f"{register.name:32} ERROR " f"({register.kind} {register.address} +{register.count}: {exc})" ) def print_catalog(title: str, registers: dict[str, SigenRegister]) -> None: print(title) print("-" * len(title)) for register in registers.values(): unit = register.unit or "" description = register.description or "" print( f"{register.name:48} {register.kind:7} " f"{register.address:5} +{register.count:<2} " f"{register.data_type:6} gain={register.gain:<7g} " f"{unit:5} {description}" ) print() def print_results( results: list[ModbusReadResult | ModbusReadError], errors: bool, ) -> None: for result in results: if isinstance(result, ModbusReadError): if errors: print( f"{result.kind:8} {result.address:5} " f"+{result.count:<3} ERROR {result.error}" ) continue values = " ".join(str(value) for value in result.values) print(f"{result.kind:8} {result.address:5} +{result.count:<3} {values}") if __name__ == "__main__": main()