tv-anarchy/Sources/TVAnarchyCore/MpvTarget.swift
Natalie a86e68c525 feat(apps): add fleet engine mesh core integration
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-06-09 21:23:36 -07:00

242 lines
11 KiB
Swift

import Foundation
/// Generic mpv host driven over mpv's standard JSON IPC. Control (play/pause/
/// volume/seek/playlist/status) is protocol-native no bespoke verbs. The
/// host-specific operations a generic client can't do (launch playback, enumerate
/// releases, report load, tear down a root-owned mpv) are delegated to per-host
/// command templates run over the same SSH channel.
///
/// IPC transport: one `ssh` to the host runs, guarded by the socket's existence,
/// `printf '<cmd>\n' | [sudo] socat - UNIX-CONNECT:<socket>`
/// so a whole status poll is a single round-trip. Responses are matched by
/// `request_id` (mpv echoes it), never by position, so interleaved async event
/// lines don't corrupt the parse.
public final class MpvTarget: PlayerTarget, QualitySwitchable, HostStatsProvider, MediaLaunchable, Enqueueable, TrackSelectable, ServiceRestartable {
public let id: String
public let name: String
public let kind: HostKind = .mpvIPC
public let volumeScale: Int
private let mpv: MpvConn
private let commands: CommandsConfig?
private let transport: SSHTransport
public var detail: String { transport.detail }
public init(id: String, name: String, mpv: MpvConn, commands: CommandsConfig?) {
self.id = id; self.name = name; self.mpv = mpv; self.commands = commands
self.volumeScale = mpv.volumeScale
self.transport = SSHTransport(endpoints: mpv.endpoints)
}
// MARK: - IPC
/// Status property order. The index (+1) IS the request_id, and
/// `parseStatusOutput` maps those ids back to fields keep the two in sync.
private static let statusProps = [
"pause", "volume", "time-pos", "duration", "media-title", "playlist-pos", "playlist-count",
]
public func poll() async -> PollResult {
let lines = Self.statusProps.enumerated().map {
ipcLine(["get_property", $0.element], requestId: $0.offset + 1)
}
let r = await sendIPC(lines)
guard r.ok else { return .unreachable } // ssh failed host unreachable
return Self.parseStatusOutput(r.stdout) // reachable: parse or .idle
}
public func playPause() async { await send(["cycle", "pause"]) }
public func resume() async { await send(["set_property", "pause", false]) }
public func setVolume(_ percent: Int) async {
await send(["set_property", "volume", min(volumeScale, max(0, percent))])
}
public func seek(relative seconds: Int) async { await send(["seek", seconds]) }
public func seek(toSeconds seconds: Int) async { await send(["seek", seconds, "absolute"]) }
public func next() async { await send(["playlist-next"]) }
public func previous() async { await send(["playlist-prev"]) }
/// Delegated, NOT mpv `quit` quit can leave a stale socket, so teardown
/// goes through the host's configured `stop` command for identical cleanup.
public func stop() async { await runCommand(commands?.stop, [:]) }
// MARK: ServiceRestartable (delegated)
public var canRestartService: Bool { commands?.restart != nil }
/// Hard-restart the host's player service (black: relaunch the mpv unit,
/// resuming the live playlist/position when it's still readable).
@discardableResult
public func restartService() async -> Bool { await runCommand(commands?.restart, [:]) }
// MARK: MediaLaunchable (delegated)
@discardableResult
public func launch(_ request: LaunchRequest) async -> Bool {
guard case let .file(path) = request else { return false }
// black needs a black-side path, not plum's mount path.
return await runCommand(commands?.launchFile, ["path": MediaPaths.toRemote(path)])
}
// MARK: Enqueueable (generic IPC batched loadfile replace/append, one
// round-trip; spike-verified against black's live socket: count 141).
@discardableResult
public func enqueue(_ paths: [String], replace: Bool) async -> Bool {
guard !paths.isEmpty else { return false }
let lines = paths.enumerated().map { i, p in
ipcLine(["loadfile", MediaPaths.toRemote(p), (i == 0 && replace) ? "replace" : "append"])
}
return await sendIPC(lines).ok
}
// MARK: HostStatsProvider (delegated)
public func stats() async -> HostStats? {
guard let r = await runCommandResult(commands?.stats, [:]), r.ok,
let data = r.stdout.data(using: .utf8),
let s = try? JSONDecoder().decode(HostStats.self, from: data) else { return nil }
return s
}
// MARK: QualitySwitchable (enumerate delegated; the switch itself is generic IPC)
public func releases() async -> [Release] {
guard let r = await runCommandResult(commands?.releases, [:]), r.ok,
let data = r.stdout.data(using: .utf8),
let list = try? JSONDecoder().decode([Release].self, from: data) else { return [] }
return list
}
/// Resolve the target file (+ remaining episodes) for the chosen release via
/// the host, then drive the switch generically over IPC: load the new file at
/// the current timestamp, then append the rest so series continuation is kept.
public func switchRelease(_ id: String) async {
guard let r = await runCommandResult(commands?.resolveRelease, ["releaseId": id]), r.ok,
let data = r.stdout.data(using: .utf8),
let resolved = try? JSONDecoder().decode(ResolvedRelease.self, from: data) else { return }
let pos = await currentTimePos() ?? 0
var lines = [ipcLine(["loadfile", resolved.path, "replace", "start=\(Int(pos))"])]
for file in resolved.tail { lines.append(ipcLine(["loadfile", file, "append"])) }
_ = await sendIPC(lines)
}
// MARK: TrackSelectable (generic IPC)
public func tracks() async -> [MediaTrack] {
let r = await sendIPC([ipcLine(["get_property", "track-list"], requestId: 1)])
guard r.ok, let arr = Self.parseResponses(r.stdout)[1] as? [[String: Any]] else { return [] }
return arr.compactMap { t in
guard let type = t["type"] as? String,
let id = (t["id"] as? NSNumber)?.intValue,
let kind: TrackKind = (type == "audio" ? .audio : type == "sub" ? .subtitle : nil)
else { return nil }
return MediaTrack(id: id, kind: kind, lang: t["lang"] as? String,
title: t["title"] as? String, codec: t["codec"] as? String,
selected: (t["selected"] as? Bool) ?? false)
}
}
public func setAudioTrack(_ id: Int) async { await send(["set_property", "aid", id]) }
/// nil disables subtitles mpv takes the string "no" for sid (not an Int).
public func setSubtitleTrack(_ id: Int?) async {
if let id { await send(["set_property", "sid", id]) }
else { await send(["set_property", "sid", "no"]) }
}
/// alang/slang persist to every file the playlist loads next (the next
/// episodes); the current file is already loaded, so its tracks are selected
/// explicitly by language match.
public func applyLanguagePreference(audioLangs: [String], subLangs: [String], subsEnabled: Bool) async {
await send(["set_property", "alang", audioLangs.joined(separator: ",")])
await send(["set_property", "slang", subsEnabled ? subLangs.joined(separator: ",") : ""])
let trks = await tracks()
let audio = trks.filter { $0.kind == .audio }
if let pick = audio.first(where: { trackLangMatches($0.lang, audioLangs) }) ?? audio.first {
await setAudioTrack(pick.id)
}
if subsEnabled {
let subs = trks.filter { $0.kind == .subtitle }
if let pick = subs.first(where: { trackLangMatches($0.lang, subLangs) }) ?? subs.first {
await setSubtitleTrack(pick.id)
}
} else {
await setSubtitleTrack(nil)
}
}
// MARK: - helpers
private func currentTimePos() async -> Double? {
let r = await sendIPC([ipcLine(["get_property", "time-pos"], requestId: 1)])
guard r.ok else { return nil }
return (Self.parseResponses(r.stdout)[1] as? NSNumber)?.doubleValue
}
private func send(_ args: [Any]) async { _ = await sendIPC([ipcLine(args)]) }
@discardableResult
private func runCommand(_ template: [String]?, _ subs: [String: String?]) async -> Bool {
await runCommandResult(template, subs)?.ok ?? false
}
private func runCommandResult(_ template: [String]?, _ subs: [String: String?]) async -> ProcessResult? {
guard let template else { return nil }
return await transport.runRemote(CommandTemplate.render(template, subs))
}
/// Build one mpv IPC command line, optionally tagged with a request_id.
private func ipcLine(_ command: [Any], requestId: Int? = nil) -> String {
var obj: [String: Any] = ["command": command]
if let requestId { obj["request_id"] = requestId }
guard let data = try? JSONSerialization.data(withJSONObject: obj),
let s = String(data: data, encoding: .utf8) else { return "{}" }
return s
}
/// Guarded one-shot socat pipeline: `[ -S sock ] && printf | [sudo] socat
/// || printf __NOMPV__`. The socket guard mirrors the script so a gone socket
/// reads as "reachable but idle", not "unreachable".
private func sendIPC(_ lines: [String]) async -> ProcessResult {
let sock = SSHTransport.shq(mpv.socket)
let fmt = SSHTransport.shq(String(repeating: "%s\\n", count: lines.count))
let payload = ([fmt] + lines.map(SSHTransport.shq)).joined(separator: " ")
let socat = (mpv.sudo ? "sudo " : "") + "\(mpv.socat) - UNIX-CONNECT:\(sock)"
let remote = "[ -S \(sock) ] && printf \(payload) | \(socat) || printf '__NOMPV__\\n'"
return await transport.runRemote(remote)
}
// MARK: - parsing (static + testable)
/// Index mpv IPC responses by request_id, dropping async event lines (no
/// request_id) and unsuccessful results. Value is the response's `data`.
static func parseResponses(_ text: String) -> [Int: Any] {
var out: [Int: Any] = [:]
for line in text.split(separator: "\n") {
guard let d = line.data(using: .utf8),
let obj = try? JSONSerialization.jsonObject(with: d) as? [String: Any],
let rid = obj["request_id"] as? Int else { continue }
if let err = obj["error"] as? String, err != "success" { continue }
out[rid] = obj["data"]
}
return out
}
/// Turn batched status output into a PollResult. `__NOMPV__` or no usable
/// responses reachable but idle; otherwise a populated status.
static func parseStatusOutput(_ text: String) -> PollResult {
if text.contains("__NOMPV__") { return PollResult(reachable: true, status: .idle) }
let byId = parseResponses(text)
if byId.isEmpty { return PollResult(reachable: true, status: .idle) }
var s = PlaybackStatus(playing: true)
s.paused = byId[1] as? Bool
if let v = byId[2] as? NSNumber { s.volume = v.doubleValue }
if let v = byId[3] as? NSNumber { s.position = v.doubleValue }
if let v = byId[4] as? NSNumber { s.duration = v.doubleValue }
s.title = byId[5] as? String
if let v = byId[6] as? NSNumber { s.playlistPos = v.intValue }
if let v = byId[7] as? NSNumber { s.playlistCount = v.intValue }
return PollResult(reachable: true, status: s)
}
}