# MonitorSwitcher Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Python system tray app that switches two Lenovo L27qe monitors between DP and HDMI inputs automatically when HP 975 keyboard or Logitech MX Anywhere 2S connects/disconnects via Bluetooth, with hotkey support and PyQt6 config UI.

**Architecture:** Profile-based config (each profile = set of monitor VCP input values). WMI event watcher detects BT device connect/disconnect and fires `MonitorSwitcher.apply_profile()`. PyQt6 runs on the main thread; pystray runs in a daemon thread and communicates back via a `queue.Queue`.

**Tech Stack:** Python 3.11+, monitorcontrol, pystray, keyboard, wmi, pywin32, PyQt6, Pillow, pytest, PyInstaller

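The thread hand-off described in the architecture can be sketched with the standard library alone. This is a minimal illustration, not the app's code: `worker` stands in for the tray thread and `drain` for the main-thread poll, and both names are hypothetical. Only the command strings `"open_settings"` and `"exit"` match the ones the plan uses later.

```python
import queue
import threading


def worker(commands: queue.Queue) -> None:
    """Stand-in for the tray thread: it never touches the UI, it only enqueues."""
    commands.put("open_settings")
    commands.put("exit")


def drain(commands: queue.Queue) -> list[str]:
    """Stand-in for the main-thread poll: take everything currently queued."""
    handled = []
    while True:
        try:
            handled.append(commands.get_nowait())
        except queue.Empty:
            return handled


commands: queue.Queue = queue.Queue()
thread = threading.Thread(target=worker, args=(commands,), daemon=True)
thread.start()
thread.join()  # only so the demo is deterministic; the app polls on a timer

handled = drain(commands)
print(handled)
```

Catching `queue.Empty` around `get_nowait()` avoids relying on `empty()`, whose answer can be stale as soon as it is returned when another thread is producing.
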
---

## File Map

| File | Responsibility |
|---|---|
| `main.py` | Bootstrap: create components, start threads, run Qt event loop |
| `config/config_manager.py` | Load/save/validate `config.json`, callbacks on change |
| `config/config.json` | Default config (ships with app) |
| `core/switcher.py` | DDC/CI via monitorcontrol, `apply_profile()`, monitor enumeration |
| `core/device_watcher.py` | WMI BT event watcher in background thread, debounce |
| `core/hotkey_manager.py` | Global hotkey registration via `keyboard` lib |
| `ui/tray.py` | pystray icon + menu, communicates with Qt via queue |
| `ui/settings_window.py` | PyQt6 4-tab settings window |
| `tests/test_config_manager.py` | Unit tests for ConfigManager |
| `tests/test_switcher.py` | Unit tests for MonitorSwitcher (mocked monitorcontrol) |
| `tests/test_device_watcher.py` | Unit tests for DeviceWatcher (mocked WMI) |
| `tests/test_hotkey_manager.py` | Unit tests for HotkeyManager (mocked keyboard) |
| `requirements.txt` | Python dependencies |
| `MonitorSwitcher.spec` | PyInstaller spec file |

---

## Task 1: Project scaffold + requirements

**Files:**
- Create: `requirements.txt`
- Create: `config/__init__.py`
- Create: `core/__init__.py`
- Create: `ui/__init__.py`
- Create: `tests/__init__.py`

- [ ] **Step 1: Create `requirements.txt`**

```
d:/VSCode/MonitorSwitcher/requirements.txt
```

```text
monitorcontrol>=0.10.0
pystray>=0.19.5
keyboard>=0.13.5
wmi>=1.5.1
pywin32>=306
PyQt6>=6.6.0
Pillow>=10.0.0
pytest>=7.4.0
pytest-mock>=3.12.0
```

- [ ] **Step 2: Create the virtual environment, install dependencies, and create the directory structure**

Run:
```bash
cd d:/VSCode/MonitorSwitcher
python -m venv .venv
source .venv/Scripts/activate
pip install -r requirements.txt
mkdir -p config core ui tests
```

- [ ] **Step 3: Create `__init__.py` files**

Create `config/__init__.py`, `core/__init__.py`, `ui/__init__.py`, and `tests/__init__.py`, all as empty files.

- [ ] **Step 4: Verify install**

Run:
```bash
python -c "import monitorcontrol, pystray, keyboard, wmi, PyQt6, PIL; print('OK')"
```
Expected: `OK`

- [ ] **Step 5: Commit**

```bash
git init
git add requirements.txt config/__init__.py core/__init__.py ui/__init__.py tests/__init__.py
git commit -m "chore: project scaffold"
```

---

## Task 2: ConfigManager

**Files:**
- Create: `config/config.json`
- Create: `config/config_manager.py`
- Create: `tests/test_config_manager.py`

- [ ] **Step 1: Write failing tests**

`tests/test_config_manager.py`:
```python
import json
import pytest
from pathlib import Path
from config.config_manager import ConfigManager, DEFAULT_CONFIG


def test_load_defaults_when_no_file(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    assert cm.get()["active_profile"] == "pc1_dp"
    assert len(cm.get()["profiles"]) == 2


def test_saves_default_config_on_first_run(tmp_path):
    path = tmp_path / "config.json"
    ConfigManager(path)
    assert path.exists()
    data = json.loads(path.read_text())
    assert data["active_profile"] == "pc1_dp"


def test_load_existing_config(tmp_path):
    path = tmp_path / "config.json"
    custom = {"active_profile": "custom", "profiles": [], "device_triggers": [], "general": {}}
    path.write_text(json.dumps(custom))
    cm = ConfigManager(path)
    assert cm.get()["active_profile"] == "custom"


def test_fallback_on_corrupt_config(tmp_path):
    path = tmp_path / "config.json"
    path.write_text("not json {{{")
    cm = ConfigManager(path)
    assert cm.get()["active_profile"] == "pc1_dp"


def test_get_profile_returns_profile(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    p = cm.get_profile("pc1_dp")
    assert p is not None
    assert p["name"] == "PC1 — DP"


def test_get_profile_returns_none_for_missing(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    assert cm.get_profile("nonexistent") is None


def test_set_active_profile_persists(tmp_path):
    path = tmp_path / "config.json"
    cm = ConfigManager(path)
    cm.set_active_profile("pc2_hdmi")
    cm2 = ConfigManager(path)
    assert cm2.get_active_profile_id() == "pc2_hdmi"


def test_on_change_callback_fires(tmp_path):
    cm = ConfigManager(tmp_path / "config.json")
    called_with = []
    cm.on_change(lambda cfg: called_with.append(cfg["active_profile"]))
    cfg = cm.get().copy()
    cfg["active_profile"] = "pc2_hdmi"
    cm.update(cfg)
    assert called_with == ["pc2_hdmi"]
```

- [ ] **Step 2: Run tests to confirm they fail**

```bash
pytest tests/test_config_manager.py -v
```
Expected: all FAIL with `ModuleNotFoundError` or `ImportError`

- [ ] **Step 3: Create `config/config.json`**

```json
{
  "active_profile": "pc1_dp",
  "profiles": [
    {
      "id": "pc1_dp",
      "name": "PC1 — DP",
      "hotkey": "ctrl+alt+1",
      "monitor_inputs": [
        {"monitor_index": 0, "vcp_value": 15},
        {"monitor_index": 1, "vcp_value": 15}
      ]
    },
    {
      "id": "pc2_hdmi",
      "name": "PC2 — HDMI",
      "hotkey": "ctrl+alt+2",
      "monitor_inputs": [
        {"monitor_index": 0, "vcp_value": 17},
        {"monitor_index": 1, "vcp_value": 17}
      ]
    }
  ],
  "device_triggers": [
    {
      "name": "HP 975 keyboard",
      "bt_device_id": "VID&0203F0_PID&6343",
      "on_connect": "pc1_dp",
      "on_disconnect": "pc2_hdmi",
      "enabled": true
    },
    {
      "name": "MX Anywhere 2S",
      "bt_device_id": "VID&02046D_PID&B01A",
      "on_connect": "pc1_dp",
      "on_disconnect": "pc2_hdmi",
      "enabled": true
    }
  ],
  "general": {
    "autostart": true,
    "minimize_to_tray": true,
    "debounce_ms": 500
  }
}
```

- [ ] **Step 4: Implement `config/config_manager.py`**

```python
import json
import copy
from pathlib import Path
from typing import Callable

# Mirrors config/config.json; used when the file is missing or unreadable.
DEFAULT_CONFIG = {
    "active_profile": "pc1_dp",
    "profiles": [
        {
            "id": "pc1_dp",
            "name": "PC1 — DP",
            "hotkey": "ctrl+alt+1",
            "monitor_inputs": [
                {"monitor_index": 0, "vcp_value": 15},
                {"monitor_index": 1, "vcp_value": 15}
            ]
        },
        {
            "id": "pc2_hdmi",
            "name": "PC2 — HDMI",
            "hotkey": "ctrl+alt+2",
            "monitor_inputs": [
                {"monitor_index": 0, "vcp_value": 17},
                {"monitor_index": 1, "vcp_value": 17}
            ]
        }
    ],
    "device_triggers": [
        {
            "name": "HP 975 keyboard",
            "bt_device_id": "VID&0203F0_PID&6343",
            "on_connect": "pc1_dp",
            "on_disconnect": "pc2_hdmi",
            "enabled": True
        },
        {
            "name": "MX Anywhere 2S",
            "bt_device_id": "VID&02046D_PID&B01A",
            "on_connect": "pc1_dp",
            "on_disconnect": "pc2_hdmi",
            "enabled": True
        }
    ],
    "general": {
        "autostart": True,
        "minimize_to_tray": True,
        "debounce_ms": 500
    }
}


class ConfigManager:
    def __init__(self, config_path=None):
        if config_path is None:
            config_path = Path(__file__).parent / "config.json"
        self.config_path = Path(config_path)
        self._config: dict = {}
        self._callbacks: list[Callable] = []
        self.load()

    def load(self) -> None:
        if self.config_path.exists():
            try:
                with open(self.config_path, "r", encoding="utf-8") as f:
                    self._config = json.load(f)
            except (json.JSONDecodeError, OSError):
                # Corrupt or unreadable file: run on defaults, leave the file untouched.
                self._config = copy.deepcopy(DEFAULT_CONFIG)
        else:
            # First run: write the defaults so the user has a file to edit.
            self._config = copy.deepcopy(DEFAULT_CONFIG)
            self.save()

    def save(self) -> None:
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(self._config, f, indent=2, ensure_ascii=False)

    def get(self) -> dict:
        # Returns the live dict, not a copy; callers that edit it should call update().
        return self._config

    def update(self, new_config: dict) -> None:
        self._config = new_config
        self.save()
        for cb in self._callbacks:
            cb(self._config)

    def on_change(self, callback: Callable) -> None:
        self._callbacks.append(callback)

    def get_profile(self, profile_id: str) -> dict | None:
        for p in self._config.get("profiles", []):
            if p["id"] == profile_id:
                return p
        return None

    def get_active_profile_id(self) -> str:
        return self._config.get("active_profile", "pc1_dp")

    def set_active_profile(self, profile_id: str) -> None:
        # Persists the change but does not fire on_change callbacks.
        self._config["active_profile"] = profile_id
        self.save()
```

- [ ] **Step 5: Run tests to verify they pass**

```bash
pytest tests/test_config_manager.py -v
```
Expected: all 8 PASS

- [ ] **Step 6: Commit**

```bash
git add config/config_manager.py config/config.json tests/test_config_manager.py
git commit -m "feat: ConfigManager with load/save/callbacks"
```

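The File Map lists validation among ConfigManager's responsibilities, while the steps in this task cover loading, saving, and callbacks. One possible shape for a consistency check is sketched below. `validate_config` is a hypothetical helper, not part of the plan's tested API, and the rules it applies (unique profile ids, triggers pointing at defined profiles, non-negative monitor indices) are assumptions about what "valid" should mean here.

```python
def validate_config(cfg: dict) -> list[str]:
    """Return human-readable problems; an empty list means no issue was found."""
    problems: list[str] = []

    profiles = cfg.get("profiles", [])
    ids = [p.get("id") for p in profiles]
    if len(ids) != len(set(ids)):
        problems.append("profile ids are not unique")
    known = set(ids)

    active = cfg.get("active_profile")
    if active not in known:
        problems.append(f"active_profile {active!r} is not a defined profile")

    # Every trigger must point at profiles that exist.
    for trigger in cfg.get("device_triggers", []):
        for key in ("on_connect", "on_disconnect"):
            target = trigger.get(key)
            if target not in known:
                problems.append(
                    f"trigger {trigger.get('name')!r}: {key} refers to unknown profile {target!r}"
                )

    # Monitor indices must be non-negative integers.
    for profile in profiles:
        for entry in profile.get("monitor_inputs", []):
            idx = entry.get("monitor_index")
            if not isinstance(idx, int) or idx < 0:
                problems.append(f"profile {profile.get('id')!r}: bad monitor_index {idx!r}")

    return problems


sample = {
    "active_profile": "pc1_dp",
    "profiles": [
        {"id": "pc1_dp", "monitor_inputs": [{"monitor_index": 0, "vcp_value": 15}]}
    ],
    "device_triggers": [
        {"name": "HP 975 keyboard", "on_connect": "pc1_dp", "on_disconnect": "pc2_hdmi"}
    ],
}
problems = validate_config(sample)
print(problems)
```

Returning a list of messages rather than raising keeps the decision (fall back to defaults, warn in the UI, or refuse to save) with the caller.
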
---

## Task 3: MonitorSwitcher (DDC/CI)

**Files:**
- Create: `core/switcher.py`
- Create: `tests/test_switcher.py`

- [ ] **Step 1: Write failing tests**

`tests/test_switcher.py`:
```python
from unittest.mock import MagicMock, patch, call
import pytest
from config.config_manager import ConfigManager
from core.switcher import MonitorSwitcher


@pytest.fixture
def config(tmp_path):
    return ConfigManager(tmp_path / "config.json")


def _make_mock_monitor(index):
    # Mimics monitorcontrol.Monitor: a context manager whose raw DDC/CI
    # handle is exposed as `.vcp` (MagicMock creates it on first access).
    m = MagicMock()
    m.__enter__ = MagicMock(return_value=m)
    m.__exit__ = MagicMock(return_value=False)
    return m


def test_apply_profile_sends_vcp_to_both_monitors(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc1_dp")

    mon0.vcp.set_vcp_feature.assert_called_once_with(0x60, 15)
    mon1.vcp.set_vcp_feature.assert_called_once_with(0x60, 15)


def test_apply_profile_hdmi(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc2_hdmi")

    mon0.vcp.set_vcp_feature.assert_called_once_with(0x60, 17)
    mon1.vcp.set_vcp_feature.assert_called_once_with(0x60, 17)


def test_apply_profile_skips_missing_monitor(config):
    mon0 = _make_mock_monitor(0)
    # only 1 monitor, profile requires 2: monitor_index=1 is skipped with a warning

    with patch("core.switcher.get_monitors", return_value=[mon0]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc1_dp")

    mon0.vcp.set_vcp_feature.assert_called_once_with(0x60, 15)


def test_apply_profile_continues_on_monitor_error(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)
    mon0.vcp.set_vcp_feature.side_effect = Exception("DDC/CI error")

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc1_dp")  # must not raise

    mon1.vcp.set_vcp_feature.assert_called_once_with(0x60, 15)


def test_apply_profile_unknown_profile_does_nothing(config):
    with patch("core.switcher.get_monitors", return_value=[]) as mock_gm:
        sw = MonitorSwitcher(config)
        sw.apply_profile("nonexistent")
    mock_gm.assert_not_called()


def test_on_switch_callback_fires(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)
    received = []

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.on_switch(lambda pid: received.append(pid))
        sw.apply_profile("pc2_hdmi")

    assert received == ["pc2_hdmi"]


def test_apply_profile_updates_active_profile(config):
    mon0 = _make_mock_monitor(0)
    mon1 = _make_mock_monitor(1)

    with patch("core.switcher.get_monitors", return_value=[mon0, mon1]):
        sw = MonitorSwitcher(config)
        sw.apply_profile("pc2_hdmi")

    assert config.get_active_profile_id() == "pc2_hdmi"
```

- [ ] **Step 2: Run tests to confirm they fail**

```bash
pytest tests/test_switcher.py -v
```
Expected: FAIL with `ImportError`

- [ ] **Step 3: Implement `core/switcher.py`**

```python
import logging
from typing import Callable
from monitorcontrol import get_monitors

logger = logging.getLogger(__name__)

VCP_INPUT_SOURCE = 0x60  # MCCS "Input Source" feature code


class MonitorSwitcher:
    def __init__(self, config_manager):
        self._config_manager = config_manager
        self._on_switch_callbacks: list[Callable[[str], None]] = []

    def apply_profile(self, profile_id: str) -> None:
        profile = self._config_manager.get_profile(profile_id)
        if profile is None:
            logger.warning("Profile not found: %s", profile_id)
            return

        monitors = list(get_monitors())
        for input_cfg in profile["monitor_inputs"]:
            idx = input_cfg["monitor_index"]
            vcp_value = input_cfg["vcp_value"]
            if idx >= len(monitors):
                logger.warning(
                    "Monitor index %d out of range (have %d)", idx, len(monitors)
                )
                continue
            try:
                with monitors[idx] as monitor:
                    # Monitor has no public set_vcp_feature(); raw VCP writes go
                    # through the underlying handle, monitor.vcp, inside the context.
                    monitor.vcp.set_vcp_feature(VCP_INPUT_SOURCE, vcp_value)
                    logger.info("Monitor %d → VCP input %d", idx, vcp_value)
            except Exception as exc:
                # One failing monitor must not block the others.
                logger.warning("Monitor %d DDC/CI error: %s", idx, exc)

        self._config_manager.set_active_profile(profile_id)
        for cb in self._on_switch_callbacks:
            cb(profile_id)

    def on_switch(self, callback: Callable[[str], None]) -> None:
        self._on_switch_callbacks.append(callback)

    def list_monitors(self) -> list[dict]:
        result = []
        for idx, monitor_handle in enumerate(get_monitors()):
            try:
                with monitor_handle as monitor:
                    # The raw read returns a (current, maximum) pair.
                    current, _maximum = monitor.vcp.get_vcp_feature(VCP_INPUT_SOURCE)
                    result.append({"index": idx, "current_vcp": current})
            except Exception as exc:
                logger.warning("Cannot query monitor %d: %s", idx, exc)
                result.append({"index": idx, "current_vcp": None})
        return result
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/test_switcher.py -v
```
Expected: all 7 PASS

- [ ] **Step 5: Commit**

```bash
git add core/switcher.py tests/test_switcher.py
git commit -m "feat: MonitorSwitcher with DDC/CI apply_profile"
```

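The `vcp_value` entries in the profiles (15 and 17) are the decimal forms of the input-source codes `0x0F` and `0x11`, the same labels the settings window uses later in this plan. A small sketch for turning a stored value into something readable in logs follows. `INPUT_SOURCE_NAMES` and `describe_input` are hypothetical names, and the table holds only the four digital inputs the plan itself lists; a given monitor may use different values.

```python
# Labels taken from this plan's own input table; not an exhaustive MCCS list.
INPUT_SOURCE_NAMES = {
    0x0F: "DisplayPort-1",
    0x10: "DisplayPort-2",
    0x11: "HDMI-1",
    0x12: "HDMI-2",
}


def describe_input(vcp_value: int) -> str:
    """Render a VCP input value with its label, hex form, and decimal form."""
    name = INPUT_SOURCE_NAMES.get(vcp_value, "unknown input")
    return f"{name} (0x{vcp_value:02X} = {vcp_value})"


print(describe_input(15))
print(describe_input(17))
```
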
---

## Task 4: DeviceWatcher (WMI BT events)

**Files:**
- Create: `core/device_watcher.py`
- Create: `tests/test_device_watcher.py`

- [ ] **Step 1: Write failing tests**

`tests/test_device_watcher.py`:
```python
import time
import threading
from unittest.mock import MagicMock, patch
import pytest
from config.config_manager import ConfigManager
from core.device_watcher import DeviceWatcher


@pytest.fixture
def config(tmp_path):
    return ConfigManager(tmp_path / "config.json")


@pytest.fixture
def switcher():
    sw = MagicMock()
    return sw


def test_handle_event_connect_triggers_correct_profile(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event(
        "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "connect"
    )
    time.sleep(0.6)  # wait for debounce (500ms)
    switcher.apply_profile.assert_called_once_with("pc1_dp")


def test_handle_event_disconnect_triggers_correct_profile(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event(
        "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413_AABBCCDD", "disconnect"
    )
    time.sleep(0.6)
    switcher.apply_profile.assert_called_once_with("pc2_hdmi")


def test_handle_event_ignores_unknown_device(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event("HID\\VID&DEAD_PID&BEEF", "connect")
    time.sleep(0.6)
    switcher.apply_profile.assert_not_called()


def test_handle_event_ignores_disabled_trigger(config, switcher):
    cfg = config.get()
    cfg["device_triggers"][0]["enabled"] = False
    config.update(cfg)

    watcher = DeviceWatcher(config, switcher)
    watcher._handle_event(
        "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413", "connect"
    )
    time.sleep(0.6)
    switcher.apply_profile.assert_not_called()


def test_debounce_collapses_rapid_events(config, switcher):
    watcher = DeviceWatcher(config, switcher)
    device_id = "HID\\{...}_DEV_VID&0203F0_PID&6343_REV&0413"
    # Fire 5 connect events in quick succession
    for _ in range(5):
        watcher._handle_event(device_id, "connect")
    time.sleep(0.8)
    # Should fire only once
    assert switcher.apply_profile.call_count == 1
```

- [ ] **Step 2: Run tests to confirm they fail**

```bash
pytest tests/test_device_watcher.py -v
```
Expected: FAIL with `ImportError`

- [ ] **Step 3: Implement `core/device_watcher.py`**

```python
import logging
import threading
import pythoncom
import wmi

logger = logging.getLogger(__name__)


class DeviceWatcher:
    # (WMI notification type, event type passed to _handle_event)
    _WATCHES = (("Creation", "connect"), ("Deletion", "disconnect"))

    def __init__(self, config_manager, switcher):
        self._config_manager = config_manager
        self._switcher = switcher
        self._stop_event = threading.Event()
        self._debounce_timers: dict[str, threading.Timer] = {}
        self._debounce_lock = threading.Lock()
        self._threads: list[threading.Thread] = []

    def start(self) -> None:
        self._stop_event.clear()
        # One thread per notification type. Each thread initialises COM and
        # creates its own WMI connection, because COM objects should not be
        # handed to a thread that did not create them.
        self._threads = [
            threading.Thread(
                target=self._watch_loop,
                args=(notification_type, event_type),
                daemon=True,
                name=f"DeviceWatcher-{event_type}",
            )
            for notification_type, event_type in self._WATCHES
        ]
        for thread in self._threads:
            thread.start()
        logger.info("DeviceWatcher started")

    def stop(self) -> None:
        self._stop_event.set()

    def _watch_loop(self, notification_type: str, event_type: str) -> None:
        retries = 0
        max_retries = 3
        while not self._stop_event.is_set() and retries < max_retries:
            try:
                self._run_watcher(notification_type, event_type)  # returns only on stop
            except Exception as exc:
                retries += 1
                logger.error(
                    "WMI watcher (%s) error (attempt %d/%d): %s",
                    event_type, retries, max_retries, exc,
                )
                self._stop_event.wait(5)  # back off before reconnecting
        if not self._stop_event.is_set():
            logger.warning("DeviceWatcher (%s) gave up after %d retries", event_type, retries)

    def _run_watcher(self, notification_type: str, event_type: str) -> None:
        pythoncom.CoInitialize()
        try:
            c = wmi.WMI()
            watcher = c.watch_for(
                notification_type=notification_type,
                wmi_class="Win32_PnPEntity",
                delay_secs=1,
            )
            while not self._stop_event.is_set():
                try:
                    event = watcher(timeout_ms=1000)
                except wmi.x_wmi_timed_out:
                    continue  # no event within the timeout; re-check the stop flag
                if event:
                    self._handle_event(event.DeviceID or "", event_type)
        finally:
            pythoncom.CoUninitialize()

    def _handle_event(self, device_id: str, event_type: str) -> None:
        config = self._config_manager.get()
        for trigger in config.get("device_triggers", []):
            if not trigger.get("enabled", True):
                continue
            # Substring match: the configured id is a fragment of the full PnP DeviceID.
            if trigger["bt_device_id"] in device_id:
                profile_id = trigger["on_connect"] if event_type == "connect" else trigger["on_disconnect"]
                logger.info(
                    "Device %s event '%s' matched trigger '%s' → profile '%s'",
                    device_id, event_type, trigger["name"], profile_id,
                )
                self._schedule_switch(device_id + event_type, profile_id)
                return

    def _schedule_switch(self, key: str, profile_id: str) -> None:
        debounce_s = self._config_manager.get().get("general", {}).get("debounce_ms", 500) / 1000.0
        with self._debounce_lock:
            # A newer event for the same key replaces the pending timer.
            existing = self._debounce_timers.get(key)
            if existing:
                existing.cancel()
            timer = threading.Timer(debounce_s, self._switcher.apply_profile, args=[profile_id])
            self._debounce_timers[key] = timer
            timer.start()
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/test_device_watcher.py -v
```
Expected: all 5 PASS

- [ ] **Step 5: Commit**

```bash
git add core/device_watcher.py tests/test_device_watcher.py
git commit -m "feat: DeviceWatcher with WMI BT event monitoring and debounce"
```

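The debounce used by the watcher (cancel the pending timer, start a new one) can be tried in isolation, without WMI. The sketch below uses only the standard library. `Debouncer` is a hypothetical stand-alone class, and the 50 ms delay is chosen just to keep the demo fast.

```python
import threading
import time


class Debouncer:
    """Run fn once per key, delay_s after the last call for that key."""

    def __init__(self, delay_s: float) -> None:
        self._delay_s = delay_s
        self._timers: dict[str, threading.Timer] = {}
        self._lock = threading.Lock()

    def call(self, key: str, fn, *args) -> None:
        with self._lock:
            pending = self._timers.get(key)
            if pending is not None:
                pending.cancel()  # a newer event supersedes the pending one
            timer = threading.Timer(self._delay_s, fn, args=args)
            timer.daemon = True  # do not keep the process alive for a pending switch
            self._timers[key] = timer
            timer.start()


calls: list[str] = []
debouncer = Debouncer(0.05)
for _ in range(5):
    debouncer.call("keyboard-connect", calls.append, "pc1_dp")
time.sleep(0.3)  # comfortably longer than the 50 ms delay
print(calls)
```

Because the key in the watcher combines device id and event type, a connect followed quickly by a disconnect is not collapsed: each has its own timer and both will fire.
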
---

## Task 5: HotkeyManager

**Files:**
- Create: `core/hotkey_manager.py`
- Create: `tests/test_hotkey_manager.py`

- [ ] **Step 1: Write failing tests**

`tests/test_hotkey_manager.py`:
```python
from unittest.mock import MagicMock, patch, call
import pytest
from config.config_manager import ConfigManager
from core.hotkey_manager import HotkeyManager


@pytest.fixture
def config(tmp_path):
    return ConfigManager(tmp_path / "config.json")


@pytest.fixture
def switcher():
    return MagicMock()


def test_register_all_registers_profile_hotkeys(config, switcher):
    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
    assert mock_kb.add_hotkey.call_count == 2
    calls = [c.args[0] for c in mock_kb.add_hotkey.call_args_list]
    assert "ctrl+alt+1" in calls
    assert "ctrl+alt+2" in calls


def test_register_all_skips_empty_hotkeys(config, switcher):
    cfg = config.get()
    cfg["profiles"][0]["hotkey"] = ""
    config.update(cfg)

    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
    assert mock_kb.add_hotkey.call_count == 1


def test_unregister_all_removes_hotkeys(config, switcher):
    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
        hm.unregister_all()
    assert mock_kb.remove_hotkey.call_count == 2


def test_register_all_clears_old_before_registering(config, switcher):
    with patch("core.hotkey_manager.keyboard") as mock_kb:
        hm = HotkeyManager(config, switcher)
        hm.register_all()
        hm.register_all()  # second call
    # the second register_all() first removes both hotkeys from the first call
    assert mock_kb.remove_hotkey.call_count == 2


def test_hotkey_calls_apply_profile(config, switcher):
    captured_callbacks = {}

    def fake_add_hotkey(hotkey, fn, args):
        captured_callbacks[hotkey] = (fn, args)

    with patch("core.hotkey_manager.keyboard") as mock_kb:
        mock_kb.add_hotkey.side_effect = fake_add_hotkey
        hm = HotkeyManager(config, switcher)
        hm.register_all()

    fn, args = captured_callbacks["ctrl+alt+1"]
    fn(*args)
    switcher.apply_profile.assert_called_once_with("pc1_dp")
```

- [ ] **Step 2: Run tests to confirm they fail**

```bash
pytest tests/test_hotkey_manager.py -v
```
Expected: FAIL with `ImportError`

- [ ] **Step 3: Implement `core/hotkey_manager.py`**

```python
import logging
import keyboard

logger = logging.getLogger(__name__)


class HotkeyManager:
    def __init__(self, config_manager, switcher):
        self._config_manager = config_manager
        self._switcher = switcher
        self._registered: list[str] = []

    def register_all(self) -> None:
        # Start from a clean slate so repeated calls never double-register.
        self.unregister_all()
        config = self._config_manager.get()
        for profile in config.get("profiles", []):
            hotkey = profile.get("hotkey", "").strip()
            if not hotkey:
                continue
            profile_id = profile["id"]
            try:
                keyboard.add_hotkey(hotkey, self._switcher.apply_profile, args=[profile_id])
                self._registered.append(hotkey)
                logger.info("Hotkey '%s' → profile '%s'", hotkey, profile_id)
            except Exception as exc:
                logger.warning("Failed to register hotkey '%s': %s", hotkey, exc)

    def unregister_all(self) -> None:
        for hotkey in self._registered:
            try:
                keyboard.remove_hotkey(hotkey)
            except Exception:
                # Already removed or never active; nothing left to clean up.
                pass
        self._registered.clear()
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/test_hotkey_manager.py -v
```
Expected: all 5 PASS

- [ ] **Step 5: Commit**

```bash
git add core/hotkey_manager.py tests/test_hotkey_manager.py
git commit -m "feat: HotkeyManager with global hotkey registration"
```

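Hotkeys are registered and removed by their string, so two profiles sharing one combination would make it ambiguous which profile a key press selects. A check along the following lines could run before registration. `find_duplicate_hotkeys` is a hypothetical helper, and treating hotkeys as equal after trimming and lower-casing is an assumption about how strictly they should be compared.

```python
from collections import defaultdict


def find_duplicate_hotkeys(profiles: list[dict]) -> dict[str, list[str]]:
    """Map each hotkey used by more than one profile to the ids that share it."""
    by_hotkey: dict[str, list[str]] = defaultdict(list)
    for profile in profiles:
        hotkey = profile.get("hotkey", "").strip().lower()
        if hotkey:  # profiles without a hotkey are simply skipped
            by_hotkey[hotkey].append(profile["id"])
    return {hk: ids for hk, ids in by_hotkey.items() if len(ids) > 1}


profiles = [
    {"id": "pc1_dp", "hotkey": "ctrl+alt+1"},
    {"id": "pc2_hdmi", "hotkey": "Ctrl+Alt+1 "},
    {"id": "pc3_dp", "hotkey": ""},
    {"id": "pc4_hdmi", "hotkey": "ctrl+alt+2"},
]
duplicates = find_duplicate_hotkeys(profiles)
print(duplicates)
```
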
---

## Task 6: Tray icon + main.py

**Files:**
- Create: `ui/tray.py`
- Create: `main.py`

- [ ] **Step 1: Implement `ui/tray.py`**

```python
import queue
import logging
import pystray
from PIL import Image, ImageDraw

logger = logging.getLogger(__name__)


def _make_icon(label: str, color: str) -> Image.Image:
    img = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    draw.ellipse([2, 2, 62, 62], fill=color)
    text = label[:2].upper()
    # Use Pillow's default font, so no external font file is needed
    bbox = draw.textbbox((0, 0), text)
    tw = bbox[2] - bbox[0]
    th = bbox[3] - bbox[1]
    draw.text(((64 - tw) // 2, (64 - th) // 2), text, fill="white")
    return img


class TrayApp:
    # Colors per profile type
    _COLORS = {"dp": "#1565C0", "hdmi": "#B71C1C"}

    def __init__(self, config_manager, switcher, open_settings_queue: queue.Queue):
        self._config_manager = config_manager
        self._switcher = switcher
        self._settings_queue = open_settings_queue
        self._icon: pystray.Icon | None = None
        switcher.on_switch(self._on_profile_switched)

    def _color_for(self, profile_id: str) -> str:
        return self._COLORS["dp"] if "dp" in profile_id else self._COLORS["hdmi"]

    def _make_switch_action(self, profile_id: str):
        # pystray calls a two-parameter action as action(icon, item), so the
        # profile id is bound by closure; a default argument would be overwritten.
        def action(icon, item):
            self._switcher.apply_profile(profile_id)
        return action

    def _build_menu(self) -> pystray.Menu:
        config = self._config_manager.get()
        items = []
        for profile in config.get("profiles", []):
            pid = profile["id"]
            name = profile["name"]
            items.append(
                pystray.MenuItem(
                    name,
                    self._make_switch_action(pid),
                    checked=lambda item, p=pid: self._config_manager.get_active_profile_id() == p,
                    radio=True,
                )
            )
        items.append(pystray.Menu.SEPARATOR)
        items.append(pystray.MenuItem("Ustawienia", self._request_settings))
        items.append(pystray.MenuItem("Wyjście", self._exit))
        return pystray.Menu(*items)

    def _on_profile_switched(self, profile_id: str) -> None:
        if self._icon is None:
            return
        profile = self._config_manager.get_profile(profile_id)
        label = profile["name"][:2] if profile else "??"
        title = profile["name"] if profile else profile_id
        self._icon.icon = _make_icon(label, self._color_for(profile_id))
        self._icon.title = f"MonitorSwitcher — {title}"
        self._icon.update_menu()

    def _request_settings(self, icon=None, item=None) -> None:
        # The tray thread never touches Qt; it only posts a command for the main thread.
        self._settings_queue.put("open_settings")

    def _exit(self, icon=None, item=None) -> None:
        self._settings_queue.put("exit")
        if self._icon:
            self._icon.stop()

    def run_tray(self) -> None:
        active_id = self._config_manager.get_active_profile_id()
        profile = self._config_manager.get_profile(active_id)
        label = profile["name"][:2] if profile else "MS"
        title = f"MonitorSwitcher — {profile['name']}" if profile else "MonitorSwitcher"

        self._icon = pystray.Icon(
            "MonitorSwitcher",
            _make_icon(label, self._color_for(active_id)),
            title=title,
            menu=self._build_menu(),
        )
        self._icon.run()
```

- [ ] **Step 2: Implement `main.py`**

```python
import sys
import queue
import threading
import logging

from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QTimer

from config.config_manager import ConfigManager
from core.switcher import MonitorSwitcher
from core.device_watcher import DeviceWatcher
from core.hotkey_manager import HotkeyManager
from ui.tray import TrayApp

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)


def main() -> None:
    app = QApplication(sys.argv)
    # Closing the settings window must not end the app; it lives in the tray.
    app.setQuitOnLastWindowClosed(False)

    config = ConfigManager()
    switcher = MonitorSwitcher(config)
    watcher = DeviceWatcher(config, switcher)
    hotkeys = HotkeyManager(config, switcher)

    settings_queue: queue.Queue = queue.Queue()
    tray = TrayApp(config, switcher, settings_queue)

    watcher.start()
    hotkeys.register_all()

    tray_thread = threading.Thread(target=tray.run_tray, daemon=True, name="TrayThread")
    tray_thread.start()

    settings_window = None

    def process_queue() -> None:
        # Runs on the Qt main thread, so it is safe to create and show widgets here.
        nonlocal settings_window
        while not settings_queue.empty():
            cmd = settings_queue.get_nowait()
            if cmd == "open_settings":
                from ui.settings_window import SettingsWindow
                if settings_window is None or not settings_window.isVisible():
                    settings_window = SettingsWindow(config, switcher, hotkeys)
                settings_window.show()
                settings_window.raise_()
                settings_window.activateWindow()
            elif cmd == "exit":
                hotkeys.unregister_all()
                watcher.stop()
                app.quit()

    timer = QTimer()
    timer.timeout.connect(process_queue)
    timer.start(100)  # poll queue every 100ms

    sys.exit(app.exec())


if __name__ == "__main__":
    main()
```

- [ ] **Step 3: Smoke test: run the app**

```bash
python main.py
```
Expected: the tray icon appears in the system tray, and right-clicking it shows a menu with the profiles plus Ustawienia (Settings) and Wyjście (Exit). No errors in the console.

- [ ] **Step 4: Commit**

```bash
git add ui/tray.py main.py
git commit -m "feat: tray icon + main orchestration"
```

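Each profile entry in the tray menu needs a callback that remembers its own profile id. The sketch below contrasts two ways of binding that id, under the assumption (based on pystray's documented callback convention) that a two-parameter action is invoked as `action(icon, item)`. No pystray import is needed; the calls are simulated with placeholder strings, and `make_action` is a hypothetical helper.

```python
def make_action(profile_id: str, apply):
    """Bind profile_id by closure; the (icon, item) arguments are ignored."""
    def action(icon, item):
        apply(profile_id)
    return action


applied: list[str] = []
actions = [make_action(pid, applied.append) for pid in ("pc1_dp", "pc2_hdmi")]
for action in actions:
    action("<icon>", "<item>")  # simulated two-argument call
print(applied)

# Binding through a default argument looks equivalent, but the second
# positional argument lands in the default's slot and replaces the id.
applied_naive: list[str] = []
naive = lambda _, p="pc1_dp": applied_naive.append(p)
naive("<icon>", "<item>")
print(applied_naive)
```

The closure keeps the id out of the parameter list entirely, so it works however many positional arguments the caller supplies.
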
---
|
|
|
|
## Task 7: Settings Window — shell + Profiles tab

**Files:**
- Create: `ui/settings_window.py`

- [ ] **Step 1: Implement settings window shell + Profiles tab**

`ui/settings_window.py`:
```python
import copy
import logging
from PyQt6.QtWidgets import (
    QMainWindow, QWidget, QTabWidget, QVBoxLayout, QHBoxLayout,
    QPushButton, QListWidget, QListWidgetItem, QLabel, QLineEdit,
    QFormLayout, QSpinBox, QCheckBox, QComboBox, QGroupBox,
    QMessageBox, QDialog, QDialogButtonBox,
)
from PyQt6.QtCore import Qt

logger = logging.getLogger(__name__)

VCP_INPUT_LABELS = {
    15: "DisplayPort-1 (0x0F)",
    16: "DisplayPort-2 (0x10)",
    17: "HDMI-1 (0x11)",
    18: "HDMI-2 (0x12)",
    1: "VGA-1 (0x01)",
}


class SettingsWindow(QMainWindow):
    def __init__(self, config_manager, switcher, hotkey_manager):
        super().__init__()
        self._config_manager = config_manager
        self._switcher = switcher
        self._hotkey_manager = hotkey_manager
        self._working_config = copy.deepcopy(config_manager.get())

        self.setWindowTitle("MonitorSwitcher — Ustawienia")
        self.setMinimumSize(600, 500)

        tabs = QTabWidget()
        tabs.addTab(self._build_profiles_tab(), "Profile")
        tabs.addTab(self._build_devices_tab(), "Urządzenia")
        tabs.addTab(self._build_hotkeys_tab(), "Hotkeys")
        tabs.addTab(self._build_general_tab(), "Ogólne")

        save_btn = QPushButton("Zapisz i zamknij")
        save_btn.clicked.connect(self._save_and_close)
        cancel_btn = QPushButton("Anuluj")
        cancel_btn.clicked.connect(self.close)

        btn_row = QHBoxLayout()
        btn_row.addStretch()
        btn_row.addWidget(cancel_btn)
        btn_row.addWidget(save_btn)

        layout = QVBoxLayout()
        layout.addWidget(tabs)
        layout.addLayout(btn_row)

        central = QWidget()
        central.setLayout(layout)
        self.setCentralWidget(central)

    # ── Profiles tab ──────────────────────────────────────────────────────────

    def _build_profiles_tab(self) -> QWidget:
        widget = QWidget()
        layout = QHBoxLayout(widget)

        # Left: profile list
        left = QVBoxLayout()
        self._profile_list = QListWidget()
        self._profile_list.currentRowChanged.connect(self._on_profile_selected)
        left.addWidget(QLabel("Profile:"))
        left.addWidget(self._profile_list)

        add_btn = QPushButton("+ Dodaj")
        add_btn.clicked.connect(self._add_profile)
        del_btn = QPushButton("Usuń")
        del_btn.clicked.connect(self._delete_profile)
        btn_row = QHBoxLayout()
        btn_row.addWidget(add_btn)
        btn_row.addWidget(del_btn)
        left.addLayout(btn_row)

        # Right: profile editor
        right = QVBoxLayout()
        self._profile_name_edit = QLineEdit()
        self._profile_name_edit.setPlaceholderText("Nazwa profilu")
        self._profile_name_edit.textChanged.connect(self._on_profile_name_changed)
        right.addWidget(QLabel("Nazwa:"))
        right.addWidget(self._profile_name_edit)

        self._monitor_combos: list[QComboBox] = []
        self._monitor_group = QGroupBox("Wejścia monitorów")
        monitor_layout = QFormLayout()
        for i in range(2):
            combo = QComboBox()
            for vcp, label in VCP_INPUT_LABELS.items():
                combo.addItem(label, vcp)
            # Bind the loop index as a default so each combo reports its own monitor
            combo.currentIndexChanged.connect(lambda _, idx=i: self._on_monitor_input_changed(idx))
            self._monitor_combos.append(combo)
            monitor_layout.addRow(f"Monitor {i}:", combo)
        self._monitor_group.setLayout(monitor_layout)
        right.addWidget(self._monitor_group)
        right.addStretch()

        layout.addLayout(left, 1)
        layout.addLayout(right, 2)

        self._refresh_profile_list()
        return widget

    def _refresh_profile_list(self) -> None:
        self._profile_list.clear()
        for p in self._working_config.get("profiles", []):
            self._profile_list.addItem(QListWidgetItem(p["name"]))
        if self._profile_list.count() > 0:
            self._profile_list.setCurrentRow(0)

    def _on_profile_selected(self, row: int) -> None:
        profiles = self._working_config.get("profiles", [])
        if row < 0 or row >= len(profiles):
            return
        profile = profiles[row]
        # Block signals while loading so the editor does not write back into the config
        self._profile_name_edit.blockSignals(True)
        self._profile_name_edit.setText(profile["name"])
        self._profile_name_edit.blockSignals(False)
        for combo_widget in self._monitor_combos:
            combo_widget.blockSignals(True)
        for inp in profile.get("monitor_inputs", []):
            idx = inp["monitor_index"]
            vcp = inp["vcp_value"]
            if idx < len(self._monitor_combos):
                combo = self._monitor_combos[idx]
                data_idx = combo.findData(vcp)
                combo.setCurrentIndex(data_idx if data_idx >= 0 else 0)
        for combo_widget in self._monitor_combos:
            combo_widget.blockSignals(False)

    def _on_profile_name_changed(self, text: str) -> None:
        row = self._profile_list.currentRow()
        profiles = self._working_config.get("profiles", [])
        if 0 <= row < len(profiles):
            profiles[row]["name"] = text
            self._profile_list.item(row).setText(text)

    def _on_monitor_input_changed(self, monitor_idx: int) -> None:
        row = self._profile_list.currentRow()
        profiles = self._working_config.get("profiles", [])
        if row < 0 or row >= len(profiles):
            return
        vcp = self._monitor_combos[monitor_idx].currentData()
        # setdefault keeps the list attached to the profile even if the key was missing
        inputs = profiles[row].setdefault("monitor_inputs", [])
        for inp in inputs:
            if inp["monitor_index"] == monitor_idx:
                inp["vcp_value"] = vcp
                return
        inputs.append({"monitor_index": monitor_idx, "vcp_value": vcp})

    def _add_profile(self) -> None:
        # Pick the first unused id; a len()-based id can collide after a deletion
        existing_ids = {p["id"] for p in self._working_config["profiles"]}
        n = len(existing_ids)
        while f"profile_{n}" in existing_ids:
            n += 1
        new_profile = {
            "id": f"profile_{n}",
            "name": "Nowy profil",
            "hotkey": "",
            "monitor_inputs": [
                {"monitor_index": 0, "vcp_value": 15},
                {"monitor_index": 1, "vcp_value": 15},
            ],
        }
        self._working_config["profiles"].append(new_profile)
        self._profile_list.addItem(QListWidgetItem(new_profile["name"]))
        self._profile_list.setCurrentRow(self._profile_list.count() - 1)

    def _delete_profile(self) -> None:
        row = self._profile_list.currentRow()
        if row < 0:
            return
        self._working_config["profiles"].pop(row)
        self._refresh_profile_list()

    # ── Devices tab ───────────────────────────────────────────────────────────

    def _build_devices_tab(self) -> QWidget:
        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.addWidget(QLabel("Urządzenia BT wyzwalające przełączenie profilu:"))

        self._device_rows: list[dict] = []
        self._devices_container = QVBoxLayout()
        layout.addLayout(self._devices_container)

        add_btn = QPushButton("+ Dodaj urządzenie")
        # clicked emits a bool ("checked"); the lambda keeps it from being
        # passed to _add_device_row as the trigger argument
        add_btn.clicked.connect(lambda: self._add_device_row())
        layout.addWidget(add_btn)
        layout.addStretch()

        for trigger in self._working_config.get("device_triggers", []):
            self._add_device_row(trigger)

        return widget

    def _add_device_row(self, trigger: dict | None = None) -> None:
        if trigger is None:
            trigger = {
                "name": "Nowe urządzenie",
                "bt_device_id": "",
                "on_connect": self._first_profile_id(),
                "on_disconnect": self._first_profile_id(),
                "enabled": True,
            }
            self._working_config.setdefault("device_triggers", []).append(trigger)

        group = QGroupBox()
        form = QFormLayout(group)

        name_edit = QLineEdit(trigger.get("name", ""))
        bt_id_edit = QLineEdit(trigger.get("bt_device_id", ""))
        connect_combo = self._profile_combo(trigger.get("on_connect", ""))
        disconnect_combo = self._profile_combo(trigger.get("on_disconnect", ""))
        enabled_cb = QCheckBox()
        enabled_cb.setChecked(trigger.get("enabled", True))

        form.addRow("Nazwa:", name_edit)
        form.addRow("BT Device ID (fragment):", bt_id_edit)
        form.addRow("Przy podłączeniu:", connect_combo)
        form.addRow("Przy odłączeniu:", disconnect_combo)
        form.addRow("Aktywne:", enabled_cb)

        row_ref = {"trigger": trigger, "group": group}
        self._device_rows.append(row_ref)

        def update_trigger():
            # Copy the current widget state back into the working config entry
            trigger["name"] = name_edit.text()
            trigger["bt_device_id"] = bt_id_edit.text()
            trigger["on_connect"] = connect_combo.currentData()
            trigger["on_disconnect"] = disconnect_combo.currentData()
            trigger["enabled"] = enabled_cb.isChecked()

        name_edit.textChanged.connect(lambda _: update_trigger())
        bt_id_edit.textChanged.connect(lambda _: update_trigger())
        connect_combo.currentIndexChanged.connect(lambda _: update_trigger())
        disconnect_combo.currentIndexChanged.connect(lambda _: update_trigger())
        enabled_cb.stateChanged.connect(lambda _: update_trigger())

        self._devices_container.addWidget(group)

    def _profile_combo(self, selected_id: str) -> QComboBox:
        combo = QComboBox()
        for p in self._working_config.get("profiles", []):
            combo.addItem(p["name"], p["id"])
        idx = combo.findData(selected_id)
        if idx >= 0:
            combo.setCurrentIndex(idx)
        return combo

    def _first_profile_id(self) -> str:
        profiles = self._working_config.get("profiles", [])
        return profiles[0]["id"] if profiles else ""

    # ── Hotkeys tab ───────────────────────────────────────────────────────────

    def _build_hotkeys_tab(self) -> QWidget:
        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.addWidget(QLabel("Przypisz skróty klawiszowe do profili:"))
        layout.addWidget(QLabel("Format: ctrl+alt+1, win+F1, itp."))

        self._hotkey_edits: dict[str, QLineEdit] = {}
        form = QFormLayout()
        for profile in self._working_config.get("profiles", []):
            edit = QLineEdit(profile.get("hotkey", ""))
            edit.setPlaceholderText("np. ctrl+alt+1")
            pid = profile["id"]
            edit.textChanged.connect(lambda text, p=pid: self._on_hotkey_changed(p, text))
            self._hotkey_edits[pid] = edit
            form.addRow(f"{profile['name']}:", edit)

        layout.addLayout(form)
        layout.addStretch()
        return widget

    def _on_hotkey_changed(self, profile_id: str, text: str) -> None:
        for p in self._working_config.get("profiles", []):
            if p["id"] == profile_id:
                p["hotkey"] = text.strip()
                return

    # ── General tab ───────────────────────────────────────────────────────────

    def _build_general_tab(self) -> QWidget:
        widget = QWidget()
        layout = QFormLayout(widget)

        self._autostart_cb = QCheckBox()
        self._autostart_cb.setChecked(
            self._working_config.get("general", {}).get("autostart", True)
        )
        self._minimize_cb = QCheckBox()
        self._minimize_cb.setChecked(
            self._working_config.get("general", {}).get("minimize_to_tray", True)
        )
        self._debounce_spin = QSpinBox()
        self._debounce_spin.setRange(100, 5000)
        self._debounce_spin.setSuffix(" ms")
        self._debounce_spin.setValue(
            self._working_config.get("general", {}).get("debounce_ms", 500)
        )

        layout.addRow("Autostart z Windows:", self._autostart_cb)
        layout.addRow("Minimalizuj do tray przy zamknięciu:", self._minimize_cb)
        layout.addRow("Debounce (opóźnienie przełączenia):", self._debounce_spin)

        return widget

    # ── Save ──────────────────────────────────────────────────────────────────

    def _save_and_close(self) -> None:
        general = self._working_config.setdefault("general", {})
        general["autostart"] = self._autostart_cb.isChecked()
        general["minimize_to_tray"] = self._minimize_cb.isChecked()
        general["debounce_ms"] = self._debounce_spin.value()

        self._config_manager.update(self._working_config)
        # Re-register so hotkey edits take effect immediately
        self._hotkey_manager.unregister_all()
        self._hotkey_manager.register_all()

        _set_autostart(bool(general.get("autostart")))

        self.close()

    def closeEvent(self, event):
        # The close event is accepted for both values of "minimize_to_tray"
        event.accept()


# ── Autostart helper ──────────────────────────────────────────────────────────

def _set_autostart(enable: bool) -> None:
    import os
    import sys
    import winreg
    key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
    app_name = "MonitorSwitcher"
    if getattr(sys, "frozen", False):
        # PyInstaller build: sys.executable is the packaged exe itself
        command = f'"{sys.executable}"'
    else:
        # Running from source: sys.executable is the Python interpreter,
        # so the entry script has to be passed as an argument
        command = f'"{sys.executable}" "{os.path.abspath(sys.argv[0])}"'
    try:
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE) as key:
            if enable:
                winreg.SetValueEx(key, app_name, 0, winreg.REG_SZ, command)
            else:
                try:
                    winreg.DeleteValue(key, app_name)
                except FileNotFoundError:
                    pass  # value was never set; nothing to remove
    except Exception as exc:
        logger.warning("Autostart registry error: %s", exc)
```

- [ ] **Step 2: Smoke test — open settings window**

```bash
python main.py
```
Right-click tray icon → "Ustawienia". Expected: window opens with 4 tabs, Profile tab shows 2 profiles (PC1 — DP, PC2 — HDMI) with monitor dropdowns.
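
The smoke test above is manual. The update-or-append logic behind the monitor dropdowns can also be checked without a display by expressing it as a plain function. The sketch below is an illustration under that assumption: `set_monitor_input` is a hypothetical helper name, not something the plan defines, and it operates on the same profile dict shape used in `config.json`.

```python
def set_monitor_input(profile: dict, monitor_index: int, vcp_value: int) -> None:
    """Update the VCP value for one monitor, or append an entry if none exists."""
    # setdefault attaches the list to the profile when the key is missing
    inputs = profile.setdefault("monitor_inputs", [])
    for inp in inputs:
        if inp["monitor_index"] == monitor_index:
            inp["vcp_value"] = vcp_value
            return
    inputs.append({"monitor_index": monitor_index, "vcp_value": vcp_value})
```

A test could build a profile dict, call `set_monitor_input(profile, 0, 17)`, and assert that the entry for monitor 0 now holds 17 while other entries are unchanged.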

- [ ] **Step 3: Commit**

```bash
git add ui/settings_window.py
git commit -m "feat: settings window with Profiles/Devices/Hotkeys/General tabs"
```

---

## Task 8: Run all tests + fix VCP value detection

**Files:**
- Modify: `core/switcher.py` — verify `list_monitors()` works live
- Create: `detect_vcp.py`

- [ ] **Step 1: Run full test suite**

```bash
pytest tests/ -v
```
Expected: all tests PASS (20 total across 4 test files)

- [ ] **Step 2: Detect actual VCP input values from monitors**

Run this one-off script to confirm which VCP values the L27qe monitors use for DP and HDMI:

```python
# run_once: detect_vcp.py
from monitorcontrol import get_monitors

for i, handle in enumerate(get_monitors()):
    with handle as monitor:
        try:
            # The raw DDC/CI read lives on monitor.vcp and returns (current, maximum)
            current, _maximum = monitor.vcp.get_vcp_feature(0x60)
            print(f"Monitor {i}: current input VCP = {current} (0x{current:02X})")
        except Exception as e:
            print(f"Monitor {i}: error — {e}")
```

Run:
```bash
python detect_vcp.py
```
Expected output (typical for L27qe):
```
Monitor 0: current input VCP = 15 (0x0F)  ← DP active
Monitor 1: current input VCP = 15 (0x0F)  ← DP active
```
The script reports only the input that is currently active, so switch a monitor to HDMI and run it again to read the HDMI value. If the values differ from 15 (DisplayPort-1) and 17 (HDMI-1), update the `config/config.json` defaults accordingly.
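
To make the detected numbers easier to compare against the labels used in the settings window, a small lookup can translate a raw reading into a name. This is a sketch only: `describe_input` is a hypothetical helper, the table is copied from `VCP_INPUT_LABELS` in `ui/settings_window.py`, and masking to the low byte is an assumption for monitors that report extra data in the high byte.

```python
# Same mapping as VCP_INPUT_LABELS in ui/settings_window.py (names only)
VCP_INPUT_NAMES = {
    15: "DisplayPort-1",
    16: "DisplayPort-2",
    17: "HDMI-1",
    18: "HDMI-2",
    1: "VGA-1",
}


def describe_input(raw_value: int) -> str:
    """Return a readable description of a VCP 0x60 reading."""
    # Assumption: only the low byte carries the input code
    code = raw_value & 0xFF
    name = VCP_INPUT_NAMES.get(code, "unknown input")
    return f"{code} (0x{code:02X}): {name}"
```

If a monitor prints a value that maps to "unknown input", that number is still the one to put into `vcp_value`; the label table is only a convenience.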

- [ ] **Step 3: Create `detect_vcp.py` as a helper tool**

```python
# detect_vcp.py — helper to find VCP input values on this machine
from monitorcontrol import get_monitors

print("Scanning monitors for current input VCP value (0x60)...")
for i, handle in enumerate(get_monitors()):
    with handle as monitor:
        try:
            # monitor.vcp.get_vcp_feature returns (current, maximum)
            current, _maximum = monitor.vcp.get_vcp_feature(0x60)
            print(f"  Monitor {i}: VCP input = {current} (0x{current:02X})")
        except Exception as e:
            print(f"  Monitor {i}: could not read — {e}")
print("Done. Use these values in config.json → profiles → monitor_inputs → vcp_value")
```

- [ ] **Step 4: Commit**

```bash
git add detect_vcp.py
git commit -m "chore: add detect_vcp.py helper, all tests passing"
```

---

## Task 9: PyInstaller packaging

**Files:**
- Create: `MonitorSwitcher.spec`

- [ ] **Step 1: Install PyInstaller**

```bash
pip install pyinstaller
```

- [ ] **Step 2: Create `MonitorSwitcher.spec`**

```python
# MonitorSwitcher.spec
block_cipher = None

a = Analysis(
    ['main.py'],
    pathex=['.'],
    binaries=[],
    datas=[('config/config.json', 'config')],
    hiddenimports=['wmi', 'pythoncom', 'pywintypes', 'win32api', 'win32con', 'winreg'],
    hookspath=[],
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
)

pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)

exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.zipfiles,
    a.datas,
    [],
    name='MonitorSwitcher',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=False,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=False,  # no console window
    icon=None,      # add .ico path here if desired
)
```
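
The spec bundles `config/config.json` through `datas`, and a one-file PyInstaller build unpacks such files into a temporary directory exposed as `sys._MEIPASS`. If the config loader resolves paths relative to the working directory, the bundled default may not be found when running the exe. The helper below is a sketch of the usual lookup; `resource_path` is a hypothetical name, and whether `ConfigManager` needs it depends on how it locates its file.

```python
import os
import sys


def resource_path(relative: str) -> str:
    """Resolve a bundled data file both from source and from a PyInstaller build."""
    # In a frozen one-file build, bundled data lives under sys._MEIPASS;
    # otherwise fall back to the directory of the entry script.
    fallback = os.path.abspath(os.path.dirname(sys.argv[0]))
    base = getattr(sys, "_MEIPASS", fallback)
    return os.path.join(base, relative)
```

Note that the extraction directory is temporary, so it suits read-only defaults. User changes would need to be saved somewhere persistent, for example a folder under `%APPDATA%`.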

- [ ] **Step 3: Build**

```bash
pyinstaller MonitorSwitcher.spec
```
Expected: `dist/MonitorSwitcher.exe` created

- [ ] **Step 4: Test the exe**

```bash
dist/MonitorSwitcher.exe
```
Expected: tray icon appears, settings window opens, no console window, no errors.

- [ ] **Step 5: Commit**

```bash
git add MonitorSwitcher.spec detect_vcp.py
git add -f dist/MonitorSwitcher.exe  # optional — if distributing exe via git
git commit -m "build: PyInstaller spec, produces dist/MonitorSwitcher.exe"
```

---

## Self-Review

### Spec coverage check

| Spec requirement | Task |
|---|---|
| Switch both L27qe monitors via DDC/CI VCP 0x60 | Task 3 |
| Profile-based config (DP/HDMI inputs per monitor) | Task 2, 3 |
| WMI BT device connect/disconnect detection | Task 4 |
| Debounce rapid events | Task 4 |
| WMI watcher retry on crash (max 3, 5s backoff) | Task 4 (`_watch_loop`) |
| HP 975 keyboard trigger | Task 2 (config defaults) |
| MX Anywhere 2S trigger | Task 2 (config defaults) |
| Global hotkeys per profile | Task 5 |
| System tray icon (changes color/label per profile) | Task 6 |
| Tray menu: profiles + Settings + Exit | Task 6 |
| PyQt6 settings window with 4 tabs | Task 7 |
| Profiles tab: create/edit/delete, per-monitor input | Task 7 |
| Devices tab: BT triggers, connect/disconnect → profile | Task 7 |
| Hotkeys tab: per-profile hotkey assignment | Task 7 |
| General tab: autostart, minimize to tray, debounce | Task 7 |
| Autostart via Windows registry | Task 7 (`_set_autostart`) |
| Config falls back to defaults on corrupt file | Task 2 |
| Packaging to single .exe via PyInstaller | Task 9 |

All requirements covered. ✅

### Type consistency check

- `ConfigManager.get_profile(id)` returns `dict | None` — used in `MonitorSwitcher.apply_profile()` ✅
- `MonitorSwitcher.apply_profile(profile_id: str)` — called consistently in DeviceWatcher, HotkeyManager, TrayApp ✅
- `TrayApp` receives `queue.Queue` — `main.py` creates and passes it ✅
- `SettingsWindow` receives `config_manager, switcher, hotkey_manager` — `main.py` passes all three ✅