tv-anarchy/Sources/TVAnarchyCore/Streamability.swift
Natalie 4a2ceb9781 feat(offline): inline star-to-keep and trash-to-cull on cache rows
Surface the existing pin (keep-from-cull) and per-file delete actions as
visible inline buttons on each offline cache row instead of context-menu-only:
a star toggles protection from auto-cull (and restore-if-missing), a trash
culls that file early. Aligns wording/icons to the star metaphor.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 00:12:41 -04:00

206 lines
No EOL
8.3 KiB
Swift

import Foundation
/// Health of the link to the storage server when Stream sourcing is selected.
///
/// The raw values are the case names themselves, which is also what the synthesized
/// `Codable` conformance reads and writes.
public enum StreamabilityLevel: String, Sendable, Equatable, Codable {
    case unknown, checking, good, marginal, poor, unreachable

    /// Short display text for the level; the empty string for `.unknown`.
    public var label: String {
        switch self {
        case .unknown:
            return ""
        case .checking:
            return "Checking…"
        case .good:
            return "Good"
        case .marginal:
            return "Marginal"
        case .poor:
            return "Poor"
        case .unreachable:
            return "Unreachable"
        }
    }
}
/// One ping + bandwidth sample against the storage server.
public struct StreamabilitySample: Sendable, Equatable {
    /// Assessed health of the link for this sample.
    public var level: StreamabilityLevel
    /// Ping time in milliseconds, or `nil` when none is available.
    public var pingMs: Int?
    /// Bandwidth in KB/s, or `nil` when none is available.
    public var bandwidthKBps: Int?
    /// Required bandwidth in KB/s, or `nil` when none has been computed.
    public var requiredKBps: Int?
    /// Buffer window, in seconds, that the sample refers to.
    public var bufferSeconds: Int
    /// Timestamp attached to the sample.
    public var checkedAt: Date
    /// Human-readable summary of the sample.
    public var detail: String

    /// Placeholder sample: level `.unknown`, no measurements, `checkedAt` set to `.distantPast`.
    public static let idle: StreamabilitySample = .init(
        level: .unknown,
        pingMs: nil,
        bandwidthKBps: nil,
        requiredKBps: nil,
        bufferSeconds: StreamPolicy.defaults.bufferSeconds,
        checkedAt: .distantPast,
        detail: "Stream probes idle"
    )

    /// Placeholder sample: level `.checking`, no measurements.
    ///
    /// Because this is a `static let`, `Date()` is evaluated only once, when the constant is
    /// first accessed; `checkedAt` is therefore that first-access time, not the time of each use.
    public static let checking: StreamabilitySample = .init(
        level: .checking,
        pingMs: nil,
        bandwidthKBps: nil,
        requiredKBps: nil,
        bufferSeconds: StreamPolicy.defaults.bufferSeconds,
        checkedAt: Date(),
        detail: "Probing link to storage…"
    )
}
/// Pure assessment: maps measured link quality to a streamability level.
///
/// Nothing here performs I/O; every function is a deterministic computation on its arguments.
public enum StreamabilityAssessor {
    /// Bitrate, in megabits per second, assumed when a file's byte size is not known.
    public static let assumedMbps: Double = 5

    /// Sustained KB/s needed to keep up with playback, including 10% headroom.
    ///
    /// - Parameters:
    ///   - bufferSeconds: Accepted for interface compatibility; it does not influence the result.
    ///     The earlier "fill the buffer window within one buffer period" term reduced
    ///     algebraically to the plain playback rate, so it has been dropped.
    ///   - episodeSeconds: Episode duration in seconds. Values below 60, and NaN, are treated as 60.
    ///   - fileBytes: File size in bytes. When `nil` or non-positive, the rate is derived from
    ///     `assumedMbps` instead.
    /// - Returns: The required rate in KB/s, never less than 1.
    public static func requiredKBps(bufferSeconds: Int, episodeSeconds: Double,
                                    fileBytes: Int64?) -> Int {
        // NaN survives `max(_:_:)` and would trap in the `Int` conversion below.
        let duration = episodeSeconds.isNaN ? 60 : max(episodeSeconds, 60)

        let playbackKBps: Double
        if let fileBytes, fileBytes > 0 {
            playbackKBps = Double(fileBytes) / 1024.0 / duration
        } else {
            // Size unknown: a fixed bitrate is assumed, so the duration cancels out. Computing the
            // rate directly avoids materialising a byte total that could overflow `Int64`.
            playbackKBps = assumedMbps * 1_000_000 / 8 / 1024.0
        }

        // 10% headroom, truncated to whole KB/s.
        return max(1, Int(playbackKBps * 1.1))
    }

