analytics/services/processor/src/processors/aggregation.service.ts
2026-06-10 03:54:59 -07:00

552 lines
14 KiB
TypeScript

import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import {
AggregatedMetric,
MetricType,
TimeGranularity,
} from '../entities/aggregated-metric.entity';
import { RedisSessionService, SessionState } from '../redis/redis-session.service';
import { RedisPublisherService } from '../redis/redis-publisher.service';
/**
 * Shape of an analytics event accepted by `AggregationService.processEvent()`.
 */
interface ProcessableEvent {
  /** Session the event belongs to; used as the session-state key. */
  sessionId: string;
  /** Event name, e.g. `pageView`, `session_start`, `session_end`, `purchase`. */
  eventType: string;
  /** When the event occurred; determines the metric time buckets. */
  timestamp: Date;
  /** User associated with the event; may be omitted or explicitly null. */
  userId?: string | null;
  /** Free-form event payload (e.g. `path`, `revenue`, `deviceType`). */
  properties: Record<string, unknown>;
}
/**
 * Rules applied when a session ends to decide whether it was "engaged".
 *
 * A session is engaged when at least one of these holds:
 * - it lasted at least `minDurationMs` (10 seconds),
 * - it recorded at least `minPageViews` page views (2),
 * - it contained a conversion event (no numeric threshold involved).
 */
const ENGAGEMENT_THRESHOLDS = {
  /** Minimum session length in milliseconds (10 s). */
  minDurationMs: 10 * 1000,
  /** Minimum number of page views in the session. */
  minPageViews: 2,
};
@Injectable()
export class AggregationService implements OnModuleDestroy {
  /** How often buffered realtime metrics are published, in milliseconds. */
  private static readonly PUBLISH_INTERVAL_MS = 5000;

  /** Accepted spellings of the page-view event type. */
  private static readonly PAGE_VIEW_EVENT_TYPES: ReadonlySet<string> = new Set([
    'pageView',
    'pageview',
  ]);

  /** Event types that count as a conversion for the session. */
  private static readonly CONVERSION_EVENT_TYPES: ReadonlySet<string> = new Set([
    'purchase',
    'conversion',
  ]);

  private readonly logger = new Logger(AggregationService.name);

  /**
   * HOUR/MINUTE metric increments waiting to be published on the realtime
   * channel. Replaced wholesale (not mutated in place) by flushMetricsBuffer().
   */
  private metricsBuffer: Array<{
    metricType: MetricType;
    granularity: TimeGranularity;
    value: number;
    dimension?: string;
    dimensionValue?: string;
  }> = [];

  private publishTimer?: NodeJS.Timeout;

  constructor(
    @InjectRepository(AggregatedMetric)
    private readonly metricsRepository: Repository<AggregatedMetric>,
    private readonly redisSession: RedisSessionService,
    private readonly redisPublisher: RedisPublisherService,
  ) {
    // Publish buffered metrics periodically. flushMetricsBuffer() catches and
    // logs its own publish failures, so the promise is intentionally not awaited.
    this.publishTimer = setInterval(() => {
      void this.flushMetricsBuffer();
    }, AggregationService.PUBLISH_INTERVAL_MS);
  }

  /**
   * Aggregate a single event: update the session state in Redis, then upsert
   * every metric this event type contributes to.
   *
   * @param event The event to aggregate.
   * @throws Rethrows errors from the metric upserts and from the Redis
   *   session service.
   */
  async processEvent(event: ProcessableEvent): Promise<void> {
    const { eventType, timestamp, userId, properties } = event;
    const hourBucket = this.getTimeBucket(timestamp, TimeGranularity.HOUR);
    const dayBucket = this.getTimeBucket(timestamp, TimeGranularity.DAY);

    // Persists page-view counts and the conversion flag before returning, so
    // later events for this session (notably session_end) see them.
    const sessionState = await this.updateSessionState(event);

    if (AggregationService.PAGE_VIEW_EVENT_TYPES.has(eventType)) {
      await this.handlePageView(hourBucket, dayBucket, properties, sessionState);
    } else if (AggregationService.CONVERSION_EVENT_TYPES.has(eventType)) {
      await this.handleConversion(hourBucket, eventType, properties);
    } else if (eventType === 'session_start') {
      await this.handleSessionStart(hourBucket, dayBucket, sessionState, userId);
    } else if (eventType === 'session_end') {
      await this.handleSessionEnd(hourBucket, dayBucket, sessionState);
    } else {
      await this.incrementMetric(
        MetricType.EVENT_COUNT,
        TimeGranularity.HOUR,
        hourBucket,
        1,
        'event_type',
        eventType,
      );
    }
  }

