feat(analytics): canonical store on black, vps-0 edge with redis outage-spool

Relocate the canonical analytics store off the public VPS. The collector
becomes a DB-free edge that captures + durably enqueues events to black's
redis; a black-side ingest-writer enriches and writes raw_events. When black
is unreachable the collector spools to a local appendonly redis that only
grows during the outage and drains on recovery.

- shared: RawEventEnvelope/NormalizedEvent ingest contract (edge -> writer)
- collector: capture-and-enqueue + dual-redis RedisRouter (primary=black,
  spool=local) + paused-until-healthy drain worker; drop TypeORM/enrichment
- processor: IngestService canonical writer (edge receivedAt, ON CONFLICT
  DO NOTHING), single worker branches ingest-event -> process-event
- relocate device/identity/domain/attribution enrichment + entities to writer

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Natalie 2026-06-21 07:48:02 -05:00
parent 842c180980
commit b252753476
32 changed files with 1022 additions and 2194 deletions

View file

@ -12,3 +12,6 @@ export * from './common';
// GDPR Types
export * from './gdpr';
// Ingest envelope (edge collector → black-side ingest-writer contract)
export * from './ingest';

View file

@ -0,0 +1,123 @@
/**
* Ingest envelope the durable contract between the edge collector (producer)
* and the canonical-store ingest-writer (consumer).
*
* The collector does NO database work. It captures the request at the edge,
* wraps it in a {@link RawEventEnvelope}, and enqueues it to redis. All
* enrichment (visitor identity, domain resolution, device fingerprint) and the
* canonical `raw_events` write happen on the black-side ingest-writer when the
* envelope is consumed. This makes the redis enqueue not a synchronous DB
* write the durability boundary, so a canonical-store outage spools events
* locally instead of dropping them.
*/
/** BullMQ queue shared by the collector (producer) and the black-side workers. */
export const EVENTS_QUEUE = 'analytics-events';
/**
* Job name for a captured-at-edge event awaiting enrichment + canonical write.
* Consumed by the black-side ingest-writer.
*/
export const INGEST_EVENT_JOB = 'ingest-event';
/**
* Job name for an already-written `raw_events` row awaiting aggregation.
* Emitted by the ingest-writer, consumed by the existing aggregation processor.
*/
export const PROCESS_EVENT_JOB = 'process-event';
/** Which collector endpoint produced the envelope — selects the writer's branch. */
export type IngestKind =
| 'view'
| 'engagement'
| 'interaction'
| 'event'
| 'batch'
| 'conversion'
| 'funnel'
| 'registration-funnel';
/**
* First-touch attribution inputs (UTM + referrer), carried verbatim from the
* edge for views. Resolved into a traffic source on the black-side writer.
*/
export interface AttributionInputData {
readonly utmSource?: string;
readonly utmMedium?: string;
readonly utmCampaign?: string;
readonly utmContent?: string;
readonly utmTerm?: string;
readonly referrer?: string;
}
/**
* A single event, fully normalized at the edge into the generic shape the
* canonical writer persists. All per-endpoint mapping (engagement eventType,
* funnel eventType, batch expansion, etc.) happens on the collector because
* it is pure; only DB-bound enrichment (device, identity, fingerprint) is
* deferred to the writer.
*/
export interface NormalizedEvent {
/** Canonical event type, e.g. 'pageview' | 'conversion' | 'engagement_like'. */
readonly eventType: string;
readonly sessionId: string;
readonly userId?: string | null;
readonly pageUrl?: string | null;
readonly referrer?: string | null;
readonly metadata?: Record<string, unknown> | null;
/** Client-supplied event time (ms since epoch). Writer falls back to `receivedAt`. */
readonly timestamp?: number | null;
/**
* Whether this event runs the full view pipeline on the writer: device
* enrichment, session fingerprint upsert, and cross-domain visitor identity.
* Mirrors the legacy split where only page views were enriched.
*/
readonly isView: boolean;
/** Client device signals (views only). */
readonly clientDevice?: Record<string, unknown> | null;
/** Attribution inputs (views only). */
readonly attribution?: AttributionInputData | null;
}
/** Edge-captured request context the black-side writer needs for enrichment. */
export interface EdgeContext {
/** Client IP extracted at the edge (x-real-ip / x-forwarded-for / socket). */
readonly ip?: string;
/** User-Agent header. */
readonly ua?: string;
/** Accept-Language header. */
readonly lang?: string;
/**
* Headers the writer needs for domain resolution (origin / referer / host).
* Lowercased keys; only the subset required downstream is carried.
*/
readonly headers: Record<string, string | undefined>;
}
/**
* A single captured analytics event, durable in redis from the moment the
* collector enqueues it.
*
* `eventId` is minted at the edge and used as BOTH the BullMQ `jobId` AND the
* `raw_events` primary key, so re-drains and duplicates are idempotent
* (`INSERT ... ON CONFLICT (id) DO NOTHING`).
*
* `receivedAt` is the EDGE clock (ISO-8601). The writer MUST persist this value
* as the row's `receivedAt` never `now()` or an outage backdates every
* spooled event to drain time and smears time-bucketed aggregates.
*/
export interface RawEventEnvelope {
readonly eventId: string;
readonly kind: IngestKind;
readonly receivedAt: string;
readonly edge: EdgeContext;
/** The normalized event the writer persists. */
readonly payload: NormalizedEvent;
}
/** BullMQ job payload emitted by the ingest-writer for the aggregation stage. */
export interface ProcessEventJob {
readonly eventId: string;
readonly eventType: string;
readonly sessionId: string;
}

View file

@ -18,24 +18,18 @@
},
"dependencies": {
"@lilith/analytics": "workspace:*",
"@lilith/gov-detection": "^1.0.3",
"@nestjs/bullmq": "^11.0.0",
"@nestjs/common": "^11.0.0",
"@nestjs/config": "^4.0.0",
"@nestjs/core": "^11.0.0",
"@nestjs/platform-express": "^11.0.0",
"@nestjs/swagger": "^11.0.0",
"@nestjs/terminus": "^11.0.0",
"@nestjs/throttler": "^6.0.0",
"@nestjs/typeorm": "^11.0.0",
"bullmq": "^5.0.0",
"class-transformer": "^0.5.0",
"class-validator": "^0.14.0",
"geoip-lite": "^2.0.1",
"pg": "^8.11.0",
"ioredis": "^5.9.1",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.0",
"typeorm": "^0.3.0"
"rxjs": "^7.8.0"
},
"devDependencies": {
"@lilith/configs": "^2.2.1",
@ -45,7 +39,6 @@
"@swc/cli": "^0.7.10",
"@swc/core": "^1.15.8",
"@types/express": "^5.0.0",
"@types/geoip-lite": "^1.4.4",
"@types/node": "^20.0.0",
"typescript": "^5.4.0",
"unplugin-swc": "^1.5.1",

View file

@ -1,28 +1,25 @@
import { Module } from '@nestjs/common';
import { APP_GUARD } from '@nestjs/core';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ConfigModule } from '@nestjs/config';
import { ThrottlerModule } from '@nestjs/throttler';
import { BullModule } from '@nestjs/bullmq';
import { TypeOrmModule } from '@nestjs/typeorm';
import { TrackingModule } from './tracking/tracking.module';
import { HealthModule } from './health/health.module';
import { BeaconModule } from './beacon/beacon.module';
import { RawEvent } from './entities/raw-event.entity';
import { SessionFingerprint } from "./entities/session-fingerprint.entity";
import { Corp } from "./entities/corp.entity";
import { Domain } from "./entities/domain.entity";
import { VisitorSalt } from "./entities/visitor-salt.entity";
import { WriteKeyGuard } from './auth/write-key.guard';
/**
* Edge collector app. No TypeORM, no BullMQ root module the collector owns no
* database and manages its own dual-redis connections inside {@link RedisRouter}
* (primary = black redis, spool = local redis). All persistence and enrichment
* live on the black-side processor/ingest-writer.
*/
@Module({
imports: [
// Configuration
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
// Rate limiting
ThrottlerModule.forRoot([
{
ttl: 60000, // 1 minute
@ -30,42 +27,6 @@ import { WriteKeyGuard } from './auth/write-key.guard';
},
]),
// Database
TypeOrmModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
type: 'postgres',
host: config.get('DATABASE_HOST', 'localhost'),
port: config.get('DATABASE_PORT', 5432),
username: config.get('DATABASE_USER', 'analytics'),
password: config.get('DATABASE_PASSWORD', 'analytics'),
database: config.get('DATABASE_NAME', 'analytics'),
entities: [RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt],
synchronize: config.get('DB_SYNCHRONIZE') === 'true',
logging: config.get('NODE_ENV') === 'development',
}),
}),
// Queue for async processing
BullModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => {
const password = config.get<string>('REDIS_PASSWORD');
return {
connection: {
host: config.get('REDIS_HOST', 'localhost'),
port: config.get('REDIS_PORT', 6379),
...(password ? { password } : {}),
},
};
},
}),
BullModule.registerQueue({
name: 'analytics-events',
}),
// Feature modules
TrackingModule,
HealthModule,
BeaconModule,

View file

@ -1,6 +0,0 @@
export { RawEvent } from './raw-event.entity';
export { SessionFingerprint } from './session-fingerprint.entity';
export { Corp } from "./corp.entity";
export { Domain } from "./domain.entity";
export type { DomainRole } from "./domain.entity";
export { VisitorSalt } from "./visitor-salt.entity";

View file

@ -1,93 +0,0 @@
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
Index,
} from 'typeorm';
/**
* Raw analytics event - stores all incoming events before processing
* Generic event schema - product-specific fields go in metadata
*/
@Entity('raw_events')
@Index(['sessionId', 'timestamp'])
@Index(['eventType', 'timestamp'])
@Index(['processed', 'timestamp'])
@Index(['visitorIdDaily', 'timestamp'])
@Index(['corpId', 'timestamp'])
@Index(['domainId', 'timestamp'])
export class RawEvent {
@PrimaryGeneratedColumn('uuid')
id!: string;
/**
* Event type identifier
* Generic: 'pageview', 'click', 'scroll', 'conversion', 'custom'
* Product-specific types stored but processed by product-specific handlers
*/
@Column({ type: 'varchar', length: 100 })
@Index()
eventType!: string;
/** Session identifier (client-generated) */
@Column({ type: 'varchar', length: 64 })
@Index()
sessionId!: string;
/** Optional authenticated user ID */
@Column({ type: 'varchar', length: 64, nullable: true })
@Index()
userId?: string | null;
/** Page URL where event occurred */
@Column({ type: 'text', nullable: true })
pageUrl?: string | null;
/** Referrer URL */
@Column({ type: 'text', nullable: true })
referrer?: string | null;
/** Device type: 'desktop' | 'mobile' | 'tablet' | 'bot' */
@Column({ type: 'varchar', length: 20, nullable: true })
deviceType?: string | null;
/** Event-specific data (JSON) */
@Column({ type: 'jsonb', nullable: true })
metadata?: Record<string, unknown> | null;
/** Client-side timestamp (when event occurred) */
@Column({ type: 'timestamptz' })
@Index()
timestamp!: Date;
/** Server-side timestamp (when event received) */
@CreateDateColumn({ type: 'timestamptz' })
receivedAt!: Date;
/** Whether event has been processed by aggregation workers */
@Column({ type: 'boolean', default: false })
@Index()
processed!: boolean;
/** Processing timestamp */
@Column({ type: 'timestamptz', nullable: true })
processedAt?: Date | null;
/**
* Daily-rotating visitor identity for cross-domain stitching.
* sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same
* visitor across all our domains within a UTC day; unrecoverable after
* salt rotation (cookie-free, GDPR-clean).
*/
@Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' })
visitorIdDaily?: Buffer | null;
/** Corp that owns the originating domain. FK → corps(id). */
@Column({ type: 'smallint', nullable: true, name: 'corp_id' })
corpId?: number | null;
/** Domain the event originated from. FK → domains(id). */
@Column({ type: 'integer', nullable: true, name: 'domain_id' })
domainId?: number | null;
}

View file

