#!/usr/bin/env python
##########################
# adapted from
# juf
# and
# fabian-lauer / dbus-shelly-3em-smartmeter
# and
# RalfZim / venus.dbus-fronius-smartmeter
##########################################
"""Publish readings of a Tasmota smart meter on D-Bus as a grid meter.

The service polls the Tasmota HTTP endpoint (``status 10``) once per second
from the GLib main loop, derives per-phase active power from voltage, current
and phase angle, and writes the results to a ``com.victronenergy.grid``
D-Bus service created through ``VeDbusService``.
"""

# import normal packages
import platform
import logging
import sys
import os
if sys.version_info.major == 2:
    import gobject
else:
    from gi.repository import GLib as gobject
import time
import requests  # for http GET
import math
import re

# our own packages from victron
# The path is absolute, so joining it onto the script directory (as the
# previous revision did) had no effect; insert it directly.
sys.path.insert(1, '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python')
from vedbus import VeDbusService

# Endpoint polled when the caller does not pass its own ``meter_url``.
DEFAULT_METER_URL = "http://192.168.178.106/cm?cmnd=status%2010"

# Interval between two polls of the meter, in milliseconds.
UPDATE_INTERVAL_MS = 1000

# When True, the computed per-phase powers are scaled so that their sum equals
# the total power reported by the meter itself.
RESCALE_TO_TASMOTA_TOTAL = True

# First number in a string: optional sign, integer or decimal part (including
# a leading-dot form such as ".5"), optional exponent.
_NUMBER_RE = re.compile(r'[+-]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?')


def get_energy_json(url, tries=3, timeout=5, logger=None):
    """Fetch ``url`` and return the decoded JSON body, retrying on failure.

    Args:
        url: Address to request with HTTP GET.
        tries: Maximum number of attempts; must be at least 1.
        timeout: Timeout in seconds handed to ``requests.get``.
        logger: Optional logger for progress and failure messages.

    Returns:
        The object produced by ``Response.json()``.

    Raises:
        ValueError: If ``tries`` is smaller than 1.
        Exception: The exception of the last failed attempt once all
            attempts are used up.
    """
    if tries < 1:
        # Without this guard the loop never runs and ``raise None`` would
        # surface as an unrelated TypeError.
        raise ValueError("tries must be >= 1, got %r" % (tries,))

    last_exc = None
    for attempt in range(1, tries + 1):
        try:
            if logger:
                # DEBUG: this runs once per poll and would flood the log file.
                logger.debug("HTTP GET %s (try %d/%d)", url, attempt, tries)
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except Exception as e:  # best-effort retry on any failure
            last_exc = e
            if attempt < tries:
                if logger:
                    logger.warning("Request failed (%s), retrying...", e)
                # Pause only when another attempt follows; sleeping after the
                # final failure would just block the caller for no benefit.
                time.sleep(1)

    if logger:
        logger.error("All retries failed for %s", url)
    raise last_exc


