feat: full implementation - switcher, device watcher, hotkeys, tray, settings UI

This commit is contained in:
Miłosz Matysiak
2026-04-09 10:09:01 +02:00
parent 40d1c23051
commit 83293c8a91
13 changed files with 1162 additions and 0 deletions

124
core/device_watcher.py Normal file
View File

@@ -0,0 +1,124 @@
import logging
import threading
logger = logging.getLogger(__name__)
class DeviceWatcher:
def __init__(self, config_manager, switcher):
self._config_manager = config_manager
self._switcher = switcher
self._stop_event = threading.Event()
self._debounce_timers: dict[str, threading.Timer] = {}
self._debounce_lock = threading.Lock()
self._thread: threading.Thread | None = None
def start(self) -> None:
self._stop_event.clear()
self._thread = threading.Thread(
target=self._watch_loop, daemon=True, name="DeviceWatcher"
)
self._thread.start()
logger.info("DeviceWatcher started")
def stop(self) -> None:
self._stop_event.set()
def _watch_loop(self) -> None:
retries = 0
max_retries = 3
while not self._stop_event.is_set() and retries <= max_retries:
try:
self._run_watcher()
retries = 0
except Exception as exc:
retries += 1
logger.error(
"WMI watcher error (attempt %d/%d): %s", retries, max_retries, exc
)
self._stop_event.wait(5)
logger.warning("DeviceWatcher stopped after %d retries", retries)
def _run_watcher(self) -> None:
import pythoncom
import wmi
pythoncom.CoInitialize()
try:
c = wmi.WMI()
watcher_create = c.watch_for(
notification_type="Creation",
wmi_class="Win32_PnPEntity",
delay_secs=1,
)
watcher_delete = c.watch_for(
notification_type="Deletion",
wmi_class="Win32_PnPEntity",
delay_secs=1,
)
t_create = threading.Thread(
target=self._watcher_thread,
args=(watcher_create, "connect"),
daemon=True,
)
t_delete = threading.Thread(
target=self._watcher_thread,
args=(watcher_delete, "disconnect"),
daemon=True,
)
t_create.start()
t_delete.start()
t_create.join()
t_delete.join()
finally:
pythoncom.CoUninitialize()
def _watcher_thread(self, watcher, event_type: str) -> None:
import wmi
while not self._stop_event.is_set():
try:
event = watcher(timeout_ms=1000)
if event:
self._handle_event(event.DeviceID or "", event_type)
except wmi.x_wmi_timed_out:
pass
except Exception as exc:
logger.error("Watcher thread (%s) error: %s", event_type, exc)
break
def _handle_event(self, device_id: str, event_type: str) -> None:
config = self._config_manager.get()
for trigger in config.get("device_triggers", []):
if not trigger.get("enabled", True):
continue
if trigger["bt_device_id"] in device_id:
profile_id = (
trigger["on_connect"]
if event_type == "connect"
else trigger["on_disconnect"]
)
logger.info(
"Device event '%s' matched trigger '%s' -> profile '%s'",
event_type,
trigger["name"],
profile_id,
)
self._schedule_switch(device_id + event_type, profile_id)
return
def _schedule_switch(self, key: str, profile_id: str) -> None:
debounce_s = (
self._config_manager.get()
.get("general", {})
.get("debounce_ms", 500) / 1000.0
)
with self._debounce_lock:
existing = self._debounce_timers.get(key)
if existing:
existing.cancel()
timer = threading.Timer(
debounce_s, self._switcher.apply_profile, args=[profile_id]
)
self._debounce_timers[key] = timer
timer.start()