feat(offline): ⤴︎ priority download lane — user picks jump the warmup

On-demand downloads (e.g. goon clips picked in the Adult collection list) now
take priority over the background warmup plan instead of waiting behind it or
starting a competing rsync:

- new priority lane (priorityEpisodes/priorityCount/enqueuePriority) drained
  before each plan item and immediately when the cache is idle
- fetchFile routes through the lane when a warmup is already running (was: a
  second concurrent rsync that also stomped the queue UI), and awaits completion
- per-episode fetch extracted to fetchOne(), shared by plan + priority drains
- status line shows '⤴︎N prioritized'; prioritizeFetch()/awaitDownload() expose
  it for callers

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Natalie 2026-06-30 01:06:46 -04:00
parent 3c67b547c6
commit 40188f85a9

View file

@ -89,6 +89,11 @@ public final class OfflineCacheController {
public private(set) var planMissingCount = 0
/// User-pinned basenames (from policy) absent from disk. Restored with high priority on reconcile when warmup + headroom.
public private(set) var pinnedMissingCount = 0
/// On-demand downloads the user prioritized (e.g. picked goon clips). These
/// jump ahead of the background warmup plan: drained before each plan item,
/// and fetched immediately when the cache is otherwise idle.
public private(set) var priorityCount = 0
private var priorityEpisodes: [OfflineEpisode] = []
/// Background reconcile: cull when over budget, refetch missing plan episodes.
public static let reconcileInterval: TimeInterval = 600
@ -134,11 +139,12 @@ public final class OfflineCacheController {
let done = downloadQueue.filter { $0.state == .done }.count
let active = downloadQueue.first { $0.state == .downloading }
let pct = queueProgress.map { Int($0 * 100) }
let pri = priorityCount > 0 ? " · ⤴︎\(priorityCount) prioritized" : ""
if let active {
if let pct {
return "Downloading \(done + 1)/\(total) · \(pct)% — \(active.name)"
return "Downloading \(done + 1)/\(total) · \(pct)% — \(active.name)\(pri)"
}
return "Downloading \(done + 1)/\(total)\(active.name)"
return "Downloading \(done + 1)/\(total)\(active.name)\(pri)"
}
if done > 0 { return "Downloading \(done)/\(total)" }
return "Queued \(total) episodes…"
@ -680,26 +686,11 @@ public final class OfflineCacheController {
Log.info("offline cache: \(action.lowercased()) \(episodes.count) episodes → \(cacheRoot)")
var ok = 0
for (i, ep) in episodes.enumerated() {
let name = (ep.plumPath as NSString).lastPathComponent
if MediaPaths.localCopy(of: ep.plumPath) != nil {
setQueueItem(id: ep.plumPath, state: .done, progress: 1)
ok += 1
continue
}
if !Self.hasDownloadHeadroom(policy: policy, cacheRoot: cacheRoot) {
setQueueItem(id: ep.plumPath, state: .failed, progress: nil)
Log.warn("offline cache: skipped \(name) — below \(policy.reserveFreeGB) GB free reserve")
continue
}
setQueueItem(id: ep.plumPath, state: .downloading, progress: nil)
status = "\(action) \(i + 1)/\(episodes.count): \(name)"
let destDir = Self.destRoot(for: policy).appendingPathComponent(Self.sanitize(ep.show), isDirectory: true)
let didFetch = await Self.rsync(remotePath: ep.remotePath, destDir: destDir.path) { p in
await MainActor.run { Self.active?.setQueueItem(id: ep.plumPath, state: .downloading, progress: p) }
}
setQueueItem(id: ep.plumPath, state: didFetch ? .done : .failed, progress: didFetch ? 1 : nil)
if didFetch { ok += 1 }
await drainPriority(policy: policy, cacheRoot: cacheRoot)
status = "\(action) \(i + 1)/\(episodes.count): \((ep.plumPath as NSString).lastPathComponent)"
if await fetchOne(ep, policy: policy, cacheRoot: cacheRoot) { ok += 1 }
}
await drainPriority(policy: policy, cacheRoot: cacheRoot)
await Task.detached(priority: .utility) { _ = DownloadsIndex.shared.refresh() }.value
let bytes = Self.scanDisk(at: cacheRoot).1
let verb = action == "Refetching" ? "Refetched" : "Cached"
@ -708,6 +699,118 @@ public final class OfflineCacheController {
notifyCacheComplete(ok: ok, total: episodes.count, bytes: bytes, title: verb)
}
/// Fetch one episode into the cache, updating its queue row. Appends a row if
/// absent (priority items aren't part of the plan's `beginQueue`). Returns
/// true on success or when the file is already present locally.
@discardableResult
private func fetchOne(_ ep: OfflineEpisode, policy: OfflineCachePolicy, cacheRoot: String) async -> Bool {
let name = (ep.plumPath as NSString).lastPathComponent
if !downloadQueue.contains(where: { $0.id == ep.plumPath }) {
downloadQueue.append(OfflineQueueItem(episode: ep))
recomputeQueueProgress()
}
if MediaPaths.localCopy(of: ep.plumPath) != nil {
setQueueItem(id: ep.plumPath, state: .done, progress: 1)
return true
}
if !Self.hasDownloadHeadroom(policy: policy, cacheRoot: cacheRoot) {
setQueueItem(id: ep.plumPath, state: .failed, progress: nil)
Log.warn("offline cache: skipped \(name) — below \(policy.reserveFreeGB) GB free reserve")
return false
}
setQueueItem(id: ep.plumPath, state: .downloading, progress: nil)
let destDir = Self.destRoot(for: policy).appendingPathComponent(Self.sanitize(ep.show), isDirectory: true)
let didFetch = await Self.rsync(remotePath: ep.remotePath, destDir: destDir.path) { p in
await MainActor.run { Self.active?.setQueueItem(id: ep.plumPath, state: .downloading, progress: p) }
}
setQueueItem(id: ep.plumPath, state: didFetch ? .done : .failed, progress: didFetch ? 1 : nil)
return didFetch
}
/// Drain all queued priority downloads (culling to budget, protecting the
/// file being fetched). Called before each plan item and by the idle drainer.
private func drainPriority(policy: OfflineCachePolicy, cacheRoot: String) async {
while !priorityEpisodes.isEmpty {
let ep = priorityEpisodes.removeFirst()
priorityCount = priorityEpisodes.count
let name = (ep.plumPath as NSString).lastPathComponent
let prot = Self.protectedNames(for: [], additionalPinned: Self.effectivePinned(from: policy) + [name])
let budget = Self.budgetBytes(policy: policy, cacheBytesOnDisk: Self.scanDisk(at: cacheRoot).1)
let cull = Self.evictOldest(protectedNames: prot, budgetBytes: budget, cacheRoot: cacheRoot)
if cull.count > 0 { recordCull(cull, policy: policy, cacheRoot: cacheRoot); refreshDiskStats() }
status = "Priority: \(name)"
_ = await fetchOne(ep, policy: policy, cacheRoot: cacheRoot)
}
}
// MARK: priority lane (user-chosen downloads jump the warmup plan)
/// Queue files to download ahead of the warmup plan. Already-local or already-
/// queued paths are dropped. If a fetch loop is running they're drained before
/// the next plan item; if idle, a priority-only fetch starts immediately.
public func enqueuePriority(_ episodes: [OfflineEpisode]) {
let existing = Set(priorityEpisodes.map(\.plumPath))
let fresh = episodes.filter { MediaPaths.localCopy(of: $0.plumPath) == nil && !existing.contains($0.plumPath) }
guard !fresh.isEmpty else { return }
priorityEpisodes.append(contentsOf: fresh)
priorityCount = priorityEpisodes.count
if caching {
status = "Prioritized \(fresh.count) download\(fresh.count == 1 ? "" : "s") — fetching next"
return
}
Task { await self.fetchPriorityIfIdle() }
}
/// True while `path` is waiting in or being fetched by the priority lane.
public func isPrioritized(path: String) -> Bool {
priorityEpisodes.contains { $0.plumPath == path }
}
private func fetchPriorityIfIdle() async {
guard !caching, !priorityEpisodes.isEmpty else { return }
guard Self.isStorageReachable() else {
status = "Black offline — priority downloads paused"
return
}
caching = true
let policy = policyActuator.policyForActuation()
let cacheRoot = Self.destRoot(for: policy).path
queueProgress = 0
defer {
caching = false
downloadingLabel = nil
downloadingProgress = nil
downloadQueue = []
queueProgress = nil
refreshDiskStats()
}
await drainPriority(policy: policy, cacheRoot: cacheRoot)
await Task.detached(priority: .utility) { _ = DownloadsIndex.shared.refresh() }.value
status = "Priority downloads done (\(Self.formatBytes(Self.scanDisk(at: cacheRoot).1)) on disk)"
}
/// Build an episode for a single library path and prioritize its download.
/// Used by on-demand callers (e.g. the Adult collection clip list).
public static func prioritizeFetch(path: String, show: String?) {
guard let active else { return }
let remote = MediaPaths.toRemote(path)
let ep = OfflineEpisode(show: show ?? inferShowName(remote),
label: (path as NSString).lastPathComponent,
plumPath: path, remotePath: remote)
active.enqueuePriority([ep])
}
/// Await a prioritized download: resolves true once the file lands locally,
/// false once the lane has drained without it. Polled (1 s) up to ~1 h.
public func awaitDownload(path: String) async -> Bool {
for _ in 0..<3600 {
if MediaPaths.localCopy(of: path) != nil { return true }
if !isPrioritized(path: path) && !caching { break }
try? await Task.sleep(for: .seconds(1))
}
return MediaPaths.localCopy(of: path) != nil
}
private func notifyCacheComplete(ok: Int, total: Int, bytes: Int64, title: String = "Offline cache") {
let body = "\(ok)/\(total) episodes · \(Self.formatBytes(bytes))"
if SettingsStore.load().notifyDownloads {
@ -724,6 +827,15 @@ public final class OfflineCacheController {
let remote = MediaPaths.toRemote(path)
let showName = show ?? inferShowName(remote)
let fileName = (path as NSString).lastPathComponent
// A warmup/plan (or another priority drain) is already rsyncing don't
// start a competing transfer. Hand this to the priority lane so it jumps
// ahead of the remaining plan items, and await its completion.
if let a = active, a.caching {
a.enqueuePriority([OfflineEpisode(show: showName, label: fileName,
plumPath: path, remotePath: remote)])
onStatus?("Prioritized \(fileName) — downloading next")
return await a.awaitDownload(path: path)
}
let cacheRoot = destRoot(for: policy).path
let cacheBytes = scanDisk(at: cacheRoot).1
let budget = budgetBytes(policy: policy, cacheBytesOnDisk: cacheBytes)