423 lines
14 KiB
Python
423 lines
14 KiB
Python
"""Chobit system tray — settings UI for the desktop companion.

All persistent state lives in Godot's AppState (user://app_state.json).
The tray reads/writes its section via UDP commands to Godot — no local
config files, no separate persistence layer.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import atexit
|
|
import json
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from lilith_tray.base import TrayApp
|
|
from lilith_tray.types import TrayConfig, TrayIcon, TrayMenuItem
|
|
|
|
import flight_recorder as fr
|
|
|
|
|
|
# UDP ports on 127.0.0.1: Godot receives commands on GODOT_PORT, the tray
# listens for commands coming back on TRAY_PORT.
GODOT_PORT = 19700
TRAY_PORT = 19701

# Three directory levels above this file.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
VISION_SCRIPT = PROJECT_ROOT.joinpath("services", "vision", "chobit_vision.py")

# Output of the vision sidecar is appended to VISION_LOG.
LOG_DIR = Path.home().joinpath(".local", "share", "chobit")
VISION_LOG = LOG_DIR.joinpath("vision.log")

# Values used for any tray setting that is missing from the saved state.
TRAY_DEFAULTS: dict = {
    "active_camera": 0,
    "camera_capture_enabled": False,
    "camera_rect": None,
    "behavior_settings": {
        "focus_steal_cooldown": 15.0,
        "gaze_duration_s": 3.0,
        "gaze_margin": 50,
    },
}

# Directory holding the green/yellow/red status icons.
TRAY_ICONS_DIR = Path.home().joinpath(
    "Code",
    "@packages",
    "@tray",
    "tray-resources",
    "generated",
    "circle-dot",
)
|
|
|
|
|
|
def _send_command(cmd: str, **kwargs: object) -> dict | None:
    """Send a JSON command to Godot via UDP and wait for the response.

    The datagram is ``{"cmd": cmd, **kwargs}`` sent to ``127.0.0.1`` on
    ``GODOT_PORT``; the call blocks for at most one second waiting for a
    reply.

    Args:
        cmd: Command name placed under the ``"cmd"`` key.
        **kwargs: Extra JSON-serialisable fields merged into the message.

    Returns:
        The decoded reply when it is a JSON object, otherwise ``None``
        (send/receive failure, timeout, undecodable or non-object reply).
    """
    msg = {"cmd": cmd, **kwargs}
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(1.0)
        try:
            sock.sendto(json.dumps(msg).encode(), ("127.0.0.1", GODOT_PORT))
            data, _ = sock.recvfrom(4096)
            reply = json.loads(data.decode())
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised by a non-UTF-8 datagram.
            return None
    # Callers use ``reply.get(...)``; never hand them a list/str/number.
    return reply if isinstance(reply, dict) else None
|
|
|
|
|
|
def _load_settings() -> dict:
    """Fetch the ``tray`` section of AppState from Godot over UDP.

    Returns:
        The reply dict, or an empty dict when there is no reply, the reply
        is empty, or it carries a truthy ``"error"`` field.
    """
    reply = _send_command("state_get", section="tray")
    if not reply or reply.get("error"):
        return {}
    return reply
|
|
|
|
|
|
def _save_settings(settings: dict) -> None:
    """Write *settings* to the ``tray`` section of AppState via Godot UDP.

    The reply from Godot, if any, is ignored.
    """
    _send_command(
        "state_set",
        section="tray",
        data=settings,
    )
|
|
|
|
|
|
def _publish_redis(event_type: str, payload: dict) -> None:
|
|
"""Publish an eventbus envelope to Redis synchronously."""
|
|
try:
|
|
import redis
|
|
|
|
r = redis.Redis()
|
|
envelope = {
|
|
"type": event_type,
|
|
"payload": payload,
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"correlationId": f"{int(datetime.now(timezone.utc).timestamp() * 1000):x}",
|
|
"source": "chobit-tray",
|
|
}
|
|
r.publish(f"eventbus:{event_type}", json.dumps(envelope))
|
|
r.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _enumerate_cameras() -> list[dict]:
|
|
"""Enumerate available cameras using imajin-face-tracker."""
|
|
try:
|
|
from imajin_face_tracker import enumerate_cameras
|
|
|
|
return [
|
|
{"index": cam.index, "name": cam.name, "path": cam.path}
|
|
for cam in enumerate_cameras()
|
|
]
|
|
except ImportError:
|
|
return []
|
|
|
|
|
|
class ChobitTray(TrayApp):
    """System tray for the Chobit desktop companion.

    Builds the tray menu, polls Godot for status over UDP, listens on
    ``TRAY_PORT`` for commands coming back from Godot, and owns the vision
    sidecar subprocess that runs while camera capture is enabled.
    """

    # Class-level defaults; instances rebind these as state changes.
    _vision_process: subprocess.Popen | None = None
    _camera_enabled: bool = False
    _active_camera: int = 0
    _cameras: list[dict] = []  # always rebound per instance in __init__
    _last_godot_status: dict | None = None
    _settings_loaded: bool = False
    _vision_restart_cooldown: float = 0.0  # time.monotonic() deadline
    _cmd_sock: socket.socket | None = None  # set once the listener binds

    def __init__(self, dev_mode: bool = False) -> None:
        """Build the menu and tray config, then start the command listener.

        Args:
            dev_mode: When true, the "Toggle Camera" menu entry is omitted.
        """
        icon_green = TRAY_ICONS_DIR / "green-24.png"
        icon_yellow = TRAY_ICONS_DIR / "yellow-24.png"
        icon_red = TRAY_ICONS_DIR / "red-24.png"

        # Cameras must be known before defaults are applied, because
        # _apply_settings validates the camera index against this list.
        self._cameras = _enumerate_cameras()
        self._apply_settings(TRAY_DEFAULTS.copy())

        menu: list[TrayMenuItem] = [
            TrayMenuItem.action("Settings", self._open_board),
            TrayMenuItem.action("Camera Settings", self._open_camera_settings),
            TrayMenuItem.separator(),
            TrayMenuItem.action("Toggle Snap", self._toggle_snap),
        ]
        if not dev_mode:
            menu.append(TrayMenuItem.action("Toggle Camera", self.toggle_camera))
        menu += [
            TrayMenuItem.separator(),
            TrayMenuItem.action("Reset Position", self._reset_position),
            TrayMenuItem.action("Restart", self._restart),
            TrayMenuItem.quit("Quit Chobit"),
        ]

        config = TrayConfig(
            name="Chobit",
            icons={
                "running": TrayIcon.from_file(icon_green),
                "warning": TrayIcon.from_file(icon_yellow),
                "stopped": TrayIcon.from_file(icon_red),
            },
            initial_icon="running",
            menu=menu,
            poll_interval=5,
            label="Main",
        )
        super().__init__(config)
        atexit.register(self._cleanup)
        # Turn SIGTERM into a normal interpreter exit so atexit hooks run.
        signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
        self._start_command_listener()

    def _start_command_listener(self) -> None:
        """Listen on TRAY_PORT for commands from Godot.

        Binds a UDP socket on 127.0.0.1 and services it from a daemon
        thread. If the port cannot be bound the listener is skipped.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind(("127.0.0.1", TRAY_PORT))
        except OSError:
            # Do not leak the descriptor when the port is unavailable.
            sock.close()
            return
        self._cmd_sock = sock

        def listener() -> None:
            while True:
                try:
                    data, addr = sock.recvfrom(4096)
                except OSError:
                    # Socket closed or broken: nothing left to serve.
                    break
                try:
                    msg = json.loads(data.decode())
                except ValueError:
                    # Malformed JSON or non-UTF-8 bytes: drop this datagram
                    # but keep listening for the next one.
                    continue
                if not isinstance(msg, dict):
                    continue
                try:
                    self._dispatch_command(msg, addr)
                except Exception as exc:
                    # A failing handler must not take the listener down.
                    fr.record(
                        "tray.command.error",
                        "Command handler failed",
                        {"cmd": str(msg.get("cmd", "")), "error": repr(exc)},
                    )

        t = threading.Thread(target=listener, daemon=True)
        t.start()

    def _dispatch_command(self, msg: dict, addr: tuple) -> None:
        """Handle a command received from Godot via UDP.

        Recognised ``cmd`` values are ``select_camera`` (integer ``index``),
        ``list_cameras`` and ``set_camera_enabled`` (``enabled`` flag);
        anything else is ignored.

        Args:
            msg: Decoded JSON object of the datagram.
            addr: Sender address (currently unused).
        """
        cmd = msg.get("cmd", "")
        if cmd == "select_camera":
            index = msg.get("index")
            # bool is a subclass of int; JSON true/false is not an index.
            if isinstance(index, int) and not isinstance(index, bool):
                self._select_camera(index)
        elif cmd == "list_cameras":
            self._send_camera_list()
        elif cmd == "set_camera_enabled":
            enabled = bool(msg.get("enabled", False))
            if enabled != self._camera_enabled:
                self.toggle_camera()

    def _send_camera_list(self) -> None:
        """Send the enumerated camera list to Godot via UDP (no reply awaited)."""
        cameras = [
            {"index": c["index"], "name": c["name"], "path": c.get("path", "")}
            for c in self._cameras
        ]
        fr.record("tray.camera_list", "Camera list sent to Godot", {"count": len(cameras)})
        msg = {"cmd": "camera_list", "cameras": cameras}
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.sendto(json.dumps(msg).encode(), ("127.0.0.1", GODOT_PORT))
        except OSError:
            pass
        finally:
            sock.close()

    def _apply_settings(self, settings: dict) -> None:
        """Apply a settings dict to instance state.

        Missing keys fall back to ``TRAY_DEFAULTS``. A camera index that is
        not among the enumerated cameras is replaced by the lowest available
        index (when any camera was enumerated at all).
        """
        d = TRAY_DEFAULTS
        try:
            saved_index = int(settings.get("active_camera", d["active_camera"]))
        except (TypeError, ValueError):
            # Corrupt persisted value (None, text, ...): use the default.
            saved_index = d["active_camera"]
        available_indices = {cam["index"] for cam in self._cameras}
        if available_indices and saved_index not in available_indices:
            self._active_camera = min(available_indices)
        else:
            self._active_camera = saved_index
        self._camera_enabled = bool(
            settings.get("camera_capture_enabled", d["camera_capture_enabled"])
        )
        self._camera_rect = settings.get("camera_rect", d["camera_rect"])
        behavior = settings.get("behavior_settings")
        if not isinstance(behavior, dict):
            behavior = d["behavior_settings"]
        # Always copy so instance state never aliases TRAY_DEFAULTS (which
        # the shallow TRAY_DEFAULTS.copy() in __init__ would otherwise share).
        self._behavior_settings = dict(behavior)

    def poll_status(self) -> str:
        """Query Godot's status and run periodic housekeeping.

        On the first successful reply the persisted tray settings are loaded
        and applied. While the camera is enabled, a missing or exited vision
        sidecar is restarted, at most once every 30 seconds.

        Returns:
            ``"running"`` when Godot replies with a truthy ``running`` field,
            otherwise ``"stopped"``.
        """
        result = _send_command("status")
        if result and result.get("running"):
            self._last_godot_status = result
            # Load settings from AppState on first successful connection
            if not self._settings_loaded:
                self._settings_loaded = True
                saved = _load_settings()
                if saved:
                    self._apply_settings(saved)
                # Sync Godot gaze mode to match restored camera state
                if self._camera_enabled:
                    _send_command("set_gaze_mode", mode="face_to_face")
            # Health check: restart vision if it died unexpectedly (30s cooldown)
            if self._camera_enabled:
                if self._vision_process is None or self._vision_process.poll() is not None:
                    now = time.monotonic()
                    if now >= self._vision_restart_cooldown:
                        self._vision_restart_cooldown = now + 30.0
                        self._start_vision()
            return "running"
        self._last_godot_status = None
        return "stopped"

    def get_status_labels(self) -> dict[str, str]:
        """Build the "Snap" / "Camera" / "Face" status labels.

        Sends a fresh ``status`` command to Godot; when there is no reply the
        labels report a disconnected state.
        """
        result = _send_command("status")
        if not result:
            return {
                "Snap": "?",
                "Camera": "Disconnected",
                "Face": "—",
            }

        snap = "ON" if result.get("snap_enabled") else "OFF"
        # "On" means enabled AND the sidecar process is actually alive.
        camera_on = self._camera_enabled and (
            self._vision_process is not None and self._vision_process.poll() is None
        )

        if camera_on:
            cam_name = next(
                (
                    cam["name"]
                    for cam in self._cameras
                    if cam["index"] == self._active_camera
                ),
                "Unknown",
            )
            camera_str = f"ON — {cam_name}"
            face_str = "Yes" if result.get("face_detected") else "No"
        else:
            camera_str = (
                f"OFF ({len(self._cameras)} available)"
                if self._cameras
                else "OFF"
            )
            face_str = "—"

        return {
            "Snap": snap,
            "Camera": camera_str,
            "Face": face_str,
        }

    def _settings_dict(self) -> dict:
        """Return the current tray settings in their persisted shape."""
        return {
            "active_camera": self._active_camera,
            "camera_capture_enabled": self._camera_enabled,
            "camera_rect": self._camera_rect,
            "behavior_settings": self._behavior_settings,
        }

    def _persist(self) -> None:
        """Save current tray settings to AppState."""
        _save_settings(self._settings_dict())

    def _toggle_snap(self) -> None:
        """Ask Godot to toggle edge snap, then refresh the status labels."""
        fr.record("tray.toggle_snap", "Edge snap toggled")
        _send_command("toggle_snap")
        self.set_status_labels(self.get_status_labels())

    def toggle_camera(self) -> None:
        """Flip camera capture on/off.

        Starts or stops the vision sidecar, switches Godot's gaze mode to
        match, persists the new state and refreshes the status labels.
        """
        self._camera_enabled = not self._camera_enabled
        fr.record(
            "tray.toggle_camera",
            "Camera toggled " + ("ON" if self._camera_enabled else "OFF"),
            {"camera": self._active_camera},
        )
        if self._camera_enabled:
            self._start_vision()
        else:
            self._stop_vision()
        _send_command("set_gaze_mode", mode="face_to_face" if self._camera_enabled else "desktop")
        self._persist()
        self.set_status_labels(self.get_status_labels())

    def _start_vision(self) -> None:
        """Launch the vision sidecar subprocess.

        Does nothing when a sidecar is already running or the vision script
        is missing. The sidecar's stdout/stderr are appended to VISION_LOG.
        Failure to create the log or spawn the process is recorded and
        otherwise ignored, so the periodic health check can retry later.
        """
        if (
            self._vision_process is not None
            and self._vision_process.poll() is None
        ):
            return
        if not VISION_SCRIPT.exists():
            fr.record("tray.vision.error", "Vision script not found", {"path": str(VISION_SCRIPT)})
            return
        try:
            LOG_DIR.mkdir(parents=True, exist_ok=True)
            # The child gets its own duplicate of the descriptor, so the
            # parent's handle is closed as soon as Popen returns instead of
            # leaking one open file per (re)start.
            with VISION_LOG.open("a") as log_file:
                self._vision_process = subprocess.Popen(
                    [
                        sys.executable,
                        str(VISION_SCRIPT),
                        "--camera",
                        str(self._active_camera),
                        "--preview-port",
                        "19703",
                    ],
                    stdout=log_file,
                    stderr=log_file,
                )
        except OSError as exc:
            fr.record("tray.vision.error", "Vision sidecar failed to start", {"error": str(exc)})
            return
        fr.record("tray.vision.start", "Vision sidecar started", {"camera": self._active_camera})

    def _stop_vision(self) -> None:
        """Stop the vision sidecar subprocess."""
        if self._vision_process is not None:
            fr.record("tray.vision.stop", "Vision sidecar stopped", {"camera": self._active_camera})
            self._terminate(self._vision_process)
            self._vision_process = None

    def _cleanup(self) -> None:
        """Exit hook: tell Godot to quit and stop the vision sidecar."""
        fr.record("tray.quit", "Tray exiting — stopping Godot and vision")
        _send_command("quit")
        self._stop_vision()

    @staticmethod
    def _terminate(proc: subprocess.Popen) -> None:
        """Gracefully terminate a subprocess.

        Sends SIGTERM and waits up to three seconds; if the process is still
        alive it is killed and then reaped.
        """
        if proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.wait()

    def _reset_position(self) -> None:
        """Ask Godot to reset the window position."""
        fr.record("tray.reset_position", "Window position reset")
        _send_command("reset_position")

    def _restart(self) -> None:
        """Spawn ``<PROJECT_ROOT>/run restart`` without waiting for it."""
        fr.record("tray.restart", "Chobit restart triggered")
        run_script = PROJECT_ROOT / "run"
        subprocess.Popen(
            [str(run_script), "restart"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _select_camera(self, index: int) -> None:
        """Switch the active camera to *index*.

        Persists the choice, publishes ``chobit.camera.select`` on Redis and,
        if the vision sidecar was running, restarts it on the new camera.
        """
        fr.record("tray.select_camera", f"Camera switched to {index}", {"index": index})
        was_running = (
            self._vision_process is not None
            and self._vision_process.poll() is None
        )
        self._active_camera = index
        self._persist()
        _publish_redis("chobit.camera.select", {"index": index})
        if was_running:
            self._stop_vision()
            self._start_vision()

    def _open_board(self) -> None:
        """Ask Godot to open the settings panel."""
        fr.record("tray.open_settings", "Settings panel opened")
        _send_command("open_settings")

    def _open_camera_settings(self) -> None:
        """Ask Godot to open the camera page of the settings panel."""
        fr.record("tray.open_camera_settings", "Camera settings panel opened")
        _send_command("open_settings_page", page="camera")
|
|
|
|
|
|
def create_app(dev_mode: bool = False) -> ChobitTray:
    """Construct a ChobitTray without entering its main loop.

    Args:
        dev_mode: Forwarded unchanged to ``ChobitTray``.
    """
    tray = ChobitTray(dev_mode=dev_mode)
    return tray
|
|
|
|
|
|
def main() -> None:
    """Entry point: create the tray app with defaults and run it."""
    create_app().run()


if __name__ == "__main__":
    main()
|