feat(@applications/tv-anarchy): implement parallel fleet completion modules

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-06-09 21:30:14 -07:00
parent a86e68c525
commit 4466ff72cd
12 changed files with 869 additions and 3 deletions

View file

@ -0,0 +1,29 @@
# Plan — parallel completion of the remaining buildable fleet work
Date: 2026-06-09 (late evening). Follows
[20260609_fleet-engine-title-refiner.md](./20260609_fleet-engine-title-refiner.md).
Execution model: four agents in parallel, each owning ONE new module + its
tests in `governor/src/fleet/` — zero shared-file edits between tracks; the
coordinating session pre-plumbed shared types (`mediaRoot` on
`FleetHost`/`FleetDeviceOverride`) and does all `cli.ts`/`index.ts` wiring,
integration testing, and docs afterward.
| Track | Module | Delivers |
|---|---|---|
| A | `actuate.ts` (+ may append exports to `transmission.ts`) | re-pin execution (rsync holder→target over ssh, dry-run default), recorded holdings (`~/.local/state/tv-anarchy/fleet-holdings.json`), research-feed (torrent-search invocation from a dead torrent's cleaned name) |
| B | `daemon.ts` | periodic fleet tick for launchd: duties Δ-log → floor check → reaper (+safe nudges when enabled), injectable deps, own state file (`fleet-daemon-state.json`) |
| C | `serve.ts` | HTTP service: `/registry` `/custody` `/reaper` `/peers_for/<hash>` `/health`, optional bearer token — the broadcast-host service, runnable on any node today |
| D | `probe.ts` | ssh `df` disk probes + EWMA uptime score (`fleet-probe-state.json`), pure merge into host capacity (feeds custody eligibility) |
Constraints all tracks share: Bun + strict tsc (`bunx tsc --noEmit` must stay
clean), `bun:test` tests in `governor/test/fleet/`, pure logic separated from
I/O, read-only by default / mutation behind explicit flags, no edits to
existing files (exception: track A may append to `transmission.ts`).
Still blocked regardless of parallelism (unchanged): WG fabric plane 1 (the
`10.9.0.4` decision), seedbox provisioning, friend-mesh, private trackers,
Discord planes.
Completion criteria: all four modules merged, CLI wired
(`fleet repin|daemon|serve|probe`), full `bun test` + typecheck green, docs
(operations/roadmap/fleet README) updated, results appended here.

View file

@ -53,6 +53,9 @@ struct DevicesView: View {
.help("Cache the next episodes of your recent shows to this device")
}
if let s = controller.hostStatsByID[d.id] { loadPill(s) }
if case .outdated(let deployed)? = controller.helperFreshness(d.id) {
outdatedPill(deployed: deployed)
}
if restarting.contains(d.id) {
ProgressView().controlSize(.small)
.help("Restarting the player service…")
@ -139,6 +142,20 @@ struct DevicesView: View {
}
}
/// Stale-deploy badge. Shown when the helper script running on the device is
/// not the one vendored in this repo (or is too old to report its own hash),
/// in which case the app may be speaking verbs the device doesn't know yet.
/// Redeploying per mcp/README clears it.
///
/// - Parameter deployed: the hash the device reported for its helper, or nil
///   when it reported none. Only the first 7 characters are shown.
private func outdatedPill(deployed: String?) -> some View {
    // What to show in the tooltip for "which helper is on the device".
    let stamp: String
    if let hash = deployed {
        stamp = String(hash.prefix(7))
    } else {
        stamp = "unstamped — pre-2026-06 deploy"
    }
    let tooltip = "The helper deployed on this device (\(stamp)) "
        + "differs from the repo's copy — redeploy it (mcp/README → deploy step)."
    return Label("not up to date", systemImage: "exclamationmark.arrow.triangle.2.circlepath")
        .font(.caption2)
        .padding(.horizontal, 6)
        .padding(.vertical, 1)
        .background(.orange.opacity(0.2), in: Capsule())
        .foregroundStyle(.orange)
        .help(tooltip)
}
/// System-load badge: load1 normalised by core count. <0.6/core is comfortable,
/// up to 1.0/core is busy-but-keeping-up, above means the run queue is oversubscribed.
@ViewBuilder

View file

@ -0,0 +1,41 @@
import CryptoKit
import Foundation
/// Answers "is the helper a fleet device runs the same one vendored in this
/// repo?" — the question behind the Devices tab's freshness badge. The repo
/// copy is the source of truth (mcp/README's deploy step ships it
/// byte-for-byte), so freshness is a plain content-hash comparison: the
/// device's `stats` report carries the sha256 of the script that produced it,
/// and the app hashes the vendored source. There are no version constants to
/// bump and nothing that can drift.
public enum HelperDeployment {
    /// Outcome of comparing a device-reported helper hash with the vendored one.
    public enum Freshness: Equatable, Sendable {
        /// The reported hash equals the vendored script's hash.
        case current
        /// The hashes differ. `deployed` is the device-reported hash; nil means
        /// the deployed helper predates self-reporting entirely (necessarily
        /// stale).
        case outdated(deployed: String?)
    }

    /// Repo-relative source of each known helper bin, keyed by basename (the
    /// bin path on the device — e.g. `/usr/local/bin/black-tv` — names the
    /// deployed copy).
    private static let vendoredSources: [String: String] = [
        "black-tv": "mcp/src/blacktv/black-tv.sh",
    ]

    /// Lowercase hex sha256 of the vendored source for `bin`.
    ///
    /// - Parameter bin: path (or bare name) of the helper on the device; only
    ///   its last path component is used for the lookup.
    /// - Returns: the digest, or nil when the bin is not a known helper or the
    ///   vendored file cannot be read (e.g. an installed app without a repo
    ///   checkout — the badge then simply stays off).
    public static func expectedSHA(forBin bin: String) -> String? {
        let basename = (bin as NSString).lastPathComponent
        guard let relativePath = vendoredSources[basename] else { return nil }
        let sourceURL = RepoPaths.root.appendingPathComponent(relativePath)
        guard let contents = try? Data(contentsOf: sourceURL) else { return nil }
        var hex = ""
        for byte in SHA256.hash(data: contents) {
            hex += String(format: "%02x", byte)
        }
        return hex
    }

    /// Judge a device report against an expectation.
    ///
    /// - Parameters:
    ///   - expected: hash of the vendored helper, or nil when it is unknown.
    ///   - reported: hash the device reported, or nil when it reported none.
    /// - Returns: nil when `expected` is nil (freshness can't be judged, show
    ///   nothing); `.current` on an exact match; otherwise `.outdated`
    ///   carrying `reported`.
    public static func freshness(expected: String?, reported: String?) -> Freshness? {
        guard let expected else { return nil }
        if reported == expected {
            return .current
        }
        return .outdated(deployed: reported)
    }
}

View file

@ -8,6 +8,10 @@ public struct HostStats: Decodable, Sendable, Equatable {
public let load15: Double
public let cores: Int
public let mpv_cpu: Double? // null when nothing is playing
/// sha256 of the helper script that served this report — the Devices tab
/// compares it against the repo's vendored copy to flag stale deploys.
/// Absent from helpers that predate self-reporting (itself a stale sign).
public let helper_sha: String?
}
/// A target that can report its host's load (only black it's a real machine

View file

@ -25,6 +25,10 @@ public final class PlayerController {
/// Per-device load, sampled while the Devices tab is visible. Only HostStatsProvider
/// targets report (e.g. black); others stay absent. Drives the load badge there.
public private(set) var hostStatsByID: [String: HostStats] = [:]
/// Per-device expected helper hash (sha256 of the repo's vendored script for
/// the device's delegated-command bin), computed at reload. Devices without a
/// known helper or an app run without a repo checkout are simply absent.
private var expectedHelperSHA: [String: String] = [:]
/// Tracks of the active target's current file (empty unless TrackSelectable).
public private(set) var audioTracks: [MediaTrack] = []
public private(set) var subtitleTracks: [MediaTrack] = []
@ -70,6 +74,10 @@ public final class PlayerController {
public func reload() {
let cfg = DevicesConfig.loadOrSeed()
targets = cfg.devices.compactMap(Self.makeTarget)
expectedHelperSHA = cfg.devices.reduce(into: [:]) { acc, d in
if let bin = Self.helperBin(d.commands),
let sha = HelperDeployment.expectedSHA(forBin: bin) { acc[d.id] = sha }
}
if active == nil { activeID = defaultTargetID }
var next: [String: Snapshot] = [:]
for t in targets {
@ -145,6 +153,24 @@ public final class PlayerController {
/// Re-seed the default set (plum VLC + black mpv-ipc with LAN+overlay endpoints).
public func resetDevicesToDefault() { saveDevices(DevicesConfig.seeded().devices) }
// MARK: Helper deployment freshness (Devices tab)
/// The helper bin a device's delegated commands run (e.g.
/// `/usr/local/bin/black-tv`): the first element of whichever command
/// template is configured, checked in the order stats, stop, launchFile,
/// releases.
///
/// - Returns: nil when there is no commands config, no template is set, or
///   the chosen template is empty.
private static func helperBin(_ c: CommandsConfig?) -> String? {
    guard let commands = c else { return nil }
    let template = commands.stats ?? commands.stop ?? commands.launchFile ?? commands.releases
    return template?.first
}
/// Is `id`'s deployed helper the one vendored in the repo?
///
/// - Returns: nil when freshness can't be judged — no stats report for the
///   device yet (so an unreachable device isn't double-flagged as outdated),
///   or no expected hash recorded for it (no known helper / no repo checkout).
public func helperFreshness(_ id: String) -> HelperDeployment.Freshness? {
    // Without a stats report there is nothing to compare against.
    guard let report = hostStatsByID[id] else { return nil }
    let expected = expectedHelperSHA[id]
    return HelperDeployment.freshness(expected: expected, reported: report.helper_sha)
}
// MARK: Device service restart (Devices tab)
/// Whether `id`'s host-side player service can be restarted (the target

View file

@ -0,0 +1,53 @@
import CryptoKit
import XCTest
@testable import TVAnarchyCore
/// The Devices tab's "not up to date" badge: a device reports the sha256 of the
/// helper script it runs; the app hashes the repo's vendored copy. Freshness is
/// the comparison — no version constants anywhere.
final class HelperDeploymentTests: XCTestCase {
    /// Every combination of known/unknown expectation and reported hash.
    func testFreshnessJudgement() {
        // No expectation (unknown helper / no repo checkout) → can't judge.
        XCTAssertNil(HelperDeployment.freshness(expected: nil, reported: "abc"))
        XCTAssertNil(HelperDeployment.freshness(expected: nil, reported: nil))
        // Matching hashes → current.
        XCTAssertEqual(HelperDeployment.freshness(expected: "abc", reported: "abc"), .current)
        // Mismatch → outdated, carrying what the device reported.
        XCTAssertEqual(HelperDeployment.freshness(expected: "abc", reported: "def"),
                       .outdated(deployed: "def"))
        // A helper too old to self-report is necessarily outdated.
        XCTAssertEqual(HelperDeployment.freshness(expected: "abc", reported: nil),
                       .outdated(deployed: nil))
    }

    /// `expectedSHA` hashes the vendored script found under the repo root.
    func testExpectedSHAHashesVendoredScript() throws {
        // Point RepoPaths at a throwaway repo via $TV_ANARCHY_REPO.
        let envKey = "TV_ANARCHY_REPO"
        let repo = FileManager.default.temporaryDirectory
            .appendingPathComponent("helper-deploy-test-\(UUID().uuidString)")
        // Remember any value the developer/CI already had so this test does not
        // clobber it for later tests in the same process.
        let priorRepo = getenv(envKey).map { String(cString: $0) }
        // Registered before the throwing setup below so the temp dir and the
        // environment are restored even when that setup fails part-way.
        defer {
            if let priorRepo {
                setenv(envKey, priorRepo, 1)
            } else {
                unsetenv(envKey)
            }
            try? FileManager.default.removeItem(at: repo)
        }

        let script = repo.appendingPathComponent("mcp/src/blacktv/black-tv.sh")
        try FileManager.default.createDirectory(at: script.deletingLastPathComponent(),
                                                withIntermediateDirectories: true)
        let body = Data("#!/usr/bin/env bash\necho hi\n".utf8)
        try body.write(to: script)
        setenv(envKey, repo.path, 1)

        let expected = SHA256.hash(data: body).map { String(format: "%02x", $0) }.joined()
        XCTAssertEqual(HelperDeployment.expectedSHA(forBin: "/usr/local/bin/black-tv"), expected)
        // The bin's basename keys the lookup — an unknown helper has no expectation.
        XCTAssertNil(HelperDeployment.expectedSHA(forBin: "/usr/local/bin/some-other-helper"))
    }

    /// `helper_sha` is optional on the wire: old helpers omit it and must
    /// still decode.
    func testHostStatsDecodesWithAndWithoutHelperSha() throws {
        let new = #"{"load1":0.5,"load5":0.4,"load15":0.3,"cores":8,"mpv_cpu":null,"helper_sha":"deadbeef"}"#
        let old = #"{"load1":0.5,"load5":0.4,"load15":0.3,"cores":8,"mpv_cpu":12.5}"#
        let n = try JSONDecoder().decode(HostStats.self, from: Data(new.utf8))
        let o = try JSONDecoder().decode(HostStats.self, from: Data(old.utf8))
        XCTAssertEqual(n.helper_sha, "deadbeef")
        XCTAssertNil(o.helper_sha) // pre-stamp deploy still decodes
        XCTAssertEqual(o.mpv_cpu, 12.5)
    }
}

228
governor/src/fleet/probe.ts Normal file
View file

@ -0,0 +1,228 @@
// Capacity probe — turns the registry's static capacity guesses into measured
// facts: `df -kP` over ssh for diskFreeBytes, and an EWMA over probe
// reachability for the rolling uptimeScore the design spec asks for
// (uptime_score ∈ [0,1]). State persists at
// ~/.local/state/tv-anarchy/fleet-probe-state.json (path injectable for tests).
//
// Merge rule (withProbedCapacity) — deliberately simple: a probed value WINS
// over the registry value for BOTH diskFreeBytes and uptimeScore, including
// hand-configured fleet.json overrides. Detecting "explicit uptimeScore
// override vs class default" from a finished FleetHost is guesswork (0.5 on a
// roamer is indistinguishable either way), so we don't pretend to. The one
// exception: a state entry whose diskFreeBytes is null (host answered ssh but
// df never parsed, or disk was never measured) leaves the registry's
// diskFreeBytes alone — null is absence of measurement, not a measurement.
//
// Pure throughout except probeHost/probeAll (ssh) and load/saveProbeState
// (filesystem). No actuation here — callers feed the merged hosts to
// duties/custody.
import { spawnSync } from "node:child_process";
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { homedir } from "node:os";
import { dirname, join } from "node:path";
import type { FleetHost } from "./types.ts";
/** Default ssh transport timeout for one probe (ConnectTimeout is 8s; this
* bounds the whole spawn including a slow remote df). */
export const DEFAULT_PROBE_TIMEOUT_MS = 20_000;
/** EWMA smoothing factor: ~5 probes to mostly forget an outage. */
export const DEFAULT_UPTIME_ALPHA = 0.2;
/** Real state file; tests inject a temp path instead. */
export const PROBE_STATE_PATH = join(
homedir(), ".local", "state", "tv-anarchy", "fleet-probe-state.json",
);
/** ssh options prepended to every probe: give up connecting after 8 s, and
 * BatchMode so ssh fails instead of prompting for a password/passphrase. */
const SSH_OPTS = ["-o", "ConnectTimeout=8", "-o", "BatchMode=yes"];
/** Outcome of probing one host once. */
export interface ProbeResult {
/** Id of the probed host (FleetHost.id). */
hostId: string;
/** ssh exited 0 — the host answered. */
reachable: boolean;
/** Parsed free bytes; null when unreachable or df output was garbage. */
diskFreeBytes: number | null;
}
/** Persisted per-host probe summary. */
export interface ProbeStateEntry {
/** Rolling EWMA uptime ∈ [0,1]. */
uptimeScore: number;
/** Unix epoch seconds of the last probe round that touched this host. */
lastProbeAt: number;
/** Last successfully measured free bytes; null = never measured. */
diskFreeBytes: number | null;
}
/** Whole persisted probe state: one entry per host, keyed by host id. */
export interface ProbeState {
hosts: Record<string, ProbeStateEntry>;
}
// --- df parsing --------------------------------------------------------------
/**
 * Free bytes from `df -kP <path>` output (POSIX: header + one data line,
 * "Available" is the 4th column in 1K blocks). Tolerates motd/noise lines
 * around the table and locale spacing; returns null on garbage.
 *
 * A data line is recognised by three consecutive all-digit columns (blocks,
 * used, available). They are looked for first at the standard position
 * (columns 2–4). If that fails, they are looked for immediately before a
 * later `NN%` capacity column, which handles filesystem names containing
 * whitespace (e.g. macOS `map auto_home`, SMB shares with spaces) that shift
 * every column to the right.
 *
 * @param out Raw stdout of the df command.
 * @returns Available bytes of the first matching line, or null if none match.
 */
export function parseDfFreeBytes(out: string): number | null {
  const isCount = (t: string | undefined): boolean => t !== undefined && /^\d+$/.test(t);
  for (const line of out.split("\n")) {
    const tokens = line.trim().split(/\s+/);
    // Filesystem + blocks + used + available + capacity is the minimum width.
    // The header never matches ("1024-blocks" isn't numeric), nor does noise.
    if (tokens.length < 5) continue;
    // Candidate index of the "Available" column: standard position first.
    const candidates = [3];
    // A capacity column further right than usual means the filesystem name
    // was split into several tokens; "Available" sits just before it.
    const capacityAt = tokens.findIndex((t, i) => i > 4 && /^\d+%$/.test(t));
    if (capacityAt !== -1) candidates.push(capacityAt - 1);
    for (const avail of candidates) {
      if (isCount(tokens[avail - 2]) && isCount(tokens[avail - 1]) && isCount(tokens[avail])) {
        return Number(tokens[avail]) * 1024;
      }
    }
  }
  return null;
}
// --- probing -----------------------------------------------------------------
/**
 * Render `path` as one word for the remote shell.
 *
 * A leading tilde prefix ("~" alone, or "~/…") stays unquoted so the remote
 * shell expands it to the login user's home; the remainder — and any other
 * path — is single-quoted (mediaRoot may contain spaces), with embedded
 * single quotes escaped as `'\''`. Previously only a bare "~" was left
 * unquoted, so "~/Media" was passed to df as a literal directory named "~".
 */
function shellPath(path: string): string {
  const quote = (s: string): string => `'${s.replace(/'/g, "'\\''")}'`;
  if (path === "~") return "~";
  if (path.startsWith("~/")) {
    const rest = path.slice(2);
    // "~/" followed by a quoted word still tilde-expands: the tilde prefix
    // ends at the first unquoted slash.
    return rest === "" ? "~/" : `~/${quote(rest)}`;
  }
  return quote(path);
}
/**
 * Full ssh argv for one host's disk probe (everything after the `ssh`
 * executable), or null when the host has no ssh destination. Pure — this is
 * the testable core of probeHost.
 *
 * The argv is: the shared ssh options, the host's ssh destination, then one
 * remote command string `df -kP <path>` where the path is the host's
 * mediaRoot (or "~" when none is set) rendered by shellPath.
 */
export function dfProbeArgv(host: FleetHost): string[] | null {
  const destination = host.ssh;
  if (!destination) return null;
  const target = shellPath(host.mediaRoot ?? "~");
  const argv = SSH_OPTS.slice();
  argv.push(destination, `df -kP ${target}`);
  return argv;
}
/**
 * Probe one host: run `df -kP` over ssh (synchronously) and parse the result.
 *
 * @param host Host to probe; one without an ssh destination is reported
 *   unreachable without spawning anything.
 * @param opts `timeoutMs` bounds the whole ssh spawn; defaults to
 *   DEFAULT_PROBE_TIMEOUT_MS.
 * @returns `reachable` is true only when ssh exited with status 0;
 *   `diskFreeBytes` is parsed from stdout in that case and null otherwise.
 */
export function probeHost(host: FleetHost, opts?: { timeoutMs?: number }): ProbeResult {
  const unreachable: ProbeResult = { hostId: host.id, reachable: false, diskFreeBytes: null };
  const argv = dfProbeArgv(host);
  if (argv === null) return unreachable;

  const timeout = opts?.timeoutMs ?? DEFAULT_PROBE_TIMEOUT_MS;
  const { status, stdout } = spawnSync("ssh", argv, { encoding: "utf8", timeout });
  if (status !== 0) return unreachable;

  return {
    hostId: host.id,
    reachable: true,
    diskFreeBytes: parseDfFreeBytes(stdout ?? ""),
  };
}
/**
 * Probe every host one after another (a handful of hosts; simplicity wins).
 * Results are returned in the same order as `hosts`; `opts` is forwarded to
 * each probeHost call.
 */
export function probeAll(hosts: FleetHost[], opts?: { timeoutMs?: number }): ProbeResult[] {
  const results: ProbeResult[] = [];
  for (const host of hosts) {
    results.push(probeHost(host, opts));
  }
  return results;
}
// --- rolling uptime ----------------------------------------------------------
/**
 * One EWMA step of the rolling uptime score.
 *
 * @param prev Previous score, or null when there is none — the first
 *   observation then seeds the score directly (up → 1, down → 0).
 * @param observedUp Whether this probe found the host up.
 * @param alpha Weight given to the new observation.
 * @returns `alpha * observation + (1 - alpha) * prev`, clamped to [0,1].
 */
export function updateUptimeScore(
  prev: number | null,
  observedUp: boolean,
  alpha: number = DEFAULT_UPTIME_ALPHA,
): number {
  const sample = observedUp ? 1 : 0;
  if (prev === null) {
    return sample;
  }
  const blended = alpha * sample + (1 - alpha) * prev;
  return Math.min(1, Math.max(0, blended));
}
/**
 * Fold one probe round into the state — pure; the input state is untouched.
 *
 * For every result the host's uptime score takes one EWMA step from its
 * value in the INPUT state, `lastProbeAt` becomes `nowEpochS`, and
 * `diskFreeBytes` takes the new measurement when there is one, otherwise the
 * previously stored number (a sleeping laptop's disk didn't vanish). Hosts
 * absent from `results` carry over unchanged. Results are applied in hostId
 * order.
 */
export function recordProbeRound(
  state: ProbeState,
  results: ProbeResult[],
  nowEpochS: number,
): ProbeState {
  const next: Record<string, ProbeStateEntry> = { ...state.hosts };
  const byHostId = results.slice().sort((a, b) => a.hostId.localeCompare(b.hostId));
  byHostId.forEach(result => {
    const before = state.hosts[result.hostId];
    const priorScore = before?.uptimeScore ?? null;
    const priorDisk = before?.diskFreeBytes ?? null;
    next[result.hostId] = {
      uptimeScore: updateUptimeScore(priorScore, result.reachable),
      lastProbeAt: nowEpochS,
      diskFreeBytes: result.diskFreeBytes ?? priorDisk,
    };
  });
  return { hosts: next };
}
// --- merge into the registry ---------------------------------------------------
/**
 * Overlay probed capacity onto registry hosts — pure. Hosts with a state
 * entry come back as new objects; hosts without one are passed through by
 * reference.
 *
 * Merge rule: the probed uptimeScore always replaces the registry value; the
 * probed diskFreeBytes replaces it only when it is non-null (null means "not
 * measured", so the registry value is kept). Other capacity fields are
 * copied unchanged.
 */
export function withProbedCapacity(hosts: FleetHost[], state: ProbeState): FleetHost[] {
  const merged: FleetHost[] = [];
  for (const h of hosts) {
    const probed = state.hosts[h.id];
    if (!probed) {
      merged.push(h);
      continue;
    }
    const diskFreeBytes = probed.diskFreeBytes ?? h.capacity.diskFreeBytes;
    merged.push({
      ...h,
      capacity: { ...h.capacity, diskFreeBytes, uptimeScore: probed.uptimeScore },
    });
  }
  return merged;
}
// --- state persistence ---------------------------------------------------------
/**
 * Type guard for one persisted host entry: an object whose `uptimeScore` and
 * `lastProbeAt` are numbers and whose `diskFreeBytes` is a number or null.
 */
function isEntry(v: unknown): v is ProbeStateEntry {
  if (v === null || typeof v !== "object") return false;
  const { uptimeScore, lastProbeAt, diskFreeBytes } = v as Record<string, unknown>;
  if (typeof uptimeScore !== "number") return false;
  if (typeof lastProbeAt !== "number") return false;
  return diskFreeBytes === null || typeof diskFreeBytes === "number";
}
/**
 * Load probe state. Tolerant — a missing file, unreadable or unparseable
 * JSON, or a wrong top-level shape all degrade to empty state, and malformed
 * per-host entries are dropped individually. Loaded uptime scores are
 * clamped to [0,1].
 *
 * @param path State file to read; defaults to PROBE_STATE_PATH.
 */
export function loadProbeState(path: string = PROBE_STATE_PATH): ProbeState {
  const empty = (): ProbeState => ({ hosts: {} });
  if (!existsSync(path)) return empty();

  let parsed: unknown;
  try {
    parsed = JSON.parse(readFileSync(path, "utf8"));
  } catch {
    return empty();
  }
  if (parsed === null || typeof parsed !== "object") return empty();

  const rawHosts = (parsed as Record<string, unknown>)["hosts"];
  if (rawHosts === null || typeof rawHosts !== "object") return empty();

  const loaded = empty();
  for (const [id, entry] of Object.entries(rawHosts)) {
    if (!isEntry(entry)) continue;
    const { uptimeScore, lastProbeAt, diskFreeBytes } = entry;
    loaded.hosts[id] = {
      uptimeScore: Math.min(1, Math.max(0, uptimeScore)),
      lastProbeAt,
      diskFreeBytes,
    };
  }
  return loaded;
}
/**
 * Persist probe state as pretty-printed JSON with a trailing newline. Host
 * keys are written in sorted order for stable diffs, and missing parent
 * directories are created first.
 *
 * @param state State to write.
 * @param path Destination file; defaults to PROBE_STATE_PATH.
 */
export function saveProbeState(state: ProbeState, path: string = PROBE_STATE_PATH): void {
  const sortedIds = Object.keys(state.hosts).sort();
  const hosts: Record<string, ProbeStateEntry> = Object.fromEntries(
    sortedIds.map(id => [id, state.hosts[id]!]),
  );
  mkdirSync(dirname(path), { recursive: true });
  const text = JSON.stringify({ hosts }, null, 2) + "\n";
  writeFileSync(path, text, "utf8");
}

View file

@ -66,6 +66,7 @@ export interface FleetDeviceOverride {
api?: HostApi;
addr?: string;
ssh?: string;
mediaRoot?: string;
diskFreeBytes?: number;
upBwKbs?: number;
uptimeScore?: number;
@ -164,6 +165,7 @@ function hostFromDevice(d: RawDevice, ov: FleetDeviceOverride): FleetHost | null
api: ov.api ?? (cls === "server" ? "transmission_rpc" : "none"),
addr: ov.addr ?? (sshDest ? bareAddr(sshDest) : null),
ssh: sshDest,
mediaRoot: ov.mediaRoot ?? null,
capacity: capacity(ov, alwaysOn),
};
}
@ -197,6 +199,7 @@ function hostFromFleetDevice(d: AppFleetDevice, ov: FleetDeviceOverride, warning
api: ov.api ?? (hasTransmission ? "transmission_rpc" : "none"),
addr: ov.addr ?? (sshDest ? bareAddr(sshDest) : null),
ssh: sshDest,
mediaRoot: ov.mediaRoot ?? null,
capacity: capacity(ov, alwaysOn),
};
}

240
governor/src/fleet/serve.ts Normal file
View file

@ -0,0 +1,240 @@
// Fleet broadcast service — the stage-3 "broadcast serves peers_for" HTTP
// surface from the mesh design spec. In the spec the broadcast-duty host
// (public_ip && always_on) anchors the user-owned meta-tracker; no such host
// exists yet, but this service is runnable TODAY on any node over the home
// LAN / WireGuard overlay so other fleet devices can already consume
// peers_for(infohash), the registry, custody reports and reaper verdicts.
//
// Constraints:
// - GET-only, JSON-only. Read path of the fleet engine — NO actuation
// endpoints (re-pins, reannounces etc. stay with the CLI/daemon).
// - All data access is injected (FleetServeDeps) so the handler is testable
// without ssh or a live transmission daemon.
// - Degradation over failure: transmission being down turns /custody and
// /peers_for into static-holdings-only answers with a `warnings` field,
// never a 500. /reaper is the documented exception — its verdicts are
// meaningless without live vitals, so transmission-down is an honest 503.
import { loadRegistry } from "./registry.ts";
import type { FleetRegistry } from "./registry.ts";
import { assignDuties } from "./duties.ts";
import { floorCheck } from "./custody.ts";
import { classify, planRecovery } from "./reaper.ts";
import { peersFor } from "./peers.ts";
import {
fetchHoldings, fetchLivePeers, fetchVitals, staticHoldings, transmissionHost,
} from "./transmission.ts";
import type { LiveSwarmPeer } from "./transmission.ts";
import type { FleetHost, Holding, TorrentVitals } from "./types.ts";
import { log } from "../log.ts";
/** Default bind port for the fleet broadcast service. */
export const DEFAULT_FLEET_PORT = 9094;
/**
 * Every data access the handler performs, injected. Production uses the real
 * registry/transmission modules (`startFleetServer`); tests hand in fakes and
 * drive the handler with plain `Request` objects — no network, no ssh.
 */
export interface FleetServeDeps {
/** Called once per request by every route except /health. */
loadRegistry: () => FleetRegistry;
/** Live torrent vitals from the transmission host; used by /reaper, which
 * turns a throw into a 503. */
fetchVitals: (host: FleetHost) => TorrentVitals[];
/** Holdings from the transmission host; a throw degrades the answer to
 * static holdings plus a warning. */
fetchHoldings: (host: FleetHost) => Holding[];
/** Live swarm peers for one infohash; used by /peers_for, where a throw
 * becomes a warning. */
fetchLivePeers: (host: FleetHost, infohash: string) => LiveSwarmPeer[];
/** Clock (unix epoch seconds) for reaper idle-age classification; also
 * stamps the /health response. */
nowEpochS: () => number;
}
/** Handler options. `token` enables bearer auth on everything but /health. */
export interface FleetHandlerOpts {
/** Expected bearer token; when undefined, no route requires auth. */
token?: string;
}
/** v1 (40 hex) or v2 (64 hex) infohash. */
const INFOHASH_RE = /^(?:[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$/;
/**
 * Build a JSON response: `body` serialized with JSON.stringify, the given
 * HTTP status (200 by default) and a `content-type: application/json` header.
 */
function json(body: unknown, status = 200): Response {
  const headers = { "content-type": "application/json" };
  const payload = JSON.stringify(body);
  return new Response(payload, { status, headers });
}
/** Human-readable text for a caught value: an Error's `message`, otherwise
 * the value converted with String(). */
function errMsg(err: unknown): string {
  if (err instanceof Error) {
    return err.message;
  }
  return String(err);
}
/**
 * Fleet-wide holdings: the transmission host's torrents followed by the
 * fleet.json static mirrors. Transmission failure (or absence) degrades to
 * static-only and reports why — callers surface the warning instead of
 * erroring.
 *
 * @returns `holdings` (transmission first, then static), the transmission
 *   host `tx` (null when the registry has none), and any `warnings`.
 */
function gatherHoldings(
  deps: FleetServeDeps,
  reg: FleetRegistry,
): { holdings: Holding[]; tx: FleetHost | null; warnings: string[] } {
  const tx = transmissionHost(reg.hosts);
  const fixed = staticHoldings(reg.staticHoldings);
  if (tx === null) {
    return {
      holdings: fixed,
      tx,
      warnings: ["no transmission host in registry — static holdings only"],
    };
  }
  try {
    const live = deps.fetchHoldings(tx);
    return { holdings: [...live, ...fixed], tx, warnings: [] };
  } catch (err) {
    const why = `transmission unreachable on ${tx.id} — static holdings only: ${errMsg(err)}`;
    return { holdings: fixed, tx, warnings: [why] };
  }
}
/**
 * Build the request handler. All routes are GET and answer JSON:
 *
 *   /health                 liveness (always unauthenticated)
 *   /registry               hosts + computed duties + sources + floor + warnings
 *   /custody                floor reports (degrades to static holdings)
 *   /reaper                 classify + planRecovery verdicts + counts (503 when
 *                           transmission is down — reaper needs live vitals)
 *   /peers_for/<infohash>   provenance-tagged peers; ?title= helps name-keyed
 *                           static holdings match hash-less entries
 *
 * When `opts.token` is set, every route except /health requires
 * `Authorization: Bearer <token>`.
 *
 * Status codes produced here: 200 on success, 400 for a /peers_for segment
 * that is not a well-formed infohash (including malformed percent-encoding),
 * 401 for a missing/wrong token, 404 for an unknown path, 405 for a non-GET
 * method, 503 from /reaper when there is no transmission host or it cannot
 * be reached.
 *
 * @param deps Injected data access; see FleetServeDeps.
 * @param opts Optional bearer token.
 * @returns An async `(Request) => Response` function suitable for Bun.serve.
 */
export function createFleetHandler(
  deps: FleetServeDeps,
  opts: FleetHandlerOpts = {},
): (req: Request) => Promise<Response> {
  const { token } = opts;
  // Plain string equality, deliberately NOT constant-time: this service lives
  // on the home LAN / WireGuard overlay where a timing oracle is outside the
  // threat model. Revisit if a real public-IP broadcast host ever runs this.
  const authorized = (req: Request): boolean =>
    token === undefined || req.headers.get("authorization") === `Bearer ${token}`;

  return async (req: Request): Promise<Response> => {
    const url = new URL(req.url);
    const path = url.pathname;

    // /health stays open so dumb probes (curl in a cron, the app's reachability
    // badge) need no secret just to ask "are you alive?".
    if (path === "/health") {
      return json({ ok: true, ts: deps.nowEpochS() });
    }
    if (!authorized(req)) {
      return json({ error: "unauthorized" }, 401);
    }
    if (req.method !== "GET") {
      return json({ error: `method ${req.method} not allowed — all routes are GET` }, 405);
    }

    if (path === "/registry") {
      const reg = deps.loadRegistry();
      const { duties, warnings } = assignDuties(reg.hosts);
      return json({
        hosts: reg.hosts,
        duties: Object.fromEntries(duties),
        sources: reg.sources,
        floorCopies: reg.floorCopies,
        warnings: [...reg.warnings, ...warnings],
      });
    }

    if (path === "/custody") {
      const reg = deps.loadRegistry();
      const { holdings, warnings } = gatherHoldings(deps, reg);
      const reports = floorCheck(holdings, reg.hosts, reg.floorCopies);
      return json({ reports, floorCopies: reg.floorCopies, warnings });
    }

    if (path === "/reaper") {
      const reg = deps.loadRegistry();
      const tx = transmissionHost(reg.hosts);
      if (tx === null) {
        return json({ error: "no host with api=transmission_rpc in the registry" }, 503);
      }
      // Documented choice: unlike /custody, transmission-down here is a 503.
      // Reaper verdicts exist to classify LIVE swarm state — answering from
      // static holdings would fabricate health for torrents we can't see.
      let vitals: TorrentVitals[];
      try {
        vitals = deps.fetchVitals(tx);
      } catch (err) {
        return json({ error: `transmission unreachable on ${tx.id}: ${errMsg(err)}` }, 503);
      }
      const { holdings, warnings } = gatherHoldings(deps, reg);
      const now = deps.nowEpochS();
      const verdicts = planRecovery(vitals.map(v => classify(v, now)), holdings, tx.id);
      const counts = {
        healthy: verdicts.filter(v => v.health === "healthy").length,
        stalled: verdicts.filter(v => v.health === "stalled").length,
        dead: verdicts.filter(v => v.health === "dead").length,
      };
      return json({ verdicts, counts, warnings });
    }

    const peersMatch = /^\/peers_for\/([^/]+)$/.exec(path);
    if (peersMatch) {
      const segment = peersMatch[1]!;
      // URL.pathname keeps percent-escapes, and decodeURIComponent throws
      // URIError on a malformed one (e.g. "%E0%A4%A"). That is a client
      // error, so answer 400 instead of letting the exception escape.
      let raw: string;
      try {
        raw = decodeURIComponent(segment);
      } catch {
        return json({ error: `"${segment}" is not valid percent-encoding` }, 400);
      }
      if (!INFOHASH_RE.test(raw)) {
        return json({ error: `"${raw}" is not a 40- or 64-char hex infohash` }, 400);
      }
      const infohash = raw.toLowerCase();
      const reg = deps.loadRegistry();
      const { holdings, tx, warnings } = gatherHoldings(deps, reg);
      // Title aids name-keyed static holdings (they carry no hash); explicit
      // ?title= wins, otherwise resolve it from a transmission holding.
      const title = url.searchParams.get("title")
        ?? holdings.find(h => h.infohash?.toLowerCase() === infohash)?.title
        ?? null;
      // Live swarm peers are best-effort: failure degrades to fleet-only.
      let livePeers: LiveSwarmPeer[] = [];
      if (tx !== null) {
        try {
          livePeers = deps.fetchLivePeers(tx, infohash);
        } catch (err) {
          warnings.push(`live swarm peers unavailable: ${errMsg(err)}`);
        }
      }
      const peers = peersFor({
        infohash, title, sources: reg.sources, hosts: reg.hosts, holdings, livePeers,
      });
      return json({ infohash, title, peers, warnings });
    }

    return json({ error: `no such route: ${path}` }, 404);
  };
}
/** Server options. Token falls back to the FLEET_TOKEN env var. */
export interface FleetServerOpts {
/** Port to bind; defaults to DEFAULT_FLEET_PORT. */
port?: number;
/** Bearer token to require; when neither this nor FLEET_TOKEN is set, the
 * service runs without auth. */
token?: string;
}
/**
 * Run the broadcast service with the real registry/transmission deps.
 * Binds 0.0.0.0 — the whole point is that OTHER fleet devices reach it over
 * the LAN/overlay. Logs the bound port and whether auth is on.
 *
 * @param opts Port (default DEFAULT_FLEET_PORT) and bearer token (falls back
 *   to the FLEET_TOKEN env var; auth is off when neither is set).
 * @returns The Bun server (caller owns `.stop()`).
 */
export function startFleetServer(opts: FleetServerOpts = {}): ReturnType<typeof Bun.serve> {
  const token = opts.token ?? process.env["FLEET_TOKEN"] ?? undefined;
  const deps: FleetServeDeps = {
    loadRegistry,
    fetchVitals,
    fetchHoldings,
    fetchLivePeers,
    nowEpochS: () => Math.floor(Date.now() / 1000),
  };
  const server = Bun.serve({
    hostname: "0.0.0.0",
    port: opts.port ?? DEFAULT_FLEET_PORT,
    fetch: createFleetHandler(deps, { token }),
  });
  const authNote = token !== undefined ? "bearer token required" : "OFF — open to the overlay";
  log.info(
    `fleet serve listening on 0.0.0.0:${server.port} `
    + `(auth: ${authNote})`,
  );
  return server;
}

View file

@ -39,6 +39,8 @@ export interface FleetHost {
addr: string | null;
/** ssh destination (user@host) for probes/actuation; null = not reachable via ssh. */
ssh: string | null;
/** Media root on the host (rsync target / df probe path); from fleet.json policy. */
mediaRoot?: string | null;
capacity: HostCapacity;
}

View file

@ -0,0 +1,220 @@
// createFleetHandler is driven directly with Request objects and fake deps —
// no network, no ssh, no real registry files. One smoke test boots Bun.serve
// on an ephemeral port to prove the handler plugs into a real server.
import { describe, expect, test } from "bun:test";
import { createFleetHandler, DEFAULT_FLEET_PORT } from "../../src/fleet/serve.ts";
import type { FleetServeDeps } from "../../src/fleet/serve.ts";
import type { FleetRegistry } from "../../src/fleet/registry.ts";
import { normalizeSource } from "../../src/fleet/peers.ts";
import type { FleetHost, Holding, TorrentVitals } from "../../src/fleet/types.ts";
const HASH = "a".repeat(40);
const NOW = 1_750_000_000;
/**
 * Fixture: a FleetHost with server-class defaults derived from `over.id`
 * (name = id, addr = "<id>.fleet", no ssh, no API); any field in `over`
 * replaces the default.
 */
function host(over: Partial<FleetHost> & { id: string }): FleetHost {
  const defaults: Omit<FleetHost, "id"> = {
    name: over.id,
    class: "server",
    reachable: "wireguard",
    alwaysOn: true,
    onHomeIp: true,
    api: "none",
    addr: `${over.id}.fleet`,
    ssh: null,
    capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 },
  };
  return { ...defaults, ...over };
}
/**
 * Fixture: a two-host registry — "black" (transmission_rpc, ssh-reachable)
 * and "apricot" — with one DHT source, a floor of 2 copies and a static
 * holding of "Show Y" on apricot. Fields in `over` replace the defaults.
 */
function registry(over: Partial<FleetRegistry> = {}): FleetRegistry {
  const black = host({ id: "black", api: "transmission_rpc", ssh: "lilith@10.0.0.11", addr: "10.0.0.11" });
  const apricot = host({ id: "apricot", addr: "10.9.0.2" });
  const base: FleetRegistry = {
    hosts: [black, apricot],
    sources: [normalizeSource({ id: "dht", kind: "dht" })],
    floorCopies: 2,
    staticHoldings: { apricot: ["Show Y"] },
    warnings: [],
  };
  return { ...base, ...over };
}
/**
 * Fixture: a complete holding of "Show Y" (infohash HASH) on host "black";
 * fields in `over` replace the defaults.
 */
function txHolding(over: Partial<Holding> = {}): Holding {
  const base: Holding = {
    hostId: "black",
    title: "Show Y",
    infohash: HASH,
    complete: true,
    completedAt: 100,
    sizeBytes: null,
  };
  return { ...base, ...over };
}
/**
 * Fixture: vitals for "Show Y" (infohash HASH) — fully downloaded, no error,
 * two peers connected, active at NOW and added an hour earlier. Fields in
 * `over` replace the defaults.
 */
function vitals(over: Partial<TorrentVitals> = {}): TorrentVitals {
  const base: TorrentVitals = {
    name: "Show Y",
    infohash: HASH,
    percentDone: 1,
    status: 6,
    error: 0,
    errorString: "",
    peersConnected: 2,
    rateDownloadBps: 0,
    rateUploadBps: 0,
    activityDate: NOW,
    addedDate: NOW - 3600,
  };
  return { ...base, ...over };
}
/**
 * Fixture: fake handler dependencies that all succeed — the default
 * registry, one vitals row, one transmission holding, one live peer and a
 * clock fixed at NOW. Entries in `over` replace individual fakes.
 */
function deps(over: Partial<FleetServeDeps> = {}): FleetServeDeps {
  const fakes: FleetServeDeps = {
    loadRegistry: () => registry(),
    fetchVitals: () => [vitals()],
    fetchHoldings: () => [txHolding()],
    fetchLivePeers: () => [{ address: "203.0.113.9", port: 51413 }],
    nowEpochS: () => NOW,
  };
  return { ...fakes, ...over };
}
/**
 * Assert the response is labelled `application/json`, then return its parsed
 * body as a plain record.
 */
async function body(res: Response): Promise<Record<string, unknown>> {
  const contentType = res.headers.get("content-type");
  expect(contentType).toBe("application/json");
  const parsed = await res.json();
  return parsed as Record<string, unknown>;
}
/** Build a GET request for `path` on the fake origin `http://fleet.test`,
 * with optional request headers. */
const get = (path: string, headers: Record<string, string> = {}): Request => {
  const url = `http://fleet.test${path}`;
  return new Request(url, { headers });
};
// Route-level behavior of the fleet handler. Every test builds a fresh handler
// from `deps()` (optionally with one dependency overridden) and calls it
// directly with a Request — no socket involved.
describe("fleet serve — routes", () => {
  test("/health answers ok with a timestamp", async () => {
    const res = await createFleetHandler(deps())(get("/health"));
    expect(res.status).toBe(200);
    expect(await body(res)).toEqual({ ok: true, ts: NOW });
  });

  test("/registry returns hosts, serialized duties, sources, floor, warnings", async () => {
    const res = await createFleetHandler(deps())(get("/registry"));
    expect(res.status).toBe(200);
    const out = await body(res);
    expect((out["hosts"] as unknown[]).length).toBe(2);
    // duties: the Map comes out as a plain object, every host present.
    const duties = out["duties"] as Record<string, string[]>;
    expect(Object.keys(duties).sort()).toEqual(["apricot", "black"]);
    expect(duties["black"]).toContain("f2f_relay");
    expect(out["floorCopies"]).toBe(2);
    expect(Array.isArray(out["warnings"])).toBe(true);
    // no public_ip host in the fixture → the no-broadcast warning surfaces.
    expect((out["warnings"] as string[]).some(w => w.includes("broadcast"))).toBe(true);
  });

  test("/custody degrades to static-only with a warning when transmission throws", async () => {
    const res = await createFleetHandler(deps({
      fetchHoldings: () => { throw new Error("ssh: connect refused"); },
    }))(get("/custody"));
    expect(res.status).toBe(200); // degradation, never a 500
    const out = await body(res);
    const reports = out["reports"] as Array<{ title: string; completeCopies: number }>;
    expect(reports.map(r => r.title)).toEqual(["Show Y"]); // static holding survives
    expect(reports[0]!.completeCopies).toBe(1);
    expect((out["warnings"] as string[]).some(w => w.includes("transmission unreachable"))).toBe(true);
  });

  test("/custody merges transmission and static holdings when both work", async () => {
    const res = await createFleetHandler(deps())(get("/custody"));
    const out = await body(res);
    const reports = out["reports"] as Array<{ title: string; completeCopies: number; breach: boolean }>;
    expect(reports[0]!.completeCopies).toBe(2); // black (tx) + apricot (static)
    expect(reports[0]!.breach).toBe(false);
    expect(out["warnings"]).toEqual([]);
  });

  test("/reaper classifies vitals and counts health buckets", async () => {
    const res = await createFleetHandler(deps())(get("/reaper"));
    expect(res.status).toBe(200);
    const out = await body(res);
    expect(out["counts"]).toEqual({ healthy: 1, stalled: 0, dead: 0 });
    const verdicts = out["verdicts"] as Array<{ health: string }>;
    expect(verdicts[0]!.health).toBe("healthy");
  });

  test("/reaper is 503 when transmission is down (documented exception)", async () => {
    const res = await createFleetHandler(deps({
      fetchVitals: () => { throw new Error("ssh timed out"); },
    }))(get("/reaper"));
    expect(res.status).toBe(503);
    expect((await body(res))["error"]).toContain("transmission unreachable");
  });

  test("/peers_for/<hash> unions fleet holders with live swarm peers", async () => {
    const res = await createFleetHandler(deps())(get(`/peers_for/${HASH}`));
    expect(res.status).toBe(200);
    const out = await body(res);
    expect(out["infohash"]).toBe(HASH);
    const peers = out["peers"] as Array<{ addr: string; sourceKind: string }>;
    expect(peers.map(p => `${p.sourceKind}:${p.addr}`)).toEqual([
      "fleet_host:10.0.0.11",
      "dht:203.0.113.9:51413",
    ]);
  });

  test("/peers_for ?title= matches name-keyed static holdings when live peers fail", async () => {
    const res = await createFleetHandler(deps({
      fetchHoldings: () => { throw new Error("down"); },
      fetchLivePeers: () => { throw new Error("down"); },
    }))(get(`/peers_for/${HASH}?title=${encodeURIComponent("Show Y")}`));
    expect(res.status).toBe(200);
    const out = await body(res);
    const peers = out["peers"] as Array<{ addr: string; sourceKind: string }>;
    // Exactly one peer comes back: with both live lookups failing, only the
    // static holding can contribute. Asserted directly as a length check —
    // comparing `peers` against an array built from `peers[0]` verifies
    // nothing beyond the length.
    expect(peers).toHaveLength(1);
    expect(peers[0]!.addr).toBe("10.9.0.2"); // apricot's static copy, hash-less
    expect((out["warnings"] as string[]).some(w => w.includes("live swarm peers unavailable"))).toBe(true);
  });

  test("malformed infohash is a 400", async () => {
    const handler = createFleetHandler(deps());
    // Non-hex text, one-short of 40, 40 non-hex chars, and an in-between length.
    for (const bad of ["nothex", "a".repeat(39), "g".repeat(40), "a".repeat(63)]) {
      const res = await handler(get(`/peers_for/${bad}`));
      expect(res.status).toBe(400);
      expect((await body(res))["error"]).toContain("infohash");
    }
  });

  test("unknown route is a 404 with an error body", async () => {
    const res = await createFleetHandler(deps())(get("/teapot"));
    expect(res.status).toBe(404);
    expect((await body(res))["error"]).toContain("/teapot");
  });
});
// Bearer-token gate: one handler configured with a token is shared by all
// tests in this group; each test varies only the Authorization header.
describe("fleet serve — bearer token", () => {
  const handler = createFleetHandler(deps(), { token: "s3cret" });

  test("401 without an Authorization header", async () => {
    const res = await handler(get("/registry"));
    expect(res.status).toBe(401);
    const payload = await body(res);
    expect(payload["error"]).toBe("unauthorized");
  });

  test("401 with the wrong token", async () => {
    const req = get("/registry", { authorization: "Bearer nope" });
    const res = await handler(req);
    expect(res.status).toBe(401);
  });

  test("200 with the correct token", async () => {
    const req = get("/registry", { authorization: "Bearer s3cret" });
    const res = await handler(req);
    expect(res.status).toBe(200);
  });

  test("/health is exempt from auth", async () => {
    const res = await handler(get("/health"));
    expect(res.status).toBe(200);
  });
});
// Smoke tests: the handler works behind a real Bun.serve listener, and the
// exported default port has the expected value.
describe("fleet serve — smoke", () => {
  test("handler plugs into Bun.serve on an ephemeral port", async () => {
    // port: 0 asks for any free port; the chosen one is read back from server.port.
    const server = Bun.serve({ port: 0, fetch: createFleetHandler(deps()) });
    try {
      const res = await fetch(`http://127.0.0.1:${server.port}/health`);
      expect(res.status).toBe(200);
      expect(((await res.json()) as { ok: boolean }).ok).toBe(true);
    } finally {
      // Await the shutdown so the listener is fully closed before the test
      // resolves, instead of leaving the returned promise floating.
      await server.stop(true);
    }
  });

  test("default port constant is 9094", () => {
    expect(DEFAULT_FLEET_PORT).toBe(9094);
  });
});

View file

@ -273,8 +273,11 @@ status_json() {
# Host load: load averages + mpv's instantaneous %CPU (100 = one core). The
# decode cost is what changes with quality; computed from a 0.25s /proc delta so
# it's "right now", not the lifetime average ps reports. No sudo needed.
# helper_sha = sha256 of this very script, so the app's Devices tab can compare
# the deployed copy against the repo's vendored source and flag stale deploys.
stats_json() {
local l1 l5 l15 cores pid mcpu="null" t1 t2 p1 p2
local l1 l5 l15 cores pid mcpu="null" t1 t2 p1 p2 hsha
hsha=$(sha256sum "$0" 2>/dev/null | awk '{print $1}')
read -r l1 l5 l15 _ < /proc/loadavg
cores=$(nproc 2>/dev/null || echo 1)
pid=$(pgrep -x mpv | head -1)
@ -289,8 +292,8 @@ stats_json() {
'BEGIN{printf "%.1f", 100.0*n*(b-a)/(d-c)}')
fi
fi
printf '{"load1":%s,"load5":%s,"load15":%s,"cores":%s,"mpv_cpu":%s}\n' \
"$l1" "$l5" "$l15" "$cores" "$mcpu"
printf '{"load1":%s,"load5":%s,"load15":%s,"cores":%s,"mpv_cpu":%s,"helper_sha":"%s"}\n' \
"$l1" "$l5" "$l15" "$cores" "$mcpu" "$hsha"
}
# --- dispatch ---------------------------------------------------------------