feat(vision): Add Chobit vision model integration for enhanced image analysis

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Claude Code 2026-03-29 06:03:17 -07:00
parent 603b47e3a2
commit 2c17dbfafd

View file

@ -8,8 +8,12 @@ with Redis transport.
Supports runtime camera switching via chobit.camera.select events
and camera enumeration via chobit.camera.list_request.
Serves annotated JPEG frames over WebSocket for the Godot settings preview
panel on --preview-port (default 19703); pass --preview-port 0 to disable.
Usage:
python3 chobit_vision.py [--fps 15] [--camera 0] [--redis-url redis://localhost]
[--preview-port 19703]
"""
from __future__ import annotations
@ -18,15 +22,19 @@ import argparse
import asyncio
import ctypes
import ctypes.util
import math
import os
import signal
import sys
from contextlib import nullcontext
# Must precede cv2 / imajin_face_tracker: MediaPipe reads these at module load.
os.environ.setdefault("MEDIAPIPE_DISABLE_GPU", "1")
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "3")
import cv2
import numpy as np
import websockets
from imajin_face_tracker import (
CameraInfo,
@ -41,9 +49,30 @@ from redis.asyncio import Redis
# Command-line defaults (see the argparse setup in main()).
DEFAULT_FPS = 15
DEFAULT_CAMERA = 0
DEFAULT_REDIS_URL = "redis://localhost"
# Default for --preview-port; run() only starts the WebSocket preview server
# when the port is greater than 0.
DEFAULT_PREVIEW_PORT = 19703
# Frame size constants — presumably the requested capture resolution;
# their use is not visible in this part of the file (TODO confirm).
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
# MediaPipe FaceMesh landmark indices
# Used by _draw_preview_overlay to index rows of result.face.landmarks.
_NOSE_TIP = 4
_LEFT_IRIS_CENTER = 468
_RIGHT_IRIS_CENTER = 473
# The distance from an iris center to its "top" landmark is used as the
# radius of the iris circle drawn in the overlay.
_LEFT_IRIS_TOP = 469
_RIGHT_IRIS_TOP = 474
# Ordered landmark indices joined by line segments to outline the face.
# The list starts and ends with 10, so the outline closes on itself.
_FACE_OVAL = [
10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, 288,
397, 365, 379, 378, 400, 377, 152, 148, 176, 149, 150, 136,
172, 58, 132, 93, 234, 127, 162, 21, 54, 103, 67, 109, 10,
]
# Attention state colors in BGR
# Keyed by result.attention.state.value; the overlay falls back to the
# "absent" entry for any state not listed here.
# NOTE(review): "looking" (46, 204, 113) reads as #2ECC71 in R,G,B order,
# whereas the other entries look like B,G,R — confirm the intended order.
_ATTENTION_COLORS: dict[str, tuple[int, int, int]] = {
"looking": (46, 204, 113),
"screen": (212, 188, 0),
"away": (0, 152, 255),
"absent": (139, 125, 96),
}
def build_payload(result: TrackingResult, camera_index: int) -> dict:
"""Convert tracking result to eventbus payload."""
@ -106,6 +135,61 @@ def camera_list_payload(cameras: list[CameraInfo], active_index: int) -> dict:
}
def _draw_preview_overlay(frame: np.ndarray, result: TrackingResult) -> np.ndarray:
    """Draw face tracking overlay on frame for the Godot preview panel.

    The input frame is left untouched; all drawing happens on a copy.

    Args:
        frame: Image to annotate; only its first two dimensions (height,
            width) are read here.
        result: Tracking output for this frame. ``result.attention.state.value``
            selects the status-bar color, and ``result.face`` (may be ``None``)
            supplies landmarks, head pose and iris gaze.

    Returns:
        An annotated copy of ``frame``. When ``result.face`` is ``None`` only
        the attention bar and its label are drawn.
    """
    h, w = frame.shape[:2]
    out = frame.copy()

    # Attention color bar at top; unknown states fall back to the "absent" color.
    state = result.attention.state.value
    bar_color = _ATTENTION_COLORS.get(state, _ATTENTION_COLORS["absent"])
    cv2.rectangle(out, (0, 0), (w, 22), bar_color, -1)
    cv2.putText(
        out, state, (8, 15),
        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA,
    )

    face = result.face
    if face is None:
        return out

    # Landmarks are expected as (478, 3), normalized to 0-1.
    lms: np.ndarray = face.landmarks

    def px(idx: int) -> tuple[int, int]:
        """Convert a normalized landmark to integer pixel coordinates."""
        return (int(lms[idx, 0] * w), int(lms[idx, 1] * h))

    # Face oval: join each consecutive pair of outline points.
    oval = [px(i) for i in _FACE_OVAL]
    for start_pt, end_pt in zip(oval, oval[1:]):
        cv2.line(out, start_pt, end_pt, (80, 200, 80), 1, cv2.LINE_AA)

    # Iris circles. A landmark array without the iris points (fewer rows than
    # the highest iris index) would raise IndexError, so skip this layer then
    # instead of aborting the whole preview frame.
    iris_pairs = (
        (_LEFT_IRIS_CENTER, _LEFT_IRIS_TOP),
        (_RIGHT_IRIS_CENTER, _RIGHT_IRIS_TOP),
    )
    highest_iris_idx = max(idx for pair in iris_pairs for idx in pair)
    if len(lms) > highest_iris_idx:
        for center_idx, top_idx in iris_pairs:
            cx, cy = px(center_idx)
            tx, ty = px(top_idx)
            # Radius is the center-to-top distance, floored at 3 px so the
            # ring stays visible for small or distant faces.
            r = max(3, int(math.hypot(tx - cx, ty - cy)))
            cv2.circle(out, (cx, cy), r, (255, 255, 0), 1, cv2.LINE_AA)
            cv2.circle(out, (cx, cy), 2, (255, 255, 0), -1)

    # Nose tip + head pose arrow
    nx, ny = px(_NOSE_TIP)
    yaw = face.head_pose.yaw
    pitch = face.head_pose.pitch
    arrow_len = 35
    # yaw/pitch are passed through math.radians, i.e. treated as degrees.
    # Image y grows downward, hence the subtraction for pitch.
    ax = nx + int(math.sin(math.radians(yaw)) * arrow_len)
    ay = ny - int(math.sin(math.radians(pitch)) * arrow_len)
    cv2.circle(out, (nx, ny), 3, (255, 255, 255), -1)
    cv2.arrowedLine(
        out, (nx, ny), (ax, ay), (0, 255, 255), 2, cv2.LINE_AA, tipLength=0.35,
    )

    # Stats text along the bottom edge
    ih = face.iris_gaze.horizontal
    iv = face.iris_gaze.vertical
    cv2.putText(
        out,
        f"Y{yaw:+.0f} P{pitch:+.0f} iris H{ih:.2f} V{iv:.2f}",
        (8, h - 8),
        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (200, 200, 200), 1, cv2.LINE_AA,
    )
    return out
def _set_parent_death_signal() -> None:
"""Auto-terminate when parent process dies (Linux only)."""
try:
@ -116,8 +200,8 @@ def _set_parent_death_signal() -> None:
pass # Not Linux or prctl unavailable
async def run(fps: int, camera_index: int, redis_url: str) -> None:
"""Main capture loop with camera switching support."""
async def run(fps: int, camera_index: int, redis_url: str, preview_port: int) -> None:
"""Main capture loop with camera switching and optional WebSocket preview."""
_set_parent_death_signal()
redis = Redis.from_url(redis_url)
@ -134,6 +218,7 @@ async def run(fps: int, camera_index: int, redis_url: str) -> None:
actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
target_interval = 1.0 / fps
pending_switch: int | None = None
preview_clients: set[asyncio.Queue[bytes]] = set()
async def on_camera_select(envelope: Envelope) -> None:
nonlocal pending_switch
@ -151,7 +236,6 @@ async def run(fps: int, camera_index: int, redis_url: str) -> None:
bus.on("chobit.camera.select", on_camera_select)
bus.on("chobit.camera.list_request", on_camera_list_request)
# Publish initial camera list
cameras = enumerate_cameras()
await bus.emit("chobit.camera.list", camera_list_payload(cameras, active_index))
@ -159,6 +243,7 @@ async def run(fps: int, camera_index: int, redis_url: str) -> None:
f"Vision sidecar started: camera={active_index} "
f"resolution={actual_width}x{actual_height} fps={fps} "
f"available={len(cameras)} cameras"
+ (f" preview=:{preview_port}" if preview_port > 0 else "")
)
running = True
@ -173,48 +258,79 @@ async def run(fps: int, camera_index: int, redis_url: str) -> None:
tracker = FaceTracker(frame_width=actual_width, frame_height=actual_height)
async def preview_handler(websocket: websockets.ServerConnection) -> None:
q: asyncio.Queue[bytes] = asyncio.Queue(maxsize=2)
preview_clients.add(q)
try:
while True:
frame_bytes = await q.get()
await websocket.send(frame_bytes)
except Exception:
pass
finally:
preview_clients.discard(q)
ws_ctx = (
websockets.serve(preview_handler, "127.0.0.1", preview_port)
if preview_port > 0
else nullcontext()
)
try:
while running:
# Handle pending camera switch
if pending_switch is not None:
new_index = pending_switch
pending_switch = None
new_cap = open_camera(new_index)
if new_cap is not None:
cap.release()
cap = new_cap
active_index = new_index
actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
tracker.close()
tracker = FaceTracker(
frame_width=actual_width, frame_height=actual_height
)
cameras = enumerate_cameras()
await bus.emit(
"chobit.camera.list",
camera_list_payload(cameras, active_index),
)
print(
f"Switched to camera {active_index} "
f"({actual_width}x{actual_height})"
)
async with ws_ctx:
while running:
# Handle pending camera switch
if pending_switch is not None:
new_index = pending_switch
pending_switch = None
new_cap = open_camera(new_index)
if new_cap is not None:
cap.release()
cap = new_cap
active_index = new_index
actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
tracker.close()
tracker = FaceTracker(
frame_width=actual_width, frame_height=actual_height
)
cameras = enumerate_cameras()
await bus.emit(
"chobit.camera.list",
camera_list_payload(cameras, active_index),
)
print(
f"Switched to camera {active_index} "
f"({actual_width}x{actual_height})"
)
start = loop.time()
start = loop.time()
ok, frame = cap.read()
if not ok:
await asyncio.sleep(0.5)
continue
ok, frame = cap.read()
if not ok:
await asyncio.sleep(0.5)
continue
result = tracker.process(frame)
payload = build_payload(result, active_index)
await bus.emit("chobit.face.tracked", payload)
result = tracker.process(frame)
payload = build_payload(result, active_index)
await bus.emit("chobit.face.tracked", payload)
elapsed = loop.time() - start
sleep_time = target_interval - elapsed
if sleep_time > 0:
await asyncio.sleep(sleep_time)
# Push annotated preview frame to connected Godot clients
if preview_clients:
overlay = _draw_preview_overlay(frame, result)
rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
_, buf = cv2.imencode(".jpg", rgb, [cv2.IMWRITE_JPEG_QUALITY, 75])
frame_bytes = bytes(buf)
for q in list(preview_clients):
try:
q.put_nowait(frame_bytes)
except asyncio.QueueFull:
pass # drop frame if consumer is slow
elapsed = loop.time() - start
sleep_time = target_interval - elapsed
if sleep_time > 0:
await asyncio.sleep(sleep_time)
finally:
tracker.close()
cap.release()
@ -227,9 +343,10 @@ def main() -> None:
parser.add_argument("--fps", type=int, default=DEFAULT_FPS)
parser.add_argument("--camera", type=int, default=DEFAULT_CAMERA)
parser.add_argument("--redis-url", default=DEFAULT_REDIS_URL)
parser.add_argument("--preview-port", type=int, default=DEFAULT_PREVIEW_PORT)
args = parser.parse_args()
asyncio.run(run(fps=args.fps, camera_index=args.camera, redis_url=args.redis_url))
asyncio.run(run(fps=args.fps, camera_index=args.camera, redis_url=args.redis_url, preview_port=args.preview_port))
if __name__ == "__main__":