Newer
Older
homeassistant-infra / ibsystem / ibsystem2mqtt_v5.py
#!/usr/bin/env python3
"""IBSystem -> MQTT Bridge for Home Assistant v5"""
import subprocess, re, time, json, threading, logging, signal, sys
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
import paho.mqtt.client as mqtt
try:
    import yaml
    HAS_YAML = True
except ImportError:
    HAS_YAML = False

DEFAULT_CONFIG = {"mqtt": {"brokers": [{"name": "ha", "host": "ha.local", "port": 1883}], "user": "mqtt", "password": "mqtt123", "prefix": "ibsystem", "ha_prefix": "homeassistant"}, "ibsystem": {"host": "127.0.0.1", "port": 2001, "rs": 0, "max_ids": 40, "full_ids": [1], "timeout_ms": 10000}, "polling": {"interval_sec": 2.0, "parallel_workers": 8}, "logging": {"level": "INFO", "file": None}, "http": {"enabled": True, "host": "0.0.0.0", "port": 8080}, "friendly_names": {}}

class BridgeStats:
    def __init__(self):
        self.start_time = datetime.now()
        self.poll_cycles = 0
        self.last_poll_time = None
        self.last_poll_duration = 0.0
        self.devices_discovered = 0
        self.entities_discovered = 0
        self.messages_received = 0
        self.commands_executed = 0
        self.errors = 0
        self.last_error = None
        self._lock = threading.Lock()
    def record_poll(self, duration, devices):
        with self._lock: self.poll_cycles += 1; self.last_poll_time = datetime.now(); self.last_poll_duration = duration; self.devices_discovered = devices
    def record_entity(self):
        with self._lock: self.entities_discovered += 1
    def record_command(self):
        with self._lock: self.commands_executed += 1; self.messages_received += 1
    def record_error(self, error):
        with self._lock: self.errors += 1; self.last_error = f"{datetime.now().isoformat()}: {error}"
    def to_dict(self):
        with self._lock:
            uptime = datetime.now() - self.start_time
            return {"status": "running", "uptime_seconds": int(uptime.total_seconds()), "uptime_human": str(uptime).split('.')[0], "start_time": self.start_time.isoformat(), "poll_cycles": self.poll_cycles, "last_poll_time": self.last_poll_time.isoformat() if self.last_poll_time else None, "last_poll_duration_ms": round(self.last_poll_duration * 1000, 1), "devices_discovered": self.devices_discovered, "entities_discovered": self.entities_discovered, "messages_received": self.messages_received, "commands_executed": self.commands_executed, "errors": self.errors, "last_error": self.last_error}
STATS = BridgeStats()

def setup_logging(config):
    level = getattr(logging, config.get("level", "INFO").upper(), logging.INFO)
    handlers = [logging.StreamHandler()]
    if config.get("file"): handlers.append(logging.FileHandler(config["file"]))
    logging.basicConfig(level=level, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=handlers)
    return logging.getLogger("ibsystem2mqtt")

class FriendlyNames:
    def __init__(self, config): self.names = config.get("friendly_names", {})
    def get_device_name(self, rs, dev_id): return self.names.get(f"rs{rs}_id{dev_id}", f"IBSystem RS{rs} ID{dev_id}")
    def get_entity_name(self, rs, dev_id, path):
        safe_path = re.sub(r"[^a-zA-Z0-9_]+", "_", path)
        if f"rs{rs}_id{dev_id}_{safe_path}" in self.names: return self.names[f"rs{rs}_id{dev_id}_{safe_path}"]
        device_name = self.names.get(f"rs{rs}_id{dev_id}")
        readable = self._path_to_readable(path)
        return f"{device_name} {readable}" if device_name else f"RS{rs} ID{dev_id} {readable}"
    def _path_to_readable(self, path):
        m = re.match(r"output\.do\.(\d+)$", path)
        if m: return f"Wyjscie DO {m.group(1)}"
        m = re.match(r"output\.px\.(\d+)$", path)
        if m: return f"PWM {m.group(1)}"
        m = re.match(r"input\.di\.(\d+)\.(a|b)$", path)
        if m: return f"Wejscie DI {m.group(1)} {m.group(2).upper()}"
        m = re.match(r"input\.t\.(\d+)\.value$", path)
        if m: return f"Temperatura {m.group(1)}"
        return path.replace(".", " ").title()

