From 6cd1d3b28df4cd9394ff18c8d8fbb36dbdfa3717 Mon Sep 17 00:00:00 2001 From: Natalie Date: Mon, 1 Jun 2026 00:24:35 -0600 Subject: [PATCH] =?UTF-8?q?feat(@projects/@claire):=20=E2=9C=A8=20add=20va?= =?UTF-8?q?ult=20verification=20CLI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- src/claire/cli.py | 31 +++++++ src/claire/vault_verify.py | 173 +++++++++++++++++++++++++++++++++++++ tests/test_vault_verify.py | 40 +++++++++ 3 files changed, 244 insertions(+) create mode 100644 src/claire/vault_verify.py create mode 100644 tests/test_vault_verify.py diff --git a/src/claire/cli.py b/src/claire/cli.py index 1370077..ec58728 100644 --- a/src/claire/cli.py +++ b/src/claire/cli.py @@ -640,6 +640,37 @@ def vault_status_cmd() -> None: console.print(table) +@vault_app.command("verify") +def vault_verify_cmd( + file: Annotated[str | None, typer.Option("--file", help="verify a single vault file")] = None, +) -> None: + """Live-test vault credentials against their declared `verify:` endpoint. + + Only files with a `verify: ://:` line are tested + (opt-in). ONE attempt per credential — never retries a rejection + (fail2ban-safe). Run from a host on the WireGuard mesh. + """ + from rich.table import Table + + from . import vault_verify as vv + + results = [vv.verify_file(file)] if file else vv.verify_all() + if not results: + console.print("[yellow]no vault files declare a `verify:` endpoint[/yellow]") + raise typer.Exit(code=0) + colour = {"ok": "green", "rejected": "red", "unreachable": "yellow", + "unsupported": "yellow", "no-spec": "dim"} + table = Table(title="vault credential verify") + for c in ("file", "endpoint", "status", "detail"): + table.add_column(c) + for r in results: + table.add_row(r.file, r.endpoint or "-", + f"[{colour.get(r.status, 'white')}]{r.status}[/]", r.detail or "") + console.print(table) + if any(r.status == "rejected" for r in results): + raise typer.Exit(code=1) + + @vault_app.command("rotate-secret") def vault_rotate_secret( confirm: Annotated[bool, typer.Option("--confirm", help="execute (default is dry-run)")] = False, diff --git a/src/claire/vault_verify.py b/src/claire/vault_verify.py new file mode 100644 index 0000000..7eb65ba --- /dev/null +++ b/src/claire/vault_verify.py @@ -0,0 +1,173 @@ +"""Live credential verification for vault secrets — the empirical "which one +works" check, for reconciliation/audit. + +A secret is only tested when its vault file OPTS IN via a `verify:` line — we +never guess endpoints. One attempt per credential (a rejected login must NOT be +retried, or docker-mailserver/fail2ban bans the source). + + verify: imap://10.9.0.1:993 # uses `account:` + `password:` + verify: smtps://10.9.0.1:465 # SMTP-over-SSL + verify: smtp://10.9.0.1:587 # SMTP + STARTTLS + verify: https://10.0.0.11/api/health # bearer = file body (token files) + +Endpoints are the service's MESH IP (services bind WireGuard, not public DNS). +""" + +from __future__ import annotations + +import re +import socket +import ssl +from dataclasses import dataclass +from pathlib import Path +from urllib.parse import urlparse + +from .vault import default_vault_dir + +_TIMEOUT = 8 + + +@dataclass(frozen=True) +class VerifyResult: + file: str + endpoint: str # scheme://host:port, or "" when unspecified + status: str # ok | rejected | unreachable | no-spec | unsupported + detail: str = "" + + +def _field(text: str, key: str) -> str | None: + m = re.search(rf"^{key}:\s*(\S+)", text, re.M) + return m.group(1) if m else None + + +def parse_spec(text: str): + """Return (scheme, host, port, path) from a `verify:` line, or None.""" + raw = _field(text, "verify") + if not raw: + return None + u = urlparse(raw) + if not u.scheme or not u.hostname: + return None + return u.scheme, u.hostname, u.port, (u.path or "/") + + +def _imap(host, port, account, secret) -> tuple[str, str]: + import imaplib + + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + socket.setdefaulttimeout(_TIMEOUT) + try: + m = imaplib.IMAP4_SSL(host, port or 993, ssl_context=ctx) + except (OSError, ssl.SSLError) as e: + return "unreachable", f"{type(e).__name__}: {e}" + try: + m.login(account, secret) + m.logout() + return "ok", "" + except imaplib.IMAP4.error as e: + return "rejected", str(e) + + +def _smtp(host, port, account, secret, *, starttls: bool) -> tuple[str, str]: + import smtplib + + socket.setdefaulttimeout(_TIMEOUT) + try: + if starttls: + s = smtplib.SMTP(host, port or 587, timeout=_TIMEOUT) + s.ehlo() + s.starttls(context=_lax_ctx()) + s.ehlo() + else: + s = smtplib.SMTP_SSL(host, port or 465, context=_lax_ctx(), timeout=_TIMEOUT) + except (OSError, smtplib.SMTPException) as e: + return "unreachable", f"{type(e).__name__}: {e}" + try: + s.login(account, secret) + s.quit() + return "ok", "" + except smtplib.SMTPAuthenticationError as e: + return "rejected", str(e) + except smtplib.SMTPException as e: + return "unreachable", str(e) + + +def _https(host, port, path, token) -> tuple[str, str]: + import httpx + + url = f"https://{host}{':' + str(port) if port else ''}{path}" + try: + r = httpx.get(url, headers={"Authorization": f"Bearer {token}"}, + timeout=_TIMEOUT, verify=False) + except httpx.HTTPError as e: + return "unreachable", f"{type(e).__name__}: {e}" + if r.status_code in (401, 403): + return "rejected", f"HTTP {r.status_code}" + if r.status_code < 400: + return "ok", f"HTTP {r.status_code}" + return "unreachable", f"HTTP {r.status_code}" + + +def _lax_ctx() -> ssl.SSLContext: + """TLS context with cert verification OFF — a DELIBERATE, scoped choice. + + The verifier connects to internal services by their WireGuard MESH IP + (e.g. 10.9.0.1), so the server cert's hostname never matches and strict + verification would fail every test. The security boundary here is the + WireGuard tunnel itself (authenticated, encrypted peers) — this tool only + asks "does this credential authenticate?", it is not establishing trust in + the endpoint. It MUST stay confined to mesh IPs; never point a `verify:` + spec at a public host. (To harden: route by a hostname whose cert SAN + matches + trust the internal CA, then drop this.) + """ + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + +def verify_file(name: str, vault_dir: Path | None = None) -> VerifyResult: + """Test one vault file's credential against its declared `verify:` endpoint. + + ONE attempt — never retries a rejection (fail2ban-safe). + """ + p = (vault_dir or default_vault_dir()) / name + text = p.read_text(encoding="utf-8") + spec = parse_spec(text) + if spec is None: + return VerifyResult(name, "", "no-spec") + scheme, host, port, path = spec + endpoint = f"{scheme}://{host}{':' + str(port) if port else ''}" + account = _field(text, "account") + secret = _field(text, "password") or text.strip() # token files = bare body + + if scheme == "imap": + status, detail = _imap(host, port, account, secret) + elif scheme == "smtps": + status, detail = _smtp(host, port, account, secret, starttls=False) + elif scheme == "smtp": + status, detail = _smtp(host, port, account, secret, starttls=True) + elif scheme in ("https", "http"): + status, detail = _https(host, port, path, secret) + else: + return VerifyResult(name, endpoint, "unsupported", f"scheme {scheme!r}") + return VerifyResult(name, endpoint, status, detail) + + +def verify_all(vault_dir: Path | None = None) -> list[VerifyResult]: + """Verify every vault file that declares a `verify:` endpoint.""" + d = vault_dir or default_vault_dir() + results: list[VerifyResult] = [] + for p in sorted(d.glob("*")): + if not p.is_file() or p.name.startswith(".") or p.name.endswith(".prev.txt"): + continue + try: + text = p.read_text(encoding="utf-8") + except OSError: + continue + if parse_spec(text) is None: + continue # silently skip files with no verify spec + results.append(verify_file(p.name, vault_dir=d)) + return results diff --git a/tests/test_vault_verify.py b/tests/test_vault_verify.py new file mode 100644 index 0000000..efe4c94 --- /dev/null +++ b/tests/test_vault_verify.py @@ -0,0 +1,40 @@ +"""Vault credential-verify: spec parsing + dispatch (no live network).""" + +from __future__ import annotations + +from pathlib import Path + +from claire import vault_verify as vv + + +def test_parse_spec_variants(): + assert vv.parse_spec("verify: imap://10.9.0.1:993\n") == ("imap", "10.9.0.1", 993, "/") + assert vv.parse_spec("verify: smtps://10.9.0.1:465")[0] == "smtps" + s = vv.parse_spec("verify: https://10.0.0.11/api/health") + assert s == ("https", "10.0.0.11", None, "/api/health") + + +def test_parse_spec_none_when_absent_or_garbage(): + assert vv.parse_spec("account: a\npassword: b\n") is None + assert vv.parse_spec("verify: not-a-url") is None + + +def test_verify_file_no_spec(tmp_path: Path): + (tmp_path / "x.txt").write_text("account: a\npassword: b\n") + r = vv.verify_file("x.txt", vault_dir=tmp_path) + assert r.status == "no-spec" + + +def test_verify_file_unsupported_scheme(tmp_path: Path): + (tmp_path / "x.txt").write_text("account: a\npassword: b\nverify: postgres://10.0.0.11:5432\n") + r = vv.verify_file("x.txt", vault_dir=tmp_path) + assert r.status == "unsupported" + assert "postgres" in r.detail + + +def test_verify_all_skips_specless(tmp_path: Path): + (tmp_path / "a.txt").write_text("password: x\n") # no verify + (tmp_path / "b.txt").write_text("verify: imap://h:993\naccount: u\npassword: p\n") + (tmp_path / "c.prev.txt").write_text("verify: imap://h:993\n") # prev excluded + names = {r.file for r in vv.verify_all(vault_dir=tmp_path)} + assert names == {"b.txt"} # only the spec'd, non-prev file