claire/tests/test_sync_auth.py
Natalie 8d8c1e32e9 fix(@projects/@claire): 🐛 correct spelling in test names
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-31 17:47:31 -06:00

235 lines
7.9 KiB
Python

"""HMAC-SHA256 auth on /api/v1/sync/*.
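Covers:
1. Valid signature → 200
2. Tampered body → 401
3. Expired timestamp → 401
4. Missing headers → 401
5. sync_secret=None → all pass through (back-compat)
6. Full two-Claire convergence with matched secrets
7. Wrong secret → 401
8. /api/v1/health stays unauthenticated even when a secret is set
9. Peer config parsing: legacy plain-string form and object form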
"""
from __future__ import annotations
import hashlib
import hmac
import json
import time
from pathlib import Path
from unittest.mock import patch
import httpx
import pytest
from fastapi.testclient import TestClient
from claire.config import PeerConfig
from claire.sync import compute_signature
from claire.web.app import create_app
# Secret passed as ``sync_secret`` to the app under test in the single-app
# auth tests, and used to sign the requests that are expected to succeed.
SECRET = "test-secret-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
# A second, different secret: requests signed with it are sent to an app
# configured with SECRET and must be rejected.
OTHER_SECRET = "test-secret-BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
def _signed(method: str, path: str, body: bytes, secret: str) -> dict[str, str]:
    """Build the auth headers for a request signed at the current time.

    The timestamp is the current Unix time truncated to whole seconds and
    rendered as a string. The signature is whatever ``compute_signature``
    returns for ``(timestamp, method, path, body, secret)``.

    Returns:
        A dict holding the ``X-Claire-Timestamp`` and ``X-Claire-Signature``
        headers.
    """
    timestamp = f"{int(time.time())}"
    return {
        "X-Claire-Timestamp": timestamp,
        "X-Claire-Signature": compute_signature(
            timestamp, method, path, body, secret
        ),
    }
def _client(tmp_path: Path, secret: str | None) -> TestClient:
    """Return a TestClient around a fresh app rooted in *tmp_path*.

    The app's config file and database paths both live directly under
    *tmp_path*. *secret* is forwarded unchanged as the app's ``sync_secret``
    (``None`` is allowed).
    """
    app = create_app(
        config_path=tmp_path / "claire.toml",
        db_path=tmp_path / "claire.db",
        sync_secret=secret,
    )
    return TestClient(app)
def test_valid_signature_passes(tmp_path: Path) -> None:
    """A GET signed with the app's own secret is answered with 200."""
    path = "/api/v1/sync/cursor"
    client = _client(tmp_path, SECRET)
    response = client.get(path, headers=_signed("GET", path, b"", SECRET))
    assert response.status_code == 200
def test_missing_headers_rejected(tmp_path: Path) -> None:
    """A request without any auth headers gets 401 and the generic detail."""
    response = _client(tmp_path, SECRET).get("/api/v1/sync/cursor")
    assert response.status_code == 401
    payload = response.json()
    assert payload["detail"] == "auth failed"
def test_tampered_body_rejected(tmp_path: Path) -> None:
    """A POST whose body differs from the body that was signed gets 401."""

    def encode(payload: dict) -> bytes:
        # Compact JSON encoding, applied to both the signed and the sent body.
        return json.dumps(payload, separators=(",", ":")).encode()

    path = "/api/v1/sync/events"
    client = _client(tmp_path, SECRET)
    signed_body = encode({"events": []})
    headers = {
        **_signed("POST", path, signed_body, SECRET),
        "content-type": "application/json",
    }
    # The signature covers `signed_body`, but a different payload is sent.
    forged_body = encode({"events": [{"injected": True}]})
    response = client.post(path, content=forged_body, headers=headers)
    assert response.status_code == 401
def test_expired_timestamp_rejected(tmp_path: Path) -> None:
    """A correctly signed request stamped 1000 seconds in the past gets 401."""
    path = "/api/v1/sync/cursor"
    client = _client(tmp_path, SECRET)
    stale_ts = str(int(time.time()) - 1000)
    # The signature is valid for the stale timestamp; only the age is wrong.
    headers = {
        "X-Claire-Timestamp": stale_ts,
        "X-Claire-Signature": compute_signature(stale_ts, "GET", path, b"", SECRET),
    }
    response = client.get(path, headers=headers)
    assert response.status_code == 401
def test_wrong_secret_rejected(tmp_path: Path) -> None:
    """A request signed with a secret other than the app's own gets 401."""
    path = "/api/v1/sync/cursor"
    client = _client(tmp_path, SECRET)
    foreign_headers = _signed("GET", path, b"", OTHER_SECRET)
    response = client.get(path, headers=foreign_headers)
    assert response.status_code == 401
def test_no_secret_passes_through(tmp_path: Path) -> None:
    """With ``sync_secret=None`` the sync endpoints accept unsigned requests."""
    client = _client(tmp_path, None)

    cursor_response = client.get("/api/v1/sync/cursor")
    assert cursor_response.status_code == 200

    events_response = client.post("/api/v1/sync/events", json={"events": []})
    assert events_response.status_code == 200
def test_health_unauthenticated_even_with_secret(tmp_path: Path) -> None:
    """The health endpoint answers 200 to an unsigned request even when a secret is set."""
    response = _client(tmp_path, SECRET).get("/api/v1/health")
    assert response.status_code == 200
# ---------------------------------------------------------------------------
# End-to-end convergence with signed sync
# ---------------------------------------------------------------------------
def _swap_httpx_to_clients(clients_by_base: dict[str, TestClient]):
    """Return a patcher that reroutes module-level ``httpx`` calls to TestClients.

    Args:
        clients_by_base: Maps a base URL prefix to the TestClient that should
            serve every URL starting with that prefix.

    Returns:
        The ``patch.multiple`` object replacing ``httpx.get``, ``httpx.post``
        and ``httpx.request``; use it as a context manager.

    The fakes raise ``RuntimeError`` for a URL that matches no registered
    base, and ``fake_request`` raises it for any method other than GET/POST.
    """

    def _route(url: str) -> tuple[TestClient, str]:
        # The first registered base that prefixes the URL wins; whatever
        # follows the base is the path handed to the TestClient.
        for base, client in clients_by_base.items():
            if url.startswith(base):
                return client, url[len(base):]
        raise RuntimeError(f"no TestClient registered for {url}")

    def fake_get(url: str, **kwargs):
        client, path = _route(url)
        return client.get(
            path,
            params=kwargs.get("params"),
            timeout=kwargs.get("timeout"),
            headers=kwargs.get("headers"),
        )

    def fake_post(url: str, **kwargs):
        client, path = _route(url)
        # Forward raw bytes when the caller supplied `content`; otherwise
        # fall back to the `json` keyword (which may be absent, i.e. None).
        payload_key = "content" if "content" in kwargs else "json"
        return client.post(
            path,
            timeout=kwargs.get("timeout"),
            headers=kwargs.get("headers"),
            **{payload_key: kwargs.get(payload_key)},
        )

    def fake_request(method: str, url: str, **kwargs):
        handlers = {"GET": fake_get, "POST": fake_post}
        handler = handlers.get(method.upper())
        if handler is None:
            raise RuntimeError(f"unsupported method {method}")
        return handler(url, **kwargs)

    return patch.multiple(
        httpx,
        get=fake_get,
        post=fake_post,
        request=fake_request,
    )
def test_signed_two_claires_converge(tmp_path: Path) -> None:
    """Two apps with distinct sync secrets list the same projects after one sync.

    Each app gets its own directory (config + DB) and its own ``sync_secret``.
    One project and one task are created on each side through the regular,
    non-sync API. ``sync_peer`` is then run once from A against B while the
    module-level ``httpx`` functions are rerouted to the in-process
    TestClients. The run must report both pulled and pushed items, and
    afterwards both apps must list both projects.
    """
    from claire.sync import sync_peer

    a_secret = "a-secret-" + "A" * 40
    b_secret = "b-secret-" + "B" * 40

    a_dir = tmp_path / "a"
    b_dir = tmp_path / "b"
    a_dir.mkdir()
    b_dir.mkdir()

    a = TestClient(create_app(
        config_path=a_dir / "claire.toml",
        db_path=a_dir / "claire.db",
        sync_secret=a_secret,
    ))
    b = TestClient(create_app(
        config_path=b_dir / "claire.toml",
        db_path=b_dir / "claire.db",
        sync_secret=b_secret,
    ))

    # Unsigned writes via the non-sync API still work (no auth on /projects).
    a.post("/api/v1/projects", json={"name": "alpha"})
    a.post("/api/v1/tasks", json={"project": "alpha", "title": "task on A"})
    b.post("/api/v1/projects", json={"name": "beta"})
    b.post("/api/v1/tasks", json={"project": "beta", "title": "task on B"})

    BASE_A = "http://machine-a"
    BASE_B = "http://machine-b"

    # The peer entry carries B's secret; A's own secret is passed separately
    # as `local_secret`.
    peer_b = PeerConfig(url=BASE_B, secret=b_secret)

    with _swap_httpx_to_clients({BASE_A: a, BASE_B: b}):
        result = sync_peer(peer_b, local_base=BASE_A, local_secret=a_secret)

    assert result.pulled > 0
    assert result.pushed > 0

    # Verify both sides see both projects.
    a_names = {p["name"] for p in a.get("/api/v1/projects").json()["projects"]}
    b_names = {p["name"] for p in b.get("/api/v1/projects").json()["projects"]}
    assert a_names == {"alpha", "beta"}
    assert b_names == {"alpha", "beta"}
def test_peers_back_compat_plain_strings(tmp_path: Path) -> None:
    """Legacy `peers = ["http://..."]` form still parses."""
    from claire.config import ClaireConfig

    raw_config = {
        "machine_id": "abc",
        "peers": ["http://10.0.0.1:8765"],
    }
    cfg = ClaireConfig.model_validate(raw_config)

    assert len(cfg.peers) == 1
    peer = cfg.peers[0]
    # A bare URL string becomes a peer entry with that URL and no secret.
    assert peer.url == "http://10.0.0.1:8765"
    assert peer.secret is None
def test_peers_object_form(tmp_path: Path) -> None:
    """A peer given as a ``{"url": ..., "secret": ...}`` object keeps both fields."""
    from claire.config import ClaireConfig

    raw_config = {
        "machine_id": "abc",
        "peers": [{"url": "http://10.0.0.1:8765", "secret": "s"}],
    }
    cfg = ClaireConfig.model_validate(raw_config)

    peer = cfg.peers[0]
    assert peer.url == "http://10.0.0.1:8765"
    assert peer.secret == "s"