    /// Classifies a measurement against the required rate.
    ///
    /// - Parameters:
    ///   - pingMs: Measured ping in milliseconds, if any.
    ///   - bandwidthKBps: Measured bandwidth in KB/s, if any.
    ///   - requiredKBps: Rate needed for playback, in KB/s.
    /// - Returns: `.unreachable` when bandwidth is missing or non-positive; `.poor` when the ping
    ///   exceeds 8000 ms or bandwidth is below the requirement; `.good` at twice the requirement
    ///   or more; `.marginal` otherwise.
    public static func assess(pingMs: Int?, bandwidthKBps: Int?, requiredKBps: Int) -> StreamabilityLevel {
        guard let bw = bandwidthKBps, bw > 0 else { return .unreachable }
        if let ping = pingMs, ping > 8_000 { return .poor }
        // Same condition as `bw >= requiredKBps * 2` (bw is positive), but without the
        // multiplication that would trap on overflow for very large requirements.
        if bw / 2 >= requiredKBps { return .good }
        if bw >= requiredKBps { return .marginal }
        return .poor
    }

    /// Builds the human-readable summary shown alongside a level.
    ///
    /// - Parameters:
    ///   - level: The assessed level; selects the wording.
    ///   - pingMs: Included as "ping N ms" when present.
    ///   - bandwidthKBps: Included as "N KB/s" when present.
    ///   - requiredKBps: Included with the buffer size when positive.
    ///   - bufferSeconds: Buffer window quoted next to the required rate.
    /// - Returns: The measurements joined by " · ", followed by advice for `.marginal` and `.poor`;
    ///   a fixed sentence for `.unreachable`, `.checking` and `.unknown`.
    public static func detail(level: StreamabilityLevel, pingMs: Int?, bandwidthKBps: Int?,
                              requiredKBps: Int, bufferSeconds: Int) -> String {
        var parts: [String] = []
        if let ping = pingMs { parts.append("ping \(ping) ms") }
        if let bw = bandwidthKBps { parts.append("\(bw) KB/s") }
        if requiredKBps > 0 { parts.append("need ≥\(requiredKBps) KB/s for \(bufferSeconds)s buffer") }
        let base = parts.joined(separator: " · ")

        switch level {
        case .good:
            return base.isEmpty ? "Link looks healthy for streaming" : base
        case .marginal:
            // Without measurements there is nothing to attach the dash to.
            return base.isEmpty
                ? "May stutter on high-bitrate episodes"
                : base + " — may stutter on high-bitrate episodes"
        case .poor:
            return base.isEmpty
                ? "Consider Offline mode or a closer host"
                : base + " — consider Offline mode or a closer host"
        case .unreachable:
            return "Storage server unreachable"
        case .checking:
            return "Probing link to storage…"
        case .unknown:
            return "Stream probes idle"
        }
    }
}
/// End-to-end probes over the same SSH/rsync path offline fetch uses.
///
/// Every probe shells out to `/usr/bin/ssh` through `ProcessRunner` and runs synchronously.
public enum StreamabilityProbe {
    /// Bytes read per bandwidth probe. Also used as the `dd` block size so the amount read and
    /// the amount used in the throughput calculation cannot drift apart.
    private static let probeBytes = 1 * 1024 * 1024
    /// Smallest remote file accepted as a probe target.
    private static let minProbeFileBytes: Int64 = 64 * 1024 * 1024
    /// SSH options shared by every probe command.
    private static let sshControl = [
        "-o", "ControlPath=/tmp/tva-cm-%r@%h:%p",
        "-o", "ConnectTimeout=12",
        "-o", "BatchMode=yes",
    ]

