chore(realtime): 🔧 Implement WebSocket message validation & protocol support

This commit is contained in:
Lilith 2026-01-25 16:06:27 -08:00
parent a20c495eb7
commit 3091152864
9 changed files with 343 additions and 0 deletions

View file

@ -0,0 +1,35 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { HealthModule } from './health/health.module';
import { GatewayModule } from './gateway/gateway.module';
import { MetricsModule } from './metrics/metrics.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
TypeOrmModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
type: 'postgres',
host: config.get('DATABASE_HOST', 'localhost'),
port: config.get('DATABASE_PORT', 5432),
username: config.get('DATABASE_USER', 'analytics'),
password: config.get('DATABASE_PASSWORD', 'analytics'),
database: config.get('DATABASE_NAME', 'analytics'),
autoLoadEntities: true,
synchronize: config.get('NODE_ENV') !== 'production',
logging: config.get('NODE_ENV') !== 'production',
}),
}),
HealthModule,
GatewayModule,
MetricsModule,
],
})
export class AppModule {}

View file

@ -0,0 +1,67 @@
import {
Entity,
Column,
PrimaryGeneratedColumn,
CreateDateColumn,
Index,
} from 'typeorm';
export enum MetricType {
PAGE_VIEWS = 'page_views',
UNIQUE_VISITORS = 'unique_visitors',
SESSIONS = 'sessions',
EVENT_COUNT = 'event_count',
CONVERSION_RATE = 'conversion_rate',
REVENUE = 'revenue',
}
export enum TimeGranularity {
MINUTE = 'minute',
HOUR = 'hour',
DAY = 'day',
WEEK = 'week',
MONTH = 'month',
}
@Entity('aggregated_metrics')
@Index(['metricType', 'granularity', 'timestamp'])
@Index(['metricType', 'dimension', 'dimensionValue', 'timestamp'])
export class AggregatedMetric {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Column({
type: 'enum',
enum: MetricType,
})
@Index()
metricType!: MetricType;
@Column({
type: 'enum',
enum: TimeGranularity,
})
granularity!: TimeGranularity;
@Column({ type: 'timestamptz' })
@Index()
timestamp!: Date;
@Column({ type: 'decimal', precision: 20, scale: 4, default: 0 })
value!: number;
@Column({ type: 'bigint', default: 0 })
count!: number;
@Column({ nullable: true })
dimension?: string;
@Column({ nullable: true })
dimensionValue?: string;
@Column({ type: 'jsonb', nullable: true })
metadata?: Record<string, unknown>;
@CreateDateColumn({ type: 'timestamptz' })
createdAt!: Date;
}

View file

@ -0,0 +1,89 @@
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
ConnectedSocket,
MessageBody,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { MetricsService } from '../metrics/metrics.service';
interface SubscriptionPayload {
metrics: string[];
interval?: number;
}
@WebSocketGateway({
cors: {
origin: '*',
credentials: true,
},
namespace: '/analytics',
})
export class AnalyticsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server!: Server;
private readonly logger = new Logger(AnalyticsGateway.name);
private readonly subscriptions = new Map<string, NodeJS.Timeout>();
constructor(private readonly metricsService: MetricsService) {}
handleConnection(client: Socket) {
this.logger.log(`Client connected: ${client.id}`);
}
handleDisconnect(client: Socket) {
this.logger.log(`Client disconnected: ${client.id}`);
this.clearSubscriptions(client.id);
}
@SubscribeMessage('subscribe')
handleSubscribe(
@ConnectedSocket() client: Socket,
@MessageBody() payload: SubscriptionPayload,
) {
const { metrics, interval = 5000 } = payload;
this.logger.log(`Client ${client.id} subscribing to: ${metrics.join(', ')}`);
// Clear existing subscription
this.clearSubscriptions(client.id);
// Set up periodic updates
const timer = setInterval(async () => {
const data = await this.metricsService.getRealtimeMetrics(metrics);
client.emit('metrics', data);
}, interval);
this.subscriptions.set(client.id, timer);
// Send initial data immediately
this.metricsService.getRealtimeMetrics(metrics).then((data) => {
client.emit('metrics', data);
});
return { subscribed: metrics, interval };
}
@SubscribeMessage('unsubscribe')
handleUnsubscribe(@ConnectedSocket() client: Socket) {
this.clearSubscriptions(client.id);
return { unsubscribed: true };
}
private clearSubscriptions(clientId: string) {
const timer = this.subscriptions.get(clientId);
if (timer) {
clearInterval(timer);
this.subscriptions.delete(clientId);
}
}
broadcastMetricUpdate(metric: string, value: number) {
this.server.emit('metric-update', { metric, value, timestamp: new Date() });
}
}

