From 0c0cfc0b69e17b1373ad5e414fb4f2cb3fbffa59 Mon Sep 17 00:00:00 2001 From: autocommit Date: Wed, 10 Jun 2026 03:54:59 -0700 Subject: [PATCH] =?UTF-8?q?feat(processor):=20=E2=9C=A8=20Introduce=20Aggr?= =?UTF-8?q?egationService=20for=20data=20aggregation=20and=20SchemaGuardSe?= =?UTF-8?q?rvice=20for=20schema=20validation=20with=20module=20registratio?= =?UTF-8?q?n?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- services/processor/src/app.module.ts | 2 + .../src/processors/aggregation.service.ts | 2 +- .../processor/src/schema-guard.service.ts | 38 +++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 services/processor/src/schema-guard.service.ts diff --git a/services/processor/src/app.module.ts b/services/processor/src/app.module.ts index 1f10853..0558c68 100644 --- a/services/processor/src/app.module.ts +++ b/services/processor/src/app.module.ts @@ -5,6 +5,7 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { HealthModule } from './health/health.module'; import { ProcessorsModule } from './processors/processors.module'; import { AggregatedMetric } from './entities/aggregated-metric.entity'; +import { SchemaGuardService } from './schema-guard.service'; @Module({ imports: [ @@ -47,5 +48,6 @@ import { AggregatedMetric } from './entities/aggregated-metric.entity'; HealthModule, ProcessorsModule, ], + providers: [SchemaGuardService], }) export class AppModule {} diff --git a/services/processor/src/processors/aggregation.service.ts b/services/processor/src/processors/aggregation.service.ts index 1921727..1613414 100644 --- a/services/processor/src/processors/aggregation.service.ts +++ b/services/processor/src/processors/aggregation.service.ts @@ -54,7 +54,7 @@ export class AggregationService implements OnModuleDestroy { } async processEvent(event: ProcessableEvent): Promise { - const { eventType, timestamp, sessionId, userId, properties } = event; + const { eventType, timestamp, userId, properties } = event; const hourBucket = this.getTimeBucket(timestamp, TimeGranularity.HOUR); const dayBucket = this.getTimeBucket(timestamp, TimeGranularity.DAY); diff --git a/services/processor/src/schema-guard.service.ts b/services/processor/src/schema-guard.service.ts new file mode 100644 index 0000000..b560d9c --- /dev/null +++ b/services/processor/src/schema-guard.service.ts @@ -0,0 +1,38 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { InjectDataSource } from '@nestjs/typeorm'; +import { DataSource } from 'typeorm'; + +/** + * Ensures DDL that the entity decorators cannot express exists before the + * processor starts draining the queue. + * + * The aggregation upsert (aggregation.service.ts) relies on + * `ON CONFLICT ("metricType","granularity","timestamp","dimension","dimensionValue")`, + * which requires a unique index over exactly those columns that treats NULLs + * as equal — global metrics key on NULL dimension/dimensionValue, so a plain + * UNIQUE constraint (NULLs distinct) never conflicts and every upsert fails. + * + * Prod runs `synchronize: false` with no migration runner, so the `@Unique` + * decorator on AggregatedMetric is never applied there — and even where it + * is (dev sync), it lacks NULLS NOT DISTINCT. The 2026-05-16 → 2026-06-07 + * outage was exactly this: a fresh table without the index, every + * aggregation failing for three weeks. This guard makes the fix survive + * fresh deploys and new per-provider databases. + * + * Requires PostgreSQL 15+ (NULLS NOT DISTINCT). + */ +@Injectable() +export class SchemaGuardService implements OnModuleInit { + private readonly logger = new Logger(SchemaGuardService.name); + + constructor(@InjectDataSource() private readonly dataSource: DataSource) {} + + async onModuleInit(): Promise { + await this.dataSource.query(` + CREATE UNIQUE INDEX IF NOT EXISTS uq_aggregated_metrics_dedup + ON aggregated_metrics ("metricType", "granularity", "timestamp", "dimension", "dimensionValue") + NULLS NOT DISTINCT + `); + this.logger.log('uq_aggregated_metrics_dedup ensured (NULLS NOT DISTINCT)'); + } +}