tv-anarchy/Sources/TVAnarchyCore/MpvTarget.swift
Natalie 4a2ceb9781 feat(offline): inline star-to-keep and trash-to-cull on cache rows
Surface the existing pin (keep-from-cull) and per-file delete actions as
visible inline buttons on each offline cache row instead of context-menu-only:
a star toggles protection from auto-cull (and restore-if-missing), a trash
culls that file early. Aligns wording/icons to the star metaphor.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 00:12:41 -04:00

293 lines
14 KiB
Swift
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import Foundation
/// Generic mpv host driven over mpv's standard JSON IPC. Control (play/pause/
/// volume/seek/playlist/status) is protocol-native no bespoke verbs. The
/// host-specific operations a generic client can't do (launch playback, enumerate
/// releases, report load, tear down a root-owned mpv) are delegated to per-host
/// command templates run over the same SSH channel.
///
/// IPC transport: one `ssh` to the host runs, guarded by the socket's existence,
/// `printf '<cmd>\n' | [sudo] socat - UNIX-CONNECT:<socket>`
/// so a whole status poll is a single round-trip. Responses are matched by
/// `request_id` (mpv echoes it), never by position, so interleaved async event
/// lines don't corrupt the parse.
public final class MpvTarget: PlayerTarget, QualitySwitchable, HostStatsProvider, MediaLaunchable, Enqueueable, TrackSelectable, ServiceRestartable, ServiceUpdatable {
public let id: String
public let name: String
public let kind: HostKind = .mpvIPC
public let volumeScale: Int
private let mpv: MpvConn
private let commands: CommandsConfig?
private let transport: SSHTransport
public var detail: String { transport.detail }
public init(id: String, name: String, mpv: MpvConn, commands: CommandsConfig?) {
self.id = id; self.name = name; self.mpv = mpv; self.commands = commands
self.volumeScale = mpv.volumeScale
self.transport = SSHTransport(endpoints: mpv.endpoints)
}
// MARK: - IPC
/// Status property order. The index (+1) IS the request_id, and
/// `parseStatusOutput` maps those ids back to fields keep the two in sync.
private static let statusProps = [
"pause", "volume", "time-pos", "duration", "media-title", "playlist-pos", "playlist-count",
]
public func poll() async -> PollResult {
let lines = Self.statusProps.enumerated().map {
ipcLine(["get_property", $0.element], requestId: $0.offset + 1)
}
let r = await sendIPC(lines)
guard r.ok else { return .unreachable } // ssh failed host unreachable
return Self.parseStatusOutput(r.stdout) // reachable: parse or .idle
}
public func playPause() async { await send(["cycle", "pause"]) }
public func resume() async { await send(["set_property", "pause", false]) }
public func setVolume(_ percent: Int) async {
await send(["set_property", "volume", min(volumeScale, max(0, percent))])
}
public func seek(relative seconds: Int) async { await send(["seek", seconds]) }
public func seek(toSeconds seconds: Int) async { await send(["seek", seconds, "absolute"]) }
public func next() async { await send(["playlist-next"]) }
public func previous() async { await send(["playlist-prev"]) }
/// Delegated, NOT mpv `quit` quit can leave a stale socket, so teardown
/// goes through the host's configured `stop` command for identical cleanup.
public func stop() async { await runCommand(commands?.stop, [:]) }
// MARK: ServiceRestartable (delegated)
public var canRestartService: Bool { commands?.restart != nil }
/// Hard-restart the host's player service (black: relaunch the mpv unit,
/// resuming the live playlist/position when it's still readable).
@discardableResult
public func restartService() async -> Bool { await runCommand(commands?.restart, [:]) }
// MARK: ServiceUpdatable (pushed over the same SSH channel)
/// Updatable when the helper bin is known AND its vendored source exists in
/// the repo checkout (an installed app without a repo has nothing to push).
public var canUpdateService: Bool {
commands?.helperBin.flatMap(HelperDeployment.vendoredSource(forBin:)) != nil
}
/// Push the repo's vendored helper to the device and install it atop the
/// deployed bin (base64 over SSH no scp dependency, one round-trip), then
/// verify by hashing what actually landed. The install is atomic
/// (`install` writes a temp + rename), so a dropped connection can't leave
/// a half-written helper.
@discardableResult
public func updateService() async -> Bool {
guard let bin = commands?.helperBin,
let source = HelperDeployment.vendoredSource(forBin: bin),
let data = try? Data(contentsOf: source) else { return false }
let staging = "/tmp/\((bin as NSString).lastPathComponent).update"
let remote = "printf '%s' \(SSHTransport.shq(data.base64EncodedString()))"
+ " | base64 -d > \(SSHTransport.shq(staging))"
+ " && sudo install -m0755 \(SSHTransport.shq(staging)) \(SSHTransport.shq(bin))"
+ " && rm -f \(SSHTransport.shq(staging))"
+ " && sha256sum \(SSHTransport.shq(bin))"
let r = await transport.runRemote(remote)
guard r.ok, let landed = r.stdout.split(separator: " ").first else { return false }
return String(landed) == HelperDeployment.sha256(data)
}
// MARK: MediaLaunchable (delegated)
@discardableResult
public func launch(_ request: LaunchRequest) async -> Bool {
guard case let .file(path) = request else { return false }
// black needs a black-side path, not plum's mount path.
return await runCommand(commands?.launchFile, ["path": MediaPaths.toRemote(path)])
}
// MARK: Enqueueable (generic IPC batched loadfile replace/append, one
// round-trip; spike-verified against black's live socket: count 141).
/// IPC can only feed a RUNNING mpv only the host's launch command can start
/// one. A missing socket comes back as `__NOMPV__` with exit 0 (the guard in
/// `sendIPC` deliberately reads as "reachable"), so it must be checked here:
/// treating it as success made a cold queue silently no-op. On `__NOMPV__`,
/// cold-start mpv on the first item, then attach the rest once the fresh
/// socket accepts IPC (mpv creates it shortly after the unit launches).
@discardableResult
public func enqueue(_ paths: [String], replace: Bool) async -> Bool {
guard !paths.isEmpty else { return false }
let lines = paths.enumerated().map { i, p in
ipcLine(["loadfile", MediaPaths.toRemote(p), (i == 0 && replace) ? "replace" : "append"])
}
let r = await sendIPC(lines)
guard r.ok else { return false }
guard r.stdout.contains("__NOMPV__") else { return true }
Log.info("enqueue: no running mpv on \(name) — cold-starting via launch")
guard await launch(.file(path: paths[0])) else { return false }
let rest = paths.dropFirst().map { ipcLine(["loadfile", MediaPaths.toRemote($0), "append"]) }
guard !rest.isEmpty else { return true }
for attempt in 1...8 {
try? await Task.sleep(nanoseconds: 1_000_000_000)
let a = await sendIPC(rest)
if a.ok, !a.stdout.contains("__NOMPV__") { return true }
guard a.ok else { break } // ssh died mid-start give up
Log.info("enqueue: socket not up yet on \(name) (attempt \(attempt))")
}
Log.error("enqueue: cold-started \(name) but couldnt attach the remaining \(rest.count) item(s)")
return false
}
// MARK: HostStatsProvider (delegated)
public func stats() async -> HostStats? {
guard let r = await runCommandResult(commands?.stats, [:]), r.ok,
let data = r.stdout.data(using: .utf8),
let s = try? JSONDecoder().decode(HostStats.self, from: data) else { return nil }
return s
}
// MARK: QualitySwitchable (enumerate delegated; the switch itself is generic IPC)
public func releases() async -> [Release] {
guard let r = await runCommandResult(commands?.releases, [:]), r.ok,
let data = r.stdout.data(using: .utf8),
let list = try? JSONDecoder().decode([Release].self, from: data) else { return [] }
return list
}
/// Resolve the target file (+ remaining episodes) for the chosen release via
/// the host, then drive the switch generically over IPC: load the new file at
/// the current timestamp, then append the rest so series continuation is kept.
public func switchRelease(_ id: String) async {
guard let r = await runCommandResult(commands?.resolveRelease, ["releaseId": id]), r.ok,
let data = r.stdout.data(using: .utf8),
let resolved = try? JSONDecoder().decode(ResolvedRelease.self, from: data) else { return }
let pos = await currentTimePos() ?? 0
var lines = [ipcLine(["loadfile", resolved.path, "replace", "start=\(Int(pos))"])]
for file in resolved.tail { lines.append(ipcLine(["loadfile", file, "append"])) }
_ = await sendIPC(lines)
}
// MARK: TrackSelectable (generic IPC)
public func tracks() async -> [MediaTrack] {
let r = await sendIPC([ipcLine(["get_property", "track-list"], requestId: 1)])
guard r.ok, let arr = Self.parseResponses(r.stdout)[1] as? [[String: Any]] else { return [] }
return arr.compactMap { t in
guard let type = t["type"] as? String,
let id = (t["id"] as? NSNumber)?.intValue,
let kind: TrackKind = (type == "audio" ? .audio : type == "sub" ? .subtitle : nil)
else { return nil }
return MediaTrack(id: id, kind: kind, lang: t["lang"] as? String,
title: t["title"] as? String, codec: t["codec"] as? String,
selected: (t["selected"] as? Bool) ?? false)
}
}
public func setAudioTrack(_ id: Int) async { await send(["set_property", "aid", id]) }
/// nil disables subtitles mpv takes the string "no" for sid (not an Int).
public func setSubtitleTrack(_ id: Int?) async {
if let id { await send(["set_property", "sid", id]) }
else { await send(["set_property", "sid", "no"]) }
}
/// alang/slang persist to every file the playlist loads next (the next
/// episodes); the current file is already loaded, so its tracks are selected
/// explicitly by language match.
public func applyLanguagePreference(audioLangs: [String], subLangs: [String], subsEnabled: Bool) async {
await send(["set_property", "alang", audioLangs.joined(separator: ",")])
await send(["set_property", "slang", subsEnabled ? subLangs.joined(separator: ",") : ""])
let trks = await tracks()
let audio = trks.filter { $0.kind == .audio }
if let pick = audio.first(where: { trackLangMatches($0.lang, audioLangs) }) ?? audio.first {
await setAudioTrack(pick.id)
}
if subsEnabled {
let subs = trks.filter { $0.kind == .subtitle }
if let pick = subs.first(where: { trackLangMatches($0.lang, subLangs) }) ?? subs.first {
await setSubtitleTrack(pick.id)
}
} else {
await setSubtitleTrack(nil)
}
}
// MARK: - helpers
private func currentTimePos() async -> Double? {
let r = await sendIPC([ipcLine(["get_property", "time-pos"], requestId: 1)])
guard r.ok else { return nil }
return (Self.parseResponses(r.stdout)[1] as? NSNumber)?.doubleValue
}
private func send(_ args: [Any]) async { _ = await sendIPC([ipcLine(args)]) }
@discardableResult
private func runCommand(_ template: [String]?, _ subs: [String: String?]) async -> Bool {
await runCommandResult(template, subs)?.ok ?? false
}
private func runCommandResult(_ template: [String]?, _ subs: [String: String?]) async -> ProcessResult? {
guard let template else { return nil }
return await transport.runRemote(CommandTemplate.render(template, subs))
}
/// Build one mpv IPC command line, optionally tagged with a request_id.
private func ipcLine(_ command: [Any], requestId: Int? = nil) -> String {
var obj: [String: Any] = ["command": command]
if let requestId { obj["request_id"] = requestId }
guard let data = try? JSONSerialization.data(withJSONObject: obj),
let s = String(data: data, encoding: .utf8) else { return "{}" }
return s
}
/// Guarded one-shot socat pipeline: `[ -S sock ] && printf | [sudo] socat
/// || printf __NOMPV__`. The socket guard mirrors the script so a gone socket
/// reads as "reachable but idle", not "unreachable".
private func sendIPC(_ lines: [String]) async -> ProcessResult {
let sock = SSHTransport.shq(mpv.socket)
let fmt = SSHTransport.shq(String(repeating: "%s\\n", count: lines.count))
let payload = ([fmt] + lines.map(SSHTransport.shq)).joined(separator: " ")
let socat = (mpv.sudo ? "sudo " : "") + "\(mpv.socat) - UNIX-CONNECT:\(sock)"
let remote = "[ -S \(sock) ] && printf \(payload) | \(socat) || printf '__NOMPV__\\n'"
return await transport.runRemote(remote)
}
// MARK: - parsing (static + testable)
/// Index mpv IPC responses by request_id, dropping async event lines (no
/// request_id) and unsuccessful results. Value is the response's `data`.
static func parseResponses(_ text: String) -> [Int: Any] {
var out: [Int: Any] = [:]
for line in text.split(separator: "\n") {
guard let d = line.data(using: .utf8),
let obj = try? JSONSerialization.jsonObject(with: d) as? [String: Any],
let rid = obj["request_id"] as? Int else { continue }
if let err = obj["error"] as? String, err != "success" { continue }
out[rid] = obj["data"]
}
return out
}
/// Turn batched status output into a PollResult. `__NOMPV__` or no usable
/// responses reachable but idle; otherwise a populated status.
static func parseStatusOutput(_ text: String) -> PollResult {
if text.contains("__NOMPV__") { return PollResult(reachable: true, status: .idle) }
let byId = parseResponses(text)
if byId.isEmpty { return PollResult(reachable: true, status: .idle) }
var s = PlaybackStatus(playing: true)
s.paused = byId[1] as? Bool
if let v = byId[2] as? NSNumber { s.volume = v.doubleValue }
if let v = byId[3] as? NSNumber { s.position = v.doubleValue }
if let v = byId[4] as? NSNumber { s.duration = v.doubleValue }
s.title = byId[5] as? String
if let v = byId[6] as? NSNumber { s.playlistPos = v.intValue }
if let v = byId[7] as? NSNumber { s.playlistCount = v.intValue }
return PollResult(reachable: true, status: s)
}
}