@ -1,28 +1,28 @@
import { Controller, Get, SetMetadata } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import {
HealthCheck,
HealthCheckService,
TypeOrmHealthIndicator,
} from '@nestjs/terminus';
import { IS_PUBLIC_KEY } from '../auth/write-key.guard';
import { RedisRouter } from '../tracking/redis-router.service';
/**
* Edge health. The collector has no database readiness means "can we accept
* an event", and we always can: a black-redis outage spools locally. So health
* is liveness plus observability into the spool (depth + whether black is
* currently reachable), which the outage drill reads.
*/
@ApiTags('Health')
@SetMetadata(IS_PUBLIC_KEY, true)
@Controller('health')
export class HealthController {
constructor(
private readonly health: HealthCheckService,
private readonly db: TypeOrmHealthIndicator,
) {}
constructor(private readonly router: RedisRouter) {}
@Get()
@HealthCheck()
@ApiOperation({ summary: 'Health check endpoint' })
check() {
return this.health.check([
() => this.db.pingCheck('database'),
]);
async check() {
return {
status: 'ok',
blackHealthy: this.router.isBlackHealthy(),
spoolDepth: await this.router.spoolDepth(),
};
}
@Get('live')
@ -32,11 +32,10 @@ export class HealthController {
}
@Get('ready')
@HealthCheck()
@ApiOperation({ summary: 'Readiness probe' })
ready() {
return this.health.check([
() => this.db.pingCheck('database'),
]);
// Always ready: the local spool guarantees we can durably accept events
// even when the canonical store is unreachable.
return { status: 'ok' };
}
}

View file

@ -1,9 +1,14 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthController } from './health.controller';
import { TrackingModule } from '../tracking/tracking.module';
/**
* Health module. Depends on TrackingModule for the {@link RedisRouter} so the
* health endpoint can report spool depth + black reachability. No DB indicator
* the edge has no database.
*/
@Module({
imports: [TerminusModule],
imports: [TrackingModule],
controllers: [HealthController],
})
export class HealthModule {}

View file

@ -1,487 +0,0 @@
import { Test } from '@nestjs/testing';
import { describe, it, expect, beforeEach } from 'vitest';
import { AttributionService } from './attribution.service';
describe('AttributionService', () => {
let service: AttributionService;
beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [AttributionService],
}).compile();
service = module.get<AttributionService>(AttributionService);
});
describe('resolveTrafficSource', () => {
it('returns direct traffic when no attribution data', () => {
const result = service.resolveTrafficSource({});
expect(result.source).toBe('direct');
expect(result.rawSource).toBeNull();
expect(result.medium).toBeNull();
expect(result.campaign).toBeNull();
expect(result.referrer).toBeNull();
});
it('preserves all UTM parameters in result', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
utmCampaign: 'winter-sale',
utmContent: 'ad-variant-a',
utmTerm: 'luxury+watches',
});
expect(result.rawSource).toBe('google');
expect(result.medium).toBe('cpc');
expect(result.campaign).toBe('winter-sale');
expect(result.content).toBe('ad-variant-a');
expect(result.term).toBe('luxury+watches');
});
});
describe('UTM parameter resolution', () => {
it('classifies paid search from utm_medium=cpc', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
});
expect(result.source).toBe('paid');
});
it('classifies paid search from utm_medium=ppc', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'ppc',
});
expect(result.source).toBe('paid');
});
it('classifies paid search from utm_medium=paid', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'paid',
});
expect(result.source).toBe('paid');
});
it('classifies paid search from utm_medium=paidsearch', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'paidsearch',
});
expect(result.source).toBe('paid');
});
it('classifies email from utm_medium=email', () => {
const result = service.resolveTrafficSource({
utmSource: 'mailchimp',
utmMedium: 'email',
});
expect(result.source).toBe('email');
});
it('classifies email from utm_medium=newsletter', () => {
const result = service.resolveTrafficSource({
utmSource: 'campaign-monitor',
utmMedium: 'newsletter',
});
expect(result.source).toBe('email');
});
it('classifies social from utm_medium=social', () => {
const result = service.resolveTrafficSource({
utmSource: 'facebook',
utmMedium: 'social',
});
expect(result.source).toBe('social');
});
it('classifies social from utm_medium=social-media', () => {
const result = service.resolveTrafficSource({
utmSource: 'twitter',
utmMedium: 'social-media',
});
expect(result.source).toBe('social');
});
it('classifies affiliate from utm_medium=affiliate', () => {
const result = service.resolveTrafficSource({
utmSource: 'partner123',
utmMedium: 'affiliate',
});
expect(result.source).toBe('affiliate');
});
it('classifies affiliate from utm_medium=partner', () => {
const result = service.resolveTrafficSource({
utmSource: 'partner456',
utmMedium: 'partner',
});
expect(result.source).toBe('affiliate');
});
it('classifies organic from utm_medium=organic', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'organic',
});
expect(result.source).toBe('organic');
});
it('classifies referral from utm_medium=referral', () => {
const result = service.resolveTrafficSource({
utmSource: 'blog.example.com',
utmMedium: 'referral',
});
expect(result.source).toBe('referral');
});
});
describe('UTM source-based classification', () => {
it('classifies social from social platform utm_source', () => {
const result = service.resolveTrafficSource({
utmSource: 'facebook',
});
expect(result.source).toBe('social');
});
it('classifies social from twitter', () => {
const result = service.resolveTrafficSource({
utmSource: 'twitter',
});
expect(result.source).toBe('social');
});
it('classifies social from instagram', () => {
const result = service.resolveTrafficSource({
utmSource: 'instagram',
});
expect(result.source).toBe('social');
});
it('classifies social from linkedin', () => {
const result = service.resolveTrafficSource({
utmSource: 'linkedin',
});
expect(result.source).toBe('social');
});
it('classifies organic from search engine with no medium', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
});
expect(result.source).toBe('organic');
});
it('classifies paid from search engine with cpc medium', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
});
expect(result.source).toBe('paid');
});
it('classifies referral from unknown utm_source', () => {
const result = service.resolveTrafficSource({
utmSource: 'blog.example.com',
});
expect(result.source).toBe('referral');
});
});
describe('referrer analysis', () => {
it('classifies social from facebook referrer', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.facebook.com/some-post',
});
expect(result.source).toBe('social');
expect(result.referrer).toBe('https://www.facebook.com/some-post');
});
it('classifies social from twitter referrer', () => {
const result = service.resolveTrafficSource({
referrer: 'https://t.co/abc123',
});
expect(result.source).toBe('social');
});
it('classifies social from x.com', () => {
const result = service.resolveTrafficSource({
referrer: 'https://x.com/user/status/123',
});
expect(result.source).toBe('social');
});
it('classifies social from instagram', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.instagram.com/p/abc123/',
});
expect(result.source).toBe('social');
});
it('classifies social from linkedin', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.linkedin.com/feed/',
});
expect(result.source).toBe('social');
});
it('classifies social from reddit', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.reddit.com/r/programming/',
});
expect(result.source).toBe('social');
});
it('classifies social from youtube', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.youtube.com/watch?v=abc123',
});
expect(result.source).toBe('social');
});
it('classifies social from tiktok', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.tiktok.com/@user/video/123',
});
expect(result.source).toBe('social');
});
it('classifies organic from google search', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.google.com/search?q=test',
});
expect(result.source).toBe('organic');
});
it('classifies organic from bing', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.bing.com/search?q=test',
});
expect(result.source).toBe('organic');
});
it('classifies organic from duckduckgo', () => {
const result = service.resolveTrafficSource({
referrer: 'https://duckduckgo.com/?q=test',
});
expect(result.source).toBe('organic');
});
it('classifies organic from yahoo', () => {
const result = service.resolveTrafficSource({
referrer: 'https://search.yahoo.com/search?p=test',
});
expect(result.source).toBe('organic');
});
it('classifies referral from unknown domain', () => {
const result = service.resolveTrafficSource({
referrer: 'https://blog.example.com/article',
});
expect(result.source).toBe('referral');
});
it('handles referrer without www prefix', () => {
const result = service.resolveTrafficSource({
referrer: 'https://facebook.com/page',
});
expect(result.source).toBe('social');
});
it('handles invalid referrer URL gracefully', () => {
const result = service.resolveTrafficSource({
referrer: 'not-a-valid-url',
});
expect(result.source).toBe('direct');
});
});
describe('priority resolution', () => {
it('prioritizes UTM parameters over referrer', () => {
const result = service.resolveTrafficSource({
utmSource: 'newsletter',
utmMedium: 'email',
referrer: 'https://www.facebook.com/',
});
expect(result.source).toBe('email');
});
it('uses referrer when UTM parameters are missing', () => {
const result = service.resolveTrafficSource({
referrer: 'https://www.google.com/search',
});
expect(result.source).toBe('organic');
});
it('returns direct when both UTM and referrer are missing', () => {
const result = service.resolveTrafficSource({});
expect(result.source).toBe('direct');
});
});
describe('edge cases', () => {
it('handles empty string UTM parameters', () => {
const result = service.resolveTrafficSource({
utmSource: '',
utmMedium: '',
referrer: '',
});
expect(result.source).toBe('direct');
});
it('handles case-insensitive UTM medium', () => {
const result1 = service.resolveTrafficSource({ utmMedium: 'CPC' });
const result2 = service.resolveTrafficSource({ utmMedium: 'Cpc' });
const result3 = service.resolveTrafficSource({ utmMedium: 'cpc' });
expect(result1.source).toBe('paid');
expect(result2.source).toBe('paid');
expect(result3.source).toBe('paid');
});
it('handles case-insensitive UTM source', () => {
const result1 = service.resolveTrafficSource({ utmSource: 'FACEBOOK' });
const result2 = service.resolveTrafficSource({ utmSource: 'Facebook' });
const result3 = service.resolveTrafficSource({ utmSource: 'facebook' });
expect(result1.source).toBe('social');
expect(result2.source).toBe('social');
expect(result3.source).toBe('social');
});
it('handles subdomains in referrer', () => {
const result = service.resolveTrafficSource({
referrer: 'https://subdomain.facebook.com/page',
});
expect(result.source).toBe('social');
});
it('handles country-specific Google domains', () => {
const resultUK = service.resolveTrafficSource({
referrer: 'https://www.google.co.uk/search',
});
const resultDE = service.resolveTrafficSource({
referrer: 'https://www.google.de/search',
});
expect(resultUK.source).toBe('organic');
expect(resultDE.source).toBe('organic');
});
it('handles Yandex with subdomain', () => {
const result = service.resolveTrafficSource({
referrer: 'https://yandex.ru/search',
});
expect(result.source).toBe('organic');
});
});
describe('complex scenarios', () => {
it('handles paid social campaign', () => {
const result = service.resolveTrafficSource({
utmSource: 'facebook',
utmMedium: 'cpc',
utmCampaign: 'summer-promo',
utmContent: 'ad-creative-1',
});
expect(result.source).toBe('paid');
expect(result.rawSource).toBe('facebook');
expect(result.medium).toBe('cpc');
expect(result.campaign).toBe('summer-promo');
expect(result.content).toBe('ad-creative-1');
});
it('handles organic social without UTM', () => {
const result = service.resolveTrafficSource({
referrer: 'https://twitter.com/user/status/123',
});
expect(result.source).toBe('social');
expect(result.rawSource).toBeNull();
expect(result.medium).toBeNull();
});
it('handles email campaign with full UTM', () => {
const result = service.resolveTrafficSource({
utmSource: 'mailchimp',
utmMedium: 'email',
utmCampaign: 'weekly-digest',
utmContent: 'header-cta',
});
expect(result.source).toBe('email');
expect(result.rawSource).toBe('mailchimp');
expect(result.medium).toBe('email');
expect(result.campaign).toBe('weekly-digest');
expect(result.content).toBe('header-cta');
});
it('handles affiliate partnership', () => {
const result = service.resolveTrafficSource({
utmSource: 'partner-site',
utmMedium: 'affiliate',
utmCampaign: 'q1-partnership',
});
expect(result.source).toBe('affiliate');
expect(result.rawSource).toBe('partner-site');
});
it('handles branded search campaign', () => {
const result = service.resolveTrafficSource({
utmSource: 'google',
utmMedium: 'cpc',
utmCampaign: 'brand-protection',
utmTerm: 'company+name',
});
expect(result.source).toBe('paid');
expect(result.term).toBe('company+name');
});
});
});

