analytics/services/processor/src/processors/processors.module.ts
Natalie b252753476 feat(analytics): canonical store on black, vps-0 edge with redis outage-spool
Relocate the canonical analytics store off the public VPS. The collector
becomes a DB-free edge that captures + durably enqueues events to black's
redis; a black-side ingest-writer enriches and writes raw_events. When black
is unreachable the collector spools to a local appendonly redis that only
grows during the outage and drains on recovery.

- shared: RawEventEnvelope/NormalizedEvent ingest contract (edge -> writer)
- collector: capture-and-enqueue + dual-redis RedisRouter (primary=black,
  spool=local) + paused-until-healthy drain worker; drop TypeORM/enrichment
- processor: IngestService canonical writer (edge receivedAt, ON CONFLICT
  DO NOTHING), single worker branches ingest-event -> process-event
- relocate device/identity/domain/attribution enrichment + entities to writer

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-21 07:48:02 -05:00

24 lines
848 B
TypeScript

import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
import { EVENTS_QUEUE } from '@lilith/analytics';
import { EventsProcessor } from './events.processor';
import { AggregationService } from './aggregation.service';
import { AggregatedMetric } from '../entities/aggregated-metric.entity';
import { RawEvent } from '../entities/raw-event.entity';
import { RedisModule } from '../redis/redis.module';
import { IngestModule } from '../ingest/ingest.module';
@Module({
imports: [
TypeOrmModule.forFeature([AggregatedMetric, RawEvent]),
BullModule.registerQueue({
name: EVENTS_QUEUE,
}),
RedisModule,
IngestModule,
],
providers: [EventsProcessor, AggregationService],
exports: [AggregationService],
})
export class ProcessorsModule {}