chobit/services/bridge/chobit_bridge.py
Claude Code 20c8396587 arch(services): 🏗️ Implement modular service architecture with integration framework components
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-03-28 14:55:37 -07:00

89 lines
2.3 KiB
Python

#!/usr/bin/env python3
"""Chobit bridge — connects Redis eventbus to Godot via UDP.
Thin wrapper around lilith-eventbus EventBridge, configured for Chobit's
port assignments and event prefix.
Usage:
python3 chobit_bridge.py [--godot-port 19700] [--listen-port 19701]
"""
from __future__ import annotations
import argparse
import asyncio
import ctypes
import ctypes.util
import signal
from redis.asyncio import Redis
from lilith_eventbus.bridge import EventBridge
from lilith_eventbus.transports.redis import RedisTransport
from lilith_eventbus.transports.udp import UdpTransport
# Default for --godot-port; handed to UdpTransport as send_port.
GODOT_PORT = 19700
# Default for --listen-port; handed to UdpTransport as recv_port.
LISTEN_PORT = 19701
# Default for --redis-url; handed to Redis.from_url().
REDIS_URL = "redis://localhost"
# Handed to EventBridge as event_prefix.
EVENT_PREFIX = "chobit."
def _set_parent_death_signal() -> None:
"""Auto-terminate when parent process dies (Linux only)."""
try:
libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
PR_SET_PDEATHSIG = 1
libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM)
except (OSError, AttributeError):
pass
async def run(godot_port: int, listen_port: int, redis_url: str) -> None:
    """Run the Redis <-> UDP bridge until SIGTERM or SIGINT is received.

    Builds a Redis transport and a UDP transport, joins them with an
    EventBridge, then blocks until one of the stop signals arrives. Teardown
    is registered as each component comes up, so a failure or cancellation
    part-way through still closes whatever was already created.

    Args:
        godot_port: UDP port passed to UdpTransport as ``send_port``.
        listen_port: UDP port passed to UdpTransport as ``recv_port``.
        redis_url: Connection URL passed to ``Redis.from_url``.
    """
    # Function-scope import keeps this block self-contained.
    from contextlib import AsyncExitStack

    _set_parent_death_signal()

    redis = Redis.from_url(redis_url)
    redis_transport = RedisTransport(redis, prefix="eventbus:")
    udp_transport = UdpTransport(
        send_port=godot_port,
        recv_port=listen_port,
    )

    async with AsyncExitStack() as cleanup:
        # Callbacks run in reverse registration order, which gives the same
        # shutdown sequence as before: bridge.stop(), UDP close, Redis close.
        cleanup.push_async_callback(redis_transport.close)

        await udp_transport.start()
        cleanup.push_async_callback(udp_transport.close)

        bridge = EventBridge(
            redis_transport,
            udp_transport,
            event_prefix=EVENT_PREFIX,
        )
        await bridge.start()
        # Only stop the bridge if start() actually succeeded.
        cleanup.push_async_callback(bridge.stop)

        print(
            f"Chobit bridge started: "
            f"redis={redis_url} → UDP send=:{godot_port} recv=:{listen_port}"
        )

        # Handlers are installed only after startup so that Ctrl-C can still
        # interrupt a start-up that hangs.
        stop_event = asyncio.Event()
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, stop_event.set)
        await stop_event.wait()

    print("Chobit bridge stopped")
def main() -> None:
    """Parse command-line options and run the bridge until it is stopped."""
    cli = argparse.ArgumentParser(description="Chobit Redis ↔ Godot bridge")
    # Each option falls back to the matching module-level default.
    option_specs = (
        ("--godot-port", {"type": int, "default": GODOT_PORT}),
        ("--listen-port", {"type": int, "default": LISTEN_PORT}),
        ("--redis-url", {"default": REDIS_URL}),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    opts = cli.parse_args()

    bridge_coro = run(
        godot_port=opts.godot_port,
        listen_port=opts.listen_port,
        redis_url=opts.redis_url,
    )
    asyncio.run(bridge_coro)


# Start the bridge only when this file is executed as a script.
if __name__ == "__main__":
    main()