Files
MonitorSwitcher/docs/superpowers/plans/2026-04-09-monitor-switcher.md
2026-04-09 09:58:31 +02:00

1629 lines
50 KiB
Markdown

# MonitorSwitcher Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Python system tray app that switches two Lenovo L27qe monitors between DP and HDMI inputs automatically when HP 975 keyboard or Logitech MX Anywhere 2S connects/disconnects via Bluetooth, with hotkey support and PyQt6 config UI.
**Architecture:** Profile-based config (each profile = set of monitor VCP input values). WMI event watcher detects BT device connect/disconnect and fires `MonitorSwitcher.apply_profile()`. PyQt6 runs on the main thread; pystray runs in a daemon thread and communicates back via a `queue.Queue`.
**Tech Stack:** Python 3.11+, monitorcontrol, pystray, keyboard, wmi, pywin32, PyQt6, Pillow, pytest, PyInstaller
---
## File Map
| File | Responsibility |
|---|---|
| `main.py` | Bootstrap: create components, start threads, run Qt event loop |
| `config/config_manager.py` | Load/save/validate `config.json`, callbacks on change |
| `config/config.json` | Default config (ships with app) |
| `core/switcher.py` | DDC/CI via monitorcontrol, `apply_profile()`, monitor enumeration |
| `core/device_watcher.py` | WMI BT event watcher in background thread, debounce |
| `core/hotkey_manager.py` | Global hotkey registration via `keyboard` lib |
| `ui/tray.py` | pystray icon + menu, communicates with Qt via queue |
| `ui/settings_window.py` | PyQt6 4-tab settings window |
| `tests/test_config_manager.py` | Unit tests for ConfigManager |
| `tests/test_switcher.py` | Unit tests for MonitorSwitcher (mocked monitorcontrol) |
| `tests/test_device_watcher.py` | Unit tests for DeviceWatcher (mocked WMI) |
| `tests/test_hotkey_manager.py` | Unit tests for HotkeyManager (mocked keyboard) |
| `requirements.txt` | Python dependencies |
| `MonitorSwitcher.spec` | PyInstaller spec file |
---
## Task 1: Project scaffold + requirements
**Files:**
- Create: `requirements.txt`
- Create: `config/__init__.py`
- Create: `core/__init__.py`
- Create: `ui/__init__.py`
- Create: `tests/__init__.py`
- [ ] **Step 1: Create virtual environment and requirements.txt**
```
d:/VSCode/MonitorSwitcher/requirements.txt
```
```text
monitorcontrol>=0.10.0
pystray>=0.19.5
keyboard>=0.13.5
wmi>=1.5.1
pywin32>=306
PyQt6>=6.6.0
Pillow>=10.0.0
pytest>=7.4.0
pytest-mock>=3.12.0
```
- [ ] **Step 2: Create directory structure**
Run:
```bash
cd d:/VSCode/MonitorSwitcher
python -m venv .venv
.venv/Scripts/activate
pip install -r requirements.txt
mkdir config core ui tests
```
- [ ] **Step 3: Create `__init__.py` files**
Create `config/__init__.py`, `core/__init__.py`, `ui/__init__.py`, `tests/__init__.py` — all empty files.
- [ ] **Step 4: Verify install**
Run:
```bash
python -c "import monitorcontrol, pystray, keyboard, wmi, PyQt6, PIL; print('OK')"
```
Expected: `OK`
- [ ] **Step 5: Commit**
```bash
git init
git add requirements.txt config/__init__.py core/__init__.py ui/__init__.py tests/__init__.py
git commit -m "chore: project scaffold"
```
---
## Task 2: ConfigManager
**Files:**
- Create: `config/config.json`
- Create: `config/config_manager.py`
- Create: `tests/test_config_manager.py`
- [ ] **Step 1: Write failing tests**
`tests/test_config_manager.py`:
```python
import json
import pytest
from pathlib import Path
from config.config_manager import ConfigManager, DEFAULT_CONFIG
def test_load_defaults_when_no_file(tmp_path):
cm = ConfigManager(tmp_path / "config.json")
assert cm.get()["active_profile"] == "pc1_dp"
assert len(cm.get()["profiles"]) == 2
def test_saves_default_config_on_first_run(tmp_path):
path = tmp_path / "config.json"
ConfigManager(path)
assert path.exists()
data = json.loads(path.read_text())
assert data["active_profile"] == "pc1_dp"
def test_load_existing_config(tmp_path):
path = tmp_path / "config.json"
custom = {"active_profile": "custom", "profiles": [], "device_triggers": [], "general": {}}
path.write_text(json.dumps(custom))
cm = ConfigManager(path)
assert cm.get()["active_profile"] == "custom"
def test_fallback_on_corrupt_config(tmp_path):
path = tmp_path / "config.json"
path.write_text("not json {{{")
cm = ConfigManager(path)
assert cm.get()["active_profile"] == "pc1_dp"
def test_get_profile_returns_profile(tmp_path):
cm = ConfigManager(tmp_path / "config.json")
p = cm.get_profile("pc1_dp")
assert p is not None
assert p["name"] == "PC1 — DP"
def test_get_profile_returns_none_for_missing(tmp_path):
cm = ConfigManager(tmp_path / "config.json")
assert cm.get_profile("nonexistent") is None
def test_set_active_profile_persists(tmp_path):
path = tmp_path / "config.json"
cm = ConfigManager(path)
cm.set_active_profile("pc2_hdmi")
cm2 = ConfigManager(path)
assert cm2.get_active_profile_id() == "pc2_hdmi"
def test_on_change_callback_fires(tmp_path):
cm = ConfigManager(tmp_path / "config.json")
called_with = []
cm.on_change(lambda cfg: called_with.append(cfg["active_profile"]))
cfg = cm.get().copy()
cfg["active_profile"] = "pc2_hdmi"
cm.update(cfg)
assert called_with == ["pc2_hdmi"]
```
- [ ] **Step 2: Run tests to confirm they fail**
```bash
pytest tests/test_config_manager.py -v
```
Expected: all FAIL with `ModuleNotFoundError` or `ImportError`
- [ ] **Step 3: Create `config/config.json`**
```json
{
"active_profile": "pc1_dp",
"profiles": [
{
"id": "pc1_dp",
"name": "PC1 — DP",
"hotkey": "ctrl+alt+1",
"monitor_inputs": [
{"monitor_index": 0, "vcp_value": 15},
{"monitor_index": 1, "vcp_value": 15}
]
},
{
"id": "pc2_hdmi",
"name": "PC2 — HDMI",
"hotkey": "ctrl+alt+2",
"monitor_inputs": [
{"monitor_index": 0, "vcp_value": 17},
{"monitor_index": 1, "vcp_value": 17}
]
}
],
"device_triggers": [
{
"name": "HP 975 keyboard",
"bt_device_id": "VID&0203F0_PID&6343",
"on_connect": "pc1_dp",
"on_disconnect": "pc2_hdmi",
"enabled": true
},
{
"name": "MX Anywhere 2S",
"bt_device_id": "VID&02046D_PID&B01A",
"on_connect": "pc1_dp",
"on_disconnect": "pc2_hdmi",
"enabled": true
}
],
"general": {
"autostart": true,
"minimize_to_tray": true,
"debounce_ms": 500
}
}
```
- [ ] **Step 4: Implement `config/config_manager.py`**
```python
import json
import copy
from pathlib import Path
from typing import Callable
DEFAULT_CONFIG = {
"active_profile": "pc1_dp",
"profiles": [
{
"id": "pc1_dp",
"name": "PC1 — DP",
"hotkey": "ctrl+alt+1",
"monitor_inputs": [
{"monitor_index": 0, "vcp_value": 15},
{"monitor_index": 1, "vcp_value": 15}
]
},
{
"id": "pc2_hdmi",
"name": "PC2 — HDMI",
"hotkey": "ctrl+alt+2",
"monitor_inputs": [
{"monitor_index": 0, "vcp_value": 17},
{"monitor_index": 1, "vcp_value": 17}
]
}
],
"device_triggers": [
{
"name": "HP 975 keyboard",
"bt_device_id": "VID&0203F0_PID&6343",
"on_connect": "pc1_dp",
"on_disconnect": "pc2_hdmi",
"enabled": True
},
{
"name": "MX Anywhere 2S",
"bt_device_id": "VID&02046D_PID&B01A",
"on_connect": "pc1_dp",
"on_disconnect": "pc2_hdmi",
"enabled": True
}
],
"general": {
"autostart": True,
"minimize_to_tray": True,
"debounce_ms": 500
}
}
class ConfigManager:
def __init__(self, config_path=None):
if config_path is None:
config_path = Path(__file__).parent / "config.json"
self.config_path = Path(config_path)
self._config: dict = {}
self._callbacks: list[Callable] = []
self.load()
def load(self) -> None:
if self.config_path.exists():
try:
with open(self.config_path, "r", encoding="utf-8") as f:
self._config = json.load(f)
except (json.JSONDecodeError, OSError):
self._config = copy.deepcopy(DEFAULT_CONFIG)
else:
self._config = copy.deepcopy(DEFAULT_CONFIG)
self.save()
def save(self) -> None:
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(self._config, f, indent=2, ensure_ascii=False)
def get(self) -> dict:
return self._config
def update(self, new_config: dict) -> None:
self._config = new_config
self.save()
for cb in self._callbacks:
cb(self._config)
def on_change(self, callback: Callable) -> None:
self._callbacks.append(callback)
def get_profile(self, profile_id: str) -> dict | None:
for p in self._config.get("profiles", []):
if p["id"] == profile_id:
return p
return None
def get_active_profile_id(self) -> str:
return self._config.get("active_profile", "pc1_dp")
def set_active_profile(self, profile_id: str) -> None:
self._config["active_profile"] = profile_id
self.save()
```
- [ ] **Step 5: Run tests to verify they pass**
```bash
pytest tests/test_config_manager.py -v
```
Expected: all 8 PASS
- [ ] **Step 6: Commit**
```bash
git add config/config_manager.py config/config.json tests/test_config_manager.py
git commit -m "feat: ConfigManager with load/save/callbacks"
```
---
## Task 3: MonitorSwitcher (DDC/CI)
**Files:**
- Create: `core/switcher.py`
- Create: `tests/test_switcher.py`
- [ ] **Step 1: Write failing tests**
`tests/test_switcher.py`:
```python
from unittest.mock import MagicMock, patch, call
import pytest
from config.config_manager import ConfigManager
from core.switcher import MonitorSwitcher
@pytest.fixture
def config(tmp_path):
return ConfigManager(tmp_path / "config.json")
def _make_mock_monitor(index):
m = MagicMock()
m.__enter__ = MagicMock(return_value=m)
m.__exit__ = MagicMock(return_value=False)
return m
def test_apply_profile_sends_vcp_to_both_monitors(config):
mon0 = _make_mock_monitor(0)
mon1 = _make_mock_monitor(1)
with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
sw = MonitorSwitcher(config)
sw.apply_profile("pc1_dp")
mon0.set_vcp_feature.assert_called_once_with(0x60, 15)
mon1.set_vcp_feature.assert_called_once_with(0x60, 15)
def test_apply_profile_hdmi(config):
mon0 = _make_mock_monitor(0)
mon1 = _make_mock_monitor(1)
with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
sw = MonitorSwitcher(config)
sw.apply_profile("pc2_hdmi")
mon0.set_vcp_feature.assert_called_once_with(0x60, 17)
mon1.set_vcp_feature.assert_called_once_with(0x60, 17)
def test_apply_profile_skips_missing_monitor(config):
mon0 = _make_mock_monitor(0)
# only 1 monitor, profile requires 2 — should skip monitor_index=1 silently
with patch("core.switcher.get_monitors", return_value=[mon0]):
sw = MonitorSwitcher(config)
sw.apply_profile("pc1_dp")
mon0.set_vcp_feature.assert_called_once_with(0x60, 15)
def test_apply_profile_continues_on_monitor_error(config):
mon0 = _make_mock_monitor(0)
mon1 = _make_mock_monitor(1)
mon0.set_vcp_feature.side_effect = Exception("DDC/CI error")
with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
sw = MonitorSwitcher(config)
sw.apply_profile("pc1_dp") # must not raise
mon1.set_vcp_feature.assert_called_once_with(0x60, 15)
def test_apply_profile_unknown_profile_does_nothing(config):
with patch("core.switcher.get_monitors", return_value=[]) as mock_gm:
sw = MonitorSwitcher(config)
sw.apply_profile("nonexistent")
mock_gm.assert_not_called()
def test_on_switch_callback_fires(config):
mon0 = _make_mock_monitor(0)
mon1 = _make_mock_monitor(1)
received = []
with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
sw = MonitorSwitcher(config)
sw.on_switch(lambda pid: received.append(pid))
sw.apply_profile("pc2_hdmi")
assert received == ["pc2_hdmi"]
def test_apply_profile_updates_active_profile(config):
mon0 = _make_mock_monitor(0)
mon1 = _make_mock_monitor(1)
with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
sw = MonitorSwitcher(config)
sw.apply_profile("pc2_hdmi")
assert config.get_active_profile_id() == "pc2_hdmi"
```
- [ ] **Step 2: Run tests to confirm they fail**
```bash
pytest tests/test_switcher.py -v
```
Expected: FAIL with `ImportError`
- [ ] **Step 3: Implement `core/switcher.py`**
```python
import logging
from typing import Callable
from monitorcontrol import get_monitors
logger = logging.getLogger(__name__)
VCP_INPUT_SOURCE = 0x60
class MonitorSwitcher:
def __init__(self, config_manager):
self._config_manager = config_manager
self._on_switch_callbacks: list[Callable[[str], None]] = []
def apply_profile(self, profile_id: str) -> None:
profile = self._config_manager.get_profile(profile_id)
if profile is None:
logger.warning("Profile not found: %s", profile_id)
return
monitors = list(get_monitors())
for input_cfg in profile["monitor_inputs"]:
idx = input_cfg["monitor_index"]
vcp_value = input_cfg["vcp_value"]
if idx >= len(monitors):
logger.warning(
"Monitor index %d out of range (have %d)", idx, len(monitors)
)
continue
try:
with monitors[idx] as monitor:
monitor.set_vcp_feature(VCP_INPUT_SOURCE, vcp_value)
logger.info("Monitor %d → VCP input %d", idx, vcp_value)
except Exception as exc:
logger.warning("Monitor %d DDC/CI error: %s", idx, exc)
self._config_manager.set_active_profile(profile_id)
for cb in self._on_switch_callbacks:
cb(profile_id)
def on_switch(self, callback: Callable[[str], None]) -> None:
self._on_switch_callbacks.append(callback)
def list_monitors(self) -> list[dict]:
result = []
for idx, monitor_handle in enumerate(get_monitors()):
try:
with monitor_handle as monitor:
current = monitor.get_vcp_feature(VCP_INPUT_SOURCE)
result.append({"index": idx, "current_vcp": current.value})
except Exception as exc:
logger.warning("Cannot query monitor %d: %s", idx, exc)
result.append({"index": idx, "current_vcp": None})
return result
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
pytest tests/test_switcher.py -v
```
Expected: all 7 PASS
- [ ] **Step 5: Commit**
```bash
git add core/switcher.py tests/test_switcher.py
git commit -m "feat: MonitorSwitcher with DDC/CI apply_profile"
```
---
## Task 4: DeviceWatcher (WMI BT events)
**Files:**
- Create: `core/device_watcher.py`
- Create: `tests/test_device_watcher.py`
- [ ] **Step 1: Write failing tests**
`tests/test_device_watcher.py`:
```python
import time
import threading
from unittest.mock import MagicMock, patch
import pytest
from config.config_manager import ConfigManager
from core.device_watcher import DeviceWatcher
@pytest.fixture
def config(tmp_path):
return ConfigManager(tmp_path / "config.json")
@pytest.fixture
def switcher():
sw = MagicMock()
return sw
def test_handle_event_connect_triggers_correct_profile(config, switcher):
watcher = DeviceWatcher(config, switcher)
watcher._handle_event(
"HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "connect"
)
time.sleep(0.6) # wait for debounce (500ms)
switcher.apply_profile.assert_called_once_with("pc1_dp")
def test_handle_event_disconnect_triggers_correct_profile(config, switcher):
watcher = DeviceWatcher(config, switcher)
watcher._handle_event(
"HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "disconnect"
)
time.sleep(0.6)
switcher.apply_profile.assert_called_once_with("pc2_hdmi")
def test_handle_event_ignores_unknown_device(config, switcher):
watcher = DeviceWatcher(config, switcher)
watcher._handle_event("HID\\VID&DEAD_PID&BEEF", "connect")
time.sleep(0.6)
switcher.apply_profile.assert_not_called()
def test_handle_event_ignores_disabled_trigger(config, switcher):
cfg = config.get()
cfg["device_triggers"][0]["enabled"] = False
config.update(cfg)
watcher = DeviceWatcher(config, switcher)
watcher._handle_event(
"HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413", "connect"
)
time.sleep(0.6)
switcher.apply_profile.assert_not_called()
def test_debounce_collapses_rapid_events(config, switcher):
watcher = DeviceWatcher(config, switcher)
device_id = "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413"
# Fire 5 connect events in quick succession
for _ in range(5):
watcher._handle_event(device_id, "connect")
time.sleep(0.8)
# Should fire only once
assert switcher.apply_profile.call_count == 1
```
- [ ] **Step 2: Run tests to confirm they fail**
```bash
pytest tests/test_device_watcher.py -v
```
Expected: FAIL with `ImportError`
- [ ] **Step 3: Implement `core/device_watcher.py`**
```python
import logging
import threading
import pythoncom
import wmi
logger = logging.getLogger(__name__)
class DeviceWatcher:
def __init__(self, config_manager, switcher):
self._config_manager = config_manager
self._switcher = switcher
self._stop_event = threading.Event()
self._debounce_timers: dict[str, threading.Timer] = {}
self._debounce_lock = threading.Lock()
self._thread: threading.Thread | None = None
def start(self) -> None:
self._stop_event.clear()
self._thread = threading.Thread(target=self._watch_loop, daemon=True, name="DeviceWatcher")
self._thread.start()
logger.info("DeviceWatcher started")
def stop(self) -> None:
self._stop_event.set()
def _watch_loop(self) -> None:
retries = 0
max_retries = 3
while not self._stop_event.is_set() and retries <= max_retries:
try:
self._run_watcher()
retries = 0 # reset on clean exit
except Exception as exc:
retries += 1
logger.error("WMI watcher error (attempt %d/%d): %s", retries, max_retries, exc)
self._stop_event.wait(5)
logger.warning("DeviceWatcher stopped after %d retries", retries)
def _run_watcher(self) -> None:
pythoncom.CoInitialize()
try:
c = wmi.WMI()
debounce_s = self._config_manager.get().get("general", {}).get("debounce_ms", 500) / 1000
watcher_create = c.watch_for(
notification_type="Creation",
wmi_class="Win32_PnPEntity",
delay_secs=1,
)
watcher_delete = c.watch_for(
notification_type="Deletion",
wmi_class="Win32_PnPEntity",
delay_secs=1,
)
t_create = threading.Thread(
target=self._watcher_thread,
args=(watcher_create, "connect"),
daemon=True,
)
t_delete = threading.Thread(
target=self._watcher_thread,
args=(watcher_delete, "disconnect"),
daemon=True,
)
t_create.start()
t_delete.start()
t_create.join()
t_delete.join()
finally:
pythoncom.CoUninitialize()
def _watcher_thread(self, watcher, event_type: str) -> None:
while not self._stop_event.is_set():
try:
event = watcher(timeout_ms=1000)
if event:
self._handle_event(event.DeviceID or "", event_type)
except wmi.x_wmi_timed_out:
pass
except Exception as exc:
logger.error("Watcher thread (%s) error: %s", event_type, exc)
break
def _handle_event(self, device_id: str, event_type: str) -> None:
config = self._config_manager.get()
for trigger in config.get("device_triggers", []):
if not trigger.get("enabled", True):
continue
if trigger["bt_device_id"] in device_id:
profile_id = trigger["on_connect"] if event_type == "connect" else trigger["on_disconnect"]
logger.info(
"Device %s event '%s' matched trigger '%s' → profile '%s'",
device_id, event_type, trigger["name"], profile_id,
)
self._schedule_switch(device_id + event_type, profile_id)
return
def _schedule_switch(self, key: str, profile_id: str) -> None:
debounce_s = self._config_manager.get().get("general", {}).get("debounce_ms", 500) / 1000.0
with self._debounce_lock:
existing = self._debounce_timers.get(key)
if existing:
existing.cancel()
timer = threading.Timer(debounce_s, self._switcher.apply_profile, args=[profile_id])
self._debounce_timers[key] = timer
timer.start()
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
pytest tests/test_device_watcher.py -v
```
Expected: all 5 PASS
- [ ] **Step 5: Commit**
```bash
git add core/device_watcher.py tests/test_device_watcher.py
git commit -m "feat: DeviceWatcher with WMI BT event monitoring and debounce"
```
---
## Task 5: HotkeyManager
**Files:**
- Create: `core/hotkey_manager.py`
- Create: `tests/test_hotkey_manager.py`
- [ ] **Step 1: Write failing tests**
`tests/test_hotkey_manager.py`:
```python
from unittest.mock import MagicMock, patch, call
import pytest
from config.config_manager import ConfigManager
from core.hotkey_manager import HotkeyManager
@pytest.fixture
def config(tmp_path):
return ConfigManager(tmp_path / "config.json")
@pytest.fixture
def switcher():
return MagicMock()
def test_register_all_registers_profile_hotkeys(config, switcher):
with patch("core.hotkey_manager.keyboard") as mock_kb:
hm = HotkeyManager(config, switcher)
hm.register_all()
assert mock_kb.add_hotkey.call_count == 2
calls = [c.args[0] for c in mock_kb.add_hotkey.call_args_list]
assert "ctrl+alt+1" in calls
assert "ctrl+alt+2" in calls
def test_register_all_skips_empty_hotkeys(config, switcher):
cfg = config.get()
cfg["profiles"][0]["hotkey"] = ""
config.update(cfg)
with patch("core.hotkey_manager.keyboard") as mock_kb:
hm = HotkeyManager(config, switcher)
hm.register_all()
assert mock_kb.add_hotkey.call_count == 1
def test_unregister_all_removes_hotkeys(config, switcher):
with patch("core.hotkey_manager.keyboard") as mock_kb:
hm = HotkeyManager(config, switcher)
hm.register_all()
hm.unregister_all()
assert mock_kb.remove_hotkey.call_count == 2
def test_register_all_clears_old_before_registering(config, switcher):
with patch("core.hotkey_manager.keyboard") as mock_kb:
hm = HotkeyManager(config, switcher)
hm.register_all()
hm.register_all() # second call
# remove_hotkey called once (before second register)
assert mock_kb.remove_hotkey.call_count == 2
def test_hotkey_calls_apply_profile(config, switcher):
captured_callbacks = {}
def fake_add_hotkey(hotkey, fn, args):
captured_callbacks[hotkey] = (fn, args)
with patch("core.hotkey_manager.keyboard") as mock_kb:
mock_kb.add_hotkey.side_effect = fake_add_hotkey
hm = HotkeyManager(config, switcher)
hm.register_all()
fn, args = captured_callbacks["ctrl+alt+1"]
fn(*args)
switcher.apply_profile.assert_called_once_with("pc1_dp")
```
- [ ] **Step 2: Run tests to confirm they fail**
```bash
pytest tests/test_hotkey_manager.py -v
```
Expected: FAIL with `ImportError`
- [ ] **Step 3: Implement `core/hotkey_manager.py`**
```python
import logging
import keyboard
logger = logging.getLogger(__name__)
class HotkeyManager:
def __init__(self, config_manager, switcher):
self._config_manager = config_manager
self._switcher = switcher
self._registered: list[str] = []
def register_all(self) -> None:
self.unregister_all()
config = self._config_manager.get()
for profile in config.get("profiles", []):
hotkey = profile.get("hotkey", "").strip()
if not hotkey:
continue
profile_id = profile["id"]
try:
keyboard.add_hotkey(hotkey, self._switcher.apply_profile, args=[profile_id])
self._registered.append(hotkey)
logger.info("Hotkey '%s' → profile '%s'", hotkey, profile_id)
except Exception as exc:
logger.warning("Failed to register hotkey '%s': %s", hotkey, exc)
def unregister_all(self) -> None:
for hotkey in self._registered:
try:
keyboard.remove_hotkey(hotkey)
except Exception:
pass
self._registered.clear()
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
pytest tests/test_hotkey_manager.py -v
```
Expected: all 5 PASS
- [ ] **Step 5: Commit**
```bash
git add core/hotkey_manager.py tests/test_hotkey_manager.py
git commit -m "feat: HotkeyManager with global hotkey registration"
```
---
## Task 6: Tray icon + main.py
**Files:**
- Create: `ui/tray.py`
- Create: `main.py`
- [ ] **Step 1: Implement `ui/tray.py`**
```python
import queue
import threading
import logging
import pystray
from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)
def _make_icon(label: str, color: str) -> Image.Image:
img = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
draw.ellipse([2, 2, 62, 62], fill=color)
text = label[:2].upper()
# Use default font — no external font needed
bbox = draw.textbbox((0, 0), text)
tw = bbox[2] - bbox[0]
th = bbox[3] - bbox[1]
draw.text(((64 - tw) // 2, (64 - th) // 2), text, fill="white")
return img
class TrayApp:
# Colors per profile type
_COLORS = {"dp": "#1565C0", "hdmi": "#B71C1C"}
def __init__(self, config_manager, switcher, open_settings_queue: queue.Queue):
self._config_manager = config_manager
self._switcher = switcher
self._settings_queue = open_settings_queue
self._icon: pystray.Icon | None = None
switcher.on_switch(self._on_profile_switched)
def _color_for(self, profile_id: str) -> str:
return self._COLORS["dp"] if "dp" in profile_id else self._COLORS["hdmi"]
def _build_menu(self) -> pystray.Menu:
config = self._config_manager.get()
items = []
for profile in config.get("profiles", []):
pid = profile["id"]
name = profile["name"]
items.append(
pystray.MenuItem(
name,
lambda _, p=pid: self._switcher.apply_profile(p),
checked=lambda item, p=pid: self._config_manager.get_active_profile_id() == p,
radio=True,
)
)
items.append(pystray.Menu.SEPARATOR)
items.append(pystray.MenuItem("Ustawienia", self._request_settings))
items.append(pystray.MenuItem("Wyjście", self._exit))
return pystray.Menu(*items)
def _on_profile_switched(self, profile_id: str) -> None:
if self._icon is None:
return
profile = self._config_manager.get_profile(profile_id)
label = profile["name"][:2] if profile else "??"
title = profile["name"] if profile else profile_id
self._icon.icon = _make_icon(label, self._color_for(profile_id))
self._icon.title = f"MonitorSwitcher — {title}"
self._icon.update_menu()
def _request_settings(self, icon=None, item=None) -> None:
self._settings_queue.put("open_settings")
def _exit(self, icon=None, item=None) -> None:
self._settings_queue.put("exit")
if self._icon:
self._icon.stop()
def run_tray(self) -> None:
active_id = self._config_manager.get_active_profile_id()
profile = self._config_manager.get_profile(active_id)
label = profile["name"][:2] if profile else "MS"
title = f"MonitorSwitcher — {profile['name']}" if profile else "MonitorSwitcher"
self._icon = pystray.Icon(
"MonitorSwitcher",
_make_icon(label, self._color_for(active_id)),
title=title,
menu=self._build_menu(),
)
self._icon.run()
```
- [ ] **Step 2: Implement `main.py`**
```python
import sys
import queue
import threading
import logging
from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QTimer
from config.config_manager import ConfigManager
from core.switcher import MonitorSwitcher
from core.device_watcher import DeviceWatcher
from core.hotkey_manager import HotkeyManager
from ui.tray import TrayApp
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
def main() -> None:
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(False)
config = ConfigManager()
switcher = MonitorSwitcher(config)
watcher = DeviceWatcher(config, switcher)
hotkeys = HotkeyManager(config, switcher)
settings_queue: queue.Queue = queue.Queue()
tray = TrayApp(config, switcher, settings_queue)
watcher.start()
hotkeys.register_all()
tray_thread = threading.Thread(target=tray.run_tray, daemon=True, name="TrayThread")
tray_thread.start()
settings_window = None
def process_queue() -> None:
nonlocal settings_window
while not settings_queue.empty():
cmd = settings_queue.get_nowait()
if cmd == "open_settings":
from ui.settings_window import SettingsWindow
if settings_window is None or not settings_window.isVisible():
settings_window = SettingsWindow(config, switcher, hotkeys)
settings_window.show()
settings_window.raise_()
settings_window.activateWindow()
elif cmd == "exit":
hotkeys.unregister_all()
watcher.stop()
app.quit()
timer = QTimer()
timer.timeout.connect(process_queue)
timer.start(100) # poll queue every 100ms
sys.exit(app.exec())
if __name__ == "__main__":
main()
```
- [ ] **Step 3: Smoke test — run the app**
```bash
python main.py
```
Expected: tray icon appears in system tray, right-click shows menu with profiles + Ustawienia + Wyjście. No errors in console.
- [ ] **Step 4: Commit**
```bash
git add ui/tray.py main.py
git commit -m "feat: tray icon + main orchestration"
```
---
## Task 7: Settings Window — shell + Profiles tab
**Files:**
- Create: `ui/settings_window.py`
- [ ] **Step 1: Implement settings window shell + Profiles tab**
`ui/settings_window.py`:
```python
import copy
import logging
from PyQt6.QtWidgets import (
QMainWindow, QWidget, QTabWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QListWidget, QListWidgetItem, QLabel, QLineEdit,
QFormLayout, QSpinBox, QCheckBox, QComboBox, QGroupBox,
QMessageBox, QDialog, QDialogButtonBox,
)
from PyQt6.QtCore import Qt
logger = logging.getLogger(__name__)
VCP_INPUT_LABELS = {
15: "DisplayPort-1 (0x0F)",
16: "DisplayPort-2 (0x10)",
17: "HDMI-1 (0x11)",
18: "HDMI-2 (0x12)",
1: "VGA-1 (0x01)",
}
class SettingsWindow(QMainWindow):
def __init__(self, config_manager, switcher, hotkey_manager):
super().__init__()
self._config_manager = config_manager
self._switcher = switcher
self._hotkey_manager = hotkey_manager
self._working_config = copy.deepcopy(config_manager.get())
self.setWindowTitle("MonitorSwitcher — Ustawienia")
self.setMinimumSize(600, 500)
tabs = QTabWidget()
tabs.addTab(self._build_profiles_tab(), "Profile")
tabs.addTab(self._build_devices_tab(), "Urządzenia")
tabs.addTab(self._build_hotkeys_tab(), "Hotkeys")
tabs.addTab(self._build_general_tab(), "Ogólne")
save_btn = QPushButton("Zapisz i zamknij")
save_btn.clicked.connect(self._save_and_close)
cancel_btn = QPushButton("Anuluj")
cancel_btn.clicked.connect(self.close)
btn_row = QHBoxLayout()
btn_row.addStretch()
btn_row.addWidget(cancel_btn)
btn_row.addWidget(save_btn)
layout = QVBoxLayout()
layout.addWidget(tabs)
layout.addLayout(btn_row)
central = QWidget()
central.setLayout(layout)
self.setCentralWidget(central)
# ── Profiles tab ──────────────────────────────────────────────────────────
def _build_profiles_tab(self) -> QWidget:
widget = QWidget()
layout = QHBoxLayout(widget)
# Left: profile list
left = QVBoxLayout()
self._profile_list = QListWidget()
self._profile_list.currentRowChanged.connect(self._on_profile_selected)
left.addWidget(QLabel("Profile:"))
left.addWidget(self._profile_list)
add_btn = QPushButton("+ Dodaj")
add_btn.clicked.connect(self._add_profile)
del_btn = QPushButton("Usuń")
del_btn.clicked.connect(self._delete_profile)
btn_row = QHBoxLayout()
btn_row.addWidget(add_btn)
btn_row.addWidget(del_btn)
left.addLayout(btn_row)
# Right: profile editor
right = QVBoxLayout()
self._profile_name_edit = QLineEdit()
self._profile_name_edit.setPlaceholderText("Nazwa profilu")
self._profile_name_edit.textChanged.connect(self._on_profile_name_changed)
right.addWidget(QLabel("Nazwa:"))
right.addWidget(self._profile_name_edit)
self._monitor_combos: list[QComboBox] = []
self._monitor_group = QGroupBox("Wejścia monitorów")
monitor_layout = QFormLayout()
for i in range(2):
combo = QComboBox()
for vcp, label in VCP_INPUT_LABELS.items():
combo.addItem(label, vcp)
combo.currentIndexChanged.connect(lambda _, idx=i: self._on_monitor_input_changed(idx))
self._monitor_combos.append(combo)
monitor_layout.addRow(f"Monitor {i}:", combo)
self._monitor_group.setLayout(monitor_layout)
right.addWidget(self._monitor_group)
right.addStretch()
layout.addLayout(left, 1)
layout.addLayout(right, 2)
self._refresh_profile_list()
return widget
def _refresh_profile_list(self) -> None:
self._profile_list.clear()
for p in self._working_config.get("profiles", []):
self._profile_list.addItem(QListWidgetItem(p["name"]))
if self._profile_list.count() > 0:
self._profile_list.setCurrentRow(0)
def _on_profile_selected(self, row: int) -> None:
profiles = self._working_config.get("profiles", [])
if row < 0 or row >= len(profiles):
return
profile = profiles[row]
self._profile_name_edit.blockSignals(True)
self._profile_name_edit.setText(profile["name"])
self._profile_name_edit.blockSignals(False)
for combo_widget in self._monitor_combos:
combo_widget.blockSignals(True)
for inp in profile.get("monitor_inputs", []):
idx = inp["monitor_index"]
vcp = inp["vcp_value"]
if idx < len(self._monitor_combos):
combo = self._monitor_combos[idx]
data_idx = combo.findData(vcp)
combo.setCurrentIndex(data_idx if data_idx >= 0 else 0)
for combo_widget in self._monitor_combos:
combo_widget.blockSignals(False)
def _on_profile_name_changed(self, text: str) -> None:
row = self._profile_list.currentRow()
profiles = self._working_config.get("profiles", [])
if 0 <= row < len(profiles):
profiles[row]["name"] = text
self._profile_list.item(row).setText(text)
def _on_monitor_input_changed(self, monitor_idx: int) -> None:
row = self._profile_list.currentRow()
profiles = self._working_config.get("profiles", [])
if row < 0 or row >= len(profiles):
return
vcp = self._monitor_combos[monitor_idx].currentData()
inputs = profiles[row].get("monitor_inputs", [])
for inp in inputs:
if inp["monitor_index"] == monitor_idx:
inp["vcp_value"] = vcp
return
inputs.append({"monitor_index": monitor_idx, "vcp_value": vcp})
def _add_profile(self) -> None:
new_profile = {
"id": f"profile_{len(self._working_config['profiles'])}",
"name": "Nowy profil",
"hotkey": "",
"monitor_inputs": [
{"monitor_index": 0, "vcp_value": 15},
{"monitor_index": 1, "vcp_value": 15},
],
}
self._working_config["profiles"].append(new_profile)
self._profile_list.addItem(QListWidgetItem(new_profile["name"]))
self._profile_list.setCurrentRow(self._profile_list.count() - 1)
def _delete_profile(self) -> None:
row = self._profile_list.currentRow()
if row < 0:
return
self._working_config["profiles"].pop(row)
self._refresh_profile_list()
# ── Devices tab ───────────────────────────────────────────────────────────
def _build_devices_tab(self) -> QWidget:
widget = QWidget()
layout = QVBoxLayout(widget)
layout.addWidget(QLabel("Urządzenia BT wyzwalające przełączenie profilu:"))
self._device_rows: list[dict] = []
self._devices_container = QVBoxLayout()
layout.addLayout(self._devices_container)
add_btn = QPushButton("+ Dodaj urządzenie")
add_btn.clicked.connect(self._add_device_row)
layout.addWidget(add_btn)
layout.addStretch()
for trigger in self._working_config.get("device_triggers", []):
self._add_device_row(trigger)
return widget
def _add_device_row(self, trigger: dict | None = None) -> None:
if trigger is None:
trigger = {
"name": "Nowe urządzenie",
"bt_device_id": "",
"on_connect": self._first_profile_id(),
"on_disconnect": self._first_profile_id(),
"enabled": True,
}
self._working_config.setdefault("device_triggers", []).append(trigger)
group = QGroupBox()
form = QFormLayout(group)
name_edit = QLineEdit(trigger.get("name", ""))
bt_id_edit = QLineEdit(trigger.get("bt_device_id", ""))
connect_combo = self._profile_combo(trigger.get("on_connect", ""))
disconnect_combo = self._profile_combo(trigger.get("on_disconnect", ""))
enabled_cb = QCheckBox()
enabled_cb.setChecked(trigger.get("enabled", True))
form.addRow("Nazwa:", name_edit)
form.addRow("BT Device ID (fragment):", bt_id_edit)
form.addRow("Przy podłączeniu:", connect_combo)
form.addRow("Przy odłączeniu:", disconnect_combo)
form.addRow("Aktywne:", enabled_cb)
row_ref = {"trigger": trigger, "group": group}
self._device_rows.append(row_ref)
def update_trigger():
trigger["name"] = name_edit.text()
trigger["bt_device_id"] = bt_id_edit.text()
trigger["on_connect"] = connect_combo.currentData()
trigger["on_disconnect"] = disconnect_combo.currentData()
trigger["enabled"] = enabled_cb.isChecked()
name_edit.textChanged.connect(lambda _: update_trigger())
bt_id_edit.textChanged.connect(lambda _: update_trigger())
connect_combo.currentIndexChanged.connect(lambda _: update_trigger())
disconnect_combo.currentIndexChanged.connect(lambda _: update_trigger())
enabled_cb.stateChanged.connect(lambda _: update_trigger())
self._devices_container.addWidget(group)
def _profile_combo(self, selected_id: str) -> QComboBox:
combo = QComboBox()
for p in self._working_config.get("profiles", []):
combo.addItem(p["name"], p["id"])
idx = combo.findData(selected_id)
if idx >= 0:
combo.setCurrentIndex(idx)
return combo
def _first_profile_id(self) -> str:
profiles = self._working_config.get("profiles", [])
return profiles[0]["id"] if profiles else ""
# ── Hotkeys tab ───────────────────────────────────────────────────────────
def _build_hotkeys_tab(self) -> QWidget:
widget = QWidget()
layout = QVBoxLayout(widget)
layout.addWidget(QLabel("Przypisz skróty klawiszowe do profili:"))
layout.addWidget(QLabel("Format: ctrl+alt+1, win+F1, itp."))
self._hotkey_edits: dict[str, QLineEdit] = {}
form = QFormLayout()
for profile in self._working_config.get("profiles", []):
edit = QLineEdit(profile.get("hotkey", ""))
edit.setPlaceholderText("np. ctrl+alt+1")
pid = profile["id"]
edit.textChanged.connect(lambda text, p=pid: self._on_hotkey_changed(p, text))
self._hotkey_edits[pid] = edit
form.addRow(f"{profile['name']}:", edit)
layout.addLayout(form)
layout.addStretch()
return widget
def _on_hotkey_changed(self, profile_id: str, text: str) -> None:
for p in self._working_config.get("profiles", []):
if p["id"] == profile_id:
p["hotkey"] = text.strip()
return
# ── General tab ───────────────────────────────────────────────────────────
def _build_general_tab(self) -> QWidget:
widget = QWidget()
layout = QFormLayout(widget)
self._autostart_cb = QCheckBox()
self._autostart_cb.setChecked(
self._working_config.get("general", {}).get("autostart", True)
)
self._minimize_cb = QCheckBox()
self._minimize_cb.setChecked(
self._working_config.get("general", {}).get("minimize_to_tray", True)
)
self._debounce_spin = QSpinBox()
self._debounce_spin.setRange(100, 5000)
self._debounce_spin.setSuffix(" ms")
self._debounce_spin.setValue(
self._working_config.get("general", {}).get("debounce_ms", 500)
)
layout.addRow("Autostart z Windows:", self._autostart_cb)
layout.addRow("Minimalizuj do tray przy zamknięciu:", self._minimize_cb)
layout.addRow("Debounce (opóźnienie przełączenia):", self._debounce_spin)
return widget
# ── Save ──────────────────────────────────────────────────────────────────
def _save_and_close(self) -> None:
general = self._working_config.setdefault("general", {})
general["autostart"] = self._autostart_cb.isChecked()
general["minimize_to_tray"] = self._minimize_cb.isChecked()
general["debounce_ms"] = self._debounce_spin.value()
self._config_manager.update(self._working_config)
self._hotkey_manager.unregister_all()
self._hotkey_manager.register_all()
if general.get("autostart"):
_set_autostart(True)
else:
_set_autostart(False)
self.close()
def closeEvent(self, event):
if self._config_manager.get().get("general", {}).get("minimize_to_tray", True):
event.accept()
else:
event.accept()
# ── Autostart helper ──────────────────────────────────────────────────────────
def _set_autostart(enable: bool) -> None:
import sys
import winreg
key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
app_name = "MonitorSwitcher"
exe_path = sys.executable if not getattr(sys, "frozen", False) else sys.executable
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE) as key:
if enable:
winreg.SetValueEx(key, app_name, 0, winreg.REG_SZ, f'"{exe_path}"')
else:
try:
winreg.DeleteValue(key, app_name)
except FileNotFoundError:
pass
except Exception as exc:
logger.warning("Autostart registry error: %s", exc)
```
- [ ] **Step 2: Smoke test — open settings window**
```bash
python main.py
```
Right-click tray icon → "Ustawienia". Expected: window opens with 4 tabs, Profile tab shows 2 profiles (PC1 — DP, PC2 — HDMI) with monitor dropdowns.
- [ ] **Step 3: Commit**
```bash
git add ui/settings_window.py
git commit -m "feat: settings window with Profiles/Devices/Hotkeys/General tabs"
```
---
## Task 8: Run all tests + fix VCP value detection
**Files:**
- Modify: `core/switcher.py` — verify `list_monitors()` works live
- [ ] **Step 1: Run full test suite**
```bash
pytest tests/ -v
```
Expected: all tests PASS (20 total across 4 test files)
- [ ] **Step 2: Detect actual VCP input values from monitors**
Run this one-off script to confirm which VCP values the L27qe monitors use for DP and HDMI:
```python
# run_once: detect_vcp.py
from monitorcontrol import get_monitors
for i, handle in enumerate(get_monitors()):
with handle as monitor:
try:
current = monitor.get_vcp_feature(0x60)
print(f"Monitor {i}: current input VCP = {current.value} (0x{current.value:02X})")
except Exception as e:
print(f"Monitor {i}: error — {e}")
```
Run:
```bash
python detect_vcp.py
```
Expected output (typical for L27qe):
```
Monitor 0: current input VCP = 15 (0x0F) ← DP active
Monitor 1: current input VCP = 15 (0x0F) ← DP active
```
If values differ from 15/17, update `config/config.json` defaults accordingly.
- [ ] **Step 3: Create `detect_vcp.py` as a helper tool**
```python
# detect_vcp.py — helper to find VCP input values on this machine
from monitorcontrol import get_monitors
print("Scanning monitors for current input VCP value (0x60)...")
for i, handle in enumerate(get_monitors()):
with handle as monitor:
try:
current = monitor.get_vcp_feature(0x60)
print(f" Monitor {i}: VCP input = {current.value} (0x{current.value:02X})")
except Exception as e:
print(f" Monitor {i}: could not read — {e}")
print("Done. Use these values in config.json → profiles → monitor_inputs → vcp_value")
```
- [ ] **Step 4: Commit**
```bash
git add detect_vcp.py
git commit -m "chore: add detect_vcp.py helper, all tests passing"
```
---
## Task 9: PyInstaller packaging
**Files:**
- Create: `MonitorSwitcher.spec`
- [ ] **Step 1: Install PyInstaller**
```bash
pip install pyinstaller
```
- [ ] **Step 2: Create `MonitorSwitcher.spec`**
```python
# MonitorSwitcher.spec
block_cipher = None
a = Analysis(
['main.py'],
pathex=['.'],
binaries=[],
datas=[('config/config.json', 'config')],
hiddenimports=['wmi', 'pythoncom', 'pywintypes', 'win32api', 'win32con', 'winreg'],
hookspath=[],
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='MonitorSwitcher',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=False,
upx_exclude=[],
runtime_tmpdir=None,
console=False, # no console window
icon=None, # add .ico path here if desired
)
```
- [ ] **Step 3: Build**
```bash
pyinstaller MonitorSwitcher.spec
```
Expected: `dist/MonitorSwitcher.exe` created
- [ ] **Step 4: Test the exe**
```bash
dist/MonitorSwitcher.exe
```
Expected: tray icon appears, settings window opens, no console window, no errors.
- [ ] **Step 5: Commit**
```bash
git add MonitorSwitcher.spec detect_vcp.py
git add -f dist/MonitorSwitcher.exe # optional — if distributing exe via git
git commit -m "build: PyInstaller spec, produces dist/MonitorSwitcher.exe"
```
---
## Self-Review
### Spec coverage check
| Spec requirement | Task |
|---|---|
| Switch both L27qe monitors via DDC/CI VCP 0x60 | Task 3 |
| Profile-based config (DP/HDMI inputs per monitor) | Task 2, 3 |
| WMI BT device connect/disconnect detection | Task 4 |
| Debounce rapid events | Task 4 |
| WMI watcher retry on crash (max 3, 5s backoff) | Task 4 (`_watch_loop`) |
| HP 975 keyboard trigger | Task 2 (config defaults) |
| MX Anywhere 2S trigger | Task 2 (config defaults) |
| Global hotkeys per profile | Task 5 |
| System tray icon (changes color/label per profile) | Task 6 |
| Tray menu: profiles + Settings + Exit | Task 6 |
| PyQt6 settings window with 4 tabs | Task 7 |
| Profiles tab: create/edit/delete, per-monitor input | Task 7 |
| Devices tab: BT triggers, connect/disconnect → profile | Task 7 |
| Hotkeys tab: per-profile hotkey assignment | Task 7 |
| General tab: autostart, minimize to tray, debounce | Task 7 |
| Autostart via Windows registry | Task 7 (`_set_autostart`) |
| Config falls back to defaults on corrupt file | Task 2 |
| Packaging to single .exe via PyInstaller | Task 9 |
All requirements covered. ✅
### Type consistency check
- `ConfigManager.get_profile(id)` returns `dict | None` — used in `MonitorSwitcher.apply_profile()`
- `MonitorSwitcher.apply_profile(profile_id: str)` — called consistently in DeviceWatcher, HotkeyManager, TrayApp ✅
- `TrayApp` receives `queue.Queue``main.py` creates and passes it ✅
- `SettingsWindow` receives `config_manager, switcher, hotkey_manager``main.py` passes all three ✅