  /**
   * Record a page view: the hourly total, an hourly per-path breakdown, and
   * daily breakdowns by the session's device, traffic source, browser, OS and
   * country (each only when that attribute is set).
   */
  private async handlePageView(
    hourBucket: Date,
    dayBucket: Date,
    properties: Record<string, unknown>,
    sessionState: SessionState,
  ): Promise<void> {
    await this.incrementMetric(
      MetricType.PAGE_VIEWS,
      TimeGranularity.HOUR,
      hourBucket,
      1,
    );

    if (properties.path) {
      await this.incrementMetric(
        MetricType.PAGE_VIEWS,
        TimeGranularity.HOUR,
        hourBucket,
        1,
        'path',
        String(properties.path),
      );
    }

    await this.incrementDailyDimensions(MetricType.PAGE_VIEWS, dayBucket, [
      ['device_type', sessionState.deviceType],
      ['traffic_source', sessionState.trafficSource],
      ['browser', sessionState.browser],
      ['os', sessionState.os],
      ['country', sessionState.country],
    ]);
  }

  /**
   * Record a session start: the hourly session count, new-vs-returning user
   * counts (daily, only when a userId is known), and daily session breakdowns
   * by traffic source, device type and country.
   */
  private async handleSessionStart(
    hourBucket: Date,
    dayBucket: Date,
    sessionState: SessionState,
    userId?: string | null,
  ): Promise<void> {
    await this.incrementMetric(
      MetricType.SESSIONS,
      TimeGranularity.HOUR,
      hourBucket,
      1,
    );

    if (userId) {
      // NOTE(review): check-then-mark is not atomic; two sessions of the same
      // unseen user processed concurrently can both be counted as new.
      const isNewUser = !(await this.redisSession.hasSeenUser(userId));
      await this.redisSession.markUserSeen(userId);

      // The state was already saved by updateSessionState(); write it again so
      // the newly computed flag is not lost.
      sessionState.isNew = isNewUser;
      await this.redisSession.setSession(sessionState.sessionId, sessionState);

      await this.incrementMetric(
        isNewUser ? MetricType.NEW_USERS : MetricType.RETURNING_USERS,
        TimeGranularity.DAY,
        dayBucket,
        1,
      );
    }

    await this.incrementDailyDimensions(MetricType.SESSIONS, dayBucket, [
      ['traffic_source', sessionState.trafficSource],
      ['device_type', sessionState.deviceType],
      ['country', sessionState.country],
    ]);
  }

  /**
   * Record a session end. This method:
   * - classifies the session as engaged and/or a bounce;
   * - adds its duration (seconds) and page count to the hourly sums used for
   *   averaging;
   * - deletes the session state from Redis.
   */
  private async handleSessionEnd(
    hourBucket: Date,
    dayBucket: Date,
    sessionState: SessionState,
  ): Promise<void> {
    // Clamp at 0 in case a previously stored state has inverted timestamps.
    const durationMs = Math.max(
      0,
      sessionState.lastEventAt.getTime() - sessionState.firstEventAt.getTime(),
    );
    const durationSeconds = durationMs / 1000;

    const isEngaged =
      durationMs >= ENGAGEMENT_THRESHOLDS.minDurationMs ||
      sessionState.pageViews >= ENGAGEMENT_THRESHOLDS.minPageViews ||
      sessionState.hasConversion;

    if (isEngaged) {
      await this.incrementMetric(
        MetricType.ENGAGED_SESSIONS,
        TimeGranularity.HOUR,
        hourBucket,
        1,
      );
      await this.incrementDailyDimensions(MetricType.ENGAGED_SESSIONS, dayBucket, [
        ['traffic_source', sessionState.trafficSource],
      ]);
    }

    // A bounce is exactly one page view with no conversion (duration is not
    // considered). The BOUNCE_RATE row stores the bounce count.
    const isBounce = sessionState.pageViews === 1 && !sessionState.hasConversion;
    if (isBounce) {
      await this.incrementMetric(
        MetricType.BOUNCE_RATE,
        TimeGranularity.HOUR,
        hourBucket,
        1,
      );
    }

    // Stored as sums; "count" is incremented alongside so averages can be derived.
    await this.addToMetric(
      MetricType.AVG_SESSION_DURATION,
      TimeGranularity.HOUR,
      hourBucket,
      durationSeconds,
    );
    await this.addToMetric(
      MetricType.PAGES_PER_SESSION,
      TimeGranularity.HOUR,
      hourBucket,
      sessionState.pageViews,
    );

    await this.redisSession.deleteSession(sessionState.sessionId);
  }