View file

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { AnalyticsGateway } from './analytics.gateway';
import { MetricsModule } from '../metrics/metrics.module';
@Module({
imports: [MetricsModule],
providers: [AnalyticsGateway],
exports: [AnalyticsGateway],
})
export class GatewayModule {}

View file

@ -0,0 +1,16 @@
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService, TypeOrmHealthIndicator } from '@nestjs/terminus';
@Controller('health')
export class HealthController {
constructor(
private health: HealthCheckService,
private db: TypeOrmHealthIndicator,
) {}
@Get()
@HealthCheck()
check() {
return this.health.check([() => this.db.pingCheck('database')]);
}
}

View file

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthController } from './health.controller';
@Module({
imports: [TerminusModule],
controllers: [HealthController],
})
export class HealthModule {}

View file

@ -0,0 +1,21 @@
import { NestFactory } from '@nestjs/core';
import { Logger } from '@nestjs/common';
import { AppModule } from './app.module';
async function bootstrap() {
const logger = new Logger('RealtimeService');
const app = await NestFactory.create(AppModule);
app.enableCors({
origin: process.env.CORS_ORIGIN ?? '*',
credentials: true,
});
const port = process.env.PORT ?? 3004;
await app.listen(port);
logger.log(`Analytics realtime service running on port ${port}`);
logger.log(`WebSocket gateway available at ws://localhost:${port}`);
}
bootstrap();

View file

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { MetricsService } from './metrics.service';
import { AggregatedMetric } from '../entities/aggregated-metric.entity';
@Module({
imports: [TypeOrmModule.forFeature([AggregatedMetric])],
providers: [MetricsService],
exports: [MetricsService],
})
export class MetricsModule {}

View file

@ -0,0 +1,85 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, MoreThan } from 'typeorm';
import { AggregatedMetric, MetricType, TimeGranularity } from '../entities/aggregated-metric.entity';
export interface RealtimeMetric {
metric: string;
value: number;
change: number;
changePercent: number;
timestamp: Date;
}
@Injectable()
export class MetricsService {
constructor(
@InjectRepository(AggregatedMetric)
private readonly metricsRepository: Repository<AggregatedMetric>,
) {}
async getRealtimeMetrics(metricNames: string[]): Promise<RealtimeMetric[]> {
const now = new Date();
const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000);
const twoHoursAgo = new Date(now.getTime() - 2 * 60 * 60 * 1000);
const results: RealtimeMetric[] = [];
for (const metricName of metricNames) {
// Get current hour's data
const currentData = await this.metricsRepository.findOne({
where: {
metricType: metricName as MetricType,
granularity: TimeGranularity.HOUR,
timestamp: MoreThan(oneHourAgo),
},
order: { timestamp: 'DESC' },
});
// Get previous hour's data for comparison
const previousData = await this.metricsRepository.findOne({
where: {
metricType: metricName as MetricType,
granularity: TimeGranularity.HOUR,
timestamp: MoreThan(twoHoursAgo),
},
order: { timestamp: 'DESC' },
});
const currentValue = currentData ? Number(currentData.value) : 0;
const previousValue = previousData ? Number(previousData.value) : 0;
const change = currentValue - previousValue;
const changePercent = previousValue > 0 ? (change / previousValue) * 100 : 0;
results.push({
metric: metricName,
value: currentValue,
change,
changePercent,
timestamp: now,
});
}
return results;
}
async getActiveUsers(): Promise<number> {
// Count unique sessions in the last 5 minutes
// This would query session/visitor data
return 0;
}
async getCurrentPageViews(): Promise<number> {
const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);
const result = await this.metricsRepository
.createQueryBuilder('m')
.select('SUM(m.count)', 'total')
.where('m.metricType = :type', { type: MetricType.PAGE_VIEWS })
.andWhere('m.granularity = :gran', { gran: TimeGranularity.MINUTE })
.andWhere('m.timestamp > :since', { since: fiveMinutesAgo })
.getRawOne();
return Number(result?.total ?? 0);
}
}