#!/usr/bin/env python3
"""IBSystem -> MQTT Bridge for Home Assistant v5"""
import subprocess, re, time, json, threading, logging, signal, sys
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
import paho.mqtt.client as mqtt
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# Built-in configuration defaults. Keys are grouped per subsystem; the order
# of the keys is preserved because the mapping is serialised as-is elsewhere.
DEFAULT_CONFIG = {
    "mqtt": {
        "brokers": [
            {"name": "ha", "host": "ha.local", "port": 1883},
        ],
        "user": "mqtt",
        "password": "mqtt123",
        "prefix": "ibsystem",
        "ha_prefix": "homeassistant",
    },
    "ibsystem": {
        "host": "127.0.0.1",
        "port": 2001,
        "rs": 0,
        "max_ids": 40,
        "full_ids": [1],
        "timeout_ms": 10000,
    },
    "polling": {
        "interval_sec": 2.0,
        "parallel_workers": 8,
    },
    "logging": {
        "level": "INFO",
        "file": None,
    },
    "http": {
        "enabled": True,
        "host": "0.0.0.0",
        "port": 8080,
    },
    "friendly_names": {},
}
class BridgeStats:
    """Lock-protected runtime counters for the bridge.

    Every mutator and the snapshot in ``to_dict`` take the same internal
    lock, so the object can be updated from several threads.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.start_time = datetime.now()
        # Poll-loop bookkeeping.
        self.poll_cycles = 0
        self.last_poll_time = None
        self.last_poll_duration = 0.0
        # Discovery counters.
        self.devices_discovered = 0
        self.entities_discovered = 0
        # Command / message counters.
        self.messages_received = 0
        self.commands_executed = 0
        # Error bookkeeping.
        self.errors = 0
        self.last_error = None

    def record_poll(self, duration, devices):
        """Register one finished poll cycle.

        ``duration`` is stored as-is (``to_dict`` reports it multiplied by
        1000 as milliseconds); ``devices`` replaces the previous device count.
        """
        with self._lock:
            self.poll_cycles += 1
            self.last_poll_time = datetime.now()
            self.last_poll_duration = duration
            self.devices_discovered = devices

    def record_entity(self):
        """Count one more discovered entity."""
        with self._lock:
            self.entities_discovered += 1

    def record_command(self):
        """Bump both the command counter and the received-message counter."""
        with self._lock:
            self.commands_executed += 1
            self.messages_received += 1

    def record_error(self, error):
        """Count an error and remember it, prefixed with an ISO timestamp."""
        with self._lock:
            self.errors += 1
            self.last_error = f"{datetime.now().isoformat()}: {error}"

    def to_dict(self):
        """Return a JSON-serialisable snapshot of all counters."""
        with self._lock:
            uptime = datetime.now() - self.start_time
            last_poll = self.last_poll_time
            snapshot = {}
            snapshot["status"] = "running"
            snapshot["uptime_seconds"] = int(uptime.total_seconds())
            # Drop the fractional seconds from the timedelta's text form.
            snapshot["uptime_human"] = str(uptime).split('.')[0]
            snapshot["start_time"] = self.start_time.isoformat()
            snapshot["poll_cycles"] = self.poll_cycles
            snapshot["last_poll_time"] = last_poll.isoformat() if last_poll else None
            snapshot["last_poll_duration_ms"] = round(self.last_poll_duration * 1000, 1)
            snapshot["devices_discovered"] = self.devices_discovered
            snapshot["entities_discovered"] = self.entities_discovered
            snapshot["messages_received"] = self.messages_received
            snapshot["commands_executed"] = self.commands_executed
            snapshot["errors"] = self.errors
            snapshot["last_error"] = self.last_error
            return snapshot
# Module-wide statistics instance, updated by the ibls runner, the MQTT bridge
# and the main loop, and served by the diagnostics HTTP handler.
STATS = BridgeStats()
def setup_logging(config):
    """Configure root logging and return the bridge's logger.

    Args:
        config: The ``logging`` section of the configuration. ``level`` is a
            level name (case-insensitive); ``file`` is an optional path that
            receives a copy of the log output.

    Returns:
        The ``ibsystem2mqtt`` logger.

    A missing, empty or unrecognised ``level`` falls back to INFO instead of
    raising.
    """
    # ``level: null`` in a config file yields None, so coerce before upper().
    level_name = str(config.get("level") or "INFO").upper()
    level = getattr(logging, level_name, None)
    # getattr can hit non-level attributes of the logging module (for example
    # BASIC_FORMAT, a string); only integer levels are acceptable here.
    if not isinstance(level, int):
        level = logging.INFO

    handlers = [logging.StreamHandler()]
    log_file = config.get("file")
    if log_file:
        handlers.append(logging.FileHandler(log_file))

    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=handlers,
    )
    return logging.getLogger("ibsystem2mqtt")
class FriendlyNames:
    """Look up display names from the ``friendly_names`` config section.

    Keys have the form ``rs<rs>_id<id>`` for a device and
    ``rs<rs>_id<id>_<sanitised path>`` for a single entity.
    """

    def __init__(self, config):
        # A missing or empty section both end up as an empty mapping.
        self.names = config.get("friendly_names") or {}

    def get_device_name(self, rs, dev_id):
        """Return the configured device name, or a generic fallback."""
        fallback = f"IBSystem RS{rs} ID{dev_id}"
        return self.names.get(f"rs{rs}_id{dev_id}", fallback)

    def get_entity_name(self, rs, dev_id, path):
        """Return the display name for one entity of a device.

        An entity-specific entry wins. Otherwise the readable form of the
        path is prefixed with the configured device name when there is one,
        or with ``RS<rs> ID<id>`` when there is not.
        """
        sanitised = re.sub(r"[^a-zA-Z0-9_]+", "_", path)
        entity_key = f"rs{rs}_id{dev_id}_{safe_path}" if False else f"rs{rs}_id{dev_id}_" + sanitised
        if entity_key in self.names:
            return self.names[entity_key]

        readable = self._path_to_readable(path)
        device_name = self.names.get(f"rs{rs}_id{dev_id}")
        if device_name:
            return f"{device_name} {readable}"
        return f"RS{rs} ID{dev_id} {readable}"

    def _path_to_readable(self, path):
        """Turn a dotted record path into a short human-readable label."""
        # (pattern, label builder) pairs, tried in order; first match wins.
        rules = (
            (r"output\.do\.(\d+)$", lambda g: f"Wyjscie DO {g.group(1)}"),
            (r"output\.px\.(\d+)$", lambda g: f"PWM {g.group(1)}"),
            (r"input\.di\.(\d+)\.(a|b)$", lambda g: f"Wejscie DI {g.group(1)} {g.group(2).upper()}"),
            (r"input\.t\.(\d+)\.value$", lambda g: f"Temperatura {g.group(1)}"),
        )
        for pattern, build in rules:
            found = re.match(pattern, path)
            if found:
                return build(found)
        # Unknown path: "a.b.c" becomes "A B C".
        return path.replace(".", " ").title()
@dataclass
class DeviceRecord:
    """One ``rs.<rs>.id.<id>.<path> = <val>`` line of device output.

    All four fields hold the text captured from that line; the properties
    derive the identifiers and entity metadata used for MQTT discovery.
    """

    rs: str    # bus number, the digits after "rs."
    id: str    # device number, the digits after "id."
    path: str  # dotted path below the device, e.g. "output.do.1"
    val: str   # raw value text to the right of "="

    @property
    def unique_id(self):
        """Entity identifier; any run of characters outside [a-zA-Z0-9_] in the path becomes "_"."""
        safe_path = re.sub(r"[^a-zA-Z0-9_]+", "_", self.path)
        return f"ibsystem_rs{self.rs}_id{self.id}_{safe_path}"

    @property
    def device_id(self):
        """Identifier shared by every record of the same device."""
        return f"ibsystem_rs{self.rs}_id{self.id}"

    @property
    def component(self):
        """Entity platform name: ``switch``, ``binary_sensor`` or ``sensor``."""
        if self.path.startswith("output.do."):
            return "switch"
        if self.path.startswith("input.di."):
            return "binary_sensor"
        return "sensor"

    @property
    def device_class(self):
        """``"temperature"`` for ``input.t.*`` paths or paths containing "temp", else None."""
        if "temp" in self.path.lower() or self.path.startswith("input.t."):
            return "temperature"
        return None

    @property
    def unit_of_measurement(self):
        """Degrees Celsius for ``input.t.*`` paths containing "value", else None.

        The unit is spelled with the degree sign because Home Assistant does
        not accept a bare "C" for the temperature device class. It is written
        as an escape so this file stays ASCII-only.
        """
        if self.path.startswith("input.t.") and "value" in self.path:
            return "\u00b0C"
        return None
class DiagnosticsHandler(BaseHTTPRequestHandler):
    """Read-only JSON endpoints for inspecting the running bridge.

    Routes: ``/`` and ``/status`` (statistics), ``/health``, ``/config``
    (configuration with the MQTT password replaced by ``***``) and
    ``/entities``. The last two need ``bridge`` to be set; everything else
    yields a 404.
    """

    # Class-level reference to the bridge object; None until one is assigned.
    bridge = None

    def log_message(self, format, *args):
        """Discard the base class's per-request log lines."""
        return None

    def do_GET(self):
        """Dispatch a GET request on the exact request path."""
        route = self.path
        bridge = self.bridge

        if route == "/" or route == "/status":
            self._send_json(STATS.to_dict())
            return

        if route == "/health":
            self._send_json({"status": "ok"})
            return

        if route == "/config" and bridge:
            # Copy via a JSON round trip so the live config is not modified.
            redacted = json.loads(json.dumps(bridge.config))
            redacted["mqtt"]["password"] = "***"
            self._send_json(redacted)
            return

        if route == "/entities" and bridge:
            known = bridge.discovered
            self._send_json({"count": len(known), "entities": sorted(known)})
            return

        self.send_error(404)

    def _send_json(self, data):
        """Send ``data`` as an indented JSON document with status 200."""
        body = json.dumps(data, indent=2).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", len(body))
        self.end_headers()
        self.wfile.write(body)
