diff --git a/services/collector/src/app.module.ts b/services/collector/src/app.module.ts index 31b6492..00bcb8e 100644 --- a/services/collector/src/app.module.ts +++ b/services/collector/src/app.module.ts @@ -7,7 +7,10 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { TrackingModule } from './tracking/tracking.module'; import { HealthModule } from './health/health.module'; import { RawEvent } from './entities/raw-event.entity'; -import { SessionFingerprint } from './entities/session-fingerprint.entity'; +import { SessionFingerprint } from "./entities/session-fingerprint.entity"; +import { Corp } from "./entities/corp.entity"; +import { Domain } from "./entities/domain.entity"; +import { VisitorSalt } from "./entities/visitor-salt.entity"; import { WriteKeyGuard } from './auth/write-key.guard'; @Module({ @@ -36,7 +39,7 @@ import { WriteKeyGuard } from './auth/write-key.guard'; username: config.get('DATABASE_USER', 'analytics'), password: config.get('DATABASE_PASSWORD', 'analytics'), database: config.get('DATABASE_NAME', 'analytics'), - entities: [RawEvent, SessionFingerprint], + entities: [RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt], synchronize: config.get('DB_SYNCHRONIZE') === 'true', logging: config.get('NODE_ENV') === 'development', }), diff --git a/services/collector/src/entities/raw-event.entity.ts b/services/collector/src/entities/raw-event.entity.ts index 37cd6e4..3640c38 100644 --- a/services/collector/src/entities/raw-event.entity.ts +++ b/services/collector/src/entities/raw-event.entity.ts @@ -14,6 +14,9 @@ import { @Index(['sessionId', 'timestamp']) @Index(['eventType', 'timestamp']) @Index(['processed', 'timestamp']) +@Index(['visitorIdDaily', 'timestamp']) +@Index(['corpId', 'timestamp']) +@Index(['domainId', 'timestamp']) export class RawEvent { @PrimaryGeneratedColumn('uuid') id!: string; @@ -70,4 +73,21 @@ export class RawEvent { /** Processing timestamp */ @Column({ type: 'timestamptz', nullable: true }) processedAt?: Date | null; + + /** + * Daily-rotating visitor identity for cross-domain stitching. + * sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same + * visitor across all our domains within a UTC day; unrecoverable after + * salt rotation (cookie-free, GDPR-clean). + */ + @Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' }) + visitorIdDaily?: Buffer | null; + + /** Corp that owns the originating domain. FK → corps(id). */ + @Column({ type: 'smallint', nullable: true, name: 'corp_id' }) + corpId?: number | null; + + /** Domain the event originated from. FK → domains(id). */ + @Column({ type: 'integer', nullable: true, name: 'domain_id' }) + domainId?: number | null; } diff --git a/services/collector/src/tracking/domain-resolver.service.ts b/services/collector/src/tracking/domain-resolver.service.ts new file mode 100644 index 0000000..a27625e --- /dev/null +++ b/services/collector/src/tracking/domain-resolver.service.ts @@ -0,0 +1,71 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; + +import { Domain } from '../entities/domain.entity'; + +export interface ResolvedDomain { + domainId: number; + corpId: number; + hostname: string; +} + +/** + * Resolves request → (corp_id, domain_id) by matching the request's hostname + * against the `domains` table. The proxy strips `Host` upstream, so we read + * Origin first, then Referer as a fallback. + * + * Unknown hosts return null — the caller decides whether to drop the event or + * persist with NULL dimensions. Default: drop, to keep the dataset clean. + */ +@Injectable() +export class DomainResolverService { + private readonly logger = new Logger(DomainResolverService.name); + + /** hostname → resolved row. Reloaded on miss. */ + private cache = new Map(); + + constructor( + @InjectRepository(Domain) + private readonly domainRepo: Repository, + ) {} + + /** + * Extract the hostname from the most reliable header available. + * Returns lowercase, no port, no path. + */ + hostnameFromHeaders(headers: Record): string | null { + const pick = (h: string | string[] | undefined): string | undefined => { + if (Array.isArray(h)) return h[0]; + return h; + }; + const origin = pick(headers['origin']); + const referer = pick(headers['referer']); + const host = pick(headers['host']); + const candidate = origin ?? referer ?? host; + if (!candidate) return null; + try { + const u = candidate.includes('://') ? new URL(candidate) : new URL(`http://${candidate}`); + return u.hostname.toLowerCase(); + } catch { + return null; + } + } + + /** Look up a domain row by hostname. */ + async resolve(hostname: string | null): Promise { + if (!hostname) return null; + const cached = this.cache.get(hostname); + if (cached !== undefined) return cached; + + const row = await this.domainRepo.findOne({ where: { hostname } }); + const resolved: ResolvedDomain | null = row + ? { domainId: row.id, corpId: row.corpId, hostname: row.hostname } + : null; + this.cache.set(hostname, resolved); + if (!resolved) { + this.logger.warn({ hostname }, 'Unknown domain — event will be dropped or untagged'); + } + return resolved; + } +} diff --git a/services/collector/src/tracking/identity.service.ts b/services/collector/src/tracking/identity.service.ts new file mode 100644 index 0000000..3385600 --- /dev/null +++ b/services/collector/src/tracking/identity.service.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import * as crypto from 'crypto'; + +import { VisitorSalt } from '../entities/visitor-salt.entity'; + +/** + * Server-side visitor identity for cross-domain stitching. + * + * visitor_id_daily = sha256(daily_salt || ip || ua || accept_language) + * + * Same visitor across all our domains within a UTC day → same id. + * Salt rotates at 00:00 UTC and is purged from `visitor_salts` after 7 days, + * making historical re-identification mathematically impossible. + */ +@Injectable() +export class IdentityService { + private readonly logger = new Logger(IdentityService.name); + + /** Process-local salt cache. Keyed by YYYY-MM-DD (UTC). */ + private cache = new Map(); + + constructor( + @InjectRepository(VisitorSalt) + private readonly saltRepo: Repository, + ) {} + + /** + * Compute today's visitor id from request attributes. + * Returns a 32-byte Buffer (sha256 digest), or null if no identifying + * attribute is present (extremely unlikely — we always have at minimum a UA). + */ + async visitorIdDaily( + ip: string | undefined, + ua: string | undefined, + lang: string | undefined, + now: Date = new Date(), + ): Promise { + if (!ip && !ua && !lang) return null; + + const day = this.utcDay(now); + const salt = await this.getSaltFor(day); + + const h = crypto.createHash('sha256'); + h.update(salt); + h.update('\n'); + h.update(ip ?? ''); + h.update('\n'); + h.update(ua ?? ''); + h.update('\n'); + h.update(lang ?? ''); + return h.digest(); + } + + /** YYYY-MM-DD in UTC. */ + private utcDay(d: Date): string { + return d.toISOString().slice(0, 10); + } + + /** + * Fetch the salt for `day`, creating one race-safely if missing. + * Memoized for the lifetime of the process; cache invalidates when the + * UTC day rolls over (entries from older days remain valid for late events). + */ + private async getSaltFor(day: string): Promise { + const cached = this.cache.get(day); + if (cached) return cached; + + // Try read. + const existing = await this.saltRepo.findOne({ where: { day } }); + if (existing) { + const buf = Buffer.isBuffer(existing.salt) ? existing.salt : Buffer.from(existing.salt); + this.cache.set(day, buf); + return buf; + } + + // Create. Race-safe via ON CONFLICT DO NOTHING — re-fetch if we lost. + const fresh = crypto.randomBytes(32); + await this.saltRepo + .createQueryBuilder() + .insert() + .into(VisitorSalt) + .values({ day, salt: fresh }) + .orIgnore() + .execute(); + + const final = await this.saltRepo.findOne({ where: { day } }); + if (!final) { + // Should be impossible — orIgnore() either inserted ours or kept theirs. + this.logger.error({ day }, 'Failed to read visitor_salts row after race-safe insert'); + throw new Error(`visitor_salts row for ${day} missing after insert`); + } + const buf = Buffer.isBuffer(final.salt) ? final.salt : Buffer.from(final.salt); + this.cache.set(day, buf); + return buf; + } +} diff --git a/services/collector/src/tracking/index.ts b/services/collector/src/tracking/index.ts index ed0320d..c40d8fc 100644 --- a/services/collector/src/tracking/index.ts +++ b/services/collector/src/tracking/index.ts @@ -11,3 +11,6 @@ export type { AttributionInput, ResolvedAttribution, } from './attribution.service'; +export { IdentityService } from "./identity.service"; +export { DomainResolverService } from "./domain-resolver.service"; +export type { ResolvedDomain } from "./domain-resolver.service"; diff --git a/services/collector/src/tracking/tracking.module.ts b/services/collector/src/tracking/tracking.module.ts index e2f620a..5f1fd0e 100644 --- a/services/collector/src/tracking/tracking.module.ts +++ b/services/collector/src/tracking/tracking.module.ts @@ -3,21 +3,33 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bullmq'; import { RawEvent } from '../entities/raw-event.entity'; import { SessionFingerprint } from '../entities/session-fingerprint.entity'; +import { Corp } from '../entities/corp.entity'; +import { Domain } from '../entities/domain.entity'; +import { VisitorSalt } from '../entities/visitor-salt.entity'; import { TrackingService } from './tracking.service'; import { TrackingController } from './tracking.controller'; import { DeviceEnrichmentService } from './device-enrichment.service'; import { AttributionService } from './attribution.service'; import { GovDetectionService } from './gov-detection.service'; +import { IdentityService } from './identity.service'; +import { DomainResolverService } from './domain-resolver.service'; @Module({ imports: [ - TypeOrmModule.forFeature([RawEvent, SessionFingerprint]), + TypeOrmModule.forFeature([RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt]), BullModule.registerQueue({ name: 'analytics-events', }), ], controllers: [TrackingController], - providers: [TrackingService, DeviceEnrichmentService, AttributionService, GovDetectionService], - exports: [TrackingService], + providers: [ + TrackingService, + DeviceEnrichmentService, + AttributionService, + GovDetectionService, + IdentityService, + DomainResolverService, + ], + exports: [TrackingService, IdentityService, DomainResolverService], }) export class TrackingModule {} diff --git a/services/collector/src/tracking/tracking.service.ts b/services/collector/src/tracking/tracking.service.ts index 3127913..4a1d20a 100644 --- a/services/collector/src/tracking/tracking.service.ts +++ b/services/collector/src/tracking/tracking.service.ts @@ -9,6 +9,8 @@ import { RawEvent } from '../entities/raw-event.entity'; import { SessionFingerprint } from '../entities/session-fingerprint.entity'; import { DeviceEnrichmentService, ClientDeviceData, EnrichedDeviceData } from './device-enrichment.service'; import { AttributionService, AttributionInput, ResolvedAttribution } from './attribution.service'; +import { IdentityService } from './identity.service'; +import { DomainResolverService } from './domain-resolver.service'; export interface TrackViewInput { pageUrl: string; @@ -57,6 +59,8 @@ export class TrackingService { constructor( private readonly deviceEnrichmentService: DeviceEnrichmentService, private readonly attributionService: AttributionService, + private readonly identityService: IdentityService, + private readonly domainResolver: DomainResolverService, @InjectRepository(RawEvent) private readonly rawEventRepository: Repository, @InjectRepository(SessionFingerprint) @@ -65,6 +69,35 @@ export class TrackingService { private readonly eventsQueue: Queue, ) {} + /** + * Derive cross-domain dimensions for a request: + * - visitor_id_daily: sha256(daily_salt || ip || ua || lang) + * - corp_id, domain_id: from Origin/Referer/Host → domains row + * + * Returns nulls for any dimension we can't resolve. Callers persist all + * three on the event row. + */ + private async resolveCrossDomainDimensions(request: Request): Promise<{ + visitorIdDaily: Buffer | null; + corpId: number | null; + domainId: number | null; + }> { + const ua = (request.headers['user-agent'] as string | undefined) ?? undefined; + const lang = (request.headers['accept-language'] as string | undefined) ?? undefined; + const ip = this.extractIpAddress(request) ?? undefined; + + const visitorIdDaily = await this.identityService.visitorIdDaily(ip, ua, lang); + + const hostname = this.domainResolver.hostnameFromHeaders(request.headers); + const resolved = await this.domainResolver.resolve(hostname); + + return { + visitorIdDaily, + corpId: resolved?.corpId ?? null, + domainId: resolved?.domainId ?? null, + }; + } + /** * Track page view with device fingerprinting */ @@ -100,6 +133,10 @@ export class TrackingService { data.pageUrl, ); + // Cross-domain dimensions + const { visitorIdDaily, corpId, domainId } = + await this.resolveCrossDomainDimensions(request); + // Create raw event const event = this.rawEventRepository.create({ eventType: 'pageview', @@ -111,6 +148,9 @@ export class TrackingService { metadata: data.metadata ?? null, timestamp: new Date(), processed: false, + visitorIdDaily, + corpId, + domainId, }); const saved = await this.rawEventRepository.save(event);