  /**
   * Record a conversion event. This method:
   * - increments the hourly per-event-type count;
   * - adds revenue to the hourly sum when it is a finite number;
   * - increments the hourly per-conversion-type count.
   */
  private async handleConversion(
    hourBucket: Date,
    eventType: string,
    properties: Record<string, unknown>,
  ): Promise<void> {
    await this.incrementMetric(
      MetricType.EVENT_COUNT,
      TimeGranularity.HOUR,
      hourBucket,
      1,
      'event_type',
      eventType,
    );

    if (properties.revenue) {
      const revenue = Number(properties.revenue);
      if (Number.isFinite(revenue)) {
        await this.addToMetric(
          MetricType.REVENUE,
          TimeGranularity.HOUR,
          hourBucket,
          revenue,
        );
      } else {
        // NaN/Infinity would poison the bucket's running sum permanently.
        this.logger.warn(`Ignoring non-finite revenue value on ${eventType} event`);
      }
    }

    if (properties.conversionType) {
      await this.incrementMetric(
        MetricType.CONVERSION_RATE,
        TimeGranularity.HOUR,
        hourBucket,
        1,
        'conversion_type',
        String(properties.conversionType),
      );
    }
  }

  /**
   * Load (or create) the session state, fold this event into it, and save it
   * back to Redis.
   *
   * All state changes caused by the event are applied before the save. These
   * are the event window, counters, conversion flag and late-arriving userId.
   *
   * @returns The updated, already-persisted session state.
   */
  private async updateSessionState(event: ProcessableEvent): Promise<SessionState> {
    const { sessionId, timestamp, eventType, userId, properties } = event;

    let state = await this.redisSession.getSession(sessionId);
    if (!state) {
      state = {
        sessionId,
        userId,
        firstEventAt: timestamp,
        lastEventAt: timestamp,
        pageViews: 0,
        totalEvents: 0,
        hasConversion: false,
        isNew: true,
        trafficSource: properties.trafficSource as string | undefined,
        deviceType: properties.deviceType as string | undefined,
        country: properties.country as string | undefined,
        browser: properties.browser as string | undefined,
        os: properties.os as string | undefined,
      };
    }

    // Only widen the session window. If events are delivered out of order, a
    // late event must not move lastEventAt backwards (or before firstEventAt).
    const eventMs = timestamp.getTime();
    if (eventMs > state.lastEventAt.getTime()) {
      state.lastEventAt = timestamp;
    }
    if (eventMs < state.firstEventAt.getTime()) {
      state.firstEventAt = timestamp;
    }

    state.totalEvents++;
    if (AggregationService.PAGE_VIEW_EVENT_TYPES.has(eventType)) {
      state.pageViews++;
    }
    if (AggregationService.CONVERSION_EVENT_TYPES.has(eventType)) {
      state.hasConversion = true;
    }

    // Attach a user ID that only becomes known later (e.g. after login).
    if (userId && !state.userId) {
      state.userId = userId;
    }

    await this.redisSession.setSession(sessionId, state);
    return state;
  }

  /**
   * Increment-or-insert `value` into the matching aggregated_metrics row and
   * bump its "count". The upsert is a single `INSERT ... ON CONFLICT` statement.
   * HOUR/MINUTE increments are also buffered for realtime publishing.
   *
   * @throws Rethrows (after logging) any database error.
   */
  private async incrementMetric(
    metricType: MetricType,
    granularity: TimeGranularity,
    timestamp: Date,
    value: number,
    dimension?: string,
    dimensionValue?: string,
  ): Promise<void> {
    try {
      // NOTE(review): Postgres treats NULLs as distinct in unique constraints
      // unless the index is declared NULLS NOT DISTINCT (PG15+). Rows without
      // a dimension (NULL dimension/dimensionValue) may therefore never hit
      // ON CONFLICT and insert duplicates. Confirm against the migration.
      await this.metricsRepository.query(
        `
INSERT INTO aggregated_metrics ("metricType", "granularity", "timestamp", "value", "count", "dimension", "dimensionValue", "createdAt")
VALUES ($1, $2, $3, $4, 1, $5, $6, NOW())
ON CONFLICT ("metricType", "granularity", "timestamp", "dimension", "dimensionValue")
DO UPDATE SET
"value" = aggregated_metrics."value" + $4,
"count" = aggregated_metrics."count" + 1
`,
        [metricType, granularity, timestamp, value, dimension ?? null, dimensionValue ?? null],
      );

      // Only fine-grained metrics are pushed to the realtime channel.
      if (granularity === TimeGranularity.HOUR || granularity === TimeGranularity.MINUTE) {
        this.metricsBuffer.push({
          metricType,
          granularity,
          value,
          dimension,
          dimensionValue,
        });
      }
    } catch (error) {
      this.logger.error(`Failed to increment metric ${metricType}: ${error}`);
      throw error;
    }
  }

