"""HMAC-SHA256 auth on /api/v1/sync/*. Covers: 1. Valid signature → 200 2. Tampered body → 401 3. Expired timestamp → 401 4. Missing headers → 401 5. sync_secret=None → all pass through (back-compat) 6. Full two-Claire convergence with matched secrets """ from __future__ import annotations import hashlib import hmac import json import time from pathlib import Path from unittest.mock import patch import httpx import pytest from fastapi.testclient import TestClient from claire.config import PeerConfig from claire.sync import compute_signature from claire.web.app import create_app SECRET = "test-secret-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" OTHER_SECRET = "test-secret-BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" def _signed(method: str, path: str, body: bytes, secret: str) -> dict[str, str]: ts = str(int(time.time())) sig = compute_signature(ts, method, path, body, secret) return {"X-Claire-Timestamp": ts, "X-Claire-Signature": sig} def _client(tmp_path: Path, secret: str | None) -> TestClient: return TestClient( create_app( config_path=tmp_path / "claire.toml", db_path=tmp_path / "claire.db", sync_secret=secret, ) ) def test_valid_signature_passes(tmp_path: Path) -> None: c = _client(tmp_path, SECRET) headers = _signed("GET", "/api/v1/sync/cursor", b"", SECRET) r = c.get("/api/v1/sync/cursor", headers=headers) assert r.status_code == 200 def test_missing_headers_rejected(tmp_path: Path) -> None: c = _client(tmp_path, SECRET) r = c.get("/api/v1/sync/cursor") assert r.status_code == 401 assert r.json()["detail"] == "auth failed" def test_tampered_body_rejected(tmp_path: Path) -> None: c = _client(tmp_path, SECRET) real_body = json.dumps({"events": []}, separators=(",", ":")).encode() headers = _signed("POST", "/api/v1/sync/events", real_body, SECRET) headers["content-type"] = "application/json" # Send a *different* body than what we signed. tampered = json.dumps({"events": [{"injected": True}]}, separators=(",", ":")).encode() r = c.post("/api/v1/sync/events", content=tampered, headers=headers) assert r.status_code == 401 def test_expired_timestamp_rejected(tmp_path: Path) -> None: c = _client(tmp_path, SECRET) old_ts = str(int(time.time()) - 1000) sig = compute_signature(old_ts, "GET", "/api/v1/sync/cursor", b"", SECRET) r = c.get( "/api/v1/sync/cursor", headers={"X-Claire-Timestamp": old_ts, "X-Claire-Signature": sig}, ) assert r.status_code == 401 def test_wrong_secret_rejected(tmp_path: Path) -> None: c = _client(tmp_path, SECRET) headers = _signed("GET", "/api/v1/sync/cursor", b"", OTHER_SECRET) r = c.get("/api/v1/sync/cursor", headers=headers) assert r.status_code == 401 def test_no_secret_passes_through(tmp_path: Path) -> None: """When sync_secret is None, sync endpoints are unauthenticated.""" c = _client(tmp_path, None) r = c.get("/api/v1/sync/cursor") assert r.status_code == 200 r = c.post("/api/v1/sync/events", json={"events": []}) assert r.status_code == 200 def test_health_unauthenticated_even_with_secret(tmp_path: Path) -> None: c = _client(tmp_path, SECRET) r = c.get("/api/v1/health") assert r.status_code == 200 # --------------------------------------------------------------------------- # End-to-end convergence with signed sync # --------------------------------------------------------------------------- def _swap_httpx_to_clients(clients_by_base: dict[str, TestClient]): def fake_get(url: str, **kwargs): for base, client in clients_by_base.items(): if url.startswith(base): path = url[len(base):] return client.get( path, params=kwargs.get("params"), timeout=kwargs.get("timeout"), headers=kwargs.get("headers"), ) raise RuntimeError(f"no TestClient registered for {url}") def fake_post(url: str, **kwargs): for base, client in clients_by_base.items(): if url.startswith(base): path = url[len(base):] if "content" in kwargs: return client.post( path, content=kwargs.get("content"), timeout=kwargs.get("timeout"), headers=kwargs.get("headers"), ) return client.post( path, json=kwargs.get("json"), timeout=kwargs.get("timeout"), headers=kwargs.get("headers"), ) raise RuntimeError(f"no TestClient registered for {url}") def fake_request(method: str, url: str, **kwargs): if method.upper() == "GET": return fake_get(url, **kwargs) if method.upper() == "POST": return fake_post(url, **kwargs) raise RuntimeError(f"unsupported method {method}") return patch.multiple( httpx, get=fake_get, post=fake_post, request=fake_request, ) def test_signed_two_clares_converge(tmp_path: Path) -> None: a_secret = "a-secret-" + "A" * 40 b_secret = "b-secret-" + "B" * 40 a_dir = tmp_path / "a" b_dir = tmp_path / "b" a_dir.mkdir() b_dir.mkdir() a = TestClient(create_app( config_path=a_dir / "claire.toml", db_path=a_dir / "claire.db", sync_secret=a_secret, )) b = TestClient(create_app( config_path=b_dir / "claire.toml", db_path=b_dir / "claire.db", sync_secret=b_secret, )) # Need to sign requests we make directly via TestClient with each side's # own secret. def signed_get(client, path, secret, **kw): headers = _signed("GET", path, b"", secret) return client.get(path, headers=headers, **kw) def signed_post(client, path, body_obj, secret): body = json.dumps(body_obj, separators=(",", ":")).encode() headers = _signed("POST", path, body, secret) headers["content-type"] = "application/json" return client.post(path, content=body, headers=headers) # Unsigned writes via the non-sync API still work (no auth on /projects). a.post("/api/v1/projects", json={"name": "alpha"}) a.post("/api/v1/tasks", json={"project": "alpha", "title": "task on A"}) b.post("/api/v1/projects", json={"name": "beta"}) b.post("/api/v1/tasks", json={"project": "beta", "title": "task on B"}) BASE_A = "http://machine-a" BASE_B = "http://machine-b" # A knows B's secret (to talk to B); B knows A's (to talk to A). peer_b = PeerConfig(url=BASE_B, secret=b_secret) with _swap_httpx_to_clients({BASE_A: a, BASE_B: b}): from claire.sync import sync_peer result = sync_peer(peer_b, local_base=BASE_A, local_secret=a_secret) assert result.pulled > 0 assert result.pushed > 0 # Verify both sides see both projects. a_names = {p["name"] for p in a.get("/api/v1/projects").json()["projects"]} b_names = {p["name"] for p in b.get("/api/v1/projects").json()["projects"]} assert a_names == {"alpha", "beta"} assert b_names == {"alpha", "beta"} def test_peers_back_compat_plain_strings(tmp_path: Path) -> None: """Legacy `peers = ["http://..."]` form still parses.""" from claire.config import ClaireConfig cfg = ClaireConfig.model_validate({ "machine_id": "abc", "peers": ["http://10.0.0.1:8765"], }) assert len(cfg.peers) == 1 assert cfg.peers[0].url == "http://10.0.0.1:8765" assert cfg.peers[0].secret is None def test_peers_object_form(tmp_path: Path) -> None: from claire.config import ClaireConfig cfg = ClaireConfig.model_validate({ "machine_id": "abc", "peers": [{"url": "http://10.0.0.1:8765", "secret": "s"}], }) assert cfg.peers[0].url == "http://10.0.0.1:8765" assert cfg.peers[0].secret == "s"