@dataclass
class DeviceRecord:
    rs: str
    id: str
    path: str
    val: str
    @property
    def unique_id(self): return f"ibsystem_rs{self.rs}_id{self.id}_{re.sub(r'[^a-zA-Z0-9_]+', '_', self.path)}"
    @property
    def device_id(self): return f"ibsystem_rs{self.rs}_id{self.id}"
    @property
    def component(self):
        if self.path.startswith("output.do."): return "switch"
        if self.path.startswith("input.di."): return "binary_sensor"
        return "sensor"
    @property
    def device_class(self): return "temperature" if "temp" in self.path.lower() or self.path.startswith("input.t.") else None
    @property
    def unit_of_measurement(self): return "C" if self.path.startswith("input.t.") and "value" in self.path else None

class DiagnosticsHandler(BaseHTTPRequestHandler):
    bridge = None
    def log_message(self, format, *args): pass
    def do_GET(self):
        if self.path in ("/", "/status"): self._send_json(STATS.to_dict())
        elif self.path == "/health": self._send_json({"status": "ok"})
        elif self.path == "/config" and self.bridge:
            cfg = json.loads(json.dumps(self.bridge.config)); cfg["mqtt"]["password"] = "***"; self._send_json(cfg)
        elif self.path == "/entities" and self.bridge:
            self._send_json({"count": len(self.bridge.discovered), "entities": sorted(list(self.bridge.discovered))})
        else: self.send_error(404)
    def _send_json(self, data):
        content = json.dumps(data, indent=2).encode()
        self.send_response(200); self.send_header("Content-Type", "application/json"); self.send_header("Content-Length", len(content)); self.end_headers(); self.wfile.write(content)

class DiagnosticsServer:
    def __init__(self, host, port, logger): self.host = host; self.port = port; self.logger = logger; self.server = None
    def start(self, bridge):
        DiagnosticsHandler.bridge = bridge
        try:
            self.server = HTTPServer((self.host, self.port), DiagnosticsHandler)
            threading.Thread(target=self.server.serve_forever, daemon=True).start()
            self.logger.info(f"Diagnostics: http://{self.host}:{self.port}")
        except Exception as e: self.logger.error(f"Diagnostics failed: {e}")
    def stop(self):
        if self.server: self.server.shutdown()

class IBLSRunner:
    def __init__(self, host, port, timeout_ms, logger): self.host = host; self.port = port; self.timeout_ms = timeout_ms; self.logger = logger; self._lock = threading.Lock()
    def run(self, command):
        cmd = f'/ibsystem/ibls -a {self.host} --pretty -p {self.port} --timeout={self.timeout_ms} -c "{command}"'
        try:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
            if result.returncode != 0: self.logger.warning(f"ibls failed: {result.stderr}"); STATS.record_error(f"ibls: {result.stderr[:50]}"); return None
            return result.stdout.strip()
        except Exception as e: self.logger.error(f"ibls error: {e}"); STATS.record_error(str(e)); return None
    def get_device(self, rs, dev_id, full=False):
        cmd = f"get(rs.{rs}.id.{dev_id};);" if full else f"get(rs.{rs}.id.{dev_id}.input;);get(rs.{rs}.id.{dev_id}.output;);"
        output = self.run(cmd)
        if not output: return []
        records = []
        for line in output.splitlines():
            m = re.match(r"rs\.(\d+)\.id\.(\d+)\.(.+?)\s*=\s*(.+)", line.strip())
            if m: records.append(DeviceRecord(rs=m.group(1), id=m.group(2), path=m.group(3).strip(), val=m.group(4).strip()))
        return records
    def set_output(self, rs, dev_id, output_path, value):
        with self._lock: return self.run(f"set(rs.{rs}.id.{dev_id}.{output_path}={value};);") is not None

