From b2527534760a9a86a7da37d13ca0be08456035fe Mon Sep 17 00:00:00 2001 From: Natalie Date: Sun, 21 Jun 2026 07:48:02 -0500 Subject: [PATCH] feat(analytics): canonical store on black, vps-0 edge with redis outage-spool Relocate the canonical analytics store off the public VPS. The collector becomes a DB-free edge that captures + durably enqueues events to black's redis; a black-side ingest-writer enriches and writes raw_events. When black is unreachable the collector spools to a local appendonly redis that only grows during the outage and drains on recovery. - shared: RawEventEnvelope/NormalizedEvent ingest contract (edge -> writer) - collector: capture-and-enqueue + dual-redis RedisRouter (primary=black, spool=local) + paused-until-healthy drain worker; drop TypeORM/enrichment - processor: IngestService canonical writer (edge receivedAt, ON CONFLICT DO NOTHING), single worker branches ingest-event -> process-event - relocate device/identity/domain/attribution enrichment + entities to writer Co-Authored-By: Claude Opus 4.8 --- packages/analytics/src/types/index.ts | 3 + packages/analytics/src/types/ingest.ts | 123 ++++ services/collector/package.json | 11 +- services/collector/src/app.module.ts | 53 +- services/collector/src/entities/index.ts | 6 - .../src/entities/raw-event.entity.ts | 93 --- .../collector/src/health/health.controller.ts | 35 +- .../collector/src/health/health.module.ts | 9 +- .../src/tracking/attribution.service.spec.ts | 487 ---------------- .../collector/src/tracking/capture.service.ts | 252 ++++++++ .../device-enrichment.service.spec.ts | 431 -------------- services/collector/src/tracking/index.ts | 17 +- .../src/tracking/redis-router.service.ts | 195 +++++++ .../src/tracking/tracking.controller.ts | 164 ++---- .../collector/src/tracking/tracking.module.ts | 37 +- .../src/tracking/tracking.service.spec.ts | 551 ------------------ .../src/tracking/tracking.service.ts | 376 ------------ services/processor/package.json | 3 + .../src/entities/corp.entity.ts | 0 .../src/entities/domain.entity.ts | 0 .../src/entities/raw-event.entity.ts | 28 +- .../entities/session-fingerprint.entity.ts | 0 .../src/entities/visitor-salt.entity.ts | 0 .../src/ingest}/attribution.service.ts | 0 .../src/ingest}/device-enrichment.service.ts | 0 .../src/ingest}/domain-resolver.service.ts | 0 .../src/ingest}/gov-detection.service.ts | 0 .../src/ingest}/identity.service.ts | 0 .../processor/src/ingest/ingest.module.ts | 36 ++ .../processor/src/ingest/ingest.service.ts | 221 +++++++ .../src/processors/events.processor.ts | 80 ++- .../src/processors/processors.module.ts | 5 +- 32 files changed, 1022 insertions(+), 2194 deletions(-) create mode 100644 packages/analytics/src/types/ingest.ts delete mode 100644 services/collector/src/entities/index.ts delete mode 100644 services/collector/src/entities/raw-event.entity.ts delete mode 100644 services/collector/src/tracking/attribution.service.spec.ts create mode 100644 services/collector/src/tracking/capture.service.ts delete mode 100644 services/collector/src/tracking/device-enrichment.service.spec.ts create mode 100644 services/collector/src/tracking/redis-router.service.ts delete mode 100644 services/collector/src/tracking/tracking.service.spec.ts delete mode 100644 services/collector/src/tracking/tracking.service.ts rename services/{collector => processor}/src/entities/corp.entity.ts (100%) rename services/{collector => processor}/src/entities/domain.entity.ts (100%) rename services/{collector => processor}/src/entities/session-fingerprint.entity.ts (100%) rename services/{collector => processor}/src/entities/visitor-salt.entity.ts (100%) rename services/{collector/src/tracking => processor/src/ingest}/attribution.service.ts (100%) rename services/{collector/src/tracking => processor/src/ingest}/device-enrichment.service.ts (100%) rename services/{collector/src/tracking => processor/src/ingest}/domain-resolver.service.ts (100%) rename services/{collector/src/tracking => processor/src/ingest}/gov-detection.service.ts (100%) rename services/{collector/src/tracking => processor/src/ingest}/identity.service.ts (100%) create mode 100644 services/processor/src/ingest/ingest.module.ts create mode 100644 services/processor/src/ingest/ingest.service.ts diff --git a/packages/analytics/src/types/index.ts b/packages/analytics/src/types/index.ts index b23ecea..e3fa28b 100644 --- a/packages/analytics/src/types/index.ts +++ b/packages/analytics/src/types/index.ts @@ -12,3 +12,6 @@ export * from './common'; // GDPR Types export * from './gdpr'; + +// Ingest envelope (edge collector → black-side ingest-writer contract) +export * from './ingest'; diff --git a/packages/analytics/src/types/ingest.ts b/packages/analytics/src/types/ingest.ts new file mode 100644 index 0000000..7dc22c8 --- /dev/null +++ b/packages/analytics/src/types/ingest.ts @@ -0,0 +1,123 @@ +/** + * Ingest envelope — the durable contract between the edge collector (producer) + * and the canonical-store ingest-writer (consumer). + * + * The collector does NO database work. It captures the request at the edge, + * wraps it in a {@link RawEventEnvelope}, and enqueues it to redis. All + * enrichment (visitor identity, domain resolution, device fingerprint) and the + * canonical `raw_events` write happen on the black-side ingest-writer when the + * envelope is consumed. This makes the redis enqueue — not a synchronous DB + * write — the durability boundary, so a canonical-store outage spools events + * locally instead of dropping them. + */ + +/** BullMQ queue shared by the collector (producer) and the black-side workers. */ +export const EVENTS_QUEUE = 'analytics-events'; + +/** + * Job name for a captured-at-edge event awaiting enrichment + canonical write. + * Consumed by the black-side ingest-writer. + */ +export const INGEST_EVENT_JOB = 'ingest-event'; + +/** + * Job name for an already-written `raw_events` row awaiting aggregation. + * Emitted by the ingest-writer, consumed by the existing aggregation processor. + */ +export const PROCESS_EVENT_JOB = 'process-event'; + +/** Which collector endpoint produced the envelope — selects the writer's branch. */ +export type IngestKind = + | 'view' + | 'engagement' + | 'interaction' + | 'event' + | 'batch' + | 'conversion' + | 'funnel' + | 'registration-funnel'; + +/** + * First-touch attribution inputs (UTM + referrer), carried verbatim from the + * edge for views. Resolved into a traffic source on the black-side writer. + */ +export interface AttributionInputData { + readonly utmSource?: string; + readonly utmMedium?: string; + readonly utmCampaign?: string; + readonly utmContent?: string; + readonly utmTerm?: string; + readonly referrer?: string; +} + +/** + * A single event, fully normalized at the edge into the generic shape the + * canonical writer persists. All per-endpoint mapping (engagement → eventType, + * funnel → eventType, batch expansion, etc.) happens on the collector because + * it is pure; only DB-bound enrichment (device, identity, fingerprint) is + * deferred to the writer. + */ +export interface NormalizedEvent { + /** Canonical event type, e.g. 'pageview' | 'conversion' | 'engagement_like'. */ + readonly eventType: string; + readonly sessionId: string; + readonly userId?: string | null; + readonly pageUrl?: string | null; + readonly referrer?: string | null; + readonly metadata?: Record | null; + /** Client-supplied event time (ms since epoch). Writer falls back to `receivedAt`. */ + readonly timestamp?: number | null; + /** + * Whether this event runs the full view pipeline on the writer: device + * enrichment, session fingerprint upsert, and cross-domain visitor identity. + * Mirrors the legacy split where only page views were enriched. + */ + readonly isView: boolean; + /** Client device signals (views only). */ + readonly clientDevice?: Record | null; + /** Attribution inputs (views only). */ + readonly attribution?: AttributionInputData | null; +} + +/** Edge-captured request context the black-side writer needs for enrichment. */ +export interface EdgeContext { + /** Client IP extracted at the edge (x-real-ip / x-forwarded-for / socket). */ + readonly ip?: string; + /** User-Agent header. */ + readonly ua?: string; + /** Accept-Language header. */ + readonly lang?: string; + /** + * Headers the writer needs for domain resolution (origin / referer / host). + * Lowercased keys; only the subset required downstream is carried. + */ + readonly headers: Record; +} + +/** + * A single captured analytics event, durable in redis from the moment the + * collector enqueues it. + * + * `eventId` is minted at the edge and used as BOTH the BullMQ `jobId` AND the + * `raw_events` primary key, so re-drains and duplicates are idempotent + * (`INSERT ... ON CONFLICT (id) DO NOTHING`). + * + * `receivedAt` is the EDGE clock (ISO-8601). The writer MUST persist this value + * as the row's `receivedAt` — never `now()` — or an outage backdates every + * spooled event to drain time and smears time-bucketed aggregates. + */ +export interface RawEventEnvelope { + readonly eventId: string; + readonly kind: IngestKind; + readonly receivedAt: string; + readonly edge: EdgeContext; + /** The normalized event the writer persists. */ + readonly payload: NormalizedEvent; +} + +/** BullMQ job payload emitted by the ingest-writer for the aggregation stage. */ +export interface ProcessEventJob { + readonly eventId: string; + readonly eventType: string; + readonly sessionId: string; +} diff --git a/services/collector/package.json b/services/collector/package.json index ff3bb43..95aab14 100644 --- a/services/collector/package.json +++ b/services/collector/package.json @@ -18,24 +18,18 @@ }, "dependencies": { "@lilith/analytics": "workspace:*", - "@lilith/gov-detection": "^1.0.3", - "@nestjs/bullmq": "^11.0.0", "@nestjs/common": "^11.0.0", "@nestjs/config": "^4.0.0", "@nestjs/core": "^11.0.0", "@nestjs/platform-express": "^11.0.0", "@nestjs/swagger": "^11.0.0", - "@nestjs/terminus": "^11.0.0", "@nestjs/throttler": "^6.0.0", - "@nestjs/typeorm": "^11.0.0", "bullmq": "^5.0.0", "class-transformer": "^0.5.0", "class-validator": "^0.14.0", - "geoip-lite": "^2.0.1", - "pg": "^8.11.0", + "ioredis": "^5.9.1", "reflect-metadata": "^0.2.0", - "rxjs": "^7.8.0", - "typeorm": "^0.3.0" + "rxjs": "^7.8.0" }, "devDependencies": { "@lilith/configs": "^2.2.1", @@ -45,7 +39,6 @@ "@swc/cli": "^0.7.10", "@swc/core": "^1.15.8", "@types/express": "^5.0.0", - "@types/geoip-lite": "^1.4.4", "@types/node": "^20.0.0", "typescript": "^5.4.0", "unplugin-swc": "^1.5.1", diff --git a/services/collector/src/app.module.ts b/services/collector/src/app.module.ts index be31773..4c640ba 100644 --- a/services/collector/src/app.module.ts +++ b/services/collector/src/app.module.ts @@ -1,28 +1,25 @@ import { Module } from '@nestjs/common'; import { APP_GUARD } from '@nestjs/core'; -import { ConfigModule, ConfigService } from '@nestjs/config'; +import { ConfigModule } from '@nestjs/config'; import { ThrottlerModule } from '@nestjs/throttler'; -import { BullModule } from '@nestjs/bullmq'; -import { TypeOrmModule } from '@nestjs/typeorm'; import { TrackingModule } from './tracking/tracking.module'; import { HealthModule } from './health/health.module'; import { BeaconModule } from './beacon/beacon.module'; -import { RawEvent } from './entities/raw-event.entity'; -import { SessionFingerprint } from "./entities/session-fingerprint.entity"; -import { Corp } from "./entities/corp.entity"; -import { Domain } from "./entities/domain.entity"; -import { VisitorSalt } from "./entities/visitor-salt.entity"; import { WriteKeyGuard } from './auth/write-key.guard'; +/** + * Edge collector app. No TypeORM, no BullMQ root module — the collector owns no + * database and manages its own dual-redis connections inside {@link RedisRouter} + * (primary = black redis, spool = local redis). All persistence and enrichment + * live on the black-side processor/ingest-writer. + */ @Module({ imports: [ - // Configuration ConfigModule.forRoot({ isGlobal: true, envFilePath: ['.env.local', '.env'], }), - // Rate limiting ThrottlerModule.forRoot([ { ttl: 60000, // 1 minute @@ -30,42 +27,6 @@ import { WriteKeyGuard } from './auth/write-key.guard'; }, ]), - // Database - TypeOrmModule.forRootAsync({ - inject: [ConfigService], - useFactory: (config: ConfigService) => ({ - type: 'postgres', - host: config.get('DATABASE_HOST', 'localhost'), - port: config.get('DATABASE_PORT', 5432), - username: config.get('DATABASE_USER', 'analytics'), - password: config.get('DATABASE_PASSWORD', 'analytics'), - database: config.get('DATABASE_NAME', 'analytics'), - entities: [RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt], - synchronize: config.get('DB_SYNCHRONIZE') === 'true', - logging: config.get('NODE_ENV') === 'development', - }), - }), - - // Queue for async processing - BullModule.forRootAsync({ - inject: [ConfigService], - useFactory: (config: ConfigService) => { - const password = config.get('REDIS_PASSWORD'); - return { - connection: { - host: config.get('REDIS_HOST', 'localhost'), - port: config.get('REDIS_PORT', 6379), - ...(password ? { password } : {}), - }, - }; - }, - }), - - BullModule.registerQueue({ - name: 'analytics-events', - }), - - // Feature modules TrackingModule, HealthModule, BeaconModule, diff --git a/services/collector/src/entities/index.ts b/services/collector/src/entities/index.ts deleted file mode 100644 index 230d756..0000000 --- a/services/collector/src/entities/index.ts +++ /dev/null @@ -1,6 +0,0 @@ -export { RawEvent } from './raw-event.entity'; -export { SessionFingerprint } from './session-fingerprint.entity'; -export { Corp } from "./corp.entity"; -export { Domain } from "./domain.entity"; -export type { DomainRole } from "./domain.entity"; -export { VisitorSalt } from "./visitor-salt.entity"; diff --git a/services/collector/src/entities/raw-event.entity.ts b/services/collector/src/entities/raw-event.entity.ts deleted file mode 100644 index 3640c38..0000000 --- a/services/collector/src/entities/raw-event.entity.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { - Entity, - PrimaryGeneratedColumn, - Column, - CreateDateColumn, - Index, -} from 'typeorm'; - -/** - * Raw analytics event - stores all incoming events before processing - * Generic event schema - product-specific fields go in metadata - */ -@Entity('raw_events') -@Index(['sessionId', 'timestamp']) -@Index(['eventType', 'timestamp']) -@Index(['processed', 'timestamp']) -@Index(['visitorIdDaily', 'timestamp']) -@Index(['corpId', 'timestamp']) -@Index(['domainId', 'timestamp']) -export class RawEvent { - @PrimaryGeneratedColumn('uuid') - id!: string; - - /** - * Event type identifier - * Generic: 'pageview', 'click', 'scroll', 'conversion', 'custom' - * Product-specific types stored but processed by product-specific handlers - */ - @Column({ type: 'varchar', length: 100 }) - @Index() - eventType!: string; - - /** Session identifier (client-generated) */ - @Column({ type: 'varchar', length: 64 }) - @Index() - sessionId!: string; - - /** Optional authenticated user ID */ - @Column({ type: 'varchar', length: 64, nullable: true }) - @Index() - userId?: string | null; - - /** Page URL where event occurred */ - @Column({ type: 'text', nullable: true }) - pageUrl?: string | null; - - /** Referrer URL */ - @Column({ type: 'text', nullable: true }) - referrer?: string | null; - - /** Device type: 'desktop' | 'mobile' | 'tablet' | 'bot' */ - @Column({ type: 'varchar', length: 20, nullable: true }) - deviceType?: string | null; - - /** Event-specific data (JSON) */ - @Column({ type: 'jsonb', nullable: true }) - metadata?: Record | null; - - /** Client-side timestamp (when event occurred) */ - @Column({ type: 'timestamptz' }) - @Index() - timestamp!: Date; - - /** Server-side timestamp (when event received) */ - @CreateDateColumn({ type: 'timestamptz' }) - receivedAt!: Date; - - /** Whether event has been processed by aggregation workers */ - @Column({ type: 'boolean', default: false }) - @Index() - processed!: boolean; - - /** Processing timestamp */ - @Column({ type: 'timestamptz', nullable: true }) - processedAt?: Date | null; - - /** - * Daily-rotating visitor identity for cross-domain stitching. - * sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same - * visitor across all our domains within a UTC day; unrecoverable after - * salt rotation (cookie-free, GDPR-clean). - */ - @Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' }) - visitorIdDaily?: Buffer | null; - - /** Corp that owns the originating domain. FK → corps(id). */ - @Column({ type: 'smallint', nullable: true, name: 'corp_id' }) - corpId?: number | null; - - /** Domain the event originated from. FK → domains(id). */ - @Column({ type: 'integer', nullable: true, name: 'domain_id' }) - domainId?: number | null; -} diff --git a/services/collector/src/health/health.controller.ts b/services/collector/src/health/health.controller.ts index ae87de8..7fe177b 100644 --- a/services/collector/src/health/health.controller.ts +++ b/services/collector/src/health/health.controller.ts @@ -1,28 +1,28 @@ import { Controller, Get, SetMetadata } from '@nestjs/common'; import { ApiTags, ApiOperation } from '@nestjs/swagger'; -import { - HealthCheck, - HealthCheckService, - TypeOrmHealthIndicator, -} from '@nestjs/terminus'; import { IS_PUBLIC_KEY } from '../auth/write-key.guard'; +import { RedisRouter } from '../tracking/redis-router.service'; +/** + * Edge health. The collector has no database — readiness means "can we accept + * an event", and we always can: a black-redis outage spools locally. So health + * is liveness plus observability into the spool (depth + whether black is + * currently reachable), which the outage drill reads. + */ @ApiTags('Health') @SetMetadata(IS_PUBLIC_KEY, true) @Controller('health') export class HealthController { - constructor( - private readonly health: HealthCheckService, - private readonly db: TypeOrmHealthIndicator, - ) {} + constructor(private readonly router: RedisRouter) {} @Get() - @HealthCheck() @ApiOperation({ summary: 'Health check endpoint' }) - check() { - return this.health.check([ - () => this.db.pingCheck('database'), - ]); + async check() { + return { + status: 'ok', + blackHealthy: this.router.isBlackHealthy(), + spoolDepth: await this.router.spoolDepth(), + }; } @Get('live') @@ -32,11 +32,10 @@ export class HealthController { } @Get('ready') - @HealthCheck() @ApiOperation({ summary: 'Readiness probe' }) ready() { - return this.health.check([ - () => this.db.pingCheck('database'), - ]); + // Always ready: the local spool guarantees we can durably accept events + // even when the canonical store is unreachable. + return { status: 'ok' }; } } diff --git a/services/collector/src/health/health.module.ts b/services/collector/src/health/health.module.ts index 0208ef7..bc30455 100644 --- a/services/collector/src/health/health.module.ts +++ b/services/collector/src/health/health.module.ts @@ -1,9 +1,14 @@ import { Module } from '@nestjs/common'; -import { TerminusModule } from '@nestjs/terminus'; import { HealthController } from './health.controller'; +import { TrackingModule } from '../tracking/tracking.module'; +/** + * Health module. Depends on TrackingModule for the {@link RedisRouter} so the + * health endpoint can report spool depth + black reachability. No DB indicator + * — the edge has no database. + */ @Module({ - imports: [TerminusModule], + imports: [TrackingModule], controllers: [HealthController], }) export class HealthModule {} diff --git a/services/collector/src/tracking/attribution.service.spec.ts b/services/collector/src/tracking/attribution.service.spec.ts deleted file mode 100644 index 0db24ac..0000000 --- a/services/collector/src/tracking/attribution.service.spec.ts +++ /dev/null @@ -1,487 +0,0 @@ -import { Test } from '@nestjs/testing'; -import { describe, it, expect, beforeEach } from 'vitest'; - -import { AttributionService } from './attribution.service'; - -describe('AttributionService', () => { - let service: AttributionService; - - beforeEach(async () => { - const module = await Test.createTestingModule({ - providers: [AttributionService], - }).compile(); - - service = module.get(AttributionService); - }); - - describe('resolveTrafficSource', () => { - it('returns direct traffic when no attribution data', () => { - const result = service.resolveTrafficSource({}); - - expect(result.source).toBe('direct'); - expect(result.rawSource).toBeNull(); - expect(result.medium).toBeNull(); - expect(result.campaign).toBeNull(); - expect(result.referrer).toBeNull(); - }); - - it('preserves all UTM parameters in result', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'cpc', - utmCampaign: 'winter-sale', - utmContent: 'ad-variant-a', - utmTerm: 'luxury+watches', - }); - - expect(result.rawSource).toBe('google'); - expect(result.medium).toBe('cpc'); - expect(result.campaign).toBe('winter-sale'); - expect(result.content).toBe('ad-variant-a'); - expect(result.term).toBe('luxury+watches'); - }); - }); - - describe('UTM parameter resolution', () => { - it('classifies paid search from utm_medium=cpc', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'cpc', - }); - - expect(result.source).toBe('paid'); - }); - - it('classifies paid search from utm_medium=ppc', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'ppc', - }); - - expect(result.source).toBe('paid'); - }); - - it('classifies paid search from utm_medium=paid', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'paid', - }); - - expect(result.source).toBe('paid'); - }); - - it('classifies paid search from utm_medium=paidsearch', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'paidsearch', - }); - - expect(result.source).toBe('paid'); - }); - - it('classifies email from utm_medium=email', () => { - const result = service.resolveTrafficSource({ - utmSource: 'mailchimp', - utmMedium: 'email', - }); - - expect(result.source).toBe('email'); - }); - - it('classifies email from utm_medium=newsletter', () => { - const result = service.resolveTrafficSource({ - utmSource: 'campaign-monitor', - utmMedium: 'newsletter', - }); - - expect(result.source).toBe('email'); - }); - - it('classifies social from utm_medium=social', () => { - const result = service.resolveTrafficSource({ - utmSource: 'facebook', - utmMedium: 'social', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from utm_medium=social-media', () => { - const result = service.resolveTrafficSource({ - utmSource: 'twitter', - utmMedium: 'social-media', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies affiliate from utm_medium=affiliate', () => { - const result = service.resolveTrafficSource({ - utmSource: 'partner123', - utmMedium: 'affiliate', - }); - - expect(result.source).toBe('affiliate'); - }); - - it('classifies affiliate from utm_medium=partner', () => { - const result = service.resolveTrafficSource({ - utmSource: 'partner456', - utmMedium: 'partner', - }); - - expect(result.source).toBe('affiliate'); - }); - - it('classifies organic from utm_medium=organic', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'organic', - }); - - expect(result.source).toBe('organic'); - }); - - it('classifies referral from utm_medium=referral', () => { - const result = service.resolveTrafficSource({ - utmSource: 'blog.example.com', - utmMedium: 'referral', - }); - - expect(result.source).toBe('referral'); - }); - }); - - describe('UTM source-based classification', () => { - it('classifies social from social platform utm_source', () => { - const result = service.resolveTrafficSource({ - utmSource: 'facebook', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from twitter', () => { - const result = service.resolveTrafficSource({ - utmSource: 'twitter', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from instagram', () => { - const result = service.resolveTrafficSource({ - utmSource: 'instagram', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from linkedin', () => { - const result = service.resolveTrafficSource({ - utmSource: 'linkedin', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies organic from search engine with no medium', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - }); - - expect(result.source).toBe('organic'); - }); - - it('classifies paid from search engine with cpc medium', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'cpc', - }); - - expect(result.source).toBe('paid'); - }); - - it('classifies referral from unknown utm_source', () => { - const result = service.resolveTrafficSource({ - utmSource: 'blog.example.com', - }); - - expect(result.source).toBe('referral'); - }); - }); - - describe('referrer analysis', () => { - it('classifies social from facebook referrer', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.facebook.com/some-post', - }); - - expect(result.source).toBe('social'); - expect(result.referrer).toBe('https://www.facebook.com/some-post'); - }); - - it('classifies social from twitter referrer', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://t.co/abc123', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from x.com', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://x.com/user/status/123', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from instagram', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.instagram.com/p/abc123/', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from linkedin', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.linkedin.com/feed/', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from reddit', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.reddit.com/r/programming/', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from youtube', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.youtube.com/watch?v=abc123', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies social from tiktok', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.tiktok.com/@user/video/123', - }); - - expect(result.source).toBe('social'); - }); - - it('classifies organic from google search', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.google.com/search?q=test', - }); - - expect(result.source).toBe('organic'); - }); - - it('classifies organic from bing', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.bing.com/search?q=test', - }); - - expect(result.source).toBe('organic'); - }); - - it('classifies organic from duckduckgo', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://duckduckgo.com/?q=test', - }); - - expect(result.source).toBe('organic'); - }); - - it('classifies organic from yahoo', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://search.yahoo.com/search?p=test', - }); - - expect(result.source).toBe('organic'); - }); - - it('classifies referral from unknown domain', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://blog.example.com/article', - }); - - expect(result.source).toBe('referral'); - }); - - it('handles referrer without www prefix', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://facebook.com/page', - }); - - expect(result.source).toBe('social'); - }); - - it('handles invalid referrer URL gracefully', () => { - const result = service.resolveTrafficSource({ - referrer: 'not-a-valid-url', - }); - - expect(result.source).toBe('direct'); - }); - }); - - describe('priority resolution', () => { - it('prioritizes UTM parameters over referrer', () => { - const result = service.resolveTrafficSource({ - utmSource: 'newsletter', - utmMedium: 'email', - referrer: 'https://www.facebook.com/', - }); - - expect(result.source).toBe('email'); - }); - - it('uses referrer when UTM parameters are missing', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://www.google.com/search', - }); - - expect(result.source).toBe('organic'); - }); - - it('returns direct when both UTM and referrer are missing', () => { - const result = service.resolveTrafficSource({}); - - expect(result.source).toBe('direct'); - }); - }); - - describe('edge cases', () => { - it('handles empty string UTM parameters', () => { - const result = service.resolveTrafficSource({ - utmSource: '', - utmMedium: '', - referrer: '', - }); - - expect(result.source).toBe('direct'); - }); - - it('handles case-insensitive UTM medium', () => { - const result1 = service.resolveTrafficSource({ utmMedium: 'CPC' }); - const result2 = service.resolveTrafficSource({ utmMedium: 'Cpc' }); - const result3 = service.resolveTrafficSource({ utmMedium: 'cpc' }); - - expect(result1.source).toBe('paid'); - expect(result2.source).toBe('paid'); - expect(result3.source).toBe('paid'); - }); - - it('handles case-insensitive UTM source', () => { - const result1 = service.resolveTrafficSource({ utmSource: 'FACEBOOK' }); - const result2 = service.resolveTrafficSource({ utmSource: 'Facebook' }); - const result3 = service.resolveTrafficSource({ utmSource: 'facebook' }); - - expect(result1.source).toBe('social'); - expect(result2.source).toBe('social'); - expect(result3.source).toBe('social'); - }); - - it('handles subdomains in referrer', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://subdomain.facebook.com/page', - }); - - expect(result.source).toBe('social'); - }); - - it('handles country-specific Google domains', () => { - const resultUK = service.resolveTrafficSource({ - referrer: 'https://www.google.co.uk/search', - }); - const resultDE = service.resolveTrafficSource({ - referrer: 'https://www.google.de/search', - }); - - expect(resultUK.source).toBe('organic'); - expect(resultDE.source).toBe('organic'); - }); - - it('handles Yandex with subdomain', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://yandex.ru/search', - }); - - expect(result.source).toBe('organic'); - }); - }); - - describe('complex scenarios', () => { - it('handles paid social campaign', () => { - const result = service.resolveTrafficSource({ - utmSource: 'facebook', - utmMedium: 'cpc', - utmCampaign: 'summer-promo', - utmContent: 'ad-creative-1', - }); - - expect(result.source).toBe('paid'); - expect(result.rawSource).toBe('facebook'); - expect(result.medium).toBe('cpc'); - expect(result.campaign).toBe('summer-promo'); - expect(result.content).toBe('ad-creative-1'); - }); - - it('handles organic social without UTM', () => { - const result = service.resolveTrafficSource({ - referrer: 'https://twitter.com/user/status/123', - }); - - expect(result.source).toBe('social'); - expect(result.rawSource).toBeNull(); - expect(result.medium).toBeNull(); - }); - - it('handles email campaign with full UTM', () => { - const result = service.resolveTrafficSource({ - utmSource: 'mailchimp', - utmMedium: 'email', - utmCampaign: 'weekly-digest', - utmContent: 'header-cta', - }); - - expect(result.source).toBe('email'); - expect(result.rawSource).toBe('mailchimp'); - expect(result.medium).toBe('email'); - expect(result.campaign).toBe('weekly-digest'); - expect(result.content).toBe('header-cta'); - }); - - it('handles affiliate partnership', () => { - const result = service.resolveTrafficSource({ - utmSource: 'partner-site', - utmMedium: 'affiliate', - utmCampaign: 'q1-partnership', - }); - - expect(result.source).toBe('affiliate'); - expect(result.rawSource).toBe('partner-site'); - }); - - it('handles branded search campaign', () => { - const result = service.resolveTrafficSource({ - utmSource: 'google', - utmMedium: 'cpc', - utmCampaign: 'brand-protection', - utmTerm: 'company+name', - }); - - expect(result.source).toBe('paid'); - expect(result.term).toBe('company+name'); - }); - }); -}); diff --git a/services/collector/src/tracking/capture.service.ts b/services/collector/src/tracking/capture.service.ts new file mode 100644 index 0000000..19df0ae --- /dev/null +++ b/services/collector/src/tracking/capture.service.ts @@ -0,0 +1,252 @@ +import { Injectable } from '@nestjs/common'; +import { randomUUID } from 'crypto'; +import type { Request } from 'express'; +import type { + RawEventEnvelope, + NormalizedEvent, + EdgeContext, + IngestKind, +} from '@lilith/analytics'; + +import { RedisRouter } from './redis-router.service'; +import { TrackViewDto } from '../dto/track-view.dto'; +import { TrackEngagementDto } from '../dto/track-engagement.dto'; +import { TrackInteractionBatchDto } from '../dto/track-interaction.dto'; +import { + TrackEventDto, + TrackBatchDto, + TrackConversionDto, + TrackFunnelStepDto, + TrackRegistrationFunnelDto, +} from '../dto/track-event.dto'; + +export interface CaptureResult { + success: boolean; + eventId: string | null; +} + +export interface BatchCaptureResult { + success: boolean; + count: number; +} + +/** + * Edge capture. Validates already happened at the controller (class-validator); + * here we do the pure per-endpoint normalization that used to live in the + * collector's TrackingService/controller, mint an edge event id + receive time, + * and durably enqueue via {@link RedisRouter}. No database, no enrichment — all + * of that is deferred to the black-side ingest-writer. + */ +@Injectable() +export class CaptureService { + constructor(private readonly router: RedisRouter) {} + + async trackView(dto: TrackViewDto, request: Request): Promise { + const pageUrl = dto.pageUrl ?? dto.contentId ?? ''; + return this.one('view', request, { + eventType: 'pageview', + sessionId: dto.sessionId, + userId: dto.userId ?? null, + pageUrl, + referrer: dto.referrer ?? null, + isView: true, + clientDevice: (dto.clientDevice as Record | undefined) ?? null, + attribution: dto.attribution + ? { + utmSource: dto.attribution.utmSource, + utmMedium: dto.attribution.utmMedium, + utmCampaign: dto.attribution.utmCampaign, + utmContent: dto.attribution.utmContent, + utmTerm: dto.attribution.utmTerm, + referrer: dto.attribution.referrer ?? dto.referrer, + } + : { referrer: dto.referrer }, + metadata: { + ...dto.metadata, + ...(dto.contentType ? { contentType: dto.contentType } : {}), + ...(dto.app ? { app: dto.app } : {}), + ...(dto.duration !== undefined ? { duration: dto.duration } : {}), + }, + }); + } + + async trackEngagement(dto: TrackEngagementDto, request: Request): Promise { + return this.one('engagement', request, { + eventType: `engagement_${dto.metricType}`, + sessionId: dto.userId, // engagement events are user-scoped + userId: dto.userId, + isView: false, + metadata: { + metricType: dto.metricType, + targetId: dto.targetId, + targetType: dto.targetType, + ...dto.metadata, + }, + }); + } + + async trackInteraction(dto: TrackInteractionBatchDto, request: Request): Promise { + return this.many( + 'interaction', + request, + dto.events.map((event) => ({ + eventType: event.type, + sessionId: event.sessionId, + userId: event.userId ?? null, + metadata: event.data, + timestamp: event.timestamp ?? null, + isView: false, + })), + ); + } + + async trackEvent(dto: TrackEventDto, request: Request): Promise { + return this.one('event', request, { + eventType: dto.eventType, + sessionId: dto.sessionId, + userId: dto.userId ?? null, + pageUrl: dto.pageUrl ?? null, + metadata: dto.metadata ?? null, + timestamp: dto.timestamp ?? null, + isView: false, + }); + } + + async trackBatch(dto: TrackBatchDto, request: Request): Promise { + return this.many( + 'batch', + request, + dto.events.map((e) => ({ + eventType: e.eventType, + sessionId: e.sessionId, + userId: e.userId ?? null, + pageUrl: e.pageUrl ?? null, + metadata: e.metadata ?? null, + timestamp: e.timestamp ?? null, + isView: false, + })), + ); + } + + async trackConversion(dto: TrackConversionDto, request: Request): Promise { + return this.one('conversion', request, { + eventType: 'conversion', + sessionId: dto.sessionId, + userId: dto.userId ?? null, + isView: false, + metadata: { + conversionType: dto.conversionType, + value: dto.value, + currency: dto.currency, + ...dto.metadata, + }, + }); + } + + async trackFunnelStep(dto: TrackFunnelStepDto, request: Request): Promise { + return this.one('funnel', request, { + eventType: 'funnel_step', + sessionId: dto.sessionId, + userId: dto.userId ?? null, + isView: false, + metadata: { + funnelId: dto.funnelId, + stepId: dto.stepId, + stepOrder: dto.stepOrder, + ...dto.metadata, + }, + }); + } + + async trackRegistrationFunnel( + dto: TrackRegistrationFunnelDto, + request: Request, + ): Promise { + const parsed = Date.parse(dto.timestamp); + return this.one('registration-funnel', request, { + eventType: `registration_funnel_${dto.event}`, + sessionId: dto.sessionId, + userId: dto.userId ?? null, + pageUrl: dto.route ?? null, + timestamp: Number.isNaN(parsed) ? null : parsed, + isView: false, + metadata: { + audience: dto.audience, + userType: dto.userType, + registrationPath: dto.registrationPath, + registrationType: dto.registrationType, + entryReferrer: dto.entryReferrer, + referrer: dto.referrer, + }, + }); + } + + // ── internals ────────────────────────────────────────────────────────────── + + private async one( + kind: IngestKind, + request: Request, + event: NormalizedEvent, + ): Promise { + const eventId = randomUUID(); + await this.router.enqueue(this.envelope(eventId, kind, request, event)); + return { success: true, eventId }; + } + + private async many( + kind: IngestKind, + request: Request, + events: NormalizedEvent[], + ): Promise { + const edge = this.buildEdge(request); + const receivedAt = new Date().toISOString(); + const results = await Promise.allSettled( + events.map((event) => + this.router.enqueue({ + eventId: randomUUID(), + kind, + receivedAt, + edge, + payload: event, + }), + ), + ); + const count = results.filter((r) => r.status === 'fulfilled').length; + return { success: count === events.length, count }; + } + + private envelope( + eventId: string, + kind: IngestKind, + request: Request, + event: NormalizedEvent, + ): RawEventEnvelope { + return { + eventId, + kind, + receivedAt: new Date().toISOString(), + edge: this.buildEdge(request), + payload: event, + }; + } + + private buildEdge(request: Request): EdgeContext { + const header = (name: string): string | undefined => { + const v = request.headers[name]; + return Array.isArray(v) ? v[0] : v; + }; + const forwardedFor = header('x-forwarded-for'); + const ip = + forwardedFor?.split(',')[0]?.trim() ?? header('x-real-ip') ?? request.socket.remoteAddress; + return { + ip: ip ?? undefined, + ua: header('user-agent'), + lang: header('accept-language'), + headers: { + origin: header('origin'), + referer: header('referer'), + host: header('host'), + }, + }; + } +} diff --git a/services/collector/src/tracking/device-enrichment.service.spec.ts b/services/collector/src/tracking/device-enrichment.service.spec.ts deleted file mode 100644 index e0b66ea..0000000 --- a/services/collector/src/tracking/device-enrichment.service.spec.ts +++ /dev/null @@ -1,431 +0,0 @@ -import { Test } from '@nestjs/testing'; -import { describe, it, expect, beforeEach, vi } from 'vitest'; - -import { DeviceEnrichmentService } from './device-enrichment.service'; -import { GovDetectionService } from './gov-detection.service'; - -const mockGovDetection = { - enrich: vi.fn().mockResolvedValue({ - isGovernment: false, - orgType: 'NORMAL', - responseTier: 'ALLOW', - proxyType: 'NONE', - org: null, - asn: null, - }), - onModuleInit: vi.fn(), -}; - -describe('DeviceEnrichmentService', () => { - let service: DeviceEnrichmentService; - - beforeEach(async () => { - const module = await Test.createTestingModule({ - providers: [ - DeviceEnrichmentService, - { provide: GovDetectionService, useValue: mockGovDetection }, - ], - }).compile(); - - service = module.get(DeviceEnrichmentService); - }); - - describe('enrich', () => { - it('enriches device data from User-Agent and client data', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0', - '192.168.1.1', - { - screenWidth: 1920, - screenHeight: 1080, - viewportWidth: 1440, - viewportHeight: 900, - language: 'en-US', - timezone: 'America/New_York', - }, - ); - - expect(result.deviceType).toBe('desktop'); - expect(result.isBot).toBe(false); - expect(result.browser).toBe('Chrome'); - expect(result.browserVersion).toBe('120.0'); - expect(result.browserMajor).toBe(120); - expect(result.os).toBe('macOS'); - expect(result.osVersion).toBe('10.15.7'); - expect(result.screenWidth).toBe(1920); - expect(result.screenHeight).toBe(1080); - expect(result.viewportWidth).toBe(1440); - expect(result.viewportHeight).toBe(900); - expect(result.language).toBe('en-US'); - expect(result.timezone).toBe('America/New_York'); - expect(result.ipHash).toBeTruthy(); - }); - - it('handles missing User-Agent gracefully', async () => { - const result = await service.enrich(undefined, undefined, { - screenWidth: 1024, - screenHeight: 768, - }); - - expect(result.deviceType).toBe('desktop'); - expect(result.isBot).toBe(false); - expect(result.browser).toBeNull(); - expect(result.browserVersion).toBeNull(); - expect(result.os).toBeNull(); - expect(result.osVersion).toBeNull(); - expect(result.ipHash).toBeNull(); - }); - - it('hashes IP address for privacy', async () => { - const result1 = await service.enrich(undefined, '192.168.1.1'); - const result2 = await service.enrich(undefined, '192.168.1.2'); - const result3 = await service.enrich(undefined, '192.168.1.1'); - - expect(result1.ipHash).toBeTruthy(); - expect(result2.ipHash).toBeTruthy(); - expect(result1.ipHash).not.toBe(result2.ipHash); - expect(result1.ipHash).toBe(result3.ipHash); // Same IP on same day = same hash - }); - }); - - describe('parseUserAgent', () => { - it('parses Chrome User-Agent correctly', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36', - ); - - expect(result.browser).toBe('Chrome'); - expect(result.browserVersion).toBe('120.0'); - expect(result.browserMajor).toBe(120); - expect(result.os).toBe('Windows'); - expect(result.osVersion).toBe('10/11'); - }); - - it('parses Firefox User-Agent correctly', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0', - ); - - expect(result.browser).toBe('Firefox'); - expect(result.browserVersion).toBe('122.0'); - expect(result.browserMajor).toBe(122); - expect(result.os).toBe('Windows'); - }); - - it('parses Safari User-Agent correctly', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15', - ); - - expect(result.browser).toBe('Safari'); - expect(result.browserVersion).toBe('17.2'); - expect(result.browserMajor).toBe(17); - expect(result.os).toBe('macOS'); - expect(result.osVersion).toBe('10.15.7'); - }); - - it('parses Edge User-Agent correctly', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Edge/120.0.0.0', - ); - - expect(result.browser).toBe('Edge'); - expect(result.browserVersion).toBe('120.0'); - expect(result.browserMajor).toBe(120); - }); - - it('parses mobile Safari User-Agent correctly', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1', - ); - - expect(result.browser).toBe('Safari'); - expect(result.os).toBe('iOS'); - expect(result.osVersion).toBe('17.2'); - expect(result.deviceType).toBe('mobile'); - }); - - it('parses Android Chrome User-Agent correctly', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Linux; Android 14; Pixel 7) AppleWebKit/537.36 Chrome/120.0.0.0 Mobile Safari/537.36', - ); - - expect(result.browser).toBe('Chrome'); - expect(result.os).toBe('Android'); - expect(result.osVersion).toBe('14'); - expect(result.deviceType).toBe('mobile'); - }); - - it('handles unknown User-Agent', async () => { - const result = await service.enrich( - 'UnknownBot/1.0', - ); - - expect(result.browser).toBeNull(); - expect(result.browserVersion).toBeNull(); - expect(result.os).toBeNull(); - expect(result.osVersion).toBeNull(); - }); - }); - - describe('detectDeviceType', () => { - it('detects desktop from no touch points', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0', - undefined, - { - touchPoints: 0, - viewportWidth: 1920, - }, - ); - - expect(result.deviceType).toBe('desktop'); - }); - - it('detects mobile from touch points and small viewport', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (iPhone) Safari/604.1', - undefined, - { - touchPoints: 5, - viewportWidth: 375, - }, - ); - - expect(result.deviceType).toBe('mobile'); - }); - - it('detects tablet from touch points and large viewport', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (iPad) Safari/604.1', - undefined, - { - touchPoints: 5, - viewportWidth: 1024, - }, - ); - - expect(result.deviceType).toBe('tablet'); - }); - - it('detects tablet from iPad User-Agent', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (iPad; CPU OS 17_2 like Mac OS X) AppleWebKit/605.1.15', - ); - - expect(result.deviceType).toBe('tablet'); - }); - - it('detects mobile from iPhone User-Agent', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) Mobile/15E148', - ); - - expect(result.deviceType).toBe('mobile'); - }); - - it('detects mobile from Android Mobile User-Agent', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Linux; Android 14) Mobile Safari/537.36', - ); - - expect(result.deviceType).toBe('mobile'); - }); - - it('detects tablet from Android without Mobile keyword', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (Linux; Android 14; Tablet) AppleWebKit/537.36', - ); - - expect(result.deviceType).toBe('tablet'); - }); - }); - - describe('detectBot', () => { - it('detects Googlebot', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)', - ); - - expect(result.isBot).toBe(true); - expect(result.deviceType).toBe('bot'); - }); - - it('detects Bingbot', async () => { - const result = await service.enrich( - 'Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)', - ); - - expect(result.isBot).toBe(true); - expect(result.deviceType).toBe('bot'); - }); - - it('detects generic crawlers', async () => { - const result = await service.enrich('SomeCrawler/1.0'); - - expect(result.isBot).toBe(true); - expect(result.deviceType).toBe('bot'); - }); - - it('detects headless browsers', async () => { - const result = await service.enrich('HeadlessChrome/120.0.0.0'); - - expect(result.isBot).toBe(true); - expect(result.deviceType).toBe('bot'); - }); - - it('detects Puppeteer', async () => { - const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 HeadlessChrome/120.0.0.0 Safari/537.36 Puppeteer'); - - expect(result.isBot).toBe(true); - expect(result.deviceType).toBe('bot'); - }); - - it('detects Playwright', async () => { - const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36 Playwright'); - - expect(result.isBot).toBe(true); - expect(result.deviceType).toBe('bot'); - }); - - it('detects Selenium', async () => { - const result = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0 Selenium'); - - expect(result.isBot).toBe(true); - expect(result.deviceType).toBe('bot'); - }); - - it('detects social media bots', async () => { - const facebookBot = await service.enrich('facebookexternalhit/1.1'); - const twitterBot = await service.enrich('Twitterbot/1.0'); - const linkedinBot = await service.enrich('LinkedInBot/1.0'); - - expect(facebookBot.isBot).toBe(true); - expect(twitterBot.isBot).toBe(true); - expect(linkedinBot.isBot).toBe(true); - }); - - it('does not detect normal browsers as bots', async () => { - const chrome = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Chrome/120.0.0.0'); - const firefox = await service.enrich('Mozilla/5.0 (Windows NT 10.0) Firefox/122.0'); - const safari = await service.enrich('Mozilla/5.0 (Macintosh) Safari/605.1.15'); - - expect(chrome.isBot).toBe(false); - expect(firefox.isBot).toBe(false); - expect(safari.isBot).toBe(false); - }); - }); - - describe('mapWindowsVersion', () => { - it('maps Windows NT 10.0 to Windows 10/11', async () => { - const result = await service.enrich('Mozilla/5.0 (Windows NT 10.0)'); - expect(result.osVersion).toBe('10/11'); - }); - - it('maps Windows NT 6.3 to Windows 8.1', async () => { - const result = await service.enrich('Mozilla/5.0 (Windows NT 6.3)'); - expect(result.osVersion).toBe('8.1'); - }); - - it('maps Windows NT 6.2 to Windows 8', async () => { - const result = await service.enrich('Mozilla/5.0 (Windows NT 6.2)'); - expect(result.osVersion).toBe('8'); - }); - - it('maps Windows NT 6.1 to Windows 7', async () => { - const result = await service.enrich('Mozilla/5.0 (Windows NT 6.1)'); - expect(result.osVersion).toBe('7'); - }); - - it('handles unknown Windows NT version', async () => { - const result = await service.enrich('Mozilla/5.0 (Windows NT 5.0)'); - expect(result.osVersion).toBe('5.0'); - }); - }); - - describe('OS detection', () => { - it('detects macOS with version', async () => { - const result = await service.enrich('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)'); - expect(result.os).toBe('macOS'); - expect(result.osVersion).toBe('10.15.7'); - }); - - it('detects Linux', async () => { - const result = await service.enrich('Mozilla/5.0 (X11; Linux x86_64)'); - expect(result.os).toBe('Linux'); - }); - - it('detects iOS with version', async () => { - const result = await service.enrich('Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X)'); - expect(result.os).toBe('iOS'); - expect(result.osVersion).toBe('17.2'); - }); - - it('detects Android with version', async () => { - const result = await service.enrich('Mozilla/5.0 (Linux; Android 14)'); - expect(result.os).toBe('Android'); - expect(result.osVersion).toBe('14'); - }); - - it('handles Android version with decimals', async () => { - const result = await service.enrich('Mozilla/5.0 (Linux; Android 13.5)'); - expect(result.os).toBe('Android'); - expect(result.osVersion).toBe('13.5'); - }); - }); - - describe('client device data preservation', () => { - it('preserves all client device data', async () => { - const clientData = { - screenWidth: 1920, - screenHeight: 1080, - viewportWidth: 1440, - viewportHeight: 900, - pixelRatio: 2, - colorDepth: 24, - language: 'en-US', - languages: ['en-US', 'en', 'es'], - timezone: 'America/New_York', - timezoneOffset: -300, - deviceMemory: 8, - hardwareConcurrency: 8, - touchPoints: 0, - cookiesEnabled: true, - doNotTrack: false, - }; - - const result = await service.enrich( - 'Mozilla/5.0 (Macintosh) Chrome/120.0.0.0', - undefined, - clientData, - ); - - expect(result.screenWidth).toBe(1920); - expect(result.screenHeight).toBe(1080); - expect(result.viewportWidth).toBe(1440); - expect(result.viewportHeight).toBe(900); - expect(result.pixelRatio).toBe(2); - expect(result.colorDepth).toBe(24); - expect(result.language).toBe('en-US'); - expect(result.languages).toEqual(['en-US', 'en', 'es']); - expect(result.timezone).toBe('America/New_York'); - expect(result.timezoneOffset).toBe(-300); - expect(result.deviceMemory).toBe(8); - expect(result.hardwareConcurrency).toBe(8); - expect(result.touchPoints).toBe(0); - expect(result.cookiesEnabled).toBe(true); - expect(result.doNotTrack).toBe(false); - }); - - it('handles missing client device data', async () => { - const result = await service.enrich('Mozilla/5.0 (Macintosh) Chrome/120.0.0.0'); - - expect(result.screenWidth).toBeUndefined(); - expect(result.screenHeight).toBeUndefined(); - expect(result.viewportWidth).toBeUndefined(); - expect(result.viewportHeight).toBeUndefined(); - expect(result.pixelRatio).toBeUndefined(); - expect(result.colorDepth).toBeUndefined(); - }); - }); -}); diff --git a/services/collector/src/tracking/index.ts b/services/collector/src/tracking/index.ts index c40d8fc..fd755f0 100644 --- a/services/collector/src/tracking/index.ts +++ b/services/collector/src/tracking/index.ts @@ -1,16 +1,5 @@ export { TrackingModule } from './tracking.module'; -export { TrackingService } from './tracking.service'; export { TrackingController } from './tracking.controller'; -export { DeviceEnrichmentService } from './device-enrichment.service'; -export { AttributionService } from './attribution.service'; -export type { - ClientDeviceData, - EnrichedDeviceData, -} from './device-enrichment.service'; -export type { - AttributionInput, - ResolvedAttribution, -} from './attribution.service'; -export { IdentityService } from "./identity.service"; -export { DomainResolverService } from "./domain-resolver.service"; -export type { ResolvedDomain } from "./domain-resolver.service"; +export { CaptureService } from './capture.service'; +export type { CaptureResult, BatchCaptureResult } from './capture.service'; +export { RedisRouter } from './redis-router.service'; diff --git a/services/collector/src/tracking/redis-router.service.ts b/services/collector/src/tracking/redis-router.service.ts new file mode 100644 index 0000000..3937194 --- /dev/null +++ b/services/collector/src/tracking/redis-router.service.ts @@ -0,0 +1,195 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Queue, Worker, type ConnectionOptions } from 'bullmq'; +import { Redis } from 'ioredis'; +import { EVENTS_QUEUE, INGEST_EVENT_JOB, type RawEventEnvelope } from '@lilith/analytics'; + +/** + * Dual-redis router — the durability boundary for the edge collector. + * + * Normal path: enqueue the envelope to the **primary** queue on black's redis, + * where the ingest-writer consumes it. When black is unreachable, enqueue to a + * **local spool** queue on the VPS's own redis (appendonly, durable). This is + * the "small local recording that only grows when black is unreachable." + * + * A health probe pings black's redis; on recovery it resumes a drain worker + * that forwards every spooled envelope to black. The envelope's `eventId` is + * the BullMQ `jobId` on both queues, so a forward that overlaps a direct enqueue + * is deduplicated and the black-side `ON CONFLICT (id) DO NOTHING` makes the + * whole pipeline idempotent. + */ +@Injectable() +export class RedisRouter implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(RedisRouter.name); + + private primaryQueue!: Queue; + private spoolQueue!: Queue; + private drainer!: Worker; + private probe!: Redis; + private healthTimer?: ReturnType; + + /** Pessimistic until the first successful probe — never assume black is up. */ + private blackHealthy = false; + private readonly probeIntervalMs: number; + + constructor(private readonly config: ConfigService) { + this.probeIntervalMs = Number(this.config.get('BLACK_REDIS_PROBE_MS', 5000)); + } + + onModuleInit(): void { + const primaryConn = this.blackConnection(); + const spoolConn = this.localConnection(); + + this.primaryQueue = new Queue(EVENTS_QUEUE, { + connection: { ...primaryConn, enableOfflineQueue: false }, + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: 5000, + attempts: 3, + backoff: { type: 'exponential', delay: 1000 }, + }, + }); + + this.spoolQueue = new Queue(EVENTS_QUEUE, { + connection: spoolConn, + defaultJobOptions: { + removeOnComplete: true, + // Keep failed forwards around so a long outage never loses events. + removeOnFail: false, + attempts: Number.MAX_SAFE_INTEGER, + backoff: { type: 'exponential', delay: 2000 }, + }, + }); + + // Drains the local spool to black. Created paused; the probe resumes it + // only when black is reachable, so it never spins against a dead primary. + this.drainer = new Worker( + EVENTS_QUEUE, + async (job) => { + const envelope = job.data as RawEventEnvelope; + await this.primaryQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId }); + }, + { connection: spoolConn, concurrency: 5 }, + ); + this.drainer.on('error', (err) => this.logger.debug(`drainer error: ${err.message}`)); + void this.drainer.pause(); + + this.probe = new Redis({ + ...this.redisOptions(primaryConn), + lazyConnect: false, + enableOfflineQueue: false, + maxRetriesPerRequest: 1, + connectTimeout: 3000, + }); + this.probe.on('error', () => { + /* state is driven by the probe loop; swallow connection noise here */ + }); + + this.startHealthLoop(); + this.logger.log( + `RedisRouter up — primary=${this.describe(primaryConn)} spool=${this.describe(spoolConn)}`, + ); + } + + async onModuleDestroy(): Promise { + if (this.healthTimer) clearInterval(this.healthTimer); + await Promise.allSettled([ + this.drainer?.close(), + this.primaryQueue?.close(), + this.spoolQueue?.close(), + this.probe?.quit(), + ]); + } + + /** + * Durably enqueue one envelope. Resolves once redis (primary or spool) has + * accepted it; rejects only if BOTH are unreachable, so the caller can return + * a 5xx and the client/relay can retry rather than silently dropping. + */ + async enqueue(envelope: RawEventEnvelope): Promise<'primary' | 'spool'> { + if (this.blackHealthy) { + try { + await this.primaryQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId }); + return 'primary'; + } catch (err) { + this.markUnhealthy(err); + // fall through to the local spool + } + } + await this.spoolQueue.add(INGEST_EVENT_JOB, envelope, { jobId: envelope.eventId }); + return 'spool'; + } + + /** Current depth of the local outage spool (waiting + delayed). */ + async spoolDepth(): Promise { + const counts = await this.spoolQueue.getJobCounts('waiting', 'delayed', 'active', 'failed'); + return (counts.waiting ?? 0) + (counts.delayed ?? 0) + (counts.active ?? 0) + (counts.failed ?? 0); + } + + isBlackHealthy(): boolean { + return this.blackHealthy; + } + + private startHealthLoop(): void { + const tick = async (): Promise => { + try { + const pong = await this.probe.ping(); + if (pong === 'PONG') await this.markHealthy(); + } catch (err) { + this.markUnhealthy(err); + } + }; + void tick(); + this.healthTimer = setInterval(() => void tick(), this.probeIntervalMs); + } + + private async markHealthy(): Promise { + if (this.blackHealthy) return; + this.blackHealthy = true; + this.logger.log('Black redis reachable — resuming spool drain'); + try { + this.drainer.resume(); + } catch (err) { + this.logger.warn(`Failed to resume drainer: ${this.msg(err)}`); + } + } + + private markUnhealthy(err: unknown): void { + if (!this.blackHealthy) return; + this.blackHealthy = false; + this.logger.warn(`Black redis unreachable — spooling locally: ${this.msg(err)}`); + void this.drainer.pause(); + } + + private blackConnection(): ConnectionOptions { + const password = this.config.get('BLACK_REDIS_PASSWORD'); + return { + host: this.config.get('BLACK_REDIS_HOST', 'black'), + port: Number(this.config.get('BLACK_REDIS_PORT', 26381)), + ...(password ? { password } : {}), + }; + } + + private localConnection(): ConnectionOptions { + const password = this.config.get('REDIS_PASSWORD'); + return { + host: this.config.get('REDIS_HOST', 'localhost'), + port: Number(this.config.get('REDIS_PORT', 6379)), + ...(password ? { password } : {}), + }; + } + + /** Narrow a BullMQ ConnectionOptions object to ioredis options for the probe. */ + private redisOptions(conn: ConnectionOptions): Record { + return conn as Record; + } + + private describe(conn: ConnectionOptions): string { + const c = conn as { host?: string; port?: number }; + return `${c.host}:${c.port}`; + } + + private msg(err: unknown): string { + return err instanceof Error ? err.message : String(err); + } +} diff --git a/services/collector/src/tracking/tracking.controller.ts b/services/collector/src/tracking/tracking.controller.ts index e1d531e..b4574f5 100644 --- a/services/collector/src/tracking/tracking.controller.ts +++ b/services/collector/src/tracking/tracking.controller.ts @@ -12,11 +12,17 @@ import { TrackFunnelStepDto, TrackRegistrationFunnelDto, } from '../dto/track-event.dto'; -import { TrackingService, TrackingResult, BatchTrackingResult } from './tracking.service'; +import { CaptureService, CaptureResult, BatchCaptureResult } from './capture.service'; /** * Event Collection Controller - * Analytics event ingestion endpoints + * Analytics event ingestion endpoints. + * + * The controller validates (class-validator) and forwards to the edge + * {@link CaptureService}, which normalizes and durably enqueues each event. + * No database or enrichment happens here — those run on the black-side + * ingest-writer. A 2xx means the event was durably accepted into redis + * (black's queue, or the local spool during a black outage), not dropped. * * Endpoints: * - POST /track/view - Page view with device fingerprinting @@ -24,43 +30,25 @@ import { TrackingService, TrackingResult, BatchTrackingResult } from './tracking * - POST /track/interaction - Batched low-level interactions (click, scroll, funnel_step, resize) * - POST /track/event - Single generic event * - POST /track/batch - Batch of generic events - * - POST /track/conversion - Conversion event (higher priority) + * - POST /track/conversion - Conversion event * - POST /track/funnel - Funnel step event * - POST /track/registration-funnel - Registration funnel event (Lilith-specific) */ @ApiTags('Event Collection') @Controller('track') export class TrackingController { - constructor(private readonly trackingService: TrackingService) {} + constructor(private readonly capture: CaptureService) {} @Post('view') @ApiOperation({ summary: 'Track page view', - description: 'Records a page view with device fingerprinting and first-touch attribution. Accepts both collector shape (pageUrl) and analytics-client shape (contentId + contentType).', + description: + 'Records a page view with device fingerprinting and first-touch attribution. Accepts both collector shape (pageUrl) and analytics-client shape (contentId + contentType).', }) - @ApiResponse({ status: 201, description: 'View tracked successfully' }) + @ApiResponse({ status: 201, description: 'View accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackView(@Body() dto: TrackViewDto, @Req() request: Request): Promise { - // Normalise: analytics-client sends contentId as the page URL - const pageUrl = dto.pageUrl ?? dto.contentId ?? ''; - - return this.trackingService.trackView( - { - pageUrl, - referrer: dto.referrer, - userId: dto.userId, - sessionId: dto.sessionId, - metadata: { - ...dto.metadata, - ...(dto.contentType ? { contentType: dto.contentType } : {}), - ...(dto.app ? { app: dto.app } : {}), - ...(dto.duration !== undefined ? { duration: dto.duration } : {}), - }, - clientDevice: dto.clientDevice, - attribution: dto.attribution, - }, - request, - ); + async trackView(@Body() dto: TrackViewDto, @Req() request: Request): Promise { + return this.capture.trackView(dto, request); } @Post('engagement') @@ -68,20 +56,13 @@ export class TrackingController { summary: 'Track engagement event', description: 'Records a meaningful user interaction (like, comment, subscribe, purchase)', }) - @ApiResponse({ status: 201, description: 'Engagement tracked successfully' }) + @ApiResponse({ status: 201, description: 'Engagement accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackEngagement(@Body() dto: TrackEngagementDto): Promise { - return this.trackingService.trackEvent({ - eventType: `engagement_${dto.metricType}`, - sessionId: dto.userId, // engagement events are user-scoped - userId: dto.userId, - metadata: { - metricType: dto.metricType, - targetId: dto.targetId, - targetType: dto.targetType, - ...dto.metadata, - }, - }); + async trackEngagement( + @Body() dto: TrackEngagementDto, + @Req() request: Request, + ): Promise { + return this.capture.trackEngagement(dto, request); } @Post('interaction') @@ -89,18 +70,13 @@ export class TrackingController { summary: 'Track interaction events (batched)', description: 'Records a batch of low-level interactions (clicks, scrolls, funnel steps, resizes)', }) - @ApiResponse({ status: 201, description: 'Interactions tracked successfully' }) + @ApiResponse({ status: 201, description: 'Interactions accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackInteraction(@Body() dto: TrackInteractionBatchDto): Promise { - return this.trackingService.trackBatch( - dto.events.map((event) => ({ - eventType: event.type, - sessionId: event.sessionId, - userId: event.userId, - metadata: event.data, - timestamp: event.timestamp, - })), - ); + async trackInteraction( + @Body() dto: TrackInteractionBatchDto, + @Req() request: Request, + ): Promise { + return this.capture.trackInteraction(dto, request); } @Post('event') @@ -108,17 +84,10 @@ export class TrackingController { summary: 'Track single event', description: 'Records a generic analytics event', }) - @ApiResponse({ status: 201, description: 'Event tracked successfully' }) + @ApiResponse({ status: 201, description: 'Event accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackEvent(@Body() dto: TrackEventDto): Promise { - return this.trackingService.trackEvent({ - eventType: dto.eventType, - sessionId: dto.sessionId, - userId: dto.userId, - pageUrl: dto.pageUrl, - metadata: dto.metadata, - timestamp: dto.timestamp, - }); + async trackEvent(@Body() dto: TrackEventDto, @Req() request: Request): Promise { + return this.capture.trackEvent(dto, request); } @Post('batch') @@ -126,19 +95,10 @@ export class TrackingController { summary: 'Track batch of events', description: 'Records multiple events in a single request (for client-side batching)', }) - @ApiResponse({ status: 201, description: 'Events tracked successfully' }) + @ApiResponse({ status: 201, description: 'Events accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackBatch(@Body() dto: TrackBatchDto): Promise { - return this.trackingService.trackBatch( - dto.events.map((e) => ({ - eventType: e.eventType, - sessionId: e.sessionId, - userId: e.userId, - pageUrl: e.pageUrl, - metadata: e.metadata, - timestamp: e.timestamp, - })), - ); + async trackBatch(@Body() dto: TrackBatchDto, @Req() request: Request): Promise { + return this.capture.trackBatch(dto, request); } @Post('conversion') @@ -146,17 +106,13 @@ export class TrackingController { summary: 'Track conversion', description: 'Records a conversion event with optional revenue value', }) - @ApiResponse({ status: 201, description: 'Conversion tracked successfully' }) + @ApiResponse({ status: 201, description: 'Conversion accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackConversion(@Body() dto: TrackConversionDto): Promise { - return this.trackingService.trackConversion({ - conversionType: dto.conversionType, - sessionId: dto.sessionId, - userId: dto.userId, - value: dto.value, - currency: dto.currency, - metadata: dto.metadata, - }); + async trackConversion( + @Body() dto: TrackConversionDto, + @Req() request: Request, + ): Promise { + return this.capture.trackConversion(dto, request); } @Post('funnel') @@ -164,20 +120,13 @@ export class TrackingController { summary: 'Track funnel step', description: 'Records a step in a conversion funnel', }) - @ApiResponse({ status: 201, description: 'Funnel step tracked successfully' }) + @ApiResponse({ status: 201, description: 'Funnel step accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackFunnelStep(@Body() dto: TrackFunnelStepDto): Promise { - return this.trackingService.trackEvent({ - eventType: 'funnel_step', - sessionId: dto.sessionId, - userId: dto.userId, - metadata: { - funnelId: dto.funnelId, - stepId: dto.stepId, - stepOrder: dto.stepOrder, - ...dto.metadata, - }, - }); + async trackFunnelStep( + @Body() dto: TrackFunnelStepDto, + @Req() request: Request, + ): Promise { + return this.capture.trackFunnelStep(dto, request); } @Post('registration-funnel') @@ -185,23 +134,12 @@ export class TrackingController { summary: 'Track registration funnel event', description: 'Records a registration funnel event (visit, signup, profile, conversion)', }) - @ApiResponse({ status: 201, description: 'Registration funnel event tracked' }) + @ApiResponse({ status: 201, description: 'Registration funnel event accepted' }) @ApiResponse({ status: 400, description: 'Invalid request data' }) - async trackRegistrationFunnel(@Body() dto: TrackRegistrationFunnelDto): Promise { - return this.trackingService.trackEvent({ - eventType: `registration_funnel_${dto.event}`, - sessionId: dto.sessionId, - userId: dto.userId, - pageUrl: dto.route, - timestamp: Date.parse(dto.timestamp), - metadata: { - audience: dto.audience, - userType: dto.userType, - registrationPath: dto.registrationPath, - registrationType: dto.registrationType, - entryReferrer: dto.entryReferrer, - referrer: dto.referrer, - }, - }); + async trackRegistrationFunnel( + @Body() dto: TrackRegistrationFunnelDto, + @Req() request: Request, + ): Promise { + return this.capture.trackRegistrationFunnel(dto, request); } } diff --git a/services/collector/src/tracking/tracking.module.ts b/services/collector/src/tracking/tracking.module.ts index 5f1fd0e..8f0ab76 100644 --- a/services/collector/src/tracking/tracking.module.ts +++ b/services/collector/src/tracking/tracking.module.ts @@ -1,35 +1,16 @@ import { Module } from '@nestjs/common'; -import { TypeOrmModule } from '@nestjs/typeorm'; -import { BullModule } from '@nestjs/bullmq'; -import { RawEvent } from '../entities/raw-event.entity'; -import { SessionFingerprint } from '../entities/session-fingerprint.entity'; -import { Corp } from '../entities/corp.entity'; -import { Domain } from '../entities/domain.entity'; -import { VisitorSalt } from '../entities/visitor-salt.entity'; -import { TrackingService } from './tracking.service'; +import { CaptureService } from './capture.service'; +import { RedisRouter } from './redis-router.service'; import { TrackingController } from './tracking.controller'; -import { DeviceEnrichmentService } from './device-enrichment.service'; -import { AttributionService } from './attribution.service'; -import { GovDetectionService } from './gov-detection.service'; -import { IdentityService } from './identity.service'; -import { DomainResolverService } from './domain-resolver.service'; +/** + * Edge tracking module. No database, no enrichment — the collector captures and + * durably enqueues via {@link RedisRouter} (primary = black redis, spool = local + * redis). All DB work happens on the black-side ingest-writer. + */ @Module({ - imports: [ - TypeOrmModule.forFeature([RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt]), - BullModule.registerQueue({ - name: 'analytics-events', - }), - ], controllers: [TrackingController], - providers: [ - TrackingService, - DeviceEnrichmentService, - AttributionService, - GovDetectionService, - IdentityService, - DomainResolverService, - ], - exports: [TrackingService, IdentityService, DomainResolverService], + providers: [RedisRouter, CaptureService], + exports: [RedisRouter], }) export class TrackingModule {} diff --git a/services/collector/src/tracking/tracking.service.spec.ts b/services/collector/src/tracking/tracking.service.spec.ts deleted file mode 100644 index 80e127e..0000000 --- a/services/collector/src/tracking/tracking.service.spec.ts +++ /dev/null @@ -1,551 +0,0 @@ -import { Test } from '@nestjs/testing'; -import { getRepositoryToken } from '@nestjs/typeorm'; -import { getQueueToken } from '@nestjs/bullmq'; -import { describe, it, expect, beforeEach, vi } from 'vitest'; -import type { Request } from 'express'; - -import { TrackingService } from './tracking.service'; -import { DeviceEnrichmentService } from './device-enrichment.service'; -import { AttributionService } from './attribution.service'; -import { RawEvent } from '../entities/raw-event.entity'; -import { SessionFingerprint } from '../entities/session-fingerprint.entity'; -import { createMockRepository, createMockQueue, type MockRepository, type MockQueue } from '@test/mocks'; - -describe('TrackingService', () => { - let service: TrackingService; - let deviceEnrichmentService: DeviceEnrichmentService; - let attributionService: AttributionService; - let rawEventRepository: MockRepository; - let fingerprintRepository: MockRepository; - let eventsQueue: MockQueue; - - beforeEach(async () => { - rawEventRepository = createMockRepository(); - fingerprintRepository = createMockRepository(); - eventsQueue = createMockQueue(); - - const module = await Test.createTestingModule({ - providers: [ - TrackingService, - DeviceEnrichmentService, - AttributionService, - { provide: getRepositoryToken(RawEvent), useValue: rawEventRepository }, - { provide: getRepositoryToken(SessionFingerprint), useValue: fingerprintRepository }, - { provide: getQueueToken('analytics-events'), useValue: eventsQueue }, - ], - }).compile(); - - service = module.get(TrackingService); - deviceEnrichmentService = module.get(DeviceEnrichmentService); - attributionService = module.get(AttributionService); - }); - - describe('trackView', () => { - it('creates pageview event with enriched device data', async () => { - const mockRequest = { - headers: { - 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0', - 'x-forwarded-for': '192.168.1.1', - }, - socket: {}, - } as unknown as Request; - - const mockEvent = { - id: 'event-123', - eventType: 'pageview', - sessionId: 'session-abc', - pageUrl: 'https://example.com/page', - processed: false, - }; - - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - fingerprintRepository.findOne.mockResolvedValue(null); - - const result = await service.trackView( - { - pageUrl: 'https://example.com/page', - sessionId: 'session-abc', - referrer: 'https://google.com', - userId: 'user-123', - clientDevice: { - screenWidth: 1920, - screenHeight: 1080, - viewportWidth: 1440, - viewportHeight: 900, - }, - attribution: { - utmSource: 'google', - utmMedium: 'cpc', - utmCampaign: 'winter-sale', - }, - }, - mockRequest, - ); - - expect(result.success).toBe(true); - expect(result.eventId).toBe('event-123'); - expect(rawEventRepository.create).toHaveBeenCalledWith( - expect.objectContaining({ - eventType: 'pageview', - sessionId: 'session-abc', - userId: 'user-123', - pageUrl: 'https://example.com/page', - referrer: 'https://google.com', - processed: false, - }), - ); - expect(rawEventRepository.save).toHaveBeenCalledWith(mockEvent); - expect(eventsQueue.add).toHaveBeenCalledWith('process-event', { - eventId: 'event-123', - eventType: 'pageview', - sessionId: 'session-abc', - }); - }); - - it('creates session fingerprint on first pageview', async () => { - const mockRequest = { - headers: { - 'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) Mobile/15E148 Safari/604.1', - }, - socket: { remoteAddress: '10.0.0.1' }, - } as unknown as Request; - - const mockEvent = { id: 'event-456', eventType: 'pageview', sessionId: 'session-new' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - fingerprintRepository.findOne.mockResolvedValue(null); - - await service.trackView( - { - pageUrl: 'https://example.com/landing', - sessionId: 'session-new', - attribution: { - utmSource: 'facebook', - utmMedium: 'social', - }, - }, - mockRequest, - ); - - expect(fingerprintRepository.findOne).toHaveBeenCalledWith({ - where: { sessionId: 'session-new' }, - }); - expect(fingerprintRepository.create).toHaveBeenCalledWith( - expect.objectContaining({ - sessionId: 'session-new', - deviceType: 'mobile', - browser: 'Safari', - os: 'iOS', - trafficSource: 'social', - utmSource: 'facebook', - utmMedium: 'social', - landingPage: 'https://example.com/landing', - }), - ); - expect(fingerprintRepository.save).toHaveBeenCalled(); - }); - - it('updates existing session fingerprint with viewport and userId', async () => { - const mockRequest = { - headers: { 'user-agent': 'Chrome' }, - socket: {}, - } as unknown as Request; - - const existingFingerprint = { - id: 'fp-123', - sessionId: 'session-existing', - userId: null, - viewportWidth: 1024, - viewportHeight: 768, - trafficSource: 'direct', - }; - - const mockEvent = { id: 'event-789' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - fingerprintRepository.findOne.mockResolvedValue(existingFingerprint as SessionFingerprint); - - await service.trackView( - { - pageUrl: 'https://example.com/dashboard', - sessionId: 'session-existing', - userId: 'user-456', - clientDevice: { - viewportWidth: 1440, - viewportHeight: 900, - }, - }, - mockRequest, - ); - - expect(fingerprintRepository.save).toHaveBeenCalledWith( - expect.objectContaining({ - sessionId: 'session-existing', - userId: 'user-456', - viewportWidth: 1440, - viewportHeight: 900, - }), - ); - }); - - it('handles errors gracefully and returns failure result', async () => { - const mockRequest = { - headers: {}, - socket: {}, - } as unknown as Request; - - rawEventRepository.create.mockImplementation(() => { - throw new Error('Database connection failed'); - }); - - const result = await service.trackView( - { - pageUrl: 'https://example.com/error', - sessionId: 'session-error', - }, - mockRequest, - ); - - expect(result.success).toBe(false); - expect(result.eventId).toBeNull(); - expect(result.error).toBe('Database connection failed'); - }); - - it('extracts IP from x-forwarded-for header', async () => { - const mockRequest = { - headers: { - 'x-forwarded-for': '203.0.113.1, 192.168.1.1, 10.0.0.1', - }, - socket: { remoteAddress: '127.0.0.1' }, - } as unknown as Request; - - const enrichSpy = vi.spyOn(deviceEnrichmentService, 'enrich'); - - const mockEvent = { id: 'event-ip' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - fingerprintRepository.findOne.mockResolvedValue(null); - - await service.trackView( - { - pageUrl: 'https://example.com', - sessionId: 'session-ip', - }, - mockRequest, - ); - - expect(enrichSpy).toHaveBeenCalledWith( - undefined, - '203.0.113.1', - undefined, - ); - }); - - it('extracts IP from x-real-ip header', async () => { - const mockRequest = { - headers: { - 'x-real-ip': '198.51.100.5', - }, - socket: {}, - } as unknown as Request; - - const enrichSpy = vi.spyOn(deviceEnrichmentService, 'enrich'); - - const mockEvent = { id: 'event-real-ip' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - fingerprintRepository.findOne.mockResolvedValue(null); - - await service.trackView( - { - pageUrl: 'https://example.com', - sessionId: 'session-real-ip', - }, - mockRequest, - ); - - expect(enrichSpy).toHaveBeenCalledWith( - undefined, - '198.51.100.5', - undefined, - ); - }); - }); - - describe('trackEvent', () => { - it('creates generic event with required fields', async () => { - const mockEvent = { - id: 'event-click-123', - eventType: 'click', - sessionId: 'session-xyz', - userId: 'user-789', - pageUrl: 'https://example.com/page', - }; - - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - - const result = await service.trackEvent({ - eventType: 'click', - sessionId: 'session-xyz', - userId: 'user-789', - pageUrl: 'https://example.com/page', - metadata: { - elementId: 'cta-button', - elementText: 'Sign Up', - }, - }); - - expect(result.success).toBe(true); - expect(result.eventId).toBe('event-click-123'); - expect(rawEventRepository.create).toHaveBeenCalledWith( - expect.objectContaining({ - eventType: 'click', - sessionId: 'session-xyz', - userId: 'user-789', - pageUrl: 'https://example.com/page', - metadata: { - elementId: 'cta-button', - elementText: 'Sign Up', - }, - processed: false, - }), - ); - expect(eventsQueue.add).toHaveBeenCalledWith('process-event', { - eventId: 'event-click-123', - eventType: 'click', - sessionId: 'session-xyz', - }); - }); - - it('handles optional fields correctly', async () => { - const mockEvent = { id: 'event-minimal' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - - const result = await service.trackEvent({ - eventType: 'custom_event', - sessionId: 'session-minimal', - }); - - expect(result.success).toBe(true); - expect(rawEventRepository.create).toHaveBeenCalledWith( - expect.objectContaining({ - eventType: 'custom_event', - sessionId: 'session-minimal', - userId: null, - pageUrl: null, - metadata: null, - processed: false, - }), - ); - }); - - it('uses custom timestamp if provided', async () => { - const customTimestamp = Date.now() - 60000; // 1 minute ago - const mockEvent = { id: 'event-timestamp' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - - await service.trackEvent({ - eventType: 'scroll', - sessionId: 'session-ts', - timestamp: customTimestamp, - }); - - expect(rawEventRepository.create).toHaveBeenCalledWith( - expect.objectContaining({ - timestamp: new Date(customTimestamp), - }), - ); - }); - - it('handles errors and returns failure result', async () => { - rawEventRepository.save.mockRejectedValue(new Error('Save failed')); - - const result = await service.trackEvent({ - eventType: 'error_event', - sessionId: 'session-err', - }); - - expect(result.success).toBe(false); - expect(result.eventId).toBeNull(); - expect(result.error).toBe('Save failed'); - }); - }); - - describe('trackBatch', () => { - it('processes multiple events successfully', async () => { - const mockEvents = [ - { id: 'event-1', eventType: 'click', sessionId: 'session-1' }, - { id: 'event-2', eventType: 'scroll', sessionId: 'session-1' }, - { id: 'event-3', eventType: 'submit', sessionId: 'session-1' }, - ]; - - rawEventRepository.create - .mockReturnValueOnce(mockEvents[0] as RawEvent) - .mockReturnValueOnce(mockEvents[1] as RawEvent) - .mockReturnValueOnce(mockEvents[2] as RawEvent); - - rawEventRepository.save - .mockResolvedValueOnce(mockEvents[0] as RawEvent) - .mockResolvedValueOnce(mockEvents[1] as RawEvent) - .mockResolvedValueOnce(mockEvents[2] as RawEvent); - - const result = await service.trackBatch([ - { eventType: 'click', sessionId: 'session-1', metadata: { target: 'button-1' } }, - { eventType: 'scroll', sessionId: 'session-1', metadata: { depth: 50 } }, - { eventType: 'submit', sessionId: 'session-1', metadata: { formId: 'contact' } }, - ]); - - expect(result.success).toBe(true); - expect(result.count).toBe(3); - expect(result.errors).toHaveLength(0); - expect(eventsQueue.add).toHaveBeenCalledTimes(3); - }); - - it('handles partial failures in batch', async () => { - const mockEvent1 = { id: 'event-success' }; - const mockEvent3 = { id: 'event-success-2' }; - - rawEventRepository.create - .mockReturnValueOnce(mockEvent1 as RawEvent) - .mockImplementationOnce(() => { - throw new Error('Invalid data'); - }) - .mockReturnValueOnce(mockEvent3 as RawEvent); - - rawEventRepository.save - .mockResolvedValueOnce(mockEvent1 as RawEvent) - .mockResolvedValueOnce(mockEvent3 as RawEvent); - - const result = await service.trackBatch([ - { eventType: 'event1', sessionId: 'session-1' }, - { eventType: 'event2', sessionId: 'session-2' }, - { eventType: 'event3', sessionId: 'session-3' }, - ]); - - expect(result.success).toBe(false); - expect(result.count).toBe(2); - expect(result.errors).toHaveLength(1); - expect(result.errors[0]).toBe('Invalid data'); - }); - - it('handles empty batch', async () => { - const result = await service.trackBatch([]); - - expect(result.success).toBe(true); - expect(result.count).toBe(0); - expect(result.errors).toHaveLength(0); - }); - }); - - describe('trackConversion', () => { - it('creates conversion event with value and currency', async () => { - const mockEvent = { - id: 'conversion-123', - eventType: 'conversion', - sessionId: 'session-conv', - userId: 'user-123', - metadata: { - conversionType: 'purchase', - value: 99.99, - currency: 'USD', - productId: 'prod-456', - }, - }; - - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - - const result = await service.trackConversion({ - conversionType: 'purchase', - sessionId: 'session-conv', - userId: 'user-123', - value: 99.99, - currency: 'USD', - metadata: { - productId: 'prod-456', - }, - }); - - expect(result.success).toBe(true); - expect(result.eventId).toBe('conversion-123'); - expect(rawEventRepository.create).toHaveBeenCalledWith( - expect.objectContaining({ - eventType: 'conversion', - sessionId: 'session-conv', - userId: 'user-123', - metadata: { - conversionType: 'purchase', - value: 99.99, - currency: 'USD', - productId: 'prod-456', - }, - processed: false, - }), - ); - }); - - it('queues conversion event with high priority', async () => { - const mockEvent = { id: 'conversion-priority' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - - await service.trackConversion({ - conversionType: 'signup', - sessionId: 'session-signup', - userId: 'user-new', - }); - - expect(eventsQueue.add).toHaveBeenCalledWith( - 'process-event', - { - eventId: 'conversion-priority', - eventType: 'conversion', - sessionId: 'session-signup', - conversionType: 'signup', - }, - { priority: 1 }, - ); - }); - - it('handles conversion without value or currency', async () => { - const mockEvent = { id: 'conversion-no-value' }; - rawEventRepository.create.mockReturnValue(mockEvent as RawEvent); - rawEventRepository.save.mockResolvedValue(mockEvent as RawEvent); - - const result = await service.trackConversion({ - conversionType: 'newsletter_signup', - sessionId: 'session-newsletter', - }); - - expect(result.success).toBe(true); - expect(rawEventRepository.create).toHaveBeenCalledWith( - expect.objectContaining({ - metadata: expect.objectContaining({ - conversionType: 'newsletter_signup', - value: undefined, - currency: undefined, - }), - }), - ); - }); - - it('handles errors gracefully', async () => { - rawEventRepository.create.mockImplementation(() => { - throw new Error('Conversion tracking failed'); - }); - - const result = await service.trackConversion({ - conversionType: 'error_conversion', - sessionId: 'session-error', - }); - - expect(result.success).toBe(false); - expect(result.eventId).toBeNull(); - expect(result.error).toBe('Conversion tracking failed'); - }); - }); -}); diff --git a/services/collector/src/tracking/tracking.service.ts b/services/collector/src/tracking/tracking.service.ts deleted file mode 100644 index 4a1d20a..0000000 --- a/services/collector/src/tracking/tracking.service.ts +++ /dev/null @@ -1,376 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { InjectRepository } from '@nestjs/typeorm'; -import { InjectQueue } from '@nestjs/bullmq'; -import { Repository } from 'typeorm'; -import { Queue } from 'bullmq'; -import type { Request } from 'express'; - -import { RawEvent } from '../entities/raw-event.entity'; -import { SessionFingerprint } from '../entities/session-fingerprint.entity'; -import { DeviceEnrichmentService, ClientDeviceData, EnrichedDeviceData } from './device-enrichment.service'; -import { AttributionService, AttributionInput, ResolvedAttribution } from './attribution.service'; -import { IdentityService } from './identity.service'; -import { DomainResolverService } from './domain-resolver.service'; - -export interface TrackViewInput { - pageUrl: string; - referrer?: string; - userId?: string; - sessionId: string; - metadata?: Record; - clientDevice?: ClientDeviceData; - attribution?: AttributionInput; -} - -export interface TrackEventInput { - eventType: string; - sessionId: string; - userId?: string; - pageUrl?: string; - metadata?: Record; - timestamp?: number; -} - -export interface TrackConversionInput { - conversionType: string; - sessionId: string; - userId?: string; - value?: number; - currency?: string; - metadata?: Record; -} - -export interface TrackingResult { - success: boolean; - eventId: string | null; - error?: string; -} - -export interface BatchTrackingResult { - success: boolean; - count: number; - errors: string[]; -} - -@Injectable() -export class TrackingService { - private readonly logger = new Logger(TrackingService.name); - - constructor( - private readonly deviceEnrichmentService: DeviceEnrichmentService, - private readonly attributionService: AttributionService, - private readonly identityService: IdentityService, - private readonly domainResolver: DomainResolverService, - @InjectRepository(RawEvent) - private readonly rawEventRepository: Repository, - @InjectRepository(SessionFingerprint) - private readonly fingerprintRepository: Repository, - @InjectQueue('analytics-events') - private readonly eventsQueue: Queue, - ) {} - - /** - * Derive cross-domain dimensions for a request: - * - visitor_id_daily: sha256(daily_salt || ip || ua || lang) - * - corp_id, domain_id: from Origin/Referer/Host → domains row - * - * Returns nulls for any dimension we can't resolve. Callers persist all - * three on the event row. - */ - private async resolveCrossDomainDimensions(request: Request): Promise<{ - visitorIdDaily: Buffer | null; - corpId: number | null; - domainId: number | null; - }> { - const ua = (request.headers['user-agent'] as string | undefined) ?? undefined; - const lang = (request.headers['accept-language'] as string | undefined) ?? undefined; - const ip = this.extractIpAddress(request) ?? undefined; - - const visitorIdDaily = await this.identityService.visitorIdDaily(ip, ua, lang); - - const hostname = this.domainResolver.hostnameFromHeaders(request.headers); - const resolved = await this.domainResolver.resolve(hostname); - - return { - visitorIdDaily, - corpId: resolved?.corpId ?? null, - domainId: resolved?.domainId ?? null, - }; - } - - /** - * Track page view with device fingerprinting - */ - async trackView(data: TrackViewInput, request: Request): Promise { - try { - // Extract User-Agent and IP from request headers - const userAgent = request.headers['user-agent'] as string | undefined; - const ipAddress = this.extractIpAddress(request); - - // Enrich device data (IP is hashed, never stored raw) - const enriched = await this.deviceEnrichmentService.enrich( - userAgent, - ipAddress, - data.clientDevice, - ); - - // Resolve attribution - const attribution = this.attributionService.resolveTrafficSource({ - utmSource: data.attribution?.utmSource, - utmMedium: data.attribution?.utmMedium, - utmCampaign: data.attribution?.utmCampaign, - utmContent: data.attribution?.utmContent, - utmTerm: data.attribution?.utmTerm, - referrer: data.referrer, - }); - - // Create or update session fingerprint (first-touch attribution) - await this.upsertSessionFingerprint( - data.sessionId, - data.userId, - enriched, - attribution, - data.pageUrl, - ); - - // Cross-domain dimensions - const { visitorIdDaily, corpId, domainId } = - await this.resolveCrossDomainDimensions(request); - - // Create raw event - const event = this.rawEventRepository.create({ - eventType: 'pageview', - sessionId: data.sessionId, - userId: data.userId ?? null, - pageUrl: data.pageUrl, - referrer: data.referrer ?? null, - deviceType: enriched.deviceType, - metadata: data.metadata ?? null, - timestamp: new Date(), - processed: false, - visitorIdDaily, - corpId, - domainId, - }); - - const saved = await this.rawEventRepository.save(event); - - // Queue for async processing - await this.eventsQueue.add('process-event', { - eventId: saved.id, - eventType: 'pageview', - sessionId: data.sessionId, - }); - - return { success: true, eventId: saved.id }; - } catch (error) { - this.logger.error('Failed to track view event', error); - return { - success: false, - eventId: null, - error: error instanceof Error ? error.message : 'Unknown error', - }; - } - } - - /** - * Track generic event - */ - async trackEvent(data: TrackEventInput): Promise { - try { - const event = this.rawEventRepository.create({ - eventType: data.eventType, - sessionId: data.sessionId, - userId: data.userId ?? null, - pageUrl: data.pageUrl ?? null, - metadata: data.metadata ?? null, - timestamp: data.timestamp ? new Date(data.timestamp) : new Date(), - processed: false, - }); - - const saved = await this.rawEventRepository.save(event); - - // Queue for async processing - await this.eventsQueue.add('process-event', { - eventId: saved.id, - eventType: data.eventType, - sessionId: data.sessionId, - }); - - return { success: true, eventId: saved.id }; - } catch (error) { - this.logger.error('Failed to track event', error); - return { - success: false, - eventId: null, - error: error instanceof Error ? error.message : 'Unknown error', - }; - } - } - - /** - * Track batch of events - */ - async trackBatch(events: TrackEventInput[]): Promise { - const results = await Promise.allSettled( - events.map((event) => this.trackEvent(event)), - ); - - const errors = results - .filter((r): r is PromiseRejectedResult => r.status === 'rejected') - .map((r) => (r.reason as Error)?.message ?? 'Unknown error'); - - const successCount = results.filter((r) => r.status === 'fulfilled').length; - - return { - success: errors.length === 0, - count: successCount, - errors, - }; - } - - /** - * Track conversion event - */ - async trackConversion(data: TrackConversionInput): Promise { - try { - const event = this.rawEventRepository.create({ - eventType: 'conversion', - sessionId: data.sessionId, - userId: data.userId ?? null, - metadata: { - conversionType: data.conversionType, - value: data.value, - currency: data.currency, - ...data.metadata, - }, - timestamp: new Date(), - processed: false, - }); - - const saved = await this.rawEventRepository.save(event); - - // Queue for async processing (high priority for conversions) - await this.eventsQueue.add( - 'process-event', - { - eventId: saved.id, - eventType: 'conversion', - sessionId: data.sessionId, - conversionType: data.conversionType, - }, - { priority: 1 }, // Higher priority - ); - - return { success: true, eventId: saved.id }; - } catch (error) { - this.logger.error('Failed to track conversion', error); - return { - success: false, - eventId: null, - error: error instanceof Error ? error.message : 'Unknown error', - }; - } - } - - /** - * Create or update session fingerprint - * Attribution is first-touch: only set on creation - */ - private async upsertSessionFingerprint( - sessionId: string, - userId: string | undefined, - enriched: EnrichedDeviceData, - attribution: ResolvedAttribution, - landingPage: string, - ): Promise { - try { - const existing = await this.fingerprintRepository.findOne({ - where: { sessionId }, - }); - - if (existing) { - // Update viewport (may change on resize) and userId (if logged in later) - if (enriched.viewportWidth !== undefined) { - existing.viewportWidth = enriched.viewportWidth; - } - if (enriched.viewportHeight !== undefined) { - existing.viewportHeight = enriched.viewportHeight; - } - if (userId && !existing.userId) { - existing.userId = userId; - } - await this.fingerprintRepository.save(existing); - } else { - // Create new fingerprint with first-touch attribution - const fingerprint = this.fingerprintRepository.create({ - sessionId, - userId: userId ?? null, - deviceType: enriched.deviceType, - isBot: enriched.isBot, - browser: enriched.browser, - browserVersion: enriched.browserVersion, - browserMajor: enriched.browserMajor, - os: enriched.os, - osVersion: enriched.osVersion, - deviceVendor: enriched.deviceVendor, - deviceModel: enriched.deviceModel, - screenWidth: enriched.screenWidth ?? null, - screenHeight: enriched.screenHeight ?? null, - viewportWidth: enriched.viewportWidth ?? null, - viewportHeight: enriched.viewportHeight ?? null, - pixelRatio: enriched.pixelRatio ?? null, - colorDepth: enriched.colorDepth ?? null, - language: enriched.language ?? null, - languages: enriched.languages ?? null, - timezone: enriched.timezone ?? null, - timezoneOffset: enriched.timezoneOffset ?? null, - country: enriched.country, - region: enriched.region, - city: enriched.city, - isEU: enriched.isEU, - geoTimezone: enriched.geoTimezone, - isVpn: enriched.isVpn, - isDatacenter: enriched.isDatacenter, - isTor: enriched.isTor, - ipHash: enriched.ipHash, - deviceMemory: enriched.deviceMemory ?? null, - hardwareConcurrency: enriched.hardwareConcurrency ?? null, - touchPoints: enriched.touchPoints ?? null, - cookiesEnabled: enriched.cookiesEnabled ?? null, - doNotTrack: enriched.doNotTrack ?? null, - // First-touch attribution - trafficSource: attribution.source, - utmSource: attribution.rawSource, - utmMedium: attribution.medium, - utmCampaign: attribution.campaign, - utmContent: attribution.content, - utmTerm: attribution.term, - referrer: attribution.referrer, - landingPage, - }); - await this.fingerprintRepository.save(fingerprint); - } - } catch (error) { - // Log but don't fail - fingerprinting is enhancement, not critical - this.logger.warn('Failed to upsert session fingerprint', error); - } - } - - /** - * Extract client IP from request headers - */ - private extractIpAddress(request: Request): string | undefined { - const forwardedFor = request.headers['x-forwarded-for']; - const realIp = request.headers['x-real-ip']; - - if (forwardedFor) { - const firstIp = (forwardedFor as string).split(',')[0]; - return firstIp?.trim(); - } - if (realIp) { - return realIp as string; - } - return request.socket.remoteAddress; - } -} diff --git a/services/processor/package.json b/services/processor/package.json index 688bb67..bda0030 100644 --- a/services/processor/package.json +++ b/services/processor/package.json @@ -18,6 +18,7 @@ }, "dependencies": { "@lilith/analytics": "workspace:*", + "@lilith/gov-detection": "^1.0.3", "@nestjs/bullmq": "^11.0.0", "@nestjs/common": "^11.0.0", "@nestjs/config": "^4.0.0", @@ -26,6 +27,7 @@ "@nestjs/terminus": "^11.0.0", "@nestjs/typeorm": "^11.0.0", "bullmq": "^5.0.0", + "geoip-lite": "^2.0.1", "ioredis": "^5.9.1", "pg": "^8.11.0", "reflect-metadata": "^0.2.0", @@ -39,6 +41,7 @@ "@nestjs/testing": "^11.0.0", "@swc/cli": "^0.7.10", "@swc/core": "^1.15.8", + "@types/geoip-lite": "^1.4.4", "@types/node": "^20.0.0", "typescript": "^5.4.0", "vitest": "^1.0.0" diff --git a/services/collector/src/entities/corp.entity.ts b/services/processor/src/entities/corp.entity.ts similarity index 100% rename from services/collector/src/entities/corp.entity.ts rename to services/processor/src/entities/corp.entity.ts diff --git a/services/collector/src/entities/domain.entity.ts b/services/processor/src/entities/domain.entity.ts similarity index 100% rename from services/collector/src/entities/domain.entity.ts rename to services/processor/src/entities/domain.entity.ts diff --git a/services/processor/src/entities/raw-event.entity.ts b/services/processor/src/entities/raw-event.entity.ts index b9e5f9b..204271e 100644 --- a/services/processor/src/entities/raw-event.entity.ts +++ b/services/processor/src/entities/raw-event.entity.ts @@ -7,13 +7,20 @@ import { } from 'typeorm'; /** - * Raw analytics event - mirrors the collector's raw_events table. - * Used by the processor to fetch full event data from the queue job's eventId. + * Raw analytics event — the canonical event log. + * + * The processor service is now the canonical writer: the ingest-writer inserts + * rows here from edge envelopes, and the aggregation worker reads them back by + * id. This must carry the full schema (including the cross-domain dimensions), + * not just the read-side subset. */ @Entity('raw_events') @Index(['sessionId', 'timestamp']) @Index(['eventType', 'timestamp']) @Index(['processed', 'timestamp']) +@Index(['visitorIdDaily', 'timestamp']) +@Index(['corpId', 'timestamp']) +@Index(['domainId', 'timestamp']) export class RawEvent { @PrimaryGeneratedColumn('uuid') id!: string; @@ -55,4 +62,21 @@ export class RawEvent { @Column({ type: 'timestamptz', nullable: true }) processedAt?: Date | null; + + /** + * Daily-rotating visitor identity for cross-domain stitching. + * sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same + * visitor across all our domains within a UTC day; unrecoverable after salt + * rotation (cookie-free, GDPR-clean). + */ + @Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' }) + visitorIdDaily?: Buffer | null; + + /** Corp that owns the originating domain. FK → corps(id). */ + @Column({ type: 'smallint', nullable: true, name: 'corp_id' }) + corpId?: number | null; + + /** Domain the event originated from. FK → domains(id). */ + @Column({ type: 'integer', nullable: true, name: 'domain_id' }) + domainId?: number | null; } diff --git a/services/collector/src/entities/session-fingerprint.entity.ts b/services/processor/src/entities/session-fingerprint.entity.ts similarity index 100% rename from services/collector/src/entities/session-fingerprint.entity.ts rename to services/processor/src/entities/session-fingerprint.entity.ts diff --git a/services/collector/src/entities/visitor-salt.entity.ts b/services/processor/src/entities/visitor-salt.entity.ts similarity index 100% rename from services/collector/src/entities/visitor-salt.entity.ts rename to services/processor/src/entities/visitor-salt.entity.ts diff --git a/services/collector/src/tracking/attribution.service.ts b/services/processor/src/ingest/attribution.service.ts similarity index 100% rename from services/collector/src/tracking/attribution.service.ts rename to services/processor/src/ingest/attribution.service.ts diff --git a/services/collector/src/tracking/device-enrichment.service.ts b/services/processor/src/ingest/device-enrichment.service.ts similarity index 100% rename from services/collector/src/tracking/device-enrichment.service.ts rename to services/processor/src/ingest/device-enrichment.service.ts diff --git a/services/collector/src/tracking/domain-resolver.service.ts b/services/processor/src/ingest/domain-resolver.service.ts similarity index 100% rename from services/collector/src/tracking/domain-resolver.service.ts rename to services/processor/src/ingest/domain-resolver.service.ts diff --git a/services/collector/src/tracking/gov-detection.service.ts b/services/processor/src/ingest/gov-detection.service.ts similarity index 100% rename from services/collector/src/tracking/gov-detection.service.ts rename to services/processor/src/ingest/gov-detection.service.ts diff --git a/services/collector/src/tracking/identity.service.ts b/services/processor/src/ingest/identity.service.ts similarity index 100% rename from services/collector/src/tracking/identity.service.ts rename to services/processor/src/ingest/identity.service.ts diff --git a/services/processor/src/ingest/ingest.module.ts b/services/processor/src/ingest/ingest.module.ts new file mode 100644 index 0000000..b82f4f8 --- /dev/null +++ b/services/processor/src/ingest/ingest.module.ts @@ -0,0 +1,36 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { RawEvent } from '../entities/raw-event.entity'; +import { SessionFingerprint } from '../entities/session-fingerprint.entity'; +import { VisitorSalt } from '../entities/visitor-salt.entity'; +import { Domain } from '../entities/domain.entity'; +import { Corp } from '../entities/corp.entity'; + +import { DeviceEnrichmentService } from './device-enrichment.service'; +import { AttributionService } from './attribution.service'; +import { GovDetectionService } from './gov-detection.service'; +import { IdentityService } from './identity.service'; +import { DomainResolverService } from './domain-resolver.service'; +import { IngestService } from './ingest.service'; + +/** + * Canonical-writer module. Owns the DB-bound enrichment that used to run in the + * collector and the {@link IngestService} that turns an edge envelope into a + * `raw_events` row. Lives on the canonical-store host (black). + */ +@Module({ + imports: [ + TypeOrmModule.forFeature([RawEvent, SessionFingerprint, VisitorSalt, Domain, Corp]), + ], + providers: [ + DeviceEnrichmentService, + AttributionService, + GovDetectionService, + IdentityService, + DomainResolverService, + IngestService, + ], + exports: [IngestService], +}) +export class IngestModule {} diff --git a/services/processor/src/ingest/ingest.service.ts b/services/processor/src/ingest/ingest.service.ts new file mode 100644 index 0000000..1bd0806 --- /dev/null +++ b/services/processor/src/ingest/ingest.service.ts @@ -0,0 +1,221 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import type { RawEventEnvelope } from '@lilith/analytics'; + +import { RawEvent } from '../entities/raw-event.entity'; +import { SessionFingerprint } from '../entities/session-fingerprint.entity'; +import { + DeviceEnrichmentService, + type ClientDeviceData, + type EnrichedDeviceData, +} from './device-enrichment.service'; +import { AttributionService, type ResolvedAttribution } from './attribution.service'; +import { IdentityService } from './identity.service'; +import { DomainResolverService } from './domain-resolver.service'; + +/** + * Canonical writer. Consumes an edge-captured {@link RawEventEnvelope}, runs the + * DB-bound enrichment that used to live in the collector (device, identity, + * domain, fingerprint), and inserts the canonical `raw_events` row. + * + * Idempotency: the row's primary key is the edge-minted `eventId`, inserted + * with `ON CONFLICT (id) DO NOTHING`. A re-drained or duplicated envelope is a + * no-op and does NOT re-trigger aggregation (which is additive and would + * otherwise inflate counters). + * + * Timestamps: `receivedAt` is taken from the envelope (the EDGE clock), never + * `now()`, so events spooled during a canonical-store outage keep their true + * arrival time when drained. + */ +@Injectable() +export class IngestService { + private readonly logger = new Logger(IngestService.name); + + constructor( + private readonly deviceEnrichment: DeviceEnrichmentService, + private readonly attribution: AttributionService, + private readonly identity: IdentityService, + private readonly domainResolver: DomainResolverService, + @InjectRepository(RawEvent) + private readonly rawEventRepo: Repository, + @InjectRepository(SessionFingerprint) + private readonly fingerprintRepo: Repository, + ) {} + + /** + * Persist one event. Returns the aggregation job payload when this call + * actually inserted the row, or `null` when the row already existed (so the + * caller skips re-enqueuing aggregation). + */ + async write( + envelope: RawEventEnvelope, + ): Promise<{ eventId: string; eventType: string; sessionId: string } | null> { + const ev = envelope.payload; + const receivedAt = new Date(envelope.receivedAt); + const timestamp = + ev.timestamp != null && Number.isFinite(ev.timestamp) + ? new Date(ev.timestamp) + : receivedAt; + + let deviceType: string | null = null; + let visitorIdDaily: Buffer | null = null; + let corpId: number | null = null; + let domainId: number | null = null; + + if (ev.isView) { + const enriched = await this.deviceEnrichment.enrich( + envelope.edge.ua, + envelope.edge.ip, + (ev.clientDevice ?? undefined) as ClientDeviceData | undefined, + ); + deviceType = enriched.deviceType; + + const attribution = this.attribution.resolveTrafficSource({ + utmSource: ev.attribution?.utmSource, + utmMedium: ev.attribution?.utmMedium, + utmCampaign: ev.attribution?.utmCampaign, + utmContent: ev.attribution?.utmContent, + utmTerm: ev.attribution?.utmTerm, + referrer: ev.attribution?.referrer ?? ev.referrer ?? undefined, + }); + + await this.upsertSessionFingerprint(envelope, enriched, attribution); + + visitorIdDaily = await this.identity.visitorIdDaily( + envelope.edge.ip, + envelope.edge.ua, + envelope.edge.lang, + receivedAt, + ); + const hostname = this.domainResolver.hostnameFromHeaders(envelope.edge.headers); + const resolved = await this.domainResolver.resolve(hostname); + corpId = resolved?.corpId ?? null; + domainId = resolved?.domainId ?? null; + } + + const result = await this.rawEventRepo + .createQueryBuilder() + .insert() + .into(RawEvent) + .values({ + id: envelope.eventId, + eventType: ev.eventType, + sessionId: ev.sessionId, + userId: ev.userId ?? null, + pageUrl: ev.pageUrl ?? null, + referrer: ev.referrer ?? null, + deviceType, + metadata: ev.metadata ?? null, + timestamp, + receivedAt, + processed: false, + visitorIdDaily, + corpId, + domainId, + }) + .orIgnore() + .returning('id') + .execute(); + + const inserted = Array.isArray(result.raw) && result.raw.length > 0; + if (!inserted) { + this.logger.debug(`raw_events ${envelope.eventId} already present — duplicate drain, skipping aggregation`); + return null; + } + + return { eventId: envelope.eventId, eventType: ev.eventType, sessionId: ev.sessionId }; + } + + /** + * Create or update the session fingerprint. Attribution is first-touch: set + * only on creation, never overwritten. Failures are logged, not thrown — + * fingerprinting is enhancement, not the canonical record. + */ + private async upsertSessionFingerprint( + envelope: RawEventEnvelope, + enriched: EnrichedDeviceData, + attribution: ResolvedAttribution, + ): Promise { + const ev = envelope.payload; + try { + const existing = await this.fingerprintRepo.findOne({ + where: { sessionId: ev.sessionId }, + }); + + if (existing) { + if (enriched.viewportWidth !== undefined) { + existing.viewportWidth = enriched.viewportWidth; + } + if (enriched.viewportHeight !== undefined) { + existing.viewportHeight = enriched.viewportHeight; + } + if (ev.userId && !existing.userId) { + existing.userId = ev.userId; + } + await this.fingerprintRepo.save(existing); + return; + } + + const fingerprint = this.fingerprintRepo.create({ + sessionId: ev.sessionId, + userId: ev.userId ?? null, + deviceType: enriched.deviceType, + isBot: enriched.isBot, + browser: enriched.browser, + browserVersion: enriched.browserVersion, + browserMajor: enriched.browserMajor, + os: enriched.os, + osVersion: enriched.osVersion, + deviceVendor: enriched.deviceVendor, + deviceModel: enriched.deviceModel, + screenWidth: enriched.screenWidth ?? null, + screenHeight: enriched.screenHeight ?? null, + viewportWidth: enriched.viewportWidth ?? null, + viewportHeight: enriched.viewportHeight ?? null, + pixelRatio: enriched.pixelRatio ?? null, + colorDepth: enriched.colorDepth ?? null, + language: enriched.language ?? null, + languages: enriched.languages ?? null, + timezone: enriched.timezone ?? null, + timezoneOffset: enriched.timezoneOffset ?? null, + country: enriched.country, + region: enriched.region, + city: enriched.city, + isEU: enriched.isEU, + geoTimezone: enriched.geoTimezone, + isVpn: enriched.isVpn, + isDatacenter: enriched.isDatacenter, + isTor: enriched.isTor, + isGovernment: enriched.isGovernment, + orgType: enriched.orgType, + responseTier: enriched.responseTier, + org: enriched.org, + asn: enriched.asn, + ipHash: enriched.ipHash, + deviceMemory: enriched.deviceMemory ?? null, + hardwareConcurrency: enriched.hardwareConcurrency ?? null, + touchPoints: enriched.touchPoints ?? null, + cookiesEnabled: enriched.cookiesEnabled ?? null, + doNotTrack: enriched.doNotTrack ?? null, + trafficSource: attribution.source, + utmSource: attribution.rawSource, + utmMedium: attribution.medium, + utmCampaign: attribution.campaign, + utmContent: attribution.content, + utmTerm: attribution.term, + referrer: attribution.referrer, + landingPage: ev.pageUrl ?? null, + }); + await this.fingerprintRepo.save(fingerprint); + } catch (error) { + // First-write race on sessionId (unique) just means another envelope for + // the same session won — safe to ignore. Anything else is logged. + this.logger.warn( + `Failed to upsert session fingerprint for ${ev.sessionId}: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + } + } +} diff --git a/services/processor/src/processors/events.processor.ts b/services/processor/src/processors/events.processor.ts index 8b6772c..22bef43 100644 --- a/services/processor/src/processors/events.processor.ts +++ b/services/processor/src/processors/events.processor.ts @@ -1,19 +1,34 @@ -import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; +import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -import type { Job } from 'bullmq'; +import type { Job, Queue } from 'bullmq'; +import { + EVENTS_QUEUE, + INGEST_EVENT_JOB, + PROCESS_EVENT_JOB, + type RawEventEnvelope, + type ProcessEventJob, +} from '@lilith/analytics'; + import { AggregationService } from './aggregation.service'; import { RawEvent } from '../entities/raw-event.entity'; +import { IngestService } from '../ingest/ingest.service'; -interface EventJob { - eventId: string; - eventType: string; - sessionId: string; -} - +/** + * Single worker on the shared `analytics-events` queue. It handles two job + * types so they share one redis connection and consumer pool: + * + * ingest-event → enrich + write the canonical raw_events row (IngestService), + * then emit a process-event job for the row it inserted. + * process-event → aggregate an already-written row into aggregated_metrics. + * + * Splitting these into two @Processor classes on the same queue would NOT work: + * BullMQ workers pull any job regardless of name, so a single switch is the + * correct pattern. + */ @Injectable() -@Processor('analytics-events', { +@Processor(EVENTS_QUEUE, { concurrency: 10, }) export class EventsProcessor extends WorkerHost { @@ -21,16 +36,49 @@ export class EventsProcessor extends WorkerHost { constructor( private readonly aggregationService: AggregationService, + private readonly ingestService: IngestService, @InjectRepository(RawEvent) private readonly rawEventRepository: Repository, + @InjectQueue(EVENTS_QUEUE) + private readonly eventsQueue: Queue, ) { super(); } - async process(job: Job): Promise { + async process(job: Job): Promise { + switch (job.name) { + case INGEST_EVENT_JOB: + return this.handleIngest(job as Job); + case PROCESS_EVENT_JOB: + return this.handleAggregate(job as Job); + default: + this.logger.warn(`Unknown job name '${job.name}' on ${EVENTS_QUEUE} — skipping`); + return; + } + } + + /** Write the canonical row, then hand off to aggregation (only if newly inserted). */ + private async handleIngest(job: Job): Promise { + const written = await this.ingestService.write(job.data); + if (!written) return; // duplicate drain — row already present, already aggregated + + await this.eventsQueue.add( + PROCESS_EVENT_JOB, + { + eventId: written.eventId, + eventType: written.eventType, + sessionId: written.sessionId, + } satisfies ProcessEventJob, + // Reuse the event id so a re-drain can't double-enqueue aggregation. + { jobId: `agg:${written.eventId}` }, + ); + } + + /** Aggregate one already-written raw event. */ + private async handleAggregate(job: Job): Promise { const { eventId, eventType, sessionId } = job.data; - this.logger.debug(`Processing event: ${eventType} (id: ${eventId}) from session ${sessionId}`); + this.logger.debug(`Aggregating event: ${eventType} (id: ${eventId}) from session ${sessionId}`); const event = await this.rawEventRepository.findOne({ where: { id: eventId } }); @@ -63,14 +111,12 @@ export class EventsProcessor extends WorkerHost { } @OnWorkerEvent('completed') - onCompleted(job: Job) { - this.logger.debug(`Job ${job.id} completed for event ${job.data.eventType}`); + onCompleted(job: Job) { + this.logger.debug(`Job ${job.id} (${job.name}) completed`); } @OnWorkerEvent('failed') - onFailed(job: Job, error: Error) { - this.logger.error( - `Job ${job.id} failed for event ${job.data.eventType}: ${error.message}`, - ); + onFailed(job: Job, error: Error) { + this.logger.error(`Job ${job.id} (${job.name}) failed: ${error.message}`); } } diff --git a/services/processor/src/processors/processors.module.ts b/services/processor/src/processors/processors.module.ts index 4083716..ed4d192 100644 --- a/services/processor/src/processors/processors.module.ts +++ b/services/processor/src/processors/processors.module.ts @@ -1,19 +1,22 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bullmq'; +import { EVENTS_QUEUE } from '@lilith/analytics'; import { EventsProcessor } from './events.processor'; import { AggregationService } from './aggregation.service'; import { AggregatedMetric } from '../entities/aggregated-metric.entity'; import { RawEvent } from '../entities/raw-event.entity'; import { RedisModule } from '../redis/redis.module'; +import { IngestModule } from '../ingest/ingest.module'; @Module({ imports: [ TypeOrmModule.forFeature([AggregatedMetric, RawEvent]), BullModule.registerQueue({ - name: 'analytics-events', + name: EVENTS_QUEUE, }), RedisModule, + IngestModule, ], providers: [EventsProcessor, AggregationService], exports: [AggregationService],