    /// Local file remembering the last remote path chosen as probe target.
    ///
    /// Lives under `$TV_ANARCHY_STATE_DIR` when that variable is set and non-empty, otherwise
    /// under `~/.local/state/tv-anarchy`.
    private static var probeCacheURL: URL {
        let base: URL
        if let dir = ProcessInfo.processInfo.environment["TV_ANARCHY_STATE_DIR"], !dir.isEmpty {
            base = URL(fileURLWithPath: dir, isDirectory: true)
        } else {
            base = FileManager.default.homeDirectoryForCurrentUser
                .appendingPathComponent(".local/state/tv-anarchy", isDirectory: true)
        }
        return base.appendingPathComponent("stream-probe-path.txt")
    }

    /// Raw measurements from one `run`.
    public struct ProbeResult: Sendable {
        /// Ping time of the last host that answered, in milliseconds.
        public var pingMs: Int?
        /// Measured rate in KB/s, when a bandwidth probe succeeded.
        public var bandwidthKBps: Int?
        /// Last host that answered the ping.
        public var host: String?
    }

    /// Probes `hosts` in order and stops at the first one that yields a bandwidth figure.
    ///
    /// A host that answers the ping but fails the bandwidth probe still contributes its ping and
    /// name to the result unless a later host also answers.
    ///
    /// - Parameters:
    ///   - hosts: SSH destinations to try, in priority order. An empty list yields an empty result.
    ///   - episodeSeconds: Currently unused by the probe.
    ///   - bufferSeconds: Currently unused by the probe.
    ///   - fileBytes: Currently unused by the probe.
    /// - Returns: Whatever was measured; fields stay `nil` when nothing succeeded.
    public static func run(hosts: [String], episodeSeconds: Double,
                           bufferSeconds: Int, fileBytes: Int64?) -> ProbeResult {
        var result = ProbeResult()
        for host in hosts {
            guard let ms = ping(host: host) else { continue }
            result.pingMs = ms
            result.host = host
            if let bw = bandwidth(host: host) {
                result.bandwidthKBps = bw
                break
            }
        }
        return result
    }

    /// Runs `work` and reports how long it took, using the monotonic system uptime so that
    /// wall-clock adjustments during the probe cannot skew or negate the interval.
    private static func timed<T>(_ work: () -> T) -> (value: T, seconds: TimeInterval) {
        let start = ProcessInfo.processInfo.systemUptime
        let value = work()
        return (value, ProcessInfo.processInfo.systemUptime - start)
    }

    /// Times `ssh host true`; returns the duration in milliseconds (at least 1), or `nil` on failure.
    private static func ping(host: String) -> Int? {
        let (r, seconds) = timed { ProcessRunner.run("/usr/bin/ssh", sshControl + [host, "true"]) }
        return r.ok ? max(1, Int(seconds * 1000)) : nil
    }

