Falls der Code vom Schmitzi funktional ist anbei das ganze als Python Bibliothek - ich kanns leider nicht testen oder debuggen weil Zählerkasten verplombt:
#!/usr/bin/env python3
import requests
import base64 # base64 wird im ursprünglichen Skript importiert, aber nicht verwendet. Kann ggf. entfernt werden.
from requests.auth import HTTPDigestAuth
from bs4 import BeautifulSoup
import urllib3
import logging
from typing import Optional, Dict, List, Tuple
# Logging konfigurieren (optional, aber empfohlen für Bibliotheken)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class HanServiceError(Exception):
"""Benutzerdefinierte Exception für Fehler in der HanService-Kommunikation."""
pass
class HanServiceReader:
"""
Eine Klasse zur Interaktion mit einem HanService-Gerät über HTTP Digest Auth.
Diese Klasse ermöglicht das Auslesen von Zählerdaten von Geräten,
die eine spezifische Web-Schnittstelle unter /cgi-bin/hanservice.cgi verwenden.
"""
def __init__(self, url: str, user: str, password: str, verify_ssl: bool = False):
"""
Initialisiert den HanServiceReader.
Args:
url (str): Die Basis-URL des HanService CGI-Skripts (z.B. 'https://192.168.1.200/cgi-bin/hanservice.cgi').
user (str): Der Benutzername für die HTTP Digest Authentication.
password (str): Das Passwort für die HTTP Digest Authentication.
verify_ssl (bool): Ob die SSL-Zertifikatsprüfung aktiviert sein soll. Standard: False.
"""
if not url.endswith('/cgi-bin/hanservice.cgi'):
logger.warning(f"URL '{url}' endet nicht mit '/cgi-bin/hanservice.cgi'. Stelle sicher, dass dies korrekt ist.")
self.base_url = url.rstrip('/') + '/cgi-bin/hanservice.cgi'
else:
self.base_url = url
self.user = user
self.password = password
self.verify_ssl = verify_ssl
self.session = requests.Session()
self.session.auth = HTTPDigestAuth(self.user, self.password)
self.token: Optional[str] = None
self.cookies: Optional[Dict[str, str]] = None
if not self.verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger.warning("SSL-Verifizierung ist deaktiviert. Dies ist unsicher für Produktionsumgebungen.")
self._login()
def _login(self):
"""Führt den initialen Login durch, um Session-Cookie und Token zu erhalten."""
logger.info(f"Versuche Login bei {self.base_url}")
try:
response = self.session.get(self.base_url, verify=self.verify_ssl)
response.raise_for_status() # Löst HTTPError bei Fehlern aus (4xx, 5xx)
self.cookies = self.session.cookies.get_dict()
if not self.cookies or 'session' not in self.cookies:
# Manchmal wird das Cookie erst nach dem ersten POST gesetzt
logger.warning("Session-Cookie wurde beim ersten GET nicht gefunden. Versuche es nach dem ersten POST.")
# Führe einen harmlosen POST durch, um das Cookie zu provozieren (falls nötig)
self._get_initial_token_and_update_cookie(response.content)
if not self.cookies or 'session' not in self.cookies:
raise HanServiceError("Konnte Session-Cookie nicht erhalten.")
self._get_initial_token_and_update_cookie(response.content)
logger.info("Login erfolgreich, Token und Cookie erhalten.")
except requests.exceptions.RequestException as e:
logger.error(f"Fehler bei der Verbindung oder Authentifizierung: {e}")
raise HanServiceError(f"Verbindungsfehler zu {self.base_url}: {e}") from e
except HanServiceError as e:
logger.error(f"Fehler beim Login-Prozess: {e}")
raise # Die HanServiceError direkt weiterleiten
def _get_initial_token_and_update_cookie(self, html_content: bytes):
"""Extrahiert das initiale CSRF-Token aus dem HTML."""
soup = BeautifulSoup(html_content, 'html.parser')
token_input = soup.find('input', {'type': 'hidden', 'name': 'tkn'})
if not token_input or not token_input.get('value'):
# Versuche alternativ, das Token aus einem Formular zu holen, falls das erste GET kein Token liefert
form_action = 'meterform'
post_data = {"action": form_action} # Sende ohne Token, um Token zu bekommen
try:
response = self.session.post(self.base_url, data=post_data, cookies=self.cookies, verify=self.verify_ssl)
response.raise_for_status()
self.cookies = self.session.cookies.get_dict() # Cookie hier aktualisieren!
soup = BeautifulSoup(response.content, 'html.parser')
token_input = soup.find('input', {'type': 'hidden', 'name': 'tkn'})
if not token_input or not token_input.get('value'):
raise HanServiceError("CSRF-Token (tkn) konnte nicht im HTML gefunden werden.")
except requests.exceptions.RequestException as e:
logger.error(f"Fehler beim Versuch, Token über POST zu erhalten: {e}")
raise HanServiceError(f"Fehler beim Abrufen des Tokens: {e}") from e
self.token = token_input['value']
logger.debug(f"Token gefunden: {self.token}")
# Stelle sicher, dass das Cookie-Dictionary korrekt formatiert ist
if self.cookies:
self.cookies = {'Cookie': "; ".join([f"{k}={v}" for k, v in self.cookies.items()])}
else:
raise HanServiceError("Cookies konnten nicht korrekt formatiert werden.")
def _make_post_request(self, action: str, additional_data: Optional[Dict[str, str]] = None) -> BeautifulSoup:
"""Führt eine POST-Anfrage an den HanService aus."""
if not self.token:
raise HanServiceError("Kein gültiges Token vorhanden. Login fehlgeschlagen?")
if not self.cookies:
raise HanServiceError("Keine gültigen Cookies vorhanden. Login fehlgeschlagen?")
post_data = {"tkn": self.token, "action": action}
if additional_data:
post_data.update(additional_data)
logger.debug(f"Sende POST Anfrage: action={action}, data={additional_data or {}}")
try:
response = self.session.post(self.base_url, data=post_data, cookies=self.cookies, verify=self.verify_ssl)
response.raise_for_status()
# Token könnte sich ändern, aktualisiere es nach jedem erfolgreichen POST
new_soup = BeautifulSoup(response.content, 'html.parser')
token_input = new_soup.find('input', {'type': 'hidden', 'name': 'tkn'})
if token_input and token_input.get('value'):
self.token = token_input['value']
logger.debug(f"Token aktualisiert: {self.token}")
else:
logger.warning("Kein neues Token nach POST gefunden. Verwende altes Token weiter.")
return new_soup
except requests.exceptions.RequestException as e:
logger.error(f"Fehler bei POST-Anfrage (action={action}): {e}")
raise HanServiceError(f"Fehler bei POST-Anfrage (action={action}): {e}") from e
def get_meter_ids(self) -> List[Tuple[str, str]]:
"""
Ruft die Liste der verfügbaren Zähler-IDs und deren Namen ab.
Returns:
List[Tuple[str, str]]: Eine Liste von Tupeln, wobei jedes Tupel (meter_id, meter_name) enthält.
Gibt eine leere Liste zurück, wenn keine Zähler gefunden wurden.
Raises:
HanServiceError: Wenn die Zählerliste nicht abgerufen oder geparst werden konnte.
"""
logger.info("Rufe Zählerliste ab...")
try:
soup = self._make_post_request(action='meterform')
select_element = soup.find('select', id='meterform_select_meter')
if not select_element:
logger.warning("Select-Element 'meterform_select_meter' nicht gefunden.")
# Prüfen ob vielleicht nur ein Zähler existiert und direkt angezeigt wird
# Diese Logik hängt stark von der spezifischen Implementierung des Geräts ab.
# Hier gehen wir davon aus, dass das Select-Element existieren MUSS.
raise HanServiceError("Konnte das Auswahlfeld für Zähler nicht finden.")
meters: List[Tuple[str, str]] = []
options = select_element.find_all('option')
if not options:
logger.warning("Keine Optionen im Zähler-Auswahlfeld gefunden.")
return [] # Leere Liste, wenn keine Optionen da sind
for option in options:
meter_id = option.get('value')
meter_name = option.string.strip() if option.string else meter_id # Name nehmen oder ID als Fallback
if meter_id:
meters.append((meter_id, meter_name))
logger.debug(f"Gefundener Zähler: ID={meter_id}, Name={meter_name}")
if not meters:
logger.warning("Obwohl Optionen gefunden wurden, konnte keine gültige Zähler-ID extrahiert werden.")
logger.info(f"{len(meters)} Zähler gefunden.")
return meters
except HanServiceError as e:
logger.error(f"Fehler beim Abrufen der Zähler-IDs: {e}")
raise # Fehler weiterleiten
except Exception as e:
logger.error(f"Unerwarteter Fehler beim Parsen der Zählerliste: {e}")
raise HanServiceError(f"Fehler beim Parsen der Zählerliste: {e}") from e
def get_meter_data(self, meter_id: str) -> Optional[Dict[str, str]]:
"""
Ruft die Detaildaten für einen spezifischen Zähler ab.
Args:
meter_id (str): Die ID des Zählers, dessen Daten abgerufen werden sollen.
Returns:
Optional[Dict[str, str]]: Ein Dictionary mit den Zählerdaten
(keys: 'value', 'unit', 'timestamp', 'isvalid', 'name', 'obis')
oder None, wenn die Daten nicht gefunden wurden.
Raises:
HanServiceError: Wenn die Daten nicht abgerufen oder geparst werden konnten.
"""
logger.info(f"Rufe Daten für Zähler-ID {meter_id} ab...")
try:
soup = self._make_post_request(action='showMeterProfile', additional_data={'mid': meter_id})
table = soup.find('table', id='metervalue')
if not table:
logger.error(f"Datentabelle 'metervalue' für Zähler {meter_id} nicht gefunden.")
raise HanServiceError(f"Datentabelle 'metervalue' für Zähler {meter_id} nicht gefunden.")
data = {}
mapping = {
'value': 'table_metervalues_col_wert',
'unit': 'table_metervalues_col_einheit',
'timestamp': 'table_metervalues_col_timestamp',
'isvalid': 'table_metervalues_col_istvalide',
'name': 'table_metervalues_col_name',
'obis': 'table_metervalues_col_obis'
}
all_found = True
for key, element_id in mapping.items():
element = table.find(id=element_id)
if element and element.string:
data[key] = element.string.strip()
else:
logger.warning(f"Element mit ID '{element_id}' (für '{key}') nicht gefunden oder leer in Tabelle für Zähler {meter_id}.")
data[key] = '' # Leeren String als Fallback
all_found = False # Merken, wenn etwas fehlt
if not all_found:
logger.warning(f"Nicht alle erwarteten Datenfelder konnten für Zähler {meter_id} gefunden werden.")
logger.info(f"Daten für Zähler {meter_id} erfolgreich abgerufen.")
return data
except HanServiceError as e:
logger.error(f"Fehler beim Abrufen der Daten für Zähler {meter_id}: {e}")
raise # Fehler weiterleiten
except Exception as e:
logger.error(f"Unerwarteter Fehler beim Parsen der Zählerdaten für {meter_id}: {e}")
raise HanServiceError(f"Fehler beim Parsen der Zählerdaten für {meter_id}: {e}") from e
def get_first_meter_data(self) -> Optional[Dict[str, str]]:
"""
Bequemlichkeitsmethode: Ruft die Daten des ersten gefundenen Zählers ab.
Returns:
Optional[Dict[str, str]]: Daten des ersten Zählers oder None, wenn kein Zähler gefunden wurde.
Raises:
HanServiceError: Wenn Fehler beim Abrufen auftreten.
"""
meters = self.get_meter_ids()
if not meters:
logger.warning("Keine Zähler gefunden, um Daten abzurufen.")
return None
first_meter_id = meters[0][0]
logger.info(f"Verwende ersten gefundenen Zähler: ID={first_meter_id}, Name={meters[0][1]}")
return self.get_meter_data(first_meter_id)
def close(self):
"""Schließt die zugrunde liegende HTTP-Session."""
logger.info("Schließe HanServiceReader Session.")
self.session.close()
def __enter__(self):
"""Ermöglicht die Verwendung mit 'with' Statements."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Schließt die Session automatisch beim Verlassen des 'with'-Blocks."""
self.close()
# --- Beispielhafte Verwendung ---
if __name__ == "__main__":
# Konfiguration (anpassen!)
SERVICE_URL = 'https://192.168.1.200/cgi-bin/hanservice.cgi' # Ändere dies zur korrekten URL
USERNAME = '12345678' # Ändere dies zum korrekten Benutzernamen
PASSWORD = 'secret' # Ändere dies zum korrekten Passwort
VERIFY_SSL_CERT = False # Auf True setzen, wenn das Zertifikat gültig ist
try:
# Verwendung mit 'with' Statement (empfohlen)
with HanServiceReader(SERVICE_URL, USERNAME, PASSWORD, verify_ssl=VERIFY_SSL_CERT) as reader:
print("\n--- Test 1: Alle Zähler-IDs abrufen ---")
meter_list = reader.get_meter_ids()
if meter_list:
print("Verfügbare Zähler:")
for mid, mname in meter_list:
print(f" ID: {mid}, Name: {mname}")
# Daten für den ersten Zähler abrufen
print(f"\n--- Test 2: Daten für den ersten Zähler (ID: {meter_list[0][0]}) abrufen ---")
first_meter_data = reader.get_meter_data(meter_list[0][0])
if first_meter_data:
print("Empfangene Daten:")
for key, value in first_meter_data.items():
print(f" {key.capitalize()}: {value}")
# Formatierte Ausgabe wie im Originalskript
print("\nFormatierte Ausgabe:")
print(f"{first_meter_data.get('timestamp', 'N/A')} {first_meter_data.get('value', 'N/A')} {first_meter_data.get('unit', 'N/A')}")
else:
print("Konnte keine Daten für den ersten Zähler abrufen.")
# Daten für alle gefundenen Zähler abrufen (optional)
print("\n--- Test 3: Daten für alle gefundenen Zähler abrufen ---")
for mid, mname in meter_list:
print(f"\nDaten für Zähler ID: {mid} ({mname})")
meter_data = reader.get_meter_data(mid)
if meter_data:
for key, value in meter_data.items():
print(f" {key.capitalize()}: {value}")
else:
print(f" Fehler beim Abrufen der Daten für Zähler {mid}.")
else:
print("Keine Zähler auf dem Gerät gefunden.")
# Alternativ: Nur Daten des ersten Zählers abrufen (Bequemlichkeitsmethode)
print("\n--- Test 4: Daten des ersten Zählers mit get_first_meter_data() ---")
first_data_again = reader.get_first_meter_data()
if first_data_again:
print("Empfangene Daten (erste Methode):")
for key, value in first_data_again.items():
print(f" {key.capitalize()}: {value}")
else:
print("Konnte keine Daten für den ersten Zähler abrufen (Methode 2).")
except HanServiceError as e:
print(f"\nEin Fehler bei der Kommunikation mit dem HanService ist aufgetreten: {e}")
except Exception as e:
print(f"\nEin unerwarteter Fehler ist aufgetreten: {e}")
# Alternative Verwendung ohne 'with' (manuelles Schließen erforderlich)
# print("\n--- Manuelle Verwendung (ohne 'with') ---")
# reader_manual = None
# try:
# reader_manual = HanServiceReader(SERVICE_URL, USERNAME, PASSWORD, verify_ssl=VERIFY_SSL_CERT)
# meters = reader_manual.get_meter_ids()
# print(f"Manuell gefundene Zähler: {meters}")
# # ... weitere Aktionen ...
# except HanServiceError as e:
# print(f"Fehler (manuell): {e}")
# finally:
# if reader_manual:
# reader_manual.close() # Wichtig: Session manuell schließen!
# HanService Reader
Eine Python-Bibliothek zur Abfrage von Zählerdaten von Geräten, die eine spezifische Webschnittstelle unter `/cgi-bin/hanservice.cgi` bereitstellen und HTTP Digest Authentication verwenden.
Diese Bibliothek wurde entwickelt, um die Interaktion mit solchen Geräten zu vereinfachen, indem sie die Authentifizierung, das Session-Management, das Token-Handling und das Parsen der HTML-Antworten kapselt.
## Features
* Verbindung über HTTPS (mit optionaler SSL-Verifizierungs-Deaktivierung).
* Authentifizierung mittels HTTP Digest Authentication.
* Automatisches Management von Session-Cookies.
* Handhabung von CSRF-Tokens (`tkn`), die vom Dienst verwendet werden.
* Abrufen einer Liste aller verfügbaren Zähler-IDs und -Namen.
* Abrufen detaillierter Messwerte für einen bestimmten Zähler (Wert, Einheit, Zeitstempel, OBIS-Kennzahl etc.).
* Implementiert als Klasse (`HanServiceReader`).
* Unterstützung für Context Manager (`with`-Statement) zur automatischen Ressourcenfreigabe (Schließen der Session).
* Grundlegende Fehlerbehandlung mit einer benutzerdefinierten Exception (`HanServiceError`).
* Verwendet das `logging`-Modul für Ausgaben.
## Anforderungen
* Python 3.x
* Bibliotheken:
* `requests`
* `beautifulsoup4`
Du kannst die benötigten Bibliotheken installieren mit:
```bash
pip install requests beautifulsoup4
from hanservice_reader import HanServiceReader, HanServiceError
import logging
# Optional: Logging-Level anpassen, um mehr Details zu sehen
# logging.basicConfig(level=logging.DEBUG)
# --- Konfiguration ---
# Passe diese Werte an dein Gerät an!
SERVICE_URL = '[https://192.168.1.200/cgi-bin/hanservice.cgi](https://192.168.1.200/cgi-bin/hanservice.cgi)' # Deine Geräte-URL
USERNAME = '12345678' # Dein Benutzername
PASSWORD = 'secret' # Dein Passwort
VERIFY_SSL_CERT = False # Auf True setzen, wenn dein Gerät ein gültiges SSL-Zertifikat hat!
try:
# Verwende das 'with'-Statement für automatisches Verbindungsmanagement
with HanServiceReader(SERVICE_URL, USERNAME, PASSWORD, verify_ssl=VERIFY_SSL_CERT) as reader:
# 1. Liste aller verfügbaren Zähler-IDs abrufen
print("--- Abrufen der Zählerliste ---")
meter_list = reader.get_meter_ids()
if not meter_list:
print("Keine Zähler auf dem Gerät gefunden.")
else:
print("Verfügbare Zähler:")
for meter_id, meter_name in meter_list:
print(f" ID: {meter_id}, Name: {meter_name}")
# 2. Daten für einen spezifischen Zähler abrufen (z.B. den ersten)
first_meter_id = meter_list[0][0]
print(f"\n--- Abrufen der Daten für Zähler ID: {first_meter_id} ---")
meter_data = reader.get_meter_data(first_meter_id)
if meter_data:
print("Empfangene Daten:")
for key, value in meter_data.items():
print(f" {key.capitalize()}: {value}")
# Formatierte Ausgabe (Beispiel)
print("\nFormatierte Primärdaten:")
ts = meter_data.get('timestamp', 'N/A')
val = meter_data.get('value', 'N/A')
unit = meter_data.get('unit', 'N/A')
print(f"{ts} - {val} {unit}")
else:
print(f"Konnte keine Daten für Zähler {first_meter_id} abrufen.")
# 3. Bequemlichkeitsmethode: Daten des ersten Zählers direkt abrufen
print("\n--- Abrufen der Daten des ersten Zählers (Methode 2) ---")
first_meter_data_direct = reader.get_first_meter_data()
if first_meter_data_direct:
print("Daten (direkt):")
# ... (Ausgabe wie oben) ...
ts = first_meter_data_direct.get('timestamp', 'N/A')
val = first_meter_data_direct.get('value', 'N/A')
unit = first_meter_data_direct.get('unit', 'N/A')
print(f"{ts} - {val} {unit}")
else:
print("Konnte keine Daten für den ersten Zähler abrufen (Methode 2).")
except HanServiceError as e:
print(f"\nFehler bei der Kommunikation mit dem HanService: {e}")
# Hier könntest du spezifischer auf Fehler reagieren
except Exception as e:
print(f"\nEin unerwarteter Fehler ist aufgetreten: {e}")
vielleicht hat ja jemand Zeit, Lust und nerv zum Ausprobieren und keinen verplombten Zählerkasten 