class MQTTBridge:
    def __init__(self, config, ibls, logger):
        self.config = config; self.ibls = ibls; self.logger = logger
        self.friendly_names = FriendlyNames(config)
        self.mqtt_prefix = config["mqtt"]["prefix"]; self.ha_prefix = config["mqtt"]["ha_prefix"]
        self.clients = []; self.discovered = set(); self.last_published = {}
        self._cmd_lock = threading.Lock(); self._last_cmd = {}
    def start(self):
        for broker in self.config["mqtt"]["brokers"]:
            client = mqtt.Client(client_id=f"ibsystem2mqtt-{broker['name']}", protocol=mqtt.MQTTv311)
            client.username_pw_set(self.config["mqtt"]["user"], self.config["mqtt"]["password"])
            client.on_message = self._on_message; client._broker_name = broker["name"]
            client.will_set(f"{self.mqtt_prefix}/bridge/availability", "offline", retain=True)
            try:
                client.connect(broker["host"], broker["port"], 60); client.loop_start(); self.clients.append(client)
                self.logger.info(f"Connected: {broker['name']} ({broker['host']})")
            except Exception as e: self.logger.error(f"MQTT failed {broker['name']}: {e}"); STATS.record_error(f"MQTT: {broker['name']}")
        self._publish_all(f"{self.mqtt_prefix}/bridge/availability", "online")
    def stop(self):
        self._publish_all(f"{self.mqtt_prefix}/bridge/availability", "offline")
        for c in self.clients: c.loop_stop(); c.disconnect()
    def _on_message(self, client, userdata, msg):
        try:
            STATS.record_command(); parts = msg.topic.split("/")
            if len(parts) < 5 or parts[0] != self.mqtt_prefix or parts[3] != "set": return
            rs, dev_id, target = parts[1].replace("rs",""), parts[2].replace("id",""), parts[4]
            m = re.match(r"do(\d+)$", target)
            if not m: return
            do_num = m.group(1); payload = msg.payload.decode().strip().upper()
            value = "1" if payload in ("ON","1","TRUE") else "0" if payload in ("OFF","0","FALSE") else None
            if not value: return
            key = (rs, dev_id, do_num, value); now = time.time()
            with self._cmd_lock:
                self._last_cmd = {k:v for k,v in self._last_cmd.items() if v > now-0.5}
                if key in self._last_cmd: return
                self._last_cmd[key] = now
            self.logger.info(f"Command: rs{rs}/id{dev_id}/do{do_num} = {value}")
            if self.ibls.set_output(int(rs), int(dev_id), f"setting.light.{do_num}", value):
                self._publish_all(f"{self.mqtt_prefix}/rs{rs}/id{dev_id}/output/do/{do_num}/state", "ON" if value=="1" else "OFF")
        except Exception as e: self.logger.error(f"Message error: {e}"); STATS.record_error(str(e))
    def poll_and_publish(self):
        cfg = self.config["ibsystem"]; rs = cfg["rs"]; full_ids = set(cfg.get("full_ids", []))
        all_records = []; devices = 0
        with ThreadPoolExecutor(max_workers=self.config["polling"].get("parallel_workers", 4)) as ex:
            futures = {ex.submit(self.ibls.get_device, rs, i, i in full_ids): i for i in range(1, cfg["max_ids"]+1)}
            for f in as_completed(futures):
                try:
                    recs = f.result()
                    if recs: devices += 1; all_records.extend(recs)
                except: pass
        for rec in all_records: self._publish_discovery(rec); self._publish_state(rec)
        return devices
    def _publish_discovery(self, rec):
        if rec.unique_id in self.discovered: return
        component = rec.component; state_topic = self._state_topic(rec)
        payload = {"name": self.friendly_names.get_entity_name(rec.rs, rec.id, rec.path), "state_topic": state_topic, "unique_id": rec.unique_id, "availability_topic": f"{self.mqtt_prefix}/bridge/availability", "device": {"identifiers": [rec.device_id], "name": self.friendly_names.get_device_name(rec.rs, rec.id), "manufacturer": "IBSystem"}}
        if component == "switch":
            m = re.match(r"output\.do\.(\d+)$", rec.path)
            if m: payload.update({"command_topic": f"{self.mqtt_prefix}/rs{rec.rs}/id{rec.id}/set/do{m.group(1)}", "payload_on": "ON", "payload_off": "OFF", "state_on": "ON", "state_off": "OFF"})
        elif component == "binary_sensor": payload.update({"payload_on": "1", "payload_off": "0"})
        elif rec.device_class: payload["device_class"] = rec.device_class
        if rec.unit_of_measurement: payload["unit_of_measurement"] = rec.unit_of_measurement; payload["value_template"] = "{{ value | float / 10 }}" if rec.device_class == "temperature" else None
        self._publish_all(f"{self.ha_prefix}/{component}/{rec.unique_id}/config", json.dumps(payload))
        self.discovered.add(rec.unique_id); STATS.record_entity()
    def _publish_state(self, rec):
        topic = self._state_topic(rec)
        value = ("ON" if rec.val in ("1","true","ON") else "OFF") if rec.component == "switch" else rec.val
        for c in self.clients:
            cache = self.last_published.setdefault(c._broker_name, {})
            if cache.get(topic) != value: cache[topic] = value; c.publish(topic, value, retain=True)
    def _state_topic(self, rec):
        p = rec.path
        m = re.match(r"output\.do\.(\d+)$", p)
        if m: return f"{self.mqtt_prefix}/rs{rec.rs}/id{rec.id}/output/do/{m.group(1)}/state"
        m = re.match(r"input\.di\.(\d+)\.(a|b)$", p)
        if m: return f"{self.mqtt_prefix}/rs{rec.rs}/id{rec.id}/input/di/{m.group(1)}/{m.group(2)}"
        m = re.match(r"input\.t\.(\d+)\.(\w+)$", p)
        if m: return f"{self.mqtt_prefix}/rs{rec.rs}/id{rec.id}/input/t/{m.group(1)}/{m.group(2)}"
        return f"{self.mqtt_prefix}/rs{rec.rs}/id{rec.id}/{p.replace('.', '/')}"
    def _publish_all(self, topic, payload, retain=True):
        for c in self.clients: c.publish(topic, payload, retain=retain)

