Hallo,
ich wurde per PN gefragt wurde ob ich den Code teilen kann. So haben denke ich alle weniger Arbeit. 
Ich habe eben nochmal ein wenig getuned. Scheint aktuell ganz gut zu laufen. Hatte heute einen Netzbezug von 0,16KWh. Ich denke das ist ok für mich.
#!/usr/bin/env python3
"""
Energy Controller v2 – SolarEdge + Marstek Venus E
====================================================
Wesentliche Änderungen gegenüber v1:
1. Adaptive Regelung
- Eine Basis-Verstärkung `kp` (per MQTT tunbar)
- Interne Skalierung: große Schritte bei großem Fehler, feines
Nachregeln bei kleinem Fehler — verhindert Pendeln nahe Sollwert
- Echtes MAX_STEP-Limit (per MQTT tunbar)
- DEADBAND per MQTT tunbar
2. Marstek-Reads minimiert (schont den begrenzten Modbus-Slot)
- Standardmäßig nur **alle 2 Minuten** SOC/Power/Voltage lesen
- Bei jedem Schreibbefehl wird ein Read piggy-backed
- Write-Result wird geprüft (`isError()`) statt blind zu lesen
3. Energie-Zähler aus kommandiertem Setpoint
- `battery_charged_kwh` / `battery_discharged_kwh` integrieren den
zuletzt gesendeten Befehl × dt (genauer als 2-Min-Sampling)
- Bei SOC-Limit wird Beitrag genullt (keine Phantom-Buchungen)
4. RS485-Handshake nur noch beim Verbindungsaufbau + alle 5 min
- Spart ~250ms pro Power-Update
- Vor jedem Write: wenn Handshake älter als 5 min → erneuern
5. SOC-Buffer-Zonen
- 85–90 % SOC: Ladeleistung auf 500W gedeckelt
- 15–20 % SOC: Entladeleistung auf 500W gedeckelt
- <15 % / >90 %: Stop. Anti-Wind-up: Setpoint genullt.
6. Tunbare Parameter via MQTT (persistent über Reboot)
- marstek/cmd/kp (Basis-Verstärkung, 0.1–3.0)
- marstek/cmd/max_step (W, 50–2000)
- marstek/cmd/deadband (W, 5–200)
- marstek/cmd/max_charge_power (W, 100–MAX_POWER)
- marstek/cmd/max_discharge_power (W, 100–MAX_POWER)
- marstek/cmd/target_offset (W)
- marstek/cmd/min_soc (%, 5–80)
- marstek/cmd/mode (eigenverbrauch|laden|entladen|stop)
- marstek/cmd/charge_power (W, fester Modus)
- marstek/cmd/discharge_power (W, fester Modus)
7. Direction-Change-Delay entfernt.
"""
import time
import logging
import signal
import sys
import json
import os
from pymodbus.client import ModbusTcpClient
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# ============================================================
# KONFIGURATION
# ============================================================
SE_HOST = "192.168.0.90"
SE_PORT = 1502
VE_HOST = "192.168.0.104"
VE_PORT = 502
MQTT_HOST = "192.168.0.32"
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASS = ""
MQTT_PREFIX = ""
INFLUX_URL = "http://192.168.0.35:8086"
INFLUX_TOKEN = ""
INFLUX_ORG = ""
INFLUX_BUCKET = ""
# Loop-Timing
LOOP_INTERVAL = 5 # Haupt-Loop (SolarEdge wird jeden Tick gelesen)
PUBLISH_INTERVAL = 10 # MQTT-State publish
INFLUX_INTERVAL = 10 # InfluxDB write
STATS_INTERVAL = 60 # Zeitraum-Statistiken
PERSIST_INTERVAL = 300 # Zähler auf Disk speichern
# Marstek-Polling
VE_FALLBACK_READ = 120 # spätestens alle 2 min Marstek lesen
HANDSHAKE_REFRESH = 300 # RS485-Enable alle 5 min erneuern
# Festgrenzen (NICHT per MQTT)
MAX_POWER = 2500 # Hardware-Obergrenze (sowohl Laden als Entladen)
MAX_SOC = 90 # SOC-Obergrenze (Stopp)
BUFFER_WIDTH = 5 # SOC-Buffer-Breite in Prozentpunkten
BUFFER_LIMIT = 500 # W innerhalb der Buffer-Zone
# Regelung — interne adaptive Stufung (Skalierung × kp)
# (|error| > Schwelle → scale)
ADAPTIVE_GAIN_TABLE = [
(800, 1.0), # großer Fehler → voller Schritt
(300, 0.7),
(100, 0.4),
(0, 0.2), # nahe Sollwert → feines Nachregeln
]
GRID_AVG_LEN = 3 # Glättung: 3 × 5s = 15s Fenster
# Trickle-Modus: langsame Konvergenz innerhalb der Deadband
TRICKLE_INTERVAL = 30 # sek zwischen Trickle-Schritten
TRICKLE_DEADBAND = 5 # W — innerhalb davon wirklich Ruhe
TRICKLE_MIN_STEP = 5 # W minimaler Nudge
TRICKLE_MAX_STEP = 15 # W maximaler Nudge
TRICKLE_GAIN = 0.5 # Anteil des Fehlers als Schrittweite
# Statistik-Zeiträume
STAT_PERIODS = ["-6h", "-12h", "-1d", "-3d", "-7d", "-30d", "-1y", "-20y"]
# Persistenz-Pfade
PERSIST_DIR = "/var/lib/energy_controller"
COUNTERS_FILE = f"{PERSIST_DIR}/counters.json"
CONFIG_FILE = f"{PERSIST_DIR}/config.json"
# Default-Werte für tunbare Parameter
DEFAULTS = {
"kp": 1.0,
"max_step": 700,
"deadband": 40,
"max_charge_power": 800,
"max_discharge_power": 800,
"target_offset": -15,
"min_soc": 15,
"charge_power": 800,
"discharge_power": 800,
}
# ============================================================
# LOGGING
# ============================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("/var/log/energy_controller.log"),
]
)
log = logging.getLogger(__name__)
# ============================================================
# ZUSTAND
# ============================================================
state = {
# Messwerte
"grid_power": 0.0,
"pv_power": 0.0,
"battery_power": 0,
"house_power": 0.0,
"soc": 50, # neutraler Default bis 1. Read
"battery_voltage": 0.0,
"inverter_status": "",
"inverter_temp": 0.0,
# kWh (aus Modbus)
"pv_lifetime_kwh": 0.0,
"meter_imported": 0.0,
"meter_exported": 0.0,
# kWh (selbst integriert)
"battery_charged_kwh": 0.0,
"battery_discharged_kwh": 0.0,
"house_energy_kwh": 0.0,
# Tunbare Parameter (werden aus CONFIG_FILE geladen)
"mode": "stop",
"kp": DEFAULTS["kp"],
"max_step": DEFAULTS["max_step"],
"deadband": DEFAULTS["deadband"],
"max_charge_power": DEFAULTS["max_charge_power"],
"max_discharge_power": DEFAULTS["max_discharge_power"],
"target_offset": DEFAULTS["target_offset"],
"min_soc": DEFAULTS["min_soc"],
"charge_power": DEFAULTS["charge_power"],
"discharge_power": DEFAULTS["discharge_power"],
# Interne Regler-Zustände
"last_setpoint": 0,
"commanded_power": 0, # signed: + laden, − entladen, 0 stop
"last_ve_mode": -1, # 0=stop, 1=laden, 2=entladen
"last_ve_power": 0, # absolut
"last_command_time": 0.0, # für piggy-back & dt-Integration
"last_ve_read": 0.0, # letzter Marstek-Read
"pending_ve_read": 0.0, # zeitpunkt für verzögerten piggy-back read (0 = keiner)
"last_handshake": 0.0, # letzter 42000=21930 Schreibvorgang
"_last_trickle": 0.0, # letzter Trickle-Schritt
# Energie-Flüsse (W & %)
"pv_to_grid_w": 0.0,
"pv_to_house_w": 0.0,
"pv_to_house_w_negativ": 0.0,
"pv_to_battery_w": 0.0,
"grid_to_house_w": 0.0,
"battery_to_house_w": 0.0,
"pv_to_grid_pct": 0.0,
"pv_to_house_pct": 0.0,
"pv_to_battery_pct": 0.0,
"grid_to_house_pct": 0.0,
"pv_to_house_cons_pct": 0.0,
"battery_to_house_pct": 0.0,
# Delta-Buchführung
"_last_pv_kwh": None,
"_last_imported": None,
"_last_exported": None,
"_last_bat_ch": 0.0,
"_last_bat_disch": 0.0,
"last_counter_time": 0.0,
# Fehlerzähler
"errors_se": 0,
"errors_ve": 0,
}
running = True
mqtt_client = None
se_client = None
ve_client = None
# Glättung Netzleistung
_grid_history = []
# ============================================================
# PERSISTENZ
# ============================================================
COUNTER_KEYS = ["battery_charged_kwh", "battery_discharged_kwh", "house_energy_kwh"]
CONFIG_KEYS = list(DEFAULTS.keys()) + ["mode"]
def load_counters():
try:
os.makedirs(PERSIST_DIR, exist_ok=True)
if os.path.exists(COUNTERS_FILE):
with open(COUNTERS_FILE) as f:
data = json.load(f)
for key in COUNTER_KEYS:
if key in data:
state[key] = float(data[key])
log.info(f"Zähler geladen: bat_ch={state['battery_charged_kwh']:.3f} kWh, "
f"bat_disch={state['battery_discharged_kwh']:.3f} kWh")
except Exception as e:
log.warning(f"Zähler laden fehlgeschlagen: {e}")
def save_counters():
try:
os.makedirs(PERSIST_DIR, exist_ok=True)
with open(COUNTERS_FILE, "w") as f:
json.dump({key: state[key] for key in COUNTER_KEYS}, f, indent=2)
except Exception as e:
log.warning(f"Zähler speichern fehlgeschlagen: {e}")
def load_config():
try:
os.makedirs(PERSIST_DIR, exist_ok=True)
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE) as f:
data = json.load(f)
for key in CONFIG_KEYS:
if key in data:
state[key] = data[key]
log.info(f"Config geladen: kp={state['kp']}, max_step={state['max_step']}, "
f"deadband={state['deadband']}, max_charge={state['max_charge_power']}W, "
f"max_disch={state['max_discharge_power']}W, mode={state['mode']}")
except Exception as e:
log.warning(f"Config laden fehlgeschlagen: {e}")
def save_config():
try:
os.makedirs(PERSIST_DIR, exist_ok=True)
with open(CONFIG_FILE, "w") as f:
json.dump({key: state[key] for key in CONFIG_KEYS}, f, indent=2)
except Exception as e:
log.warning(f"Config speichern fehlgeschlagen: {e}")
# ============================================================
# MODBUS VERBINDUNGSMANAGEMENT
# ============================================================
def ensure_se_connected():
global se_client
if se_client and se_client.connected:
return True
try:
se_client = ModbusTcpClient(SE_HOST, port=SE_PORT, timeout=5)
if se_client.connect():
log.info("SolarEdge verbunden")
return True
log.warning("SolarEdge: Verbindung fehlgeschlagen")
se_client = None
return False
except Exception as e:
log.error(f"SolarEdge connect Fehler: {e}")
se_client = None
return False
def ensure_ve_connected():
global ve_client
if ve_client and ve_client.connected:
return True
try:
ve_client = ModbusTcpClient(VE_HOST, port=VE_PORT, timeout=10)
if ve_client.connect():
log.info("Venus E verbunden")
# Beim ersten Connect Handshake setzen
do_handshake()
return True
log.warning("Venus E: Verbindung fehlgeschlagen")
ve_client = None
return False
except Exception as e:
log.error(f"Venus E connect Fehler: {e}")
ve_client = None
return False
def disconnect_se():
global se_client
if se_client:
try: se_client.close()
except Exception: pass
se_client = None
def disconnect_ve():
global ve_client
if ve_client:
try: ve_client.close()
except Exception: pass
ve_client = None
def do_handshake():
"""RS485-Enable. Nur intern aufrufen, ve_client muss verbunden sein."""
try:
r = ve_client.write_register(address=42000, value=21930)
if r.isError():
log.warning(f"Handshake Fehler: {r}")
return False
state["last_handshake"] = time.time()
return True
except Exception as e:
log.warning(f"Handshake Exception: {e}")
return False
def ensure_handshake_fresh():
"""Sorgt dafür, dass der RS485-Enable nicht älter als HANDSHAKE_REFRESH ist."""
if time.time() - state["last_handshake"] > HANDSHAKE_REFRESH:
return do_handshake()
return True
# ============================================================
# SOLAREDGE LESEN
# ============================================================
STATUS_MAP = {1:'Off', 2:'Sleeping', 3:'Starting', 4:'MPPT',
5:'Throttled', 6:'ShuttingDown', 7:'Fault', 8:'Standby'}
def _to_int16(val):
return val if val < 32768 else val - 65536
def _read_uint32_be(regs):
return (regs[0] * 65536) + regs[1]
def read_solaredge():
global se_client
if not ensure_se_connected():
return False
try:
r = se_client.read_holding_registers(address=40083, count=2)
if r.isError(): raise Exception("AC Power read error")
state["pv_power"] = round(_to_int16(r.registers[0]) * (10 ** _to_int16(r.registers[1])), 1)
r = se_client.read_holding_registers(address=40206, count=1)
sf_r = se_client.read_holding_registers(address=40210, count=1)
if not r.isError() and not sf_r.isError():
state["grid_power"] = round(_to_int16(r.registers[0]) * (10 ** _to_int16(sf_r.registers[0])), 1)
r = se_client.read_holding_registers(address=40107, count=1)
if not r.isError():
state["inverter_status"] = STATUS_MAP.get(r.registers[0], f"Unknown({r.registers[0]})")
r = se_client.read_holding_registers(address=40103, count=1)
if not r.isError():
state["inverter_temp"] = round(_to_int16(r.registers[0]) * 0.01, 1)
r = se_client.read_holding_registers(address=40093, count=2)
sf_r = se_client.read_holding_registers(address=40095, count=1)
if not r.isError() and not sf_r.isError():
state["pv_lifetime_kwh"] = round(
_read_uint32_be(r.registers) * (10 ** _to_int16(sf_r.registers[0])) / 1000, 3)
r_exp = se_client.read_holding_registers(address=40226, count=2)
r_imp = se_client.read_holding_registers(address=40234, count=2)
r_sf = se_client.read_holding_registers(address=40242, count=1)
if not r_exp.isError() and not r_imp.isError() and not r_sf.isError():
sf = _to_int16(r_sf.registers[0])
state["meter_exported"] = round(_read_uint32_be(r_exp.registers) * (10 ** sf) / 1000, 3)
state["meter_imported"] = round(_read_uint32_be(r_imp.registers) * (10 ** sf) / 1000, 3)
state["errors_se"] = 0
return True
except Exception as e:
log.error(f"SolarEdge Lesefehler: {e}")
disconnect_se()
state["errors_se"] += 1
return False
# ============================================================
# VENUS E LESEN
# ============================================================
def read_venus():
"""Liest SOC, battery_power, battery_voltage. Wird aufgerufen:
- direkt nach jedem erfolgreichen Schreibbefehl (piggy-back)
- spätestens alle VE_FALLBACK_READ Sekunden
"""
global ve_client
if not ensure_ve_connected():
return False
try:
r = ve_client.read_holding_registers(address=37004, count=2)
if r.isError(): raise Exception("Grid Power read error")
state["battery_power"] = r.registers[0] if r.registers[0] < 32768 else r.registers[0] - 65536
state["soc"] = r.registers[1]
r = ve_client.read_holding_registers(address=34000, count=1)
if not r.isError():
state["battery_voltage"] = round(r.registers[0] / 100, 2)
state["errors_ve"] = 0
state["last_ve_read"] = time.time()
return True
except Exception as e:
log.error(f"Venus E Lesefehler: {e}")
disconnect_ve()
state["errors_ve"] += 1
return False
# ============================================================
# VENUS E STEUERN
# ============================================================
def apply_soc_buffer(mode, power_w):
"""Begrenzt Leistung in den SOC-Buffer-Zonen auf BUFFER_LIMIT.
Gibt 0 zurück wenn SOC die Außengrenze erreicht hat."""
soc = state["soc"]
if mode == 1: # Laden
if soc >= MAX_SOC:
return 0
if soc >= MAX_SOC - BUFFER_WIDTH:
return min(power_w, BUFFER_LIMIT)
elif mode == 2: # Entladen
if soc <= state["min_soc"]:
return 0
if soc <= state["min_soc"] + BUFFER_WIDTH:
return min(power_w, BUFFER_LIMIT)
return power_w
def venus_command(mode, power_w=0):
"""Schreibt Befehl zur Batterie. Nur wenn sich etwas wirklich ändert.
Nach erfolgreichem Schreiben wird piggy-back gelesen.
Bei mode in (1,2) wird die Leistung auf das aktuelle Limit gekappt
und die SOC-Buffer-Zone angewandt.
"""
global ve_client
power_w = int(power_w)
# Power-Limits anwenden
if mode == 1:
power_w = max(0, min(state["max_charge_power"], power_w))
elif mode == 2:
power_w = max(0, min(state["max_discharge_power"], power_w))
# SOC-Buffer-Zone anwenden
if mode in (1, 2):
capped = apply_soc_buffer(mode, power_w)
if capped == 0 and power_w > 0:
mode = 0
power_w = 0
else:
power_w = capped
# Nichts geändert?
if mode == state["last_ve_mode"] and power_w == state["last_ve_power"]:
return True
if not ensure_ve_connected():
return False
try:
# Handshake auffrischen wenn fällig
ensure_handshake_fresh()
if mode == 0:
r = ve_client.write_register(address=42010, value=0)
if r.isError():
log.warning(f"Stop-Befehl Fehler: {r}")
return False
log.info("Venus E: STOP")
state["commanded_power"] = 0
elif mode == 1:
# Wenn Modus gleich bleibt → nur Leistung updaten
if state["last_ve_mode"] == 1:
r = ve_client.write_register(address=42020, value=power_w)
if r.isError():
log.warning(f"Charge-Power-Update Fehler: {r}")
return False
else:
r = ve_client.write_register(address=42020, value=power_w)
if r.isError():
log.warning(f"Charge-Power-Write Fehler: {r}")
return False
time.sleep(0.1)
r = ve_client.write_register(address=42010, value=1)
if r.isError():
log.warning(f"Charge-Mode-Write Fehler: {r}")
return False
log.info(f"Venus E: LADEN {power_w}W")
state["commanded_power"] = power_w
elif mode == 2:
if state["last_ve_mode"] == 2:
r = ve_client.write_register(address=42021, value=power_w)
if r.isError():
log.warning(f"Discharge-Power-Update Fehler: {r}")
return False
else:
r = ve_client.write_register(address=42021, value=power_w)
if r.isError():
log.warning(f"Discharge-Power-Write Fehler: {r}")
return False
time.sleep(0.1)
r = ve_client.write_register(address=42010, value=2)
if r.isError():
log.warning(f"Discharge-Mode-Write Fehler: {r}")
return False
log.info(f"Venus E: ENTLADEN {power_w}W")
state["commanded_power"] = -power_w
state["last_ve_mode"] = mode
state["last_ve_power"] = power_w
state["last_command_time"] = time.time()
# Piggy-back Read verzögert: Marstek braucht 1-2s bis er reagiert.
# Wird in der Hauptschleife abgeholt sobald der Zeitpunkt erreicht ist.
state["pending_ve_read"] = time.time() + 2.0
return True
except Exception as e:
log.error(f"Venus E Schreibfehler: {e}")
disconnect_ve()
state["last_ve_mode"] = -1
state["last_ve_power"] = 0
return False
# ============================================================
# ENERGIE-ZÄHLER (auf Basis kommandierter Leistung)
# ============================================================
def update_energy_counters():
now = time.time()
if state["last_counter_time"] == 0.0:
state["last_counter_time"] = now
state["_last_pv_kwh"] = state["pv_lifetime_kwh"]
state["_last_imported"] = state["meter_imported"]
state["_last_exported"] = state["meter_exported"]
state["_last_bat_ch"] = state["battery_charged_kwh"]
state["_last_bat_disch"] = state["battery_discharged_kwh"]
return
dt_h = (now - state["last_counter_time"]) / 3600
state["last_counter_time"] = now
# Batterie-Zähler aus kommandierter Leistung
cmd = state["commanded_power"]
# Anti-Phantom: bei SOC-Limit nicht buchen
if cmd > 0 and state["soc"] >= MAX_SOC:
cmd = 0
elif cmd < 0 and state["soc"] <= state["min_soc"]:
cmd = 0
if cmd > 0:
state["battery_charged_kwh"] += cmd * dt_h / 1000
elif cmd < 0:
state["battery_discharged_kwh"] += abs(cmd) * dt_h / 1000
# Power-Flow-Berechnung (auf Basis aktueller Messwerte)
pv = max(0.0, state["pv_power"])
grid = state["grid_power"]
bat = state["battery_power"] # für Visualisierung
pv_to_grid = max(0.0, grid)
pv_to_battery = max(0.0, -bat)
grid_to_house = max(0.0, -grid)
battery_to_house = max(0.0, bat)
pv_to_house = max(0.0, pv - pv_to_grid - pv_to_battery)
state["pv_to_grid_w"] = round(pv_to_grid, 1)
state["pv_to_house_w"] = round(pv_to_house, 1)
state["pv_to_house_w_negativ"] = round(-pv_to_house, 1)
state["pv_to_battery_w"] = round(pv_to_battery, 1)
state["grid_to_house_w"] = round(grid_to_house, 1)
state["battery_to_house_w"] = round(battery_to_house, 1)
# Hausverbrauch (kWh) aus Bilanzgleichung
d_pv = max(0.0, state["pv_lifetime_kwh"] - (state["_last_pv_kwh"] or state["pv_lifetime_kwh"]))
d_imp = max(0.0, state["meter_imported"] - (state["_last_imported"] or state["meter_imported"]))
d_exp = max(0.0, state["meter_exported"] - (state["_last_exported"] or state["meter_exported"]))
d_disch = state["battery_discharged_kwh"] - state["_last_bat_disch"]
d_ch = state["battery_charged_kwh"] - state["_last_bat_ch"]
d_house = d_pv + d_disch + d_imp - d_exp - d_ch
if d_house > 0:
state["house_energy_kwh"] += d_house
state["_last_pv_kwh"] = state["pv_lifetime_kwh"]
state["_last_imported"] = state["meter_imported"]
state["_last_exported"] = state["meter_exported"]
state["_last_bat_ch"] = state["battery_charged_kwh"]
state["_last_bat_disch"] = state["battery_discharged_kwh"]
# Prozent-Anteile (Lifetime)
total_pv = state["pv_lifetime_kwh"]
if total_pv > 0:
pv_to_grid_kwh = state["meter_exported"]
pv_to_battery_kwh = state["battery_charged_kwh"]
pv_to_house_kwh = max(0.0, total_pv - pv_to_grid_kwh - pv_to_battery_kwh)
state["pv_to_grid_pct"] = round(pv_to_grid_kwh / total_pv * 100, 1)
state["pv_to_battery_pct"] = round(pv_to_battery_kwh / total_pv * 100, 1)
state["pv_to_house_pct"] = round(pv_to_house_kwh / total_pv * 100, 1)
total_cons = state["house_energy_kwh"]
if total_cons > 0:
pv_to_house_kwh = max(0.0, state["pv_lifetime_kwh"] - state["meter_exported"] - state["battery_charged_kwh"])
state["grid_to_house_pct"] = round(state["meter_imported"] / total_cons * 100, 1)
state["pv_to_house_cons_pct"] = round(pv_to_house_kwh / total_cons * 100, 1)
state["battery_to_house_pct"] = round(state["battery_discharged_kwh"] / total_cons * 100, 1)
# ============================================================
# REGELLOGIK — adaptiver Integrator
# ============================================================
def adaptive_step(error, kp):
"""Adaptive Schrittweite: nichtlineare Verstärkung."""
abs_e = abs(error)
scale = ADAPTIVE_GAIN_TABLE[-1][1]
for threshold, s in ADAPTIVE_GAIN_TABLE:
if abs_e > threshold:
scale = s
break
return error * kp * scale
def run_control():
"""Adaptive Integrator-Regelung. Nur im Modus 'eigenverbrauch'."""
global _grid_history
if state["mode"] != "eigenverbrauch":
return
_grid_history.append(state["grid_power"])
if len(_grid_history) > GRID_AVG_LEN:
_grid_history.pop(0)
if len(_grid_history) < GRID_AVG_LEN:
return # noch nicht genug Samples
grid_avg = sum(_grid_history) / len(_grid_history)
error = grid_avg - state["target_offset"]
deadband = state["deadband"]
# Schritt bestimmen
if abs(error) >= deadband:
# === Normale Regelung ===
step = adaptive_step(error, state["kp"])
step = max(-state["max_step"], min(state["max_step"], step))
state["_last_trickle"] = time.time() # Cooldown für Trickle resetten
else:
# === Trickle-Modus: langsame Konvergenz in der Deadband ===
if abs(error) < TRICKLE_DEADBAND:
return # wirklich nahe am Ziel — Ruhe geben
now = time.time()
if now - state["_last_trickle"] < TRICKLE_INTERVAL:
return # noch nicht Zeit für nächsten Nudge
state["_last_trickle"] = now
sign = 1 if error > 0 else -1
step = sign * max(TRICKLE_MIN_STEP, min(TRICKLE_MAX_STEP, abs(error) * TRICKLE_GAIN))
log.info(f"Trickle: grid={grid_avg:.0f}W err={error:+.0f}W → step={step:+.0f}W")
new_setpoint = state["last_setpoint"] + step
# Setpoint richtungsabhängig clippen
if new_setpoint > 0:
new_setpoint = min(new_setpoint, state["max_charge_power"])
elif new_setpoint < 0:
new_setpoint = max(new_setpoint, -state["max_discharge_power"])
state["last_setpoint"] = new_setpoint
# In Aktion umsetzen
if new_setpoint > deadband:
# Laden
if state["soc"] >= MAX_SOC:
if state["last_ve_mode"] != 0:
log.info(f"SOC {state['soc']}% ≥ {MAX_SOC}% → stoppe Laden (anti-windup)")
venus_command(0)
state["last_setpoint"] = 0 # Anti-windup
return
venus_command(1, int(new_setpoint))
elif new_setpoint < -deadband:
# Entladen
if state["soc"] <= state["min_soc"]:
if state["last_ve_mode"] != 0:
log.info(f"SOC {state['soc']}% ≤ {state['min_soc']}% → stoppe Entladen (anti-windup)")
venus_command(0)
state["last_setpoint"] = 0 # Anti-windup
return
venus_command(2, int(abs(new_setpoint)))
else:
# Sehr nahe an 0 → Stop, falls aktiv
if state["last_ve_mode"] != 0:
venus_command(0)
# ============================================================
# INFLUXDB
# ============================================================
influx_client = None
influx_write_api = None
influx_query_api = None
def influx_connect():
global influx_client, influx_write_api, influx_query_api
try:
influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
influx_write_api = influx_client.write_api(write_options=SYNCHRONOUS)
influx_query_api = influx_client.query_api()
log.info(f"InfluxDB verbunden: {INFLUX_URL}")
return True
except Exception as e:
log.error(f"InfluxDB Verbindung fehlgeschlagen: {e}")
return False
def influx_write():
if not influx_write_api: return
try:
point = (
Point("energie")
.field("pv_power", float(state["pv_power"]))
.field("grid_power", float(state["grid_power"]))
.field("battery_power", float(state["battery_power"]))
.field("house_power", float(state["house_power"]))
.field("soc", float(state["soc"]))
.field("battery_voltage", float(state["battery_voltage"]))
.field("inverter_temp", float(state["inverter_temp"]))
.field("pv_lifetime_kwh", float(state["pv_lifetime_kwh"]))
.field("meter_imported", float(state["meter_imported"]))
.field("meter_exported", float(state["meter_exported"]))
.field("battery_charged_kwh", float(state["battery_charged_kwh"]))
.field("battery_discharged_kwh", float(state["battery_discharged_kwh"]))
.field("house_energy_kwh", float(state["house_energy_kwh"]))
.field("pv_to_grid_w", float(state["pv_to_grid_w"]))
.field("pv_to_house_w", float(state["pv_to_house_w"]))
.field("pv_to_battery_w", float(state["pv_to_battery_w"]))
.field("grid_to_house_w", float(state["grid_to_house_w"]))
.field("battery_to_house_w", float(state["battery_to_house_w"]))
.field("commanded_power", float(state["commanded_power"]))
.field("last_setpoint", float(state["last_setpoint"]))
)
influx_write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=point)
except Exception as e:
log.warning(f"InfluxDB Write Fehler: {e}")
def _query_sum(field, period):
query_first = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: {period})
|> filter(fn: (r) => r._measurement == "energie" and r._field == "{field}")
|> first()
'''
query_last = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: {period})
|> filter(fn: (r) => r._measurement == "energie" and r._field == "{field}")
|> last()
'''
try:
r_first = influx_query_api.query(query_first)
r_last = influx_query_api.query(query_last)
first_rec = r_first[0].records[0] if r_first and r_first[0].records else None
last_rec = r_last[0].records[0] if r_last and r_last[0].records else None
if first_rec is None or last_rec is None:
return 0.0
if first_rec.get_time() == last_rec.get_time():
return 0.0
return round(max(0.0, last_rec.get_value() - first_rec.get_value()), 3)
except Exception as e:
log.warning(f"InfluxDB Query Fehler ({field}, {period}): {e}")
return 0.0
def publish_stats():
if not influx_query_api or not mqtt_client:
return
fields = [
("pv_lifetime_kwh", "pv_kwh"),
("meter_imported", "imported_kwh"),
("meter_exported", "exported_kwh"),
("battery_charged_kwh", "bat_charged_kwh"),
("battery_discharged_kwh", "bat_discharged_kwh"),
]
for period in STAT_PERIODS:
results = {}
for influx_field, mqtt_suffix in fields:
val = _query_sum(influx_field, period)
if val is not None:
results[mqtt_suffix] = val
mqtt_client.publish(
f"{MQTT_PREFIX}/stats/{period}/{mqtt_suffix}",
str(val), retain=True
)
if len(results) == len(fields):
pv = results["pv_kwh"]
exported = results["exported_kwh"]
bat_ch = results["bat_charged_kwh"]
bat_disch = results["bat_discharged_kwh"]
imported = results["imported_kwh"]
pv_to_house = round(max(0.0, pv - exported - bat_ch), 3)
house_total = round(pv_to_house + bat_disch + imported, 3)
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/pv_to_house_kwh", str(pv_to_house), retain=True)
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/house_kwh", str(house_total), retain=True)
if pv > 0:
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/pv_to_house_pct", str(round(pv_to_house / pv * 100, 1)), retain=True)
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/pv_to_grid_pct", str(round(exported / pv * 100, 1)), retain=True)
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/pv_to_battery_pct", str(round(bat_ch / pv * 100, 1)), retain=True)
if house_total > 0:
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/cons_from_pv_pct", str(round(pv_to_house / house_total * 100, 1)), retain=True)
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/cons_from_grid_pct", str(round(imported / house_total * 100, 1)), retain=True)
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/cons_from_bat_pct", str(round(bat_disch / house_total * 100, 1)), retain=True)
if house_total > 0:
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/autarkie_pct",
str(round((house_total - imported) / house_total * 100, 1)), retain=True)
if pv > 0:
mqtt_client.publish(f"{MQTT_PREFIX}/stats/{period}/eigenverbrauch_pct",
str(round((pv - exported) / pv * 100, 1)), retain=True)
log.info("Statistiken publiziert")
# ============================================================
# MQTT
# ============================================================
def _set_int(key, payload, low, high, save=True):
"""Hilfsfunktion: validiert und setzt einen int-Parameter."""
try:
val = int(payload)
if low <= val <= high:
state[key] = val
if save: save_config()
log.info(f"Parameter '{key}' auf {val} gesetzt")
return True
log.warning(f"Parameter '{key}' außerhalb [{low},{high}]: {val}")
except ValueError:
log.warning(f"Parameter '{key}' ungültig: {payload}")
return False
def _set_float(key, payload, low, high, save=True):
try:
val = float(payload)
if low <= val <= high:
state[key] = val
if save: save_config()
log.info(f"Parameter '{key}' auf {val} gesetzt")
return True
log.warning(f"Parameter '{key}' außerhalb [{low},{high}]: {val}")
except ValueError:
log.warning(f"Parameter '{key}' ungültig: {payload}")
return False
def on_mqtt_connect(client, userdata, flags, rc, properties=None):
log.info(f"MQTT verbunden: {rc}")
client.publish(f"{MQTT_PREFIX}/status", "online", retain=True)
client.subscribe(f"{MQTT_PREFIX}/cmd/#")
def on_mqtt_disconnect(client, userdata, flags, rc, properties=None):
log.warning(f"MQTT getrennt (rc={rc})")
def on_mqtt_message(client, userdata, msg):
topic = msg.topic
payload = msg.payload.decode().strip()
log.info(f"MQTT CMD: {topic} = {payload}")
if topic == f"{MQTT_PREFIX}/cmd/mode":
if payload in ("eigenverbrauch", "laden", "entladen", "stop"):
state["mode"] = payload
state["last_ve_mode"] = -1
state["last_setpoint"] = 0
save_config()
if payload == "stop": venus_command(0)
elif payload == "laden": venus_command(1, state["charge_power"])
elif payload == "entladen": venus_command(2, state["discharge_power"])
elif topic == f"{MQTT_PREFIX}/cmd/kp":
_set_float("kp", payload, 0.1, 3.0)
elif topic == f"{MQTT_PREFIX}/cmd/max_step":
_set_int("max_step", payload, 50, 2000)
elif topic == f"{MQTT_PREFIX}/cmd/deadband":
_set_int("deadband", payload, 5, 200)
elif topic == f"{MQTT_PREFIX}/cmd/max_charge_power":
if _set_int("max_charge_power", payload, 100, MAX_POWER):
# Bei Reduktion: aktuelle Leistung ggf. anpassen
if state["last_ve_mode"] == 1 and state["last_ve_power"] > state["max_charge_power"]:
venus_command(1, state["max_charge_power"])
elif topic == f"{MQTT_PREFIX}/cmd/max_discharge_power":
if _set_int("max_discharge_power", payload, 100, MAX_POWER):
if state["last_ve_mode"] == 2 and state["last_ve_power"] > state["max_discharge_power"]:
venus_command(2, state["max_discharge_power"])
elif topic == f"{MQTT_PREFIX}/cmd/target_offset":
_set_int("target_offset", payload, -500, 500)
elif topic == f"{MQTT_PREFIX}/cmd/min_soc":
_set_int("min_soc", payload, 5, 80)
elif topic == f"{MQTT_PREFIX}/cmd/charge_power":
if _set_int("charge_power", payload, 0, MAX_POWER):
if state["mode"] == "laden":
venus_command(1, state["charge_power"])
elif topic == f"{MQTT_PREFIX}/cmd/discharge_power":
if _set_int("discharge_power", payload, 0, MAX_POWER):
if state["mode"] == "entladen":
venus_command(2, state["discharge_power"])
def publish_state():
if not mqtt_client: return
house = max(0, round(state["pv_power"] + state["battery_power"] - state["grid_power"], 1))
state["house_power"] = house
topics = {
# Messwerte
"grid_power": state["grid_power"],
"pv_power": state["pv_power"],
"battery_power": state["battery_power"],
"house_power": house,
"soc": state["soc"],
"battery_voltage": state["battery_voltage"],
"inverter_status": state["inverter_status"],
"inverter_temp": state["inverter_temp"],
# kWh-Zähler
"pv_lifetime_kwh": round(state["pv_lifetime_kwh"], 3),
"meter_imported": round(state["meter_imported"], 3),
"meter_exported": round(state["meter_exported"], 3),
"battery_charged_kwh": round(state["battery_charged_kwh"], 3),
"battery_discharged_kwh": round(state["battery_discharged_kwh"], 3),
"house_energy_kwh": round(state["house_energy_kwh"], 3),
# Energie-Flüsse
"pv_to_grid_w": state["pv_to_grid_w"],
"pv_to_house_w": state["pv_to_house_w"],
"pv_to_house_w_negativ": state["pv_to_house_w_negativ"],
"pv_to_battery_w": state["pv_to_battery_w"],
"grid_to_house_w": state["grid_to_house_w"],
"battery_to_house_w": state["battery_to_house_w"],
"pv_to_grid_pct": state["pv_to_grid_pct"],
"pv_to_house_pct": state["pv_to_house_pct"],
"pv_to_battery_pct": state["pv_to_battery_pct"],
"grid_to_house_pct": state["grid_to_house_pct"],
"pv_to_house_cons_pct": state["pv_to_house_cons_pct"],
"battery_to_house_pct": state["battery_to_house_pct"],
# Tunbare Parameter (Echo zurück an HA)
"mode": state["mode"],
"kp": state["kp"],
"max_step": state["max_step"],
"deadband": state["deadband"],
"max_charge_power": state["max_charge_power"],
"max_discharge_power": state["max_discharge_power"],
"target_offset": state["target_offset"],
"min_soc": state["min_soc"],
"charge_power": state["charge_power"],
"discharge_power": state["discharge_power"],
# Regler-Debug
"commanded_power": state["commanded_power"],
"last_setpoint": round(state["last_setpoint"], 1),
"status": "online",
}
for key, val in topics.items():
mqtt_client.publish(f"{MQTT_PREFIX}/{key}", str(val), retain=True)
log.info(
f"PV={state['pv_power']:.0f}W Grid={state['grid_power']:.0f}W "
f"Bat={state['battery_power']}W SOC={state['soc']}% "
f"cmd={state['commanded_power']}W sp={state['last_setpoint']:.0f}W "
f"Mode={state['mode']}"
)
# ============================================================
# HAUPTSCHLEIFE
# ============================================================
def signal_handler(sig, frame):
global running
log.info("Beende Energy Controller...")
running = False
save_counters()
save_config()
venus_command(0)
if mqtt_client:
mqtt_client.publish(f"{MQTT_PREFIX}/status", "offline", retain=True)
mqtt_client.loop_stop()
disconnect_se()
disconnect_ve()
sys.exit(0)
def main():
global mqtt_client
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
log.info("Energy Controller v2 gestartet")
# Konfiguration & Zähler laden BEVOR MQTT publiziert
load_config()
load_counters()
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="energy_controller")
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS)
mqtt_client.on_connect = on_mqtt_connect
mqtt_client.on_message = on_mqtt_message
mqtt_client.on_disconnect = on_mqtt_disconnect
mqtt_client.will_set(f"{MQTT_PREFIX}/status", "offline", retain=True)
mqtt_client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
mqtt_client.loop_start()
time.sleep(2)
influx_connect()
# Initialer Marstek-Read damit SOC plausibel ist
read_venus()
last_publish = 0.0
last_influx = 0.0
last_persist = 0.0
last_stats = 0.0
log.info("Starte Regelschleife")
while running:
loop_start = time.time()
se_ok = read_solaredge()
# Verzögerten Piggy-Back-Read abholen, falls fällig
now = time.time()
if state["pending_ve_read"] > 0 and now >= state["pending_ve_read"]:
read_venus()
state["pending_ve_read"] = 0.0
# Marstek nur lesen wenn länger nicht geschehen
# (piggy-back nach venus_command ist der Normalfall)
if now - state["last_ve_read"] > VE_FALLBACK_READ:
read_venus()
if se_ok:
update_energy_counters()
run_control()
if now - last_publish >= PUBLISH_INTERVAL:
publish_state()
last_publish = now
if now - last_influx >= INFLUX_INTERVAL:
influx_write()
last_influx = now
if now - last_persist >= PERSIST_INTERVAL:
save_counters()
last_persist = now
if now - last_stats >= STATS_INTERVAL:
publish_stats()
last_stats = now
else:
if state["errors_se"] >= 3:
log.error("Zu viele SolarEdge-Fehler → Venus E stoppen")
venus_command(0)
elapsed = time.time() - loop_start
time.sleep(max(0, LOOP_INTERVAL - elapsed))
if __name__ == "__main__":
main()
P.S.: Stellt nicht zu viele Fragen. Das ist mittlerweile V27 von Claude. Ich versteh da viel, aber schreiben könnte ich sowas nicht. 
Achso bevor ich es vergesse. Die beiden Werte kann man natürlich noch anpassen. Habe die jetzt mal konservativ hier im Code eingestellt.
MAX_DISCHARGE_POWER = 800
MAX_CHARGE_POWER = 800