    /// Times a one-block `dd` read of the probe file on `host` and converts it to KB/s.
    ///
    /// Returns `nil` when no probe file is available, the command fails, or the run finished in
    /// 50 ms or less.
    private static func bandwidth(host: String) -> Int? {
        guard let path = pickProbePath(host: host) else { return nil }
        // Probe files are at least `minProbeFileBytes` (64 blocks of `probeBytes`), so a skip of
        // up to 32 blocks stays inside the file. The random offset presumably avoids re-reading
        // the same cached block on every probe.
        let skip = Int.random(in: 1...32)
        // The block size is given in plain bytes: the `M` suffix is not POSIX and some BSD `dd`
        // builds reject it, while `cachedOK` already caters for BSD hosts.
        // NOTE(review): `of=/dev/null` is evaluated on the remote side, so the block is read from
        // remote storage but not sent back over SSH. The timing therefore reflects SSH latency
        // plus remote read speed, not link throughput. Confirm whether streaming the block to
        // stdout is intended (depends on how ProcessRunner handles binary output).
        let cmd = "dd if=\(shq(path)) of=/dev/null bs=\(probeBytes) count=1 skip=\(skip) 2>/dev/null"
        let (r, elapsed) = timed { ProcessRunner.run("/usr/bin/ssh", sshControl + [host, cmd]) }
        guard r.ok, elapsed > 0.05 else { return nil }
        return max(1, Int(Double(probeBytes) / 1024.0 / elapsed))
    }

    /// Chooses the remote file to read: the cached path if it still qualifies on `host`,
    /// otherwise a fresh pick from the remote index (which is then cached).
    private static func pickProbePath(host: String) -> String? {
        if let cached = readCachedPath(), cachedOK(cached, host: host) { return cached }
        if let fromIndex = pathFromIndex(host: host) {
            writeCachedPath(fromIndex)
            return fromIndex
        }
        return nil
    }

    /// Checks that `path` is a regular file on `host` of at least `minProbeFileBytes`.
    ///
    /// The size is queried with GNU `stat -c%s` first, falling back to BSD `stat -f%z`.
    private static func cachedOK(_ path: String, host: String) -> Bool {
        let cmd = "test -f \(shq(path)) && stat -c%s \(shq(path)) 2>/dev/null || stat -f%z \(shq(path))"
        let r = ProcessRunner.run("/usr/bin/ssh", sshControl + [host, cmd])
        guard r.ok, let size = Int64(r.stdout.trimmingCharacters(in: .whitespacesAndNewlines)),
              size >= minProbeFileBytes else { return false }
        return true
    }

    /// Reads the remote index and returns the path of its largest qualifying entry.
    ///
    /// Rows are tab-separated; the first column is parsed as the size in bytes and the third is
    /// taken as the path. Rows that are too short, too small, unparsable, or have an empty path
    /// are skipped. On equal sizes the earliest row wins.
    private static func pathFromIndex(host: String) -> String? {
        let r = ProcessRunner.run("/usr/bin/ssh", sshControl + [host, "cat /bigdisk/_/media/_tools/index.tsv"])
        guard r.ok else { return nil }
        var best: (size: Int64, path: String)?
        for line in r.stdout.split(separator: "\n") {
            let parts = line.split(separator: "\t", omittingEmptySubsequences: false)
            guard parts.count >= 3, let size = Int64(parts[0]), size >= minProbeFileBytes,
                  !parts[2].isEmpty else { continue }
            if let current = best, current.size >= size { continue }
            best = (size, String(parts[2]))
        }
        return best?.path
    }

    /// Returns the trimmed contents of the cache file, or `nil` when it is unreadable or blank.
    private static func readCachedPath() -> String? {
        guard let data = try? String(contentsOf: probeCacheURL, encoding: .utf8) else { return nil }
        let trimmed = data.trimmingCharacters(in: .whitespacesAndNewlines)
        return trimmed.isEmpty ? nil : trimmed
    }

    /// Stores `path` in the cache file, creating the directory if needed.
    ///
    /// Best effort: failures are ignored, in which case the next probe simply re-reads the index.
    private static func writeCachedPath(_ path: String) {
        let url = probeCacheURL
        try? FileManager.default.createDirectory(at: url.deletingLastPathComponent(),
                                                 withIntermediateDirectories: true)
        try? (path + "\n").write(to: url, atomically: true, encoding: .utf8)
    }

    /// Single-quotes `s` for a POSIX shell, escaping embedded single quotes as `'\''`.
    private static func shq(_ s: String) -> String {
        "'" + s.replacingOccurrences(of: "'", with: "'\\''") + "'"
    }
}