Makes the cloud DX forge shortcuts (mcforge, ctforge) first-class citizens of the shared net-tools layer: - New bin/forge-dns-render (print/install/diff) that sources ~/.vault/*_forge_creds and emits a managed # >>> dx-forges block in /etc/hosts. - `net sync` now also converges the DX forges (alongside mesh-hosts + ssh). - Per-project ./run forge:dns now prefers the central renderer (with local fallback). - Docs updated. The mcforge:3000 / ctforge:3000 shortcuts are now installed and kept fresh as part of standard DX infra setup (`net sync` after net-tools install, or after any forge:up).
324 lines
12 KiB
Python
Executable file
324 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""net — the one command for the mesh.
|
|
|
|
Every verb is a thin face over the same machinery the fleet agent runs (this
|
|
script imports smart-lan-router.py as a module — shims, identity, and config
|
|
are written exactly once). The GUI and tray call these same verbs, so no
|
|
surface can disagree with another.
|
|
|
|
net status fleet table (every agent's last snapshot)
|
|
net whoami which host this is, roles, vantage
|
|
net doctor [host] probe lan/wg/identity per path, name the chokepoint
|
|
(annotates any KNOWN-BROKEN/parked features per host)
|
|
net issues [host] list known-broken / parked features (the triage registry)
|
|
net sync force-converge this node's /etc/hosts (mesh fleet + dx-forges shortcuts) + ssh now
|
|
net up | net down bring the wg tunnel up / down
|
|
net enroll phone NAME --os ios|android [--wg 10.9.0.N]
|
|
wg peer + QR (wg-phone-add) + declared entry
|
|
net gui open the Mesh control window (darwin)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
# --- locate the repo + import the agent as a library ---------------------------
# Follow a symlinked launcher back to the real script path, one link at a time.
SELF = os.path.abspath(__file__)
while os.path.islink(SELF):
    link = os.readlink(SELF)
    if os.path.isabs(link):
        SELF = link
    else:
        # a relative link target is resolved against the directory holding the link
        SELF = os.path.join(os.path.dirname(SELF), link)

# Climb from the script's directory until a directory that contains
# data/mesh-hosts.json is reached; the climb gives up at "/".
ROOT = os.path.dirname(SELF)
while True:
    if ROOT == "/" or os.path.isfile(os.path.join(ROOT, "data", "mesh-hosts.json")):
        break
    ROOT = os.path.dirname(ROOT)
AGENT_PY = os.path.join(ROOT, "smart-lan-router", "smart-lan-router.py")

# Load the agent script from its file path under the module name "slr"; the
# module object is placed in sys.modules before its code is executed.
_spec = importlib.util.spec_from_file_location("slr", AGENT_PY)
slr = importlib.util.module_from_spec(_spec)
sys.modules["slr"] = slr
_spec.loader.exec_module(slr)
|
|
|
|
|
|
def data() -> dict:
    """Return the parsed contents of the file that ``slr.find_data_file()`` locates."""
    data_path = slr.find_data_file()
    return slr.load_json(data_path)
|
|
|
|
|
|
def overlay() -> dict:
    """Return the parsed ``data/lan-state.json`` under ROOT.

    Falls back to an empty dict when the file is missing, unreadable, or not
    valid JSON.
    """
    state_path = os.path.join(ROOT, "data", "lan-state.json")
    try:
        state = slr.load_json(state_path)
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        state = {}
    return state
|
|
|
|
|
|
def issues_for(host: str | None = None) -> list[dict]:
    """Return the entries of ``data/known-issues.json`` (its ``issues`` list).

    With *host* given, only entries whose ``host`` field equals it are kept;
    with ``None`` every entry is returned. A missing, unreadable, or invalid
    registry file yields an empty list.
    """
    registry_path = os.path.join(ROOT, "data", "known-issues.json")
    try:
        registry = slr.load_json(registry_path).get("issues", [])
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        return []
    if host is None:
        return list(registry)
    return [entry for entry in registry if entry.get("host") == host]
|
|
|
|
|
|
def issue_resolved(issue: dict, ip: str | None) -> bool | None:
    """Re-run an issue's optional L7 probe (same shape as mesh-hosts identity).

    The probe is a mapping with a ``url`` template (``{ip}`` is substituted
    with *ip*) and an optional ``markers`` list of substrings that must all
    appear in the response body.

    True  = probe now passes → the issue may be fixed.
    False = probe still fails → still broken.
    None  = no probe / not checkable → declarative only. This includes a
            missing *ip* and a probe that is not a mapping or has no ``url``.
    """
    probe = issue.get("probe")
    if not ip or not isinstance(probe, dict):
        return None
    url_template = probe.get("url")
    if not url_template:
        # A probe without a URL cannot be executed; report "not checkable"
        # instead of raising KeyError on a malformed registry entry.
        return None
    url = url_template.replace("{ip}", ip)
    # slr._run returns (rc, stdout, <third value>); the trailing 6 is
    # presumably its timeout in seconds — confirm against smart-lan-router.py
    rc, out, _ = slr._run(["/usr/bin/curl", "-s", "--max-time", "4", url], 6)
    return rc == 0 and all(m in out for m in probe.get("markers", []))
|
|
|
|
|
|
def host_entry(d: dict, name: str) -> dict | None:
    """Return the first entry of ``d["hosts"]`` whose name or alias equals *name*.

    An entry matches when its ``name`` is *name* or *name* appears in its
    ``aliases`` (a missing or ``None`` alias list counts as empty). Returns
    ``None`` when nothing matches or ``d`` has no ``hosts`` key.
    """
    candidates = (
        entry
        for entry in d.get("hosts", [])
        if entry["name"] == name or name in (entry.get("aliases") or [])
    )
    return next(candidates, None)
|
|
|
|
|
|
def ping_ms(ip: str, timeout_s: int = 2) -> float | None:
    """Send one ping to *ip* and return the value of the ``time=`` field in its output.

    Returns ``None`` when ping exits non-zero, and ``0.0`` when it exits zero
    but no ``time=<number>`` is present in the output.
    """
    ping_bin = shutil.which("ping") or "/sbin/ping"
    # the flag that carries timeout_s is -t on darwin and -W everywhere else
    timeout_flag = "-W"
    if slr.PLATFORM == "darwin":
        timeout_flag = "-t"
    argv = [ping_bin, "-c", "1", timeout_flag, str(timeout_s), ip]
    rc, out, _ = slr._run(argv, timeout_s + 2)
    if rc != 0:
        return None
    import re
    found = re.search(r"time=([\d.]+)", out)
    if found is None:
        return 0.0
    return float(found.group(1))
|
|
|
|
|
|
# --- verbs ---------------------------------------------------------------------
|
|
|
|
def cmd_whoami(_args: list[str]) -> int:
    """Print this host's name, platform, roles, location, and vantage; return 0.

    All facts come from the agent module: ``build_ctx`` for identity/roles,
    ``load_config`` + ``preview_location`` for the HOME/AWAY verdict.
    """
    ctx = slr.build_ctx(slr.find_data_file())
    cfg = slr.load_config(slr.find_data_file())
    home, gw, gwif, note = slr.preview_location(cfg, ctx["roles"])

    host_label = ctx["self_name"] or "UNKNOWN — not in mesh-hosts.json"
    print(f"host : {host_label}")
    print(f"platform : {slr.PLATFORM}")
    role_list = ", ".join(sorted(ctx["roles"]))
    print(f"roles : {role_list}")
    where = "HOME" if home else "AWAY"
    print(f"location : {where} (gw {gw} on {gwif})")
    if note:
        print(f"route : {note}")
    # LAN-capable when this host has a declared LAN address or carries the "route" role
    if ctx["self_lan"] is not None or "route" in ctx["roles"]:
        vantage = "LAN-capable"
    else:
        vantage = "mesh-only"
    print(f"vantage : {vantage}")
    return 0
|
|
|
|
|
|
def cmd_status(_args: list[str]) -> int:
    """Replace the current process with ``bin/fleet-status`` under ROOT.

    ``os.execv`` does not return when it succeeds, so no value is returned
    from here on the success path.
    """
    fleet_status = os.path.join(ROOT, "bin", "fleet-status")
    os.execv(fleet_status, ["fleet-status"])
|
|
|
|
|
|
def cmd_issues(args: list[str]) -> int:
    """Print the known-broken / parked features, optionally limited to one host.

    ``args[0]`` (if present and non-empty) is a host name or alias; it is
    resolved to the canonical name first. Returns 1 for an unknown host and
    0 otherwise (including when there is nothing to list).
    """
    host = args[0] if args else None
    if host:
        entry = host_entry(data(), host)
        if not entry:
            print(f"issues: unknown host '{host}'", file=sys.stderr)
            return 1
        host = entry["name"]  # normalize alias → canonical

    items = issues_for(host)
    if not items:
        scope = f" for {host}" if host else ""
        print(f"no known issues{scope} — clean")
        return 0

    glyphs = {"broken": "✗", "degraded": "~", "parked": "▪"}
    # stable listing order: by host, then by issue id
    ordered = sorted(items, key=lambda x: (x.get("host", ""), x.get("id", "")))
    for issue in ordered:
        status = issue.get("status", "broken")
        heading = f"\n{glyphs.get(status, '?')} {issue['id']} [{status}] @ {issue.get('host', 'fleet')}"
        if issue.get("unit"):
            heading += f" · {issue['unit']}"
        print(heading)
        print(f" {issue.get('title', '')}")
        if issue.get("summary"):
            print(f" why : {issue['summary']}")
        if issue.get("fix"):
            print(f" fix : {issue['fix']}")
        if issue.get("ref"):
            print(f" ref : {issue['ref']}")
        print(f" since {issue.get('since', '?')}")
    print()
    return 0
|
|
|
|
|
|
def cmd_sync(_args: list[str]) -> int:
    """Force-converge this node now and return a combined exit status.

    Runs, in order:
      1. ``sudo bin/mesh-hosts-render --install``
      2. ``bin/host-apply --ssh-apply``
      3. ``sudo bin/forge-dns-render --install`` — only when that script exists

    Step 3 is best-effort: a non-zero exit from forge-dns-render is reported
    on stderr but does not change the exit status; only a failure to launch
    it counts as an error.

    Returns the first non-zero value among step 1's exit code, step 2's exit
    code, and 1 if step 3 could not be launched; 0 when all are zero.
    """
    rc1 = subprocess.run(["sudo", os.path.join(ROOT, "bin", "mesh-hosts-render"), "--install"]).returncode
    rc2 = subprocess.run([os.path.join(ROOT, "bin", "host-apply"), "--ssh-apply"]).returncode
    rc3 = 0
    fbin = os.path.join(ROOT, "bin", "forge-dns-render")
    if os.path.exists(fbin):
        # best-effort; may prompt or fail if no sudo, but include in DX convergence
        try:
            forge_rc = subprocess.run(["sudo", fbin, "--install"], check=False).returncode
        except (OSError, subprocess.SubprocessError) as exc:
            # could not even start the renderer (e.g. sudo missing) — surface it
            print(f"sync: could not run forge-dns-render: {exc}", file=sys.stderr)
            rc3 = 1
        else:
            if forge_rc != 0:
                # stay best-effort, but never fail silently
                print(f"sync: forge-dns-render exited {forge_rc} (best-effort; continuing)", file=sys.stderr)
    return rc1 or rc2 or rc3
|
|
|
|
|
|
def cmd_doctor(args: list[str]) -> int:
    """Probe hosts over LAN and wg, print a verdict per host, and annotate known issues.

    With ``args[0]`` (a host name or alias) only that host is probed.
    Otherwise every declared host is probed except this one and except
    entries whose ``ssh_user`` is ``None``.

    Returns 1 when the named host is unknown; otherwise the worst level seen:
    0 = nothing flagged, 1 = LAN path dead while the mesh path is alive (or a
    known issue whose probe now passes), 2 = a host is down on every path.
    """
    d = data()
    ov = overlay()
    me = slr.identify_self(d)
    my_name = me["name"] if me else None

    if args:
        chosen = host_entry(d, args[0])
        if not chosen:
            print(f"doctor: unknown host '{args[0]}'", file=sys.stderr)
            return 1
        targets = [chosen]
    else:
        targets = [
            h for h in d["hosts"]
            if h["name"] != my_name and h.get("ssh_user") is not None
        ]

    worst = 0
    for h in targets:
        name = h["name"]
        # an overlay entry for this host takes precedence over the declared LAN address
        lan_ip = ov.get(name) or h.get("lan")
        wg_ip = h.get("wg")

        banner = f"\n■ {name}"
        if h.get("aliases"):
            banner += f" (aliases: {', '.join(h['aliases'])})"
        print(banner)

        lan_ms = ping_ms(lan_ip) if lan_ip else None
        wg_ms = ping_ms(wg_ip) if wg_ip else None
        if lan_ip:
            lan_text = "UNREACHABLE" if lan_ms is None else "%.1f ms" % lan_ms
            print(f" lan {lan_ip:<14} {lan_text}")
        if wg_ip:
            wg_text = "UNREACHABLE" if wg_ms is None else "%.1f ms" % wg_ms
            print(f" wg {wg_ip:<14} {wg_text}")

        # optional service identity check: fetch the URL and require every marker in the body
        ident = h.get("identity")
        if ident and lan_ip:
            url = ident["url"].replace("{ip}", lan_ip)
            rc, out, _ = slr._run(["/usr/bin/curl", "-s", "--max-time", "4", url], 6)
            ident_ok = rc == 0 and all(m in out for m in ident.get("markers", []))
            endpoint = url.split("/")[2]
            answer = "OK" if ident_ok else "no answer"
            print(f" svc {endpoint:<14} {answer}")

        # verdict
        if lan_ip and lan_ms is not None:
            print(f" → healthy: direct LAN path ({lan_ms:.1f} ms)")
        elif wg_ms is None:
            print(" → DOWN on every path — host offline, or this node's tunnel is down")
            worst = max(worst, 2)
        elif lan_ip:
            print(f" → LAN path dead but mesh alive — use {name}.wg; check the host's LAN link/switch")
            worst = max(worst, 1)
        else:
            print(f" → reachable via mesh ({wg_ms:.1f} ms) — normal for this host")

        # known-broken / parked features: triaged already, don't re-investigate
        for iss in issues_for(name):
            if issue_resolved(iss, lan_ip) is True:
                print(f" ⚠ known-issue {iss['id']} may be RESOLVED — re-verify & clear: {iss['title']}")
                worst = max(worst, 1)
                continue
            st = iss.get("status", "broken").upper()
            print(f" ⚠ KNOWN-{st}: {iss['title']} (since {iss.get('since', '?')}) — {iss.get('summary', '')}")
    print()
    return worst
|
|
|
|
|
|
def _wg_conf() -> str:
    """Return the wg1 config path to use.

    The per-user ``~/.wireguard/wg1.conf`` is preferred, then
    ``/etc/wireguard/wg1.conf``; the first one that exists wins. When neither
    exists the per-user path is returned anyway.
    """
    user_conf = os.path.expanduser("~/.wireguard/wg1.conf")
    system_conf = "/etc/wireguard/wg1.conf"
    existing = (path for path in (user_conf, system_conf) if os.path.exists(path))
    return next(existing, user_conf)
|
|
|
|
|
|
def cmd_up(_args: list[str]) -> int:
    """Run ``sudo wg-quick up <conf>`` and return its exit status."""
    # fall back to the bare command name when wg-quick is not found on PATH
    wg_quick = shutil.which("wg-quick") or "wg-quick"
    finished = subprocess.run(["sudo", wg_quick, "up", _wg_conf()])
    return finished.returncode
|
|
|
|
|
|
def cmd_down(_args: list[str]) -> int:
    """Run ``sudo wg-quick down <conf>`` and return its exit status."""
    # fall back to the bare command name when wg-quick is not found on PATH
    wg_quick = shutil.which("wg-quick") or "wg-quick"
    finished = subprocess.run(["sudo", wg_quick, "down", _wg_conf()])
    return finished.returncode
|
|
|
|
|
|
def cmd_enroll(args: list[str]) -> int:
    """Enroll a phone: create its wg peer via ``wg-phone-add``, then declare it in the data file.

    Expected form: ``phone <name> [--os ios|android] [--wg 10.9.0.N]``.
    After ``wg-phone-add`` succeeds, the allocated address is read from
    ``~/.config/wg-mesh/clients/<name>/address``, a host entry is appended to
    the data file, and ``data/mesh-hosts.json`` is staged with ``git add``.

    Returns 0 on success or when *name* is already declared; 1 on bad usage,
    an unknown argument, a missing ``wg-phone-add``, a failing
    ``wg-phone-add`` run, or an unreadable address file.
    """
    if len(args) < 2 or args[0] != "phone":
        print("usage: net enroll phone <name> [--os ios|android] [--wg 10.9.0.N]", file=sys.stderr)
        return 1
    name = args[1]
    osname = "ios"
    wg_ip = None

    # options come as "--flag value" pairs; anything else aborts
    rest = args[2:]
    pos = 0
    while pos < len(rest):
        flag = rest[pos]
        has_value = pos + 1 < len(rest)
        if flag == "--os" and has_value:
            osname = rest[pos + 1]
        elif flag == "--wg" and has_value:
            wg_ip = rest[pos + 1]
        else:
            print(f"enroll: unknown arg {flag}", file=sys.stderr)
            return 1
        pos += 2

    wpa = shutil.which("wg-phone-add")
    if not wpa:
        wpa = os.path.expanduser("~/Code/@scripts/session-tools/bin/wg-phone-add")
    if not os.path.exists(wpa):
        print("enroll: wg-phone-add not found (session-tools)", file=sys.stderr)
        return 1

    cmd = [wpa, "-d", name]
    if wg_ip:
        cmd += ["-i", wg_ip]
    if subprocess.run(cmd).returncode != 0:
        return 1

    # read the address wg-phone-add allocated
    addr_file = os.path.expanduser(f"~/.config/wg-mesh/clients/{name}/address")
    try:
        with open(addr_file, encoding="utf-8") as fh:
            wg_ip = fh.read().strip()
    except OSError:
        print(f"enroll: peer created but {addr_file} unreadable — add the JSON entry manually", file=sys.stderr)
        return 1

    df = slr.find_data_file()
    d = slr.load_json(df)
    if host_entry(d, name):
        print(f"enroll: {name} already declared")
        return 0

    new_host = {
        "name": name,
        "aliases": [],
        "class": "phone",
        "role": f"phone ({osname}) — wg mesh client via WireGuard app (DNS=10.9.0.2); no agent, no sshd",
        "os": osname,
        "ssh_user": None,
        "wg": wg_ip,
        "lan": None,
        "public": None,
        "mac": None,
        "identity": None,
    }
    d["hosts"].append(new_host)
    with open(df, "w", encoding="utf-8") as fh:
        json.dump(d, fh, indent=2, ensure_ascii=False)
        fh.write("\n")

    # stage the change; git's output is captured and its exit status is not checked
    subprocess.run(["git", "-C", ROOT, "add", "data/mesh-hosts.json"], capture_output=True)
    print(f"enroll: {name} ({osname}) declared at {wg_ip} — staged; fleet converges after the next autocommit+push")
    return 0
|
|
|
|
|
|
def cmd_gui(_args: list[str]) -> int:
    """Launch ``gui/mesh-gui.py`` with the tray venv's interpreter (darwin only).

    Returns 1 on any other platform or when ``tray/.venv/bin/python`` is
    missing; otherwise the GUI process's exit status.
    """
    if slr.PLATFORM != "darwin":
        print("gui: darwin-only for now (use `net status` / the web dashboard)", file=sys.stderr)
        return 1
    venv_python = os.path.join(ROOT, "tray", ".venv", "bin", "python")
    if not os.path.exists(venv_python):
        print("gui: tray venv missing — run tray/install-tray.sh first", file=sys.stderr)
        return 1
    gui_script = os.path.join(ROOT, "gui", "mesh-gui.py")
    finished = subprocess.run([venv_python, gui_script])
    return finished.returncode
|
|
|
|
|
|
# Verb name → handler. Insertion order is preserved on purpose: main() lists
# the keys in this order in its "unknown verb" message.
VERBS = {
    "status": cmd_status,
    "whoami": cmd_whoami,
    "sync": cmd_sync,
    "doctor": cmd_doctor,
    "issues": cmd_issues,
    "up": cmd_up,
    "down": cmd_down,
    "enroll": cmd_enroll,
    "gui": cmd_gui,
}
|
|
|
|
|
|
def main() -> int:
    """Dispatch ``sys.argv[1]`` to its verb handler and return the handler's exit status.

    With no verb, or with ``-h`` / ``--help`` / ``help``, the module docstring
    is printed and 0 is returned. An unknown verb prints the list of known
    verbs to stderr and returns 1.
    """
    argv = sys.argv
    wants_help = len(argv) < 2 or argv[1] in ("-h", "--help", "help")
    if wants_help:
        print(__doc__.strip())
        return 0
    verb = argv[1]
    handler = VERBS.get(verb)
    if handler is not None:
        # everything after the verb is handed to the handler untouched
        return handler(argv[2:])
    print(f"net: unknown verb '{verb}' (try: {', '.join(VERBS)})", file=sys.stderr)
    return 1


if __name__ == "__main__":
    sys.exit(main())
|