diff --git a/Sources/TVAnarchyCore/OfflineCacheController.swift b/Sources/TVAnarchyCore/OfflineCacheController.swift index 3d35475..64cec1e 100644 --- a/Sources/TVAnarchyCore/OfflineCacheController.swift +++ b/Sources/TVAnarchyCore/OfflineCacheController.swift @@ -89,6 +89,11 @@ public final class OfflineCacheController { public private(set) var planMissingCount = 0 /// User-pinned basenames (from policy) absent from disk. Restored with high priority on reconcile when warmup + headroom. public private(set) var pinnedMissingCount = 0 + /// On-demand downloads the user prioritized (e.g. picked goon clips). These + /// jump ahead of the background warmup plan: drained before each plan item, + /// and fetched immediately when the cache is otherwise idle. + public private(set) var priorityCount = 0 + private var priorityEpisodes: [OfflineEpisode] = [] /// Background reconcile: cull when over budget, refetch missing plan episodes. public static let reconcileInterval: TimeInterval = 600 @@ -134,11 +139,12 @@ public final class OfflineCacheController { let done = downloadQueue.filter { $0.state == .done }.count let active = downloadQueue.first { $0.state == .downloading } let pct = queueProgress.map { Int($0 * 100) } + let pri = priorityCount > 0 ? " · ⤴︎\(priorityCount) prioritized" : "" if let active { if let pct { - return "Downloading \(done + 1)/\(total) · \(pct)% — \(active.name)" + return "Downloading \(done + 1)/\(total) · \(pct)% — \(active.name)\(pri)" } - return "Downloading \(done + 1)/\(total) — \(active.name)" + return "Downloading \(done + 1)/\(total) — \(active.name)\(pri)" } if done > 0 { return "Downloading \(done)/\(total)…" } return "Queued \(total) episodes…" @@ -680,26 +686,11 @@ public final class OfflineCacheController { Log.info("offline cache: \(action.lowercased()) \(episodes.count) episodes → \(cacheRoot)") var ok = 0 for (i, ep) in episodes.enumerated() { - let name = (ep.plumPath as NSString).lastPathComponent - if MediaPaths.localCopy(of: ep.plumPath) != nil { - setQueueItem(id: ep.plumPath, state: .done, progress: 1) - ok += 1 - continue - } - if !Self.hasDownloadHeadroom(policy: policy, cacheRoot: cacheRoot) { - setQueueItem(id: ep.plumPath, state: .failed, progress: nil) - Log.warn("offline cache: skipped \(name) — below \(policy.reserveFreeGB) GB free reserve") - continue - } - setQueueItem(id: ep.plumPath, state: .downloading, progress: nil) - status = "\(action) \(i + 1)/\(episodes.count): \(name)…" - let destDir = Self.destRoot(for: policy).appendingPathComponent(Self.sanitize(ep.show), isDirectory: true) - let didFetch = await Self.rsync(remotePath: ep.remotePath, destDir: destDir.path) { p in - await MainActor.run { Self.active?.setQueueItem(id: ep.plumPath, state: .downloading, progress: p) } - } - setQueueItem(id: ep.plumPath, state: didFetch ? .done : .failed, progress: didFetch ? 1 : nil) - if didFetch { ok += 1 } + await drainPriority(policy: policy, cacheRoot: cacheRoot) + status = "\(action) \(i + 1)/\(episodes.count): \((ep.plumPath as NSString).lastPathComponent)…" + if await fetchOne(ep, policy: policy, cacheRoot: cacheRoot) { ok += 1 } } + await drainPriority(policy: policy, cacheRoot: cacheRoot) await Task.detached(priority: .utility) { _ = DownloadsIndex.shared.refresh() }.value let bytes = Self.scanDisk(at: cacheRoot).1 let verb = action == "Refetching" ? "Refetched" : "Cached" @@ -708,6 +699,118 @@ public final class OfflineCacheController { notifyCacheComplete(ok: ok, total: episodes.count, bytes: bytes, title: verb) } + /// Fetch one episode into the cache, updating its queue row. Appends a row if + /// absent (priority items aren't part of the plan's `beginQueue`). Returns + /// true on success or when the file is already present locally. + @discardableResult + private func fetchOne(_ ep: OfflineEpisode, policy: OfflineCachePolicy, cacheRoot: String) async -> Bool { + let name = (ep.plumPath as NSString).lastPathComponent + if !downloadQueue.contains(where: { $0.id == ep.plumPath }) { + downloadQueue.append(OfflineQueueItem(episode: ep)) + recomputeQueueProgress() + } + if MediaPaths.localCopy(of: ep.plumPath) != nil { + setQueueItem(id: ep.plumPath, state: .done, progress: 1) + return true + } + if !Self.hasDownloadHeadroom(policy: policy, cacheRoot: cacheRoot) { + setQueueItem(id: ep.plumPath, state: .failed, progress: nil) + Log.warn("offline cache: skipped \(name) — below \(policy.reserveFreeGB) GB free reserve") + return false + } + setQueueItem(id: ep.plumPath, state: .downloading, progress: nil) + let destDir = Self.destRoot(for: policy).appendingPathComponent(Self.sanitize(ep.show), isDirectory: true) + let didFetch = await Self.rsync(remotePath: ep.remotePath, destDir: destDir.path) { p in + await MainActor.run { Self.active?.setQueueItem(id: ep.plumPath, state: .downloading, progress: p) } + } + setQueueItem(id: ep.plumPath, state: didFetch ? .done : .failed, progress: didFetch ? 1 : nil) + return didFetch + } + + /// Drain all queued priority downloads (culling to budget, protecting the + /// file being fetched). Called before each plan item and by the idle drainer. + private func drainPriority(policy: OfflineCachePolicy, cacheRoot: String) async { + while !priorityEpisodes.isEmpty { + let ep = priorityEpisodes.removeFirst() + priorityCount = priorityEpisodes.count + let name = (ep.plumPath as NSString).lastPathComponent + let prot = Self.protectedNames(for: [], additionalPinned: Self.effectivePinned(from: policy) + [name]) + let budget = Self.budgetBytes(policy: policy, cacheBytesOnDisk: Self.scanDisk(at: cacheRoot).1) + let cull = Self.evictOldest(protectedNames: prot, budgetBytes: budget, cacheRoot: cacheRoot) + if cull.count > 0 { recordCull(cull, policy: policy, cacheRoot: cacheRoot); refreshDiskStats() } + status = "Priority: \(name)…" + _ = await fetchOne(ep, policy: policy, cacheRoot: cacheRoot) + } + } + + // MARK: priority lane (user-chosen downloads jump the warmup plan) + + /// Queue files to download ahead of the warmup plan. Already-local or already- + /// queued paths are dropped. If a fetch loop is running they're drained before + /// the next plan item; if idle, a priority-only fetch starts immediately. + public func enqueuePriority(_ episodes: [OfflineEpisode]) { + let existing = Set(priorityEpisodes.map(\.plumPath)) + let fresh = episodes.filter { MediaPaths.localCopy(of: $0.plumPath) == nil && !existing.contains($0.plumPath) } + guard !fresh.isEmpty else { return } + priorityEpisodes.append(contentsOf: fresh) + priorityCount = priorityEpisodes.count + if caching { + status = "Prioritized \(fresh.count) download\(fresh.count == 1 ? "" : "s") — fetching next" + return + } + Task { await self.fetchPriorityIfIdle() } + } + + /// True while `path` is waiting in or being fetched by the priority lane. + public func isPrioritized(path: String) -> Bool { + priorityEpisodes.contains { $0.plumPath == path } + } + + private func fetchPriorityIfIdle() async { + guard !caching, !priorityEpisodes.isEmpty else { return } + guard Self.isStorageReachable() else { + status = "Black offline — priority downloads paused" + return + } + caching = true + let policy = policyActuator.policyForActuation() + let cacheRoot = Self.destRoot(for: policy).path + queueProgress = 0 + defer { + caching = false + downloadingLabel = nil + downloadingProgress = nil + downloadQueue = [] + queueProgress = nil + refreshDiskStats() + } + await drainPriority(policy: policy, cacheRoot: cacheRoot) + await Task.detached(priority: .utility) { _ = DownloadsIndex.shared.refresh() }.value + status = "Priority downloads done (\(Self.formatBytes(Self.scanDisk(at: cacheRoot).1)) on disk)" + } + + /// Build an episode for a single library path and prioritize its download. + /// Used by on-demand callers (e.g. the Adult collection clip list). + public static func prioritizeFetch(path: String, show: String?) { + guard let active else { return } + let remote = MediaPaths.toRemote(path) + let ep = OfflineEpisode(show: show ?? inferShowName(remote), + label: (path as NSString).lastPathComponent, + plumPath: path, remotePath: remote) + active.enqueuePriority([ep]) + } + + /// Await a prioritized download: resolves true once the file lands locally, + /// false once the lane has drained without it. Polled (1 s) up to ~1 h. + public func awaitDownload(path: String) async -> Bool { + for _ in 0..<3600 { + if MediaPaths.localCopy(of: path) != nil { return true } + if !isPrioritized(path: path) && !caching { break } + try? await Task.sleep(for: .seconds(1)) + } + return MediaPaths.localCopy(of: path) != nil + } + private func notifyCacheComplete(ok: Int, total: Int, bytes: Int64, title: String = "Offline cache") { let body = "\(ok)/\(total) episodes · \(Self.formatBytes(bytes))" if SettingsStore.load().notifyDownloads { @@ -724,6 +827,15 @@ public final class OfflineCacheController { let remote = MediaPaths.toRemote(path) let showName = show ?? inferShowName(remote) let fileName = (path as NSString).lastPathComponent + // A warmup/plan (or another priority drain) is already rsyncing — don't + // start a competing transfer. Hand this to the priority lane so it jumps + // ahead of the remaining plan items, and await its completion. + if let a = active, a.caching { + a.enqueuePriority([OfflineEpisode(show: showName, label: fileName, + plumPath: path, remotePath: remote)]) + onStatus?("Prioritized \(fileName) — downloading next") + return await a.awaitDownload(path: path) + } let cacheRoot = destRoot(for: policy).path let cacheBytes = scanDisk(at: cacheRoot).1 let budget = budgetBytes(policy: policy, cacheBytesOnDisk: cacheBytes)