View file

@ -0,0 +1,252 @@
import { Injectable } from '@nestjs/common';
import { randomUUID } from 'crypto';
import type { Request } from 'express';
import type {
RawEventEnvelope,
NormalizedEvent,
EdgeContext,
IngestKind,
} from '@lilith/analytics';
import { RedisRouter } from './redis-router.service';
import { TrackViewDto } from '../dto/track-view.dto';
import { TrackEngagementDto } from '../dto/track-engagement.dto';
import { TrackInteractionBatchDto } from '../dto/track-interaction.dto';
import {
TrackEventDto,
TrackBatchDto,
TrackConversionDto,
TrackFunnelStepDto,
TrackRegistrationFunnelDto,
} from '../dto/track-event.dto';
export interface CaptureResult {
success: boolean;
eventId: string | null;
}
export interface BatchCaptureResult {
success: boolean;
count: number;
}
/**
* Edge capture. Validates already happened at the controller (class-validator);
* here we do the pure per-endpoint normalization that used to live in the
* collector's TrackingService/controller, mint an edge event id + receive time,
* and durably enqueue via {@link RedisRouter}. No database, no enrichment all
* of that is deferred to the black-side ingest-writer.
*/
@Injectable()
export class CaptureService {
constructor(private readonly router: RedisRouter) {}
async trackView(dto: TrackViewDto, request: Request): Promise<CaptureResult> {
const pageUrl = dto.pageUrl ?? dto.contentId ?? '';
return this.one('view', request, {
eventType: 'pageview',
sessionId: dto.sessionId,
userId: dto.userId ?? null,
pageUrl,
referrer: dto.referrer ?? null,
isView: true,
clientDevice: (dto.clientDevice as Record<string, unknown> | undefined) ?? null,
attribution: dto.attribution
? {
utmSource: dto.attribution.utmSource,
utmMedium: dto.attribution.utmMedium,
utmCampaign: dto.attribution.utmCampaign,
utmContent: dto.attribution.utmContent,
utmTerm: dto.attribution.utmTerm,
referrer: dto.attribution.referrer ?? dto.referrer,
}
: { referrer: dto.referrer },
metadata: {
...dto.metadata,
...(dto.contentType ? { contentType: dto.contentType } : {}),
...(dto.app ? { app: dto.app } : {}),
...(dto.duration !== undefined ? { duration: dto.duration } : {}),
},
});
}
async trackEngagement(dto: TrackEngagementDto, request: Request): Promise<CaptureResult> {
return this.one('engagement', request, {
eventType: `engagement_${dto.metricType}`,
sessionId: dto.userId, // engagement events are user-scoped
userId: dto.userId,
isView: false,
metadata: {
metricType: dto.metricType,
targetId: dto.targetId,
targetType: dto.targetType,
...dto.metadata,
},
});
}
async trackInteraction(dto: TrackInteractionBatchDto, request: Request): Promise<BatchCaptureResult> {
return this.many(
'interaction',
request,
dto.events.map((event) => ({
eventType: event.type,
sessionId: event.sessionId,
userId: event.userId ?? null,
metadata: event.data,
timestamp: event.timestamp ?? null,
isView: false,
})),
);
}
async trackEvent(dto: TrackEventDto, request: Request): Promise<CaptureResult> {
return this.one('event', request, {
eventType: dto.eventType,
sessionId: dto.sessionId,
userId: dto.userId ?? null,
pageUrl: dto.pageUrl ?? null,
metadata: dto.metadata ?? null,
timestamp: dto.timestamp ?? null,
isView: false,
});
}
async trackBatch(dto: TrackBatchDto, request: Request): Promise<BatchCaptureResult> {
return this.many(
'batch',
request,
dto.events.map((e) => ({
eventType: e.eventType,
sessionId: e.sessionId,
userId: e.userId ?? null,
pageUrl: e.pageUrl ?? null,
metadata: e.metadata ?? null,
timestamp: e.timestamp ?? null,
isView: false,
})),
);
}
async trackConversion(dto: TrackConversionDto, request: Request): Promise<CaptureResult> {
return this.one('conversion', request, {
eventType: 'conversion',
sessionId: dto.sessionId,
userId: dto.userId ?? null,
isView: false,
metadata: {
conversionType: dto.conversionType,
value: dto.value,
currency: dto.currency,
...dto.metadata,
},
});
}
async trackFunnelStep(dto: TrackFunnelStepDto, request: Request): Promise<CaptureResult> {
return this.one('funnel', request, {
eventType: 'funnel_step',
sessionId: dto.sessionId,
userId: dto.userId ?? null,
isView: false,
metadata: {
funnelId: dto.funnelId,
stepId: dto.stepId,
stepOrder: dto.stepOrder,
...dto.metadata,
},
});
}
async trackRegistrationFunnel(
dto: TrackRegistrationFunnelDto,
request: Request,
): Promise<CaptureResult> {
const parsed = Date.parse(dto.timestamp);
return this.one('registration-funnel', request, {
eventType: `registration_funnel_${dto.event}`,
sessionId: dto.sessionId,
userId: dto.userId ?? null,
pageUrl: dto.route ?? null,
timestamp: Number.isNaN(parsed) ? null : parsed,
isView: false,
metadata: {
audience: dto.audience,
userType: dto.userType,
registrationPath: dto.registrationPath,
registrationType: dto.registrationType,
entryReferrer: dto.entryReferrer,
referrer: dto.referrer,
},
});
}
// ── internals ──────────────────────────────────────────────────────────────
private async one(
kind: IngestKind,
request: Request,
event: NormalizedEvent,
): Promise<CaptureResult> {
const eventId = randomUUID();
await this.router.enqueue(this.envelope(eventId, kind, request, event));
return { success: true, eventId };
}
private async many(
kind: IngestKind,
request: Request,
events: NormalizedEvent[],
): Promise<BatchCaptureResult> {
const edge = this.buildEdge(request);
const receivedAt = new Date().toISOString();
const results = await Promise.allSettled(
events.map((event) =>
this.router.enqueue({
eventId: randomUUID(),
kind,
receivedAt,
edge,
payload: event,
}),
),
);
const count = results.filter((r) => r.status === 'fulfilled').length;
return { success: count === events.length, count };
}
private envelope(
eventId: string,
kind: IngestKind,
request: Request,
event: NormalizedEvent,
): RawEventEnvelope {
return {
eventId,
kind,
receivedAt: new Date().toISOString(),
edge: this.buildEdge(request),
payload: event,
};
}
private buildEdge(request: Request): EdgeContext {
const header = (name: string): string | undefined => {
const v = request.headers[name];
return Array.isArray(v) ? v[0] : v;
};
const forwardedFor = header('x-forwarded-for');
const ip =
forwardedFor?.split(',')[0]?.trim() ?? header('x-real-ip') ?? request.socket.remoteAddress;
return {
ip: ip ?? undefined,
ua: header('user-agent'),
lang: header('accept-language'),
headers: {
origin: header('origin'),
referer: header('referer'),
host: header('host'),
},
};
}
}

View file