def load_config(path=None):
    if path and HAS_YAML and Path(path).exists():
        with open(path) as f: user = yaml.safe_load(f)
        cfg = json.loads(json.dumps(DEFAULT_CONFIG))
        for k,v in user.items(): cfg[k].update(v) if isinstance(v,dict) and k in cfg else cfg.update({k:v})
        return cfg
    return json.loads(json.dumps(DEFAULT_CONFIG))

def main():
    import argparse
    p = argparse.ArgumentParser(); p.add_argument("-c", "--config"); p.add_argument("-v", "--verbose", action="store_true")
    args = p.parse_args(); config = load_config(args.config)
    if args.verbose: config["logging"]["level"] = "DEBUG"
    logger = setup_logging(config["logging"]); logger.info("Starting IBSystem2MQTT v5")
    ibls = IBLSRunner(config["ibsystem"]["host"], config["ibsystem"]["port"], config["ibsystem"]["timeout_ms"], logger)
    bridge = MQTTBridge(config, ibls, logger)
    diag = DiagnosticsServer(config["http"]["host"], config["http"]["port"], logger) if config.get("http",{}).get("enabled") else None
    if diag: diag.start(bridge)
    def shutdown(sig, frame): logger.info("Shutdown"); diag and diag.stop(); bridge.stop(); sys.exit(0)
    signal.signal(signal.SIGINT, shutdown); signal.signal(signal.SIGTERM, shutdown)
    bridge.start()
    for c in bridge.clients: c.subscribe(f"{bridge.mqtt_prefix}/+/+/set/#")
    interval = config["polling"]["interval_sec"]
    while True:
        try:
            t0 = time.time(); devices = bridge.poll_and_publish(); elapsed = time.time() - t0
            STATS.record_poll(elapsed, devices); logger.debug(f"Poll: {devices} in {elapsed:.2f}s")
            time.sleep(max(0, interval - elapsed))
        except Exception as e: logger.error(f"Loop error: {e}"); STATS.record_error(str(e)); time.sleep(interval)

if __name__ == "__main__": main()