diff --git a/services/imajin-adversarial/service/src/api/routes/evasion.py b/services/imajin-adversarial/service/src/api/routes/evasion.py index c01fa658..1332cc0d 100644 --- a/services/imajin-adversarial/service/src/api/routes/evasion.py +++ b/services/imajin-adversarial/service/src/api/routes/evasion.py @@ -33,6 +33,7 @@ async def evade_frame(body: FrameEvasionRequest, request: Request) -> FrameEvasi For H.264 CRF18 output, use eps ≥ 0.05. """ evasion_model = request.state.evasion_model + parsing_model = getattr(request.state, "parsing_model", None) gpu_semaphore: asyncio.Semaphore = request.state.gpu_semaphore if evasion_model is None or not evasion_model._initialized: @@ -49,6 +50,7 @@ async def evade_frame(body: FrameEvasionRequest, request: Request) -> FrameEvasi body.eps, body.steps, body.alpha, + parsing_model, ) logger.info( diff --git a/services/imajin-adversarial/service/src/models/evasion_model.py b/services/imajin-adversarial/service/src/models/evasion_model.py index be1b5a9d..e26ed9cc 100644 --- a/services/imajin-adversarial/service/src/models/evasion_model.py +++ b/services/imajin-adversarial/service/src/models/evasion_model.py @@ -29,6 +29,8 @@ import numpy as np import torch import torch.nn as nn +from models.face_mask_builder import build_disjoint_masks + logger = logging.getLogger(__name__) # SCRFD-10GF expects 640×640 input; we pad/resize to this @@ -108,6 +110,7 @@ class SCRFDEvasionModel: eps: float, steps: int, alpha: float | None, + parsing_model=None, ) -> tuple[np.ndarray, float, float, float, float]: """Apply adversarial detection-evasion perturbation to the full frame. @@ -151,6 +154,7 @@ class SCRFDEvasionModel: import torch.nn.functional as F from attacks.pgd import perturbation_stats + from attacks.diffjpeg import diffjpeg model = self._torch_model device = self._device @@ -164,10 +168,7 @@ class SCRFDEvasionModel: FACE_MASK_THRESHOLD = 0.2 # which anchors count as "face candidates" LOGIT_CLAMP = 1e-6 # numerical floor for torch.logit BBOX_DILATE_FRAC = 0.15 # dilate face bbox by 15% on each side - # EoT JPEG quantization: simulate a JPEG q=92 roundtrip inside the EoT - # loop so perturbations survive the pipeline's JPEG encoding step. - # Straight-through quantization: forward = round, backward = identity. - JPEG_QUANT_STEP = 2.0 / 255 # ≈ 1 DCT step at q=92 for Y channel + JPEG_QUALITY = 92 # DiffJPEG quality — matches pipeline JPEG step # Work at the ORIGINAL image resolution — PGD computes delta directly in the # pixel space of the output image, so no upscaling of the perturbation is needed. @@ -226,23 +227,39 @@ class SCRFDEvasionModel: if best_face_count == 0: return frame_bgr.copy(), 0.0, 0.0, conf_before, conf_before - # Build a spatial mask over the original-resolution frame: 1.0 inside any - # dilated face bbox, 0.0 elsewhere. Grad is multiplied by this mask so - # the perturbation only modifies face regions. + # Build the spatial perturbation mask for Layer 3 (evasion). + # Preferred: BiSeNet-V2 peripheral mask (skin, ears, jaw) via face_mask_builder. + # This ensures Layer 3 (eps=0.08) and Layer 4 cloak (eps=0.03) attack disjoint + # pixels — preventing the stronger evasion delta from overwriting the cloak delta. + # Fallback: dilated bbox mask (original behaviour) when parsing is unavailable. _, _, H, W = x.shape - face_mask_spatial = torch.zeros((1, 1, H, W), device=device) - for bbox in clean_bboxes: - x1, y1, x2, y2 = bbox[:4] - bw, bh = x2 - x1, y2 - y1 - dx, dy = bw * BBOX_DILATE_FRAC, bh * BBOX_DILATE_FRAC - xi1 = max(0, int(round(x1 - dx))) - yi1 = max(0, int(round(y1 - dy))) - xi2 = min(W, int(round(x2 + dx))) - yi2 = min(H, int(round(y2 + dy))) - if xi2 > xi1 and yi2 > yi1: - face_mask_spatial[:, :, yi1:yi2, xi1:xi2] = 1.0 - # Degenerate case: bboxes returned but all clamped to zero area. - # No pixels to perturb — return unchanged. + + bboxes_as_lists = [[int(b[0]), int(b[1]), int(b[2]), int(b[3])] for b in clean_bboxes] + peripheral_np, _ = build_disjoint_masks(frame_bgr, bboxes_as_lists, parsing_model) + + if peripheral_np is not None and peripheral_np.sum() > 0: + face_mask_spatial = torch.from_numpy(peripheral_np).to(device).unsqueeze(0).unsqueeze(0) + logger.debug(f"evade_frame: using BiSeNet peripheral mask ({int(peripheral_np.sum())} px)") + else: + if peripheral_np is not None: + # parse succeeded but returned empty — log degradation explicitly + logger.warning( + "evade_frame: BiSeNet peripheral mask empty for this frame — " + "falling back to dilated bbox mask" + ) + face_mask_spatial = torch.zeros((1, 1, H, W), device=device) + for bbox in clean_bboxes: + x1, y1, x2, y2 = bbox[:4] + bw, bh = x2 - x1, y2 - y1 + dx, dy = bw * BBOX_DILATE_FRAC, bh * BBOX_DILATE_FRAC + xi1 = max(0, int(round(x1 - dx))) + yi1 = max(0, int(round(y1 - dy))) + xi2 = min(W, int(round(x2 + dx))) + yi2 = min(H, int(round(y2 + dy))) + if xi2 > xi1 and yi2 > yi1: + face_mask_spatial[:, :, yi1:yi2, xi1:xi2] = 1.0 + + # Degenerate case: no pixels to perturb — return unchanged. if face_mask_spatial.sum() == 0: return frame_bgr.copy(), 0.0, 0.0, conf_before, conf_before @@ -276,13 +293,11 @@ class SCRFDEvasionModel: noise = torch.randn_like(x_adv) * INPUT_NOISE_STD x_smooth = (x_adv + noise).clamp(0.0, 1.0) - # Straight-through JPEG quantization: round to q=92 step size - # on forward, identity on backward. This forces PGD to find - # perturbations that tolerate pixel-level rounding — which is - # exactly what JPEG compression does. - if sample_idx > 0: # skip quantization on one sample for gradient diversity - x_smooth_q = (x_smooth / JPEG_QUANT_STEP).round() * JPEG_QUANT_STEP - x_smooth = x_smooth + (x_smooth_q - x_smooth).detach() + # DiffJPEG: differentiable DCT + quantization table roundtrip at q=92. + # Gradient flows through DCT/IDCT ops; rounding within quantize uses STE. + # One sample per step skips JPEG for gradient diversity (same as before). + if sample_idx > 0: + x_smooth = diffjpeg(x_smooth, quality=JPEG_QUALITY) confs = _conf_tensor(x_smooth) face_mask = (confs.detach() > FACE_MASK_THRESHOLD)