Files
MonitorSwitcher/docs/superpowers/plans/2026-04-09-monitor-switcher.md
2026-04-09 09:58:31 +02:00

50 KiB

MonitorSwitcher Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Python system tray app that switches two Lenovo L27qe monitors between DP and HDMI inputs automatically when HP 975 keyboard or Logitech MX Anywhere 2S connects/disconnects via Bluetooth, with hotkey support and PyQt6 config UI.

Architecture: Profile-based config (each profile = set of monitor VCP input values). WMI event watcher detects BT device connect/disconnect and fires MonitorSwitcher.apply_profile(). PyQt6 runs on the main thread; pystray runs in a daemon thread and communicates back via a queue.Queue.

Tech Stack: Python 3.11+, monitorcontrol, pystray, keyboard, wmi, pywin32, PyQt6, Pillow, pytest, PyInstaller


File Map

File Responsibility
main.py Bootstrap: create components, start threads, run Qt event loop
config/config_manager.py Load/save/validate config.json, callbacks on change
config/config.json Default config (ships with app)
core/switcher.py DDC/CI via monitorcontrol, apply_profile(), monitor enumeration
core/device_watcher.py WMI BT event watcher in background thread, debounce
core/hotkey_manager.py Global hotkey registration via keyboard lib
ui/tray.py pystray icon + menu, communicates with Qt via queue
ui/settings_window.py PyQt6 4-tab settings window
tests/test_config_manager.py Unit tests for ConfigManager
tests/test_switcher.py Unit tests for MonitorSwitcher (mocked monitorcontrol)
tests/test_device_watcher.py Unit tests for DeviceWatcher (mocked WMI)
tests/test_hotkey_manager.py Unit tests for HotkeyManager (mocked keyboard)
requirements.txt Python dependencies
MonitorSwitcher.spec PyInstaller spec file

Task 1: Project scaffold + requirements

Files:

  • Create: requirements.txt

  • Create: config/__init__.py

  • Create: core/__init__.py

  • Create: ui/__init__.py

  • Create: tests/__init__.py

  • Step 1: Create virtual environment and requirements.txt

d:/VSCode/MonitorSwitcher/requirements.txt
monitorcontrol>=0.10.0
pystray>=0.19.5
keyboard>=0.13.5
wmi>=1.5.1
pywin32>=306
PyQt6>=6.6.0
Pillow>=10.0.0
pytest>=7.4.0
pytest-mock>=3.12.0
  • Step 2: Create directory structure

Run:

cd d:/VSCode/MonitorSwitcher
python -m venv .venv
.venv/Scripts/activate
pip install -r requirements.txt
mkdir config core ui tests
  • Step 3: Create __init__.py files

Create config/__init__.py, core/__init__.py, ui/__init__.py, tests/__init__.py — all empty files.

  • Step 4: Verify install

Run:

python -c "import monitorcontrol, pystray, keyboard, wmi, PyQt6, PIL; print('OK')"

Expected: OK

  • Step 5: Commit
git init
git add requirements.txt config/__init__.py core/__init__.py ui/__init__.py tests/__init__.py
git commit -m "chore: project scaffold"

Task 2: ConfigManager

Files:

  • Create: config/config.json

  • Create: config/config_manager.py

  • Create: tests/test_config_manager.py

  • Step 1: Write failing tests

tests/test_config_manager.py:

import json
import pytest
from pathlib import Path
from config.config_manager import ConfigManager, DEFAULT_CONFIG