class DbusSmartmeterService:
    """D-Bus service that mirrors a Tasmota smart meter.

    The constructor registers the management, mandatory and measurement paths
    on a new ``VeDbusService`` and schedules ``_update`` on the GLib main loop.
    """

    def __init__(self, servicename, deviceinstance, paths, productname='Tasmota',
                 connection='Tasmota Web service', meter_url=DEFAULT_METER_URL):
        """Create the D-Bus service and start the periodic update timer.

        Args:
            servicename: D-Bus service name prefix.
            deviceinstance: Device instance number; also part of the full
                service name (``<servicename>.http_<NN>``).
            paths: Mapping of D-Bus path to a dict with the keys ``initial``
                (start value) and ``textformat`` (callback ``(path, value)``
                returning the display text).
            productname: Value published as ``/ProductName`` and
                ``/CustomName``.
            connection: Value published as ``/Mgmt/Connection``.
            meter_url: HTTP endpoint that returns the Tasmota status JSON.
        """
        self._dbusservice = VeDbusService("{}.http_{:02d}".format(servicename, deviceinstance))
        self._paths = paths
        self._meter_url = meter_url

        logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance))

        # Create the management objects, as specified in the ccgx dbus-api document
        self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
        self._dbusservice.add_path(
            '/Mgmt/ProcessVersion',
            'Unknown version, and running on Python ' + platform.python_version())
        self._dbusservice.add_path('/Mgmt/Connection', connection)

        # Create the mandatory objects
        self._dbusservice.add_path('/DeviceInstance', deviceinstance)
        self._dbusservice.add_path('/ProductId', 45069)  # ET340 Energy Meter (dummy id used by many community drivers)
        self._dbusservice.add_path('/DeviceType', 345)
        self._dbusservice.add_path('/ProductName', productname)
        self._dbusservice.add_path('/CustomName', productname)
        self._dbusservice.add_path('/Latency', None)
        self._dbusservice.add_path('/FirmwareVersion', 0.1)
        self._dbusservice.add_path('/HardwareVersion', 0)
        self._dbusservice.add_path('/Connected', 1)
        self._dbusservice.add_path('/Role', 'grid')
        self._dbusservice.add_path('/Position', 0)  # normally only needed for pvinverter
        self._dbusservice.add_path('/UpdateIndex', 0)

        # add path values to dbus
        for path, settings in self._paths.items():
            self._dbusservice.add_path(
                path, settings['initial'], gettextcallback=settings['textformat'],
                writeable=True, onchangecallback=self._handlechangedvalue)

        # timestamp (time.time()) of the last successful update
        self._lastUpdate = 0

        # add _update function 'timer'; it keeps firing while _update returns True
        gobject.timeout_add(UPDATE_INTERVAL_MS, self._update)

    def _fnum(self, v, default=0.0):
        """Convert ``v`` to float, tolerating decorated strings.

        Real numbers are converted directly. Anything else is stringified and
        the first number found in the text is used, so a value like
        ``'JS:232'`` yields ``232.0``. Leading-dot decimals and exponent
        notation (``'1e-05'``) are recognised.

        Args:
            v: Value to convert.
            default: Fallback when no finite number can be extracted.

        Returns:
            The parsed float, or ``float(default)``.
        """
        try:
            # bool is an int subclass but is not a measurement; let it fall
            # through to the text path, where it yields the default.
            if isinstance(v, (int, float)) and not isinstance(v, bool):
                number = float(v)
                if math.isnan(number) or math.isinf(number):
                    return float(default)
                return number
            match = _NUMBER_RE.search(str(v))
            return float(match.group(0)) if match else float(default)
        except Exception:
            return float(default)

    def _p_phase(self, u, i, phi_deg):
        """Return the active power of one phase: ``P = U * I * cos(phi)``.

        Args:
            u: Voltage.
            i: Current.
            phi_deg: Phase angle in degrees.
        """
        # math.cos never leaves [-1, 1], so no clamping is required.
        return u * i * math.cos(math.radians(phi_deg))

    def _reconcile_powers(self, p1, p2, p3, p_tasmota_total, logger):
        """Align the computed phase powers with the meter's own total.

        Args:
            p1, p2, p3: Active power computed for each phase.
            p_tasmota_total: Total power reported by the meter.
            logger: Logger for diagnostic messages.

        Returns:
            Tuple ``(p1, p2, p3, p_sum)`` with the values to publish.
        """
        p_sum = p1 + p2 + p3

        if RESCALE_TO_TASMOTA_TOTAL and abs(p_sum) > 1e-6:
            # p_sum is guaranteed non-zero here, so the division is safe.
            scale = p_tasmota_total / p_sum
            # DEBUG: emitted on every poll; INFO would flood the log file.
            logger.debug("Rescale phase sum %.1f W → Tasmota total %.1f W (scale=%.3f)",
                         p_sum, p_tasmota_total, scale)
            return p1 * scale, p2 * scale, p3 * scale, p_tasmota_total

        # No rescaling possible: trust the meter total when the phase sum is
        # off by more than 20 % (relative to at least 1 W).
        if p_tasmota_total != 0.0 and (
                abs(p_sum - p_tasmota_total) / max(1.0, abs(p_tasmota_total)) > 0.2):
            logger.warning("Phase sum %.1f W differs from Tasmota %.1f W (>20%%) – using Tasmota total",
                           p_sum, p_tasmota_total)
            p_sum = p_tasmota_total
        return p1, p2, p3, p_sum

    def _update(self):
        """Poll the meter once and publish the readings on D-Bus.

        Any error is logged and swallowed so that the timer keeps running.

        Returns:
            Always True, which tells GLib to call this function again.
        """
        logger = logging.getLogger(__name__)
        try:
            meter_data = get_energy_json(self._meter_url, tries=3, timeout=5, logger=logger)
            energy = meter_data['StatusSNS']['ENERGY']
            num = self._fnum

            volt1 = num(energy.get('Voltage', 0.0))
            volt2 = num(energy.get('Voltage_L2', 0.0))
            volt3 = num(energy.get('Voltage_L3', 0.0))

            curr1 = num(energy.get('Current', 0.0))
            curr2 = num(energy.get('Current_L2', 0.0))
            curr3 = num(energy.get('Current_L3', 0.0))

            phi1 = num(energy.get('phase_angle_L1', 0.0))
            phi2 = num(energy.get('phase_angle_L2', 0.0))
            phi3 = num(energy.get('phase_angle_L3', 0.0))

            freq = num(energy.get('Freq', 50.0))
            total = num(energy.get('Total', 0.0))
            supply = num(energy.get('Supply', 0.0))
            p_tasmota_total = num(energy.get('Power', 0.0))

            # A phase without voltage or current carries no power.
            p1 = self._p_phase(volt1, curr1, phi1) if volt1 and curr1 else 0.0
            p2 = self._p_phase(volt2, curr2, phi2) if volt2 and curr2 else 0.0
            p3 = self._p_phase(volt3, curr3, phi3) if volt3 and curr3 else 0.0

            p1, p2, p3, p_sum = self._reconcile_powers(p1, p2, p3, p_tasmota_total, logger)

            # The meter totals are published as overall and as L1 energy;
            # L2 and L3 energy are reported as zero.
            readings = (
                ('/Ac/Power', p_sum),
                ('/Ac/Frequency', freq),
                ('/Ac/Voltage', volt1),
                ('/Ac/Current', curr1 + curr2 + curr3),
                ('/Ac/L1/Voltage', volt1),
                ('/Ac/L2/Voltage', volt2),
                ('/Ac/L3/Voltage', volt3),
                ('/Ac/L1/Current', curr1),
                ('/Ac/L2/Current', curr2),
                ('/Ac/L3/Current', curr3),
                ('/Ac/L1/Power', p1),
                ('/Ac/L2/Power', p2),
                ('/Ac/L3/Power', p3),
                ('/Ac/Energy/Forward', total),
                ('/Ac/Energy/Reverse', supply),
                ('/Ac/L1/Energy/Forward', total),
                ('/Ac/L1/Energy/Reverse', supply),
                ('/Ac/L2/Energy/Forward', 0.0),
                ('/Ac/L2/Energy/Reverse', 0.0),
                ('/Ac/L3/Energy/Forward', 0.0),
                ('/Ac/L3/Energy/Reverse', 0.0),
            )
            for path, value in readings:
                self._dbusservice[path] = float(value)

            logger.debug("P total=%.1f | L1=%.1f L2=%.1f L3=%.1f | U=%.1f/%.1f/%.1f V | I=%.2f/%.2f/%.2f A | f=%.1f Hz",
                         p_sum, p1, p2, p3, volt1, volt2, volt3, curr1, curr2, curr3, freq)

            # Update counter wraps from 255 back to 0.
            index = self._dbusservice['/UpdateIndex'] + 1
            if index > 255:
                index = 0
            self._dbusservice['/UpdateIndex'] = index

            self._lastUpdate = time.time()

        except Exception as e:
            logger.critical('Error at %s', '_update', exc_info=e)

        return True

    def _handlechangedvalue(self, path, value):
        """Log a value change made by another D-Bus participant and accept it."""
        logging.debug("someone else updated %s to %s" % (path, value))
        return True  # accept the change


