Compare commits

3 commits

Author SHA1 Message Date
Natalie
8fde986eb3 chore(registry): cut @lilith npm/swift registry from dead black to ct-forge (134.199.243.61)
black homelan is gone; point install+publish+auth at the live cocotte ct-forge
verdaccio (:4873) / forgejo (:3000). Config-only; resolution verified.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 18:37:48 -04:00
Natalie
a7a1d0b22b ci: give build job the @lilith registry auth (was silently under-installing)
The build job ran bun install against the repo bunfig (npm.black.lan, empty
token), under-installing so build/typecheck failed — every prior build run was
red on main for this reason, not the source. Add the forge.nasty.sh registry +
NPM_TOKEN .npmrc (TLS verification left on) so the build job installs the full
tree and actually verifies.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-21 08:44:51 -05:00
Natalie
b252753476 feat(analytics): canonical store on black, vps-0 edge with redis outage-spool
Relocate the canonical analytics store off the public VPS. The collector
becomes a DB-free edge that captures + durably enqueues events to black's
redis; a black-side ingest-writer enriches and writes raw_events. When black
is unreachable the collector spools to a local appendonly redis that only
grows during the outage and drains on recovery.

- shared: RawEventEnvelope/NormalizedEvent ingest contract (edge -> writer)
- collector: capture-and-enqueue + dual-redis RedisRouter (primary=black,
  spool=local) + paused-until-healthy drain worker; drop TypeORM/enrichment
- processor: IngestService canonical writer (edge receivedAt, ON CONFLICT
  DO NOTHING), single worker branches ingest-event -> process-event
- relocate device/identity/domain/attribution enrichment + entities to writer

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-21 07:48:02 -05:00
42 changed files with 1301 additions and 2240 deletions

View file

@ -9,6 +9,8 @@ on:
jobs:
build:
runs-on: ubuntu-latest
env:
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
steps:
- uses: actions/checkout@v4
@ -20,6 +22,15 @@ jobs:
- name: Setup bun
run: npm install -g bun
# The build job must resolve registry @lilith/* deps (gov-detection,
# configs) the same way publish does. Without this it installs against the
# repo bunfig (legacy npm.black.lan, empty token) and silently under-installs —
# the reason every prior `build` run failed even on main.
- name: Configure registry
run: |
echo "@lilith:registry=https://forge.nasty.sh/api/packages/lilith/npm/" > .npmrc
echo "//forge.nasty.sh/api/packages/lilith/npm/:_authToken=${NPM_TOKEN}" >> .npmrc
- name: Install dependencies
run: bun install --no-frozen-lockfile

View file

@ -77,6 +77,22 @@
---
## Operational Invariants
- **Prod runs `synchronize: false` with no migration runner.** A new `@Column` does NOT
reach a long-lived prod database on its own — every INSERT referencing it then throws
`column "…" does not exist`, and if the write path swallows errors (e.g. session
fingerprinting) the failure is silent: `raw_events` keeps filling while the derived
table (`session_fingerprints`) freezes and its dashboard pages (Traffic/Audience/
Network) silently show `0`. **When you add a column prod must have, add it to the
processor's `SchemaGuardService` too** (idempotent `ALTER TABLE … ADD COLUMN IF NOT
EXISTS`). See [Schema Management & Drift](./docs/deployment.md#schema-management--drift).
- **Deploys build `linux/amd64` images off the VPS** (vps-0 has 4 GB RAM and OOMs on
build). `BUILD_HOST=black` (default, native) / `local` (emulated) / `quinn-vps` (last
resort). See [scripts/README.md](./scripts/README.md).
---
## Development
```bash

View file

@ -310,6 +310,52 @@ CREATE TABLE daily_metrics (
SELECT add_retention_policy('raw_events', INTERVAL '90 days');
```
### Schema Management & Drift
Production runs TypeORM with **`synchronize: false`** (auto-sync risks destructive
changes) and **no migration runner**. Two consequences that have caused real outages:
1. **The processor's `SchemaGuardService` is the schema authority** for DDL the entity
decorators can't express, or that a long-lived / freshly-provisioned database might
lack. It runs on processor startup (`onModuleInit`) and idempotently ensures critical
objects — e.g. the `aggregated_metrics` `NULLS NOT DISTINCT` dedup index and the
`session_fingerprints` enrichment columns.
2. **Adding a `@Column` to an entity does NOT add it to a long-lived prod table.** With
`synchronize` off, the column exists in code but not in the database, so every INSERT
referencing it throws `column "…" does not exist`. If that write path swallows errors
(e.g. `upsertSessionFingerprint` treats fingerprinting as best-effort), the failure is
**silent**: the canonical table (`raw_events`) keeps filling while the derived table
(`session_fingerprints`) freezes.
**Symptom:** dashboard pages backed by the derived table — Traffic, Audience, Network,
which read `session_fingerprints` — show `0` / "no data", while raw-event-backed pages
(Overview, Pages, Events) look fine. The API returns a successful empty `[]`, so it
reads as a quiet period, not an error.
**Rule: when you add an entity column prod must have, add it to `SchemaGuardService` too.**
```ts
// services/processor/src/schema-guard.service.ts → onModuleInit()
await this.dataSource.query(`
ALTER TABLE IF EXISTS session_fingerprints
ADD COLUMN IF NOT EXISTS "newField" varchar(30)
`);
```
Additive `ADD COLUMN IF NOT EXISTS` is safe on every startup. To unblock a running prod
DB immediately (no redeploy — the running service's next INSERT succeeds once the column
exists):
```sql
ALTER TABLE session_fingerprints ADD COLUMN IF NOT EXISTS "newField" varchar(30);
```
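The guard rule above can also be expressed declaratively, so that adding a column becomes a one-line change. This is a minimal sketch, not the repo's `SchemaGuardService`: `RequiredColumn`, `quoteIdent`, and `buildGuardStatements` are hypothetical names, and the column list is illustrative. It only builds the idempotent statements; executing them (for example via `dataSource.query`) is left to the caller.

```ts
// Hypothetical helper for illustration; not part of the repository.
interface RequiredColumn {
  readonly table: string;
  readonly name: string;
  /** Raw SQL type. Assumed to come from trusted, hard-coded config only. */
  readonly sqlType: string;
}

/** Quote a Postgres identifier, doubling any embedded double quotes. */
function quoteIdent(ident: string): string {
  return `"${ident.replace(/"/g, '""')}"`;
}

/** Build one idempotent ALTER TABLE statement per required column. */
function buildGuardStatements(columns: readonly RequiredColumn[]): string[] {
  return columns.map(
    (c) =>
      `ALTER TABLE IF EXISTS ${quoteIdent(c.table)} ` +
      `ADD COLUMN IF NOT EXISTS ${quoteIdent(c.name)} ${c.sqlType}`,
  );
}

// Illustrative column, mirroring the "newField" example above.
const statements = buildGuardStatements([
  { table: 'session_fingerprints', name: 'newField', sqlType: 'varchar(30)' },
]);

for (const statement of statements) {
  console.log(statement);
}
```

Because every statement uses `IF EXISTS` / `IF NOT EXISTS`, running the whole list on each startup should be a no-op once the columns are present.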
> Incident history (same class both times): 2026-05-16→06-07 — missing `aggregated_metrics`
> dedup index, every aggregation failing for three weeks. 2026-06-21 — missing
> `session_fingerprints` gov/ASN columns (`isGovernment`, `orgType`, `responseTier`,
> `org`, `asn`), every fingerprint INSERT failing, Traffic/Audience/Network blank.
## Nginx Configuration
```nginx

View file

@ -10,7 +10,7 @@
},
"repository": {
"type": "git",
"url": "http://forge.black.lan/lilith/packages.git"
"url": "http://134.199.243.61:3000/lilith/packages.git"
},
"license": "MIT",
"main": "./dist/index.js",
@ -88,7 +88,7 @@
"vitest": "^4.0.17"
},
"publishConfig": {
"registry": "http://forge.black.lan/api/packages/lilith/npm/"
"registry": "http://134.199.243.61:4873/"
},
"_": {
"registry": "forgejo",

View file

@ -10,7 +10,7 @@
},
"repository": {
"type": "git",
"url": "http://forge.black.lan/lilith/packages.git"
"url": "http://134.199.243.61:3000/lilith/packages.git"
},
"license": "MIT",
"main": "./dist/index.js",
@ -55,7 +55,7 @@
"vitest": "^4.0.17"
},
"publishConfig": {
"registry": "http://forge.black.lan/api/packages/lilith/npm/"
"registry": "http://134.199.243.61:4873/"
},
"_": {
"registry": "forgejo",

View file

@ -77,7 +77,7 @@
"author": "Lilith Collective",
"license": "NONE",
"publishConfig": {
"registry": "http://forge.black.lan/api/packages/lilith/npm/"
"registry": "http://134.199.243.61:4873/"
},
"_": {
"registry": "forgejo",

View file

@ -12,3 +12,6 @@ export * from './common';
// GDPR Types
export * from './gdpr';
// Ingest envelope (edge collector → black-side ingest-writer contract)
export * from './ingest';

View file

@ -0,0 +1,123 @@
/**
* Ingest envelope: the durable contract between the edge collector (producer)
* and the canonical-store ingest-writer (consumer).
*
* The collector does NO database work. It captures the request at the edge,
* wraps it in a {@link RawEventEnvelope}, and enqueues it to redis. All
* enrichment (visitor identity, domain resolution, device fingerprint) and the
* canonical `raw_events` write happen on the black-side ingest-writer when the
* envelope is consumed. This makes the redis enqueue (not a synchronous DB
* write) the durability boundary, so a canonical-store outage spools events
* locally instead of dropping them.
*/
/** BullMQ queue shared by the collector (producer) and the black-side workers. */
export const EVENTS_QUEUE = 'analytics-events';
/**
* Job name for a captured-at-edge event awaiting enrichment + canonical write.
* Consumed by the black-side ingest-writer.
*/
export const INGEST_EVENT_JOB = 'ingest-event';
/**
* Job name for an already-written `raw_events` row awaiting aggregation.
* Emitted by the ingest-writer, consumed by the existing aggregation processor.
*/
export const PROCESS_EVENT_JOB = 'process-event';
/** Which collector endpoint produced the envelope — selects the writer's branch. */
export type IngestKind =
| 'view'
| 'engagement'
| 'interaction'
| 'event'
| 'batch'
| 'conversion'
| 'funnel'
| 'registration-funnel';
/**
* First-touch attribution inputs (UTM + referrer), carried verbatim from the
* edge for views. Resolved into a traffic source on the black-side writer.
*/
export interface AttributionInputData {
readonly utmSource?: string;
readonly utmMedium?: string;
readonly utmCampaign?: string;
readonly utmContent?: string;
readonly utmTerm?: string;
readonly referrer?: string;
}
/**
* A single event, fully normalized at the edge into the generic shape the
* canonical writer persists. All per-endpoint mapping (engagement eventType,
* funnel eventType, batch expansion, etc.) happens on the collector because
* it is pure; only DB-bound enrichment (device, identity, fingerprint) is
* deferred to the writer.
*/
export interface NormalizedEvent {
/** Canonical event type, e.g. 'pageview' | 'conversion' | 'engagement_like'. */
readonly eventType: string;
readonly sessionId: string;
readonly userId?: string | null;
readonly pageUrl?: string | null;
readonly referrer?: string | null;
readonly metadata?: Record<string, unknown> | null;
/** Client-supplied event time (ms since epoch). Writer falls back to `receivedAt`. */
readonly timestamp?: number | null;
/**
* Whether this event runs the full view pipeline on the writer: device
* enrichment, session fingerprint upsert, and cross-domain visitor identity.
* Mirrors the legacy split where only page views were enriched.
*/
readonly isView: boolean;
/** Client device signals (views only). */
readonly clientDevice?: Record<string, unknown> | null;
/** Attribution inputs (views only). */
readonly attribution?: AttributionInputData | null;
}
/** Edge-captured request context the black-side writer needs for enrichment. */
export interface EdgeContext {
/** Client IP extracted at the edge (x-real-ip / x-forwarded-for / socket). */
readonly ip?: string;
/** User-Agent header. */
readonly ua?: string;
/** Accept-Language header. */
readonly lang?: string;
/**
* Headers the writer needs for domain resolution (origin / referer / host).
* Lowercased keys; only the subset required downstream is carried.
*/
readonly headers: Record<string, string | undefined>;
}
/**
* A single captured analytics event, durable in redis from the moment the
* collector enqueues it.
*
* `eventId` is minted at the edge and used as BOTH the BullMQ `jobId` AND the
* `raw_events` primary key, so re-drains and duplicates are idempotent
* (`INSERT ... ON CONFLICT (id) DO NOTHING`).
*
* `receivedAt` is the EDGE clock (ISO-8601). The writer MUST persist this value
* as the row's `receivedAt`, never `now()`; otherwise an outage backdates every
* spooled event to drain time and smears time-bucketed aggregates.
*/
export interface RawEventEnvelope {
readonly eventId: string;
readonly kind: IngestKind;
readonly receivedAt: string;
readonly edge: EdgeContext;
/** The normalized event the writer persists. */
readonly payload: NormalizedEvent;
}
/** BullMQ job payload emitted by the ingest-writer for the aggregation stage. */
export interface ProcessEventJob {
readonly eventId: string;
readonly eventType: string;
readonly sessionId: string;
}
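
The two invariants documented on `RawEventEnvelope` (an edge-minted `eventId` that makes re-drains idempotent, and an edge-clock `receivedAt` that the writer persists verbatim) can be illustrated with a small in-memory sketch. Everything here is an assumption for illustration: `SketchEnvelope`, `captureAtEdge`, and `InMemoryRawEvents` are hypothetical, the `Map` merely stands in for `INSERT ... ON CONFLICT (id) DO NOTHING`, and the real collector and writer are not shown in this diff.

```ts
import { randomUUID } from 'node:crypto';

// Trimmed, hypothetical copy of the envelope shape for this sketch only.
interface SketchEnvelope {
  readonly eventId: string;
  readonly kind: 'view' | 'event';
  readonly receivedAt: string; // edge clock, ISO-8601
  readonly payload: { readonly eventType: string; readonly sessionId: string };
}

/** Edge side: mint the id and the timestamp once, at capture time. */
function captureAtEdge(
  eventType: string,
  sessionId: string,
  now: Date = new Date(),
): SketchEnvelope {
  return {
    eventId: randomUUID(),
    kind: eventType === 'pageview' ? 'view' : 'event',
    receivedAt: now.toISOString(),
    payload: { eventType, sessionId },
  };
}

/** Writer side: in-memory stand-in for INSERT ... ON CONFLICT (id) DO NOTHING. */
class InMemoryRawEvents {
  private readonly rows = new Map<string, { receivedAt: string; eventType: string }>();

  /** Returns true if a row was inserted, false if the id already existed. */
  insertIfAbsent(env: SketchEnvelope): boolean {
    if (this.rows.has(env.eventId)) return false;
    // Persist the edge receivedAt, never the drain-time clock.
    this.rows.set(env.eventId, {
      receivedAt: env.receivedAt,
      eventType: env.payload.eventType,
    });
    return true;
  }

  get size(): number {
    return this.rows.size;
  }

  receivedAtOf(eventId: string): string | undefined {
    return this.rows.get(eventId)?.receivedAt;
  }
}

const store = new InMemoryRawEvents();
const captured = captureAtEdge('pageview', 'sess-1', new Date('2026-06-21T12:00:00.000Z'));
const first = store.insertIfAbsent(captured);
const replay = store.insertIfAbsent(captured); // same envelope drained twice
console.log({ first, replay, rows: store.size });
```

Draining the same envelope twice leaves a single row whose `receivedAt` is still the capture time, which is the behaviour the contract comments ask the writer to preserve.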

scripts/README.md Normal file
View file

@ -0,0 +1,61 @@
# scripts/
Operational scripts for deploying the analytics stack. `deploy.sh` is
**deployment-specific** — it targets the Lilith production hosts — and intentionally lives
outside the generic product docs in [`../docs/`](../docs/).
## `deploy.sh` — build images + ship to vps-0
Builds each service's Docker image, ships it to the production VPS (`quinn-vps` / vps-0),
and brings the stack up. The VPS has only 4 GB RAM, so building **on** it OOM-kills nginx
(incident 2026-05-15) — images are always built elsewhere and shipped in.
Invoked directly or via the repo wrapper: `./run deploy` → `scripts/deploy.sh`.
### Build host (`BUILD_HOST`)
vps-0 is **amd64**; the dev laptop (plum) is **arm64**. A native arm64 image loads on the
VPS but crashes with `exec format error`, so every build targets `linux/amd64`. The old
x86 build host (apricot) is decommissioned. Choose where the build runs with `BUILD_HOST`:
| `BUILD_HOST` | Behaviour | When |
| ------------------- | ---------------------------------------------------------------------------------------------------------------------- | --------------- |
| `black` *(default)* | Native amd64 build on the LAN host **black**; context rsync'd over, images streamed black → vps-0 via your machine. **Fastest.** | Normal deploys |
| `local` | Cross-build amd64 on this host under QEMU emulation (`DOCKER_DEFAULT_PLATFORM=linux/amd64`). Slower; needs Docker Desktop running. | black is down |
| `quinn-vps` | Build on the VPS itself (`up -d --build`). **OOM risk** — gated behind a warning + abort window. | Last resort |
### Usage
```bash
./scripts/deploy.sh # all services, build on black (default)
./scripts/deploy.sh processor api # subset only (faster iteration)
BUILD_HOST=local ./scripts/deploy.sh # emulated amd64 build on this host
TARGET_PLATFORM=linux/arm64 ./scripts/deploy.sh # only if the VPS arch ever changes
```
Valid services: `collector processor api website-bff realtime`.
### What gets shipped
The Dockerfiles consume only the build context — `dist/` (compiled TS), `.vendor-lilith/`
(staged `@lilith/*` registry deps), and `package.json`. **Public npm deps are installed
*inside* the image build**, so Verdaccio is not needed at build time. `bun run
build:services` and the `@lilith` vendor-staging always run locally before the image build,
regardless of `BUILD_HOST`.
### Prerequisites
- **SSH** from this host to both the build host and `quinn-vps`. The image transfer routes
`build-host → this host → vps-0`, so **no build-host ↔ vps trust is required**.
- **Docker** running on the chosen build host (`BUILD_HOST=local` needs Docker Desktop up;
the script preflights this and fails fast with a clear message).
- **`zstd`** on the build host and vps-0 (compressed image streaming).
### Pipeline
1. `bun run build:services` — TS → `dist/` (local)
2. stage `@lilith/*` deps into each `services/<svc>/.vendor-lilith/` (local)
3. `docker compose build` on `BUILD_HOST` → `infrastructure-<svc>:latest` (amd64)
4. `docker save | zstd | ssh` — stream images to vps-0 and `docker load`
5. rsync compose + `init.sql`; `docker compose up -d` (`--no-build`, or `--build` for `quinn-vps`)
6. health smoke (`/health` on collector :4001, api :4003, website-bff :4005)
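
The `BUILD_HOST` choice drives two decisions in the pipeline above: where step 3 runs, and whether step 5 passes `--no-build` or `--build`. The sketch below restates that decision in TypeScript purely as an illustration; the real logic is a `case` statement in `deploy.sh`. `resolveBuildMode` and `composeUpFlag` are hypothetical names, and the alias list assumes the deploy target is reachable as `quinn-vps`, `vps-0`, or `vps0`.

```ts
type BuildMode = 'local' | 'remote' | 'target';

/**
 * Classify BUILD_HOST.
 * - "local" or this machine's own hostname: build here (emulated if arm64)
 * - any alias of the VPS: last-resort build on the deploy target
 * - anything else: treated as a remote build host reached over SSH
 */
function resolveBuildMode(
  buildHost: string,
  thisHost: string,
  vpsAliases: readonly string[],
): BuildMode {
  if (buildHost === 'local' || buildHost === thisHost) return 'local';
  if (vpsAliases.includes(buildHost)) return 'target';
  return 'remote';
}

/** Images are pre-loaded on the VPS unless the build happens on the VPS itself. */
function composeUpFlag(mode: BuildMode): '--build' | '--no-build' {
  return mode === 'target' ? '--build' : '--no-build';
}

// Assumed aliases for the deploy target; adjust to the real SSH host names.
const VPS_ALIASES = ['quinn-vps', 'vps-0', 'vps0'] as const;

for (const host of ['black', 'local', 'quinn-vps']) {
  const mode = resolveBuildMode(host, 'plum', VPS_ALIASES);
  console.log(`${host}: mode=${mode}, up flag=${composeUpFlag(mode)}`);
}
```

Treating every unrecognised name as a remote build host keeps the default (`black`) free of special cases, at the cost of a typo only being caught by the Docker preflight check.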

View file

@ -2,22 +2,33 @@
# =============================================================================
# @analytics — Deploy to vps-0
# =============================================================================
# Build images on apricot, ship via docker save | ssh | docker load, then
# `docker compose up -d --no-build` on the VPS.
# Build images on a chosen BUILD_HOST, ship via docker save | ssh | docker load,
# then `docker compose up -d --no-build` on the VPS.
#
# Why: vps-0 has 4 GB RAM. Running `docker compose --build` there OOM-kills
# nginx (incident 2026-05-15). Apricot has the headroom and the source.
# Why not build on the VPS: vps-0 has 4 GB RAM. `docker compose --build` there
# OOM-kills nginx (incident 2026-05-15). Build elsewhere, ship the images.
#
# Build host (apricot, the old x86 builder, is decommissioned) — BUILD_HOST env:
# black (default) → LAN amd64 host, builds NATIVELY (fast); context rsync'd over,
# images streamed black → VPS via this host.
# local → this host (plum, arm64); cross-builds amd64 under emulation
# (DOCKER_DEFAULT_PLATFORM=linux/amd64). Slower fallback.
# quinn-vps → last resort: builds on the 4 GB target itself (OOM risk).
# All paths target linux/amd64 — a native arm64 image crashes on the VPS with
# "exec format error". Override the arch via TARGET_PLATFORM= if the VPS changes.
#
# Strategy:
# 1. bun run build:services (TS → dist on apricot)
# 2. .vendor-lilith/ staging (registry @lilith/* deps, VPS can't reach Verdaccio)
# 3. docker compose build (apricot — produces infrastructure-<svc>:latest)
# 4. docker save | zstd | ssh (stream images to VPS, decompress, load)
# 1. bun run build:services (TS → dist, locally)
# 2. .vendor-lilith/ staging (registry @lilith/* deps, baked into the image)
# 3. docker compose build (on BUILD_HOST → infrastructure-<svc>:latest)
# 4. docker save | zstd | ssh (stream images to the VPS, decompress, load)
# 5. rsync compose + init.sql (in case schema/compose changed)
# 6. docker compose up -d --no-build (VPS — uses already-loaded images)
# 6. docker compose up -d (VPS — --no-build, or --build for build-on-target)
# 7. Smoke health endpoints
#
# Usage: ./scripts/deploy.sh [svc1 svc2 ...]
# BUILD_HOST=local ./scripts/deploy.sh # emulated build on this host
# BUILD_HOST=quinn-vps ./scripts/deploy.sh # last-resort build-on-target
# No args: deploy all build-using services.
# With args: deploy only the named services (faster iteration).
# =============================================================================
@ -30,6 +41,48 @@ REMOTE_DIR="~/analytics"
COMPOSE_REL="infrastructure/docker-compose.prod.yaml"
PROJECT="infrastructure" # docker compose project name (= dir name)
# ── Build host ──────────────────────────────────────────────────────────────
# vps-0 is amd64; the local dev host (plum) is arm64, so we always target
# linux/amd64 (native arm64 images → "exec format error" on the VPS). apricot,
# the old x86 builder, is decommissioned. Preference order:
# 1. black — LAN amd64 host, builds NATIVELY (fast). DEFAULT. BUILD_HOST=black
# 2. local — this host, cross-builds amd64 under emulation. BUILD_HOST=local
# 3. the VPS — last resort: builds on the 4 GB target (OOM risk). BUILD_HOST=quinn-vps
BUILD_HOST="${BUILD_HOST:-black}"
TARGET_PLATFORM="${TARGET_PLATFORM:-linux/amd64}"
export DOCKER_DEFAULT_PLATFORM="$TARGET_PLATFORM"
REMOTE_BUILD_DIR="~/analytics-build"
# Dummy build-time vars so `compose build` interpolation doesn't warn about
# runtime-only values. Word-split intentionally at the call sites.
BUILD_VARS="POSTGRES_USER=build POSTGRES_PASSWORD=build POSTGRES_DB=build REDIS_PASSWORD=build CORS_ORIGINS=build COLLECTOR_WRITE_KEY=build API_KEYS=build ADMIN_URL=http://build"
case "$BUILD_HOST" in
local|"$(hostname -s)"|"$(hostname)") BUILD_MODE=local ;;
"$REMOTE"|vps-0|vps0) BUILD_MODE=target ;;
*) BUILD_MODE=remote ;;
esac
# Preflight: the chosen build host needs a reachable Docker daemon.
case "$BUILD_MODE" in
local)
if ! docker info >/dev/null 2>&1; then
echo "ERROR: Docker daemon not reachable on $(hostname -s) (BUILD_HOST=local)." >&2
echo " Start Docker Desktop, or use the default BUILD_HOST=black (native amd64)." >&2
exit 1
fi ;;
remote)
if ! ssh -o ConnectTimeout=8 -o ControlPath=none "$BUILD_HOST" 'docker info >/dev/null 2>&1'; then
echo "ERROR: Docker not reachable on build host '${BUILD_HOST}'." >&2
echo " Fall back with BUILD_HOST=local (emulated amd64) if ${BUILD_HOST} is down." >&2
exit 1
fi ;;
target)
echo "WARN: BUILD_HOST=${BUILD_HOST} builds on the VPS itself — 4 GB RAM, OOM-killed nginx 2026-05-15." >&2
echo " Documented last resort. Ctrl-C to abort; continuing in 5s..." >&2
sleep 5 ;;
esac
ALL_SERVICES=(collector processor api website-bff realtime)
if [[ $# -gt 0 ]]; then
SERVICES=("$@")
@ -107,47 +160,71 @@ for svc in "${SERVICES[@]}"; do
done
# ---------------------------------------------------------------------------
# [3/6] Build images on apricot (NOT on the VPS — OOM risk)
# [3/6] Build images + [4/6] ship to the VPS (path depends on BUILD_MODE)
# ---------------------------------------------------------------------------
echo "==> [3/6] Building Docker images on apricot..."
# Use a throwaway env file so compose doesn't warn about runtime-only vars.
TMP_ENV="$(mktemp)"
trap 'rm -f "$TMP_ENV"' EXIT
{
echo "POSTGRES_USER=build"
echo "POSTGRES_PASSWORD=build"
echo "POSTGRES_DB=build"
echo "REDIS_PASSWORD=build"
echo "CORS_ORIGINS=build"
echo "COLLECTOR_WRITE_KEY=build"
echo "API_KEYS=build"
echo "ADMIN_URL=http://build"
} > "$TMP_ENV"
cd "$ROOT_DIR"
docker compose -f "$COMPOSE_REL" --env-file "$TMP_ENV" -p "$PROJECT" build "${SERVICES[@]}"
# rsync filter: only the build context the Dockerfiles consume (dist + vendored
# @lilith deps + Dockerfile + package.json) — never node_modules or sources.
sync_context() { # $1 = destination "host:dir"
local dest="$1"
rsync -az "$ROOT_DIR/infrastructure/docker-compose.prod.yaml" "$ROOT_DIR/infrastructure/init.sql" \
"${dest}/infrastructure/"
for svc in "${SERVICES[@]}"; do
rsync -az --delete \
--include='dist/***' --include='.vendor-lilith/***' \
--include='Dockerfile' --include='package.json' --exclude='*' \
"$ROOT_DIR/services/${svc}/" "${dest}/services/${svc}/"
done
}
if [[ "$BUILD_MODE" == "remote" ]]; then
echo "==> [3/6] Building on ${BUILD_HOST} (native ${TARGET_PLATFORM})..."
ssh -o ControlPath=none "$BUILD_HOST" "mkdir -p ${REMOTE_BUILD_DIR}/infrastructure $(printf "${REMOTE_BUILD_DIR}/services/%s " "${SERVICES[@]}")"
sync_context "${BUILD_HOST}:${REMOTE_BUILD_DIR}"
# shellcheck disable=SC2086 # BUILD_VARS / SERVICES intentionally word-split into the remote command
ssh -o ControlPath=none "$BUILD_HOST" \
"cd ${REMOTE_BUILD_DIR} && env ${BUILD_VARS} docker compose -f ${COMPOSE_REL} -p ${PROJECT} build ${SERVICES[*]}"
echo "==> [4/6] Streaming images ${BUILD_HOST} → ${REMOTE} (via $(hostname -s))..."
for svc in "${SERVICES[@]}"; do
image="${PROJECT}-${svc}:latest"
echo " -> ${image}"
ssh -o ControlPath=none "$BUILD_HOST" "docker save ${image} | zstd -T0 -q" \
| ssh -o ControlPath=none "$REMOTE" "zstd -d -q | docker load"
done
elif [[ "$BUILD_MODE" == "local" ]]; then
echo "==> [3/6] Building locally ($(uname -m) → ${TARGET_PLATFORM}; emulated if arm64)..."
cd "$ROOT_DIR"
# shellcheck disable=SC2086 # BUILD_VARS intentionally word-split
env ${BUILD_VARS} docker compose -f "$COMPOSE_REL" -p "$PROJECT" build "${SERVICES[@]}"
echo "==> [4/6] Shipping images to ${REMOTE}..."
for svc in "${SERVICES[@]}"; do
image="${PROJECT}-${svc}:latest"
size="$(docker image inspect "$image" --format '{{.Size}}' 2>/dev/null | numfmt --to=iec)"
echo " -> ${image} (${size:-?})"
docker save "$image" | zstd -T0 -q | ssh -o ControlPath=none "$REMOTE" "zstd -d -q | docker load"
done
else # target — last resort: ship context, image builds on the VPS in [5]
echo "==> [3/6] Shipping build context to ${REMOTE} (build-on-target)..."
ssh -o ControlPath=none "$REMOTE" "mkdir -p ${REMOTE_DIR}/infrastructure $(printf "${REMOTE_DIR}/services/%s " "${SERVICES[@]}")"
sync_context "${REMOTE}:${REMOTE_DIR}"
echo "==> [4/6] (skipped — images build on the target during bring-up)"
fi
# ---------------------------------------------------------------------------
# [4/6] Ship images to vps-0 (compressed save → stream → load)
# ---------------------------------------------------------------------------
echo "==> [4/6] Shipping images to ${REMOTE}..."
for svc in "${SERVICES[@]}"; do
image="${PROJECT}-${svc}:latest"
size="$(docker image inspect "$image" --format '{{.Size}}' 2>/dev/null | numfmt --to=iec)"
echo " -> ${image} (${size:-?})"
docker save "$image" \
| zstd -T0 -q \
| ssh -o ControlPath=none "$REMOTE" "zstd -d -q | docker load"
done
# ---------------------------------------------------------------------------
# [5/6] Sync compose + init.sql; bring up stack with --no-build
# [5/6] Sync compose + init.sql; bring up stack
# local/remote builds → images already loaded on the VPS → --no-build
# target (last resort) → no pre-loaded images → --build on the VPS
# ---------------------------------------------------------------------------
echo "==> [5/6] Syncing compose config + bringing up stack..."
rsync -avz \
"$ROOT_DIR/infrastructure/docker-compose.prod.yaml" \
"$ROOT_DIR/infrastructure/init.sql" \
"$REMOTE:$REMOTE_DIR/infrastructure/"
ssh -o ControlPath=none "$REMOTE" "cd $REMOTE_DIR && docker compose -f infrastructure/docker-compose.prod.yaml --env-file infrastructure/.env.prod -p $PROJECT up -d --no-build --remove-orphans"
if [[ "$BUILD_MODE" == "target" ]]; then BUILD_FLAG="--build"; else BUILD_FLAG="--no-build"; fi
ssh -o ControlPath=none "$REMOTE" "cd $REMOTE_DIR && docker compose -f infrastructure/docker-compose.prod.yaml --env-file infrastructure/.env.prod -p $PROJECT up -d ${BUILD_FLAG} --remove-orphans"
# ---------------------------------------------------------------------------
# [6/6] Health smoke

View file

@ -18,24 +18,18 @@
},
"dependencies": {
"@lilith/analytics": "workspace:*",
"@lilith/gov-detection": "^1.0.3",
"@nestjs/bullmq": "^11.0.0",
"@nestjs/common": "^11.0.0",
"@nestjs/config": "^4.0.0",
"@nestjs/core": "^11.0.0",
"@nestjs/platform-express": "^11.0.0",
"@nestjs/swagger": "^11.0.0",
"@nestjs/terminus": "^11.0.0",
"@nestjs/throttler": "^6.0.0",
"@nestjs/typeorm": "^11.0.0",
"bullmq": "^5.0.0",
"class-transformer": "^0.5.0",
"class-validator": "^0.14.0",
"geoip-lite": "^2.0.1",
"pg": "^8.11.0",
"ioredis": "^5.9.1",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.0",
"typeorm": "^0.3.0"
"rxjs": "^7.8.0"
},
"devDependencies": {
"@lilith/configs": "^2.2.1",
@ -45,7 +39,6 @@
"@swc/cli": "^0.7.10",
"@swc/core": "^1.15.8",
"@types/express": "^5.0.0",
"@types/geoip-lite": "^1.4.4",
"@types/node": "^20.0.0",
"typescript": "^5.4.0",
"unplugin-swc": "^1.5.1",

View file

@ -1,28 +1,25 @@
import { Module } from '@nestjs/common';
import { APP_GUARD } from '@nestjs/core';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ConfigModule } from '@nestjs/config';
import { ThrottlerModule } from '@nestjs/throttler';
import { BullModule } from '@nestjs/bullmq';
import { TypeOrmModule } from '@nestjs/typeorm';
import { TrackingModule } from './tracking/tracking.module';
import { HealthModule } from './health/health.module';
import { BeaconModule } from './beacon/beacon.module';
import { RawEvent } from './entities/raw-event.entity';
import { SessionFingerprint } from "./entities/session-fingerprint.entity";
import { Corp } from "./entities/corp.entity";
import { Domain } from "./entities/domain.entity";
import { VisitorSalt } from "./entities/visitor-salt.entity";
import { WriteKeyGuard } from './auth/write-key.guard';
/**
* Edge collector app. No TypeORM, no BullMQ root module: the collector owns no
* database and manages its own dual-redis connections inside {@link RedisRouter}
* (primary = black redis, spool = local redis). All persistence and enrichment
* live on the black-side processor/ingest-writer.
*/
@Module({
imports: [
// Configuration
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
// Rate limiting
ThrottlerModule.forRoot([
{
ttl: 60000, // 1 minute
@ -30,42 +27,6 @@ import { WriteKeyGuard } from './auth/write-key.guard';
},
]),
// Database
TypeOrmModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
type: 'postgres',
host: config.get('DATABASE_HOST', 'localhost'),
port: config.get('DATABASE_PORT', 5432),
username: config.get('DATABASE_USER', 'analytics'),
password: config.get('DATABASE_PASSWORD', 'analytics'),
database: config.get('DATABASE_NAME', 'analytics'),
entities: [RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt],
synchronize: config.get('DB_SYNCHRONIZE') === 'true',
logging: config.get('NODE_ENV') === 'development',
}),
}),
// Queue for async processing
BullModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => {
const password = config.get<string>('REDIS_PASSWORD');
return {
connection: {
host: config.get('REDIS_HOST', 'localhost'),
port: config.get('REDIS_PORT', 6379),
...(password ? { password } : {}),
},
};
},
}),
BullModule.registerQueue({
name: 'analytics-events',
}),
// Feature modules
TrackingModule,
HealthModule,
BeaconModule,

View file

@ -1,6 +0,0 @@
export { RawEvent } from './raw-event.entity';
export { SessionFingerprint } from './session-fingerprint.entity';
export { Corp } from "./corp.entity";
export { Domain } from "./domain.entity";
export type { DomainRole } from "./domain.entity";
export { VisitorSalt } from "./visitor-salt.entity";

View file

@ -1,93 +0,0 @@
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
Index,
} from 'typeorm';
/**
* Raw analytics event - stores all incoming events before processing
* Generic event schema - product-specific fields go in metadata
*/
@Entity('raw_events')
@Index(['sessionId', 'timestamp'])
@Index(['eventType', 'timestamp'])
@Index(['processed', 'timestamp'])
@Index(['visitorIdDaily', 'timestamp'])
@Index(['corpId', 'timestamp'])
@Index(['domainId', 'timestamp'])
export class RawEvent {
@PrimaryGeneratedColumn('uuid')
id!: string;
/**
* Event type identifier
* Generic: 'pageview', 'click', 'scroll', 'conversion', 'custom'
* Product-specific types stored but processed by product-specific handlers
*/
@Column({ type: 'varchar', length: 100 })
@Index()
eventType!: string;
/** Session identifier (client-generated) */
@Column({ type: 'varchar', length: 64 })
@Index()
sessionId!: string;
/** Optional authenticated user ID */
@Column({ type: 'varchar', length: 64, nullable: true })
@Index()
userId?: string | null;
/** Page URL where event occurred */
@Column({ type: 'text', nullable: true })
pageUrl?: string | null;
/** Referrer URL */
@Column({ type: 'text', nullable: true })
referrer?: string | null;
/** Device type: 'desktop' | 'mobile' | 'tablet' | 'bot' */
@Column({ type: 'varchar', length: 20, nullable: true })
deviceType?: string | null;
/** Event-specific data (JSON) */
@Column({ type: 'jsonb', nullable: true })
metadata?: Record<string, unknown> | null;
/** Client-side timestamp (when event occurred) */
@Column({ type: 'timestamptz' })
@Index()
timestamp!: Date;
/** Server-side timestamp (when event received) */
@CreateDateColumn({ type: 'timestamptz' })
receivedAt!: Date;
/** Whether event has been processed by aggregation workers */
@Column({ type: 'boolean', default: false })
@Index()
processed!: boolean;
/** Processing timestamp */
@Column({ type: 'timestamptz', nullable: true })
processedAt?: Date | null;
/**
* Daily-rotating visitor identity for cross-domain stitching.
* sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same
* visitor across all our domains within a UTC day; unrecoverable after
* salt rotation (cookie-free, GDPR-clean).
*/
@Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' })
visitorIdDaily?: Buffer | null;
/** Corp that owns the originating domain. FK → corps(id). */
@Column({ type: 'smallint', nullable: true, name: 'corp_id' })
corpId?: number | null;
/** Domain the event originated from. FK → domains(id). */
@Column({ type: 'integer', nullable: true, name: 'domain_id' })
domainId?: number | null;
}

View file

@ -1,28 +1,28 @@
import { Controller, Get, SetMetadata } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import {
HealthCheck,
HealthCheckService,
TypeOrmHealthIndicator,
} from '@nestjs/terminus';
import { IS_PUBLIC_KEY } from '../auth/write-key.guard';
import { RedisRouter } from '../tracking/redis-router.service';
/**
* Edge health. The collector has no database; readiness means "can we accept
* an event", and we always can: a black-redis outage spools locally. So health
* is liveness plus observability into the spool (depth + whether black is
* currently reachable), which the outage drill reads.
*/
@ApiTags('Health')
@SetMetadata(IS_PUBLIC_KEY, true)
@Controller('health')
export class HealthController {
constructor(
private readonly health: HealthCheckService,
private readonly db: TypeOrmHealthIndicator,
) {}
constructor(private readonly router: RedisRouter) {}
@Get()
@HealthCheck()
@ApiOperation({ summary: 'Health check endpoint' })
check() {
return this.health.check([
() => this.db.pingCheck('database'),
]);
async check() {
return {
status: 'ok',
blackHealthy: this.router.isBlackHealthy(),
spoolDepth: await this.router.spoolDepth(),
};
}
@Get('live')
@ -32,11 +32,10 @@ export class HealthController {
}
@Get('ready')
@HealthCheck()
@ApiOperation({ summary: 'Readiness probe' })
ready() {
return this.health.check([
() => this.db.pingCheck('database'),
]);
// Always ready: the local spool guarantees we can durably accept events
// even when the canonical store is unreachable.
return { status: 'ok' };
}
}

View file

@ -1,9 +1,14 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthController } from './health.controller';
import { TrackingModule } from '../tracking/tracking.module';
/**
* Health module. Depends on TrackingModule for the {@link RedisRouter} so the
* health endpoint can report spool depth + black reachability. No DB indicator:
* the edge has no database.
*/
@Module({
imports: [TerminusModule],
imports: [TrackingModule],
controllers: [HealthController],
})
export class HealthModule {}

View file

@ -1,487 +0,0 @@
import { Test } from '@nestjs/testing';
import { describe, it, expect, beforeEach } from 'vitest';
import { AttributionService } from './attribution.service';
describe('AttributionService', () => {
let service: AttributionService;
beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [AttributionService],
}).compile();
service = module.get<AttributionService>(AttributionService);
});
describe('resolveTrafficSource', () => {
it('returns direct traffic when no attribution data', () => {
const result = service.resolveTrafficSource({});
expect(result.source).toBe('direct');
expect(result.rawSource).toBeNull();
expect(result.medium).toBeNull();
expect(result.campaign).toBeNull();
expect(result.referrer).toBeNull();
});
it('preserves all UTM parameters in result', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
utmCampaign: 'winter-sale',
utmContent: 'ad-variant-a',
utmTerm: 'luxury+watches',
});
expect(result.rawSource).toBe('google');
expect(result.medium).toBe('cpc');
expect(result.campaign).toBe('winter-sale');
expect(result.content).toBe('ad-variant-a');
expect(result.term).toBe('luxury+watches');
});
});
describe('UTM parameter resolution', () => {
it('classifies paid search from utm_medium=cpc', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
});
expect(result.source).toBe('paid');
});
it('classifies paid search from utm_medium=ppc', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'ppc',
});
expect(result.source).toBe('paid');
});
it('classifies paid search from utm_medium=paid', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'paid',
});
expect(result.source).toBe('paid');
});
it('classifies paid search from utm_medium=paidsearch', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'paidsearch',
});
expect(result.source).toBe('paid');
});
it('classifies email from utm_medium=email', () => {
const result = service.resolveTrafficSource({
utmSource: 'mailchimp',
utmMedium: 'email',
});
expect(result.source).toBe('email');
});
it('classifies email from utm_medium=newsletter', () => {
const result = service.resolveTrafficSource({
utmSource: 'campaign-monitor',
utmMedium: 'newsletter',
});
expect(result.source).toBe('email');
});
it('classifies social from utm_medium=social', () => {
const result = service.resolveTrafficSource({
utmSource: 'facebook',
utmMedium: 'social',
});
expect(result.source).toBe('social');
});
it('classifies social from utm_medium=social-media', () => {
const result = service.resolveTrafficSource({
utmSource: 'twitter',
utmMedium: 'social-media',
});
expect(result.source).toBe('social');
});
it('classifies affiliate from utm_medium=affiliate', () => {
const result = service.resolveTrafficSource({
utmSource: 'partner123',
utmMedium: 'affiliate',
});
expect(result.source).toBe('affiliate');
});
it('classifies affiliate from utm_medium=partner', () => {
const result = service.resolveTrafficSource({
utmSource: 'partner456',
utmMedium: 'partner',
});
expect(result.source).toBe('affiliate');
});
it('classifies organic from utm_medium=organic', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'organic',
});
expect(result.source).toBe('organic');
});
it('classifies referral from utm_medium=referral', () => {
const result = service.resolveTrafficSource({
utmSource: 'blog.example.com',
utmMedium: 'referral',
});
expect(result.source).toBe('referral');
});
});
describe('UTM source-based classification', () => {
it('classifies social from social platform utm_source', () => {
const result = service.resolveTrafficSource({
utmSource: 'facebook',
});
expect(result.source).toBe('social');
});
it('classifies social from twitter', () => {
const result = service.resolveTrafficSource({
utmSource: 'twitter',
});
expect(result.source).toBe('social');
});
it('classifies social from instagram', () => {
const result = service.resolveTrafficSource({
utmSource: 'instagram',
});
expect(result.source).toBe('social');
});
it('classifies social from linkedin', () => {
const result = service.resolveTrafficSource({
utmSource: 'linkedin',
});
expect(result.source).toBe('social');
});
it('classifies organic from search engine with no medium', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
});
expect(result.source).toBe('organic');
});
it('classifies paid from search engine with cpc medium', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
});
expect(result.source).toBe('paid');
});
it('classifies referral from unknown utm_source', () => {
const result = service.resolveTrafficSource({
utmSource: 'blog.example.com',
});
expect(result.source).toBe('referral');
});
});
describe('referrer analysis', () => {
it('classifies social from facebook referrer', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.facebook.com/some-post',
});
expect(result.source).toBe('social');
expect(result.referrer).toBe('https://www.facebook.com/some-post');
});
it('classifies social from twitter referrer', () => {
const result = service.resolveTrafficSource({
referrer: 'https://t.co/abc123',
});
expect(result.source).toBe('social');
});
it('classifies social from x.com', () => {
const result = service.resolveTrafficSource({
referrer: 'https://x.com/user/status/123',
});
expect(result.source).toBe('social');
});
it('classifies social from instagram', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.instagram.com/p/abc123/',
});
expect(result.source).toBe('social');
});
it('classifies social from linkedin', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.linkedin.com/feed/',
});
expect(result.source).toBe('social');
});
it('classifies social from reddit', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.reddit.com/r/programming/',
});
expect(result.source).toBe('social');
});
it('classifies social from youtube', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.youtube.com/watch?v=abc123',
});
expect(result.source).toBe('social');
});
it('classifies social from tiktok', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.tiktok.com/@user/video/123',
});
expect(result.source).toBe('social');
});
it('classifies organic from google search', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.google.com/search?q=test',
});
expect(result.source).toBe('organic');
});
it('classifies organic from bing', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.bing.com/search?q=test',
});
expect(result.source).toBe('organic');
});
it('classifies organic from duckduckgo', () => {
const result = service.resolveTrafficSource({
referrer: 'https://duckduckgo.com/?q=test',
});
expect(result.source).toBe('organic');
});
it('classifies organic from yahoo', () => {
const result = service.resolveTrafficSource({
referrer: 'https://search.yahoo.com/search?p=test',
});
expect(result.source).toBe('organic');
});
it('classifies referral from unknown domain', () => {
const result = service.resolveTrafficSource({
referrer: 'https://blog.example.com/article',
});
expect(result.source).toBe('referral');
});
it('handles referrer without www prefix', () => {
const result = service.resolveTrafficSource({
referrer: 'https://facebook.com/page',
});
expect(result.source).toBe('social');
});
it('handles invalid referrer URL gracefully', () => {
const result = service.resolveTrafficSource({
referrer: 'not-a-valid-url',
});
expect(result.source).toBe('direct');
});
});
describe('priority resolution', () => {
it('prioritizes UTM parameters over referrer', () => {
const result = service.resolveTrafficSource({
utmSource: 'newsletter',
utmMedium: 'email',
referrer: 'https://www.facebook.com/',
});
expect(result.source).toBe('email');
});
it('uses referrer when UTM parameters are missing', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.google.com/search',
});
expect(result.source).toBe('organic');
});
it('returns direct when both UTM and referrer are missing', () => {
const result = service.resolveTrafficSource({});
expect(result.source).toBe('direct');
});
});
describe('edge cases', () => {
it('handles empty string UTM parameters', () => {
const result = service.resolveTrafficSource({
utmSource: '',
utmMedium: '',
referrer: '',
});
expect(result.source).toBe('direct');
});
it('handles case-insensitive UTM medium', () => {
const result1 = service.resolveTrafficSource({ utmMedium: 'CPC' });
const result2 = service.resolveTrafficSource({ utmMedium: 'Cpc' });
const result3 = service.resolveTrafficSource({ utmMedium: 'cpc' });
expect(result1.source).toBe('paid');
expect(result2.source).toBe('paid');
expect(result3.source).toBe('paid');
});
it('handles case-insensitive UTM source', () => {
const result1 = service.resolveTrafficSource({ utmSource: 'FACEBOOK' });
const result2 = service.resolveTrafficSource({ utmSource: 'Facebook' });
const result3 = service.resolveTrafficSource({ utmSource: 'facebook' });
expect(result1.source).toBe('social');
expect(result2.source).toBe('social');
expect(result3.source).toBe('social');
});
it('handles subdomains in referrer', () => {
const result = service.resolveTrafficSource({
referrer: 'https://subdomain.facebook.com/page',
});
expect(result.source).toBe('social');
});
it('handles country-specific Google domains', () => {
const resultUK = service.resolveTrafficSource({
referrer: 'https://www.google.co.uk/search',
});
const resultDE = service.resolveTrafficSource({
referrer: 'https://www.google.de/search',
});
expect(resultUK.source).toBe('organic');
expect(resultDE.source).toBe('organic');
});
it('handles Yandex with subdomain', () => {
const result = service.resolveTrafficSource({
referrer: 'https://yandex.ru/search',
});
expect(result.source).toBe('organic');
});
});
describe('complex scenarios', () => {
it('handles paid social campaign', () => {
const result = service.resolveTrafficSource({
utmSource: 'facebook',
utmMedium: 'cpc',
utmCampaign: 'summer-promo',
utmContent: 'ad-creative-1',
});
expect(result.source).toBe('paid');
expect(result.rawSource).toBe('facebook');
expect(result.medium).toBe('cpc');
expect(result.campaign).toBe('summer-promo');
expect(result.content).toBe('ad-creative-1');
});
it('handles organic social without UTM', () => {
const result = service.resolveTrafficSource({
referrer: 'https://twitter.com/user/status/123',
});
expect(result.source).toBe('social');
expect(result.rawSource).toBeNull();
expect(result.medium).toBeNull();
});
it('handles email campaign with full UTM', () => {
const result = service.resolveTrafficSource({
utmSource: 'mailchimp',
utmMedium: 'email',
utmCampaign: 'weekly-digest',
utmContent: 'header-cta',
});
expect(result.source).toBe('email');
expect(result.rawSource).toBe('mailchimp');
expect(result.medium).toBe('email');
expect(result.campaign).toBe('weekly-digest');
expect(result.content).toBe('header-cta');
});
it('handles affiliate partnership', () => {
const result = service.resolveTrafficSource({
utmSource: 'partner-site',
utmMedium: 'affiliate',
utmCampaign: 'q1-partnership',
});
expect(result.source).toBe('affiliate');
expect(result.rawSource).toBe('partner-site');
});
it('handles branded search campaign', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
utmCampaign: 'brand-protection',
utmTerm: 'company+name',
});
expect(result.source).toBe('paid');
expect(result.term).toBe('company+name');
});
});
});

View file

@ -0,0 +1,252 @@
import { Injectable } from '@nestjs/common';
import { randomUUID } from 'crypto';
import type { Request } from 'express';
import type {
RawEventEnvelope,
NormalizedEvent,
EdgeContext,
IngestKind,
} from '@lilith/analytics';
import { RedisRouter } from './redis-router.service';
import { TrackViewDto } from '../dto/track-view.dto';
import { TrackEngagementDto } from '../dto/track-engagement.dto';
import { TrackInteractionBatchDto } from '../dto/track-interaction.dto';
import {
TrackEventDto,
TrackBatchDto,
TrackConversionDto,
TrackFunnelStepDto,
TrackRegistrationFunnelDto,
} from '../dto/track-event.dto';
export interface CaptureResult {
success: boolean;
eventId: string | null;
}
export interface BatchCaptureResult {
success: boolean;
count: number;
}
/**
* Edge capture. Validation already happened at the controller (class-validator);
* here we do the pure per-endpoint normalization that used to live in the
* collector's TrackingService/controller, mint an edge event id + receive time,
* and durably enqueue via {@link RedisRouter}. No database, no enrichment; all
* of that is deferred to the black-side ingest-writer.
*/
@Injectable()
export class CaptureService {
constructor(private readonly router: RedisRouter) {}
async trackView(dto: TrackViewDto, request: Request): Promise<CaptureResult> {
const pageUrl = dto.pageUrl ?? dto.contentId ?? '';
return this.one('view', request, {
eventType: 'pageview',
sessionId: dto.sessionId,
userId: dto.userId ?? null,
pageUrl,
referrer: dto.referrer ?? null,
isView: true,
clientDevice: (dto.clientDevice as Record<string, unknown> | undefined) ?? null,
attribution: dto.attribution
? {
utmSource: dto.attribution.utmSource,
utmMedium: dto.attribution.utmMedium,
utmCampaign: dto.attribution.utmCampaign,
utmContent: dto.attribution.utmContent,
utmTerm: dto.attribution.utmTerm,
referrer: dto.attribution.referrer ?? dto.referrer,
}
: { referrer: dto.referrer },
metadata: {
...dto.metadata,
...(dto.contentType ? { contentType: dto.contentType } : {}),
...(dto.app ? { app: dto.app } : {}),
...(dto.duration !== undefined ? { duration: dto.duration } : {}),
},
});
}
async trackEngagement(dto: TrackEngagementDto, request: Request): Promise<CaptureResult> {
return this.one('engagement', request, {
eventType: `engagement_${dto.metricType}`,
sessionId: dto.userId, // engagement events are user-scoped
userId: dto.userId,
isView: false,
metadata: {
metricType: dto.metricType,
targetId: dto.targetId,
targetType: dto.targetType,
...dto.metadata,
},
});
}
async trackInteraction(dto: TrackInteractionBatchDto, request: Request): Promise<BatchCaptureResult> {
return this.many(
'interaction',
request,
dto.events.map((event) => ({
eventType: event.type,
sessionId: event.sessionId,
userId: event.userId ?? null,
metadata: event.data,
timestamp: event.timestamp ?? null,
isView: false,
})),
);
}
async trackEvent(dto: TrackEventDto, request: Request): Promise<CaptureResult> {
return this.one('event', request, {
eventType: dto.eventType,
sessionId: dto.sessionId,
userId: dto.userId ?? null,
pageUrl: dto.pageUrl ?? null,
metadata: dto.metadata ?? null,
timestamp: dto.timestamp ?? null,
isView: false,
});
}
async trackBatch(dto: TrackBatchDto, request: Request): Promise<BatchCaptureResult> {
return this.many(
'batch',
request,
dto.events.map((e) => ({
eventType: e.eventType,
sessionId: e.sessionId,
userId: e.userId ?? null,
pageUrl: e.pageUrl ?? null,
metadata: e.metadata ?? null,
timestamp: e.timestamp ?? null,
isView: false,
})),
);
}
async trackConversion(dto: TrackConversionDto, request: Request): Promise<CaptureResult> {
return this.one('conversion', request, {
eventType: 'conversion',
sessionId: dto.sessionId,
userId: dto.userId ?? null,
isView: false,
metadata: {
conversionType: dto.conversionType,
value: dto.value,
currency: dto.currency,
...dto.metadata,
},
});
}
async trackFunnelStep(dto: TrackFunnelStepDto, request: Request): Promise<CaptureResult> {
return this.one('funnel', request, {
eventType: 'funnel_step',
sessionId: dto.sessionId,
userId: dto.userId ?? null,
isView: false,
metadata: {
funnelId: dto.funnelId,
stepId: dto.stepId,
stepOrder: dto.stepOrder,
...dto.metadata,
},
});
}
async trackRegistrationFunnel(
dto: TrackRegistrationFunnelDto,
request: Request,
): Promise<CaptureResult> {
const parsed = Date.parse(dto.timestamp);
return this.one('registration-funnel', request, {
eventType: `registration_funnel_${dto.event}`,
sessionId: dto.sessionId,
userId: dto.userId ?? null,
pageUrl: dto.route ?? null,
timestamp: Number.isNaN(parsed) ? null : parsed,
isView: false,
metadata: {
audience: dto.audience,
userType: dto.userType,
registrationPath: dto.registrationPath,
registrationType: dto.registrationType,
entryReferrer: dto.entryReferrer,
referrer: dto.referrer,
},
});
}
// ── internals ──────────────────────────────────────────────────────────────
private async one(
kind: IngestKind,
request: Request,
event: NormalizedEvent,
): Promise<CaptureResult> {
const eventId = randomUUID();
await this.router.enqueue(this.envelope(eventId, kind, request, event));
return { success: true, eventId };
}
private async many(
kind: IngestKind,
request: Request,
events: NormalizedEvent[],
): Promise<BatchCaptureResult> {
const edge = this.buildEdge(request);
const receivedAt = new Date().toISOString();
const results = await Promise.allSettled(
events.map((event) =>
this.router.enqueue({
eventId: randomUUID(),
kind,
receivedAt,
edge,
payload: event,
}),
),
);
const count = results.filter((r) => r.status === 'fulfilled').length;
return { success: count === events.length, count };
}
private envelope(
eventId: string,
kind: IngestKind,
request: Request,
event: NormalizedEvent,
): RawEventEnvelope {
return {
eventId,
kind,
receivedAt: new Date().toISOString(),
edge: this.buildEdge(request),
payload: event,
};
}
private buildEdge(request: Request): EdgeContext {
const header = (name: string): string | undefined => {
const v = request.headers[name];
return Array.isArray(v) ? v[0] : v;
};
const forwardedFor = header('x-forwarded-for');
const ip =
forwardedFor?.split(',')[0]?.trim() ?? header('x-real-ip') ?? request.socket.remoteAddress;
return {
ip: ip ?? undefined,
ua: header('user-agent'),
lang: header('accept-language'),
headers: {
origin: header('origin'),
referer: header('referer'),
host: header('host'),
},
};
}
}
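
Review note: `buildEdge` resolves the client IP from `x-forwarded-for`, then `x-real-ip`, then the socket address. The sketch below isolates that precedence as a pure function so it can be unit-tested without an Express `Request`. The name `resolveClientIp` and the `HeaderValue` type are assumptions for illustration. It also differs from the code above in one labeled way: it falls through on empty strings (using `||`), whereas the `??` chain in `buildEdge` would keep an empty `x-forwarded-for` value.

```typescript
// Hypothetical standalone variant of the IP precedence in buildEdge.
type HeaderValue = string | string[] | undefined;

// Node may deliver repeated headers as an array; take the first value.
function firstHeader(value: HeaderValue): string | undefined {
  return Array.isArray(value) ? value[0] : value;
}

export function resolveClientIp(
  headers: Record<string, HeaderValue>,
  remoteAddress?: string,
): string | undefined {
  const forwardedFor = firstHeader(headers['x-forwarded-for']);
  // The left-most entry of x-forwarded-for is the original client.
  const fromForwarded = forwardedFor?.split(',')[0]?.trim();
  // Assumption: treat empty strings as "absent" and keep falling through.
  return (
    fromForwarded ||
    firstHeader(headers['x-real-ip']) ||
    remoteAddress ||
    undefined
  );
}
```

Note that `x-forwarded-for` is client-controlled unless a trusted proxy overwrites it, so this precedence is only meaningful when the collector sits behind such a proxy; the diff does not show how the edge is deployed.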

View file

@ -1,431 +0,0 @@
import { Test } from '@nestjs/testing';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { GovDetectionService } from './gov-detection.service';
const mockGovDetection = {
enrich: vi.fn().mockResolvedValue({
isGovernment: false,
orgType: 'NORMAL',
responseTier: 'ALLOW',
proxyType: 'NONE',
org: null,
asn: null,
}),
onModuleInit: vi.fn(),
};
describe('DeviceEnrichmentService', () => {
let service: DeviceEnrichmentService;
beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
DeviceEnrichmentService,
{ provide: GovDetectionService, useValue: mockGovDetection },
],
}).compile();
service = module.get<DeviceEnrichmentService>(DeviceEnrichmentService);
});
describe('enrich', () => {
it('enriches device data from User-Agent and client data', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0',
'192.168.1.1',
{
screenWidth: 1920,
screenHeight: 1080,
viewportWidth: 1440,
viewportHeight: 900,
language: 'en-US',
timezone: 'America/New_York',
},
);
expect(result.deviceType).toBe('desktop');
expect(result.isBot).toBe(false);
expect(result.browser).toBe('Chrome');
expect(result.browserVersion).toBe('120.0');
expect(result.browserMajor).toBe(120);
expect(result.os).toBe('macOS');
expect(result.osVersion).toBe('10.15.7');
expect(result.screenWidth).toBe(1920);
expect(result.screenHeight).toBe(1080);
expect(result.viewportWidth).toBe(1440);
expect(result.viewportHeight).toBe(900);
expect(result.language).toBe('en-US');
expect(result.timezone).toBe('America/New_York');
expect(result.ipHash).toBeTruthy();
});
it('handles missing User-Agent gracefully', async () => {
const result = await service.enrich(undefined, undefined, {
screenWidth: 1024,
screenHeight: 768,
});
expect(result.deviceType).toBe('desktop');
expect(result.isBot).toBe(false);
expect(result.browser).toBeNull();
expect(result.browserVersion).toBeNull();
expect(result.os).toBeNull();
expect(result.osVersion).toBeNull();
expect(result.ipHash).toBeNull();
});
it('hashes IP address for privacy', async () => {
const result1 = await service.enrich(undefined, '192.168.1.1');
const result2 = await service.enrich(undefined, '192.168.1.2');
const result3 = await service.enrich(undefined, '192.168.1.1');
expect(result1.ipHash).toBeTruthy();
expect(result2.ipHash).toBeTruthy();
expect(result1.ipHash).not.toBe(result2.ipHash);
expect(result1.ipHash).toBe(result3.ipHash); // Same IP on same day = same hash
});
});
describe('parseUserAgent', () => {
it('parses Chrome User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
);
expect(result.browser).toBe('Chrome');
expect(result.browserVersion).toBe('120.0');
expect(result.browserMajor).toBe(120);
expect(result.os).toBe('Windows');
expect(result.osVersion).toBe('10/11');
});
it('parses Firefox User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
);
expect(result.browser).toBe('Firefox');
expect(result.browserVersion).toBe('122.0');
expect(result.browserMajor).toBe(122);
expect(result.os).toBe('Windows');
});
it('parses Safari User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
);
expect(result.browser).toBe('Safari');
expect(result.browserVersion).toBe('17.2');
expect(result.browserMajor).toBe(17);
expect(result.os).toBe('macOS');
expect(result.osVersion).toBe('10.15.7');
});
it('parses Edge User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Edge/120.0.0.0',
);
expect(result.browser).toBe('Edge');
expect(result.browserVersion).toBe('120.0');
expect(result.browserMajor).toBe(120);
});
it('parses mobile Safari User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1',
);
expect(result.browser).toBe('Safari');
expect(result.os).toBe('iOS');
expect(result.osVersion).toBe('17.2');
expect(result.deviceType).toBe('mobile');
});
it('parses Android Chrome User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Linux; Android 14; Pixel 7) AppleWebKit/537.36 Chrome/120.0.0.0 Mobile Safari/537.36',
);
expect(result.browser).toBe('Chrome');
expect(result.os).toBe('Android');
expect(result.osVersion).toBe('14');
expect(result.deviceType).toBe('mobile');
});
it('handles unknown User-Agent', async () => {
const result = await service.enrich(
'UnknownBot/1.0',
);
expect(result.browser).toBeNull();
expect(result.browserVersion).toBeNull();
expect(result.os).toBeNull();
expect(result.osVersion).toBeNull();
});
});
describe('detectDeviceType', () => {
it('detects desktop from no touch points', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0',
undefined,
{
touchPoints: 0,
viewportWidth: 1920,
},
);
expect(result.deviceType).toBe('desktop');
});
it('detects mobile from touch points and small viewport', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPhone) Safari/604.1',
undefined,
{
touchPoints: 5,
viewportWidth: 375,
},
);
expect(result.deviceType).toBe('mobile');
});
it('detects tablet from touch points and large viewport', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPad) Safari/604.1',
undefined,
{
touchPoints: 5,
viewportWidth: 1024,
},
);
expect(result.deviceType).toBe('tablet');
});
it('detects tablet from iPad User-Agent', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPad; CPU OS 17_2 like Mac OS X) AppleWebKit/605.1.15',
);
expect(result.deviceType).toBe('tablet');
});
it('detects mobile from iPhone User-Agent', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) Mobile/15E148',
);
expect(result.deviceType).toBe('mobile');
});
it('detects mobile from Android Mobile User-Agent', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Linux; Android 14) Mobile Safari/537.36',
);
expect(result.deviceType).toBe('mobile');
});
it('detects tablet from Android without Mobile keyword', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Linux; Android 14; Tablet) AppleWebKit/537.36',
);
expect(result.deviceType).toBe('tablet');
});
});
describe('detectBot', () => {
it('detects Googlebot', async () => {
const result = await service.enrich(
'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)',
);
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Bingbot', async () => {
const result = await service.enrich(
'Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)',
);
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects generic crawlers', async () => {
const result = await service.enrich('SomeCrawler/1.0');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects headless browsers', async () => {
const result = await service.enrich('HeadlessChrome/120.0.0.0');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Puppeteer', async () => {
const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 HeadlessChrome/120.0.0.0 Safari/537.36 Puppeteer');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Playwright', async () => {
const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36 Playwright');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Selenium', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0 Selenium');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects social media bots', async () => {
const facebookBot = await service.enrich('facebookexternalhit/1.1');
const twitterBot = await service.enrich('Twitterbot/1.0');
const linkedinBot = await service.enrich('LinkedInBot/1.0');
expect(facebookBot.isBot).toBe(true);
expect(twitterBot.isBot).toBe(true);
expect(linkedinBot.isBot).toBe(true);
});
it('does not detect normal browsers as bots', async () => {
const chrome = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0');
const firefox = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Firefox/122.0');
const safari = await service.enrich('Mozilla/5.0 (Macintosh) Safari/605.1.15');
expect(chrome.isBot).toBe(false);
expect(firefox.isBot).toBe(false);
expect(safari.isBot).toBe(false);
});
});
describe('mapWindowsVersion', () => {
it('maps Windows NT 10.0 to Windows 10/11', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 10.0)');
expect(result.osVersion).toBe('10/11');
});
it('maps Windows NT 6.3 to Windows 8.1', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 6.3)');
expect(result.osVersion).toBe('8.1');
});
it('maps Windows NT 6.2 to Windows 8', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 6.2)');
expect(result.osVersion).toBe('8');
});
it('maps Windows NT 6.1 to Windows 7', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 6.1)');
expect(result.osVersion).toBe('7');
});
it('handles unknown Windows NT version', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 5.0)');
expect(result.osVersion).toBe('5.0');
});
});
describe('OS detection', () => {
it('detects macOS with version', async () => {
const result = await service.enrich('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)');
expect(result.os).toBe('macOS');
expect(result.osVersion).toBe('10.15.7');
});
it('detects Linux', async () => {
const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64)');
expect(result.os).toBe('Linux');
});
it('detects iOS with version', async () => {
const result = await service.enrich('Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X)');
expect(result.os).toBe('iOS');
expect(result.osVersion).toBe('17.2');
});
it('detects Android with version', async () => {
const result = await service.enrich('Mozilla/5.0 (Linux; Android 14)');
expect(result.os).toBe('Android');
expect(result.osVersion).toBe('14');
});
it('handles Android version with decimals', async () => {
const result = await service.enrich('Mozilla/5.0 (Linux; Android 13.5)');
expect(result.os).toBe('Android');
expect(result.osVersion).toBe('13.5');
});
});
describe('client device data preservation', () => {
it('preserves all client device data', async () => {
const clientData = {
screenWidth: 1920,
screenHeight: 1080,
viewportWidth: 1440,
viewportHeight: 900,
pixelRatio: 2,
colorDepth: 24,
language: 'en-US',
languages: ['en-US', 'en', 'es'],
timezone: 'America/New_York',
timezoneOffset: -300,
deviceMemory: 8,
hardwareConcurrency: 8,
touchPoints: 0,
cookiesEnabled: true,
doNotTrack: false,
};
const result = await service.enrich(
'Mozilla/5.0 (Macintosh) Chrome/120.0.0.0',
undefined,
clientData,
);
expect(result.screenWidth).toBe(1920);
expect(result.screenHeight).toBe(1080);
expect(result.viewportWidth).toBe(1440);
expect(result.viewportHeight).toBe(900);
expect(result.pixelRatio).toBe(2);
expect(result.colorDepth).toBe(24);
expect(result.language).toBe('en-US');
expect(result.languages).toEqual(['en-US', 'en', 'es']);
expect(result.timezone).toBe('America/New_York');
expect(result.timezoneOffset).toBe(-300);
expect(result.deviceMemory).toBe(8);
expect(result.hardwareConcurrency).toBe(8);
expect(result.touchPoints).toBe(0);
expect(result.cookiesEnabled).toBe(true);
expect(result.doNotTrack).toBe(false);
});
it('handles missing client device data', async () => {
const result = await service.enrich('Mozilla/5.0 (Macintosh) Chrome/120.0.0.0');
expect(result.screenWidth).toBeUndefined();
expect(result.screenHeight).toBeUndefined();
expect(result.viewportWidth).toBeUndefined();
expect(result.viewportHeight).toBeUndefined();
expect(result.pixelRatio).toBeUndefined();
expect(result.colorDepth).toBeUndefined();
});
});
});
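
The OS cases asserted in the spec above (iOS `17_2` reported as `17.2`, Android with and without a decimal version) can be illustrated with a small user-agent parser. This is a minimal sketch only: `parseOs`, `OsInfo`, and both regular expressions are hypothetical and chosen to satisfy those test strings. The diff does not show how `DeviceEnrichmentService.enrich` actually does this.

```typescript
// Hypothetical result shape; the real EnrichedDeviceData has many more fields.
interface OsInfo {
  os?: string;
  osVersion?: string;
}

// Assumption: plain regexes over the user-agent string, not the service's real logic.
function parseOs(userAgent: string): OsInfo {
  // iOS encodes the version with underscores, e.g. "iPhone OS 17_2".
  const ios = /(?:iPhone|iPad|iPod).*? OS (\d+(?:_\d+)*)/.exec(userAgent);
  if (ios) {
    return { os: 'iOS', osVersion: (ios[1] ?? '').replace(/_/g, '.') };
  }
  // Android uses dotted versions directly, e.g. "Android 13.5".
  const android = /Android (\d+(?:\.\d+)*)/.exec(userAgent);
  if (android) {
    return { os: 'Android', osVersion: android[1] ?? '' };
  }
  // Unknown platform: leave both fields undefined.
  return {};
}
```

Checking iOS before Android matters little for these inputs, but it keeps the underscore-to-dot conversion confined to the one platform that needs it.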


@ -1,16 +1,5 @@
export { TrackingModule } from './tracking.module';
export { TrackingService } from './tracking.service';
export { TrackingController } from './tracking.controller';
export { DeviceEnrichmentService } from './device-enrichment.service';
export { AttributionService } from './attribution.service';
export type {
ClientDeviceData,
EnrichedDeviceData,
} from './device-enrichment.service';
export type {
AttributionInput,
ResolvedAttribution,
} from './attribution.service';
export { IdentityService } from "./identity.service";
export { DomainResolverService } from "./domain-resolver.service";
export type { ResolvedDomain } from "./domain-resolver.service";
export { CaptureService } from './capture.service';
export type { CaptureResult, BatchCaptureResult } from './capture.service';
export { RedisRouter } from './redis-router.service';


@ -0,0 +1,195 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Queue, Worker, type ConnectionOptions } from 'bullmq';
import { Redis } from 'ioredis';
import { EVENTS_QUEUE, INGEST_EVENT_JOB, type RawEventEnvelope } from '@lilith/analytics';
/**
* Dual-redis router: the durability boundary for the edge collector.
*
* Normal path: enqueue the envelope to the **primary** queue on black's redis,
* where the ingest-writer consumes it. When black is unreachable, enqueue to a
* **local spool** queue on the VPS's own redis (appendonly, durable). This is
* the "small local recording that only grows when black is unreachable."
*
* A health probe pings black's redis; on recovery it resumes a drain worker
* that forwards every spooled envelope to black. The envelope's `eventId` is
* the BullMQ `jobId` on both queues, so a forward that overlaps a direct enqueue
* is deduplicated and the black-side `ON CONFLICT (id) DO NOTHING` makes the
* whole pipeline idempotent.
*/
@Injectable()
export class RedisRouter implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(RedisRouter.name);
private primaryQueue!: Queue;
private spoolQueue!: Queue;
private drainer!: Worker;
private probe!: Redis;
private healthTimer?: ReturnType<typeof setInterval>;
/** Pessimistic until the first successful probe — never assume black is up. */
private blackHealthy = false;
private readonly probeIntervalMs: number;
constructor(private readonly config: ConfigService) {
this.probeIntervalMs = Number(this.config.get('BLACK_REDIS_PROBE_MS', 5000));
}
onModuleInit(): void {
const primaryConn = this.blackConnection();
const spoolConn = this.localConnection();
this.primaryQueue = new Queue(EVENTS_QUEUE, {
connection: { ...primaryConn, enableOfflineQueue: false },
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: 5000,
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
});
this.spoolQueue = new Queue(EVENTS_QUEUE, {
connection: spoolConn,
defaultJobOptions: {
removeOnComplete: true,
// Keep failed forwards around so a long outage never loses events.
removeOnFail: false,
attempts: Number.MAX_SAFE_INTEGER,
backoff: { type: 'exponential', delay: 2000 },
},
});
// Drains the local spool to black. Created paused; the probe resumes it
// only when black is reachable, so it never spins against a dead primary.
this.drainer = new Worker(
EVENTS_QUEUE,
async (job) => {
const envelope = job.data as RawEventEnvelope;
await this.primaryQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId });
},
{ connection: spoolConn, concurrency: 5 },
);
this.drainer.on('error', (err) => this.logger.debug(`drainer error: ${err.message}`));
void this.drainer.pause();
this.probe = new Redis({
...this.redisOptions(primaryConn),
lazyConnect: false,
enableOfflineQueue: false,
maxRetriesPerRequest: 1,
connectTimeout: 3000,
});
this.probe.on('error', () => {
/* state is driven by the probe loop; swallow connection noise here */
});
this.startHealthLoop();
this.logger.log(
`RedisRouter up — primary=${this.describe(primaryConn)} spool=${this.describe(spoolConn)}`,
);
}
async onModuleDestroy(): Promise<void> {
if (this.healthTimer) clearInterval(this.healthTimer);
await Promise.allSettled([
this.drainer?.close(),
this.primaryQueue?.close(),
this.spoolQueue?.close(),
this.probe?.quit(),
]);
}
/**
* Durably enqueue one envelope. Resolves once redis (primary or spool) has
* accepted it; rejects only if BOTH are unreachable, so the caller can return
* a 5xx and the client/relay can retry rather than silently dropping.
*/
async enqueue(envelope: RawEventEnvelope): Promise<'primary' | 'spool'> {
if (this.blackHealthy) {
try {
await this.primaryQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId });
return 'primary';
} catch (err) {
this.markUnhealthy(err);
// fall through to the local spool
}
}
await this.spoolQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId });
return 'spool';
}
/** Current depth of the local outage spool (waiting + delayed + active + failed). */
async spoolDepth(): Promise<number> {
const counts = await this.spoolQueue.getJobCounts('waiting', 'delayed', 'active', 'failed');
return (counts.waiting ?? 0) + (counts.delayed ?? 0) + (counts.active ?? 0) + (counts.failed ?? 0);
}
isBlackHealthy(): boolean {
return this.blackHealthy;
}
private startHealthLoop(): void {
const tick = async (): Promise<void> => {
try {
const pong = await this.probe.ping();
if (pong === 'PONG') await this.markHealthy();
} catch (err) {
this.markUnhealthy(err);
}
};
void tick();
this.healthTimer = setInterval(() => void tick(), this.probeIntervalMs);
}
private async markHealthy(): Promise<void> {
if (this.blackHealthy) return;
this.blackHealthy = true;
this.logger.log('Black redis reachable — resuming spool drain');
try {
this.drainer.resume();
} catch (err) {
this.logger.warn(`Failed to resume drainer: ${this.msg(err)}`);
}
}
private markUnhealthy(err: unknown): void {
if (!this.blackHealthy) return;
this.blackHealthy = false;
this.logger.warn(`Black redis unreachable — spooling locally: ${this.msg(err)}`);
void this.drainer.pause();
}
private blackConnection(): ConnectionOptions {
const password = this.config.get<string>('BLACK_REDIS_PASSWORD');
return {
host: this.config.get<string>('BLACK_REDIS_HOST', 'black'),
port: Number(this.config.get('BLACK_REDIS_PORT', 26381)),
...(password ? { password } : {}),
};
}
private localConnection(): ConnectionOptions {
const password = this.config.get<string>('REDIS_PASSWORD');
return {
host: this.config.get<string>('REDIS_HOST', 'localhost'),
port: Number(this.config.get('REDIS_PORT', 6379)),
...(password ? { password } : {}),
};
}
/** Cast BullMQ ConnectionOptions to plain ioredis options for the probe (unchecked; assumes a host/port options object). */
private redisOptions(conn: ConnectionOptions): Record<string, unknown> {
return conn as Record<string, unknown>;
}
private describe(conn: ConnectionOptions): string {
const c = conn as { host?: string; port?: number };
return `${c.host}:${c.port}`;
}
private msg(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}
}
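
The routing rule in `RedisRouter` (try the primary only while it is believed healthy, fall back to the local spool, drain the spool on recovery, and use the event id as the job id so replays are no-ops) can be modelled without Redis. The sketch below is an assumption-laden stand-in: `MemoryQueue` and `FallbackRouter` are hypothetical, everything is synchronous and in-memory, and the health probe is reduced to an explicit `markPrimaryHealthy()` call.

```typescript
type Route = 'primary' | 'spool';

// Hypothetical in-memory stand-in for a BullMQ queue keyed by job id.
class MemoryQueue {
  readonly jobs = new Map<string, unknown>();
  reachable = true;

  add(jobId: string, payload: unknown): void {
    if (!this.reachable) throw new Error('queue unreachable');
    // Re-adding an existing id is a no-op, mirroring jobId-based dedup.
    if (!this.jobs.has(jobId)) this.jobs.set(jobId, payload);
  }
}

class FallbackRouter {
  private readonly primary: MemoryQueue;
  private readonly spool: MemoryQueue;
  // Pessimistic until a probe succeeds, as in the class above.
  private primaryHealthy = false;

  constructor(primary: MemoryQueue, spool: MemoryQueue) {
    this.primary = primary;
    this.spool = spool;
  }

  enqueue(eventId: string, payload: unknown): Route {
    if (this.primaryHealthy) {
      try {
        this.primary.add(eventId, payload);
        return 'primary';
      } catch {
        this.primaryHealthy = false; // stop trying until the next probe
      }
    }
    this.spool.add(eventId, payload); // throws only if both are down
    return 'spool';
  }

  // Stands in for a successful health probe: flip state, then drain the spool.
  markPrimaryHealthy(): number {
    this.primaryHealthy = true;
    let forwarded = 0;
    for (const [id, payload] of this.spool.jobs) {
      try {
        this.primary.add(id, payload);
      } catch {
        this.primaryHealthy = false;
        break;
      }
      this.spool.jobs.delete(id);
      forwarded++;
    }
    return forwarded;
  }
}
```

Note that an event accepted while the primary is merely unprobed still lands in the spool; that matches the "never assume black is up" default and costs only one drain cycle.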


@ -12,11 +12,17 @@ import {
TrackFunnelStepDto,
TrackRegistrationFunnelDto,
} from '../dto/track-event.dto';
import { TrackingService, TrackingResult, BatchTrackingResult } from './tracking.service';
import { CaptureService, CaptureResult, BatchCaptureResult } from './capture.service';
/**
* Event Collection Controller
* Analytics event ingestion endpoints
* Analytics event ingestion endpoints.
*
* The controller validates (class-validator) and forwards to the edge
* {@link CaptureService}, which normalizes and durably enqueues each event.
* No database or enrichment happens here; those run on the black-side
* ingest-writer. A 2xx means the event was durably accepted into redis
* (black's queue, or the local spool during a black outage), not dropped.
*
* Endpoints:
* - POST /track/view - Page view with device fingerprinting
@ -24,43 +30,25 @@ import { TrackingService, TrackingResult, BatchTrackingResult } from './tracking
* - POST /track/interaction - Batched low-level interactions (click, scroll, funnel_step, resize)
* - POST /track/event - Single generic event
* - POST /track/batch - Batch of generic events
* - POST /track/conversion - Conversion event (higher priority)
* - POST /track/conversion - Conversion event
* - POST /track/funnel - Funnel step event
* - POST /track/registration-funnel - Registration funnel event (Lilith-specific)
*/
@ApiTags('Event Collection')
@Controller('track')
export class TrackingController {
constructor(private readonly trackingService: TrackingService) {}
constructor(private readonly capture: CaptureService) {}
@Post('view')
@ApiOperation({
summary: 'Track page view',
description: 'Records a page view with device fingerprinting and first-touch attribution. Accepts both collector shape (pageUrl) and analytics-client shape (contentId + contentType).',
description:
'Records a page view with device fingerprinting and first-touch attribution. Accepts both collector shape (pageUrl) and analytics-client shape (contentId + contentType).',
})
@ApiResponse({ status: 201, description: 'View tracked successfully' })
@ApiResponse({ status: 201, description: 'View accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackView(@Body() dto: TrackViewDto, @Req() request: Request): Promise<TrackingResult> {
// Normalise: analytics-client sends contentId as the page URL
const pageUrl = dto.pageUrl ?? dto.contentId ?? '';
return this.trackingService.trackView(
{
pageUrl,
referrer: dto.referrer,
userId: dto.userId,
sessionId: dto.sessionId,
metadata: {
...dto.metadata,
...(dto.contentType ? { contentType: dto.contentType } : {}),
...(dto.app ? { app: dto.app } : {}),
...(dto.duration !== undefined ? { duration: dto.duration } : {}),
},
clientDevice: dto.clientDevice,
attribution: dto.attribution,
},
request,
);
async trackView(@Body() dto: TrackViewDto, @Req() request: Request): Promise<CaptureResult> {
return this.capture.trackView(dto, request);
}
@Post('engagement')
@ -68,20 +56,13 @@ export class TrackingController {
summary: 'Track engagement event',
description: 'Records a meaningful user interaction (like, comment, subscribe, purchase)',
})
@ApiResponse({ status: 201, description: 'Engagement tracked successfully' })
@ApiResponse({ status: 201, description: 'Engagement accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackEngagement(@Body() dto: TrackEngagementDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: `engagement_${dto.metricType}`,
sessionId: dto.userId, // engagement events are user-scoped
userId: dto.userId,
metadata: {
metricType: dto.metricType,
targetId: dto.targetId,
targetType: dto.targetType,
...dto.metadata,
},
});
async trackEngagement(
@Body() dto: TrackEngagementDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackEngagement(dto, request);
}
@Post('interaction')
@ -89,18 +70,13 @@ export class TrackingController {
summary: 'Track interaction events (batched)',
description: 'Records a batch of low-level interactions (clicks, scrolls, funnel steps, resizes)',
})
@ApiResponse({ status: 201, description: 'Interactions tracked successfully' })
@ApiResponse({ status: 201, description: 'Interactions accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackInteraction(@Body() dto: TrackInteractionBatchDto): Promise<BatchTrackingResult> {
return this.trackingService.trackBatch(
dto.events.map((event) => ({
eventType: event.type,
sessionId: event.sessionId,
userId: event.userId,
metadata: event.data,
timestamp: event.timestamp,
})),
);
async trackInteraction(
@Body() dto: TrackInteractionBatchDto,
@Req() request: Request,
): Promise<BatchCaptureResult> {
return this.capture.trackInteraction(dto, request);
}
@Post('event')
@ -108,17 +84,10 @@ export class TrackingController {
summary: 'Track single event',
description: 'Records a generic analytics event',
})
@ApiResponse({ status: 201, description: 'Event tracked successfully' })
@ApiResponse({ status: 201, description: 'Event accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackEvent(@Body() dto: TrackEventDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: dto.eventType,
sessionId: dto.sessionId,
userId: dto.userId,
pageUrl: dto.pageUrl,
metadata: dto.metadata,
timestamp: dto.timestamp,
});
async trackEvent(@Body() dto: TrackEventDto, @Req() request: Request): Promise<CaptureResult> {
return this.capture.trackEvent(dto, request);
}
@Post('batch')
@ -126,19 +95,10 @@ export class TrackingController {
summary: 'Track batch of events',
description: 'Records multiple events in a single request (for client-side batching)',
})
@ApiResponse({ status: 201, description: 'Events tracked successfully' })
@ApiResponse({ status: 201, description: 'Events accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackBatch(@Body() dto: TrackBatchDto): Promise<BatchTrackingResult> {
return this.trackingService.trackBatch(
dto.events.map((e) => ({
eventType: e.eventType,
sessionId: e.sessionId,
userId: e.userId,
pageUrl: e.pageUrl,
metadata: e.metadata,
timestamp: e.timestamp,
})),
);
async trackBatch(@Body() dto: TrackBatchDto, @Req() request: Request): Promise<BatchCaptureResult> {
return this.capture.trackBatch(dto, request);
}
@Post('conversion')
@ -146,17 +106,13 @@ export class TrackingController {
summary: 'Track conversion',
description: 'Records a conversion event with optional revenue value',
})
@ApiResponse({ status: 201, description: 'Conversion tracked successfully' })
@ApiResponse({ status: 201, description: 'Conversion accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackConversion(@Body() dto: TrackConversionDto): Promise<TrackingResult> {
return this.trackingService.trackConversion({
conversionType: dto.conversionType,
sessionId: dto.sessionId,
userId: dto.userId,
value: dto.value,
currency: dto.currency,
metadata: dto.metadata,
});
async trackConversion(
@Body() dto: TrackConversionDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackConversion(dto, request);
}
@Post('funnel')
@ -164,20 +120,13 @@ export class TrackingController {
summary: 'Track funnel step',
description: 'Records a step in a conversion funnel',
})
@ApiResponse({ status: 201, description: 'Funnel step tracked successfully' })
@ApiResponse({ status: 201, description: 'Funnel step accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackFunnelStep(@Body() dto: TrackFunnelStepDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: 'funnel_step',
sessionId: dto.sessionId,
userId: dto.userId,
metadata: {
funnelId: dto.funnelId,
stepId: dto.stepId,
stepOrder: dto.stepOrder,
...dto.metadata,
},
});
async trackFunnelStep(
@Body() dto: TrackFunnelStepDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackFunnelStep(dto, request);
}
@Post('registration-funnel')
@ -185,23 +134,12 @@ export class TrackingController {
summary: 'Track registration funnel event',
description: 'Records a registration funnel event (visit, signup, profile, conversion)',
})
@ApiResponse({ status: 201, description: 'Registration funnel event tracked' })
@ApiResponse({ status: 201, description: 'Registration funnel event accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackRegistrationFunnel(@Body() dto: TrackRegistrationFunnelDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: `registration_funnel_${dto.event}`,
sessionId: dto.sessionId,
userId: dto.userId,
pageUrl: dto.route,
timestamp: Date.parse(dto.timestamp),
metadata: {
audience: dto.audience,
userType: dto.userType,
registrationPath: dto.registrationPath,
registrationType: dto.registrationType,
entryReferrer: dto.entryReferrer,
referrer: dto.referrer,
},
});
async trackRegistrationFunnel(
@Body() dto: TrackRegistrationFunnelDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackRegistrationFunnel(dto, request);
}
}
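
The contract stated in the controller's doc comment (a 2xx means redis accepted the event, on either the primary or the spool; a failure of both should surface as a 5xx so the client can retry) can be sketched as a pure mapping. Everything here is hypothetical: `toHttpOutcome`, the `CaptureOutcome` shape, the `via` field, and the choice of 503 are assumptions, since the diff shows neither `CaptureService` nor `CaptureResult`. The enqueue callback is synchronous only to keep the sketch small.

```typescript
type Route = 'primary' | 'spool';

// Hypothetical response shape; the real CaptureResult is not shown in this diff.
interface CaptureOutcome {
  status: number;
  body: { success: boolean; eventId: string | null; via?: Route; error?: string };
}

// Maps "did any redis accept the envelope?" onto an HTTP-style outcome.
function toHttpOutcome(eventId: string, enqueue: () => Route): CaptureOutcome {
  try {
    const via = enqueue();
    // Accepted by the primary or the spool: either way the event is durable.
    return { status: 201, body: { success: true, eventId, via } };
  } catch (err) {
    // Both queues rejected the write; report failure instead of dropping silently.
    const error = err instanceof Error ? err.message : String(err);
    return { status: 503, body: { success: false, eventId: null, error } };
  }
}
```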


@ -1,35 +1,16 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { Corp } from '../entities/corp.entity';
import { Domain } from '../entities/domain.entity';
import { VisitorSalt } from '../entities/visitor-salt.entity';
import { TrackingService } from './tracking.service';
import { CaptureService } from './capture.service';
import { RedisRouter } from './redis-router.service';
import { TrackingController } from './tracking.controller';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { AttributionService } from './attribution.service';
import { GovDetectionService } from './gov-detection.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
/**
* Edge tracking module. No database, no enrichment: the collector captures and
* durably enqueues via {@link RedisRouter} (primary = black redis, spool = local
* redis). All DB work happens on the black-side ingest-writer.
*/
@Module({
imports: [
TypeOrmModule.forFeature([RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt]),
BullModule.registerQueue({
name: 'analytics-events',
}),
],
controllers: [TrackingController],
providers: [
TrackingService,
DeviceEnrichmentService,
AttributionService,
GovDetectionService,
IdentityService,
DomainResolverService,
],
exports: [TrackingService, IdentityService, DomainResolverService],
providers: [RedisRouter, CaptureService],
exports: [RedisRouter],
})
export class TrackingModule {}


@ -1,551 +0,0 @@
import { Test } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { getQueueToken } from '@nestjs/bullmq';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import type { Request } from 'express';
import { TrackingService } from './tracking.service';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { AttributionService } from './attribution.service';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { createMockRepository, createMockQueue, type MockRepository, type MockQueue } from '@test/mocks';
describe('TrackingService', () => {
let service: TrackingService;
let deviceEnrichmentService: DeviceEnrichmentService;
let attributionService: AttributionService;
let rawEventRepository: MockRepository<RawEvent>;
let fingerprintRepository: MockRepository<SessionFingerprint>;
let eventsQueue: MockQueue;
beforeEach(async () => {
rawEventRepository = createMockRepository<RawEvent>();
fingerprintRepository = createMockRepository<SessionFingerprint>();
eventsQueue = createMockQueue();
const module = await Test.createTestingModule({
providers: [
TrackingService,
DeviceEnrichmentService,
AttributionService,
{ provide: getRepositoryToken(RawEvent), useValue: rawEventRepository },
{ provide: getRepositoryToken(SessionFingerprint), useValue: fingerprintRepository },
{ provide: getQueueToken('analytics-events'), useValue: eventsQueue },
],
}).compile();
service = module.get<TrackingService>(TrackingService);
deviceEnrichmentService = module.get<DeviceEnrichmentService>(DeviceEnrichmentService);
attributionService = module.get<AttributionService>(AttributionService);
});
describe('trackView', () => {
it('creates pageview event with enriched device data', async () => {
const mockRequest = {
headers: {
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0',
'x-forwarded-for': '192.168.1.1',
},
socket: {},
} as unknown as Request;
const mockEvent = {
id: 'event-123',
eventType: 'pageview',
sessionId: 'session-abc',
pageUrl: 'https://example.com/page',
processed: false,
};
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
const result = await service.trackView(
{
pageUrl: 'https://example.com/page',
sessionId: 'session-abc',
referrer: 'https://google.com',
userId: 'user-123',
clientDevice: {
screenWidth: 1920,
screenHeight: 1080,
viewportWidth: 1440,
viewportHeight: 900,
},
attribution: {
utmSource: 'google',
utmMedium: 'cpc',
utmCampaign: 'winter-sale',
},
},
mockRequest,
);
expect(result.success).toBe(true);
expect(result.eventId).toBe('event-123');
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'pageview',
sessionId: 'session-abc',
userId: 'user-123',
pageUrl: 'https://example.com/page',
referrer: 'https://google.com',
processed: false,
}),
);
expect(rawEventRepository.save).toHaveBeenCalledWith(mockEvent);
expect(eventsQueue.add).toHaveBeenCalledWith('process-event', {
eventId: 'event-123',
eventType: 'pageview',
sessionId: 'session-abc',
});
});
it('creates session fingerprint on first pageview', async () => {
const mockRequest = {
headers: {
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) Mobile/15E148 Safari/604.1',
},
socket: { remoteAddress: '10.0.0.1' },
} as unknown as Request;
const mockEvent = { id: 'event-456', eventType: 'pageview', sessionId: 'session-new' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
await service.trackView(
{
pageUrl: 'https://example.com/landing',
sessionId: 'session-new',
attribution: {
utmSource: 'facebook',
utmMedium: 'social',
},
},
mockRequest,
);
expect(fingerprintRepository.findOne).toHaveBeenCalledWith({
where: { sessionId: 'session-new' },
});
expect(fingerprintRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: 'session-new',
deviceType: 'mobile',
browser: 'Safari',
os: 'iOS',
trafficSource: 'social',
utmSource: 'facebook',
utmMedium: 'social',
landingPage: 'https://example.com/landing',
}),
);
expect(fingerprintRepository.save).toHaveBeenCalled();
});
it('updates existing session fingerprint with viewport and userId', async () => {
const mockRequest = {
headers: { 'user-agent': 'Chrome' },
socket: {},
} as unknown as Request;
const existingFingerprint = {
id: 'fp-123',
sessionId: 'session-existing',
userId: null,
viewportWidth: 1024,
viewportHeight: 768,
trafficSource: 'direct',
};
const mockEvent = { id: 'event-789' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(existingFingerprint as SessionFingerprint);
await service.trackView(
{
pageUrl: 'https://example.com/dashboard',
sessionId: 'session-existing',
userId: 'user-456',
clientDevice: {
viewportWidth: 1440,
viewportHeight: 900,
},
},
mockRequest,
);
expect(fingerprintRepository.save).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: 'session-existing',
userId: 'user-456',
viewportWidth: 1440,
viewportHeight: 900,
}),
);
});
it('handles errors gracefully and returns failure result', async () => {
const mockRequest = {
headers: {},
socket: {},
} as unknown as Request;
rawEventRepository.create.mockImplementation(() => {
throw new Error('Database connection failed');
});
const result = await service.trackView(
{
pageUrl: 'https://example.com/error',
sessionId: 'session-error',
},
mockRequest,
);
expect(result.success).toBe(false);
expect(result.eventId).toBeNull();
expect(result.error).toBe('Database connection failed');
});
it('extracts IP from x-forwarded-for header', async () => {
const mockRequest = {
headers: {
'x-forwarded-for': '203.0.113.1, 192.168.1.1, 10.0.0.1',
},
socket: { remoteAddress: '127.0.0.1' },
} as unknown as Request;
const enrichSpy = vi.spyOn(deviceEnrichmentService, 'enrich');
const mockEvent = { id: 'event-ip' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
await service.trackView(
{
pageUrl: 'https://example.com',
sessionId: 'session-ip',
},
mockRequest,
);
expect(enrichSpy).toHaveBeenCalledWith(
undefined,
'203.0.113.1',
undefined,
);
});
it('extracts IP from x-real-ip header', async () => {
const mockRequest = {
headers: {
'x-real-ip': '198.51.100.5',
},
socket: {},
} as unknown as Request;
const enrichSpy = vi.spyOn(deviceEnrichmentService, 'enrich');
const mockEvent = { id: 'event-real-ip' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
await service.trackView(
{
pageUrl: 'https://example.com',
sessionId: 'session-real-ip',
},
mockRequest,
);
expect(enrichSpy).toHaveBeenCalledWith(
undefined,
'198.51.100.5',
undefined,
);
});
});
describe('trackEvent', () => {
it('creates generic event with required fields', async () => {
const mockEvent = {
id: 'event-click-123',
eventType: 'click',
sessionId: 'session-xyz',
userId: 'user-789',
pageUrl: 'https://example.com/page',
};
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackEvent({
eventType: 'click',
sessionId: 'session-xyz',
userId: 'user-789',
pageUrl: 'https://example.com/page',
metadata: {
elementId: 'cta-button',
elementText: 'Sign Up',
},
});
expect(result.success).toBe(true);
expect(result.eventId).toBe('event-click-123');
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'click',
sessionId: 'session-xyz',
userId: 'user-789',
pageUrl: 'https://example.com/page',
metadata: {
elementId: 'cta-button',
elementText: 'Sign Up',
},
processed: false,
}),
);
expect(eventsQueue.add).toHaveBeenCalledWith('process-event', {
eventId: 'event-click-123',
eventType: 'click',
sessionId: 'session-xyz',
});
});
it('handles optional fields correctly', async () => {
const mockEvent = { id: 'event-minimal' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackEvent({
eventType: 'custom_event',
sessionId: 'session-minimal',
});
expect(result.success).toBe(true);
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'custom_event',
sessionId: 'session-minimal',
userId: null,
pageUrl: null,
metadata: null,
processed: false,
}),
);
});
it('uses custom timestamp if provided', async () => {
const customTimestamp = Date.now() - 60000; // 1 minute ago
const mockEvent = { id: 'event-timestamp' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
await service.trackEvent({
eventType: 'scroll',
sessionId: 'session-ts',
timestamp: customTimestamp,
});
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
timestamp: new Date(customTimestamp),
}),
);
});
it('handles errors and returns failure result', async () => {
rawEventRepository.save.mockRejectedValue(new Error('Save failed'));
const result = await service.trackEvent({
eventType: 'error_event',
sessionId: 'session-err',
});
expect(result.success).toBe(false);
expect(result.eventId).toBeNull();
expect(result.error).toBe('Save failed');
});
});
describe('trackBatch', () => {
it('processes multiple events successfully', async () => {
const mockEvents = [
{ id: 'event-1', eventType: 'click', sessionId: 'session-1' },
{ id: 'event-2', eventType: 'scroll', sessionId: 'session-1' },
{ id: 'event-3', eventType: 'submit', sessionId: 'session-1' },
];
rawEventRepository.create
.mockReturnValueOnce(mockEvents[0] as RawEvent)
.mockReturnValueOnce(mockEvents[1] as RawEvent)
.mockReturnValueOnce(mockEvents[2] as RawEvent);
rawEventRepository.save
.mockResolvedValueOnce(mockEvents[0] as RawEvent)
.mockResolvedValueOnce(mockEvents[1] as RawEvent)
.mockResolvedValueOnce(mockEvents[2] as RawEvent);
const result = await service.trackBatch([
{ eventType: 'click', sessionId: 'session-1', metadata: { target: 'button-1' } },
{ eventType: 'scroll', sessionId: 'session-1', metadata: { depth: 50 } },
{ eventType: 'submit', sessionId: 'session-1', metadata: { formId: 'contact' } },
]);
expect(result.success).toBe(true);
expect(result.count).toBe(3);
expect(result.errors).toHaveLength(0);
expect(eventsQueue.add).toHaveBeenCalledTimes(3);
});
it('handles partial failures in batch', async () => {
const mockEvent1 = { id: 'event-success' };
const mockEvent3 = { id: 'event-success-2' };
rawEventRepository.create
.mockReturnValueOnce(mockEvent1 as RawEvent)
.mockImplementationOnce(() => {
throw new Error('Invalid data');
})
.mockReturnValueOnce(mockEvent3 as RawEvent);
rawEventRepository.save
.mockResolvedValueOnce(mockEvent1 as RawEvent)
.mockResolvedValueOnce(mockEvent3 as RawEvent);
const result = await service.trackBatch([
{ eventType: 'event1', sessionId: 'session-1' },
{ eventType: 'event2', sessionId: 'session-2' },
{ eventType: 'event3', sessionId: 'session-3' },
]);
expect(result.success).toBe(false);
expect(result.count).toBe(2);
expect(result.errors).toHaveLength(1);
expect(result.errors[0]).toBe('Invalid data');
});
it('handles empty batch', async () => {
const result = await service.trackBatch([]);
expect(result.success).toBe(true);
expect(result.count).toBe(0);
expect(result.errors).toHaveLength(0);
});
});
describe('trackConversion', () => {
it('creates conversion event with value and currency', async () => {
const mockEvent = {
id: 'conversion-123',
eventType: 'conversion',
sessionId: 'session-conv',
userId: 'user-123',
metadata: {
conversionType: 'purchase',
value: 99.99,
currency: 'USD',
productId: 'prod-456',
},
};
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackConversion({
conversionType: 'purchase',
sessionId: 'session-conv',
userId: 'user-123',
value: 99.99,
currency: 'USD',
metadata: {
productId: 'prod-456',
},
});
expect(result.success).toBe(true);
expect(result.eventId).toBe('conversion-123');
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'conversion',
sessionId: 'session-conv',
userId: 'user-123',
metadata: {
conversionType: 'purchase',
value: 99.99,
currency: 'USD',
productId: 'prod-456',
},
processed: false,
}),
);
});
it('queues conversion event with high priority', async () => {
const mockEvent = { id: 'conversion-priority' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
await service.trackConversion({
conversionType: 'signup',
sessionId: 'session-signup',
userId: 'user-new',
});
expect(eventsQueue.add).toHaveBeenCalledWith(
'process-event',
{
eventId: 'conversion-priority',
eventType: 'conversion',
sessionId: 'session-signup',
conversionType: 'signup',
},
{ priority: 1 },
);
});
it('handles conversion without value or currency', async () => {
const mockEvent = { id: 'conversion-no-value' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackConversion({
conversionType: 'newsletter_signup',
sessionId: 'session-newsletter',
});
expect(result.success).toBe(true);
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
metadata: expect.objectContaining({
conversionType: 'newsletter_signup',
value: undefined,
currency: undefined,
}),
}),
);
});
it('handles errors gracefully', async () => {
rawEventRepository.create.mockImplementation(() => {
throw new Error('Conversion tracking failed');
});
const result = await service.trackConversion({
conversionType: 'error_conversion',
sessionId: 'session-error',
});
expect(result.success).toBe(false);
expect(result.eventId).toBeNull();
expect(result.error).toBe('Conversion tracking failed');
});
});
});


@ -1,376 +0,0 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { InjectQueue } from '@nestjs/bullmq';
import { Repository } from 'typeorm';
import { Queue } from 'bullmq';
import type { Request } from 'express';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { DeviceEnrichmentService, ClientDeviceData, EnrichedDeviceData } from './device-enrichment.service';
import { AttributionService, AttributionInput, ResolvedAttribution } from './attribution.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
export interface TrackViewInput {
pageUrl: string;
referrer?: string;
userId?: string;
sessionId: string;
metadata?: Record<string, unknown>;
clientDevice?: ClientDeviceData;
attribution?: AttributionInput;
}
export interface TrackEventInput {
eventType: string;
sessionId: string;
userId?: string;
pageUrl?: string;
metadata?: Record<string, unknown>;
timestamp?: number;
}
export interface TrackConversionInput {
conversionType: string;
sessionId: string;
userId?: string;
value?: number;
currency?: string;
metadata?: Record<string, unknown>;
}
export interface TrackingResult {
success: boolean;
eventId: string | null;
error?: string;
}
export interface BatchTrackingResult {
success: boolean;
count: number;
errors: string[];
}
@Injectable()
export class TrackingService {
private readonly logger = new Logger(TrackingService.name);
constructor(
private readonly deviceEnrichmentService: DeviceEnrichmentService,
private readonly attributionService: AttributionService,
private readonly identityService: IdentityService,
private readonly domainResolver: DomainResolverService,
@InjectRepository(RawEvent)
private readonly rawEventRepository: Repository<RawEvent>,
@InjectRepository(SessionFingerprint)
private readonly fingerprintRepository: Repository<SessionFingerprint>,
@InjectQueue('analytics-events')
private readonly eventsQueue: Queue,
) {}
/**
* Derive cross-domain dimensions for a request:
* - visitor_id_daily: sha256(daily_salt || ip || ua || lang)
* - corp_id, domain_id: from Origin/Referer/Host domains row
*
* Returns nulls for any dimension we can't resolve. Callers persist all
* three on the event row.
*/
private async resolveCrossDomainDimensions(request: Request): Promise<{
visitorIdDaily: Buffer | null;
corpId: number | null;
domainId: number | null;
}> {
const ua = (request.headers['user-agent'] as string | undefined) ?? undefined;
const lang = (request.headers['accept-language'] as string | undefined) ?? undefined;
const ip = this.extractIpAddress(request) ?? undefined;
const visitorIdDaily = await this.identityService.visitorIdDaily(ip, ua, lang);
const hostname = this.domainResolver.hostnameFromHeaders(request.headers);
const resolved = await this.domainResolver.resolve(hostname);
return {
visitorIdDaily,
corpId: resolved?.corpId ?? null,
domainId: resolved?.domainId ?? null,
};
}
/**
* Track page view with device fingerprinting
*/
async trackView(data: TrackViewInput, request: Request): Promise<TrackingResult> {
try {
// Extract User-Agent and IP from request headers
const userAgent = request.headers['user-agent'] as string | undefined;
const ipAddress = this.extractIpAddress(request);
// Enrich device data (IP is hashed, never stored raw)
const enriched = await this.deviceEnrichmentService.enrich(
userAgent,
ipAddress,
data.clientDevice,
);
// Resolve attribution
const attribution = this.attributionService.resolveTrafficSource({
utmSource: data.attribution?.utmSource,
utmMedium: data.attribution?.utmMedium,
utmCampaign: data.attribution?.utmCampaign,
utmContent: data.attribution?.utmContent,
utmTerm: data.attribution?.utmTerm,
referrer: data.referrer,
});
// Create or update session fingerprint (first-touch attribution)
await this.upsertSessionFingerprint(
data.sessionId,
data.userId,
enriched,
attribution,
data.pageUrl,
);
// Cross-domain dimensions
const { visitorIdDaily, corpId, domainId } =
await this.resolveCrossDomainDimensions(request);
// Create raw event
const event = this.rawEventRepository.create({
eventType: 'pageview',
sessionId: data.sessionId,
userId: data.userId ?? null,
pageUrl: data.pageUrl,
referrer: data.referrer ?? null,
deviceType: enriched.deviceType,
metadata: data.metadata ?? null,
timestamp: new Date(),
processed: false,
visitorIdDaily,
corpId,
domainId,
});
const saved = await this.rawEventRepository.save(event);
// Queue for async processing
await this.eventsQueue.add('process-event', {
eventId: saved.id,
eventType: 'pageview',
sessionId: data.sessionId,
});
return { success: true, eventId: saved.id };
} catch (error) {
this.logger.error('Failed to track view event', error);
return {
success: false,
eventId: null,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
/**
* Track generic event
*/
async trackEvent(data: TrackEventInput): Promise<TrackingResult> {
try {
const event = this.rawEventRepository.create({
eventType: data.eventType,
sessionId: data.sessionId,
userId: data.userId ?? null,
pageUrl: data.pageUrl ?? null,
metadata: data.metadata ?? null,
timestamp: data.timestamp ? new Date(data.timestamp) : new Date(),
processed: false,
});
const saved = await this.rawEventRepository.save(event);
// Queue for async processing
await this.eventsQueue.add('process-event', {
eventId: saved.id,
eventType: data.eventType,
sessionId: data.sessionId,
});
return { success: true, eventId: saved.id };
} catch (error) {
this.logger.error('Failed to track event', error);
return {
success: false,
eventId: null,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
/**
* Track batch of events
*/
async trackBatch(events: TrackEventInput[]): Promise<BatchTrackingResult> {
const results = await Promise.allSettled(
events.map((event) => this.trackEvent(event)),
);
const errors = results
.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
.map((r) => (r.reason as Error)?.message ?? 'Unknown error');
const successCount = results.filter((r) => r.status === 'fulfilled').length;
return {
success: errors.length === 0,
count: successCount,
errors,
};
}
/**
* Track conversion event
*/
async trackConversion(data: TrackConversionInput): Promise<TrackingResult> {
try {
const event = this.rawEventRepository.create({
eventType: 'conversion',
sessionId: data.sessionId,
userId: data.userId ?? null,
metadata: {
conversionType: data.conversionType,
value: data.value,
currency: data.currency,
...data.metadata,
},
timestamp: new Date(),
processed: false,
});
const saved = await this.rawEventRepository.save(event);
// Queue for async processing (high priority for conversions)
await this.eventsQueue.add(
'process-event',
{
eventId: saved.id,
eventType: 'conversion',
sessionId: data.sessionId,
conversionType: data.conversionType,
},
{ priority: 1 }, // Higher priority
);
return { success: true, eventId: saved.id };
} catch (error) {
this.logger.error('Failed to track conversion', error);
return {
success: false,
eventId: null,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
/**
* Create or update session fingerprint
* Attribution is first-touch: only set on creation
*/
private async upsertSessionFingerprint(
sessionId: string,
userId: string | undefined,
enriched: EnrichedDeviceData,
attribution: ResolvedAttribution,
landingPage: string,
): Promise<void> {
try {
const existing = await this.fingerprintRepository.findOne({
where: { sessionId },
});
if (existing) {
// Update viewport (may change on resize) and userId (if logged in later)
if (enriched.viewportWidth !== undefined) {
existing.viewportWidth = enriched.viewportWidth;
}
if (enriched.viewportHeight !== undefined) {
existing.viewportHeight = enriched.viewportHeight;
}
if (userId && !existing.userId) {
existing.userId = userId;
}
await this.fingerprintRepository.save(existing);
} else {
// Create new fingerprint with first-touch attribution
const fingerprint = this.fingerprintRepository.create({
sessionId,
userId: userId ?? null,
deviceType: enriched.deviceType,
isBot: enriched.isBot,
browser: enriched.browser,
browserVersion: enriched.browserVersion,
browserMajor: enriched.browserMajor,
os: enriched.os,
osVersion: enriched.osVersion,
deviceVendor: enriched.deviceVendor,
deviceModel: enriched.deviceModel,
screenWidth: enriched.screenWidth ?? null,
screenHeight: enriched.screenHeight ?? null,
viewportWidth: enriched.viewportWidth ?? null,
viewportHeight: enriched.viewportHeight ?? null,
pixelRatio: enriched.pixelRatio ?? null,
colorDepth: enriched.colorDepth ?? null,
language: enriched.language ?? null,
languages: enriched.languages ?? null,
timezone: enriched.timezone ?? null,
timezoneOffset: enriched.timezoneOffset ?? null,
country: enriched.country,
region: enriched.region,
city: enriched.city,
isEU: enriched.isEU,
geoTimezone: enriched.geoTimezone,
isVpn: enriched.isVpn,
isDatacenter: enriched.isDatacenter,
isTor: enriched.isTor,
ipHash: enriched.ipHash,
deviceMemory: enriched.deviceMemory ?? null,
hardwareConcurrency: enriched.hardwareConcurrency ?? null,
touchPoints: enriched.touchPoints ?? null,
cookiesEnabled: enriched.cookiesEnabled ?? null,
doNotTrack: enriched.doNotTrack ?? null,
// First-touch attribution
trafficSource: attribution.source,
utmSource: attribution.rawSource,
utmMedium: attribution.medium,
utmCampaign: attribution.campaign,
utmContent: attribution.content,
utmTerm: attribution.term,
referrer: attribution.referrer,
landingPage,
});
await this.fingerprintRepository.save(fingerprint);
}
} catch (error) {
// Log but don't fail - fingerprinting is enhancement, not critical
this.logger.warn('Failed to upsert session fingerprint', error);
}
}
/**
* Extract client IP from request headers
*/
private extractIpAddress(request: Request): string | undefined {
const forwardedFor = request.headers['x-forwarded-for'];
const realIp = request.headers['x-real-ip'];
if (forwardedFor) {
const firstIp = (forwardedFor as string).split(',')[0];
return firstIp?.trim();
}
if (realIp) {
return realIp as string;
}
return request.socket.remoteAddress;
}
}
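
One detail of `trackBatch` in the removed service is worth spelling out: `trackEvent` catches its own errors and resolves with `success: false`, so `Promise.allSettled` reports those calls as `fulfilled`, and filtering on `rejected` alone would likely miss them. The sketch below shows a summary that also inspects the resolved value. `summarizeBatch` is a hypothetical helper written for illustration; it is not part of this change set.

```typescript
interface TrackingResult {
  success: boolean;
  eventId: string | null;
  error?: string;
}

interface BatchTrackingResult {
  success: boolean;
  count: number;
  errors: string[];
}

// Hypothetical helper: fold settled trackEvent() results into a batch summary.
// A fulfilled promise only counts as a success when its value says so.
function summarizeBatch(
  settled: PromiseSettledResult<TrackingResult>[],
): BatchTrackingResult {
  const errors: string[] = [];
  let count = 0;
  for (const r of settled) {
    if (r.status === "rejected") {
      // Unexpected throw that escaped trackEvent's own try/catch.
      errors.push(r.reason instanceof Error ? r.reason.message : "Unknown error");
    } else if (r.value.success) {
      count += 1;
    } else {
      // Resolved, but trackEvent reported a handled failure.
      errors.push(r.value.error ?? "Unknown error");
    }
  }
  return { success: errors.length === 0, count, errors };
}
```

It would be called as `summarizeBatch(await Promise.allSettled(events.map((e) => this.trackEvent(e))))`.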


@ -18,6 +18,7 @@
},
"dependencies": {
"@lilith/analytics": "workspace:*",
"@lilith/gov-detection": "^1.0.3",
"@nestjs/bullmq": "^11.0.0",
"@nestjs/common": "^11.0.0",
"@nestjs/config": "^4.0.0",
@ -26,6 +27,7 @@
"@nestjs/terminus": "^11.0.0",
"@nestjs/typeorm": "^11.0.0",
"bullmq": "^5.0.0",
"geoip-lite": "^2.0.1",
"ioredis": "^5.9.1",
"pg": "^8.11.0",
"reflect-metadata": "^0.2.0",
@ -39,6 +41,7 @@
"@nestjs/testing": "^11.0.0",
"@swc/cli": "^0.7.10",
"@swc/core": "^1.15.8",
"@types/geoip-lite": "^1.4.4",
"@types/node": "^20.0.0",
"typescript": "^5.4.0",
"vitest": "^1.0.0"


@ -7,13 +7,20 @@ import {
} from 'typeorm';
/**
* Raw analytics event - mirrors the collector's raw_events table.
* Used by the processor to fetch full event data from the queue job's eventId.
* Raw analytics event: the canonical event log.
*
* The processor service is now the canonical writer: the ingest-writer inserts
* rows here from edge envelopes, and the aggregation worker reads them back by
* id. This must carry the full schema (including the cross-domain dimensions),
* not just the read-side subset.
*/
@Entity('raw_events')
@Index(['sessionId', 'timestamp'])
@Index(['eventType', 'timestamp'])
@Index(['processed', 'timestamp'])
@Index(['visitorIdDaily', 'timestamp'])
@Index(['corpId', 'timestamp'])
@Index(['domainId', 'timestamp'])
export class RawEvent {
@PrimaryGeneratedColumn('uuid')
id!: string;
@ -55,4 +62,21 @@ export class RawEvent {
@Column({ type: 'timestamptz', nullable: true })
processedAt?: Date | null;
/**
* Daily-rotating visitor identity for cross-domain stitching.
* sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same
* visitor across all our domains within a UTC day; unrecoverable after salt
* rotation (cookie-free, GDPR-clean).
*/
@Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' })
visitorIdDaily?: Buffer | null;
/** Corp that owns the originating domain. FK → corps(id). */
@Column({ type: 'smallint', nullable: true, name: 'corp_id' })
corpId?: number | null;
/** Domain the event originated from. FK → domains(id). */
@Column({ type: 'integer', nullable: true, name: 'domain_id' })
domainId?: number | null;
}
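
The `visitor_id_daily` column comment gives the derivation as sha256(daily_salt || ip || ua || lang). A minimal sketch of that hash with Node's built-in `crypto` module follows. The function name, the NUL separator between fields, and mapping missing fields to empty strings are assumptions made for illustration; the actual `IdentityService` is not shown in this diff and may differ.

```typescript
import { createHash } from "node:crypto";

/**
 * Hypothetical helper: derive a 32-byte daily visitor id.
 * Assumption: each field is preceded by a NUL byte so that ("ab", "c")
 * and ("a", "bc") hash differently. Missing fields hash as "".
 */
export function visitorIdDaily(
  dailySalt: Buffer,
  ip?: string,
  ua?: string,
  lang?: string,
): Buffer {
  const hash = createHash("sha256");
  hash.update(dailySalt);
  for (const part of [ip, ua, lang]) {
    hash.update("\0");
    hash.update(part ?? "");
  }
  return hash.digest(); // SHA-256 digest, always 32 bytes
}
```

Because the salt is an input, the same visitor maps to the same id only while the salt is unchanged; once the salt rotates and the old one is discarded, earlier ids can no longer be recomputed.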


@ -0,0 +1,36 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { VisitorSalt } from '../entities/visitor-salt.entity';
import { Domain } from '../entities/domain.entity';
import { Corp } from '../entities/corp.entity';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { AttributionService } from './attribution.service';
import { GovDetectionService } from './gov-detection.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
import { IngestService } from './ingest.service';
/**
* Canonical-writer module. Owns the DB-bound enrichment that used to run in the
* collector and the {@link IngestService} that turns an edge envelope into a
* `raw_events` row. Lives on the canonical-store host (black).
*/
@Module({
imports: [
TypeOrmModule.forFeature([RawEvent, SessionFingerprint, VisitorSalt, Domain, Corp]),
],
providers: [
DeviceEnrichmentService,
AttributionService,
GovDetectionService,
IdentityService,
DomainResolverService,
IngestService,
],
exports: [IngestService],
})
export class IngestModule {}


@ -0,0 +1,221 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import type { RawEventEnvelope } from '@lilith/analytics';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import {
DeviceEnrichmentService,
type ClientDeviceData,
type EnrichedDeviceData,
} from './device-enrichment.service';
import { AttributionService, type ResolvedAttribution } from './attribution.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
/**
* Canonical writer. Consumes an edge-captured {@link RawEventEnvelope}, runs the
* DB-bound enrichment that used to live in the collector (device, identity,
* domain, fingerprint), and inserts the canonical `raw_events` row.
*
* Idempotency: the row's primary key is the edge-minted `eventId`, inserted
* with `ON CONFLICT (id) DO NOTHING`. A re-drained or duplicated envelope is a
* no-op and does NOT re-trigger aggregation (which is additive and would
* otherwise inflate counters).
*
* Timestamps: `receivedAt` is taken from the envelope (the EDGE clock), never
* `now()`, so events spooled during a canonical-store outage keep their true
* arrival time when drained.
*/
@Injectable()
export class IngestService {
private readonly logger = new Logger(IngestService.name);
constructor(
private readonly deviceEnrichment: DeviceEnrichmentService,
private readonly attribution: AttributionService,
private readonly identity: IdentityService,
private readonly domainResolver: DomainResolverService,
@InjectRepository(RawEvent)
private readonly rawEventRepo: Repository<RawEvent>,
@InjectRepository(SessionFingerprint)
private readonly fingerprintRepo: Repository<SessionFingerprint>,
) {}
/**
* Persist one event. Returns the aggregation job payload when this call
* actually inserted the row, or `null` when the row already existed (so the
* caller skips re-enqueuing aggregation).
*/
async write(
envelope: RawEventEnvelope,
): Promise<{ eventId: string; eventType: string; sessionId: string } | null> {
const ev = envelope.payload;
const receivedAt = new Date(envelope.receivedAt);
const timestamp =
ev.timestamp != null && Number.isFinite(ev.timestamp)
? new Date(ev.timestamp)
: receivedAt;
let deviceType: string | null = null;
let visitorIdDaily: Buffer | null = null;
let corpId: number | null = null;
let domainId: number | null = null;
if (ev.isView) {
const enriched = await this.deviceEnrichment.enrich(
envelope.edge.ua,
envelope.edge.ip,
(ev.clientDevice ?? undefined) as ClientDeviceData | undefined,
);
deviceType = enriched.deviceType;
const attribution = this.attribution.resolveTrafficSource({
utmSource: ev.attribution?.utmSource,
utmMedium: ev.attribution?.utmMedium,
utmCampaign: ev.attribution?.utmCampaign,
utmContent: ev.attribution?.utmContent,
utmTerm: ev.attribution?.utmTerm,
referrer: ev.attribution?.referrer ?? ev.referrer ?? undefined,
});
await this.upsertSessionFingerprint(envelope, enriched, attribution);
visitorIdDaily = await this.identity.visitorIdDaily(
envelope.edge.ip,
envelope.edge.ua,
envelope.edge.lang,
receivedAt,
);
const hostname = this.domainResolver.hostnameFromHeaders(envelope.edge.headers);
const resolved = await this.domainResolver.resolve(hostname);
corpId = resolved?.corpId ?? null;
domainId = resolved?.domainId ?? null;
}
const result = await this.rawEventRepo
.createQueryBuilder()
.insert()
.into(RawEvent)
.values({
id: envelope.eventId,
eventType: ev.eventType,
sessionId: ev.sessionId,
userId: ev.userId ?? null,
pageUrl: ev.pageUrl ?? null,
referrer: ev.referrer ?? null,
deviceType,
metadata: ev.metadata ?? null,
timestamp,
receivedAt,
processed: false,
visitorIdDaily,
corpId,
domainId,
})
.orIgnore()
.returning('id')
.execute();
const inserted = Array.isArray(result.raw) && result.raw.length > 0;
if (!inserted) {
this.logger.debug(`raw_events ${envelope.eventId} already present — duplicate drain, skipping aggregation`);
return null;
}
return { eventId: envelope.eventId, eventType: ev.eventType, sessionId: ev.sessionId };
}
/**
* Create or update the session fingerprint. Attribution is first-touch: set
* only on creation, never overwritten. Failures are logged, not thrown:
* fingerprinting is an enhancement, not the canonical record.
*/
private async upsertSessionFingerprint(
envelope: RawEventEnvelope,
enriched: EnrichedDeviceData,
attribution: ResolvedAttribution,
): Promise<void> {
const ev = envelope.payload;
try {
const existing = await this.fingerprintRepo.findOne({
where: { sessionId: ev.sessionId },
});
if (existing) {
if (enriched.viewportWidth !== undefined) {
existing.viewportWidth = enriched.viewportWidth;
}
if (enriched.viewportHeight !== undefined) {
existing.viewportHeight = enriched.viewportHeight;
}
if (ev.userId && !existing.userId) {
existing.userId = ev.userId;
}
await this.fingerprintRepo.save(existing);
return;
}
const fingerprint = this.fingerprintRepo.create({
sessionId: ev.sessionId,
userId: ev.userId ?? null,
deviceType: enriched.deviceType,
isBot: enriched.isBot,
browser: enriched.browser,
browserVersion: enriched.browserVersion,
browserMajor: enriched.browserMajor,
os: enriched.os,
osVersion: enriched.osVersion,
deviceVendor: enriched.deviceVendor,
deviceModel: enriched.deviceModel,
screenWidth: enriched.screenWidth ?? null,
screenHeight: enriched.screenHeight ?? null,
viewportWidth: enriched.viewportWidth ?? null,
viewportHeight: enriched.viewportHeight ?? null,
pixelRatio: enriched.pixelRatio ?? null,
colorDepth: enriched.colorDepth ?? null,
language: enriched.language ?? null,
languages: enriched.languages ?? null,
timezone: enriched.timezone ?? null,
timezoneOffset: enriched.timezoneOffset ?? null,
country: enriched.country,
region: enriched.region,
city: enriched.city,
isEU: enriched.isEU,
geoTimezone: enriched.geoTimezone,
isVpn: enriched.isVpn,
isDatacenter: enriched.isDatacenter,
isTor: enriched.isTor,
isGovernment: enriched.isGovernment,
orgType: enriched.orgType,
responseTier: enriched.responseTier,
org: enriched.org,
asn: enriched.asn,
ipHash: enriched.ipHash,
deviceMemory: enriched.deviceMemory ?? null,
hardwareConcurrency: enriched.hardwareConcurrency ?? null,
touchPoints: enriched.touchPoints ?? null,
cookiesEnabled: enriched.cookiesEnabled ?? null,
doNotTrack: enriched.doNotTrack ?? null,
trafficSource: attribution.source,
utmSource: attribution.rawSource,
utmMedium: attribution.medium,
utmCampaign: attribution.campaign,
utmContent: attribution.content,
utmTerm: attribution.term,
referrer: attribution.referrer,
landingPage: ev.pageUrl ?? null,
});
await this.fingerprintRepo.save(fingerprint);
} catch (error) {
// A first-write race on sessionId (unique) just means another envelope for
// the same session won, which is harmless. Every failure, including that
// race, is logged as a warning and never rethrown.
this.logger.warn(
`Failed to upsert session fingerprint for ${ev.sessionId}: ${
error instanceof Error ? error.message : String(error)
}`,
);
}
}
}
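
The idempotency and timestamp rules described in the `IngestService` doc comment can be sketched without a database. The block below uses an in-memory `Map` as a stand-in for `raw_events` with `ON CONFLICT (id) DO NOTHING`; the `Envelope` shape is a trimmed, assumed version of `RawEventEnvelope`, and `InMemoryRawEvents` and `writeOnce` are hypothetical names.

```typescript
// Assumed, trimmed-down envelope shape for illustration only.
interface Envelope {
  eventId: string;
  receivedAt: number; // edge clock, epoch milliseconds
  payload: { eventType: string; sessionId: string; timestamp?: number | null };
}

interface StoredRow {
  id: string;
  eventType: string;
  sessionId: string;
  timestamp: Date;
  receivedAt: Date;
}

/** In-memory stand-in for the raw_events table, keyed by primary key. */
class InMemoryRawEvents {
  private readonly rows = new Map<string, StoredRow>();

  /** Mimics INSERT ... ON CONFLICT (id) DO NOTHING: true only if inserted. */
  insertIfAbsent(row: StoredRow): boolean {
    if (this.rows.has(row.id)) return false;
    this.rows.set(row.id, row);
    return true;
  }

  get(id: string): StoredRow | undefined {
    return this.rows.get(id);
  }
}

/** Returns the aggregation payload on first insert, null on a duplicate. */
function writeOnce(
  store: InMemoryRawEvents,
  envelope: Envelope,
): { eventId: string; eventType: string; sessionId: string } | null {
  const ev = envelope.payload;
  const receivedAt = new Date(envelope.receivedAt); // edge time, never "now"
  const timestamp =
    ev.timestamp != null && Number.isFinite(ev.timestamp)
      ? new Date(ev.timestamp)
      : receivedAt; // fall back to arrival time at the edge
  const inserted = store.insertIfAbsent({
    id: envelope.eventId,
    eventType: ev.eventType,
    sessionId: ev.sessionId,
    timestamp,
    receivedAt,
  });
  return inserted
    ? { eventId: envelope.eventId, eventType: ev.eventType, sessionId: ev.sessionId }
    : null;
}
```

Returning `null` for a duplicate is what lets the caller skip re-enqueuing aggregation when an envelope is drained twice.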


@ -1,19 +1,34 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import type { Job } from 'bullmq';
import type { Job, Queue } from 'bullmq';
import {
EVENTS_QUEUE,
INGEST_EVENT_JOB,
PROCESS_EVENT_JOB,
type RawEventEnvelope,
type ProcessEventJob,
} from '@lilith/analytics';
import { AggregationService } from './aggregation.service';
import { RawEvent } from '../entities/raw-event.entity';
import { IngestService } from '../ingest/ingest.service';
interface EventJob {
eventId: string;
eventType: string;
sessionId: string;
}
/**
* Single worker on the shared `analytics-events` queue. It handles two job
* types so they share one redis connection and consumer pool:
*
* ingest-event: enrich + write the canonical raw_events row (IngestService),
* then emit a process-event job for the row it inserted.
* process-event: aggregate an already-written row into aggregated_metrics.
*
* Splitting these into two @Processor classes on the same queue would NOT work:
* BullMQ workers pull any job regardless of name, so a single switch is the
* correct pattern.
*/
@Injectable()
@Processor('analytics-events', {
@Processor(EVENTS_QUEUE, {
concurrency: 10,
})
export class EventsProcessor extends WorkerHost {
@ -21,16 +36,49 @@ export class EventsProcessor extends WorkerHost {
constructor(
private readonly aggregationService: AggregationService,
private readonly ingestService: IngestService,
@InjectRepository(RawEvent)
private readonly rawEventRepository: Repository<RawEvent>,
@InjectQueue(EVENTS_QUEUE)
private readonly eventsQueue: Queue,
) {
super();
}
async process(job: Job<EventJob>): Promise<void> {
async process(job: Job): Promise<void> {
switch (job.name) {
case INGEST_EVENT_JOB:
return this.handleIngest(job as Job<RawEventEnvelope>);
case PROCESS_EVENT_JOB:
return this.handleAggregate(job as Job<ProcessEventJob>);
default:
this.logger.warn(`Unknown job name '${job.name}' on ${EVENTS_QUEUE} — skipping`);
return;
}
}
/** Write the canonical row, then hand off to aggregation (only if newly inserted). */
private async handleIngest(job: Job<RawEventEnvelope>): Promise<void> {
const written = await this.ingestService.write(job.data);
if (!written) return; // duplicate drain — row already present, already aggregated
await this.eventsQueue.add(
PROCESS_EVENT_JOB,
{
eventId: written.eventId,
eventType: written.eventType,
sessionId: written.sessionId,
} satisfies ProcessEventJob,
// Reuse the event id so a re-drain can't double-enqueue aggregation.
// BullMQ custom job ids must not contain ':', so '-' is the separator.
{ jobId: `agg-${written.eventId}` },
);
}
/** Aggregate one already-written raw event. */
private async handleAggregate(job: Job<ProcessEventJob>): Promise<void> {
const { eventId, eventType, sessionId } = job.data;
this.logger.debug(`Processing event: ${eventType} (id: ${eventId}) from session ${sessionId}`);
this.logger.debug(`Aggregating event: ${eventType} (id: ${eventId}) from session ${sessionId}`);
const event = await this.rawEventRepository.findOne({ where: { id: eventId } });
@ -63,14 +111,12 @@ export class EventsProcessor extends WorkerHost {
}
@OnWorkerEvent('completed')
onCompleted(job: Job<EventJob>) {
this.logger.debug(`Job ${job.id} completed for event ${job.data.eventType}`);
onCompleted(job: Job) {
this.logger.debug(`Job ${job.id} (${job.name}) completed`);
}
@OnWorkerEvent('failed')
onFailed(job: Job<EventJob>, error: Error) {
this.logger.error(
`Job ${job.id} failed for event ${job.data.eventType}: ${error.message}`,
);
onFailed(job: Job, error: Error) {
this.logger.error(`Job ${job.id} (${job.name}) failed: ${error.message}`);
}
}
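
The single-worker pattern described in the `EventsProcessor` doc comment can be illustrated with a small in-memory queue: one consumer, one `switch` on the job name, and a deterministic job id for the follow-up aggregation job. This is only a sketch. `TinyQueue` and `runWorker` are hypothetical, the job names are assumed to match the `ingest-event` and `process-event` labels in the comment, and the id set that never forgets is a simplification of BullMQ's job-id deduplication.

```typescript
interface TinyJob {
  name: string;
  data: { eventId: string };
  id?: string;
}

/** Hypothetical in-memory queue with job-id deduplication. */
class TinyQueue {
  private readonly pending: TinyJob[] = [];
  private readonly seenIds = new Set<string>();

  /** Returns false when a job with the same custom id was already added. */
  add(name: string, data: { eventId: string }, opts?: { jobId?: string }): boolean {
    if (opts?.jobId !== undefined) {
      if (this.seenIds.has(opts.jobId)) return false;
      this.seenIds.add(opts.jobId);
    }
    this.pending.push({ name, data, id: opts?.jobId });
    return true;
  }

  next(): TinyJob | undefined {
    return this.pending.shift();
  }
}

/** One consumer handles every job name on the queue via a single switch. */
function runWorker(queue: TinyQueue): string[] {
  const written = new Set<string>(); // stands in for rows in raw_events
  const log: string[] = [];
  for (let job = queue.next(); job !== undefined; job = queue.next()) {
    const { eventId } = job.data;
    switch (job.name) {
      case "ingest-event":
        if (written.has(eventId)) {
          log.push(`duplicate ${eventId}`); // no second aggregation job
          break;
        }
        written.add(eventId);
        log.push(`ingested ${eventId}`);
        queue.add("process-event", { eventId }, { jobId: `agg-${eventId}` });
        break;
      case "process-event":
        log.push(`aggregated ${eventId}`);
        break;
      default:
        log.push(`skipped ${job.name}`);
    }
  }
  return log;
}
```

With two handlers registered as separate consumers on the same queue, either one could pick up a job meant for the other, which is why the dispatch lives in one place.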


@ -1,19 +1,22 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
import { EVENTS_QUEUE } from '@lilith/analytics';
import { EventsProcessor } from './events.processor';
import { AggregationService } from './aggregation.service';
import { AggregatedMetric } from '../entities/aggregated-metric.entity';
import { RawEvent } from '../entities/raw-event.entity';
import { RedisModule } from '../redis/redis.module';
import { IngestModule } from '../ingest/ingest.module';
@Module({
imports: [
TypeOrmModule.forFeature([AggregatedMetric, RawEvent]),
BullModule.registerQueue({
name: 'analytics-events',
name: EVENTS_QUEUE,
}),
RedisModule,
IngestModule,
],
providers: [EventsProcessor, AggregationService],
exports: [AggregationService],


@ -45,5 +45,27 @@ export class SchemaGuardService implements OnModuleInit {
NULLS NOT DISTINCT
`);
this.logger.log('uq_aggregated_metrics_dedup ensured (NULLS NOT DISTINCT)');
// session_fingerprints enrichment-column guard.
//
// The gov-detection + ASN fields were added to the SessionFingerprint entity
// after the prod table was created. With `synchronize: false` and no migration
// runner, those columns never reached prod — so every fingerprint INSERT threw
// "column does not exist" and was swallowed by upsertSessionFingerprint's catch
// (ingest.service.ts), silently freezing the table. raw_events kept filling, so
// only the fingerprint-backed dashboard pages (Traffic/Audience/Network) went
// blank. Same failure class as the aggregated_metrics outage above.
//
// All columns are nullable in the entity, so adding them is purely additive and
// idempotent. `ALTER TABLE IF EXISTS` keeps this safe on a not-yet-created table.
await this.dataSource.query(`
ALTER TABLE IF EXISTS session_fingerprints
ADD COLUMN IF NOT EXISTS "isGovernment" boolean,
ADD COLUMN IF NOT EXISTS "orgType" varchar(30),
ADD COLUMN IF NOT EXISTS "responseTier" varchar(20),
ADD COLUMN IF NOT EXISTS "org" varchar(200),
ADD COLUMN IF NOT EXISTS "asn" integer
`);
this.logger.log('session_fingerprints enrichment columns ensured (gov-detection + ASN)');
}
}
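
The additive, idempotent column guard above follows a pattern that can be generated from a column list. The sketch below builds the same kind of statement; `buildEnsureColumnsSql` and `ColumnSpec` are hypothetical names, and the SQL types are assumed to be trusted constants from source code, never user input.

```typescript
interface ColumnSpec {
  name: string;
  sqlType: string; // trusted constant such as "boolean" or "varchar(30)"
}

/** Hypothetical helper: build an idempotent ALTER TABLE for nullable columns. */
function buildEnsureColumnsSql(table: string, columns: ColumnSpec[]): string {
  if (columns.length === 0) {
    throw new Error("at least one column is required");
  }
  // Reject anything that is not a plain identifier before interpolating it.
  const identifier = /^[A-Za-z_][A-Za-z0-9_]*$/;
  for (const id of [table, ...columns.map((c) => c.name)]) {
    if (!identifier.test(id)) throw new Error(`unsafe identifier: ${id}`);
  }
  const clauses = columns
    .map((c) => `  ADD COLUMN IF NOT EXISTS "${c.name}" ${c.sqlType}`)
    .join(",\n");
  return `ALTER TABLE IF EXISTS ${table}\n${clauses}`;
}
```

Running the resulting statement twice is harmless because both `IF EXISTS` and `IF NOT EXISTS` turn the already-applied case into a no-op.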