#!/usr/bin/env python # coding: utf-8 # In[1]: """ ************************************************************************** * WARNHINWEIS: Die Verwendung dieses Skripts erfolgt auf eigene Gefahr! * * Unsachgemäße Nutzung kann zu gefährlichen Zuständen und/oder Schäden * * am Wechselrichter oder angeschlossenen Anlagen führen. * * Für Folgen aus der Nutzung übernimmt der Autor keine Haftung! * ************************************************************************** Fronius/SunSpec Modbus-Registertool Dieses Skript ermöglicht das komfortable Auslesen und (sofern erlaubt) Beschreiben von Modbus-Parametern an einem Fronius-Wechselrichter (oder kompatiblen SunSpec-Geräten) über Modbus TCP. **Das Skript benötigt dafür eine CSV-Datei, welche die vollständige "Complete Map" für das SunSpec-Modell mit allen absoluten Modbus-Registeradressen enthält.** WICHTIGE HINWEISE: ------------------ - Dieses Skript ist für Integer-basierte SunSpec-Modelle (z.B. 101, 102, 103, "int+SF") konzipiert. - Es unterstützt Register, die als int16/uint16 mit oder ohne zugehörigen Skalierungsfaktor (_SF) vorliegen. Register ohne _SF werden direkt als Integer interpretiert. - Für Floating-Point-Modelle (z.B. SunSpec 112, 113, 120) oder andere Datentypen ist dieses Skript NICHT geeignet! - **Lesen und Schreiben von/zu dem Wechselrichter funktioniert NUR, wenn eine Modbusverbindung besteht.** Ist keine Verbindung möglich, steht ausschließlich die Suchfunktion zur Verfügung. - **Schreibtransaktionen sind nur möglich, wenn das Modellregister (meist 40071) den Wert 101, 102 oder 103 enthält.** Bei anderen Modellen sind Schreibvorgänge gesperrt. BEGRIFFSKLÄRUNG: ---------------- - Ein **Datensatz** ist eine Zeile der Registertabelle (CSV), die alle Metadaten zu einem Modbus-Parameter enthält: START (Registeradresse Anfang), ENDE (Registeradresse Ende), NAME (Kurzbezeichnung), DESCRIPTION (Beschreibung), Typ, Einheit, Skalierungsfaktor usw. - Ein **Parameter** ist der eigentliche Wert oder die Funktion im Wechselrichter, die über einen oder mehrere Register (und den zugehörigen Datensatz) angesprochen wird. Suchfunktion: ------------- Suche in der CSV-Datei nach Datensätzen, bei denen der eingegebene Suchbegriff in den Feldern Start (Register), End (Register), Name oder Description vorkommt, und gib die zugehörigen Registeradressen aus. Wird ein Scale Factor verwendet, wird das zugehörige Register ebenfalls angezeigt. Hinweis zu SunSpec-Modellnummern: ---------------------------------- Die SunSpec-Modellnummer (z.B. 101, 102, 103) im Register 'Model' beschreibt ausschließlich die Netzphasenart: - 101: einphasig (single phase) - 102: split-phase - 103: dreiphasig (three phase) Die Anzahl der MPPTs ist darin nicht enthalten und wird in der Registertabelle nicht abgebildet. Was ist bei Floating-Point-Modellen zu ändern? - Floating-Point-Register (z.B. Typ "float32", "acc32", "acc64") belegen zwei oder vier Modbus-Register. - Die Werte müssen als 32- oder 64-Bit-Float interpretiert werden (z.B. mit struct.unpack). - Es gibt keinen Skalierungsfaktor (_SF), der Wert steht direkt als physikalischer Wert im Register. - Das Mapping muss den Typ erkennen und die entsprechende Lesefunktion anwenden. Funktionen: ----------- - Automatisches Einlesen einer Fronius/SunSpec-Registertabelle (CSV, Semikolon-getrennt) - Automatische Zuordnung von Skalierungsfaktoren (_SF) zu den jeweiligen Parametern (falls vorhanden) - Anzeige aller verfügbaren Parameter mit Registeradresse, Einheit und Les-/Schreibbarkeit - Werte werden beim Lesen korrekt mit dem Skalierungsfaktor multipliziert (falls vorhanden) - Schreibbare Register können beschrieben werden (mit Rückskalierung des Wertes, falls _SF vorhanden) - Nach dem Schreiben wird der Wert 5 Sekunden später erneut ausgelesen und überprüft - **Suchfunktion:** Suche nach Start, End, Name oder Description, Ausgabe der Registeradressen. Diese Funktion steht immer zur Verfügung, auch wenn keine Modbusverbindung besteht. **Wird ein Scale Factor verwendet, wird das zugehörige Register ebenfalls angezeigt.** - **Protokollierung:** Alle Lese- und Schreibaktionen werden mit Zeitstempel, Register, Beschreibung und Wert in eine Logdatei geschrieben. Hinweise zur Benutzung: ----------------------- 1. Die CSV-Datei muss mit Semikolon (;) getrennt sein und folgende Spalten enthalten: Start, End, Size, R/W, Function codes, Name, Description, Type, Units, Scale Factor, Range of values 2. Die Spalte 'Name' enthält den Parameternamen (z.B. W oder W_SF), 'Start' die Registeradresse (z.B. 40084), 'Type' sollte 'int16' oder 'uint16' sein, 'R/W' gibt an, ob das Register lesbar/schreibbar ist, 'Scale Factor' verweist ggf. auf das zugehörige _SF-Register. 3. Beim Start fragt das Skript nach dem Pfad zur CSV-Datei und zur IP-Adresse des Wechselrichters. 4. Die Suchfunktion ermöglicht jederzeit die Suche nach Start, End, Name oder Description und gibt die Registeradressen zurück. 5. Bei bestehender Modbusverbindung kannst du einen Parameter aus der Liste auswählen (Name oder Nummer), der aktuelle Wert wird angezeigt. 6. Bei schreibbaren Registern wirst du gefragt, ob du einen neuen Wert schreiben möchtest. Der Wert wird bei Bedarf automatisch rückskaliert (physikalischer Wert / 10^SF). 7. Nach dem Schreiben wartet das Skript 5 Sekunden und liest den Wert erneut aus, um das Ergebnis zu prüfen. 8. Das Skript benötigt das Paket 'pymodbus' (installierbar via pip). Beispiel für CSV-Zeilen: ------------------------ Start;End;Size;R/W;Function codes;Name;Description;Type;Units;Scale Factor;Range of values 40084;40084;1;R/W;0x03,0x06;W;AC Power value;int16;W;W_SF; 40085;40085;1;R;0x03;W_SF;AC Power Scale Factor;sunssf;;; Autor: ARC (akkudoktor.net) hingewixxt mit Perplexity AI Unterstützung """ import csv import time import os from datetime import datetime from pymodbus.client import ModbusTcpClient def print_warning(): print("\n" + "*" * 75) print("WARNHINWEIS: Die Verwendung dieses Skripts erfolgt auf eigene Gefahr!") print("Unsachgemäße Nutzung kann zu gefährlichen Zuständen und/oder Schäden") print("am Wechselrichter oder angeschlossenen Anlagen führen.") print("Für Folgen aus der Nutzung übernimmt der Autor keine Haftung!") print("*" * 75 + "\n") def get_log_filename(base="register_change_log.txt"): if not os.path.exists(base): return base i = 1 while True: fname = f"register_change_log_{i}.txt" if not os.path.exists(fname): return fname i += 1 def log_action(logfile, action, reg_addr, description, value): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with open(logfile, "a", encoding="utf-8") as f: f.write(f"[{timestamp}] {action}: Register {reg_addr}, {description}, Wert: {value}\n") def build_parameter_map(filename): parameter_map = {} sf_lookup = {} with open(filename, newline='', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile, delimiter=';') print("Gefundene Spaltenüberschriften:", reader.fieldnames) # Debug-Hilfe rows = list(reader) for row in rows: name = (row.get('Name') or '').strip() if name.endswith('_SF'): start_str = (row.get('Start') or '').strip() if start_str.isdigit(): sf_lookup[name] = int(start_str) for row in rows: param_name = (row.get('Name') or '').strip() sf_name = (row.get('Scale Factor') or '').strip() if not param_name or param_name.endswith('_SF'): continue reg_addr_str = (row.get('Start') or '').strip() if not reg_addr_str.isdigit(): continue reg_addr = int(reg_addr_str) unit = (row.get('Units') or '').strip() signed = (row.get('Type') or '').lower() == 'int16' rw_field = (row.get('R/W') or '').strip().upper() readable = 'R' in rw_field writable = 'W' in rw_field sf_register = sf_lookup.get(sf_name) if sf_name else None description = (row.get('Description') or '').strip() range_of_values = (row.get('Range of values') or '').strip() parameter_map[param_name] = { "register": reg_addr, "sf_register": sf_register, "unit": unit, "signed": signed, "readable": readable, "writable": writable, "description": description, "range_of_values": range_of_values } return parameter_map def read_register(client, reg_addr, signed=False): rr = client.read_holding_registers(reg_addr - 40001, 1, unit=1) if hasattr(rr, 'registers') and rr.registers: value = rr.registers[0] if signed and value >= 0x8000: value -= 0x10000 return value else: print(f"Fehler beim Lesen von Register {reg_addr}") return None def write_register(client, reg_addr, value): wr = client.write_register(reg_addr - 40001, value, unit=1) if hasattr(wr, 'isError') and not wr.isError(): return True else: print(f"Fehler beim Schreiben in Register {reg_addr}") return False def search_registers(csvfile): suchbegriff = input("\nSuchbegriff für Start, End, Name oder Description eingeben (oder Enter zum Beenden): ").strip().lower() if not suchbegriff: return False found = False # Zuerst alle Zeilen einlesen, damit wir später auf _SF-Register zugreifen können with open(csvfile, newline='', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') rows = list(reader) # Erstelle ein Lookup für alle SF-Register sf_lookup = {} for row in rows: name = (row.get('Name') or '').strip() if name.endswith('_SF'): sf_lookup[name] = (row.get('Start') or '').strip() print("\nGefundene Datensätze:") for row in rows: start = (row.get('Start') or '').strip().lower() end = (row.get('End') or '').strip().lower() name = (row.get('Name') or '').strip().lower() desc = (row.get('Description') or '').strip().lower() if (suchbegriff in start or suchbegriff in end or suchbegriff in name or suchbegriff in desc): print(f"Start: {row.get('Start')}, End: {row.get('End')}, Name: {row.get('Name')}, Description: {row.get('Description')}") scale_factor_name = (row.get('Scale Factor') or '').strip() if scale_factor_name: sf_reg = sf_lookup.get(scale_factor_name) if sf_reg: print(f" -> Scale Factor-Register: {sf_reg} (Name: {scale_factor_name})") found = True if not found: print("Kein passender Datensatz gefunden.") return True def get_sunspec_model(client): model_reg = 40071 model = read_register(client, model_reg, signed=False) if model is not None: print(f"SunSpec Model-Register (Adresse {model_reg}): {model}") else: print("Konnte SunSpec Model-Register nicht lesen.") return model def main(): print_warning() csvfile = input("Pfad zur Register-CSV: ").strip() parameter_map = build_parameter_map(csvfile) ip = input("IP-Adresse des Wechselrichters: ").strip() logfile = get_log_filename() print(f"Protokolliere alle Lese- und Schreibaktionen in: {logfile}") client = ModbusTcpClient(ip, port=502) print("Prüfe Modbusverbindung zum Wechselrichter...") if not client.connect(): print("\nKEINE Modbusverbindung möglich!") print("Es steht nur die Suchfunktion zur Verfügung.") print("Beende das Programm mit Enter.\n") while True: if not search_registers(csvfile): break return model = get_sunspec_model(client) if model not in (101, 102, 103): print("\nWARNUNG: Das angeschlossene Gerät ist kein unterstütztes Integer-Inverter-Modell (101, 102, 103)!") print("Schreibtransaktionen werden deaktiviert. Lesen und Suche sind weiterhin möglich.\n") allow_write = False else: allow_write = True try: params = list(parameter_map.keys()) while True: print("\nMenü:") print("1: Parameter lesen" + (" / schreiben" if allow_write else "")) print("2: Suche nach Start, End, Name oder Description") print("3: Beenden") auswahl = input("Bitte Auswahl eingeben: ").strip() if auswahl == "1": print("\nVerfügbare Parameter:") for i, p in enumerate(params): print(f"{i+1}: {p}") choice = input("\nParametername oder Nummer eingeben: ").strip() if choice.isdigit(): idx = int(choice) - 1 if 0 <= idx < len(params): param = params[idx] else: print("Ungültige Nummer.") continue else: param = choice if param not in parameter_map: print("Unbekannter Parameter.") continue info = parameter_map[param] if not info["readable"]: print("Dieses Register ist nicht lesbar.") continue value = read_register(client, info["register"], signed=info["signed"]) if value is None: continue sf = None if info["sf_register"]: sf = read_register(client, info["sf_register"], signed=True) if sf is not None: value = value * (10 ** sf) vorzeichen = "±" if info["signed"] else "+" rw = [] if info["readable"]: rw.append("lesbar") if info["writable"]: rw.append("schreibbar") print(f"\nParameter: {param}") print(f"Wert: {value} {info['unit']} ({vorzeichen})") print(f"Register: {info['register']} ({', '.join(rw)})") log_action(logfile, "LESEN", info["register"], info["description"], value) if info["writable"] and allow_write: do_write = input("\nNeuen Wert in dieses Register schreiben? (j/n): ").strip().lower() if do_write == "j": try: new_value = float(input(f"Gib den neuen Wert für {param} in {info['unit']} ein: ")) if info["sf_register"]: if sf is None: sf = read_register(client, info["sf_register"], signed=True) if sf is not None: raw_value = int(round(new_value / (10 ** sf))) print(f"Rückskalierter Wert für das Register (wird geschrieben): {raw_value}") else: print("Skalierungsfaktor konnte nicht gelesen werden.") continue else: raw_value = int(round(new_value)) if info["signed"]: if not (-32768 <= raw_value <= 32767): print("Wert außerhalb des zulässigen Bereichs für int16.") continue else: if not (0 <= raw_value <= 65535): print("Wert außerhalb des zulässigen Bereichs für uint16.") continue if write_register(client, info["register"], raw_value): print("Wert geschrieben. Warte 5 Sekunden...") log_action(logfile, "SCHREIBEN", info["register"], info["description"], new_value) time.sleep(5) check_value = read_register(client, info["register"], signed=info["signed"]) if info["sf_register"] and sf is not None: check_value = check_value * (10 ** sf) print(f"Neuer Wert im Register: {check_value} {info['unit']}") if abs(check_value - new_value) < 1e-6: print("Schreiben war erfolgreich.") else: print("Warnung: Wert im Register stimmt nicht exakt mit Eingabe überein.") else: print("Fehler beim Schreiben.") except Exception as e: print(f"Fehler bei der Eingabe oder beim Schreiben: {e}") elif info["writable"] and not allow_write: print("WARNUNG: Schreibtransaktionen sind für dieses Gerät deaktiviert (kein unterstütztes Modell 101/102/103).") elif auswahl == "2": search_registers(csvfile) elif auswahl == "3": print("Beende das Programm.") break else: print("Ungültige Auswahl.") finally: client.close() if __name__ == "__main__": main() # In[ ]: