refactor(acquisition): ♻️ Remove corpRawEventsFilter dependency and simplify AcquisitionService logic while updating documentation

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-05-16 21:10:57 -07:00
parent 1ffb0b3710
commit 4c8758cbda

View file

@ -2,7 +2,7 @@ import { Injectable, Logger } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
import { AcquisitionQueryDto, AcquisitionCompareDto, ReferrerQueryDto } from './dto/acquisition-query.dto';
import { resolveCorpId, corpSessionFilter, corpRawEventsFilter } from '../common/corp-filter.util';
import { resolveCorpId, corpSessionFilter } from '../common/corp-filter.util';
export interface ChannelMetrics {
channel: string;
@ -232,102 +232,98 @@ export class AcquisitionService {
}
/**
* Get source/medium breakdown.
* Prefers aggregated_metrics traffic_source dimension; falls back to
* deriving source from raw_events.referrer when no aggregated data exists.
* Get source/medium breakdown from session_fingerprints.
* Groups by (utmSource | trafficSource) + (utmMedium | classification),
* computing engaged_sessions and conversions inline against raw_events.
* Optional channel filter narrows by sf."trafficSource".
*/
async getSources(query: AcquisitionQueryDto): Promise<SourceMetrics[]> {
const { startDate, endDate, limit } = query;
const { startDate, endDate, channel, limit } = query;
const corpId = await resolveCorpId(this.dataSource, query.corp);
// First try the aggregated_metrics traffic_source dimension (populated by processor)
// NOTE: aggregated_metrics has no corp dimension — when a corp scope is requested,
// skip this path and go straight to raw_events derivation (which IS corp-scoped).
const aggregatedQuery = `
WITH source_totals AS (
const params: (string | number)[] = [startDate, endDate];
let paramIndex = 3;
const extraConditions: string[] = [];
if (channel) {
extraConditions.push(`sf."trafficSource" = $${paramIndex}`);
params.push(channel);
paramIndex++;
}
if (corpId !== null) {
extraConditions.push(corpSessionFilter(paramIndex, corpId).replace(/^\s*AND\s+/i, ''));
params.push(corpId);
paramIndex++;
}
const extraWhere = extraConditions.length > 0
? ' AND ' + extraConditions.join(' AND ')
: '';
const sourcesQuery = `
WITH session_data AS (
SELECT
"dimensionValue" AS source,
SUM(value) AS sessions
FROM aggregated_metrics
WHERE "metricType" = 'page_views'
AND dimension = 'traffic_source'
AND "dimensionValue" IS NOT NULL
AND timestamp BETWEEN $1 AND $2
GROUP BY "dimensionValue"
sf."sessionId",
sf."userId",
COALESCE(sf."utmSource", sf."trafficSource", 'direct') AS source,
COALESCE(
sf."utmMedium",
CASE
WHEN sf."trafficSource" IN ('organic', 'paid', 'social', 'referral', 'email', 'display') THEN sf."trafficSource"
ELSE 'none'
END
) AS medium,
EXISTS (
SELECT 1 FROM raw_events re
WHERE re."sessionId" = sf."sessionId"
AND re."eventType" IN ('conversion', 'purchase')
) AS has_conversion,
COALESCE((
SELECT EXTRACT(EPOCH FROM (MAX(re.timestamp) - MIN(re.timestamp)))
FROM raw_events re WHERE re."sessionId" = sf."sessionId"
), 0) AS duration_seconds,
COALESCE((
SELECT COUNT(*) FROM raw_events re
WHERE re."sessionId" = sf."sessionId" AND re."eventType" IN ('pageView', 'pageview')
), 0) AS page_views
FROM session_fingerprints sf
WHERE sf."createdAt" BETWEEN $1 AND $2
AND sf."isBot" = false${extraWhere}
)
SELECT source, sessions FROM source_totals
SELECT
sd.source,
sd.medium,
COUNT(DISTINCT sd."sessionId") AS sessions,
COUNT(DISTINCT sd."userId") AS users,
COUNT(DISTINCT CASE WHEN sd.duration_seconds >= 10 OR sd.page_views >= 2 OR sd.has_conversion THEN sd."sessionId" END) AS engaged_sessions,
COUNT(DISTINCT CASE WHEN sd.has_conversion THEN sd."sessionId" END) AS conversions
FROM session_data sd
GROUP BY sd.source, sd.medium
ORDER BY sessions DESC
LIMIT $3
LIMIT $${paramIndex}
`;
params.push(limit ?? 20);
try {
const aggregated = corpId === null
? await this.dataSource.query(aggregatedQuery, [startDate, endDate, limit ?? 20])
: [];
const result = await this.dataSource.query(sourcesQuery, params);
if (aggregated.length > 0) {
return aggregated.map((row: Record<string, unknown>) => ({
return result.map((row: Record<string, unknown>) => {
const sessions = Number(row.sessions) || 0;
const engagedSessions = Number(row.engaged_sessions) || 0;
const conversions = Number(row.conversions) || 0;
return {
source: row.source as string,
medium: 'none',
sessions: Number(row.sessions) || 0,
users: 0,
engagementRate: 0,
conversions: 0,
conversionRate: 0,
}));
}
// Fallback: derive traffic source from raw_events.referrer per unique session
const rawQuery = `
WITH first_event AS (
SELECT DISTINCT ON ("sessionId")
"sessionId",
referrer,
"deviceType",
CASE
WHEN referrer IS NULL OR referrer = '' THEN 'direct'
WHEN referrer LIKE '%google.%' OR referrer LIKE '%bing.%'
OR referrer LIKE '%yahoo.%' OR referrer LIKE '%duckduckgo.%' THEN 'organic'
WHEN referrer LIKE '%twitter.%' OR referrer LIKE '%t.co%' THEN 'twitter'
WHEN referrer LIKE '%facebook.%' OR referrer LIKE '%fb.com%' THEN 'facebook'
WHEN referrer LIKE '%instagram.%' THEN 'instagram'
WHEN referrer LIKE '%reddit.%' THEN 'reddit'
WHEN referrer LIKE '%onlyfans.%' THEN 'onlyfans'
WHEN referrer IS NOT NULL AND referrer != ''
THEN COALESCE(substring(referrer FROM 'https?://([^/]+)'), 'referral')
ELSE 'direct'
END AS source
FROM raw_events
WHERE "eventType" IN ('pageview', 'pageView')
AND "deviceType" IS DISTINCT FROM 'bot'
AND timestamp BETWEEN $1 AND $2${corpRawEventsFilter(4, corpId, 'raw_events')}
ORDER BY "sessionId", timestamp ASC
)
SELECT
source,
'none' AS medium,
COUNT(DISTINCT "sessionId") AS sessions,
0::int AS users,
0::int AS conversions
FROM first_event
GROUP BY source
ORDER BY sessions DESC
LIMIT $3
`;
const rawParams: (string | number)[] = [startDate, endDate, limit ?? 20];
if (corpId !== null) rawParams.push(corpId);
const raw = await this.dataSource.query(rawQuery, rawParams);
return raw.map((row: Record<string, unknown>) => ({
source: row.source as string,
medium: row.medium as string,
sessions: Number(row.sessions) || 0,
users: 0,
engagementRate: 0,
conversions: 0,
conversionRate: 0,
}));
medium: row.medium as string,
sessions,
users: Number(row.users) || 0,
engagementRate: sessions > 0 ? engagedSessions / sessions : 0,
conversions,
conversionRate: sessions > 0 ? conversions / sessions : 0,
};
});
} catch (error) {
this.logger.error('Failed to get sources', error);
throw error;