diff --git a/services/api/src/acquisition/acquisition.service.ts b/services/api/src/acquisition/acquisition.service.ts index 596d7cc..25d1683 100644 --- a/services/api/src/acquisition/acquisition.service.ts +++ b/services/api/src/acquisition/acquisition.service.ts @@ -223,82 +223,95 @@ export class AcquisitionService { } /** - * Get source/medium breakdown + * Get source/medium breakdown. + * Prefers aggregated_metrics traffic_source dimension; falls back to + * deriving source from raw_events.referrer when no aggregated data exists. */ async getSources(query: AcquisitionQueryDto): Promise { - const { startDate, endDate, channel, limit } = query; + const { startDate, endDate, limit } = query; - const conditions: string[] = [ - 'sf."createdAt" BETWEEN $1 AND $2', - 'sf."isBot" = false', - ]; - const params: (string | number)[] = [startDate, endDate]; - let paramIndex = 3; - - if (channel) { - conditions.push(`sf."trafficSource" = $${paramIndex}`); - params.push(channel); - paramIndex++; - } - - const whereClause = conditions.join(' AND '); - - const sourcesQuery = ` - WITH session_data AS ( + // First try the aggregated_metrics traffic_source dimension (populated by processor) + const aggregatedQuery = ` + WITH source_totals AS ( SELECT - sf."sessionId", - sf."userId", - COALESCE(sf."utmSource", 'direct') as source, - COALESCE(sf."utmMedium", 'none') as medium, - EXISTS ( - SELECT 1 FROM raw_events re - WHERE re."sessionId" = sf."sessionId" - AND re."eventType" IN ('conversion', 'purchase') - ) as has_conversion, - COALESCE(( - SELECT EXTRACT(EPOCH FROM (MAX(re.timestamp) - MIN(re.timestamp))) - FROM raw_events re WHERE re."sessionId" = sf."sessionId" - ), 0) as duration_seconds, - COALESCE(( - SELECT COUNT(*) FROM raw_events re - WHERE re."sessionId" = sf."sessionId" AND re."eventType" IN ('pageView', 'pageview') - ), 0) as page_views - FROM session_fingerprints sf - WHERE ${whereClause} + "dimensionValue" AS source, + SUM(value) AS sessions + FROM aggregated_metrics + WHERE "metricType" = 'page_views' + AND dimension = 'traffic_source' + AND "dimensionValue" IS NOT NULL + AND timestamp BETWEEN $1 AND $2 + GROUP BY "dimensionValue" ) - SELECT - sd.source, - sd.medium, - COUNT(DISTINCT sd."sessionId") as sessions, - COUNT(DISTINCT sd."userId") as users, - COUNT(DISTINCT CASE WHEN sd.duration_seconds >= 10 OR sd.page_views >= 2 OR sd.has_conversion THEN sd."sessionId" END) as engaged_sessions, - COUNT(DISTINCT CASE WHEN sd.has_conversion THEN sd."sessionId" END) as conversions - FROM session_data sd - GROUP BY sd.source, sd.medium + SELECT source, sessions FROM source_totals ORDER BY sessions DESC - LIMIT $${paramIndex} + LIMIT $3 `; - params.push(limit ?? 20); - try { - const result = await this.dataSource.query(sourcesQuery, params); + const aggregated = await this.dataSource.query(aggregatedQuery, [startDate, endDate, limit ?? 20]); - return result.map((row: Record) => { - const sessions = Number(row.sessions) || 0; - const engagedSessions = Number(row.engaged_sessions) || 0; - const conversions = Number(row.conversions) || 0; - - return { + if (aggregated.length > 0) { + return aggregated.map((row: Record) => ({ source: row.source as string, - medium: row.medium as string, - sessions, - users: Number(row.users) || 0, - engagementRate: sessions > 0 ? engagedSessions / sessions : 0, - conversions, - conversionRate: sessions > 0 ? conversions / sessions : 0, - }; - }); + medium: 'none', + sessions: Number(row.sessions) || 0, + users: 0, + engagementRate: 0, + conversions: 0, + conversionRate: 0, + })); + } + + // Fallback: derive traffic source from raw_events.referrer per unique session + const rawQuery = ` + WITH first_event AS ( + SELECT DISTINCT ON ("sessionId") + "sessionId", + referrer, + "deviceType", + CASE + WHEN referrer IS NULL OR referrer = '' THEN 'direct' + WHEN referrer LIKE '%google.%' OR referrer LIKE '%bing.%' + OR referrer LIKE '%yahoo.%' OR referrer LIKE '%duckduckgo.%' THEN 'organic' + WHEN referrer LIKE '%twitter.%' OR referrer LIKE '%t.co%' THEN 'twitter' + WHEN referrer LIKE '%facebook.%' OR referrer LIKE '%fb.com%' THEN 'facebook' + WHEN referrer LIKE '%instagram.%' THEN 'instagram' + WHEN referrer LIKE '%reddit.%' THEN 'reddit' + WHEN referrer LIKE '%onlyfans.%' THEN 'onlyfans' + WHEN referrer IS NOT NULL AND referrer != '' + THEN COALESCE(substring(referrer FROM 'https?://([^/]+)'), 'referral') + ELSE 'direct' + END AS source + FROM raw_events + WHERE "eventType" IN ('pageview', 'pageView') + AND "deviceType" IS DISTINCT FROM 'bot' + AND timestamp BETWEEN $1 AND $2 + ORDER BY "sessionId", timestamp ASC + ) + SELECT + source, + 'none' AS medium, + COUNT(DISTINCT "sessionId") AS sessions, + 0::int AS users, + 0::int AS conversions + FROM first_event + GROUP BY source + ORDER BY sessions DESC + LIMIT $3 + `; + + const raw = await this.dataSource.query(rawQuery, [startDate, endDate, limit ?? 20]); + + return raw.map((row: Record) => ({ + source: row.source as string, + medium: row.medium as string, + sessions: Number(row.sessions) || 0, + users: 0, + engagementRate: 0, + conversions: 0, + conversionRate: 0, + })); } catch (error) { this.logger.error('Failed to get sources', error); throw error; diff --git a/services/api/src/audience/audience.service.ts b/services/api/src/audience/audience.service.ts index d46854b..be7e2bc 100644 --- a/services/api/src/audience/audience.service.ts +++ b/services/api/src/audience/audience.service.ts @@ -215,66 +215,46 @@ export class AudienceService { } /** - * Get device breakdown + * Get device breakdown from aggregated_metrics (device_type dimension). + * Falls back to raw_events when no aggregated data is available. */ async getDevices(query: DeviceQueryDto): Promise { const { startDate, endDate, deviceType, limit } = query; - const conditions: string[] = [ - 'sf."createdAt" BETWEEN $1 AND $2', - 'sf."isBot" = false', - ]; - const params: (string | number)[] = [startDate, endDate]; - let paramIndex = 3; - - if (deviceType) { - conditions.push(`sf."deviceType" = $${paramIndex}`); - params.push(deviceType); - paramIndex++; - } - - const whereClause = conditions.join(' AND '); - + // Use aggregated_metrics for device breakdown — avoids dependence on + // session_fingerprints which may not have data for all sessions. const devicesQuery = ` - WITH session_data AS ( + WITH device_totals AS ( SELECT - sf."sessionId", - sf."userId", - COALESCE(sf."deviceType", 'unknown') as device_type, - EXISTS ( - SELECT 1 FROM raw_events re - WHERE re."sessionId" = sf."sessionId" - AND re."eventType" IN ('conversion', 'purchase') - ) as has_conversion, - COALESCE(( - SELECT EXTRACT(EPOCH FROM (MAX(re.timestamp) - MIN(re.timestamp))) - FROM raw_events re WHERE re."sessionId" = sf."sessionId" - ), 0) as duration_seconds, - COALESCE(( - SELECT COUNT(*) FROM raw_events re - WHERE re."sessionId" = sf."sessionId" AND re."eventType" IN ('pageView', 'pageview') - ), 0) as page_views - FROM session_fingerprints sf - WHERE ${whereClause} + "dimensionValue" AS device_type, + SUM(value) AS page_views + FROM aggregated_metrics + WHERE "metricType" = 'page_views' + AND dimension = 'device_type' + AND "dimensionValue" != 'bot' + AND "dimensionValue" IS NOT NULL + ${deviceType ? `AND "dimensionValue" = $4` : ''} + AND timestamp BETWEEN $1 AND $2 + GROUP BY "dimensionValue" ), - total AS ( - SELECT COUNT(*) as total_sessions FROM session_data + grand_total AS ( + SELECT COALESCE(SUM(page_views), 0) AS total FROM device_totals ) SELECT - sd.device_type, - COUNT(sd."sessionId") as sessions, - COUNT(DISTINCT sd."userId") as users, - COUNT(sd."sessionId")::float / NULLIF(t.total_sessions, 0) as percentage, - COUNT(CASE WHEN sd.duration_seconds >= 10 OR sd.page_views >= 2 OR sd.has_conversion THEN 1 END)::float / NULLIF(COUNT(*), 0) as engagement_rate, - COUNT(CASE WHEN sd.has_conversion THEN 1 END)::float / NULLIF(COUNT(*), 0) as conversion_rate - FROM session_data sd - CROSS JOIN total t - GROUP BY sd.device_type, t.total_sessions - ORDER BY sessions DESC - LIMIT $${paramIndex} + dt.device_type, + dt.page_views::bigint AS sessions, + 0::bigint AS users, + CASE WHEN gt.total > 0 THEN dt.page_views / gt.total ELSE 0 END AS percentage, + 0.0 AS engagement_rate, + 0.0 AS conversion_rate + FROM device_totals dt + CROSS JOIN grand_total gt + ORDER BY dt.page_views DESC + LIMIT $3 `; - params.push(limit ?? 20); + const params: (string | number)[] = [startDate, endDate, limit ?? 20]; + if (deviceType) params.push(deviceType); try { const result = await this.dataSource.query(devicesQuery, params); diff --git a/services/processor/src/processors/aggregation.service.ts b/services/processor/src/processors/aggregation.service.ts index 1228615..1921727 100644 --- a/services/processor/src/processors/aggregation.service.ts +++ b/services/processor/src/processors/aggregation.service.ts @@ -146,6 +146,42 @@ export class AggregationService implements OnModuleDestroy { sessionState.trafficSource, ); } + + // Track by browser if available + if (sessionState.browser) { + await this.incrementMetric( + MetricType.PAGE_VIEWS, + TimeGranularity.DAY, + dayBucket, + 1, + 'browser', + sessionState.browser, + ); + } + + // Track by OS if available + if (sessionState.os) { + await this.incrementMetric( + MetricType.PAGE_VIEWS, + TimeGranularity.DAY, + dayBucket, + 1, + 'os', + sessionState.os, + ); + } + + // Track by country if available + if (sessionState.country) { + await this.incrementMetric( + MetricType.PAGE_VIEWS, + TimeGranularity.DAY, + dayBucket, + 1, + 'country', + sessionState.country, + ); + } } /** @@ -356,6 +392,8 @@ export class AggregationService implements OnModuleDestroy { trafficSource: properties.trafficSource as string | undefined, deviceType: properties.deviceType as string | undefined, country: properties.country as string | undefined, + browser: properties.browser as string | undefined, + os: properties.os as string | undefined, }; } diff --git a/services/processor/src/redis/redis-session.service.ts b/services/processor/src/redis/redis-session.service.ts index fa57f97..7d18ef1 100644 --- a/services/processor/src/redis/redis-session.service.ts +++ b/services/processor/src/redis/redis-session.service.ts @@ -18,6 +18,8 @@ export interface SessionState { trafficSource?: string; deviceType?: string; country?: string; + browser?: string; + os?: string; } /**