def test_load_defaults_when_no_file(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    assert cm.get()["active_profile"] == "pc1_dp"
    assert len(cm.get()["profiles"]) == 2


def test_saves_default_config_on_first_run(tmp_path):
    path = tmp_path / "config.json"
    ConfigManager(path)
    assert path.exists()
    data = json.loads(path.read_text())
    assert data["active_profile"] == "pc1_dp"


def test_load_existing_config(tmp_path):
    path = tmp_path / "config.json"
    custom = {"active_profile": "custom", "profiles": [], "device_triggers": [], "general": {}}
    path.write_text(json.dumps(custom))
    cm = ConfigManager(path)
    assert cm.get()["active_profile"] == "custom"


def test_fallback_on_corrupt_config(tmp_path):
    path = tmp_path / "config.json"
    path.write_text("not json {{{")
    cm = ConfigManager(path)
    assert cm.get()["active_profile"] == "pc1_dp"


def test_get_profile_returns_profile(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    p = cm.get_profile("pc1_dp")
    assert p is not None
    assert p["name"] == "PC1 — DP"


def test_get_profile_returns_none_for_missing(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    assert cm.get_profile("nonexistent") is None


def test_set_active_profile_persists(tmp_path):
    path = tmp_path / "config.json"
    cm = ConfigManager(path)
    cm.set_active_profile("pc2_hdmi")
    cm2 = ConfigManager(path)
    assert cm2.get_active_profile_id() == "pc2_hdmi"


def test_on_change_callback_fires(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    called_with = []
    cm.on_change(lambda cfg: called_with.append(cfg["active_profile"]))
    cfg = cm.get().copy()
    cfg["active_profile"] = "pc2_hdmi"
    cm.update(cfg)
    assert called_with == ["pc2_hdmi"]
  • Step 2: Run tests to confirm they fail
pytest tests/test_config_manager.py -v

Expected: all FAIL with ModuleNotFoundError or ImportError

  • Step 3: Create config/config.json
{
  "active_profile": "pc1_dp",
  "profiles": [
    {
      "id": "pc1_dp",
      "name": "PC1 — DP",
      "hotkey": "ctrl+alt+1",
      "monitor_inputs": [
        {"monitor_index": 0, "vcp_value": 15},
        {"monitor_index": 1, "vcp_value": 15}
      ]
    },
    {
      "id": "pc2_hdmi",
      "name": "PC2 — HDMI",
      "hotkey": "ctrl+alt+2",
      "monitor_inputs": [
        {"monitor_index": 0, "vcp_value": 17},
        {"monitor_index": 1, "vcp_value": 17}
      ]
    }
  ],
  "device_triggers": [
    {
      "name": "HP 975 keyboard",
      "bt_device_id": "VID&0203F0_PID&6343",
      "on_connect": "pc1_dp",
      "on_disconnect": "pc2_hdmi",
      "enabled": true
    },
    {
      "name": "MX Anywhere 2S",
      "bt_device_id": "VID&02046D_PID&B01A",
      "on_connect": "pc1_dp",
      "on_disconnect": "pc2_hdmi",
      "enabled": true
    }
  ],
  "general": {
    "autostart": true,
    "minimize_to_tray": true,
    "debounce_ms": 500
  }
}
  • Step 4: Implement config/config_manager.py
import json
import copy
from pathlib import Path
from typing import Callable

DEFAULT_CONFIG = {
    "active_profile": "pc1_dp",
    "profiles": [
        {
            "id": "pc1_dp",
            "name": "PC1 — DP",
            "hotkey": "ctrl+alt+1",
            "monitor_inputs": [
                {"monitor_index": 0, "vcp_value": 15},
                {"monitor_index": 1, "vcp_value": 15}
            ]
        },
        {
            "id": "pc2_hdmi",
            "name": "PC2 — HDMI",
            "hotkey": "ctrl+alt+2",
            "monitor_inputs": [
                {"monitor_index": 0, "vcp_value": 17},
                {"monitor_index": 1, "vcp_value": 17}
            ]
        }
    ],
    "device_triggers": [
        {
            "name": "HP 975 keyboard",
            "bt_device_id": "VID&0203F0_PID&6343",
            "on_connect": "pc1_dp",
            "on_disconnect": "pc2_hdmi",
            "enabled": True
        },
        {
            "name": "MX Anywhere 2S",
            "bt_device_id": "VID&02046D_PID&B01A",
            "on_connect": "pc1_dp",
            "on_disconnect": "pc2_hdmi",
            "enabled": True
        }
    ],
    "general": {
        "autostart": True,
        "minimize_to_tray": True,
        "debounce_ms": 500
    }
}


class ConfigManager:
    def __init__(self, config_path=None):
        if config_path is None:
            config_path = Path(__file__).parent / "config.json"
        self.config_path = Path(config_path)
        self._config: dict = {}
        self._callbacks: list[Callable] = []
        self.load()

    def load(self) -> None:
        if self.config_path.exists():
            try:
                with open(self.config_path, "r", encoding="utf-8") as f:
                    self._config = json.load(f)
            except (json.JSONDecodeError, OSError):
                self._config = copy.deepcopy(DEFAULT_CONFIG)
        else:
            self._config = copy.deepcopy(DEFAULT_CONFIG)
            self.save()

    def save(self) -> None:
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(self._config, f, indent=2, ensure_ascii=False)

    def get(self) -> dict:
        return self._config

    def update(self, new_config: dict) -> None:
        self._config = new_config
        self.save()
        for cb in self._callbacks:
            cb(self._config)

    def on_change(self, callback: Callable) -> None:
        self._callbacks.append(callback)

    def get_profile(self, profile_id: str) -> dict | None:
        for p in self._config.get("profiles", []):
            if p["id"] == profile_id:
                return p
        return None

    def get_active_profile_id(self) -> str:
        return self._config.get("active_profile", "pc1_dp")

    def set_active_profile(self, profile_id: str) -> None:
        self._config["active_profile"] = profile_id
        self.save()
  • Step 5: Run tests to verify they pass
pytest tests/test_config_manager.py -v

Expected: all 8 PASS

  • Step 6: Commit
git add config/config_manager.py config/config.json tests/test_config_manager.py
git commit -m "feat: ConfigManager with load/save/callbacks"

Task 3: MonitorSwitcher (DDC/CI)

Files:

  • Create: core/switcher.py

  • Create: tests/test_switcher.py

  • Step 1: Write failing tests

tests/test_switcher.py:

from unittest.mock import MagicMock, patch, call
import pytest
from config.config_manager import ConfigManager
from core.switcher import MonitorSwitcher


@pytest.fixture
def config(tmp_path):
    return ConfigManager(tmp_path / "config.json")


def _make_mock_monitor(index):
    m = MagicMock()
    m.__enter__ = MagicMock(return_value=m)
    m.__exit__ = MagicMock(return_value=False)
    return m


def test_apply_profile_sends_vcp_to_both_monitors(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc1_dp")

    mon0.set_vcp_feature.assert_called_once_with(0x60, 15)
    mon1.set_vcp_feature.assert_called_once_with(0x60, 15)


def test_apply_profile_hdmi(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc2_hdmi")

    mon0.set_vcp_feature.assert_called_once_with(0x60, 17)
    mon1.set_vcp_feature.assert_called_once_with(0x60, 17)


def test_apply_profile_skips_missing_monitor(config):
    mon0 = _make_mock_monitor(0)
    # only 1 monitor, profile requires 2 — should skip monitor_index=1 silently

    with patch("core.switcher.get_monitors", return_value=[mon0]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc1_dp")

    mon0.set_vcp_feature.assert_called_once_with(0x60, 15)


def test_apply_profile_continues_on_monitor_error(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)
    mon0.set_vcp_feature.side_effect = Exception("DDC/CI error")

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc1_dp")  # must not raise

    mon1.set_vcp_feature.assert_called_once_with(0x60, 15)


def test_apply_profile_unknown_profile_does_nothing(config):
    with patch("core.switcher.get_monitors", return_value=[]) as mock_gm:
        sw = MonitorSwitcher(config)
        sw.apply_profile("nonexistent")
    mock_gm.assert_not_called()


def test_on_switch_callback_fires(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)
    received = []

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.on_switch(lambda pid: received.append(pid))
        sw.apply_profile("pc2_hdmi")

    assert received == ["pc2_hdmi"]


def test_apply_profile_updates_active_profile(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc2_hdmi")

    assert config.get_active_profile_id() == "pc2_hdmi"
  • Step 2: Run tests to confirm they fail
pytest tests/test_switcher.py -v

Expected: FAIL with ImportError

  • Step 3: Implement core/switcher.py
import logging
from typing import Callable
from monitorcontrol import get_monitors

logger = logging.getLogger(__name__)

VCP_INPUT_SOURCE = 0x60


class MonitorSwitcher:
    def __init__(self, config_manager):
        self._config_manager = config_manager
        self._on_switch_callbacks: list[Callable[[str], None]] = []

    def apply_profile(self, profile_id: str) -> None:
        profile = self._config_manager.get_profile(profile_id)
        if profile is None:
            logger.warning("Profile not found: %s", profile_id)
            return

        monitors = list(get_monitors())
        for input_cfg in profile["monitor_inputs"]:
            idx = input_cfg["monitor_index"]
            vcp_value = input_cfg["vcp_value"]
            if idx >= len(monitors):
                logger.warning(
                    "Monitor index %d out of range (have %d)", idx, len(monitors)
                )
                continue
            try:
                with monitors[idx] as monitor:
                    monitor.set_vcp_feature(VCP_INPUT_SOURCE, vcp_value)
                    logger.info("Monitor %d → VCP input %d", idx, vcp_value)
            except Exception as exc:
                logger.warning("Monitor %d DDC/CI error: %s", idx, exc)

        self._config_manager.set_active_profile(profile_id)
        for cb in self._on_switch_callbacks:
            cb(profile_id)

    def on_switch(self, callback: Callable[[str], None]) -> None:
        self._on_switch_callbacks.append(callback)

    def list_monitors(self) -> list[dict]:
        result = []
        for idx, monitor_handle in enumerate(get_monitors()):
            try:
                with monitor_handle as monitor:
                    current = monitor.get_vcp_feature(VCP_INPUT_SOURCE)
                    result.append({"index": idx, "current_vcp": current.value})
            except Exception as exc:
                logger.warning("Cannot query monitor %d: %s", idx, exc)
                result.append({"index": idx, "current_vcp": None})
        return result
  • Step 4: Run tests to verify they pass
pytest tests/test_switcher.py -v

Expected: all 7 PASS

  • Step 5: Commit
git add core/switcher.py tests/test_switcher.py
git commit -m "feat: MonitorSwitcher with DDC/CI apply_profile"

Task 4: DeviceWatcher (WMI BT events)

Files:

  • Create: core/device_watcher.py

  • Create: tests/test_device_watcher.py

  • Step 1: Write failing tests

tests/test_device_watcher.py:

import time
import threading
from unittest.mock import MagicMock, patch
import pytest
from config.config_manager import ConfigManager
from core.device_watcher import DeviceWatcher


@pytest.fixture
def config(tmp_path):
    return ConfigManager(tmp_path / "config.json")


@pytest.fixture
def switcher():
    sw = MagicMock()
    return sw


def test_handle_event_connect_triggers_correct_profile(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event(
        "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "connect"
    )
    time.sleep(0.6)  # wait for debounce (500ms)
    switcher.apply_profile.assert_called_once_with("pc1_dp")


def test_handle_event_disconnect_triggers_correct_profile(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event(
        "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "disconnect"
    )
    time.sleep(0.6)
    switcher.apply_profile.assert_called_once_with("pc2_hdmi")


def test_handle_event_ignores_unknown_device(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event("HID\\VID&DEAD_PID&BEEF", "connect")
    time.sleep(0.6)
    switcher.apply_profile.assert_not_called()


def test_handle_event_ignores_disabled_trigger(config, switcher):
    cfg = config.get()
    cfg["device_triggers"][0]["enabled"] = False
    config.update(cfg)

    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event(
        "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413", "connect"
    )
    time.sleep(0.6)
    switcher.apply_profile.assert_not_called()


def test_debounce_collapses_rapid_events(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    device_id = "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413"
    # Fire 5 connect events in quick succession
    for _ in range(5):
        watcher._handle_event(device_id, "connect")
    time.sleep(0.8)
    # Should fire only once
    assert switcher.apply_profile.call_count == 1
  • Step 2: Run tests to confirm they fail
pytest tests/test_device_watcher.py -v

Expected: FAIL with ImportError

  • Step 3: Implement core/device_watcher.py
import logging
import threading
import pythoncom
import wmi

logger = logging.getLogger(__name__)


class DeviceWatcher:
    def __init__(self, config_manager, switcher):
        self._config_manager = config_manager
        self._switcher = switcher
        self._stop_event = threading.Event()
        self._debounce_timers: dict[str, threading.Timer] = {}
        self._debounce_lock = threading.Lock()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._watch_loop, daemon=True, name="DeviceWatcher")
        self._thread.start()
        logger.info("DeviceWatcher started")

    def stop(self) -> None:
        self._stop_event.set()

    def _watch_loop(self) -> None:
        retries = 0
        max_retries = 3
        while not self._stop_event.is_set() and retries <= max_retries:
            try:
                self._run_watcher()
                retries = 0  # reset on clean exit
            except Exception as exc:
                retries += 1
                logger.error("WMI watcher error (attempt %d/%d): %s", retries, max_retries, exc)
                self._stop_event.wait(5)
        logger.warning("DeviceWatcher stopped after %d retries", retries)

    def _run_watcher(self) -> None:
        pythoncom.CoInitialize()
        try:
            c = wmi.WMI()
            debounce_s = self._config_manager.get().get("general", {}).get("debounce_ms", 500) / 1000

            watcher_create = c.watch_for(
                notification_type="Creation",
                wmi_class="Win32_PnPEntity",
                delay_secs=1,
            )
            watcher_delete = c.watch_for(
                notification_type="Deletion",
                wmi_class="Win32_PnPEntity",
                delay_secs=1,
            )

            t_create = threading.Thread(
                target=self._watcher_thread,
                args=(watcher_create, "connect"),
                daemon=True,
            )
            t_delete = threading.Thread(
                target=self._watcher_thread,
                args=(watcher_delete, "disconnect"),
                daemon=True,
            )
            t_create.start()
            t_delete.start()
            t_create.join()
            t_delete.join()
        finally:
            pythoncom.CoUninitialize()

    def _watcher_thread(self, watcher, event_type: str) -> None:
        while not self._stop_event.is_set():
            try:
                event = watcher(timeout_ms=1000)
                if event:
                    self._handle_event(event.DeviceID or "", event_type)
            except wmi.x_wmi_timed_out:
                pass
            except Exception as exc:
                logger.error("Watcher thread (%s) error: %s", event_type, exc)
                break

    def _handle_event(self, device_id: str, event_type: str) -> None:
        config = self._config_manager.get()
        for trigger in config.get("device_triggers", []):
            if not trigger.get("enabled", True):
                continue
            if trigger["bt_device_id"] in device_id:
                profile_id = trigger["on_connect"] if event_type == "connect" else trigger["on_disconnect"]
                logger.info(
                    "Device %s event '%s' matched trigger '%s' → profile '%s'",
                    device_id, event_type, trigger["name"], profile_id,
                )
                self._schedule_switch(device_id + event_type, profile_id)
                return

    def _schedule_switch(self, key: str, profile_id: str) -> None:
        debounce_s = self._config_manager.get().get("general", {}).get("debounce_ms", 500) / 1000.0
        with self._debounce_lock:
            existing = self._debounce_timers.get(key)
            if existing:
                existing.cancel()
            timer = threading.Timer(debounce_s, self._switcher.apply_profile, args=[profile_id])
            self._debounce_timers[key] = timer
            timer.start()
  • Step 4: Run tests to verify they pass
pytest tests/test_device_watcher.py -v

Expected: all 5 PASS

  • Step 5: Commit
git add core/device_watcher.py tests/test_device_watcher.py
git commit -m "feat: DeviceWatcher with WMI BT event monitoring and debounce"

Task 5: HotkeyManager

Files:

  • Create: core/hotkey_manager.py

  • Create: tests/test_hotkey_manager.py

  • Step 1: Write failing tests

tests/test_hotkey_manager.py:

from unittest.mock import MagicMock, patch, call
import pytest
from config.config_manager import ConfigManager
from core.hotkey_manager import HotkeyManager


@pytest.fixture
def config(tmp_path):
    return ConfigManager(tmp_path / "config.json")


@pytest.fixture
def switcher():
    return MagicMock()


def test_register_all_registers_profile_hotkeys(config, switcher):
    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
    assert mock_kb.add_hotkey.call_count == 2
    calls = [c.args[0] for c in mock_kb.add_hotkey.call_args_list]
    assert "ctrl+alt+1" in calls
    assert "ctrl+alt+2" in calls


def test_register_all_skips_empty_hotkeys(config, switcher):
    cfg = config.get()
    cfg["profiles"][0]["hotkey"] = ""
    config.update(cfg)

    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
    assert mock_kb.add_hotkey.call_count == 1


def test_unregister_all_removes_hotkeys(config, switcher):
    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
        hm.unregister_all()
    assert mock_kb.remove_hotkey.call_count == 2


def test_register_all_clears_old_before_registering(config, switcher):
    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
        hm.register_all()  # second call
    # remove_hotkey called once (before second register)
    assert mock_kb.remove_hotkey.call_count == 2


def test_hotkey_calls_apply_profile(config, switcher):
    captured_callbacks = {}

    def fake_add_hotkey(hotkey, fn, args):
        captured_callbacks[hotkey] = (fn, args)

    with patch("core.hotkey_manager.keyboard") as mock_kb:
        mock_kb.add_hotkey.side_effect = fake_add_hotkey
        hm = HotkeyManager(config, switcher)
        hm.register_all()

    fn, args = captured_callbacks["ctrl+alt+1"]
    fn(*args)
    switcher.apply_profile.assert_called_once_with("pc1_dp")
  • Step 2: Run tests to confirm they fail
pytest tests/test_hotkey_manager.py -v

Expected: FAIL with ImportError

  • Step 3: Implement core/hotkey_manager.py
import logging
import keyboard

logger = logging.getLogger(__name__)


class HotkeyManager:
    def __init__(self, config_manager, switcher):
        self._config_manager = config_manager
        self._switcher = switcher
        self._registered: list[str] = []

    def register_all(self) -> None:
        self.unregister_all()
        config = self._config_manager.get()
        for profile in config.get("profiles", []):
            hotkey = profile.get("hotkey", "").strip()
            if not hotkey:
                continue
            profile_id = profile["id"]
            try:
                keyboard.add_hotkey(hotkey, self._switcher.apply_profile, args=[profile_id])
                self._registered.append(hotkey)
                logger.info("Hotkey '%s' → profile '%s'", hotkey, profile_id)
            except Exception as exc:
                logger.warning("Failed to register hotkey '%s': %s", hotkey, exc)

    def unregister_all(self) -> None:
        for hotkey in self._registered:
            try:
                keyboard.remove_hotkey(hotkey)
            except Exception:
                pass
        self._registered.clear()
  • Step 4: Run tests to verify they pass
pytest tests/test_hotkey_manager.py -v

Expected: all 5 PASS

  • Step 5: Commit
git add core/hotkey_manager.py tests/test_hotkey_manager.py
git commit -m "feat: HotkeyManager with global hotkey registration"

Task 6: Tray icon + main.py

Files:

  • Create: ui/tray.py

  • Create: main.py

  • Step 1: Implement ui/tray.py

import queue
import threading
import logging
import pystray
from PIL import Image, ImageDraw, ImageFont

logger = logging.getLogger(__name__)


def _make_icon(label: str, color: str) -> Image.Image:
    img = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    draw.ellipse([2, 2, 62, 62], fill=color)
    text = label[:2].upper()
    # Use default font — no external font needed
    bbox = draw.textbbox((0, 0), text)
    tw = bbox[2] - bbox[0]
    th = bbox[3] - bbox[1]
    draw.text(((64 - tw) // 2, (64 - th) // 2), text, fill="white")
    return img


class TrayApp:
    # Colors per profile type
    _COLORS = {"dp": "#1565C0", "hdmi": "#B71C1C"}

    def __init__(self, config_manager, switcher, open_settings_queue: queue.Queue):
        self._config_manager = config_manager
        self._switcher = switcher
        self._settings_queue = open_settings_queue
        self._icon: pystray.Icon | None = None
        switcher.on_switch(self._on_profile_switched)

    def _color_for(self, profile_id: str) -> str:
        return self._COLORS["dp"] if "dp" in profile_id else self._COLORS["hdmi"]

    def _build_menu(self) -> pystray.Menu:
        config = self._config_manager.get()
        items = []
        for profile in config.get("profiles", []):
            pid = profile["id"]
            name = profile["name"]
            items.append(
                pystray.MenuItem(
                    name,
                    lambda _, p=pid: self._switcher.apply_profile(p),
                    checked=lambda item, p=pid: self._config_manager.get_active_profile_id() == p,
                    radio=True,
                )
            )
        items.append(pystray.Menu.SEPARATOR)
        items.append(pystray.MenuItem("Ustawienia", self._request_settings))
        items.append(pystray.MenuItem("Wyjście", self._exit))
        return pystray.Menu(*items)

    def _on_profile_switched(self, profile_id: str) -> None:
        if self._icon is None:
            return
        profile = self._config_manager.get_profile(profile_id)
        label = profile["name"][:2] if profile else "??"
        title = profile["name"] if profile else profile_id
        self._icon.icon = _make_icon(label, self._color_for(profile_id))
        self._icon.title = f"MonitorSwitcher — {title}"
        self._icon.update_menu()

    def _request_settings(self, icon=None, item=None) -> None:
        self._settings_queue.put("open_settings")

    def _exit(self, icon=None, item=None) -> None:
        self._settings_queue.put("exit")
        if self._icon:
            self._icon.stop()

    def run_tray(self) -> None:
        active_id = self._config_manager.get_active_profile_id()
        profile = self._config_manager.get_profile(active_id)
        label = profile["name"][:2] if profile else "MS"
        title = f"MonitorSwitcher — {profile['name']}" if profile else "MonitorSwitcher"

        self._icon = pystray.Icon(
            "MonitorSwitcher",
            _make_icon(label, self._color_for(active_id)),
            title=title,
            menu=self._build_menu(),
        )
        self._icon.run()
  • Step 2: Implement main.py
import sys
import queue
import threading
import logging

from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QTimer

from config.config_manager import ConfigManager
from core.switcher import MonitorSwitcher
from core.device_watcher import DeviceWatcher
from core.hotkey_manager import HotkeyManager
from ui.tray import TrayApp

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)


def main() -> None:
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(False)

    config = ConfigManager()
    switcher = MonitorSwitcher(config)
    watcher = DeviceWatcher(config, switcher)
    hotkeys = HotkeyManager(config, switcher)

    settings_queue: queue.Queue = queue.Queue()
    tray = TrayApp(config, switcher, settings_queue)

    watcher.start()
    hotkeys.register_all()

    tray_thread = threading.Thread(target=tray.run_tray, daemon=True, name="TrayThread")
    tray_thread.start()

    settings_window = None

    def process_queue() -> None:
        nonlocal settings_window
        while not settings_queue.empty():
            cmd = settings_queue.get_nowait()
            if cmd == "open_settings":
                from ui.settings_window import SettingsWindow
                if settings_window is None or not settings_window.isVisible():
                    settings_window = SettingsWindow(config, switcher, hotkeys)
                settings_window.show()
                settings_window.raise_()
                settings_window.activateWindow()
            elif cmd == "exit":
                hotkeys.unregister_all()
                watcher.stop()
                app.quit()

    timer = QTimer()
    timer.timeout.connect(process_queue)
    timer.start(100)  # poll queue every 100ms

    sys.exit(app.exec())


if __name__ == "__main__":
    main()
  • Step 3: Smoke test — run the app
python main.py

Expected: tray icon appears in system tray, right-click shows menu with profiles + Ustawienia + Wyjście. No errors in console.

  • Step 4: Commit
git add ui/tray.py main.py
git commit -m "feat: tray icon + main orchestration"

Task 7: Settings Window — shell + Profiles tab

Files:

  • Create: ui/settings_window.py

  • Step 1: Implement settings window shell + Profiles tab

ui/settings_window.py:

import copy
import logging
from PyQt6.QtWidgets import (
    QMainWindow, QWidget, QTabWidget, QVBoxLayout, QHBoxLayout,
    QPushButton, QListWidget, QListWidgetItem, QLabel, QLineEdit,
    QFormLayout, QSpinBox, QCheckBox, QComboBox, QGroupBox,
    QMessageBox, QDialog, QDialogButtonBox,
)
from PyQt6.QtCore import Qt

logger = logging.getLogger(__name__)

VCP_INPUT_LABELS = {
    15: "DisplayPort-1 (0x0F)",
    16: "DisplayPort-2 (0x10)",
    17: "HDMI-1 (0x11)",
    18: "HDMI-2 (0x12)",
    1:  "VGA-1 (0x01)",
}


class SettingsWindow(QMainWindow):
    def __init__(self, config_manager, switcher, hotkey_manager):
        super().__init__()
        self._config_manager = config_manager
        self._switcher = switcher
        self._hotkey_manager = hotkey_manager
        self._working_config = copy.deepcopy(config_manager.get())

        self.setWindowTitle("MonitorSwitcher — Ustawienia")
        self.setMinimumSize(600, 500)

        tabs = QTabWidget()
        tabs.addTab(self._build_profiles_tab(), "Profile")
        tabs.addTab(self._build_devices_tab(), "Urządzenia")
        tabs.addTab(self._build_hotkeys_tab(), "Hotkeys")
        tabs.addTab(self._build_general_tab(), "Ogólne")

        save_btn = QPushButton("Zapisz i zamknij")
        save_btn.clicked.connect(self._save_and_close)
        cancel_btn = QPushButton("Anuluj")
        cancel_btn.clicked.connect(self.close)

        btn_row = QHBoxLayout()
        btn_row.addStretch()
        btn_row.addWidget(cancel_btn)
        btn_row.addWidget(save_btn)

        layout = QVBoxLayout()
        layout.addWidget(tabs)
        layout.addLayout(btn_row)

        central = QWidget()
        central.setLayout(layout)
        self.setCentralWidget(central)

    # ── Profiles tab ──────────────────────────────────────────────────────────

    def _build_profiles_tab(self) -> QWidget:
        widget = QWidget()
        layout = QHBoxLayout(widget)

        # Left: profile list
        left = QVBoxLayout()
        self._profile_list = QListWidget()
        self._profile_list.currentRowChanged.connect(self._on_profile_selected)
        left.addWidget(QLabel("Profile:"))
        left.addWidget(self._profile_list)

        add_btn = QPushButton("+ Dodaj")
        add_btn.clicked.connect(self._add_profile)
        del_btn = QPushButton("Usuń")
        del_btn.clicked.connect(self._delete_profile)
        btn_row = QHBoxLayout()
        btn_row.addWidget(add_btn)
        btn_row.addWidget(del_btn)
        left.addLayout(btn_row)

        # Right: profile editor
        right = QVBoxLayout()
        self._profile_name_edit = QLineEdit()
        self._profile_name_edit.setPlaceholderText("Nazwa profilu")
        self._profile_name_edit.textChanged.connect(self._on_profile_name_changed)
        right.addWidget(QLabel("Nazwa:"))
        right.addWidget(self._profile_name_edit)

        self._monitor_combos: list[QComboBox] = []
        self._monitor_group = QGroupBox("Wejścia monitorów")
        monitor_layout = QFormLayout()
        for i in range(2):
            combo = QComboBox()
            for vcp, label in VCP_INPUT_LABELS.items():
                combo.addItem(label, vcp)
            combo.currentIndexChanged.connect(lambda _, idx=i: self._on_monitor_input_changed(idx))
            self._monitor_combos.append(combo)
            monitor_layout.addRow(f"Monitor {i}:", combo)
        self._monitor_group.setLayout(monitor_layout)
        right.addWidget(self._monitor_group)
        right.addStretch()

        layout.addLayout(left, 1)
        layout.addLayout(right, 2)

        self._refresh_profile_list()
        return widget

    def _refresh_profile_list(self) -> None:
        self._profile_list.clear()
        for p in self._working_config.get("profiles", []):
            self._profile_list.addItem(QListWidgetItem(p["name"]))
        if self._profile_list.count() > 0:
            self._profile_list.setCurrentRow(0)

    def _on_profile_selected(self, row: int) -> None:
        profiles = self._working_config.get("profiles", [])
        if row < 0 or row >= len(profiles):
            return
        profile = profiles[row]
        self._profile_name_edit.blockSignals(True)
        self._profile_name_edit.setText(profile["name"])
        self._profile_name_edit.blockSignals(False)
        for combo_widget in self._monitor_combos:
            combo_widget.blockSignals(True)
        for inp in profile.get("monitor_inputs", []):
            idx = inp["monitor_index"]
            vcp = inp["vcp_value"]
            if idx < len(self._monitor_combos):
                combo = self._monitor_combos[idx]
                data_idx = combo.findData(vcp)
                combo.setCurrentIndex(data_idx if data_idx >= 0 else 0)
        for combo_widget in self._monitor_combos:
            combo_widget.blockSignals(False)

    def _on_profile_name_changed(self, text: str) -> None:
        row = self._profile_list.currentRow()
        profiles = self._working_config.get("profiles", [])
        if 0 <= row < len(profiles):
            profiles[row]["name"] = text
            self._profile_list.item(row).setText(text)

    def _on_monitor_input_changed(self, monitor_idx: int) -> None:
        row = self._profile_list.currentRow()
        profiles = self._working_config.get("profiles", [])
        if row < 0 or row >= len(profiles):
            return
        vcp = self._monitor_combos[monitor_idx].currentData()
        inputs = profiles[row].get("monitor_inputs", [])
        for inp in inputs:
            if inp["monitor_index"] == monitor_idx:
                inp["vcp_value"] = vcp
                return
        inputs.append({"monitor_index": monitor_idx, "vcp_value": vcp})

    def _add_profile(self) -> None:
        new_profile = {
            "id": f"profile_{len(self._working_config['profiles'])}",
            "name": "Nowy profil",
            "hotkey": "",
            "monitor_inputs": [
                {"monitor_index": 0, "vcp_value": 15},
                {"monitor_index": 1, "vcp_value": 15},
            ],
        }
        self._working_config["profiles"].append(new_profile)
        self._profile_list.addItem(QListWidgetItem(new_profile["name"]))
        self._profile_list.setCurrentRow(self._profile_list.count() - 1)

    def _delete_profile(self) -> None:
        row = self._profile_list.currentRow()
        if row < 0:
            return
        self._working_config["profiles"].pop(row)
        self._refresh_profile_list()

    # ── Devices tab ───────────────────────────────────────────────────────────

    def _build_devices_tab(self) -> QWidget:
        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.addWidget(QLabel("Urządzenia BT wyzwalające przełączenie profilu:"))

        self._device_rows: list[dict] = []
        self._devices_container = QVBoxLayout()
        layout.addLayout(self._devices_container)

        add_btn = QPushButton("+ Dodaj urządzenie")
        add_btn.clicked.connect(self._add_device_row)
        layout.addWidget(add_btn)
        layout.addStretch()

        for trigger in self._working_config.get("device_triggers", []):
            self._add_device_row(trigger)

        return widget

    def _add_device_row(self, trigger: dict | None = None) -> None:
        if trigger is None:
            trigger = {
                "name": "Nowe urządzenie",
                "bt_device_id": "",
                "on_connect": self._first_profile_id(),
                "on_disconnect": self._first_profile_id(),
                "enabled": True,
            }
            self._working_config.setdefault("device_triggers", []).append(trigger)

        group = QGroupBox()
        form = QFormLayout(group)

        name_edit = QLineEdit(trigger.get("name", ""))
        bt_id_edit = QLineEdit(trigger.get("bt_device_id", ""))
        connect_combo = self._profile_combo(trigger.get("on_connect", ""))
        disconnect_combo = self._profile_combo(trigger.get("on_disconnect", ""))
        enabled_cb = QCheckBox()
        enabled_cb.setChecked(trigger.get("enabled", True))

        form.addRow("Nazwa:", name_edit)
        form.addRow("BT Device ID (fragment):", bt_id_edit)
        form.addRow("Przy podłączeniu:", connect_combo)
        form.addRow("Przy odłączeniu:", disconnect_combo)
        form.addRow("Aktywne:", enabled_cb)

        row_ref = {"trigger": trigger, "group": group}
        self._device_rows.append(row_ref)

        def update_trigger():
            trigger["name"] = name_edit.text()
            trigger["bt_device_id"] = bt_id_edit.text()
            trigger["on_connect"] = connect_combo.currentData()
            trigger["on_disconnect"] = disconnect_combo.currentData()
            trigger["enabled"] = enabled_cb.isChecked()

        name_edit.textChanged.connect(lambda _: update_trigger())
        bt_id_edit.textChanged.connect(lambda _: update_trigger())
        connect_combo.currentIndexChanged.connect(lambda _: update_trigger())
        disconnect_combo.currentIndexChanged.connect(lambda _: update_trigger())
        enabled_cb.stateChanged.connect(lambda _: update_trigger())

        self._devices_container.addWidget(group)

    def _profile_combo(self, selected_id: str) -> QComboBox:
        combo = QComboBox()
        for p in self._working_config.get("profiles", []):
            combo.addItem(p["name"], p["id"])
        idx = combo.findData(selected_id)
        if idx >= 0:
            combo.setCurrentIndex(idx)
        return combo

    def _first_profile_id(self) -> str:
        profiles = self._working_config.get("profiles", [])
        return profiles[0]["id"] if profiles else ""

    # ── Hotkeys tab ───────────────────────────────────────────────────────────

    def _build_hotkeys_tab(self) -> QWidget:
        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.addWidget(QLabel("Przypisz skróty klawiszowe do profili:"))
        layout.addWidget(QLabel("Format: ctrl+alt+1, win+F1, itp."))

        self._hotkey_edits: dict[str, QLineEdit] = {}
        form = QFormLayout()
        for profile in self._working_config.get("profiles", []):
            edit = QLineEdit(profile.get("hotkey", ""))
            edit.setPlaceholderText("np. ctrl+alt+1")
            pid = profile["id"]
            edit.textChanged.connect(lambda text, p=pid: self._on_hotkey_changed(p, text))
            self._hotkey_edits[pid] = edit
            form.addRow(f"{profile['name']}:", edit)

        layout.addLayout(form)
        layout.addStretch()
        return widget

    def _on_hotkey_changed(self, profile_id: str, text: str) -> None:
        for p in self._working_config.get("profiles", []):
            if p["id"] == profile_id:
                p["hotkey"] = text.strip()
                return

    # ── General tab ───────────────────────────────────────────────────────────

    def _build_general_tab(self) -> QWidget:
        widget = QWidget()
        layout = QFormLayout(widget)

        self._autostart_cb = QCheckBox()
        self._autostart_cb.setChecked(
            self._working_config.get("general", {}).get("autostart", True)
        )
        self._minimize_cb = QCheckBox()
        self._minimize_cb.setChecked(
            self._working_config.get("general", {}).get("minimize_to_tray", True)
        )
        self._debounce_spin = QSpinBox()
        self._debounce_spin.setRange(100, 5000)
        self._debounce_spin.setSuffix(" ms")
        self._debounce_spin.setValue(
            self._working_config.get("general", {}).get("debounce_ms", 500)
        )

        layout.addRow("Autostart z Windows:", self._autostart_cb)
        layout.addRow("Minimalizuj do tray przy zamknięciu:", self._minimize_cb)
        layout.addRow("Debounce (opóźnienie przełączenia):", self._debounce_spin)

        return widget

    # ── Save ──────────────────────────────────────────────────────────────────

    def _save_and_close(self) -> None:
        general = self._working_config.setdefault("general", {})
        general["autostart"] = self._autostart_cb.isChecked()
        general["minimize_to_tray"] = self._minimize_cb.isChecked()
        general["debounce_ms"] = self._debounce_spin.value()

        self._config_manager.update(self._working_config)
        self._hotkey_manager.unregister_all()
        self._hotkey_manager.register_all()

        if general.get("autostart"):
            _set_autostart(True)
        else:
            _set_autostart(False)

        self.close()

    def closeEvent(self, event):
        if self._config_manager.get().get("general", {}).get("minimize_to_tray", True):
            event.accept()
        else:
            event.accept()


# ── Autostart helper ──────────────────────────────────────────────────────────

def _set_autostart(enable: bool) -> None:
    import sys
    import winreg
    key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
    app_name = "MonitorSwitcher"
    exe_path = sys.executable if not getattr(sys, "frozen", False) else sys.executable
    try:
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE) as key:
            if enable:
                winreg.SetValueEx(key, app_name, 0, winreg.REG_SZ, f'"{exe_path}"')
            else:
                try:
                    winreg.DeleteValue(key, app_name)
                except FileNotFoundError:
                    pass
    except Exception as exc:
        logger.warning("Autostart registry error: %s", exc)
  • Step 2: Smoke test — open settings window
python main.py

Right-click tray icon → "Ustawienia". Expected: window opens with 4 tabs, Profile tab shows 2 profiles (PC1 — DP, PC2 — HDMI) with monitor dropdowns.

  • Step 3: Commit
git add ui/settings_window.py
git commit -m "feat: settings window with Profiles/Devices/Hotkeys/General tabs"

Task 8: Run all tests + fix VCP value detection

Files:

  • Modify: core/switcher.py — verify list_monitors() works live

  • Step 1: Run full test suite

pytest tests/ -v

Expected: all tests PASS (20 total across 4 test files)

  • Step 2: Detect actual VCP input values from monitors

Run this one-off script to confirm which VCP values the L27qe monitors use for DP and HDMI:

# run_once: detect_vcp.py
from monitorcontrol import get_monitors

for i, handle in enumerate(get_monitors()):
    with handle as monitor:
        try:
            current = monitor.get_vcp_feature(0x60)
            print(f"Monitor {i}: current input VCP = {current.value} (0x{current.value:02X})")
        except Exception as e:
            print(f"Monitor {i}: error — {e}")

Run:

python detect_vcp.py

Expected output (typical for L27qe):

Monitor 0: current input VCP = 15 (0x0F)   ← DP active
Monitor 1: current input VCP = 15 (0x0F)   ← DP active

If values differ from 15/17, update config/config.json defaults accordingly.

  • Step 3: Create detect_vcp.py as a helper tool
# detect_vcp.py  — helper to find VCP input values on this machine
from monitorcontrol import get_monitors

print("Scanning monitors for current input VCP value (0x60)...")
for i, handle in enumerate(get_monitors()):
    with handle as monitor:
        try:
            current = monitor.get_vcp_feature(0x60)
            print(f"  Monitor {i}: VCP input = {current.value} (0x{current.value:02X})")
        except Exception as e:
            print(f"  Monitor {i}: could not read — {e}")
print("Done. Use these values in config.json → profiles → monitor_inputs → vcp_value")
  • Step 4: Commit
git add detect_vcp.py
git commit -m "chore: add detect_vcp.py helper, all tests passing"

Task 9: PyInstaller packaging

Files:

  • Create: MonitorSwitcher.spec

  • Step 1: Install PyInstaller

pip install pyinstaller
  • Step 2: Create MonitorSwitcher.spec
# MonitorSwitcher.spec
block_cipher = None

a = Analysis(
    ['main.py'],
    pathex=['.'],
    binaries=[],
    datas=[('config/config.json', 'config')],
    hiddenimports=['wmi', 'pythoncom', 'pywintypes', 'win32api', 'win32con', 'winreg'],
    hookspath=[],
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
)

pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)

exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.zipfiles,
    a.datas,
    [],
    name='MonitorSwitcher',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=False,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=False,      # no console window
    icon=None,          # add .ico path here if desired
)
  • Step 3: Build
pyinstaller MonitorSwitcher.spec

Expected: dist/MonitorSwitcher.exe created

  • Step 4: Test the exe
dist/MonitorSwitcher.exe

Expected: tray icon appears, settings window opens, no console window, no errors.

  • Step 5: Commit
git add MonitorSwitcher.spec detect_vcp.py
git add -f dist/MonitorSwitcher.exe   # optional — if distributing exe via git
git commit -m "build: PyInstaller spec, produces dist/MonitorSwitcher.exe"

Self-Review

Spec coverage check

Spec requirement Task
Switch both L27qe monitors via DDC/CI VCP 0x60 Task 3
Profile-based config (DP/HDMI inputs per monitor) Task 2, 3
WMI BT device connect/disconnect detection Task 4
Debounce rapid events Task 4
WMI watcher retry on crash (max 3, 5s backoff) Task 4 (_watch_loop)
HP 975 keyboard trigger Task 2 (config defaults)
MX Anywhere 2S trigger Task 2 (config defaults)
Global hotkeys per profile Task 5
System tray icon (changes color/label per profile) Task 6
Tray menu: profiles + Settings + Exit Task 6
PyQt6 settings window with 4 tabs Task 7
Profiles tab: create/edit/delete, per-monitor input Task 7
Devices tab: BT triggers, connect/disconnect → profile Task 7
Hotkeys tab: per-profile hotkey assignment Task 7
General tab: autostart, minimize to tray, debounce Task 7
Autostart via Windows registry Task 7 (_set_autostart)
Config falls back to defaults on corrupt file Task 2
Packaging to single .exe via PyInstaller Task 9

All requirements covered.

Type consistency check

  • ConfigManager.get_profile(id) returns dict | None — used in MonitorSwitcher.apply_profile()
  • MonitorSwitcher.apply_profile(profile_id: str) — called consistently in DeviceWatcher, HotkeyManager, TrayApp
  • TrayApp receives queue.Queuemain.py creates and passes it
  • SettingsWindow receives config_manager, switcher, hotkey_managermain.py passes all three