@ -1,431 +0,0 @@
import { Test } from '@nestjs/testing';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { GovDetectionService } from './gov-detection.service';
const mockGovDetection = {
enrich: vi.fn().mockResolvedValue({
isGovernment: false,
orgType: 'NORMAL',
responseTier: 'ALLOW',
proxyType: 'NONE',
org: null,
asn: null,
}),
onModuleInit: vi.fn(),
};
describe('DeviceEnrichmentService', () => {
let service: DeviceEnrichmentService;
beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
DeviceEnrichmentService,
{ provide: GovDetectionService, useValue: mockGovDetection },
],
}).compile();
service = module.get<DeviceEnrichmentService>(DeviceEnrichmentService);
});
describe('enrich', () => {
it('enriches device data from User-Agent and client data', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0',
'192.168.1.1',
{
screenWidth: 1920,
screenHeight: 1080,
viewportWidth: 1440,
viewportHeight: 900,
language: 'en-US',
timezone: 'America/New_York',
},
);
expect(result.deviceType).toBe('desktop');
expect(result.isBot).toBe(false);
expect(result.browser).toBe('Chrome');
expect(result.browserVersion).toBe('120.0');
expect(result.browserMajor).toBe(120);
expect(result.os).toBe('macOS');
expect(result.osVersion).toBe('10.15.7');
expect(result.screenWidth).toBe(1920);
expect(result.screenHeight).toBe(1080);
expect(result.viewportWidth).toBe(1440);
expect(result.viewportHeight).toBe(900);
expect(result.language).toBe('en-US');
expect(result.timezone).toBe('America/New_York');
expect(result.ipHash).toBeTruthy();
});
it('handles missing User-Agent gracefully', async () => {
const result = await service.enrich(undefined, undefined, {
screenWidth: 1024,
screenHeight: 768,
});
expect(result.deviceType).toBe('desktop');
expect(result.isBot).toBe(false);
expect(result.browser).toBeNull();
expect(result.browserVersion).toBeNull();
expect(result.os).toBeNull();
expect(result.osVersion).toBeNull();
expect(result.ipHash).toBeNull();
});
it('hashes IP address for privacy', async () => {
const result1 = await service.enrich(undefined, '192.168.1.1');
const result2 = await service.enrich(undefined, '192.168.1.2');
const result3 = await service.enrich(undefined, '192.168.1.1');
expect(result1.ipHash).toBeTruthy();
expect(result2.ipHash).toBeTruthy();
expect(result1.ipHash).not.toBe(result2.ipHash);
expect(result1.ipHash).toBe(result3.ipHash); // Same IP on same day = same hash
});
});
describe('parseUserAgent', () => {
it('parses Chrome User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
);
expect(result.browser).toBe('Chrome');
expect(result.browserVersion).toBe('120.0');
expect(result.browserMajor).toBe(120);
expect(result.os).toBe('Windows');
expect(result.osVersion).toBe('10/11');
});
it('parses Firefox User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
);
expect(result.browser).toBe('Firefox');
expect(result.browserVersion).toBe('122.0');
expect(result.browserMajor).toBe(122);
expect(result.os).toBe('Windows');
});
it('parses Safari User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
);
expect(result.browser).toBe('Safari');
expect(result.browserVersion).toBe('17.2');
expect(result.browserMajor).toBe(17);
expect(result.os).toBe('macOS');
expect(result.osVersion).toBe('10.15.7');
});
it('parses Edge User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Edge/120.0.0.0',
);
expect(result.browser).toBe('Edge');
expect(result.browserVersion).toBe('120.0');
expect(result.browserMajor).toBe(120);
});
it('parses mobile Safari User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1',
);
expect(result.browser).toBe('Safari');
expect(result.os).toBe('iOS');
expect(result.osVersion).toBe('17.2');
expect(result.deviceType).toBe('mobile');
});
it('parses Android Chrome User-Agent correctly', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Linux; Android 14; Pixel 7) AppleWebKit/537.36 Chrome/120.0.0.0 Mobile Safari/537.36',
);
expect(result.browser).toBe('Chrome');
expect(result.os).toBe('Android');
expect(result.osVersion).toBe('14');
expect(result.deviceType).toBe('mobile');
});
it('handles unknown User-Agent', async () => {
const result = await service.enrich(
'UnknownBot/1.0',
);
expect(result.browser).toBeNull();
expect(result.browserVersion).toBeNull();
expect(result.os).toBeNull();
expect(result.osVersion).toBeNull();
});
});
describe('detectDeviceType', () => {
it('detects desktop from no touch points', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0',
undefined,
{
touchPoints: 0,
viewportWidth: 1920,
},
);
expect(result.deviceType).toBe('desktop');
});
it('detects mobile from touch points and small viewport', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPhone) Safari/604.1',
undefined,
{
touchPoints: 5,
viewportWidth: 375,
},
);
expect(result.deviceType).toBe('mobile');
});
it('detects tablet from touch points and large viewport', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPad) Safari/604.1',
undefined,
{
touchPoints: 5,
viewportWidth: 1024,
},
);
expect(result.deviceType).toBe('tablet');
});
it('detects tablet from iPad User-Agent', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPad; CPU OS 17_2 like Mac OS X) AppleWebKit/605.1.15',
);
expect(result.deviceType).toBe('tablet');
});
it('detects mobile from iPhone User-Agent', async () => {
const result = await service.enrich(
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) Mobile/15E148',
);
expect(result.deviceType).toBe('mobile');
});
it('detects mobile from Android Mobile User-Agent', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Linux; Android 14) Mobile Safari/537.36',
);
expect(result.deviceType).toBe('mobile');
});
it('detects tablet from Android without Mobile keyword', async () => {
const result = await service.enrich(
'Mozilla/5.0 (Linux; Android 14; Tablet) AppleWebKit/537.36',
);
expect(result.deviceType).toBe('tablet');
});
});
describe('detectBot', () => {
it('detects Googlebot', async () => {
const result = await service.enrich(
'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)',
);
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Bingbot', async () => {
const result = await service.enrich(
'Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)',
);
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects generic crawlers', async () => {
const result = await service.enrich('SomeCrawler/1.0');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects headless browsers', async () => {
const result = await service.enrich('HeadlessChrome/120.0.0.0');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Puppeteer', async () => {
const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 HeadlessChrome/120.0.0.0 Safari/537.36 Puppeteer');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Playwright', async () => {
const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36 Playwright');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects Selenium', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0 Selenium');
expect(result.isBot).toBe(true);
expect(result.deviceType).toBe('bot');
});
it('detects social media bots', async () => {
const facebookBot = await service.enrich('facebookexternalhit/1.1');
const twitterBot = await service.enrich('Twitterbot/1.0');
const linkedinBot = await service.enrich('LinkedInBot/1.0');
expect(facebookBot.isBot).toBe(true);
expect(twitterBot.isBot).toBe(true);
expect(linkedinBot.isBot).toBe(true);
});
it('does not detect normal browsers as bots', async () => {
const chrome = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0');
const firefox = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Firefox/122.0');
const safari = await service.enrich('Mozilla/5.0 (Macintosh) Safari/605.1.15');
expect(chrome.isBot).toBe(false);
expect(firefox.isBot).toBe(false);
expect(safari.isBot).toBe(false);
});
});
describe('mapWindowsVersion', () => {
it('maps Windows NT 10.0 to Windows 10/11', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 10.0)');
expect(result.osVersion).toBe('10/11');
});
it('maps Windows NT 6.3 to Windows 8.1', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 6.3)');
expect(result.osVersion).toBe('8.1');
});
it('maps Windows NT 6.2 to Windows 8', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 6.2)');
expect(result.osVersion).toBe('8');
});
it('maps Windows NT 6.1 to Windows 7', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 6.1)');
expect(result.osVersion).toBe('7');
});
it('handles unknown Windows NT version', async () => {
const result = await service.enrich('Mozilla/5.0 (Windows NT 5.0)');
expect(result.osVersion).toBe('5.0');
});
});
describe('OS detection', () => {
it('detects macOS with version', async () => {
const result = await service.enrich('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)');
expect(result.os).toBe('macOS');
expect(result.osVersion).toBe('10.15.7');
});
it('detects Linux', async () => {
const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64)');
expect(result.os).toBe('Linux');
});
it('detects iOS with version', async () => {
const result = await service.enrich('Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X)');
expect(result.os).toBe('iOS');
expect(result.osVersion).toBe('17.2');
});
it('detects Android with version', async () => {
const result = await service.enrich('Mozilla/5.0 (Linux; Android 14)');
expect(result.os).toBe('Android');
expect(result.osVersion).toBe('14');
});
it('handles Android version with decimals', async () => {
const result = await service.enrich('Mozilla/5.0 (Linux; Android 13.5)');
expect(result.os).toBe('Android');
expect(result.osVersion).toBe('13.5');
});
});
describe('client device data preservation', () => {
it('preserves all client device data', async () => {
const clientData = {
screenWidth: 1920,
screenHeight: 1080,
viewportWidth: 1440,
viewportHeight: 900,
pixelRatio: 2,
colorDepth: 24,
language: 'en-US',
languages: ['en-US', 'en', 'es'],
timezone: 'America/New_York',
timezoneOffset: -300,
deviceMemory: 8,
hardwareConcurrency: 8,
touchPoints: 0,
cookiesEnabled: true,
doNotTrack: false,
};
const result = await service.enrich(
'Mozilla/5.0 (Macintosh) Chrome/120.0.0.0',
undefined,
clientData,
);
expect(result.screenWidth).toBe(1920);
expect(result.screenHeight).toBe(1080);
expect(result.viewportWidth).toBe(1440);
expect(result.viewportHeight).toBe(900);
expect(result.pixelRatio).toBe(2);
expect(result.colorDepth).toBe(24);
expect(result.language).toBe('en-US');
expect(result.languages).toEqual(['en-US', 'en', 'es']);
expect(result.timezone).toBe('America/New_York');
expect(result.timezoneOffset).toBe(-300);
expect(result.deviceMemory).toBe(8);
expect(result.hardwareConcurrency).toBe(8);
expect(result.touchPoints).toBe(0);
expect(result.cookiesEnabled).toBe(true);
expect(result.doNotTrack).toBe(false);
});
it('handles missing client device data', async () => {
const result = await service.enrich('Mozilla/5.0 (Macintosh) Chrome/120.0.0.0');
expect(result.screenWidth).toBeUndefined();
expect(result.screenHeight).toBeUndefined();
expect(result.viewportWidth).toBeUndefined();
expect(result.viewportHeight).toBeUndefined();
expect(result.pixelRatio).toBeUndefined();
expect(result.colorDepth).toBeUndefined();
});
});
});

View file

@ -1,16 +1,5 @@
export { TrackingModule } from './tracking.module';
export { TrackingService } from './tracking.service';
export { TrackingController } from './tracking.controller';
export { DeviceEnrichmentService } from './device-enrichment.service';
export { AttributionService } from './attribution.service';
export type {
ClientDeviceData,
EnrichedDeviceData,
} from './device-enrichment.service';
export type {
AttributionInput,
ResolvedAttribution,
} from './attribution.service';
export { IdentityService } from "./identity.service";
export { DomainResolverService } from "./domain-resolver.service";
export type { ResolvedDomain } from "./domain-resolver.service";
export { CaptureService } from './capture.service';
export type { CaptureResult, BatchCaptureResult } from './capture.service';
export { RedisRouter } from './redis-router.service';

View file

@ -0,0 +1,195 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Queue, Worker, type ConnectionOptions } from 'bullmq';
import { Redis } from 'ioredis';
import { EVENTS_QUEUE, INGEST_EVENT_JOB, type RawEventEnvelope } from '@lilith/analytics';
/**
* Dual-redis router the durability boundary for the edge collector.
*
* Normal path: enqueue the envelope to the **primary** queue on black's redis,
* where the ingest-writer consumes it. When black is unreachable, enqueue to a
* **local spool** queue on the VPS's own redis (appendonly, durable). This is
* the "small local recording that only grows when black is unreachable."
*
* A health probe pings black's redis; on recovery it resumes a drain worker
* that forwards every spooled envelope to black. The envelope's `eventId` is
* the BullMQ `jobId` on both queues, so a forward that overlaps a direct enqueue
* is deduplicated and the black-side `ON CONFLICT (id) DO NOTHING` makes the
* whole pipeline idempotent.
*/
@Injectable()
export class RedisRouter implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(RedisRouter.name);
private primaryQueue!: Queue;
private spoolQueue!: Queue;
private drainer!: Worker;
private probe!: Redis;
private healthTimer?: ReturnType<typeof setInterval>;
/** Pessimistic until the first successful probe — never assume black is up. */
private blackHealthy = false;
private readonly probeIntervalMs: number;
constructor(private readonly config: ConfigService) {
this.probeIntervalMs = Number(this.config.get('BLACK_REDIS_PROBE_MS', 5000));
}
onModuleInit(): void {
const primaryConn = this.blackConnection();
const spoolConn = this.localConnection();
this.primaryQueue = new Queue(EVENTS_QUEUE, {
connection: { ...primaryConn, enableOfflineQueue: false },
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: 5000,
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
});
this.spoolQueue = new Queue(EVENTS_QUEUE, {
connection: spoolConn,
defaultJobOptions: {
removeOnComplete: true,
// Keep failed forwards around so a long outage never loses events.
removeOnFail: false,
attempts: Number.MAX_SAFE_INTEGER,
backoff: { type: 'exponential', delay: 2000 },
},
});
// Drains the local spool to black. Created paused; the probe resumes it
// only when black is reachable, so it never spins against a dead primary.
this.drainer = new Worker(
EVENTS_QUEUE,
async (job) => {
const envelope = job.data as RawEventEnvelope;
await this.primaryQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId });
},
{ connection: spoolConn, concurrency: 5 },
);
this.drainer.on('error', (err) => this.logger.debug(`drainer error: ${err.message}`));
void this.drainer.pause();
this.probe = new Redis({
...this.redisOptions(primaryConn),
lazyConnect: false,
enableOfflineQueue: false,
maxRetriesPerRequest: 1,
connectTimeout: 3000,
});
this.probe.on('error', () => {
/* state is driven by the probe loop; swallow connection noise here */
});
this.startHealthLoop();
this.logger.log(
`RedisRouter up — primary=${this.describe(primaryConn)} spool=${this.describe(spoolConn)}`,
);
}
async onModuleDestroy(): Promise<void> {
if (this.healthTimer) clearInterval(this.healthTimer);
await Promise.allSettled([
this.drainer?.close(),
this.primaryQueue?.close(),
this.spoolQueue?.close(),
this.probe?.quit(),
]);
}
/**
* Durably enqueue one envelope. Resolves once redis (primary or spool) has
* accepted it; rejects only if BOTH are unreachable, so the caller can return
* a 5xx and the client/relay can retry rather than silently dropping.
*/
async enqueue(envelope: RawEventEnvelope): Promise<'primary' | 'spool'> {
if (this.blackHealthy) {
try {
await this.primaryQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId });
return 'primary';
} catch (err) {
this.markUnhealthy(err);
// fall through to the local spool
}
}
await this.spoolQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId });
return 'spool';
}
/** Current depth of the local outage spool (waiting + delayed). */
async spoolDepth(): Promise<number> {
const counts = await this.spoolQueue.getJobCounts('waiting', 'delayed', 'active', 'failed');
return (counts.waiting ?? 0) + (counts.delayed ?? 0) + (counts.active ?? 0) + (counts.failed ?? 0);
}
isBlackHealthy(): boolean {
return this.blackHealthy;
}
private startHealthLoop(): void {
const tick = async (): Promise<void> => {
try {
const pong = await this.probe.ping();
if (pong === 'PONG') await this.markHealthy();
} catch (err) {
this.markUnhealthy(err);
}
};
void tick();
this.healthTimer = setInterval(() => void tick(), this.probeIntervalMs);
}
private async markHealthy(): Promise<void> {
if (this.blackHealthy) return;
this.blackHealthy = true;
this.logger.log('Black redis reachable — resuming spool drain');
try {
this.drainer.resume();
} catch (err) {
this.logger.warn(`Failed to resume drainer: ${this.msg(err)}`);
}
}
private markUnhealthy(err: unknown): void {
if (!this.blackHealthy) return;
this.blackHealthy = false;
this.logger.warn(`Black redis unreachable — spooling locally: ${this.msg(err)}`);
void this.drainer.pause();
}
private blackConnection(): ConnectionOptions {
const password = this.config.get<string>('BLACK_REDIS_PASSWORD');
return {
host: this.config.get<string>('BLACK_REDIS_HOST', 'black'),
port: Number(this.config.get('BLACK_REDIS_PORT', 26381)),
...(password ? { password } : {}),
};
}
private localConnection(): ConnectionOptions {
const password = this.config.get<string>('REDIS_PASSWORD');
return {
host: this.config.get<string>('REDIS_HOST', 'localhost'),
port: Number(this.config.get('REDIS_PORT', 6379)),
...(password ? { password } : {}),
};
}
/** Narrow a BullMQ ConnectionOptions object to ioredis options for the probe. */
private redisOptions(conn: ConnectionOptions): Record<string, unknown> {
return conn as Record<string, unknown>;
}
private describe(conn: ConnectionOptions): string {
const c = conn as { host?: string; port?: number };
return `${c.host}:${c.port}`;
}
private msg(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}
}

