From 4c8758cbdaa254d2d9b17a2b0f6758ff0882a20a Mon Sep 17 00:00:00 2001 From: autocommit Date: Sat, 16 May 2026 21:10:57 -0700 Subject: [PATCH] =?UTF-8?q?refactor(acquisition):=20=E2=99=BB=EF=B8=8F=20R?= =?UTF-8?q?emove=20corpRawEventsFilter=20dependency=20and=20simplify=20Acq?= =?UTF-8?q?uisitionService=20logic=20while=20updating=20documentation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- .../src/acquisition/acquisition.service.ts | 166 +++++++++--------- 1 file changed, 81 insertions(+), 85 deletions(-) diff --git a/services/api/src/acquisition/acquisition.service.ts b/services/api/src/acquisition/acquisition.service.ts index 6938712..7814d8e 100644 --- a/services/api/src/acquisition/acquisition.service.ts +++ b/services/api/src/acquisition/acquisition.service.ts @@ -2,7 +2,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectDataSource } from '@nestjs/typeorm'; import { DataSource } from 'typeorm'; import { AcquisitionQueryDto, AcquisitionCompareDto, ReferrerQueryDto } from './dto/acquisition-query.dto'; -import { resolveCorpId, corpSessionFilter, corpRawEventsFilter } from '../common/corp-filter.util'; +import { resolveCorpId, corpSessionFilter } from '../common/corp-filter.util'; export interface ChannelMetrics { channel: string; @@ -232,102 +232,98 @@ export class AcquisitionService { } /** - * Get source/medium breakdown. - * Prefers aggregated_metrics traffic_source dimension; falls back to - * deriving source from raw_events.referrer when no aggregated data exists. + * Get source/medium breakdown from session_fingerprints. + * Groups by (utmSource | trafficSource) + (utmMedium | classification), + * computing engaged_sessions and conversions inline against raw_events. + * Optional channel filter narrows by sf."trafficSource". */ async getSources(query: AcquisitionQueryDto): Promise { - const { startDate, endDate, limit } = query; + const { startDate, endDate, channel, limit } = query; const corpId = await resolveCorpId(this.dataSource, query.corp); - // First try the aggregated_metrics traffic_source dimension (populated by processor) - // NOTE: aggregated_metrics has no corp dimension — when a corp scope is requested, - // skip this path and go straight to raw_events derivation (which IS corp-scoped). - const aggregatedQuery = ` - WITH source_totals AS ( + const params: (string | number)[] = [startDate, endDate]; + let paramIndex = 3; + const extraConditions: string[] = []; + + if (channel) { + extraConditions.push(`sf."trafficSource" = $${paramIndex}`); + params.push(channel); + paramIndex++; + } + + if (corpId !== null) { + extraConditions.push(corpSessionFilter(paramIndex, corpId).replace(/^\s*AND\s+/i, '')); + params.push(corpId); + paramIndex++; + } + + const extraWhere = extraConditions.length > 0 + ? ' AND ' + extraConditions.join(' AND ') + : ''; + + const sourcesQuery = ` + WITH session_data AS ( SELECT - "dimensionValue" AS source, - SUM(value) AS sessions - FROM aggregated_metrics - WHERE "metricType" = 'page_views' - AND dimension = 'traffic_source' - AND "dimensionValue" IS NOT NULL - AND timestamp BETWEEN $1 AND $2 - GROUP BY "dimensionValue" + sf."sessionId", + sf."userId", + COALESCE(sf."utmSource", sf."trafficSource", 'direct') AS source, + COALESCE( + sf."utmMedium", + CASE + WHEN sf."trafficSource" IN ('organic', 'paid', 'social', 'referral', 'email', 'display') THEN sf."trafficSource" + ELSE 'none' + END + ) AS medium, + EXISTS ( + SELECT 1 FROM raw_events re + WHERE re."sessionId" = sf."sessionId" + AND re."eventType" IN ('conversion', 'purchase') + ) AS has_conversion, + COALESCE(( + SELECT EXTRACT(EPOCH FROM (MAX(re.timestamp) - MIN(re.timestamp))) + FROM raw_events re WHERE re."sessionId" = sf."sessionId" + ), 0) AS duration_seconds, + COALESCE(( + SELECT COUNT(*) FROM raw_events re + WHERE re."sessionId" = sf."sessionId" AND re."eventType" IN ('pageView', 'pageview') + ), 0) AS page_views + FROM session_fingerprints sf + WHERE sf."createdAt" BETWEEN $1 AND $2 + AND sf."isBot" = false${extraWhere} ) - SELECT source, sessions FROM source_totals + SELECT + sd.source, + sd.medium, + COUNT(DISTINCT sd."sessionId") AS sessions, + COUNT(DISTINCT sd."userId") AS users, + COUNT(DISTINCT CASE WHEN sd.duration_seconds >= 10 OR sd.page_views >= 2 OR sd.has_conversion THEN sd."sessionId" END) AS engaged_sessions, + COUNT(DISTINCT CASE WHEN sd.has_conversion THEN sd."sessionId" END) AS conversions + FROM session_data sd + GROUP BY sd.source, sd.medium ORDER BY sessions DESC - LIMIT $3 + LIMIT $${paramIndex} `; + params.push(limit ?? 20); + try { - const aggregated = corpId === null - ? await this.dataSource.query(aggregatedQuery, [startDate, endDate, limit ?? 20]) - : []; + const result = await this.dataSource.query(sourcesQuery, params); - if (aggregated.length > 0) { - return aggregated.map((row: Record) => ({ + return result.map((row: Record) => { + const sessions = Number(row.sessions) || 0; + const engagedSessions = Number(row.engaged_sessions) || 0; + const conversions = Number(row.conversions) || 0; + + return { source: row.source as string, - medium: 'none', - sessions: Number(row.sessions) || 0, - users: 0, - engagementRate: 0, - conversions: 0, - conversionRate: 0, - })); - } - - // Fallback: derive traffic source from raw_events.referrer per unique session - const rawQuery = ` - WITH first_event AS ( - SELECT DISTINCT ON ("sessionId") - "sessionId", - referrer, - "deviceType", - CASE - WHEN referrer IS NULL OR referrer = '' THEN 'direct' - WHEN referrer LIKE '%google.%' OR referrer LIKE '%bing.%' - OR referrer LIKE '%yahoo.%' OR referrer LIKE '%duckduckgo.%' THEN 'organic' - WHEN referrer LIKE '%twitter.%' OR referrer LIKE '%t.co%' THEN 'twitter' - WHEN referrer LIKE '%facebook.%' OR referrer LIKE '%fb.com%' THEN 'facebook' - WHEN referrer LIKE '%instagram.%' THEN 'instagram' - WHEN referrer LIKE '%reddit.%' THEN 'reddit' - WHEN referrer LIKE '%onlyfans.%' THEN 'onlyfans' - WHEN referrer IS NOT NULL AND referrer != '' - THEN COALESCE(substring(referrer FROM 'https?://([^/]+)'), 'referral') - ELSE 'direct' - END AS source - FROM raw_events - WHERE "eventType" IN ('pageview', 'pageView') - AND "deviceType" IS DISTINCT FROM 'bot' - AND timestamp BETWEEN $1 AND $2${corpRawEventsFilter(4, corpId, 'raw_events')} - ORDER BY "sessionId", timestamp ASC - ) - SELECT - source, - 'none' AS medium, - COUNT(DISTINCT "sessionId") AS sessions, - 0::int AS users, - 0::int AS conversions - FROM first_event - GROUP BY source - ORDER BY sessions DESC - LIMIT $3 - `; - - const rawParams: (string | number)[] = [startDate, endDate, limit ?? 20]; - if (corpId !== null) rawParams.push(corpId); - const raw = await this.dataSource.query(rawQuery, rawParams); - - return raw.map((row: Record) => ({ - source: row.source as string, - medium: row.medium as string, - sessions: Number(row.sessions) || 0, - users: 0, - engagementRate: 0, - conversions: 0, - conversionRate: 0, - })); + medium: row.medium as string, + sessions, + users: Number(row.users) || 0, + engagementRate: sessions > 0 ? engagedSessions / sessions : 0, + conversions, + conversionRate: sessions > 0 ? conversions / sessions : 0, + }; + }); } catch (error) { this.logger.error('Failed to get sources', error); throw error;