chobit/services/tray/chobit_tray.py
2026-03-30 10:33:02 -07:00

423 lines
14 KiB
Python

"""Chobit system tray — settings UI for the desktop companion.
All persistent state lives in Godot's AppState (user://app_state.json).
The tray reads/writes its section via UDP commands to Godot — no local
config files, no separate persistence layer.
"""
from __future__ import annotations
import atexit
import json
import signal
import socket
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from lilith_tray.base import TrayApp
from lilith_tray.types import TrayConfig, TrayIcon, TrayMenuItem
import flight_recorder as fr
GODOT_PORT = 19700
TRAY_PORT = 19701
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
VISION_SCRIPT = PROJECT_ROOT / "services" / "vision" / "chobit_vision.py"
LOG_DIR = Path.home() / ".local" / "share" / "chobit"
VISION_LOG = LOG_DIR / "vision.log"
TRAY_DEFAULTS: dict = {
"active_camera": 0,
"camera_capture_enabled": False,
"camera_rect": None,
"behavior_settings": {
"focus_steal_cooldown": 15.0,
"gaze_duration_s": 3.0,
"gaze_margin": 50,
},
}
TRAY_ICONS_DIR = (
Path.home()
/ "Code"
/ "@packages"
/ "@tray"
/ "tray-resources"
/ "generated"
/ "circle-dot"
)
def _send_command(cmd: str, **kwargs: object) -> dict | None:
"""Send a JSON command to Godot via UDP and wait for response."""
msg = {"cmd": cmd, **kwargs}
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1.0)
try:
sock.sendto(json.dumps(msg).encode(), ("127.0.0.1", GODOT_PORT))
data, _ = sock.recvfrom(4096)
return json.loads(data.decode())
except (OSError, json.JSONDecodeError):
return None
finally:
sock.close()
def _load_settings() -> dict:
"""Load tray settings from AppState via Godot UDP."""
result = _send_command("state_get", section="tray")
if result and not result.get("error"):
return result
return {}
def _save_settings(settings: dict) -> None:
"""Save tray settings to AppState via Godot UDP."""
_send_command("state_set", section="tray", data=settings)
def _publish_redis(event_type: str, payload: dict) -> None:
"""Publish an eventbus envelope to Redis synchronously."""
try:
import redis
r = redis.Redis()
envelope = {
"type": event_type,
"payload": payload,
"timestamp": datetime.now(timezone.utc).isoformat(),
"correlationId": f"{int(datetime.now(timezone.utc).timestamp() * 1000):x}",
"source": "chobit-tray",
}
r.publish(f"eventbus:{event_type}", json.dumps(envelope))
r.close()
except Exception:
pass
def _enumerate_cameras() -> list[dict]:
"""Enumerate available cameras using imajin-face-tracker."""
try:
from imajin_face_tracker import enumerate_cameras
return [
{"index": cam.index, "name": cam.name, "path": cam.path}
for cam in enumerate_cameras()
]
except ImportError:
return []
class ChobitTray(TrayApp):
"""System tray for the Chobit desktop companion."""
_vision_process: subprocess.Popen | None = None
_camera_enabled: bool = False
_active_camera: int = 0
_cameras: list[dict] = []
_last_godot_status: dict | None = None
_settings_loaded: bool = False
_vision_restart_cooldown: float = 0.0
def __init__(self, dev_mode: bool = False) -> None:
icon_green = TRAY_ICONS_DIR / "green-24.png"
icon_yellow = TRAY_ICONS_DIR / "yellow-24.png"
icon_red = TRAY_ICONS_DIR / "red-24.png"
self._cameras = _enumerate_cameras()
self._apply_settings(TRAY_DEFAULTS.copy())
menu: list[TrayMenuItem] = [
TrayMenuItem.action("Settings", self._open_board),
TrayMenuItem.action("Camera Settings", self._open_camera_settings),
TrayMenuItem.separator(),
TrayMenuItem.action("Toggle Snap", self._toggle_snap),
]
if not dev_mode:
menu.append(TrayMenuItem.action("Toggle Camera", self.toggle_camera))
menu += [
TrayMenuItem.separator(),
TrayMenuItem.action("Reset Position", self._reset_position),
TrayMenuItem.action("Restart", self._restart),
TrayMenuItem.quit("Quit Chobit"),
]
config = TrayConfig(
name="Chobit",
icons={
"running": TrayIcon.from_file(icon_green),
"warning": TrayIcon.from_file(icon_yellow),
"stopped": TrayIcon.from_file(icon_red),
},
initial_icon="running",
menu=menu,
poll_interval=5,
label="Main",
)
super().__init__(config)
atexit.register(self._cleanup)
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
self._start_command_listener()
def _start_command_listener(self) -> None:
"""Listen on TRAY_PORT for commands from Godot."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind(("127.0.0.1", TRAY_PORT))
except OSError:
return
self._cmd_sock = sock
def listener() -> None:
while True:
try:
data, addr = sock.recvfrom(4096)
msg = json.loads(data.decode())
self._dispatch_command(msg, addr)
except (OSError, json.JSONDecodeError):
break
t = threading.Thread(target=listener, daemon=True)
t.start()
def _dispatch_command(self, msg: dict, addr: tuple) -> None:
"""Handle a command received from Godot via UDP."""
cmd = msg.get("cmd", "")
if cmd == "select_camera":
index = msg.get("index")
if isinstance(index, int):
self._select_camera(index)
elif cmd == "list_cameras":
self._send_camera_list()
elif cmd == "set_camera_enabled":
enabled = bool(msg.get("enabled", False))
if enabled != self._camera_enabled:
self.toggle_camera()
def _send_camera_list(self) -> None:
"""Send enumerated camera list to Godot via UDP."""
cameras = [
{"index": c["index"], "name": c["name"], "path": c.get("path", "")}
for c in self._cameras
]
fr.record("tray.camera_list", "Camera list sent to Godot", {"count": len(cameras)})
msg = {"cmd": "camera_list", "cameras": cameras}
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.sendto(json.dumps(msg).encode(), ("127.0.0.1", GODOT_PORT))
except OSError:
pass
finally:
sock.close()
def _apply_settings(self, settings: dict) -> None:
"""Apply a settings dict to instance state."""
d = TRAY_DEFAULTS
saved_index = int(settings.get("active_camera", d["active_camera"]))
available_indices = {cam["index"] for cam in self._cameras}
if available_indices and saved_index not in available_indices:
self._active_camera = min(available_indices)
else:
self._active_camera = saved_index
self._camera_enabled = bool(
settings.get("camera_capture_enabled", d["camera_capture_enabled"])
)
self._camera_rect = settings.get("camera_rect", d["camera_rect"])
self._behavior_settings = settings.get(
"behavior_settings", d["behavior_settings"].copy()
)
def poll_status(self) -> str:
result = _send_command("status")
if result and result.get("running"):
self._last_godot_status = result
# Load settings from AppState on first successful connection
if not self._settings_loaded:
self._settings_loaded = True
saved = _load_settings()
if saved:
self._apply_settings(saved)
# Sync Godot gaze mode to match restored camera state
if self._camera_enabled:
_send_command("set_gaze_mode", mode="face_to_face")
# Health check: restart vision if it died unexpectedly (30s cooldown)
if self._camera_enabled:
if self._vision_process is None or self._vision_process.poll() is not None:
now = time.monotonic()
if now >= self._vision_restart_cooldown:
self._vision_restart_cooldown = now + 30.0
self._start_vision()
return "running"
self._last_godot_status = None
return "stopped"
def get_status_labels(self) -> dict[str, str]:
result = _send_command("status")
if not result:
return {
"Snap": "?",
"Camera": "Disconnected",
"Face": "",
}
snap = "ON" if result.get("snap_enabled") else "OFF"
camera_on = self._camera_enabled and (
self._vision_process is not None and self._vision_process.poll() is None
)
if camera_on:
cam_name = "Unknown"
for cam in self._cameras:
if cam["index"] == self._active_camera:
cam_name = cam["name"]
break
camera_str = f"ON — {cam_name}"
face_str = "Yes" if result.get("face_detected") else "No"
else:
camera_str = (
f"OFF ({len(self._cameras)} available)"
if self._cameras
else "OFF"
)
face_str = ""
return {
"Snap": snap,
"Camera": camera_str,
"Face": face_str,
}
def _settings_dict(self) -> dict:
return {
"active_camera": self._active_camera,
"camera_capture_enabled": self._camera_enabled,
"camera_rect": self._camera_rect,
"behavior_settings": self._behavior_settings,
}
def _persist(self) -> None:
"""Save current tray settings to AppState."""
_save_settings(self._settings_dict())
def _toggle_snap(self) -> None:
fr.record("tray.toggle_snap", "Edge snap toggled")
_send_command("toggle_snap")
self.set_status_labels(self.get_status_labels())
def toggle_camera(self) -> None:
self._camera_enabled = not self._camera_enabled
fr.record(
"tray.toggle_camera",
"Camera toggled " + ("ON" if self._camera_enabled else "OFF"),
{"camera": self._active_camera},
)
if self._camera_enabled:
self._start_vision()
else:
self._stop_vision()
_send_command("set_gaze_mode", mode="face_to_face" if self._camera_enabled else "desktop")
self._persist()
self.set_status_labels(self.get_status_labels())
def _start_vision(self) -> None:
"""Launch the vision sidecar subprocess."""
if (
self._vision_process is not None
and self._vision_process.poll() is None
):
return
if not VISION_SCRIPT.exists():
fr.record("tray.vision.error", "Vision script not found", {"path": str(VISION_SCRIPT)})
return
LOG_DIR.mkdir(parents=True, exist_ok=True)
log_file = VISION_LOG.open("a")
self._vision_process = subprocess.Popen(
[
sys.executable,
str(VISION_SCRIPT),
"--camera",
str(self._active_camera),
"--preview-port",
"19703",
],
stdout=log_file,
stderr=log_file,
)
fr.record("tray.vision.start", "Vision sidecar started", {"camera": self._active_camera})
def _stop_vision(self) -> None:
"""Stop the vision sidecar subprocess."""
if self._vision_process is not None:
fr.record("tray.vision.stop", "Vision sidecar stopped", {"camera": self._active_camera})
self._terminate(self._vision_process)
self._vision_process = None
def _cleanup(self) -> None:
fr.record("tray.quit", "Tray exiting — stopping Godot and vision")
_send_command("quit")
self._stop_vision()
@staticmethod
def _terminate(proc: subprocess.Popen) -> None:
"""Gracefully terminate a subprocess."""
if proc.poll() is not None:
return
proc.send_signal(signal.SIGTERM)
try:
proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
def _reset_position(self) -> None:
fr.record("tray.reset_position", "Window position reset")
_send_command("reset_position")
def _restart(self) -> None:
fr.record("tray.restart", "Chobit restart triggered")
run_script = PROJECT_ROOT / "run"
subprocess.Popen(
[str(run_script), "restart"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def _select_camera(self, index: int) -> None:
fr.record("tray.select_camera", f"Camera switched to {index}", {"index": index})
was_running = (
self._vision_process is not None
and self._vision_process.poll() is None
)
self._active_camera = index
self._persist()
_publish_redis("chobit.camera.select", {"index": index})
if was_running:
self._stop_vision()
self._start_vision()
def _open_board(self) -> None:
fr.record("tray.open_settings", "Settings panel opened")
_send_command("open_settings")
def _open_camera_settings(self) -> None:
fr.record("tray.open_camera_settings", "Camera settings panel opened")
_send_command("open_settings_page", page="camera")
def create_app(dev_mode: bool = False) -> ChobitTray:
"""Create and return a ChobitTray instance (without running the main loop)."""
return ChobitTray(dev_mode=dev_mode)
def main() -> None:
app = create_app()
app.run()
if __name__ == "__main__":
main()