  /**
   * Add an arbitrary amount to a metric's running sum. This is the same upsert
   * as incrementMetric(); "count" still increments, so sum / count yields the
   * average.
   */
  private async addToMetric(
    metricType: MetricType,
    granularity: TimeGranularity,
    timestamp: Date,
    value: number,
    dimension?: string,
    dimensionValue?: string,
  ): Promise<void> {
    await this.incrementMetric(
      metricType,
      granularity,
      timestamp,
      value,
      dimension,
      dimensionValue,
    );
  }

  /**
   * Increment `metricType` by 1 at DAY granularity once for each dimension
   * whose value is truthy, in the order given (sequentially).
   */
  private async incrementDailyDimensions(
    metricType: MetricType,
    dayBucket: Date,
    dimensions: ReadonlyArray<readonly [string, string | null | undefined]>,
  ): Promise<void> {
    for (const [dimension, dimensionValue] of dimensions) {
      if (dimensionValue) {
        await this.incrementMetric(
          metricType,
          TimeGranularity.DAY,
          dayBucket,
          1,
          dimension,
          dimensionValue,
        );
      }
    }
  }

  /**
   * Truncate `date` to the start of its bucket for `granularity`. This returns
   * a copy; the input is not mutated. WEEK buckets start on Sunday, because
   * `getDay()` returns 0 for Sunday.
   *
   * NOTE(review): truncation uses the host's local time zone (setHours etc.),
   * so bucket boundaries depend on the process TZ and DST. Confirm this is
   * intended, or whether UTC setters are required.
   */
  private getTimeBucket(date: Date, granularity: TimeGranularity): Date {
    const bucket = new Date(date);
    switch (granularity) {
      case TimeGranularity.MINUTE:
        bucket.setSeconds(0, 0);
        break;
      case TimeGranularity.HOUR:
        bucket.setMinutes(0, 0, 0);
        break;
      case TimeGranularity.DAY:
        bucket.setHours(0, 0, 0, 0);
        break;
      case TimeGranularity.WEEK:
        bucket.setHours(0, 0, 0, 0);
        bucket.setDate(bucket.getDate() - bucket.getDay());
        break;
      case TimeGranularity.MONTH:
        bucket.setHours(0, 0, 0, 0);
        bucket.setDate(1);
        break;
    }
    return bucket;
  }

  /**
   * Publish buffered metrics on the realtime Redis channel.
   *
   * This runs on the publish interval and at shutdown. Publishing is best
   * effort: failures are logged, the batch is dropped, and the method never
   * rejects.
   */
  private async flushMetricsBuffer(): Promise<void> {
    if (this.metricsBuffer.length === 0) {
      return;
    }

    // Swap the buffer synchronously so increments made while publishing land
    // in the next batch and overlapping flushes never publish the same entry.
    const metrics = this.metricsBuffer;
    this.metricsBuffer = [];

    try {
      await this.redisPublisher.publishUpdate({
        type: 'metrics_updated',
        timestamp: new Date(),
        metrics: metrics.map((m) => ({
          metricType: m.metricType,
          granularity: m.granularity,
          value: m.value,
          dimension: m.dimension,
          dimensionValue: m.dimensionValue,
        })),
      });
      this.logger.debug(`Published ${metrics.length} metrics to realtime channel`);
    } catch (error) {
      this.logger.error(`Failed to publish metrics: ${error}`);
      // Don't throw - publishing is non-critical
    }
  }

  /**
   * Stop the publish timer and flush any remaining buffered metrics.
   */
  async onModuleDestroy(): Promise<void> {
    if (this.publishTimer) {
      clearInterval(this.publishTimer);
      this.publishTimer = undefined;
    }
    await this.flushMetricsBuffer();
  }
}