From 83293c8a91cb12ac4d26a7ddec9066e145470381 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Matysiak?= Date: Thu, 9 Apr 2026 10:09:01 +0200 Subject: [PATCH] feat: full implementation - switcher, device watcher, hotkeys, tray, settings UI --- config/config.json | 44 +++++ config/config_manager.py | 100 ++++++++++ core/device_watcher.py | 124 +++++++++++++ core/hotkey_manager.py | 34 ++++ core/switcher.py | 54 ++++++ detect_vcp.py | 22 +++ main.py | 67 +++++++ tests/test_config_manager.py | 63 +++++++ tests/test_device_watcher.py | 62 +++++++ tests/test_hotkey_manager.py | 67 +++++++ tests/test_switcher.py | 93 ++++++++++ ui/settings_window.py | 342 +++++++++++++++++++++++++++++++++++ ui/tray.py | 90 +++++++++ 13 files changed, 1162 insertions(+) create mode 100644 config/config.json create mode 100644 config/config_manager.py create mode 100644 core/device_watcher.py create mode 100644 core/hotkey_manager.py create mode 100644 core/switcher.py create mode 100644 detect_vcp.py create mode 100644 main.py create mode 100644 tests/test_config_manager.py create mode 100644 tests/test_device_watcher.py create mode 100644 tests/test_hotkey_manager.py create mode 100644 tests/test_switcher.py create mode 100644 ui/settings_window.py create mode 100644 ui/tray.py diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..da70d99 --- /dev/null +++ b/config/config.json @@ -0,0 +1,44 @@ +{ + "active_profile": "pc1_dp", + "profiles": [ + { + "id": "pc1_dp", + "name": "PC1 — DP", + "hotkey": "ctrl+alt+1", + "monitor_inputs": [ + {"monitor_index": 0, "vcp_value": 15}, + {"monitor_index": 1, "vcp_value": 15} + ] + }, + { + "id": "pc2_hdmi", + "name": "PC2 — HDMI", + "hotkey": "ctrl+alt+2", + "monitor_inputs": [ + {"monitor_index": 0, "vcp_value": 17}, + {"monitor_index": 1, "vcp_value": 17} + ] + } + ], + "device_triggers": [ + { + "name": "HP 975 keyboard", + "bt_device_id": "VID&0203F0_PID&6343", + "on_connect": "pc1_dp", + "on_disconnect": "pc2_hdmi", + "enabled": true + }, + { + "name": "MX Anywhere 2S", + "bt_device_id": "VID&02046D_PID&B01A", + "on_connect": "pc1_dp", + "on_disconnect": "pc2_hdmi", + "enabled": true + } + ], + "general": { + "autostart": true, + "minimize_to_tray": true, + "debounce_ms": 500 + } +} diff --git a/config/config_manager.py b/config/config_manager.py new file mode 100644 index 0000000..49aca85 --- /dev/null +++ b/config/config_manager.py @@ -0,0 +1,100 @@ +import json +import copy +from pathlib import Path +from typing import Callable + +DEFAULT_CONFIG = { + "active_profile": "pc1_dp", + "profiles": [ + { + "id": "pc1_dp", + "name": "PC1 \u2014 DP", + "hotkey": "ctrl+alt+1", + "monitor_inputs": [ + {"monitor_index": 0, "vcp_value": 15}, + {"monitor_index": 1, "vcp_value": 15} + ] + }, + { + "id": "pc2_hdmi", + "name": "PC2 \u2014 HDMI", + "hotkey": "ctrl+alt+2", + "monitor_inputs": [ + {"monitor_index": 0, "vcp_value": 17}, + {"monitor_index": 1, "vcp_value": 17} + ] + } + ], + "device_triggers": [ + { + "name": "HP 975 keyboard", + "bt_device_id": "VID&0203F0_PID&6343", + "on_connect": "pc1_dp", + "on_disconnect": "pc2_hdmi", + "enabled": True + }, + { + "name": "MX Anywhere 2S", + "bt_device_id": "VID&02046D_PID&B01A", + "on_connect": "pc1_dp", + "on_disconnect": "pc2_hdmi", + "enabled": True + } + ], + "general": { + "autostart": True, + "minimize_to_tray": True, + "debounce_ms": 500 + } +} + + +class ConfigManager: + def __init__(self, config_path=None): + if config_path is None: + config_path = Path(__file__).parent / "config.json" + self.config_path = Path(config_path) + self._config: dict = {} + self._callbacks: list[Callable] = [] + self.load() + + def load(self) -> None: + if self.config_path.exists(): + try: + with open(self.config_path, "r", encoding="utf-8") as f: + self._config = json.load(f) + except (json.JSONDecodeError, OSError): + self._config = copy.deepcopy(DEFAULT_CONFIG) + else: + self._config = copy.deepcopy(DEFAULT_CONFIG) + self.save() + + def save(self) -> None: + self.config_path.parent.mkdir(parents=True, exist_ok=True) + with open(self.config_path, "w", encoding="utf-8") as f: + json.dump(self._config, f, indent=2, ensure_ascii=False) + + def get(self) -> dict: + return self._config + + def update(self, new_config: dict) -> None: + self._config = new_config + self.save() + for cb in self._callbacks: + cb(self._config) + + def on_change(self, callback: Callable) -> None: + self._callbacks.append(callback) + + def get_profile(self, profile_id: str) -> dict | None: + for p in self._config.get("profiles", []): + if p["id"] == profile_id: + return p + return None + + def get_active_profile_id(self) -> str: + return self._config.get("active_profile", "pc1_dp") + + def set_active_profile(self, profile_id: str) -> None: + self._config["active_profile"] = profile_id + self.save() diff --git a/core/device_watcher.py b/core/device_watcher.py new file mode 100644 index 0000000..a21274d --- /dev/null +++ b/core/device_watcher.py @@ -0,0 +1,124 @@ +import logging +import threading + +logger = logging.getLogger(__name__) + + +class DeviceWatcher: + def __init__(self, config_manager, switcher): + self._config_manager = config_manager + self._switcher = switcher + self._stop_event = threading.Event() + self._debounce_timers: dict[str, threading.Timer] = {} + self._debounce_lock = threading.Lock() + self._thread: threading.Thread | None = None + + def start(self) -> None: + self._stop_event.clear() + self._thread = threading.Thread( + target=self._watch_loop, daemon=True, name="DeviceWatcher" + ) + self._thread.start() + logger.info("DeviceWatcher started") + + def stop(self) -> None: + self._stop_event.set() + + def _watch_loop(self) -> None: + retries = 0 + max_retries = 3 + while not self._stop_event.is_set() and retries <= max_retries: + try: + self._run_watcher() + retries = 0 + except Exception as exc: + retries += 1 + logger.error( + "WMI watcher error (attempt %d/%d): %s", retries, max_retries, exc + ) + self._stop_event.wait(5) + logger.warning("DeviceWatcher stopped after %d retries", retries) + + def _run_watcher(self) -> None: + import pythoncom + import wmi + + pythoncom.CoInitialize() + try: + c = wmi.WMI() + watcher_create = c.watch_for( + notification_type="Creation", + wmi_class="Win32_PnPEntity", + delay_secs=1, + ) + watcher_delete = c.watch_for( + notification_type="Deletion", + wmi_class="Win32_PnPEntity", + delay_secs=1, + ) + t_create = threading.Thread( + target=self._watcher_thread, + args=(watcher_create, "connect"), + daemon=True, + ) + t_delete = threading.Thread( + target=self._watcher_thread, + args=(watcher_delete, "disconnect"), + daemon=True, + ) + t_create.start() + t_delete.start() + t_create.join() + t_delete.join() + finally: + pythoncom.CoUninitialize() + + def _watcher_thread(self, watcher, event_type: str) -> None: + import wmi + + while not self._stop_event.is_set(): + try: + event = watcher(timeout_ms=1000) + if event: + self._handle_event(event.DeviceID or "", event_type) + except wmi.x_wmi_timed_out: + pass + except Exception as exc: + logger.error("Watcher thread (%s) error: %s", event_type, exc) + break + + def _handle_event(self, device_id: str, event_type: str) -> None: + config = self._config_manager.get() + for trigger in config.get("device_triggers", []): + if not trigger.get("enabled", True): + continue + if trigger["bt_device_id"] in device_id: + profile_id = ( + trigger["on_connect"] + if event_type == "connect" + else trigger["on_disconnect"] + ) + logger.info( + "Device event '%s' matched trigger '%s' -> profile '%s'", + event_type, + trigger["name"], + profile_id, + ) + self._schedule_switch(device_id + event_type, profile_id) + return + + def _schedule_switch(self, key: str, profile_id: str) -> None: + debounce_s = ( + self._config_manager.get() + .get("general", {}) + .get("debounce_ms", 500) / 1000.0 + ) + with self._debounce_lock: + existing = self._debounce_timers.get(key) + if existing: + existing.cancel() + timer = threading.Timer( + debounce_s, self._switcher.apply_profile, args=[profile_id] + ) + self._debounce_timers[key] = timer + timer.start() diff --git a/core/hotkey_manager.py b/core/hotkey_manager.py new file mode 100644 index 0000000..e2944d7 --- /dev/null +++ b/core/hotkey_manager.py @@ -0,0 +1,34 @@ +import logging +import keyboard + +logger = logging.getLogger(__name__) + + +class HotkeyManager: + def __init__(self, config_manager, switcher): + self._config_manager = config_manager + self._switcher = switcher + self._registered: list[str] = [] + + def register_all(self) -> None: + self.unregister_all() + config = self._config_manager.get() + for profile in config.get("profiles", []): + hotkey = profile.get("hotkey", "").strip() + if not hotkey: + continue + profile_id = profile["id"] + try: + keyboard.add_hotkey(hotkey, self._switcher.apply_profile, args=[profile_id]) + self._registered.append(hotkey) + logger.info("Hotkey '%s' -> profile '%s'", hotkey, profile_id) + except Exception as exc: + logger.warning("Failed to register hotkey '%s': %s", hotkey, exc) + + def unregister_all(self) -> None: + for hotkey in self._registered: + try: + keyboard.remove_hotkey(hotkey) + except Exception: + pass + self._registered.clear() diff --git a/core/switcher.py b/core/switcher.py new file mode 100644 index 0000000..771bcf5 --- /dev/null +++ b/core/switcher.py @@ -0,0 +1,54 @@ +import logging +from typing import Callable +from monitorcontrol import get_monitors + +logger = logging.getLogger(__name__) + +VCP_INPUT_SOURCE = 0x60 + + +class MonitorSwitcher: + def __init__(self, config_manager): + self._config_manager = config_manager + self._on_switch_callbacks: list[Callable[[str], None]] = [] + + def apply_profile(self, profile_id: str) -> None: + profile = self._config_manager.get_profile(profile_id) + if profile is None: + logger.warning("Profile not found: %s", profile_id) + return + + monitors = list(get_monitors()) + for input_cfg in profile["monitor_inputs"]: + idx = input_cfg["monitor_index"] + vcp_value = input_cfg["vcp_value"] + if idx >= len(monitors): + logger.warning( + "Monitor index %d out of range (have %d)", idx, len(monitors) + ) + continue + try: + with monitors[idx] as monitor: + monitor.set_vcp_feature(VCP_INPUT_SOURCE, vcp_value) + logger.info("Monitor %d -> VCP input %d", idx, vcp_value) + except Exception as exc: + logger.warning("Monitor %d DDC/CI error: %s", idx, exc) + + self._config_manager.set_active_profile(profile_id) + for cb in self._on_switch_callbacks: + cb(profile_id) + + def on_switch(self, callback: Callable[[str], None]) -> None: + self._on_switch_callbacks.append(callback) + + def list_monitors(self) -> list[dict]: + result = [] + for idx, monitor_handle in enumerate(get_monitors()): + try: + with monitor_handle as monitor: + current = monitor.get_vcp_feature(VCP_INPUT_SOURCE) + result.append({"index": idx, "current_vcp": current.value}) + except Exception as exc: + logger.warning("Cannot query monitor %d: %s", idx, exc) + result.append({"index": idx, "current_vcp": None}) + return result diff --git a/detect_vcp.py b/detect_vcp.py new file mode 100644 index 0000000..aae88bc --- /dev/null +++ b/detect_vcp.py @@ -0,0 +1,22 @@ +""" +Helper script — uruchom raz aby sprawdzic aktualne wartosci VCP wejscia monitorow. +Wyniki wpisz do config/config.json -> profiles -> monitor_inputs -> vcp_value +""" +from monitorcontrol import get_monitors + +VCP_INPUT_SOURCE = 0x60 + +print("Skanowanie monitorow (VCP code 0x60 - Input Source)...") +for i, handle in enumerate(get_monitors()): + with handle as monitor: + try: + current = monitor.get_vcp_feature(VCP_INPUT_SOURCE) + print(f" Monitor {i}: VCP input = {current.value} (0x{current.value:02X})") + except Exception as e: + print(f" Monitor {i}: blad odczytu - {e}") +print() +print("Typowe wartosci:") +print(" 0x0F (15) = DisplayPort-1") +print(" 0x10 (16) = DisplayPort-2") +print(" 0x11 (17) = HDMI-1") +print(" 0x12 (18) = HDMI-2") diff --git a/main.py b/main.py new file mode 100644 index 0000000..6a1d4c1 --- /dev/null +++ b/main.py @@ -0,0 +1,67 @@ +import sys +import queue +import threading +import logging + +from PyQt6.QtWidgets import QApplication +from PyQt6.QtCore import QTimer + +from config.config_manager import ConfigManager +from core.switcher import MonitorSwitcher +from core.device_watcher import DeviceWatcher +from core.hotkey_manager import HotkeyManager +from ui.tray import TrayApp + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", +) + + +def main() -> None: + app = QApplication(sys.argv) + app.setQuitOnLastWindowClosed(False) + + config = ConfigManager() + switcher = MonitorSwitcher(config) + watcher = DeviceWatcher(config, switcher) + hotkeys = HotkeyManager(config, switcher) + + settings_queue: queue.Queue = queue.Queue() + tray = TrayApp(config, switcher, settings_queue) + + watcher.start() + hotkeys.register_all() + + tray_thread = threading.Thread( + target=tray.run_tray, daemon=True, name="TrayThread" + ) + tray_thread.start() + + settings_window = None + + def process_queue() -> None: + nonlocal settings_window + while not settings_queue.empty(): + cmd = settings_queue.get_nowait() + if cmd == "open_settings": + from ui.settings_window import SettingsWindow + if settings_window is None or not settings_window.isVisible(): + settings_window = SettingsWindow(config, switcher, hotkeys) + settings_window.show() + settings_window.raise_() + settings_window.activateWindow() + elif cmd == "exit": + hotkeys.unregister_all() + watcher.stop() + app.quit() + + timer = QTimer() + timer.timeout.connect(process_queue) + timer.start(100) + + sys.exit(app.exec()) + + +if __name__ == "__main__": + main() diff --git a/tests/test_config_manager.py b/tests/test_config_manager.py new file mode 100644 index 0000000..d65b4d8 --- /dev/null +++ b/tests/test_config_manager.py @@ -0,0 +1,63 @@ +import json +import pytest +from pathlib import Path +from config.config_manager import ConfigManager, DEFAULT_CONFIG + + +def test_load_defaults_when_no_file(tmp_path): + cm = ConfigManager(tmp_path / "config.json") + assert cm.get()["active_profile"] == "pc1_dp" + assert len(cm.get()["profiles"]) == 2 + + +def test_saves_default_config_on_first_run(tmp_path): + path = tmp_path / "config.json" + ConfigManager(path) + assert path.exists() + data = json.loads(path.read_text()) + assert data["active_profile"] == "pc1_dp" + + +def test_load_existing_config(tmp_path): + path = tmp_path / "config.json" + custom = {"active_profile": "custom", "profiles": [], "device_triggers": [], "general": {}} + path.write_text(json.dumps(custom)) + cm = ConfigManager(path) + assert cm.get()["active_profile"] == "custom" + + +def test_fallback_on_corrupt_config(tmp_path): + path = tmp_path / "config.json" + path.write_text("not json {{{") + cm = ConfigManager(path) + assert cm.get()["active_profile"] == "pc1_dp" + + +def test_get_profile_returns_profile(tmp_path): + cm = ConfigManager(tmp_path / "config.json") + p = cm.get_profile("pc1_dp") + assert p is not None + assert p["name"] == "PC1 \u2014 DP" + + +def test_get_profile_returns_none_for_missing(tmp_path): + cm = ConfigManager(tmp_path / "config.json") + assert cm.get_profile("nonexistent") is None + + +def test_set_active_profile_persists(tmp_path): + path = tmp_path / "config.json" + cm = ConfigManager(path) + cm.set_active_profile("pc2_hdmi") + cm2 = ConfigManager(path) + assert cm2.get_active_profile_id() == "pc2_hdmi" + + +def test_on_change_callback_fires(tmp_path): + cm = ConfigManager(tmp_path / "config.json") + called_with = [] + cm.on_change(lambda cfg: called_with.append(cfg["active_profile"])) + cfg = cm.get().copy() + cfg["active_profile"] = "pc2_hdmi" + cm.update(cfg) + assert called_with == ["pc2_hdmi"] diff --git a/tests/test_device_watcher.py b/tests/test_device_watcher.py new file mode 100644 index 0000000..8f70134 --- /dev/null +++ b/tests/test_device_watcher.py @@ -0,0 +1,62 @@ +import time +from unittest.mock import MagicMock +import pytest +from config.config_manager import ConfigManager +from core.device_watcher import DeviceWatcher + + +@pytest.fixture +def config(tmp_path): + return ConfigManager(tmp_path / "config.json") + + +@pytest.fixture +def switcher(): + return MagicMock() + + +def test_handle_event_connect_triggers_correct_profile(config, switcher): + watcher = DeviceWatcher(config, switcher) + watcher._handle_event( + "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "connect" + ) + time.sleep(0.6) + switcher.apply_profile.assert_called_once_with("pc1_dp") + + +def test_handle_event_disconnect_triggers_correct_profile(config, switcher): + watcher = DeviceWatcher(config, switcher) + watcher._handle_event( + "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "disconnect" + ) + time.sleep(0.6) + switcher.apply_profile.assert_called_once_with("pc2_hdmi") + + +def test_handle_event_ignores_unknown_device(config, switcher): + watcher = DeviceWatcher(config, switcher) + watcher._handle_event("HID\\VID&DEAD_PID&BEEF", "connect") + time.sleep(0.6) + switcher.apply_profile.assert_not_called() + + +def test_handle_event_ignores_disabled_trigger(config, switcher): + cfg = config.get() + cfg["device_triggers"][0]["enabled"] = False + config.update(cfg) + + watcher = DeviceWatcher(config, switcher) + watcher._handle_event( + "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413", "connect" + ) + time.sleep(0.6) + switcher.apply_profile.assert_not_called() + + +def test_debounce_collapses_rapid_events(config, switcher): + watcher = DeviceWatcher(config, switcher) + device_id = "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413" + for _ in range(5): + watcher._handle_event(device_id, "connect") + time.sleep(0.8) + assert switcher.apply_profile.call_count == 1 diff --git a/tests/test_hotkey_manager.py b/tests/test_hotkey_manager.py new file mode 100644 index 0000000..76d4d56 --- /dev/null +++ b/tests/test_hotkey_manager.py @@ -0,0 +1,67 @@ +from unittest.mock import MagicMock, patch +import pytest +from config.config_manager import ConfigManager +from core.hotkey_manager import HotkeyManager + + +@pytest.fixture +def config(tmp_path): + return ConfigManager(tmp_path / "config.json") + + +@pytest.fixture +def switcher(): + return MagicMock() + + +def test_register_all_registers_profile_hotkeys(config, switcher): + with patch("core.hotkey_manager.keyboard") as mock_kb: + hm = HotkeyManager(config, switcher) + hm.register_all() + assert mock_kb.add_hotkey.call_count == 2 + calls = [c.args[0] for c in mock_kb.add_hotkey.call_args_list] + assert "ctrl+alt+1" in calls + assert "ctrl+alt+2" in calls + + +def test_register_all_skips_empty_hotkeys(config, switcher): + cfg = config.get() + cfg["profiles"][0]["hotkey"] = "" + config.update(cfg) + + with patch("core.hotkey_manager.keyboard") as mock_kb: + hm = HotkeyManager(config, switcher) + hm.register_all() + assert mock_kb.add_hotkey.call_count == 1 + + +def test_unregister_all_removes_hotkeys(config, switcher): + with patch("core.hotkey_manager.keyboard") as mock_kb: + hm = HotkeyManager(config, switcher) + hm.register_all() + hm.unregister_all() + assert mock_kb.remove_hotkey.call_count == 2 + + +def test_register_all_clears_old_before_registering(config, switcher): + with patch("core.hotkey_manager.keyboard") as mock_kb: + hm = HotkeyManager(config, switcher) + hm.register_all() + hm.register_all() + assert mock_kb.remove_hotkey.call_count == 2 + + +def test_hotkey_calls_apply_profile(config, switcher): + captured_callbacks = {} + + def fake_add_hotkey(hotkey, fn, args): + captured_callbacks[hotkey] = (fn, args) + + with patch("core.hotkey_manager.keyboard") as mock_kb: + mock_kb.add_hotkey.side_effect = fake_add_hotkey + hm = HotkeyManager(config, switcher) + hm.register_all() + + fn, args = captured_callbacks["ctrl+alt+1"] + fn(*args) + switcher.apply_profile.assert_called_once_with("pc1_dp") diff --git a/tests/test_switcher.py b/tests/test_switcher.py new file mode 100644 index 0000000..1d82a77 --- /dev/null +++ b/tests/test_switcher.py @@ -0,0 +1,93 @@ +from unittest.mock import MagicMock, patch +import pytest +from config.config_manager import ConfigManager +from core.switcher import MonitorSwitcher + + +@pytest.fixture +def config(tmp_path): + return ConfigManager(tmp_path / "config.json") + + +def _make_mock_monitor(): + m = MagicMock() + m.__enter__ = MagicMock(return_value=m) + m.__exit__ = MagicMock(return_value=False) + return m + + +def test_apply_profile_sends_vcp_to_both_monitors(config): + mon0 = _make_mock_monitor() + mon1 = _make_mock_monitor() + + with patch("core.switcher.get_monitors", return_value=[mon0, mon1]): + sw = MonitorSwitcher(config) + sw.apply_profile("pc1_dp") + + mon0.set_vcp_feature.assert_called_once_with(0x60, 15) + mon1.set_vcp_feature.assert_called_once_with(0x60, 15) + + +def test_apply_profile_hdmi(config): + mon0 = _make_mock_monitor() + mon1 = _make_mock_monitor() + + with patch("core.switcher.get_monitors", return_value=[mon0, mon1]): + sw = MonitorSwitcher(config) + sw.apply_profile("pc2_hdmi") + + mon0.set_vcp_feature.assert_called_once_with(0x60, 17) + mon1.set_vcp_feature.assert_called_once_with(0x60, 17) + + +def test_apply_profile_skips_missing_monitor(config): + mon0 = _make_mock_monitor() + + with patch("core.switcher.get_monitors", return_value=[mon0]): + sw = MonitorSwitcher(config) + sw.apply_profile("pc1_dp") + + mon0.set_vcp_feature.assert_called_once_with(0x60, 15) + + +def test_apply_profile_continues_on_monitor_error(config): + mon0 = _make_mock_monitor() + mon1 = _make_mock_monitor() + mon0.set_vcp_feature.side_effect = Exception("DDC/CI error") + + with patch("core.switcher.get_monitors", return_value=[mon0, mon1]): + sw = MonitorSwitcher(config) + sw.apply_profile("pc1_dp") + + mon1.set_vcp_feature.assert_called_once_with(0x60, 15) + + +def test_apply_profile_unknown_profile_does_nothing(config): + with patch("core.switcher.get_monitors", return_value=[]) as mock_gm: + sw = MonitorSwitcher(config) + sw.apply_profile("nonexistent") + mock_gm.assert_not_called() + + +def test_on_switch_callback_fires(config): + mon0 = _make_mock_monitor() + mon1 = _make_mock_monitor() + received = [] + + with patch("core.switcher.get_monitors", return_value=[mon0, mon1]): + sw = MonitorSwitcher(config) + sw.on_switch(lambda pid: received.append(pid)) + sw.apply_profile("pc2_hdmi") + + assert received == ["pc2_hdmi"] + + +def test_apply_profile_updates_active_profile(config): + mon0 = _make_mock_monitor() + mon1 = _make_mock_monitor() + + with patch("core.switcher.get_monitors", return_value=[mon0, mon1]): + sw = MonitorSwitcher(config) + sw.apply_profile("pc2_hdmi") + + assert config.get_active_profile_id() == "pc2_hdmi" diff --git a/ui/settings_window.py b/ui/settings_window.py new file mode 100644 index 0000000..119a338 --- /dev/null +++ b/ui/settings_window.py @@ -0,0 +1,342 @@ +import copy +import logging +import sys +import winreg + +from PyQt6.QtWidgets import ( + QMainWindow, QWidget, QTabWidget, QVBoxLayout, QHBoxLayout, + QPushButton, QListWidget, QListWidgetItem, QLabel, QLineEdit, + QFormLayout, QSpinBox, QCheckBox, QComboBox, QGroupBox, +) +from PyQt6.QtCore import Qt + +logger = logging.getLogger(__name__) + +VCP_INPUT_OPTIONS = [ + (15, "DisplayPort-1 (0x0F)"), + (16, "DisplayPort-2 (0x10)"), + (17, "HDMI-1 (0x11)"), + (18, "HDMI-2 (0x12)"), + (1, "VGA-1 (0x01)"), + (3, "DVI-1 (0x03)"), +] + + +class SettingsWindow(QMainWindow): + def __init__(self, config_manager, switcher, hotkey_manager): + super().__init__() + self._config_manager = config_manager + self._switcher = switcher + self._hotkey_manager = hotkey_manager + self._working_config = copy.deepcopy(config_manager.get()) + + self.setWindowTitle("MonitorSwitcher \u2014 Ustawienia") + self.setMinimumSize(640, 520) + + tabs = QTabWidget() + tabs.addTab(self._build_profiles_tab(), "Profile") + tabs.addTab(self._build_devices_tab(), "Urzadzenia BT") + tabs.addTab(self._build_hotkeys_tab(), "Hotkeys") + tabs.addTab(self._build_general_tab(), "Ogolne") + + save_btn = QPushButton("Zapisz i zamknij") + save_btn.clicked.connect(self._save_and_close) + cancel_btn = QPushButton("Anuluj") + cancel_btn.clicked.connect(self.close) + + btn_row = QHBoxLayout() + btn_row.addStretch() + btn_row.addWidget(cancel_btn) + btn_row.addWidget(save_btn) + + layout = QVBoxLayout() + layout.addWidget(tabs) + layout.addLayout(btn_row) + + central = QWidget() + central.setLayout(layout) + self.setCentralWidget(central) + + # ── Profiles tab ────────────────────────────────────────────────────────── + + def _build_profiles_tab(self) -> QWidget: + widget = QWidget() + layout = QHBoxLayout(widget) + + left = QVBoxLayout() + self._profile_list = QListWidget() + self._profile_list.currentRowChanged.connect(self._on_profile_selected) + left.addWidget(QLabel("Profile:")) + left.addWidget(self._profile_list) + + add_btn = QPushButton("+ Dodaj") + add_btn.clicked.connect(self._add_profile) + del_btn = QPushButton("Usun") + del_btn.clicked.connect(self._delete_profile) + btn_row = QHBoxLayout() + btn_row.addWidget(add_btn) + btn_row.addWidget(del_btn) + left.addLayout(btn_row) + + right = QVBoxLayout() + self._profile_name_edit = QLineEdit() + self._profile_name_edit.setPlaceholderText("Nazwa profilu") + self._profile_name_edit.textChanged.connect(self._on_profile_name_changed) + right.addWidget(QLabel("Nazwa:")) + right.addWidget(self._profile_name_edit) + + self._monitor_combos: list[QComboBox] = [] + monitor_group = QGroupBox("Wejscia monitorow") + monitor_layout = QFormLayout() + for i in range(2): + combo = QComboBox() + for vcp, label in VCP_INPUT_OPTIONS: + combo.addItem(label, vcp) + combo.currentIndexChanged.connect( + lambda _, idx=i: self._on_monitor_input_changed(idx) + ) + self._monitor_combos.append(combo) + monitor_layout.addRow(f"Monitor {i}:", combo) + monitor_group.setLayout(monitor_layout) + right.addWidget(monitor_group) + right.addStretch() + + layout.addLayout(left, 1) + layout.addLayout(right, 2) + + self._refresh_profile_list() + return widget + + def _refresh_profile_list(self) -> None: + self._profile_list.clear() + for p in self._working_config.get("profiles", []): + self._profile_list.addItem(QListWidgetItem(p["name"])) + if self._profile_list.count() > 0: + self._profile_list.setCurrentRow(0) + + def _on_profile_selected(self, row: int) -> None: + profiles = self._working_config.get("profiles", []) + if row < 0 or row >= len(profiles): + return + profile = profiles[row] + self._profile_name_edit.blockSignals(True) + self._profile_name_edit.setText(profile["name"]) + self._profile_name_edit.blockSignals(False) + for combo in self._monitor_combos: + combo.blockSignals(True) + for inp in profile.get("monitor_inputs", []): + idx = inp["monitor_index"] + vcp = inp["vcp_value"] + if idx < len(self._monitor_combos): + data_idx = self._monitor_combos[idx].findData(vcp) + self._monitor_combos[idx].setCurrentIndex( + data_idx if data_idx >= 0 else 0 + ) + for combo in self._monitor_combos: + combo.blockSignals(False) + + def _on_profile_name_changed(self, text: str) -> None: + row = self._profile_list.currentRow() + profiles = self._working_config.get("profiles", []) + if 0 <= row < len(profiles): + profiles[row]["name"] = text + self._profile_list.item(row).setText(text) + + def _on_monitor_input_changed(self, monitor_idx: int) -> None: + row = self._profile_list.currentRow() + profiles = self._working_config.get("profiles", []) + if row < 0 or row >= len(profiles): + return + vcp = self._monitor_combos[monitor_idx].currentData() + inputs = profiles[row].setdefault("monitor_inputs", []) + for inp in inputs: + if inp["monitor_index"] == monitor_idx: + inp["vcp_value"] = vcp + return + inputs.append({"monitor_index": monitor_idx, "vcp_value": vcp}) + + def _add_profile(self) -> None: + new_id = f"profile_{len(self._working_config['profiles'])}" + new_profile = { + "id": new_id, + "name": "Nowy profil", + "hotkey": "", + "monitor_inputs": [ + {"monitor_index": 0, "vcp_value": 15}, + {"monitor_index": 1, "vcp_value": 15}, + ], + } + self._working_config["profiles"].append(new_profile) + self._profile_list.addItem(QListWidgetItem(new_profile["name"])) + self._profile_list.setCurrentRow(self._profile_list.count() - 1) + + def _delete_profile(self) -> None: + row = self._profile_list.currentRow() + if row < 0: + return + self._working_config["profiles"].pop(row) + self._refresh_profile_list() + + # ── Devices tab ─────────────────────────────────────────────────────────── + + def _build_devices_tab(self) -> QWidget: + widget = QWidget() + outer = QVBoxLayout(widget) + outer.addWidget(QLabel("Urzadzenia BT wyzwalajace przelaczeanie profilu:")) + + self._devices_container = QVBoxLayout() + outer.addLayout(self._devices_container) + + add_btn = QPushButton("+ Dodaj urzadzenie") + add_btn.clicked.connect(lambda: self._add_device_row()) + outer.addWidget(add_btn) + outer.addStretch() + + for trigger in self._working_config.get("device_triggers", []): + self._add_device_row(trigger) + + return widget + + def _add_device_row(self, trigger: dict | None = None) -> None: + if trigger is None: + trigger = { + "name": "Nowe urzadzenie", + "bt_device_id": "", + "on_connect": self._first_profile_id(), + "on_disconnect": self._first_profile_id(), + "enabled": True, + } + self._working_config.setdefault("device_triggers", []).append(trigger) + + group = QGroupBox(trigger.get("name", "")) + form = QFormLayout(group) + + name_edit = QLineEdit(trigger.get("name", "")) + bt_id_edit = QLineEdit(trigger.get("bt_device_id", "")) + connect_combo = self._profile_combo(trigger.get("on_connect", "")) + disconnect_combo = self._profile_combo(trigger.get("on_disconnect", "")) + enabled_cb = QCheckBox() + enabled_cb.setChecked(trigger.get("enabled", True)) + + form.addRow("Nazwa:", name_edit) + form.addRow("BT Device ID (fragment):", bt_id_edit) + form.addRow("Przy podlaczeniu:", connect_combo) + form.addRow("Przy odlaczeniu:", disconnect_combo) + form.addRow("Aktywne:", enabled_cb) + + def update_trigger(): + trigger["name"] = name_edit.text() + trigger["bt_device_id"] = bt_id_edit.text() + trigger["on_connect"] = connect_combo.currentData() + trigger["on_disconnect"] = disconnect_combo.currentData() + trigger["enabled"] = enabled_cb.isChecked() + group.setTitle(trigger["name"]) + + name_edit.textChanged.connect(lambda _: update_trigger()) + bt_id_edit.textChanged.connect(lambda _: update_trigger()) + connect_combo.currentIndexChanged.connect(lambda _: update_trigger()) + disconnect_combo.currentIndexChanged.connect(lambda _: update_trigger()) + enabled_cb.stateChanged.connect(lambda _: update_trigger()) + + self._devices_container.addWidget(group) + + def _profile_combo(self, selected_id: str) -> QComboBox: + combo = QComboBox() + for p in self._working_config.get("profiles", []): + combo.addItem(p["name"], p["id"]) + idx = combo.findData(selected_id) + if idx >= 0: + combo.setCurrentIndex(idx) + return combo + + def _first_profile_id(self) -> str: + profiles = self._working_config.get("profiles", []) + return profiles[0]["id"] if profiles else "" + + # ── Hotkeys tab ─────────────────────────────────────────────────────────── + + def _build_hotkeys_tab(self) -> QWidget: + widget = QWidget() + layout = QVBoxLayout(widget) + layout.addWidget(QLabel("Przypisz skroty klawiszowe do profili:")) + layout.addWidget(QLabel("Format: ctrl+alt+1, win+F1, itp.")) + + form = QFormLayout() + for profile in self._working_config.get("profiles", []): + edit = QLineEdit(profile.get("hotkey", "")) + edit.setPlaceholderText("np. ctrl+alt+1") + pid = profile["id"] + edit.textChanged.connect( + lambda text, p=pid: self._on_hotkey_changed(p, text) + ) + form.addRow(f"{profile['name']}:", edit) + + layout.addLayout(form) + layout.addStretch() + return widget + + def _on_hotkey_changed(self, profile_id: str, text: str) -> None: + for p in self._working_config.get("profiles", []): + if p["id"] == profile_id: + p["hotkey"] = text.strip() + return + + # ── General tab ─────────────────────────────────────────────────────────── + + def _build_general_tab(self) -> QWidget: + widget = QWidget() + layout = QFormLayout(widget) + + self._autostart_cb = QCheckBox() + self._autostart_cb.setChecked( + self._working_config.get("general", {}).get("autostart", True) + ) + self._minimize_cb = QCheckBox() + self._minimize_cb.setChecked( + self._working_config.get("general", {}).get("minimize_to_tray", True) + ) + self._debounce_spin = QSpinBox() + self._debounce_spin.setRange(100, 5000) + self._debounce_spin.setSuffix(" ms") + self._debounce_spin.setValue( + self._working_config.get("general", {}).get("debounce_ms", 500) + ) + + layout.addRow("Autostart z Windows:", self._autostart_cb) + layout.addRow("Minimalizuj do tray przy zamknieciu:", self._minimize_cb) + layout.addRow("Debounce (opoznienie przelaczenia):", self._debounce_spin) + + return widget + + # ── Save ────────────────────────────────────────────────────────────────── + + def _save_and_close(self) -> None: + general = self._working_config.setdefault("general", {}) + general["autostart"] = self._autostart_cb.isChecked() + general["minimize_to_tray"] = self._minimize_cb.isChecked() + general["debounce_ms"] = self._debounce_spin.value() + + self._config_manager.update(self._working_config) + self._hotkey_manager.unregister_all() + self._hotkey_manager.register_all() + _set_autostart(general.get("autostart", False)) + self.close() + + +def _set_autostart(enable: bool) -> None: + key_path = r"Software\Microsoft\Windows\CurrentVersion\Run" + app_name = "MonitorSwitcher" + exe_path = sys.executable if not getattr(sys, "frozen", False) else sys.executable + try: + with winreg.OpenKey( + winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE + ) as key: + if enable: + winreg.SetValueEx(key, app_name, 0, winreg.REG_SZ, f'"{exe_path}"') + else: + try: + winreg.DeleteValue(key, app_name) + except FileNotFoundError: + pass + except Exception as exc: + logger.warning("Autostart registry error: %s", exc) diff --git a/ui/tray.py b/ui/tray.py new file mode 100644 index 0000000..1e66b67 --- /dev/null +++ b/ui/tray.py @@ -0,0 +1,90 @@ +import queue +import logging +import pystray +from PIL import Image, ImageDraw + +logger = logging.getLogger(__name__) + +_COLORS = {"dp": "#1565C0", "hdmi": "#B71C1C", "default": "#455A64"} + + +def _make_icon(label: str, color: str) -> Image.Image: + img = Image.new("RGBA", (64, 64), (0, 0, 0, 0)) + draw = ImageDraw.Draw(img) + draw.ellipse([2, 2, 62, 62], fill=color) + text = label[:2].upper() + bbox = draw.textbbox((0, 0), text) + tw = bbox[2] - bbox[0] + th = bbox[3] - bbox[1] + draw.text(((64 - tw) // 2, (64 - th) // 2), text, fill="white") + return img + + +def _color_for(profile_id: str) -> str: + if "dp" in profile_id: + return _COLORS["dp"] + if "hdmi" in profile_id: + return _COLORS["hdmi"] + return _COLORS["default"] + + +class TrayApp: + def __init__(self, config_manager, switcher, settings_queue: queue.Queue): + self._config_manager = config_manager + self._switcher = switcher + self._settings_queue = settings_queue + self._icon: pystray.Icon | None = None + switcher.on_switch(self._on_profile_switched) + + def _build_menu(self) -> pystray.Menu: + config = self._config_manager.get() + items = [] + for profile in config.get("profiles", []): + pid = profile["id"] + name = profile["name"] + items.append( + pystray.MenuItem( + name, + lambda _, p=pid: self._switcher.apply_profile(p), + checked=lambda item, p=pid: ( + self._config_manager.get_active_profile_id() == p + ), + radio=True, + ) + ) + items.append(pystray.Menu.SEPARATOR) + items.append(pystray.MenuItem("Ustawienia", self._request_settings)) + items.append(pystray.MenuItem("Wyjscie", self._exit)) + return pystray.Menu(*items) + + def _on_profile_switched(self, profile_id: str) -> None: + if self._icon is None: + return + profile = self._config_manager.get_profile(profile_id) + label = profile["name"][:2] if profile else "??" + title = profile["name"] if profile else profile_id + self._icon.icon = _make_icon(label, _color_for(profile_id)) + self._icon.title = f"MonitorSwitcher \u2014 {title}" + self._icon.update_menu() + + def _request_settings(self, icon=None, item=None) -> None: + self._settings_queue.put("open_settings") + + def _exit(self, icon=None, item=None) -> None: + self._settings_queue.put("exit") + if self._icon: + self._icon.stop() + + def run_tray(self) -> None: + active_id = self._config_manager.get_active_profile_id() + profile = self._config_manager.get_profile(active_id) + label = profile["name"][:2] if profile else "MS" + title = f"MonitorSwitcher \u2014 {profile['name']}" if profile else "MonitorSwitcher" + + self._icon = pystray.Icon( + "MonitorSwitcher", + _make_icon(label, _color_for(active_id)), + title=title, + menu=self._build_menu(), + ) + self._icon.run()