def _unit_formatter(unit, digits):
    """Return a text callback that renders a value rounded and suffixed.

    Args:
        unit: Suffix appended to the number.
        digits: Number of decimals passed to ``round``.

    Returns:
        Callable ``(path, value) -> str``; ``path`` is ignored.
    """
    def _format(path, value):
        return str(round(value, digits)) + unit
    return _format


def main():
    """Configure logging, create the grid-meter service and run the main loop."""
    # configure logging
    logging.basicConfig(
        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=logging.INFO,
        handlers=[
            logging.FileHandler("%s/current.log" % (os.path.dirname(os.path.realpath(__file__)))),
            logging.StreamHandler()
        ])

    try:
        logging.info("Start")

        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)

        # formatting
        _kwh = _unit_formatter('KWh', 2)
        _a = _unit_formatter('A', 1)
        _w = _unit_formatter('W', 1)
        _v = _unit_formatter('V', 1)
        _hz = _unit_formatter('Hz', 1)

        # start our main-service
        pvac_output = DbusSmartmeterService(
            servicename='com.victronenergy.grid',
            deviceinstance=40,
            paths={
                '/Ac/Energy/Forward': {'initial': 0, 'textformat': _kwh},  # energy bought from the grid
                '/Ac/Energy/Reverse': {'initial': 0, 'textformat': _kwh},  # energy sold to the grid
                '/Ac/Power': {'initial': 0, 'textformat': _w},
                '/Ac/Frequency': {'initial': 0, 'textformat': _hz},

                '/Ac/Current': {'initial': 0, 'textformat': _a},
                '/Ac/Voltage': {'initial': 0, 'textformat': _v},

                '/Ac/L1/Voltage': {'initial': 0, 'textformat': _v},
                '/Ac/L2/Voltage': {'initial': 0, 'textformat': _v},
                '/Ac/L3/Voltage': {'initial': 0, 'textformat': _v},
                '/Ac/L1/Current': {'initial': 0, 'textformat': _a},
                '/Ac/L2/Current': {'initial': 0, 'textformat': _a},
                '/Ac/L3/Current': {'initial': 0, 'textformat': _a},
                '/Ac/L1/Power': {'initial': 0, 'textformat': _w},
                '/Ac/L2/Power': {'initial': 0, 'textformat': _w},
                '/Ac/L3/Power': {'initial': 0, 'textformat': _w},
                '/Ac/L1/Energy/Forward': {'initial': 0, 'textformat': _kwh},
                '/Ac/L2/Energy/Forward': {'initial': 0, 'textformat': _kwh},
                '/Ac/L3/Energy/Forward': {'initial': 0, 'textformat': _kwh},
                '/Ac/L1/Energy/Reverse': {'initial': 0, 'textformat': _kwh},
                '/Ac/L2/Energy/Reverse': {'initial': 0, 'textformat': _kwh},
                '/Ac/L3/Energy/Reverse': {'initial': 0, 'textformat': _kwh},
            })

        logging.info('Connected to dbus, and switching over to gobject.MainLoop() (= event based)')
        mainloop = gobject.MainLoop()
        mainloop.run()
    except Exception as e:
        logging.critical('Error at %s', 'main', exc_info=e)


if __name__ == "__main__":
    main()