View file

@ -12,11 +12,17 @@ import {
TrackFunnelStepDto,
TrackRegistrationFunnelDto,
} from '../dto/track-event.dto';
import { TrackingService, TrackingResult, BatchTrackingResult } from './tracking.service';
import { CaptureService, CaptureResult, BatchCaptureResult } from './capture.service';
/**
* Event Collection Controller
* Analytics event ingestion endpoints
* Analytics event ingestion endpoints.
*
* The controller validates (class-validator) and forwards to the edge
* {@link CaptureService}, which normalizes and durably enqueues each event.
* No database or enrichment happens here those run on the black-side
* ingest-writer. A 2xx means the event was durably accepted into redis
* (black's queue, or the local spool during a black outage), not dropped.
*
* Endpoints:
* - POST /track/view - Page view with device fingerprinting
@ -24,43 +30,25 @@ import { TrackingService, TrackingResult, BatchTrackingResult } from './tracking
* - POST /track/interaction - Batched low-level interactions (click, scroll, funnel_step, resize)
* - POST /track/event - Single generic event
* - POST /track/batch - Batch of generic events
* - POST /track/conversion - Conversion event (higher priority)
* - POST /track/conversion - Conversion event
* - POST /track/funnel - Funnel step event
* - POST /track/registration-funnel - Registration funnel event (Lilith-specific)
*/
@ApiTags('Event Collection')
@Controller('track')
export class TrackingController {
constructor(private readonly trackingService: TrackingService) {}
constructor(private readonly capture: CaptureService) {}
@Post('view')
@ApiOperation({
summary: 'Track page view',
description: 'Records a page view with device fingerprinting and first-touch attribution. Accepts both collector shape (pageUrl) and analytics-client shape (contentId + contentType).',
description:
'Records a page view with device fingerprinting and first-touch attribution. Accepts both collector shape (pageUrl) and analytics-client shape (contentId + contentType).',
})
@ApiResponse({ status: 201, description: 'View tracked successfully' })
@ApiResponse({ status: 201, description: 'View accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackView(@Body() dto: TrackViewDto, @Req() request: Request): Promise<TrackingResult> {
// Normalise: analytics-client sends contentId as the page URL
const pageUrl = dto.pageUrl ?? dto.contentId ?? '';
return this.trackingService.trackView(
{
pageUrl,
referrer: dto.referrer,
userId: dto.userId,
sessionId: dto.sessionId,
metadata: {
...dto.metadata,
...(dto.contentType ? { contentType: dto.contentType } : {}),
...(dto.app ? { app: dto.app } : {}),
...(dto.duration !== undefined ? { duration: dto.duration } : {}),
},
clientDevice: dto.clientDevice,
attribution: dto.attribution,
},
request,
);
async trackView(@Body() dto: TrackViewDto, @Req() request: Request): Promise<CaptureResult> {
return this.capture.trackView(dto, request);
}
@Post('engagement')
@ -68,20 +56,13 @@ export class TrackingController {
summary: 'Track engagement event',
description: 'Records a meaningful user interaction (like, comment, subscribe, purchase)',
})
@ApiResponse({ status: 201, description: 'Engagement tracked successfully' })
@ApiResponse({ status: 201, description: 'Engagement accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackEngagement(@Body() dto: TrackEngagementDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: `engagement_${dto.metricType}`,
sessionId: dto.userId, // engagement events are user-scoped
userId: dto.userId,
metadata: {
metricType: dto.metricType,
targetId: dto.targetId,
targetType: dto.targetType,
...dto.metadata,
},
});
async trackEngagement(
@Body() dto: TrackEngagementDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackEngagement(dto, request);
}
@Post('interaction')
@ -89,18 +70,13 @@ export class TrackingController {
summary: 'Track interaction events (batched)',
description: 'Records a batch of low-level interactions (clicks, scrolls, funnel steps, resizes)',
})
@ApiResponse({ status: 201, description: 'Interactions tracked successfully' })
@ApiResponse({ status: 201, description: 'Interactions accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackInteraction(@Body() dto: TrackInteractionBatchDto): Promise<BatchTrackingResult> {
return this.trackingService.trackBatch(
dto.events.map((event) => ({
eventType: event.type,
sessionId: event.sessionId,
userId: event.userId,
metadata: event.data,
timestamp: event.timestamp,
})),
);
async trackInteraction(
@Body() dto: TrackInteractionBatchDto,
@Req() request: Request,
): Promise<BatchCaptureResult> {
return this.capture.trackInteraction(dto, request);
}
@Post('event')
@ -108,17 +84,10 @@ export class TrackingController {
summary: 'Track single event',
description: 'Records a generic analytics event',
})
@ApiResponse({ status: 201, description: 'Event tracked successfully' })
@ApiResponse({ status: 201, description: 'Event accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackEvent(@Body() dto: TrackEventDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: dto.eventType,
sessionId: dto.sessionId,
userId: dto.userId,
pageUrl: dto.pageUrl,
metadata: dto.metadata,
timestamp: dto.timestamp,
});
async trackEvent(@Body() dto: TrackEventDto, @Req() request: Request): Promise<CaptureResult> {
return this.capture.trackEvent(dto, request);
}
@Post('batch')
@ -126,19 +95,10 @@ export class TrackingController {
summary: 'Track batch of events',
description: 'Records multiple events in a single request (for client-side batching)',
})
@ApiResponse({ status: 201, description: 'Events tracked successfully' })
@ApiResponse({ status: 201, description: 'Events accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackBatch(@Body() dto: TrackBatchDto): Promise<BatchTrackingResult> {
return this.trackingService.trackBatch(
dto.events.map((e) => ({
eventType: e.eventType,
sessionId: e.sessionId,
userId: e.userId,
pageUrl: e.pageUrl,
metadata: e.metadata,
timestamp: e.timestamp,
})),
);
async trackBatch(@Body() dto: TrackBatchDto, @Req() request: Request): Promise<BatchCaptureResult> {
return this.capture.trackBatch(dto, request);
}
@Post('conversion')
@ -146,17 +106,13 @@ export class TrackingController {
summary: 'Track conversion',
description: 'Records a conversion event with optional revenue value',
})
@ApiResponse({ status: 201, description: 'Conversion tracked successfully' })
@ApiResponse({ status: 201, description: 'Conversion accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackConversion(@Body() dto: TrackConversionDto): Promise<TrackingResult> {
return this.trackingService.trackConversion({
conversionType: dto.conversionType,
sessionId: dto.sessionId,
userId: dto.userId,
value: dto.value,
currency: dto.currency,
metadata: dto.metadata,
});
async trackConversion(
@Body() dto: TrackConversionDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackConversion(dto, request);
}
@Post('funnel')
@ -164,20 +120,13 @@ export class TrackingController {
summary: 'Track funnel step',
description: 'Records a step in a conversion funnel',
})
@ApiResponse({ status: 201, description: 'Funnel step tracked successfully' })
@ApiResponse({ status: 201, description: 'Funnel step accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackFunnelStep(@Body() dto: TrackFunnelStepDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: 'funnel_step',
sessionId: dto.sessionId,
userId: dto.userId,
metadata: {
funnelId: dto.funnelId,
stepId: dto.stepId,
stepOrder: dto.stepOrder,
...dto.metadata,
},
});
async trackFunnelStep(
@Body() dto: TrackFunnelStepDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackFunnelStep(dto, request);
}
@Post('registration-funnel')
@ -185,23 +134,12 @@ export class TrackingController {
summary: 'Track registration funnel event',
description: 'Records a registration funnel event (visit, signup, profile, conversion)',
})
@ApiResponse({ status: 201, description: 'Registration funnel event tracked' })
@ApiResponse({ status: 201, description: 'Registration funnel event accepted' })
@ApiResponse({ status: 400, description: 'Invalid request data' })
async trackRegistrationFunnel(@Body() dto: TrackRegistrationFunnelDto): Promise<TrackingResult> {
return this.trackingService.trackEvent({
eventType: `registration_funnel_${dto.event}`,
sessionId: dto.sessionId,
userId: dto.userId,
pageUrl: dto.route,
timestamp: Date.parse(dto.timestamp),
metadata: {
audience: dto.audience,
userType: dto.userType,
registrationPath: dto.registrationPath,
registrationType: dto.registrationType,
entryReferrer: dto.entryReferrer,
referrer: dto.referrer,
},
});
async trackRegistrationFunnel(
@Body() dto: TrackRegistrationFunnelDto,
@Req() request: Request,
): Promise<CaptureResult> {
return this.capture.trackRegistrationFunnel(dto, request);
}
}

View file

@ -1,35 +1,16 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { Corp } from '../entities/corp.entity';
import { Domain } from '../entities/domain.entity';
import { VisitorSalt } from '../entities/visitor-salt.entity';
import { TrackingService } from './tracking.service';
import { CaptureService } from './capture.service';
import { RedisRouter } from './redis-router.service';
import { TrackingController } from './tracking.controller';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { AttributionService } from './attribution.service';
import { GovDetectionService } from './gov-detection.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
/**
* Edge tracking module. No database, no enrichment the collector captures and
* durably enqueues via {@link RedisRouter} (primary = black redis, spool = local
* redis). All DB work happens on the black-side ingest-writer.
*/
@Module({
imports: [
TypeOrmModule.forFeature([RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt]),
BullModule.registerQueue({
name: 'analytics-events',
}),
],
controllers: [TrackingController],
providers: [
TrackingService,
DeviceEnrichmentService,
AttributionService,
GovDetectionService,
IdentityService,
DomainResolverService,
],
exports: [TrackingService, IdentityService, DomainResolverService],
providers: [RedisRouter, CaptureService],
exports: [RedisRouter],
})
export class TrackingModule {}

View file

@ -1,551 +0,0 @@
import { Test } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { getQueueToken } from '@nestjs/bullmq';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import type { Request } from 'express';
import { TrackingService } from './tracking.service';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { AttributionService } from './attribution.service';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { createMockRepository, createMockQueue, type MockRepository, type MockQueue } from '@test/mocks';
describe('TrackingService', () => {
let service: TrackingService;
let deviceEnrichmentService: DeviceEnrichmentService;
let attributionService: AttributionService;
let rawEventRepository: MockRepository<RawEvent>;
let fingerprintRepository: MockRepository<SessionFingerprint>;
let eventsQueue: MockQueue;
beforeEach(async () => {
rawEventRepository = createMockRepository<RawEvent>();
fingerprintRepository = createMockRepository<SessionFingerprint>();
eventsQueue = createMockQueue();
const module = await Test.createTestingModule({
providers: [
TrackingService,
DeviceEnrichmentService,
AttributionService,
{ provide: getRepositoryToken(RawEvent), useValue: rawEventRepository },
{ provide: getRepositoryToken(SessionFingerprint), useValue: fingerprintRepository },
{ provide: getQueueToken('analytics-events'), useValue: eventsQueue },
],
}).compile();
service = module.get<TrackingService>(TrackingService);
deviceEnrichmentService = module.get<DeviceEnrichmentService>(DeviceEnrichmentService);
attributionService = module.get<AttributionService>(AttributionService);
});
describe('trackView', () => {
it('creates pageview event with enriched device data', async () => {
const mockRequest = {
headers: {
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0',
'x-forwarded-for': '192.168.1.1',
},
socket: {},
} as unknown as Request;
const mockEvent = {
id: 'event-123',
eventType: 'pageview',
sessionId: 'session-abc',
pageUrl: 'https://example.com/page',
processed: false,
};
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
const result = await service.trackView(
{
pageUrl: 'https://example.com/page',
sessionId: 'session-abc',
referrer: 'https://google.com',
userId: 'user-123',
clientDevice: {
screenWidth: 1920,
screenHeight: 1080,
viewportWidth: 1440,
viewportHeight: 900,
},
attribution: {
utmSource: 'google',
utmMedium: 'cpc',
utmCampaign: 'winter-sale',
},
},
mockRequest,
);
expect(result.success).toBe(true);
expect(result.eventId).toBe('event-123');
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'pageview',
sessionId: 'session-abc',
userId: 'user-123',
pageUrl: 'https://example.com/page',
referrer: 'https://google.com',
processed: false,
}),
);
expect(rawEventRepository.save).toHaveBeenCalledWith(mockEvent);
expect(eventsQueue.add).toHaveBeenCalledWith('process-event', {
eventId: 'event-123',
eventType: 'pageview',
sessionId: 'session-abc',
});
});
it('creates session fingerprint on first pageview', async () => {
const mockRequest = {
headers: {
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) Mobile/15E148 Safari/604.1',
},
socket: { remoteAddress: '10.0.0.1' },
} as unknown as Request;
const mockEvent = { id: 'event-456', eventType: 'pageview', sessionId: 'session-new' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
await service.trackView(
{
pageUrl: 'https://example.com/landing',
sessionId: 'session-new',
attribution: {
utmSource: 'facebook',
utmMedium: 'social',
},
},
mockRequest,
);
expect(fingerprintRepository.findOne).toHaveBeenCalledWith({
where: { sessionId: 'session-new' },
});
expect(fingerprintRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: 'session-new',
deviceType: 'mobile',
browser: 'Safari',
os: 'iOS',
trafficSource: 'social',
utmSource: 'facebook',
utmMedium: 'social',
landingPage: 'https://example.com/landing',
}),
);
expect(fingerprintRepository.save).toHaveBeenCalled();
});
it('updates existing session fingerprint with viewport and userId', async () => {
const mockRequest = {
headers: { 'user-agent': 'Chrome' },
socket: {},
} as unknown as Request;
const existingFingerprint = {
id: 'fp-123',
sessionId: 'session-existing',
userId: null,
viewportWidth: 1024,
viewportHeight: 768,
trafficSource: 'direct',
};
const mockEvent = { id: 'event-789' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(existingFingerprint as SessionFingerprint);
await service.trackView(
{
pageUrl: 'https://example.com/dashboard',
sessionId: 'session-existing',
userId: 'user-456',
clientDevice: {
viewportWidth: 1440,
viewportHeight: 900,
},
},
mockRequest,
);
expect(fingerprintRepository.save).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: 'session-existing',
userId: 'user-456',
viewportWidth: 1440,
viewportHeight: 900,
}),
);
});
it('handles errors gracefully and returns failure result', async () => {
const mockRequest = {
headers: {},
socket: {},
} as unknown as Request;
rawEventRepository.create.mockImplementation(() => {
throw new Error('Database connection failed');
});
const result = await service.trackView(
{
pageUrl: 'https://example.com/error',
sessionId: 'session-error',
},
mockRequest,
);
expect(result.success).toBe(false);
expect(result.eventId).toBeNull();
expect(result.error).toBe('Database connection failed');
});
it('extracts IP from x-forwarded-for header', async () => {
const mockRequest = {
headers: {
'x-forwarded-for': '203.0.113.1, 192.168.1.1, 10.0.0.1',
},
socket: { remoteAddress: '127.0.0.1' },
} as unknown as Request;
const enrichSpy = vi.spyOn(deviceEnrichmentService, 'enrich');
const mockEvent = { id: 'event-ip' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
await service.trackView(
{
pageUrl: 'https://example.com',
sessionId: 'session-ip',
},
mockRequest,
);
expect(enrichSpy).toHaveBeenCalledWith(
undefined,
'203.0.113.1',
undefined,
);
});
it('extracts IP from x-real-ip header', async () => {
const mockRequest = {
headers: {
'x-real-ip': '198.51.100.5',
},
socket: {},
} as unknown as Request;
const enrichSpy = vi.spyOn(deviceEnrichmentService, 'enrich');
const mockEvent = { id: 'event-real-ip' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
fingerprintRepository.findOne.mockResolvedValue(null);
await service.trackView(
{
pageUrl: 'https://example.com',
sessionId: 'session-real-ip',
},
mockRequest,
);
expect(enrichSpy).toHaveBeenCalledWith(
undefined,
'198.51.100.5',
undefined,
);
});
});
describe('trackEvent', () => {
it('creates generic event with required fields', async () => {
const mockEvent = {
id: 'event-click-123',
eventType: 'click',
sessionId: 'session-xyz',
userId: 'user-789',
pageUrl: 'https://example.com/page',
};
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackEvent({
eventType: 'click',
sessionId: 'session-xyz',
userId: 'user-789',
pageUrl: 'https://example.com/page',
metadata: {
elementId: 'cta-button',
elementText: 'Sign Up',
},
});
expect(result.success).toBe(true);
expect(result.eventId).toBe('event-click-123');
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'click',
sessionId: 'session-xyz',
userId: 'user-789',
pageUrl: 'https://example.com/page',
metadata: {
elementId: 'cta-button',
elementText: 'Sign Up',
},
processed: false,
}),
);
expect(eventsQueue.add).toHaveBeenCalledWith('process-event', {
eventId: 'event-click-123',
eventType: 'click',
sessionId: 'session-xyz',
});
});
it('handles optional fields correctly', async () => {
const mockEvent = { id: 'event-minimal' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackEvent({
eventType: 'custom_event',
sessionId: 'session-minimal',
});
expect(result.success).toBe(true);
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'custom_event',
sessionId: 'session-minimal',
userId: null,
pageUrl: null,
metadata: null,
processed: false,
}),
);
});
it('uses custom timestamp if provided', async () => {
const customTimestamp = Date.now() - 60000; // 1 minute ago
const mockEvent = { id: 'event-timestamp' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
await service.trackEvent({
eventType: 'scroll',
sessionId: 'session-ts',
timestamp: customTimestamp,
});
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
timestamp: new Date(customTimestamp),
}),
);
});
it('handles errors and returns failure result', async () => {
rawEventRepository.save.mockRejectedValue(new Error('Save failed'));
const result = await service.trackEvent({
eventType: 'error_event',
sessionId: 'session-err',
});
expect(result.success).toBe(false);
expect(result.eventId).toBeNull();
expect(result.error).toBe('Save failed');
});
});
describe('trackBatch', () => {
it('processes multiple events successfully', async () => {
const mockEvents = [
{ id: 'event-1', eventType: 'click', sessionId: 'session-1' },
{ id: 'event-2', eventType: 'scroll', sessionId: 'session-1' },
{ id: 'event-3', eventType: 'submit', sessionId: 'session-1' },
];
rawEventRepository.create
.mockReturnValueOnce(mockEvents[0] as RawEvent)
.mockReturnValueOnce(mockEvents[1] as RawEvent)
.mockReturnValueOnce(mockEvents[2] as RawEvent);
rawEventRepository.save
.mockResolvedValueOnce(mockEvents[0] as RawEvent)
.mockResolvedValueOnce(mockEvents[1] as RawEvent)
.mockResolvedValueOnce(mockEvents[2] as RawEvent);
const result = await service.trackBatch([
{ eventType: 'click', sessionId: 'session-1', metadata: { target: 'button-1' } },
{ eventType: 'scroll', sessionId: 'session-1', metadata: { depth: 50 } },
{ eventType: 'submit', sessionId: 'session-1', metadata: { formId: 'contact' } },
]);
expect(result.success).toBe(true);
expect(result.count).toBe(3);
expect(result.errors).toHaveLength(0);
expect(eventsQueue.add).toHaveBeenCalledTimes(3);
});
it('handles partial failures in batch', async () => {
const mockEvent1 = { id: 'event-success' };
const mockEvent3 = { id: 'event-success-2' };
rawEventRepository.create
.mockReturnValueOnce(mockEvent1 as RawEvent)
.mockImplementationOnce(() => {
throw new Error('Invalid data');
})
.mockReturnValueOnce(mockEvent3 as RawEvent);
rawEventRepository.save
.mockResolvedValueOnce(mockEvent1 as RawEvent)
.mockResolvedValueOnce(mockEvent3 as RawEvent);
const result = await service.trackBatch([
{ eventType: 'event1', sessionId: 'session-1' },
{ eventType: 'event2', sessionId: 'session-2' },
{ eventType: 'event3', sessionId: 'session-3' },
]);
expect(result.success).toBe(false);
expect(result.count).toBe(2);
expect(result.errors).toHaveLength(1);
expect(result.errors[0]).toBe('Invalid data');
});
it('handles empty batch', async () => {
const result = await service.trackBatch([]);
expect(result.success).toBe(true);
expect(result.count).toBe(0);
expect(result.errors).toHaveLength(0);
});
});
describe('trackConversion', () => {
it('creates conversion event with value and currency', async () => {
const mockEvent = {
id: 'conversion-123',
eventType: 'conversion',
sessionId: 'session-conv',
userId: 'user-123',
metadata: {
conversionType: 'purchase',
value: 99.99,
currency: 'USD',
productId: 'prod-456',
},
};
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackConversion({
conversionType: 'purchase',
sessionId: 'session-conv',
userId: 'user-123',
value: 99.99,
currency: 'USD',
metadata: {
productId: 'prod-456',
},
});
expect(result.success).toBe(true);
expect(result.eventId).toBe('conversion-123');
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'conversion',
sessionId: 'session-conv',
userId: 'user-123',
metadata: {
conversionType: 'purchase',
value: 99.99,
currency: 'USD',
productId: 'prod-456',
},
processed: false,
}),
);
});
it('queues conversion event with high priority', async () => {
const mockEvent = { id: 'conversion-priority' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
await service.trackConversion({
conversionType: 'signup',
sessionId: 'session-signup',
userId: 'user-new',
});
expect(eventsQueue.add).toHaveBeenCalledWith(
'process-event',
{
eventId: 'conversion-priority',
eventType: 'conversion',
sessionId: 'session-signup',
conversionType: 'signup',
},
{ priority: 1 },
);
});
it('handles conversion without value or currency', async () => {
const mockEvent = { id: 'conversion-no-value' };
rawEventRepository.create.mockReturnValue(mockEvent as RawEvent);
rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent);
const result = await service.trackConversion({
conversionType: 'newsletter_signup',
sessionId: 'session-newsletter',
});
expect(result.success).toBe(true);
expect(rawEventRepository.create).toHaveBeenCalledWith(
expect.objectContaining({
metadata: expect.objectContaining({
conversionType: 'newsletter_signup',
value: undefined,
currency: undefined,
}),
}),
);
});
it('handles errors gracefully', async () => {
rawEventRepository.create.mockImplementation(() => {
throw new Error('Conversion tracking failed');
});
const result = await service.trackConversion({
conversionType: 'error_conversion',
sessionId: 'session-error',
});
expect(result.success).toBe(false);
expect(result.eventId).toBeNull();
expect(result.error).toBe('Conversion tracking failed');
});
});
});

