import logging import threading logger = logging.getLogger(__name__) class DeviceWatcher: def __init__(self, config_manager, switcher): self._config_manager = config_manager self._switcher = switcher self._stop_event = threading.Event() self._debounce_timers: dict[str, threading.Timer] = {} self._debounce_lock = threading.Lock() self._thread: threading.Thread | None = None def start(self) -> None: self._stop_event.clear() self._thread = threading.Thread( target=self._watch_loop, daemon=True, name="DeviceWatcher" ) self._thread.start() logger.info("DeviceWatcher started") def stop(self) -> None: self._stop_event.set() def _watch_loop(self) -> None: retries = 0 max_retries = 3 while not self._stop_event.is_set() and retries <= max_retries: try: self._run_watcher() retries = 0 except Exception as exc: retries += 1 logger.error( "WMI watcher error (attempt %d/%d): %s", retries, max_retries, exc ) self._stop_event.wait(5) logger.warning("DeviceWatcher stopped after %d retries", retries) def _run_watcher(self) -> None: import pythoncom import wmi pythoncom.CoInitialize() try: c = wmi.WMI() watcher_create = c.watch_for( notification_type="Creation", wmi_class="Win32_PnPEntity", delay_secs=1, ) watcher_delete = c.watch_for( notification_type="Deletion", wmi_class="Win32_PnPEntity", delay_secs=1, ) t_create = threading.Thread( target=self._watcher_thread, args=(watcher_create, "connect"), daemon=True, ) t_delete = threading.Thread( target=self._watcher_thread, args=(watcher_delete, "disconnect"), daemon=True, ) t_create.start() t_delete.start() t_create.join() t_delete.join() finally: pythoncom.CoUninitialize() def _watcher_thread(self, watcher, event_type: str) -> None: import wmi while not self._stop_event.is_set(): try: event = watcher(timeout_ms=1000) if event: self._handle_event(event.DeviceID or "", event_type) except wmi.x_wmi_timed_out: pass except Exception as exc: logger.error("Watcher thread (%s) error: %s", event_type, exc) break def _handle_event(self, device_id: str, event_type: str) -> None: config = self._config_manager.get() for trigger in config.get("device_triggers", []): if not trigger.get("enabled", True): continue if trigger["bt_device_id"] in device_id: profile_id = ( trigger["on_connect"] if event_type == "connect" else trigger["on_disconnect"] ) logger.info( "Device event '%s' matched trigger '%s' -> profile '%s'", event_type, trigger["name"], profile_id, ) self._schedule_switch(device_id + event_type, profile_id) return def _schedule_switch(self, key: str, profile_id: str) -> None: debounce_s = ( self._config_manager.get() .get("general", {}) .get("debounce_ms", 500) / 1000.0 ) with self._debounce_lock: existing = self._debounce_timers.get(key) if existing: existing.cancel() timer = threading.Timer( debounce_s, self._switcher.apply_profile, args=[profile_id] ) self._debounce_timers[key] = timer timer.start()