class DiagnosticsServer:
    """Serves ``DiagnosticsHandler`` from a background daemon thread."""

    def __init__(self, host, port, logger):
        self.host = host
        self.port = port
        self.logger = logger
        # The running HTTPServer, or None while stopped / after a failed start.
        self.server = None

    def start(self, bridge):
        """Bind the HTTP server and start serving in a daemon thread.

        Failures (for example the port being in use) are logged and leave the
        object in the stopped state; they are not raised to the caller.
        """
        DiagnosticsHandler.bridge = bridge
        server = None
        try:
            server = HTTPServer((self.host, self.port), DiagnosticsHandler)
            threading.Thread(target=server.serve_forever, daemon=True).start()
        except Exception as e:
            self.logger.error(f"Diagnostics failed: {e}")
            # Do not keep a server whose serve_forever() never ran: calling
            # shutdown() on it later would wait forever.
            if server is not None:
                server.server_close()
            return
        self.server = server
        self.logger.info(f"Diagnostics: http://{self.host}:{self.port}")

    def stop(self):
        """Stop serving and release the listening socket. Safe to call twice."""
        server, self.server = self.server, None
        if server is None:
            return
        server.shutdown()
        # shutdown() only ends the serve loop; the socket must be closed too.
        server.server_close()
class IBLSRunner:
    """Runs the ``/ibsystem/ibls`` command-line client and parses its output."""

    # One output line: rs.<rs>.id.<id>.<path> = <value>
    _RECORD_RE = re.compile(r"rs\.(\d+)\.id\.(\d+)\.(.+?)\s*=\s*(.+)")

    def __init__(self, host, port, timeout_ms, logger):
        self.host = host
        self.port = port
        self.timeout_ms = timeout_ms
        self.logger = logger
        # Serialises set_output() calls; reads are not locked.
        self._lock = threading.Lock()

    def _build_argv(self, command):
        """Return the ibls argument vector for ``command``.

        The command is passed as one argv element, so no shell ever parses it
        and its characters cannot be interpreted as shell syntax.
        """
        return [
            "/ibsystem/ibls",
            "-a", str(self.host),
            "--pretty",
            "-p", str(self.port),
            f"--timeout={self.timeout_ms}",
            "-c", command,
        ]

    def run(self, command):
        """Execute one ibls command string.

        Returns:
            The stripped standard output, or None when the process exits
            non-zero, exceeds the 30 second limit or cannot be started. Every
            failure is logged and counted in STATS.
        """
        try:
            result = subprocess.run(self._build_argv(command), capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                self.logger.warning(f"ibls failed: {result.stderr}")
                STATS.record_error(f"ibls: {result.stderr[:50]}")
                return None
            return result.stdout.strip()
        except Exception as e:
            # Best-effort boundary: callers only distinguish None from output.
            self.logger.error(f"ibls error: {e}")
            STATS.record_error(str(e))
            return None

    def get_device(self, rs, dev_id, full=False):
        """Query one device and return its records.

        Args:
            rs: Bus number.
            dev_id: Device number on that bus.
            full: When true, request the whole device subtree; otherwise only
                its ``input`` and ``output`` subtrees.

        Returns:
            A list of DeviceRecord, empty when the command failed or printed
            nothing. Lines that do not look like a record are skipped.
        """
        if full:
            cmd = f"get(rs.{rs}.id.{dev_id};);"
        else:
            cmd = f"get(rs.{rs}.id.{dev_id}.input;);get(rs.{rs}.id.{dev_id}.output;);"
        output = self.run(cmd)
        if not output:
            return []
        records = []
        for line in output.splitlines():
            m = self._RECORD_RE.match(line.strip())
            if m:
                records.append(DeviceRecord(rs=m.group(1), id=m.group(2), path=m.group(3).strip(), val=m.group(4).strip()))
        return records

    def set_output(self, rs, dev_id, output_path, value):
        """Write ``value`` to ``output_path`` of a device; True when ibls succeeded."""
        with self._lock:
            return self.run(f"set(rs.{rs}.id.{dev_id}.{output_path}={value};);") is not None
class MQTTBridge:
    """Publishes device records to MQTT brokers and relays switch commands.

    For every record a Home Assistant discovery config is sent once and the
    state is sent whenever it differs from the last value sent to that
    broker. Commands arrive on ``<prefix>/rs<rs>/id<id>/set/do<n>``.
    """

    def __init__(self, config, ibls, logger):
        self.config = config
        self.ibls = ibls
        self.logger = logger
        self.friendly_names = FriendlyNames(config)
        self.mqtt_prefix = config["mqtt"]["prefix"]
        self.ha_prefix = config["mqtt"]["ha_prefix"]
        self.clients = []         # clients whose initial connect succeeded
        self.discovered = set()   # unique_ids already announced via discovery
        self.last_published = {}  # broker name -> {topic: last value sent}
        self._cmd_lock = threading.Lock()
        self._last_cmd = {}       # (rs, id, do, value) -> time of last accepted command

    def start(self):
        """Connect to every configured broker and announce the bridge as online.

        A broker whose connect call raises is logged, counted as an error and
        skipped; the remaining brokers are still used.
        """
        availability = f"{self.mqtt_prefix}/bridge/availability"
        for broker in self.config["mqtt"]["brokers"]:
            client = mqtt.Client(client_id=f"ibsystem2mqtt-{broker['name']}", protocol=mqtt.MQTTv311)
            client.username_pw_set(self.config["mqtt"]["user"], self.config["mqtt"]["password"])
            client.on_connect = self._on_connect
            client.on_message = self._on_message
            client._broker_name = broker["name"]
            # Last will: the broker publishes "offline" if the bridge vanishes.
            client.will_set(availability, "offline", retain=True)
            try:
                client.connect(broker["host"], broker["port"], 60)
                client.loop_start()
                self.clients.append(client)
                self.logger.info(f"Connected: {broker['name']} ({broker['host']})")
            except Exception as e:
                self.logger.error(f"MQTT failed {broker['name']}: {e}")
                STATS.record_error(f"MQTT: {broker['name']}")
        self._publish_all(availability, "online")

    def stop(self):
        """Announce the bridge as offline and disconnect every client."""
        self._publish_all(f"{self.mqtt_prefix}/bridge/availability", "offline")
        for c in self.clients:
            c.loop_stop()
            c.disconnect()

    def _on_connect(self, client, userdata, flags, rc, *extra):
        """Connection callback: subscribe to commands and publish "online".

        Doing this in the callback rather than once at start-up restores the
        command subscription and the availability state after the client
        reconnects. ``extra`` absorbs additional arguments that other
        callback signatures may pass.
        """
        if rc != 0:
            self.logger.warning(f"MQTT connect refused by {getattr(client, '_broker_name', '?')}: rc={rc}")
            return
        client.subscribe(f"{self.mqtt_prefix}/+/+/set/#")
        client.publish(f"{self.mqtt_prefix}/bridge/availability", "online", retain=True)

    def _on_message(self, client, userdata, msg):
        """Handle a command message of the form ``<prefix>/rs<rs>/id<id>/set/do<n>``.

        Payloads ON/1/TRUE and OFF/0/FALSE (case-insensitive) are accepted.
        An identical command seen again within 0.5 s is dropped, which keeps
        the same command arriving through several brokers from being executed
        more than once. Any exception is logged and counted, never raised.
        """
        try:
            STATS.record_command()
            parts = msg.topic.split("/")
            if len(parts) < 5 or parts[0] != self.mqtt_prefix or parts[3] != "set":
                return
            rs = parts[1].replace("rs", "")
            dev_id = parts[2].replace("id", "")
            m = re.match(r"do(\d+)$", parts[4])
            if not m:
                return
            do_num = m.group(1)

            payload = msg.payload.decode().strip().upper()
            if payload in ("ON", "1", "TRUE"):
                value = "1"
            elif payload in ("OFF", "0", "FALSE"):
                value = "0"
            else:
                return

            key = (rs, dev_id, do_num, value)
            now = time.time()
            with self._cmd_lock:
                # Forget entries older than the 0.5 s de-duplication window.
                self._last_cmd = {k: v for k, v in self._last_cmd.items() if v > now - 0.5}
                if key in self._last_cmd:
                    return
                self._last_cmd[key] = now

            self.logger.info(f"Command: rs{rs}/id{dev_id}/do{do_num} = {value}")
            if self.ibls.set_output(int(rs), int(dev_id), f"setting.light.{do_num}", value):
                # Echo the new state right away instead of waiting for a poll.
                self._publish_all(f"{self.mqtt_prefix}/rs{rs}/id{dev_id}/output/do/{do_num}/state", "ON" if value == "1" else "OFF")
        except Exception as e:
            self.logger.error(f"Message error: {e}")
            STATS.record_error(str(e))

    def poll_and_publish(self):
        """Query device ids 1..max_ids in parallel and publish what they return.

        Returns:
            The number of device ids that returned at least one record.

        A failing query is logged, counted and skipped. Only ``Exception`` is
        caught, so a ``SystemExit`` raised while waiting still propagates.
        """
        cfg = self.config["ibsystem"]
        rs = cfg["rs"]
        full_ids = set(cfg.get("full_ids", []))
        workers = self.config["polling"].get("parallel_workers", 4)
        all_records = []
        devices = 0
        with ThreadPoolExecutor(max_workers=workers) as ex:
            futures = {ex.submit(self.ibls.get_device, rs, i, i in full_ids): i for i in range(1, cfg["max_ids"] + 1)}
            for f in as_completed(futures):
                try:
                    recs = f.result()
                except Exception as e:
                    self.logger.warning(f"Poll failed for rs{rs} id{futures[f]}: {e}")
                    STATS.record_error(f"poll id{futures[f]}: {e}")
                    continue
                if recs:
                    devices += 1
                    all_records.extend(recs)
        for rec in all_records:
            self._publish_discovery(rec)
            self._publish_state(rec)
        return devices

    def _publish_discovery(self, rec):
        """Send the Home Assistant discovery config for ``rec`` once per unique_id."""
        if rec.unique_id in self.discovered:
            return
        component = rec.component
        payload = {
            "name": self.friendly_names.get_entity_name(rec.rs, rec.id, rec.path),
            "state_topic": self._state_topic(rec),
            "unique_id": rec.unique_id,
            "availability_topic": f"{self.mqtt_prefix}/bridge/availability",
            "device": {
                "identifiers": [rec.device_id],
                "name": self.friendly_names.get_device_name(rec.rs, rec.id),
                "manufacturer": "IBSystem",
            },
        }
        if component == "switch":
            m = re.match(r"output\.do\.(\d+)$", rec.path)
            if m:
                payload.update({
                    "command_topic": f"{self.mqtt_prefix}/rs{rec.rs}/id{rec.id}/set/do{m.group(1)}",
                    "payload_on": "ON",
                    "payload_off": "OFF",
                    "state_on": "ON",
                    "state_off": "OFF",
                })
        elif component == "binary_sensor":
            payload.update({"payload_on": "1", "payload_off": "0"})
        elif rec.device_class:
            payload["device_class"] = rec.device_class

        unit = rec.unit_of_measurement
        if unit:
            payload["unit_of_measurement"] = unit
            # The template is added only for temperatures; no null entry is
            # written for other units.
            if rec.device_class == "temperature":
                payload["value_template"] = "{{ value | float / 10 }}"

        self._publish_all(f"{self.ha_prefix}/{component}/{rec.unique_id}/config", json.dumps(payload))
        self.discovered.add(rec.unique_id)
        STATS.record_entity()

    def _publish_state(self, rec):
        """Publish the state of ``rec`` to each broker whose cached value differs.

        Switch values are mapped to ON/OFF; all other values are sent as-is.
        """
        topic = self._state_topic(rec)
        if rec.component == "switch":
            value = "ON" if rec.val in ("1", "true", "ON") else "OFF"
        else:
            value = rec.val
        for c in self.clients:
            cache = self.last_published.setdefault(c._broker_name, {})
            if cache.get(topic) != value:
                cache[topic] = value
                c.publish(topic, value, retain=True)

    def _state_topic(self, rec):
        """Return the state topic for ``rec``, derived from its path."""
        p = rec.path
        base = f"{self.mqtt_prefix}/rs{rec.rs}/id{rec.id}"
        m = re.match(r"output\.do\.(\d+)$", p)
        if m:
            return f"{base}/output/do/{m.group(1)}/state"
        m = re.match(r"input\.di\.(\d+)\.(a|b)$", p)
        if m:
            return f"{base}/input/di/{m.group(1)}/{m.group(2)}"
        m = re.match(r"input\.t\.(\d+)\.(\w+)$", p)
        if m:
            return f"{base}/input/t/{m.group(1)}/{m.group(2)}"
        return f"{base}/{p.replace('.', '/')}"

    def _publish_all(self, topic, payload, retain=True):
        """Publish the same message to every connected broker."""
        for c in self.clients:
            c.publish(topic, payload, retain=retain)
def load_config(path=None):
    """Return the effective configuration.

    Starts from a deep copy of DEFAULT_CONFIG and, when ``path`` names a
    readable YAML file, overlays it: a top-level section that is a mapping on
    both sides is merged key by key (one level deep), anything else replaces
    the default.

    Args:
        path: Optional path to a YAML configuration file.

    Returns:
        A new dict; DEFAULT_CONFIG itself is never modified.

    Raises:
        ValueError: If the file's top-level YAML value is not a mapping.

    When ``path`` is given but PyYAML is not installed or the file does not
    exist, the defaults are returned and a warning is logged.
    """
    # A JSON round trip gives an independent deep copy of the defaults.
    cfg = json.loads(json.dumps(DEFAULT_CONFIG))
    if not path:
        return cfg

    log = logging.getLogger("ibsystem2mqtt")
    if not HAS_YAML:
        log.warning(f"PyYAML is not installed; ignoring config file {path}")
        return cfg
    if not Path(path).exists():
        log.warning(f"Config file {path} not found; using defaults")
        return cfg

    with open(path, encoding="utf-8") as f:
        # An empty file parses to None; treat it as "no overrides".
        user = yaml.safe_load(f) or {}
    if not isinstance(user, dict):
        raise ValueError(f"Config file {path} must contain a mapping at top level")

    for key, value in user.items():
        if isinstance(value, dict) and isinstance(cfg.get(key), dict):
            cfg[key].update(value)
        else:
            cfg[key] = value
    return cfg
def main():
    """Command-line entry point: load the config, start everything, poll forever.

    Options: ``-c/--config`` selects a config file, ``-v/--verbose`` forces
    the DEBUG log level. SIGINT and SIGTERM stop the diagnostics server and
    the bridge and then exit with status 0.
    """
    import argparse
    import traceback

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config")
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args()

    config = load_config(args.config)
    if args.verbose:
        config["logging"]["level"] = "DEBUG"
    logger = setup_logging(config["logging"])
    logger.info("Starting IBSystem2MQTT v5")

    ibsystem_cfg = config["ibsystem"]
    ibls = IBLSRunner(ibsystem_cfg["host"], ibsystem_cfg["port"], ibsystem_cfg["timeout_ms"], logger)
    bridge = MQTTBridge(config, ibls, logger)

    # The diagnostics HTTP server is optional.
    diag = None
    if config.get("http", {}).get("enabled"):
        diag = DiagnosticsServer(config["http"]["host"], config["http"]["port"], logger)
        diag.start(bridge)

    def shutdown(sig, frame):
        logger.info("Shutdown")
        if diag:
            diag.stop()
        bridge.stop()
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, shutdown)

    bridge.start()
    command_filter = f"{bridge.mqtt_prefix}/+/+/set/#"
    for client in bridge.clients:
        client.subscribe(command_filter)

    interval = config["polling"]["interval_sec"]
    while True:
        try:
            started = time.time()
            devices = bridge.poll_and_publish()
            elapsed = time.time() - started
            STATS.record_poll(elapsed, devices)
            logger.debug(f"Poll: {devices} in {elapsed:.2f}s")
            # Sleep only for what is left of the interval, never a negative time.
            time.sleep(max(0, interval - elapsed))
        except Exception as e:
            logger.error(f"Loop error: {e}\n{traceback.format_exc()}")
            STATS.record_error(str(e))
            time.sleep(interval)


if __name__ == "__main__":
    main()