View file

@ -1,376 +0,0 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { InjectQueue } from '@nestjs/bullmq';
import { Repository } from 'typeorm';
import { Queue } from 'bullmq';
import type { Request } from 'express';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { DeviceEnrichmentService, ClientDeviceData, EnrichedDeviceData } from './device-enrichment.service';
import { AttributionService, AttributionInput, ResolvedAttribution } from './attribution.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
export interface TrackViewInput {
pageUrl: string;
referrer?: string;
userId?: string;
sessionId: string;
metadata?: Record<string, unknown>;
clientDevice?: ClientDeviceData;
attribution?: AttributionInput;
}
export interface TrackEventInput {
eventType: string;
sessionId: string;
userId?: string;
pageUrl?: string;
metadata?: Record<string, unknown>;
timestamp?: number;
}
export interface TrackConversionInput {
conversionType: string;
sessionId: string;
userId?: string;
value?: number;
currency?: string;
metadata?: Record<string, unknown>;
}
export interface TrackingResult {
success: boolean;
eventId: string | null;
error?: string;
}
export interface BatchTrackingResult {
success: boolean;
count: number;
errors: string[];
}
@Injectable()
export class TrackingService {
private readonly logger = new Logger(TrackingService.name);
constructor(
private readonly deviceEnrichmentService: DeviceEnrichmentService,
private readonly attributionService: AttributionService,
private readonly identityService: IdentityService,
private readonly domainResolver: DomainResolverService,
@InjectRepository(RawEvent)
private readonly rawEventRepository: Repository<RawEvent>,
@InjectRepository(SessionFingerprint)
private readonly fingerprintRepository: Repository<SessionFingerprint>,
@InjectQueue('analytics-events')
private readonly eventsQueue: Queue,
) {}
/**
* Derive cross-domain dimensions for a request:
* - visitor_id_daily: sha256(daily_salt || ip || ua || lang)
* - corp_id, domain_id: from Origin/Referer/Host domains row
*
* Returns nulls for any dimension we can't resolve. Callers persist all
* three on the event row.
*/
private async resolveCrossDomainDimensions(request: Request): Promise<{
visitorIdDaily: Buffer | null;
corpId: number | null;
domainId: number | null;
}> {
const ua = (request.headers['user-agent'] as string | undefined) ?? undefined;
const lang = (request.headers['accept-language'] as string | undefined) ?? undefined;
const ip = this.extractIpAddress(request) ?? undefined;
const visitorIdDaily = await this.identityService.visitorIdDaily(ip, ua, lang);
const hostname = this.domainResolver.hostnameFromHeaders(request.headers);
const resolved = await this.domainResolver.resolve(hostname);
return {
visitorIdDaily,
corpId: resolved?.corpId ?? null,
domainId: resolved?.domainId ?? null,
};
}
/**
* Track page view with device fingerprinting
*/
async trackView(data: TrackViewInput, request: Request): Promise<TrackingResult> {
try {
// Extract User-Agent and IP from request headers
const userAgent = request.headers['user-agent'] as string | undefined;
const ipAddress = this.extractIpAddress(request);
// Enrich device data (IP is hashed, never stored raw)
const enriched = await this.deviceEnrichmentService.enrich(
userAgent,
ipAddress,
data.clientDevice,
);
// Resolve attribution
const attribution = this.attributionService.resolveTrafficSource({
utmSource: data.attribution?.utmSource,
utmMedium: data.attribution?.utmMedium,
utmCampaign: data.attribution?.utmCampaign,
utmContent: data.attribution?.utmContent,
utmTerm: data.attribution?.utmTerm,
referrer: data.referrer,
});
// Create or update session fingerprint (first-touch attribution)
await this.upsertSessionFingerprint(
data.sessionId,
data.userId,
enriched,
attribution,
data.pageUrl,
);
// Cross-domain dimensions
const { visitorIdDaily, corpId, domainId } =
await this.resolveCrossDomainDimensions(request);
// Create raw event
const event = this.rawEventRepository.create({
eventType: 'pageview',
sessionId: data.sessionId,
userId: data.userId ?? null,
pageUrl: data.pageUrl,
referrer: data.referrer ?? null,
deviceType: enriched.deviceType,
metadata: data.metadata ?? null,
timestamp: new Date(),
processed: false,
visitorIdDaily,
corpId,
domainId,
});
const saved = await this.rawEventRepository.save(event);
// Queue for async processing
await this.eventsQueue.add('process-event', {
eventId: saved.id,
eventType: 'pageview',
sessionId: data.sessionId,
});
return { success: true, eventId: saved.id };
} catch (error) {
this.logger.error('Failed to track view event', error);
return {
success: false,
eventId: null,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
/**
* Track generic event
*/
async trackEvent(data: TrackEventInput): Promise<TrackingResult> {
try {
const event = this.rawEventRepository.create({
eventType: data.eventType,
sessionId: data.sessionId,
userId: data.userId ?? null,
pageUrl: data.pageUrl ?? null,
metadata: data.metadata ?? null,
timestamp: data.timestamp ? new Date(data.timestamp) : new Date(),
processed: false,
});
const saved = await this.rawEventRepository.save(event);
// Queue for async processing
await this.eventsQueue.add('process-event', {
eventId: saved.id,
eventType: data.eventType,
sessionId: data.sessionId,
});
return { success: true, eventId: saved.id };
} catch (error) {
this.logger.error('Failed to track event', error);
return {
success: false,
eventId: null,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
/**
* Track batch of events
*/
async trackBatch(events: TrackEventInput[]): Promise<BatchTrackingResult> {
const results = await Promise.allSettled(
events.map((event) => this.trackEvent(event)),
);
const errors = results
.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
.map((r) => (r.reason as Error)?.message ?? 'Unknown error');
const successCount = results.filter((r) => r.status === 'fulfilled').length;
return {
success: errors.length === 0,
count: successCount,
errors,
};
}
/**
* Track conversion event
*/
async trackConversion(data: TrackConversionInput): Promise<TrackingResult> {
try {
const event = this.rawEventRepository.create({
eventType: 'conversion',
sessionId: data.sessionId,
userId: data.userId ?? null,
metadata: {
conversionType: data.conversionType,
value: data.value,
currency: data.currency,
...data.metadata,
},
timestamp: new Date(),
processed: false,
});
const saved = await this.rawEventRepository.save(event);
// Queue for async processing (high priority for conversions)
await this.eventsQueue.add(
'process-event',
{
eventId: saved.id,
eventType: 'conversion',
sessionId: data.sessionId,
conversionType: data.conversionType,
},
{ priority: 1 }, // Higher priority
);
return { success: true, eventId: saved.id };
} catch (error) {
this.logger.error('Failed to track conversion', error);
return {
success: false,
eventId: null,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
/**
* Create or update session fingerprint
* Attribution is first-touch: only set on creation
*/
private async upsertSessionFingerprint(
sessionId: string,
userId: string | undefined,
enriched: EnrichedDeviceData,
attribution: ResolvedAttribution,
landingPage: string,
): Promise<void> {
try {
const existing = await this.fingerprintRepository.findOne({
where: { sessionId },
});
if (existing) {
// Update viewport (may change on resize) and userId (if logged in later)
if (enriched.viewportWidth !== undefined) {
existing.viewportWidth = enriched.viewportWidth;
}
if (enriched.viewportHeight !== undefined) {
existing.viewportHeight = enriched.viewportHeight;
}
if (userId && !existing.userId) {
existing.userId = userId;
}
await this.fingerprintRepository.save(existing);
} else {
// Create new fingerprint with first-touch attribution
const fingerprint = this.fingerprintRepository.create({
sessionId,
userId: userId ?? null,
deviceType: enriched.deviceType,
isBot: enriched.isBot,
browser: enriched.browser,
browserVersion: enriched.browserVersion,
browserMajor: enriched.browserMajor,
os: enriched.os,
osVersion: enriched.osVersion,
deviceVendor: enriched.deviceVendor,
deviceModel: enriched.deviceModel,
screenWidth: enriched.screenWidth ?? null,
screenHeight: enriched.screenHeight ?? null,
viewportWidth: enriched.viewportWidth ?? null,
viewportHeight: enriched.viewportHeight ?? null,
pixelRatio: enriched.pixelRatio ?? null,
colorDepth: enriched.colorDepth ?? null,
language: enriched.language ?? null,
languages: enriched.languages ?? null,
timezone: enriched.timezone ?? null,
timezoneOffset: enriched.timezoneOffset ?? null,
country: enriched.country,
region: enriched.region,
city: enriched.city,
isEU: enriched.isEU,
geoTimezone: enriched.geoTimezone,
isVpn: enriched.isVpn,
isDatacenter: enriched.isDatacenter,
isTor: enriched.isTor,
ipHash: enriched.ipHash,
deviceMemory: enriched.deviceMemory ?? null,
hardwareConcurrency: enriched.hardwareConcurrency ?? null,
touchPoints: enriched.touchPoints ?? null,
cookiesEnabled: enriched.cookiesEnabled ?? null,
doNotTrack: enriched.doNotTrack ?? null,
// First-touch attribution
trafficSource: attribution.source,
utmSource: attribution.rawSource,
utmMedium: attribution.medium,
utmCampaign: attribution.campaign,
utmContent: attribution.content,
utmTerm: attribution.term,
referrer: attribution.referrer,
landingPage,
});
await this.fingerprintRepository.save(fingerprint);
}
} catch (error) {
// Log but don't fail - fingerprinting is enhancement, not critical
this.logger.warn('Failed to upsert session fingerprint', error);
}
}
/**
* Extract client IP from request headers
*/
private extractIpAddress(request: Request): string | undefined {
const forwardedFor = request.headers['x-forwarded-for'];
const realIp = request.headers['x-real-ip'];
if (forwardedFor) {
const firstIp = (forwardedFor as string).split(',')[0];
return firstIp?.trim();
}
if (realIp) {
return realIp as string;
}
return request.socket.remoteAddress;
}
}

View file

@ -18,6 +18,7 @@
},
"dependencies": {
"@lilith/analytics": "workspace:*",
"@lilith/gov-detection": "^1.0.3",
"@nestjs/bullmq": "^11.0.0",
"@nestjs/common": "^11.0.0",
"@nestjs/config": "^4.0.0",
@ -26,6 +27,7 @@
"@nestjs/terminus": "^11.0.0",
"@nestjs/typeorm": "^11.0.0",
"bullmq": "^5.0.0",
"geoip-lite": "^2.0.1",
"ioredis": "^5.9.1",
"pg": "^8.11.0",
"reflect-metadata": "^0.2.0",
@ -39,6 +41,7 @@
"@nestjs/testing": "^11.0.0",
"@swc/cli": "^0.7.10",
"@swc/core": "^1.15.8",
"@types/geoip-lite": "^1.4.4",
"@types/node": "^20.0.0",
"typescript": "^5.4.0",
"vitest": "^1.0.0"

View file

@ -7,13 +7,20 @@ import {
} from 'typeorm';
/**
* Raw analytics event - mirrors the collector's raw_events table.
* Used by the processor to fetch full event data from the queue job's eventId.
* Raw analytics event the canonical event log.
*
* The processor service is now the canonical writer: the ingest-writer inserts
* rows here from edge envelopes, and the aggregation worker reads them back by
* id. This must carry the full schema (including the cross-domain dimensions),
* not just the read-side subset.
*/
@Entity('raw_events')
@Index(['sessionId', 'timestamp'])
@Index(['eventType', 'timestamp'])
@Index(['processed', 'timestamp'])
@Index(['visitorIdDaily', 'timestamp'])
@Index(['corpId', 'timestamp'])
@Index(['domainId', 'timestamp'])
export class RawEvent {
@PrimaryGeneratedColumn('uuid')
id!: string;
@ -55,4 +62,21 @@ export class RawEvent {
@Column({ type: 'timestamptz', nullable: true })
processedAt?: Date | null;
/**
* Daily-rotating visitor identity for cross-domain stitching.
* sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same
* visitor across all our domains within a UTC day; unrecoverable after salt
* rotation (cookie-free, GDPR-clean).
*/
@Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' })
visitorIdDaily?: Buffer | null;
/** Corp that owns the originating domain. FK → corps(id). */
@Column({ type: 'smallint', nullable: true, name: 'corp_id' })
corpId?: number | null;
/** Domain the event originated from. FK → domains(id). */
@Column({ type: 'integer', nullable: true, name: 'domain_id' })
domainId?: number | null;
}

View file

@ -0,0 +1,36 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import { VisitorSalt } from '../entities/visitor-salt.entity';
import { Domain } from '../entities/domain.entity';
import { Corp } from '../entities/corp.entity';
import { DeviceEnrichmentService } from './device-enrichment.service';
import { AttributionService } from './attribution.service';
import { GovDetectionService } from './gov-detection.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
import { IngestService } from './ingest.service';
/**
* Canonical-writer module. Owns the DB-bound enrichment that used to run in the
* collector and the {@link IngestService} that turns an edge envelope into a
* `raw_events` row. Lives on the canonical-store host (black).
*/
@Module({
imports: [
TypeOrmModule.forFeature([RawEvent, SessionFingerprint, VisitorSalt, Domain, Corp]),
],
providers: [
DeviceEnrichmentService,
AttributionService,
GovDetectionService,
IdentityService,
DomainResolverService,
IngestService,
],
exports: [IngestService],
})
export class IngestModule {}

View file

@ -0,0 +1,221 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import type { RawEventEnvelope } from '@lilith/analytics';
import { RawEvent } from '../entities/raw-event.entity';
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
import {
DeviceEnrichmentService,
type ClientDeviceData,
type EnrichedDeviceData,
} from './device-enrichment.service';
import { AttributionService, type ResolvedAttribution } from './attribution.service';
import { IdentityService } from './identity.service';
import { DomainResolverService } from './domain-resolver.service';
/**
* Canonical writer. Consumes an edge-captured {@link RawEventEnvelope}, runs the
* DB-bound enrichment that used to live in the collector (device, identity,
* domain, fingerprint), and inserts the canonical `raw_events` row.
*
* Idempotency: the row's primary key is the edge-minted `eventId`, inserted
* with `ON CONFLICT (id) DO NOTHING`. A re-drained or duplicated envelope is a
* no-op and does NOT re-trigger aggregation (which is additive and would
* otherwise inflate counters).
*
* Timestamps: `receivedAt` is taken from the envelope (the EDGE clock), never
* `now()`, so events spooled during a canonical-store outage keep their true
* arrival time when drained.
*/
@Injectable()
export class IngestService {
private readonly logger = new Logger(IngestService.name);
constructor(
private readonly deviceEnrichment: DeviceEnrichmentService,
private readonly attribution: AttributionService,
private readonly identity: IdentityService,
private readonly domainResolver: DomainResolverService,
@InjectRepository(RawEvent)
private readonly rawEventRepo: Repository<RawEvent>,
@InjectRepository(SessionFingerprint)
private readonly fingerprintRepo: Repository<SessionFingerprint>,
) {}
/**
* Persist one event. Returns the aggregation job payload when this call
* actually inserted the row, or `null` when the row already existed (so the
* caller skips re-enqueuing aggregation).
*/
async write(
envelope: RawEventEnvelope,
): Promise<{ eventId: string; eventType: string; sessionId: string } | null> {
const ev = envelope.payload;
const receivedAt = new Date(envelope.receivedAt);
const timestamp =
ev.timestamp != null && Number.isFinite(ev.timestamp)
? new Date(ev.timestamp)
: receivedAt;
let deviceType: string | null = null;
let visitorIdDaily: Buffer | null = null;
let corpId: number | null = null;
let domainId: number | null = null;
if (ev.isView) {
const enriched = await this.deviceEnrichment.enrich(
envelope.edge.ua,
envelope.edge.ip,
(ev.clientDevice ?? undefined) as ClientDeviceData | undefined,
);
deviceType = enriched.deviceType;
const attribution = this.attribution.resolveTrafficSource({
utmSource: ev.attribution?.utmSource,
utmMedium: ev.attribution?.utmMedium,
utmCampaign: ev.attribution?.utmCampaign,
utmContent: ev.attribution?.utmContent,
utmTerm: ev.attribution?.utmTerm,
referrer: ev.attribution?.referrer ?? ev.referrer ?? undefined,
});
await this.upsertSessionFingerprint(envelope, enriched, attribution);
visitorIdDaily = await this.identity.visitorIdDaily(
envelope.edge.ip,
envelope.edge.ua,
envelope.edge.lang,
receivedAt,
);
const hostname = this.domainResolver.hostnameFromHeaders(envelope.edge.headers);
const resolved = await this.domainResolver.resolve(hostname);
corpId = resolved?.corpId ?? null;
domainId = resolved?.domainId ?? null;
}
const result = await this.rawEventRepo
.createQueryBuilder()
.insert()
.into(RawEvent)
.values({
id: envelope.eventId,
eventType: ev.eventType,
sessionId: ev.sessionId,
userId: ev.userId ?? null,
pageUrl: ev.pageUrl ?? null,
referrer: ev.referrer ?? null,
deviceType,
metadata: ev.metadata ?? null,
timestamp,
receivedAt,
processed: false,
visitorIdDaily,
corpId,
domainId,
})
.orIgnore()
.returning('id')
.execute();
const inserted = Array.isArray(result.raw) && result.raw.length > 0;
if (!inserted) {
this.logger.debug(`raw_events ${envelope.eventId} already present — duplicate drain, skipping aggregation`);
return null;
}
return { eventId: envelope.eventId, eventType: ev.eventType, sessionId: ev.sessionId };
}
/**
* Create or update the session fingerprint. Attribution is first-touch: set
* only on creation, never overwritten. Failures are logged, not thrown
* fingerprinting is enhancement, not the canonical record.
*/
private async upsertSessionFingerprint(
envelope: RawEventEnvelope,
enriched: EnrichedDeviceData,
attribution: ResolvedAttribution,
): Promise<void> {
const ev = envelope.payload;
try {
const existing = await this.fingerprintRepo.findOne({
where: { sessionId: ev.sessionId },
});
if (existing) {
if (enriched.viewportWidth !== undefined) {
existing.viewportWidth = enriched.viewportWidth;
}
if (enriched.viewportHeight !== undefined) {
existing.viewportHeight = enriched.viewportHeight;
}
if (ev.userId && !existing.userId) {
existing.userId = ev.userId;
}
await this.fingerprintRepo.save(existing);
return;
}
const fingerprint = this.fingerprintRepo.create({
sessionId: ev.sessionId,
userId: ev.userId ?? null,
deviceType: enriched.deviceType,
isBot: enriched.isBot,
browser: enriched.browser,
browserVersion: enriched.browserVersion,
browserMajor: enriched.browserMajor,
os: enriched.os,
osVersion: enriched.osVersion,
deviceVendor: enriched.deviceVendor,
deviceModel: enriched.deviceModel,
screenWidth: enriched.screenWidth ?? null,
screenHeight: enriched.screenHeight ?? null,
viewportWidth: enriched.viewportWidth ?? null,
viewportHeight: enriched.viewportHeight ?? null,
pixelRatio: enriched.pixelRatio ?? null,
colorDepth: enriched.colorDepth ?? null,
language: enriched.language ?? null,
languages: enriched.languages ?? null,
timezone: enriched.timezone ?? null,
timezoneOffset: enriched.timezoneOffset ?? null,
country: enriched.country,
region: enriched.region,
city: enriched.city,
isEU: enriched.isEU,
geoTimezone: enriched.geoTimezone,
isVpn: enriched.isVpn,
isDatacenter: enriched.isDatacenter,
isTor: enriched.isTor,
isGovernment: enriched.isGovernment,
orgType: enriched.orgType,
responseTier: enriched.responseTier,
org: enriched.org,
asn: enriched.asn,
ipHash: enriched.ipHash,
deviceMemory: enriched.deviceMemory ?? null,
hardwareConcurrency: enriched.hardwareConcurrency ?? null,
touchPoints: enriched.touchPoints ?? null,
cookiesEnabled: enriched.cookiesEnabled ?? null,
doNotTrack: enriched.doNotTrack ?? null,
trafficSource: attribution.source,
utmSource: attribution.rawSource,
utmMedium: attribution.medium,
utmCampaign: attribution.campaign,
utmContent: attribution.content,
utmTerm: attribution.term,
referrer: attribution.referrer,
landingPage: ev.pageUrl ?? null,
});
await this.fingerprintRepo.save(fingerprint);
} catch (error) {
// First-write race on sessionId (unique) just means another envelope for
// the same session won — safe to ignore. Anything else is logged.
this.logger.warn(
`Failed to upsert session fingerprint for ${ev.sessionId}: ${
error instanceof Error ? error.message : String(error)
}`,
);
}
}
}

View file

@ -1,19 +1,34 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import type { Job } from 'bullmq';
import type { Job, Queue } from 'bullmq';
import {
EVENTS_QUEUE,
INGEST_EVENT_JOB,
PROCESS_EVENT_JOB,
type RawEventEnvelope,
type ProcessEventJob,
} from '@lilith/analytics';
import { AggregationService } from './aggregation.service';
import { RawEvent } from '../entities/raw-event.entity';
import { IngestService } from '../ingest/ingest.service';
interface EventJob {
eventId: string;
eventType: string;
sessionId: string;
}
/**
* Single worker on the shared `analytics-events` queue. It handles two job
* types so they share one redis connection and consumer pool:
*
* ingest-event enrich + write the canonical raw_events row (IngestService),
* then emit a process-event job for the row it inserted.
* process-event aggregate an already-written row into aggregated_metrics.
*
* Splitting these into two @Processor classes on the same queue would NOT work:
* BullMQ workers pull any job regardless of name, so a single switch is the
* correct pattern.
*/
@Injectable()
@Processor('analytics-events', {
@Processor(EVENTS_QUEUE, {
concurrency: 10,
})
export class EventsProcessor extends WorkerHost {
@ -21,16 +36,49 @@ export class EventsProcessor extends WorkerHost {
constructor(
private readonly aggregationService: AggregationService,
private readonly ingestService: IngestService,
@InjectRepository(RawEvent)
private readonly rawEventRepository: Repository<RawEvent>,
@InjectQueue(EVENTS_QUEUE)
private readonly eventsQueue: Queue,
) {
super();
}
async process(job: Job<EventJob>): Promise<void> {
async process(job: Job): Promise<void> {
switch (job.name) {
case INGEST_EVENT_JOB:
return this.handleIngest(job as Job<RawEventEnvelope>);
case PROCESS_EVENT_JOB:
return this.handleAggregate(job as Job<ProcessEventJob>);
default:
this.logger.warn(`Unknown job name '${job.name}' on ${EVENTS_QUEUE} — skipping`);
return;
}
}
/** Write the canonical row, then hand off to aggregation (only if newly inserted). */
private async handleIngest(job: Job<RawEventEnvelope>): Promise<void> {
const written = await this.ingestService.write(job.data);
if (!written) return; // duplicate drain — row already present, already aggregated
await this.eventsQueue.add(
PROCESS_EVENT_JOB,
{
eventId: written.eventId,
eventType: written.eventType,
sessionId: written.sessionId,
} satisfies ProcessEventJob,
// Reuse the event id so a re-drain can't double-enqueue aggregation.
{ jobId: `agg:${written.eventId}` },
);
}
/** Aggregate one already-written raw event. */
private async handleAggregate(job: Job<ProcessEventJob>): Promise<void> {
const { eventId, eventType, sessionId } = job.data;
this.logger.debug(`Processing event: ${eventType} (id: ${eventId}) from session ${sessionId}`);
this.logger.debug(`Aggregating event: ${eventType} (id: ${eventId}) from session ${sessionId}`);
const event = await this.rawEventRepository.findOne({ where: { id: eventId } });
@ -63,14 +111,12 @@ export class EventsProcessor extends WorkerHost {
}
@OnWorkerEvent('completed')
onCompleted(job: Job<EventJob>) {
this.logger.debug(`Job ${job.id} completed for event ${job.data.eventType}`);
onCompleted(job: Job) {
this.logger.debug(`Job ${job.id} (${job.name}) completed`);
}
@OnWorkerEvent('failed')
onFailed(job: Job<EventJob>, error: Error) {
this.logger.error(
`Job ${job.id} failed for event ${job.data.eventType}: ${error.message}`,
);
onFailed(job: Job, error: Error) {
this.logger.error(`Job ${job.id} (${job.name}) failed: ${error.message}`);
}
}

View file

@ -1,19 +1,22 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
import { EVENTS_QUEUE } from '@lilith/analytics';
import { EventsProcessor } from './events.processor';
import { AggregationService } from './aggregation.service';
import { AggregatedMetric } from '../entities/aggregated-metric.entity';
import { RawEvent } from '../entities/raw-event.entity';
import { RedisModule } from '../redis/redis.module';
import { IngestModule } from '../ingest/ingest.module';
@Module({
imports: [
TypeOrmModule.forFeature([AggregatedMetric, RawEvent]),
BullModule.registerQueue({
name: 'analytics-events',
name: EVENTS_QUEUE,
}),
RedisModule,
IngestModule,
],
providers: [EventsProcessor, AggregationService],
exports: [AggregationService],