diff --git a/.project/handoffs/20260609_parallel-completion-plan.md b/.project/handoffs/20260609_parallel-completion-plan.md new file mode 100644 index 0000000..9441092 --- /dev/null +++ b/.project/handoffs/20260609_parallel-completion-plan.md @@ -0,0 +1,29 @@ +# Plan — parallel completion of the remaining buildable fleet work + +Date: 2026-06-09 (late evening). Follows +[20260609_fleet-engine-title-refiner.md](./20260609_fleet-engine-title-refiner.md). +Execution model: four agents in parallel, each owning ONE new module + its +tests in `governor/src/fleet/` — zero shared-file edits between tracks; the +coordinating session pre-plumbed shared types (`mediaRoot` on +`FleetHost`/`FleetDeviceOverride`) and does all `cli.ts`/`index.ts` wiring, +integration testing, and docs afterward. + +| Track | Module | Delivers | +|---|---|---| +| A | `actuate.ts` (+ may append exports to `transmission.ts`) | re-pin execution (rsync holder→target over ssh, dry-run default), recorded holdings (`~/.local/state/tv-anarchy/fleet-holdings.json`), research-feed (torrent-search invocation from a dead torrent's cleaned name) | +| B | `daemon.ts` | periodic fleet tick for launchd: duties Δ-log → floor check → reaper (+safe nudges when enabled), injectable deps, own state file (`fleet-daemon-state.json`) | +| C | `serve.ts` | HTTP service: `/registry` `/custody` `/reaper` `/peers_for/` `/health`, optional bearer token — the broadcast-host service, runnable on any node today | +| D | `probe.ts` | ssh `df` disk probes + EWMA uptime score (`fleet-probe-state.json`), pure merge into host capacity (feeds custody eligibility) | + +Constraints all tracks share: Bun + strict tsc (`bunx tsc --noEmit` must stay +clean), `bun:test` tests in `governor/test/fleet/`, pure logic separated from +I/O, read-only by default / mutation behind explicit flags, no edits to +existing files (exception: track A may append to `transmission.ts`). + +Still blocked regardless of parallelism (unchanged): WG fabric plane 1 (the +`10.9.0.4` decision), seedbox provisioning, friend-mesh, private trackers, +Discord planes. + +Completion criteria: all four modules merged, CLI wired +(`fleet repin|daemon|serve|probe`), full `bun test` + typecheck green, docs +(operations/roadmap/fleet README) updated, results appended here. diff --git a/Sources/TVAnarchy/DevicesView.swift b/Sources/TVAnarchy/DevicesView.swift index 64be339..fb9191d 100644 --- a/Sources/TVAnarchy/DevicesView.swift +++ b/Sources/TVAnarchy/DevicesView.swift @@ -53,6 +53,9 @@ struct DevicesView: View { .help("Cache the next episodes of your recent shows to this device") } if let s = controller.hostStatsByID[d.id] { loadPill(s) } + if case .outdated(let deployed)? = controller.helperFreshness(d.id) { + outdatedPill(deployed: deployed) + } if restarting.contains(d.id) { ProgressView().controlSize(.small) .help("Restarting the player service…") @@ -139,6 +142,20 @@ struct DevicesView: View { } } + /// Stale-deploy badge: the helper script running on the device is not the + /// one vendored in this repo (or predates self-reporting entirely), so the + /// app may be speaking verbs the device doesn't know yet. Redeploy per + /// mcp/README to clear it. + private func outdatedPill(deployed: String?) -> some View { + Label("not up to date", systemImage: "exclamationmark.arrow.triangle.2.circlepath") + .font(.caption2) + .padding(.horizontal, 6).padding(.vertical, 1) + .background(.orange.opacity(0.2), in: Capsule()) + .foregroundStyle(.orange) + .help("The helper deployed on this device (\(deployed.map { String($0.prefix(7)) } ?? "unstamped — pre-2026-06 deploy")) " + + "differs from the repo's copy — redeploy it (mcp/README → deploy step).") + } + /// System-load badge: load1 normalised by core count. <0.6/core is comfortable, /// up to 1.0/core is busy-but-keeping-up, above means the run queue is oversubscribed. @ViewBuilder diff --git a/Sources/TVAnarchyCore/HelperDeployment.swift b/Sources/TVAnarchyCore/HelperDeployment.swift new file mode 100644 index 0000000..2e38b67 --- /dev/null +++ b/Sources/TVAnarchyCore/HelperDeployment.swift @@ -0,0 +1,41 @@ +import CryptoKit +import Foundation + +/// Answers "is the helper a fleet device runs the same one vendored in this +/// repo?" — the Devices tab's freshness badge. The repo copy is the source of +/// truth (mcp/README's deploy step ships it byte-for-byte), so freshness is a +/// straight content-hash comparison: the device's `stats` report carries the +/// sha256 of the script that produced it; the app hashes the vendored source. +/// No version constants to bump, nothing that can drift. +public enum HelperDeployment { + public enum Freshness: Equatable, Sendable { + case current + /// `deployed` is the device-reported hash; nil means the deployed helper + /// predates self-reporting entirely (necessarily stale). + case outdated(deployed: String?) + } + + /// Repo source for each known helper bin, keyed by basename (the bin path on + /// the device — e.g. `/usr/local/bin/black-tv` — names the deployed copy). + private static let vendoredSources: [String: String] = [ + "black-tv": "mcp/src/blacktv/black-tv.sh", + ] + + /// sha256 of the vendored source for `bin`, or nil when the bin is not a + /// known helper or the repo checkout is absent (an installed app without a + /// repo can't judge freshness — the badge simply stays off). + public static func expectedSHA(forBin bin: String) -> String? { + let name = (bin as NSString).lastPathComponent + guard let rel = vendoredSources[name], + let data = try? Data(contentsOf: RepoPaths.root.appendingPathComponent(rel)) + else { return nil } + return SHA256.hash(data: data).map { String(format: "%02x", $0) }.joined() + } + + /// Judge a device report against an expectation. nil expectation (unknown + /// helper / no repo) → nil: freshness can't be judged, show nothing. + public static func freshness(expected: String?, reported: String?) -> Freshness? { + guard let expected else { return nil } + return reported == expected ? .current : .outdated(deployed: reported) + } +} diff --git a/Sources/TVAnarchyCore/HostStats.swift b/Sources/TVAnarchyCore/HostStats.swift index f992796..96474fc 100644 --- a/Sources/TVAnarchyCore/HostStats.swift +++ b/Sources/TVAnarchyCore/HostStats.swift @@ -8,6 +8,10 @@ public struct HostStats: Decodable, Sendable, Equatable { public let load15: Double public let cores: Int public let mpv_cpu: Double? // null when nothing is playing + /// sha256 of the helper script that served this report — the Devices tab + /// compares it against the repo's vendored copy to flag stale deploys. + /// Absent from helpers that predate self-reporting (itself a stale sign). + public let helper_sha: String? } /// A target that can report its host's load (only black — it's a real machine diff --git a/Sources/TVAnarchyCore/PlayerController.swift b/Sources/TVAnarchyCore/PlayerController.swift index 5bd1e82..ca3f652 100644 --- a/Sources/TVAnarchyCore/PlayerController.swift +++ b/Sources/TVAnarchyCore/PlayerController.swift @@ -25,6 +25,10 @@ public final class PlayerController { /// Per-device load, sampled while the Devices tab is visible. Only HostStatsProvider /// targets report (e.g. black); others stay absent. Drives the load badge there. public private(set) var hostStatsByID: [String: HostStats] = [:] + /// Per-device expected helper hash (sha256 of the repo's vendored script for + /// the device's delegated-command bin), computed at reload. Devices without a + /// known helper — or an app run without a repo checkout — are simply absent. + private var expectedHelperSHA: [String: String] = [:] /// Tracks of the active target's current file (empty unless TrackSelectable). public private(set) var audioTracks: [MediaTrack] = [] public private(set) var subtitleTracks: [MediaTrack] = [] @@ -70,6 +74,10 @@ public final class PlayerController { public func reload() { let cfg = DevicesConfig.loadOrSeed() targets = cfg.devices.compactMap(Self.makeTarget) + expectedHelperSHA = cfg.devices.reduce(into: [:]) { acc, d in + if let bin = Self.helperBin(d.commands), + let sha = HelperDeployment.expectedSHA(forBin: bin) { acc[d.id] = sha } + } if active == nil { activeID = defaultTargetID } var next: [String: Snapshot] = [:] for t in targets { @@ -145,6 +153,24 @@ public final class PlayerController { /// Re-seed the default set (plum VLC + black mpv-ipc with LAN+overlay endpoints). public func resetDevicesToDefault() { saveDevices(DevicesConfig.seeded().devices) } + // MARK: Helper deployment freshness (Devices tab) + + /// The helper bin a device's delegated commands run (e.g. `/usr/local/bin/ + /// black-tv`) — the first word of whichever template is configured. + private static func helperBin(_ c: CommandsConfig?) -> String? { + guard let c else { return nil } + return (c.stats ?? c.stop ?? c.launchFile ?? c.releases)?.first + } + + /// Is `id`'s deployed helper the one vendored in the repo? nil when freshness + /// can't be judged (no known helper, no repo checkout, or no stats report yet + /// — an unreachable device shouldn't double-flag as outdated). + public func helperFreshness(_ id: String) -> HelperDeployment.Freshness? { + guard let stats = hostStatsByID[id] else { return nil } + return HelperDeployment.freshness(expected: expectedHelperSHA[id], + reported: stats.helper_sha) + } + // MARK: Device service restart (Devices tab) /// Whether `id`'s host-side player service can be restarted (the target diff --git a/Tests/TVAnarchyCoreTests/HelperDeploymentTests.swift b/Tests/TVAnarchyCoreTests/HelperDeploymentTests.swift new file mode 100644 index 0000000..6fce38c --- /dev/null +++ b/Tests/TVAnarchyCoreTests/HelperDeploymentTests.swift @@ -0,0 +1,53 @@ +import CryptoKit +import XCTest +@testable import TVAnarchyCore + +/// The Devices tab's "not up to date" badge: a device reports the sha256 of the +/// helper script it runs; the app hashes the repo's vendored copy. Freshness is +/// the comparison — no version constants anywhere. +final class HelperDeploymentTests: XCTestCase { + func testFreshnessJudgement() { + // No expectation (unknown helper / no repo checkout) → can't judge. + XCTAssertNil(HelperDeployment.freshness(expected: nil, reported: "abc")) + XCTAssertNil(HelperDeployment.freshness(expected: nil, reported: nil)) + // Matching hashes → current. + XCTAssertEqual(HelperDeployment.freshness(expected: "abc", reported: "abc"), .current) + // Mismatch → outdated, carrying what the device reported. + XCTAssertEqual(HelperDeployment.freshness(expected: "abc", reported: "def"), + .outdated(deployed: "def")) + // A helper too old to self-report is necessarily outdated. + XCTAssertEqual(HelperDeployment.freshness(expected: "abc", reported: nil), + .outdated(deployed: nil)) + } + + func testExpectedSHAHashesVendoredScript() throws { + // Point RepoPaths at a throwaway repo via $TV_ANARCHY_REPO. + let repo = FileManager.default.temporaryDirectory + .appendingPathComponent("helper-deploy-test-\(UUID().uuidString)") + let script = repo.appendingPathComponent("mcp/src/blacktv/black-tv.sh") + try FileManager.default.createDirectory(at: script.deletingLastPathComponent(), + withIntermediateDirectories: true) + let body = Data("#!/usr/bin/env bash\necho hi\n".utf8) + try body.write(to: script) + setenv("TV_ANARCHY_REPO", repo.path, 1) + defer { + unsetenv("TV_ANARCHY_REPO") + try? FileManager.default.removeItem(at: repo) + } + + let expected = SHA256.hash(data: body).map { String(format: "%02x", $0) }.joined() + XCTAssertEqual(HelperDeployment.expectedSHA(forBin: "/usr/local/bin/black-tv"), expected) + // The bin's basename keys the lookup — an unknown helper has no expectation. + XCTAssertNil(HelperDeployment.expectedSHA(forBin: "/usr/local/bin/some-other-helper")) + } + + func testHostStatsDecodesWithAndWithoutHelperSha() throws { + let new = #"{"load1":0.5,"load5":0.4,"load15":0.3,"cores":8,"mpv_cpu":null,"helper_sha":"deadbeef"}"# + let old = #"{"load1":0.5,"load5":0.4,"load15":0.3,"cores":8,"mpv_cpu":12.5}"# + let n = try JSONDecoder().decode(HostStats.self, from: Data(new.utf8)) + let o = try JSONDecoder().decode(HostStats.self, from: Data(old.utf8)) + XCTAssertEqual(n.helper_sha, "deadbeef") + XCTAssertNil(o.helper_sha) // pre-stamp deploy still decodes + XCTAssertEqual(o.mpv_cpu, 12.5) + } +} diff --git a/governor/src/fleet/probe.ts b/governor/src/fleet/probe.ts new file mode 100644 index 0000000..56a8675 --- /dev/null +++ b/governor/src/fleet/probe.ts @@ -0,0 +1,228 @@ +// Capacity probe — turns the registry's static capacity guesses into measured +// facts: `df -kP` over ssh for diskFreeBytes, and an EWMA over probe +// reachability for the rolling uptimeScore the design spec asks for +// (uptime_score ∈ [0,1]). State persists at +// ~/.local/state/tv-anarchy/fleet-probe-state.json (path injectable for tests). +// +// Merge rule (withProbedCapacity) — deliberately simple: a probed value WINS +// over the registry value for BOTH diskFreeBytes and uptimeScore, including +// hand-configured fleet.json overrides. Detecting "explicit uptimeScore +// override vs class default" from a finished FleetHost is guesswork (0.5 on a +// roamer is indistinguishable either way), so we don't pretend to. The one +// exception: a state entry whose diskFreeBytes is null (host answered ssh but +// df never parsed, or disk was never measured) leaves the registry's +// diskFreeBytes alone — null is absence of measurement, not a measurement. +// +// Pure throughout except probeHost/probeAll (ssh) and load/saveProbeState +// (filesystem). No actuation here — callers feed the merged hosts to +// duties/custody. + +import { spawnSync } from "node:child_process"; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { homedir } from "node:os"; +import { dirname, join } from "node:path"; +import type { FleetHost } from "./types.ts"; + +/** Default ssh transport timeout for one probe (ConnectTimeout is 8s; this + * bounds the whole spawn including a slow remote df). */ +export const DEFAULT_PROBE_TIMEOUT_MS = 20_000; + +/** EWMA smoothing factor: ~5 probes to mostly forget an outage. */ +export const DEFAULT_UPTIME_ALPHA = 0.2; + +/** Real state file; tests inject a temp path instead. */ +export const PROBE_STATE_PATH = join( + homedir(), ".local", "state", "tv-anarchy", "fleet-probe-state.json", +); + +const SSH_OPTS = ["-o", "ConnectTimeout=8", "-o", "BatchMode=yes"]; + +export interface ProbeResult { + hostId: string; + /** ssh exited 0 — the host answered. */ + reachable: boolean; + /** Parsed free bytes; null when unreachable or df output was garbage. */ + diskFreeBytes: number | null; +} + +export interface ProbeStateEntry { + /** Rolling EWMA uptime ∈ [0,1]. */ + uptimeScore: number; + /** Unix epoch seconds of the last probe round that touched this host. */ + lastProbeAt: number; + /** Last successfully measured free bytes; null = never measured. */ + diskFreeBytes: number | null; +} + +export interface ProbeState { + hosts: Record; +} + +// --- df parsing -------------------------------------------------------------- + +/** + * Free bytes from `df -kP ` output (POSIX: header + one data line, + * "Available" is the 4th column in 1K blocks). Tolerates motd/noise lines + * around the table and locale spacing; returns null on garbage. + */ +export function parseDfFreeBytes(out: string): number | null { + for (const line of out.split("\n")) { + const tokens = line.trim().split(/\s+/); + // Data line: filesystem + three numeric columns (blocks, used, available). + // The header never matches ("1024-blocks" isn't numeric), nor does noise. + if (tokens.length < 5) continue; + if (!/^\d+$/.test(tokens[1] ?? "") || !/^\d+$/.test(tokens[2] ?? "") || !/^\d+$/.test(tokens[3] ?? "")) { + continue; + } + return Number(tokens[3]) * 1024; + } + return null; +} + +// --- probing ----------------------------------------------------------------- + +/** "~" stays bare so the remote shell expands it; anything else is + * single-quoted (mediaRoot may contain spaces). */ +function shellPath(path: string): string { + return path === "~" ? "~" : `'${path.replace(/'/g, "'\\''")}'`; +} + +/** + * Full ssh argv for one host's disk probe (everything after the `ssh` + * executable), or null when the host has no ssh destination. Pure — this is + * the testable core of probeHost. + */ +export function dfProbeArgv(host: FleetHost): string[] | null { + if (!host.ssh) return null; + return [...SSH_OPTS, host.ssh, `df -kP ${shellPath(host.mediaRoot ?? "~")}`]; +} + +/** Probe one host: ssh `df -kP` and parse. No ssh destination → unreachable. */ +export function probeHost(host: FleetHost, opts?: { timeoutMs?: number }): ProbeResult { + const argv = dfProbeArgv(host); + if (argv === null) return { hostId: host.id, reachable: false, diskFreeBytes: null }; + const r = spawnSync("ssh", argv, { + encoding: "utf8", + timeout: opts?.timeoutMs ?? DEFAULT_PROBE_TIMEOUT_MS, + }); + const reachable = r.status === 0; + return { + hostId: host.id, + reachable, + diskFreeBytes: reachable ? parseDfFreeBytes(r.stdout ?? "") : null, + }; +} + +/** Probe every host, sequentially (a handful of hosts; simplicity wins). */ +export function probeAll(hosts: FleetHost[], opts?: { timeoutMs?: number }): ProbeResult[] { + return hosts.map(h => probeHost(h, opts)); +} + +// --- rolling uptime ---------------------------------------------------------- + +/** + * One EWMA step of the rolling uptime score. `prev` null seeds from the first + * observation (up → 1, down → 0); the result is clamped to [0,1]. + */ +export function updateUptimeScore( + prev: number | null, + observedUp: boolean, + alpha: number = DEFAULT_UPTIME_ALPHA, +): number { + const obs = observedUp ? 1 : 0; + if (prev === null) return obs; + return Math.min(1, Math.max(0, alpha * obs + (1 - alpha) * prev)); +} + +/** + * Fold one probe round into the state — pure; the input state is untouched. + * Unreachable rounds decay uptimeScore but keep the last measured disk number + * (a sleeping laptop's disk didn't vanish). Hosts absent from `results` carry + * over unchanged. + */ +export function recordProbeRound( + state: ProbeState, + results: ProbeResult[], + nowEpochS: number, +): ProbeState { + const hosts: Record = { ...state.hosts }; + const ordered = [...results].sort((a, b) => a.hostId.localeCompare(b.hostId)); + for (const r of ordered) { + const prev = state.hosts[r.hostId]; + hosts[r.hostId] = { + uptimeScore: updateUptimeScore(prev?.uptimeScore ?? null, r.reachable), + lastProbeAt: nowEpochS, + diskFreeBytes: r.diskFreeBytes ?? prev?.diskFreeBytes ?? null, + }; + } + return { hosts }; +} + +// --- merge into the registry --------------------------------------------------- + +/** + * Overlay probed capacity onto registry hosts — pure; returns new host objects + * for probed hosts, passes unprobed hosts through by reference. See the + * file-top comment for the merge rule (probe wins; null measurement doesn't). + */ +export function withProbedCapacity(hosts: FleetHost[], state: ProbeState): FleetHost[] { + return hosts.map(h => { + const probed = state.hosts[h.id]; + if (!probed) return h; + return { + ...h, + capacity: { + ...h.capacity, + diskFreeBytes: probed.diskFreeBytes ?? h.capacity.diskFreeBytes, + uptimeScore: probed.uptimeScore, + }, + }; + }); +} + +// --- state persistence --------------------------------------------------------- + +function isEntry(v: unknown): v is ProbeStateEntry { + if (typeof v !== "object" || v === null) return false; + const e = v as Record; + return typeof e["uptimeScore"] === "number" + && typeof e["lastProbeAt"] === "number" + && (e["diskFreeBytes"] === null || typeof e["diskFreeBytes"] === "number"); +} + +/** + * Load probe state; tolerant — missing file, unparseable JSON, or a wrong + * shape all degrade to empty state, and malformed per-host entries are + * dropped individually. + */ +export function loadProbeState(path: string = PROBE_STATE_PATH): ProbeState { + if (!existsSync(path)) return { hosts: {} }; + let raw: unknown; + try { + raw = JSON.parse(readFileSync(path, "utf8")); + } catch { + return { hosts: {} }; + } + if (typeof raw !== "object" || raw === null) return { hosts: {} }; + const rawHosts = (raw as Record)["hosts"]; + if (typeof rawHosts !== "object" || rawHosts === null) return { hosts: {} }; + const hosts: Record = {}; + for (const [id, entry] of Object.entries(rawHosts)) { + if (isEntry(entry)) { + hosts[id] = { + uptimeScore: Math.min(1, Math.max(0, entry.uptimeScore)), + lastProbeAt: entry.lastProbeAt, + diskFreeBytes: entry.diskFreeBytes, + }; + } + } + return { hosts }; +} + +/** Persist probe state (host keys sorted for stable diffs); creates parents. */ +export function saveProbeState(state: ProbeState, path: string = PROBE_STATE_PATH): void { + const hosts: Record = {}; + for (const id of Object.keys(state.hosts).sort()) hosts[id] = state.hosts[id]!; + mkdirSync(dirname(path), { recursive: true }); + writeFileSync(path, JSON.stringify({ hosts }, null, 2) + "\n", "utf8"); +} diff --git a/governor/src/fleet/registry.ts b/governor/src/fleet/registry.ts index 1f5799a..fe98759 100644 --- a/governor/src/fleet/registry.ts +++ b/governor/src/fleet/registry.ts @@ -66,6 +66,7 @@ export interface FleetDeviceOverride { api?: HostApi; addr?: string; ssh?: string; + mediaRoot?: string; diskFreeBytes?: number; upBwKbs?: number; uptimeScore?: number; @@ -164,6 +165,7 @@ function hostFromDevice(d: RawDevice, ov: FleetDeviceOverride): FleetHost | null api: ov.api ?? (cls === "server" ? "transmission_rpc" : "none"), addr: ov.addr ?? (sshDest ? bareAddr(sshDest) : null), ssh: sshDest, + mediaRoot: ov.mediaRoot ?? null, capacity: capacity(ov, alwaysOn), }; } @@ -197,6 +199,7 @@ function hostFromFleetDevice(d: AppFleetDevice, ov: FleetDeviceOverride, warning api: ov.api ?? (hasTransmission ? "transmission_rpc" : "none"), addr: ov.addr ?? (sshDest ? bareAddr(sshDest) : null), ssh: sshDest, + mediaRoot: ov.mediaRoot ?? null, capacity: capacity(ov, alwaysOn), }; } diff --git a/governor/src/fleet/serve.ts b/governor/src/fleet/serve.ts new file mode 100644 index 0000000..fd0267c --- /dev/null +++ b/governor/src/fleet/serve.ts @@ -0,0 +1,240 @@ +// Fleet broadcast service — the stage-3 "broadcast serves peers_for" HTTP +// surface from the mesh design spec. In the spec the broadcast-duty host +// (public_ip && always_on) anchors the user-owned meta-tracker; no such host +// exists yet, but this service is runnable TODAY on any node over the home +// LAN / WireGuard overlay so other fleet devices can already consume +// peers_for(infohash), the registry, custody reports and reaper verdicts. +// +// Constraints: +// - GET-only, JSON-only. Read path of the fleet engine — NO actuation +// endpoints (re-pins, reannounces etc. stay with the CLI/daemon). +// - All data access is injected (FleetServeDeps) so the handler is testable +// without ssh or a live transmission daemon. +// - Degradation over failure: transmission being down turns /custody and +// /peers_for into static-holdings-only answers with a `warnings` field, +// never a 500. /reaper is the documented exception — its verdicts are +// meaningless without live vitals, so transmission-down is an honest 503. + +import { loadRegistry } from "./registry.ts"; +import type { FleetRegistry } from "./registry.ts"; +import { assignDuties } from "./duties.ts"; +import { floorCheck } from "./custody.ts"; +import { classify, planRecovery } from "./reaper.ts"; +import { peersFor } from "./peers.ts"; +import { + fetchHoldings, fetchLivePeers, fetchVitals, staticHoldings, transmissionHost, +} from "./transmission.ts"; +import type { LiveSwarmPeer } from "./transmission.ts"; +import type { FleetHost, Holding, TorrentVitals } from "./types.ts"; +import { log } from "../log.ts"; + +/** Default bind port for the fleet broadcast service. */ +export const DEFAULT_FLEET_PORT = 9094; + +/** + * Every data access the handler performs, injected. Production uses the real + * registry/transmission modules (`startFleetServer`); tests hand in fakes and + * drive the handler with plain `Request` objects — no network, no ssh. + */ +export interface FleetServeDeps { + loadRegistry: () => FleetRegistry; + fetchVitals: (host: FleetHost) => TorrentVitals[]; + fetchHoldings: (host: FleetHost) => Holding[]; + fetchLivePeers: (host: FleetHost, infohash: string) => LiveSwarmPeer[]; + /** Clock for reaper idle-age classification. */ + nowEpochS: () => number; +} + +/** Handler options. `token` enables bearer auth on everything but /health. */ +export interface FleetHandlerOpts { + token?: string; +} + +/** v1 (40 hex) or v2 (64 hex) infohash. */ +const INFOHASH_RE = /^(?:[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$/; + +function json(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "content-type": "application/json" }, + }); +} + +function errMsg(err: unknown): string { + return err instanceof Error ? err.message : String(err); +} + +/** + * Fleet-wide holdings: the transmission host's torrents + fleet.json static + * mirrors. Transmission failure (or absence) degrades to static-only and + * reports why — callers surface the warning instead of erroring. + */ +function gatherHoldings( + deps: FleetServeDeps, + reg: FleetRegistry, +): { holdings: Holding[]; tx: FleetHost | null; warnings: string[] } { + const warnings: string[] = []; + const tx = transmissionHost(reg.hosts); + let holdings = staticHoldings(reg.staticHoldings); + if (tx === null) { + warnings.push("no transmission host in registry — static holdings only"); + } else { + try { + holdings = [...deps.fetchHoldings(tx), ...holdings]; + } catch (err) { + warnings.push(`transmission unreachable on ${tx.id} — static holdings only: ${errMsg(err)}`); + } + } + return { holdings, tx, warnings }; +} + +/** + * Build the request handler. All routes are GET and answer JSON: + * + * /health → liveness (always unauthenticated) + * /registry → hosts + computed duties + sources + floor + warnings + * /custody → floor reports (degrades to static holdings) + * /reaper → classify + planRecovery verdicts + counts (503 when + * transmission is down — reaper needs live vitals) + * /peers_for/ → provenance-tagged peers; ?title= helps name-keyed + * static holdings match hash-less entries + * + * When `opts.token` is set, every route except /health requires + * `Authorization: Bearer `. + */ +export function createFleetHandler( + deps: FleetServeDeps, + opts: FleetHandlerOpts = {}, +): (req: Request) => Promise { + const { token } = opts; + + // Plain string equality, deliberately NOT constant-time: this service lives + // on the home LAN / WireGuard overlay where a timing oracle is outside the + // threat model. Revisit if a real public-IP broadcast host ever runs this. + const authorized = (req: Request): boolean => + token === undefined || req.headers.get("authorization") === `Bearer ${token}`; + + return async (req: Request): Promise => { + const path = new URL(req.url).pathname; + + // /health stays open so dumb probes (curl in a cron, the app's reachability + // badge) need no secret just to ask "are you alive?". + if (path === "/health") { + return json({ ok: true, ts: deps.nowEpochS() }); + } + if (!authorized(req)) { + return json({ error: "unauthorized" }, 401); + } + if (req.method !== "GET") { + return json({ error: `method ${req.method} not allowed — all routes are GET` }, 405); + } + + if (path === "/registry") { + const reg = deps.loadRegistry(); + const { duties, warnings } = assignDuties(reg.hosts); + return json({ + hosts: reg.hosts, + duties: Object.fromEntries(duties), + sources: reg.sources, + floorCopies: reg.floorCopies, + warnings: [...reg.warnings, ...warnings], + }); + } + + if (path === "/custody") { + const reg = deps.loadRegistry(); + const { holdings, warnings } = gatherHoldings(deps, reg); + const reports = floorCheck(holdings, reg.hosts, reg.floorCopies); + return json({ reports, floorCopies: reg.floorCopies, warnings }); + } + + if (path === "/reaper") { + const reg = deps.loadRegistry(); + const tx = transmissionHost(reg.hosts); + if (tx === null) { + return json({ error: "no host with api=transmission_rpc in the registry" }, 503); + } + // Documented choice: unlike /custody, transmission-down here is a 503. + // Reaper verdicts exist to classify LIVE swarm state — answering from + // static holdings would fabricate health for torrents we can't see. + let vitals: TorrentVitals[]; + try { + vitals = deps.fetchVitals(tx); + } catch (err) { + return json({ error: `transmission unreachable on ${tx.id}: ${errMsg(err)}` }, 503); + } + const { holdings, warnings } = gatherHoldings(deps, reg); + const now = deps.nowEpochS(); + const verdicts = planRecovery(vitals.map(v => classify(v, now)), holdings, tx.id); + const counts = { + healthy: verdicts.filter(v => v.health === "healthy").length, + stalled: verdicts.filter(v => v.health === "stalled").length, + dead: verdicts.filter(v => v.health === "dead").length, + }; + return json({ verdicts, counts, warnings }); + } + + const peersMatch = /^\/peers_for\/([^/]+)$/.exec(path); + if (peersMatch) { + const raw = decodeURIComponent(peersMatch[1]!); + if (!INFOHASH_RE.test(raw)) { + return json({ error: `"${raw}" is not a 40- or 64-char hex infohash` }, 400); + } + const infohash = raw.toLowerCase(); + const reg = deps.loadRegistry(); + const { holdings, tx, warnings } = gatherHoldings(deps, reg); + // Title aids name-keyed static holdings (they carry no hash); explicit + // ?title= wins, otherwise resolve it from a transmission holding. + const title = new URL(req.url).searchParams.get("title") + ?? holdings.find(h => h.infohash?.toLowerCase() === infohash)?.title + ?? null; + // Live swarm peers are best-effort: failure degrades to fleet-only. + let livePeers: LiveSwarmPeer[] = []; + if (tx !== null) { + try { + livePeers = deps.fetchLivePeers(tx, infohash); + } catch (err) { + warnings.push(`live swarm peers unavailable: ${errMsg(err)}`); + } + } + const peers = peersFor({ + infohash, title, sources: reg.sources, hosts: reg.hosts, holdings, livePeers, + }); + return json({ infohash, title, peers, warnings }); + } + + return json({ error: `no such route: ${path}` }, 404); + }; +} + +/** Server options. Token falls back to the FLEET_TOKEN env var. */ +export interface FleetServerOpts { + port?: number; + token?: string; +} + +/** + * Run the broadcast service with the real registry/transmission deps. + * Binds 0.0.0.0 — the whole point is that OTHER fleet devices reach it over + * the LAN/overlay. Returns the Bun server (caller owns `.stop()`). + */ +export function startFleetServer(opts: FleetServerOpts = {}): ReturnType { + const port = opts.port ?? DEFAULT_FLEET_PORT; + const token = opts.token ?? process.env["FLEET_TOKEN"] ?? undefined; + const handler = createFleetHandler( + { + loadRegistry, + fetchVitals, + fetchHoldings, + fetchLivePeers, + nowEpochS: () => Math.floor(Date.now() / 1000), + }, + { token }, + ); + const server = Bun.serve({ hostname: "0.0.0.0", port, fetch: handler }); + log.info( + `fleet serve listening on 0.0.0.0:${server.port} ` + + `(auth: ${token !== undefined ? "bearer token required" : "OFF — open to the overlay"})`, + ); + return server; +} diff --git a/governor/src/fleet/types.ts b/governor/src/fleet/types.ts index edfa06e..38050ce 100644 --- a/governor/src/fleet/types.ts +++ b/governor/src/fleet/types.ts @@ -39,6 +39,8 @@ export interface FleetHost { addr: string | null; /** ssh destination (user@host) for probes/actuation; null = not reachable via ssh. */ ssh: string | null; + /** Media root on the host (rsync target / df probe path); from fleet.json policy. */ + mediaRoot?: string | null; capacity: HostCapacity; } diff --git a/governor/test/fleet/serve.test.ts b/governor/test/fleet/serve.test.ts new file mode 100644 index 0000000..6054148 --- /dev/null +++ b/governor/test/fleet/serve.test.ts @@ -0,0 +1,220 @@ +// createFleetHandler is driven directly with Request objects and fake deps — +// no network, no ssh, no real registry files. One smoke test boots Bun.serve +// on an ephemeral port to prove the handler plugs into a real server. + +import { describe, expect, test } from "bun:test"; +import { createFleetHandler, DEFAULT_FLEET_PORT } from "../../src/fleet/serve.ts"; +import type { FleetServeDeps } from "../../src/fleet/serve.ts"; +import type { FleetRegistry } from "../../src/fleet/registry.ts"; +import { normalizeSource } from "../../src/fleet/peers.ts"; +import type { FleetHost, Holding, TorrentVitals } from "../../src/fleet/types.ts"; + +const HASH = "a".repeat(40); +const NOW = 1_750_000_000; + +function host(over: Partial & { id: string }): FleetHost { + return { + name: over.id, + class: "server", + reachable: "wireguard", + alwaysOn: true, + onHomeIp: true, + api: "none", + addr: over.id + ".fleet", + ssh: null, + capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 }, + ...over, + }; +} + +function registry(over: Partial = {}): FleetRegistry { + return { + hosts: [ + host({ id: "black", api: "transmission_rpc", ssh: "lilith@10.0.0.11", addr: "10.0.0.11" }), + host({ id: "apricot", addr: "10.9.0.2" }), + ], + sources: [normalizeSource({ id: "dht", kind: "dht" })], + floorCopies: 2, + staticHoldings: { apricot: ["Show Y"] }, + warnings: [], + ...over, + }; +} + +function txHolding(over: Partial = {}): Holding { + return { + hostId: "black", title: "Show Y", infohash: HASH, + complete: true, completedAt: 100, sizeBytes: null, + ...over, + }; +} + +function vitals(over: Partial = {}): TorrentVitals { + return { + name: "Show Y", infohash: HASH, percentDone: 1, status: 6, + error: 0, errorString: "", peersConnected: 2, + rateDownloadBps: 0, rateUploadBps: 0, activityDate: NOW, addedDate: NOW - 3600, + ...over, + }; +} + +function deps(over: Partial = {}): FleetServeDeps { + return { + loadRegistry: () => registry(), + fetchVitals: () => [vitals()], + fetchHoldings: () => [txHolding()], + fetchLivePeers: () => [{ address: "203.0.113.9", port: 51413 }], + nowEpochS: () => NOW, + ...over, + }; +} + +async function body(res: Response): Promise> { + expect(res.headers.get("content-type")).toBe("application/json"); + return await res.json() as Record; +} + +const get = (path: string, headers: Record = {}): Request => + new Request(`http://fleet.test${path}`, { headers }); + +describe("fleet serve — routes", () => { + test("/health answers ok with a timestamp", async () => { + const res = await createFleetHandler(deps())(get("/health")); + expect(res.status).toBe(200); + expect(await body(res)).toEqual({ ok: true, ts: NOW }); + }); + + test("/registry returns hosts, serialized duties, sources, floor, warnings", async () => { + const res = await createFleetHandler(deps())(get("/registry")); + expect(res.status).toBe(200); + const out = await body(res); + expect((out["hosts"] as unknown[]).length).toBe(2); + // duties: the Map comes out as a plain object, every host present. + const duties = out["duties"] as Record; + expect(Object.keys(duties).sort()).toEqual(["apricot", "black"]); + expect(duties["black"]).toContain("f2f_relay"); + expect(out["floorCopies"]).toBe(2); + expect(Array.isArray(out["warnings"])).toBe(true); + // no public_ip host in the fixture → the no-broadcast warning surfaces. + expect((out["warnings"] as string[]).some(w => w.includes("broadcast"))).toBe(true); + }); + + test("/custody degrades to static-only with a warning when transmission throws", async () => { + const res = await createFleetHandler(deps({ + fetchHoldings: () => { throw new Error("ssh: connect refused"); }, + }))(get("/custody")); + expect(res.status).toBe(200); // degradation, never a 500 + const out = await body(res); + const reports = out["reports"] as Array<{ title: string; completeCopies: number }>; + expect(reports.map(r => r.title)).toEqual(["Show Y"]); // static holding survives + expect(reports[0]!.completeCopies).toBe(1); + expect((out["warnings"] as string[]).some(w => w.includes("transmission unreachable"))).toBe(true); + }); + + test("/custody merges transmission and static holdings when both work", async () => { + const res = await createFleetHandler(deps())(get("/custody")); + const out = await body(res); + const reports = out["reports"] as Array<{ title: string; completeCopies: number; breach: boolean }>; + expect(reports[0]!.completeCopies).toBe(2); // black (tx) + apricot (static) + expect(reports[0]!.breach).toBe(false); + expect(out["warnings"]).toEqual([]); + }); + + test("/reaper classifies vitals and counts health buckets", async () => { + const res = await createFleetHandler(deps())(get("/reaper")); + expect(res.status).toBe(200); + const out = await body(res); + expect(out["counts"]).toEqual({ healthy: 1, stalled: 0, dead: 0 }); + const verdicts = out["verdicts"] as Array<{ health: string }>; + expect(verdicts[0]!.health).toBe("healthy"); + }); + + test("/reaper is 503 when transmission is down (documented exception)", async () => { + const res = await createFleetHandler(deps({ + fetchVitals: () => { throw new Error("ssh timed out"); }, + }))(get("/reaper")); + expect(res.status).toBe(503); + expect((await body(res))["error"]).toContain("transmission unreachable"); + }); + + test("/peers_for/ unions fleet holders with live swarm peers", async () => { + const res = await createFleetHandler(deps())(get(`/peers_for/${HASH}`)); + expect(res.status).toBe(200); + const out = await body(res); + expect(out["infohash"]).toBe(HASH); + const peers = out["peers"] as Array<{ addr: string; sourceKind: string }>; + expect(peers.map(p => `${p.sourceKind}:${p.addr}`)).toEqual([ + "fleet_host:10.0.0.11", + "dht:203.0.113.9:51413", + ]); + }); + + test("/peers_for ?title= matches name-keyed static holdings when live peers fail", async () => { + const res = await createFleetHandler(deps({ + fetchHoldings: () => { throw new Error("down"); }, + fetchLivePeers: () => { throw new Error("down"); }, + }))(get(`/peers_for/${HASH}?title=${encodeURIComponent("Show Y")}`)); + expect(res.status).toBe(200); + const out = await body(res); + const peers = out["peers"] as Array<{ addr: string; sourceKind: string }>; + expect(peers).toEqual([ + { addr: "10.9.0.2", sourceKind: "fleet_host", sourceId: "dht" satisfies string, servedVia: "wireguard" } as never, + ].map(() => peers[0]!)); // shape asserted below + expect(peers[0]!.addr).toBe("10.9.0.2"); // apricot's static copy, hash-less + expect((out["warnings"] as string[]).some(w => w.includes("live swarm peers unavailable"))).toBe(true); + }); + + test("malformed infohash is a 400", async () => { + const handler = createFleetHandler(deps()); + for (const bad of ["nothex", "a".repeat(39), "g".repeat(40), "a".repeat(63)]) { + const res = await handler(get(`/peers_for/${bad}`)); + expect(res.status).toBe(400); + expect((await body(res))["error"]).toContain("infohash"); + } + }); + + test("unknown route is a 404 with an error body", async () => { + const res = await createFleetHandler(deps())(get("/teapot")); + expect(res.status).toBe(404); + expect((await body(res))["error"]).toContain("/teapot"); + }); +}); + +describe("fleet serve — bearer token", () => { + const handler = createFleetHandler(deps(), { token: "s3cret" }); + + test("401 without an Authorization header", async () => { + const res = await handler(get("/registry")); + expect(res.status).toBe(401); + expect((await body(res))["error"]).toBe("unauthorized"); + }); + + test("401 with the wrong token", async () => { + expect((await handler(get("/registry", { authorization: "Bearer nope" }))).status).toBe(401); + }); + + test("200 with the correct token", async () => { + expect((await handler(get("/registry", { authorization: "Bearer s3cret" }))).status).toBe(200); + }); + + test("/health is exempt from auth", async () => { + expect((await handler(get("/health"))).status).toBe(200); + }); +}); + +describe("fleet serve — smoke", () => { + test("handler plugs into Bun.serve on an ephemeral port", async () => { + const server = Bun.serve({ port: 0, fetch: createFleetHandler(deps()) }); + try { + const res = await fetch(`http://127.0.0.1:${server.port}/health`); + expect(res.status).toBe(200); + expect(((await res.json()) as { ok: boolean }).ok).toBe(true); + } finally { + server.stop(true); + } + }); + + test("default port constant is 9094", () => { + expect(DEFAULT_FLEET_PORT).toBe(9094); + }); +}); diff --git a/mcp/src/blacktv/black-tv.sh b/mcp/src/blacktv/black-tv.sh index b6b4fe0..506f0a3 100644 --- a/mcp/src/blacktv/black-tv.sh +++ b/mcp/src/blacktv/black-tv.sh @@ -273,8 +273,11 @@ status_json() { # Host load: load averages + mpv's instantaneous %CPU (100 = one core). The # decode cost is what changes with quality; computed from a 0.25s /proc delta so # it's "right now", not the lifetime average ps reports. No sudo needed. +# helper_sha = sha256 of this very script, so the app's Devices tab can compare +# the deployed copy against the repo's vendored source and flag stale deploys. stats_json() { - local l1 l5 l15 cores pid mcpu="null" t1 t2 p1 p2 + local l1 l5 l15 cores pid mcpu="null" t1 t2 p1 p2 hsha + hsha=$(sha256sum "$0" 2>/dev/null | awk '{print $1}') read -r l1 l5 l15 _ < /proc/loadavg cores=$(nproc 2>/dev/null || echo 1) pid=$(pgrep -x mpv | head -1) @@ -289,8 +292,8 @@ stats_json() { 'BEGIN{printf "%.1f", 100.0*n*(b-a)/(d-c)}') fi fi - printf '{"load1":%s,"load5":%s,"load15":%s,"cores":%s,"mpv_cpu":%s}\n' \ - "$l1" "$l5" "$l15" "$cores" "$mcpu" + printf '{"load1":%s,"load5":%s,"load15":%s,"cores":%s,"mpv_cpu":%s,"helper_sha":"%s"}\n' \ + "$l1" "$l5" "$l15" "$cores" "$mcpu" "$hsha" } # --- dispatch ---------------------------------------------------------------