feat(relationship-worker): Introduce EventListener and ProcessingPipeline classes for event handling and processing relationship tasks

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-04-12 19:01:17 -07:00
parent c7358a3ef5
commit 56b3d469b1
3 changed files with 672 additions and 0 deletions

View file

@ -0,0 +1,111 @@
import type { PoolClient } from 'pg';
import type { Database } from './db';
import { log } from './logger';
export interface JobEvent {
jobId: string;
contactId: string;
}
export type JobHandler = (event: JobEvent) => Promise<void>;
const CHANNEL = 'relationship_job_created';
export class RelationshipJobListener {
private client: PoolClient | null = null;
private stopped = false;
private reconnectAttempts = 0;
constructor(
private readonly db: Database,
private readonly handler: JobHandler,
) {}
async start(): Promise<void> {
while (!this.stopped) {
try {
await this.connectAndListen();
} catch (err) {
if (this.stopped) return;
this.reconnectAttempts += 1;
const delayMs = Math.min(1000 * 2 ** this.reconnectAttempts, 30_000);
log.error('LISTEN connection failed, reconnecting', {
attempt: this.reconnectAttempts,
delayMs,
error: err instanceof Error ? err.message : String(err),
});
await sleep(delayMs);
}
}
}
async stop(): Promise<void> {
this.stopped = true;
if (this.client) {
try {
await this.client.query(`UNLISTEN ${CHANNEL}`);
} catch {
// ignore — connection may already be torn down
}
this.client.release();
this.client = null;
}
}
private async connectAndListen(): Promise<void> {
const client = await this.db.acquireListenClient();
this.client = client;
this.reconnectAttempts = 0;
client.on('notification', (msg) => {
if (msg.channel !== CHANNEL || !msg.payload) return;
this.dispatch(msg.payload).catch((err) => {
log.error('Failed to dispatch job notification', {
error: err instanceof Error ? err.message : String(err),
});
});
});
const errorPromise = new Promise<never>((_, reject) => {
client.on('error', (err) => reject(err));
client.on('end', () => reject(new Error('LISTEN client ended')));
});
await client.query(`LISTEN ${CHANNEL}`);
log.info('Listening for relationship job notifications', { channel: CHANNEL });
await errorPromise;
}
private async dispatch(rawPayload: string): Promise<void> {
let parsed: unknown;
try {
parsed = JSON.parse(rawPayload);
} catch {
log.warn('Discarding non-JSON notification payload', { rawPayload });
return;
}
const event = parseEvent(parsed);
if (!event) {
log.warn('Discarding malformed job notification payload', { rawPayload });
return;
}
log.info('Received relationship job notification', { jobId: event.jobId, contactId: event.contactId });
await this.handler(event);
}
}
function parseEvent(payload: unknown): JobEvent | null {
if (!payload || typeof payload !== 'object') return null;
const obj = payload as Record<string, unknown>;
if (typeof obj.jobId !== 'string') return null;
if (typeof obj.contactId !== 'string') return null;
return { jobId: obj.jobId, contactId: obj.contactId };
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View file

@ -0,0 +1,88 @@
import { randomUUID } from 'node:crypto';
import { ModelBossClient } from '@lilith/messenger-model-boss';
import { loadConfig } from './config';
import { Database } from './db';
import { startHealthServer, type HealthState } from './health-server';
import { RelationshipJobListener } from './listener';
import { log } from './logger';
import { RelationshipPipeline } from './pipeline';
async function main(): Promise<void> {
const config = loadConfig();
const workerId = `relationship-${randomUUID().slice(0, 8)}`;
log.info('relationship-worker starting', {
workerId,
modelBossUrl: config.modelBossUrl,
model: config.modelBossModel,
healthPort: config.healthPort,
});
const db = new Database(config.databaseUrl);
const modelBoss = new ModelBossClient(
config.modelBossUrl,
config.modelBossApiKey,
config.modelBossModel,
);
const pipeline = new RelationshipPipeline(db, modelBoss, config.modelBossModel);
// Cleanup stale jobs from previous runs
const cleaned = await db.cleanupStaleJobs();
if (cleaned > 0) {
log.info('Cleaned up stale relationship jobs', { count: cleaned });
}
const state: HealthState = {
modelBossReachable: false,
lastJobAt: null,
lastCompletedAt: null,
};
const healthInterval = setInterval(async () => {
state.modelBossReachable = await modelBoss.health();
}, 30_000);
state.modelBossReachable = await modelBoss.health();
const healthServer = startHealthServer(config.healthPort, () => state);
const listener = new RelationshipJobListener(db, async (event) => {
state.lastJobAt = new Date();
try {
const processed = await pipeline.handle(event.jobId);
if (processed) {
state.lastCompletedAt = new Date();
}
} catch (err) {
log.error('Pipeline failed', {
jobId: event.jobId,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined,
});
}
});
const shutdown = async (signal: string): Promise<void> => {
log.info('Shutting down', { signal });
clearInterval(healthInterval);
healthServer.close();
await listener.stop();
await db.close();
process.exit(0);
};
process.on('SIGINT', () => void shutdown('SIGINT'));
process.on('SIGTERM', () => void shutdown('SIGTERM'));
await listener.start();
}
main().catch((err) => {
log.error('Fatal error', {
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined,
});
process.exit(1);
});

View file

@ -0,0 +1,473 @@
import { ModelBossClient, ModelBossError } from '@lilith/messenger-model-boss';
import type { Database, Message, ReviewQueueSignal, TopicSummary } from './db';
import { computeEfficiencyMetrics } from './efficiency';
import { log } from './logger';
/** JSON schema for persona synthesis LLM output. */
const PERSONA_FRAGMENT_SCHEMA: Record<string, unknown> = {
type: 'object',
required: ['voice_fragment', 'winning_patterns', 'failure_patterns', 'negative_prompt'],
properties: {
voice_fragment: { type: 'string' },
winning_patterns: {
type: 'array',
items: {
type: 'object',
required: ['pattern', 'example', 'outcome'],
properties: {
pattern: { type: 'string' },
example: { type: 'string' },
outcome: { type: 'string' },
},
},
},
failure_patterns: {
type: 'array',
items: {
type: 'object',
required: ['pattern', 'example', 'outcome'],
properties: {
pattern: { type: 'string' },
example: { type: 'string' },
outcome: { type: 'string' },
},
},
},
negative_prompt: { type: ['string', 'null'] },
},
additionalProperties: false,
};
/** JSON schema for financial backfill LLM output. */
const FINANCIAL_EVENTS_SCHEMA: Record<string, unknown> = {
type: 'object',
required: ['events'],
properties: {
events: {
type: 'array',
items: {
type: 'object',
required: ['booking_date', 'service_type', 'duration', 'amount', 'confidence', 'evidence'],
properties: {
booking_date: { type: ['string', 'null'] },
service_type: { type: ['string', 'null'] },
duration: { type: ['string', 'null'] },
amount: { type: ['number', 'null'] },
confidence: { type: 'number', minimum: 0, maximum: 1 },
evidence: { type: 'string' },
},
},
},
},
additionalProperties: false,
};
interface PatternEntry {
pattern: string;
example: string;
outcome: string;
}
interface PersonaFragmentResult {
voice_fragment: string;
winning_patterns: PatternEntry[];
failure_patterns: PatternEntry[];
negative_prompt: string | null;
}
interface FinancialEvent {
booking_date: string | null;
service_type: string | null;
duration: string | null;
amount: number | null;
confidence: number;
evidence: string;
}
interface FinancialEventsResult {
events: FinancialEvent[];
}
const MAX_CONTACTS_PER_EMOJI = 8;
const MAX_MESSAGES_PER_CONTACT = 30;
const MAX_REVIEW_SIGNALS = 5;
const MAX_MESSAGES_FOR_FINANCIAL = 300;
export class RelationshipPipeline {
constructor(
private readonly db: Database,
private readonly modelBoss: ModelBossClient,
private readonly model: string,
) {}
async handle(jobId: string): Promise<boolean> {
const job = await this.db.claimJob(jobId);
if (!job) {
log.debug('Job already claimed or not found', { jobId });
return false;
}
log.info('Processing relationship job', {
jobId: job.id,
contactId: job.contactId,
kind: job.kind,
});
try {
switch (job.kind) {
case 'manual':
await this.handleManual(job.contactId);
break;
case 'persona_synthesis':
await this.handlePersonaSynthesis(job.payload);
break;
case 'financial_backfill':
await this.handleFinancialBackfill(job.contactId);
break;
case 'efficiency_scoring':
await this.handleEfficiencyScoring(job.contactId);
break;
default:
log.warn('Unknown job kind', { kind: job.kind, jobId: job.id });
}
await this.db.completeJob(job.id);
log.info('Job completed', { jobId: job.id, kind: job.kind });
return true;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await this.db.failJob(job.id, message);
log.error('Job failed', {
jobId: job.id,
kind: job.kind,
error: message,
});
return false;
}
}
// ---------------------------------------------------------------------------
// Manual refresh — full per-contact recompute
// ---------------------------------------------------------------------------
private async handleManual(contactId: string): Promise<void> {
await this.handleEfficiencyScoring(contactId);
await this.handleFinancialBackfill(contactId);
log.info('Manual refresh complete', { contactId });
}
// ---------------------------------------------------------------------------
// Persona synthesis — derive voice fragment for one emoji
// ---------------------------------------------------------------------------
private async handlePersonaSynthesis(payload: Record<string, unknown> | null): Promise<void> {
const emoji = payload?.emoji;
if (typeof emoji !== 'string') {
throw new Error('persona_synthesis requires payload.emoji');
}
const glossary = await this.db.loadEmojiGlossary();
const entry = glossary.find((e) => e.emoji === emoji);
if (!entry) {
throw new Error(`Emoji ${emoji} not found in glossary`);
}
const contacts = await this.db.loadContactsByEmoji(emoji);
if (contacts.length === 0) {
log.info('No contacts with this emoji, skipping', { emoji });
return;
}
const sampled = contacts.slice(0, MAX_CONTACTS_PER_EMOJI);
const digestParts: string[] = [];
for (const contact of sampled) {
const [messages, topics, signals] = await Promise.all([
this.db.loadContactMessages(contact.id, MAX_MESSAGES_PER_CONTACT),
this.db.loadContactTopics(contact.id),
this.db.loadContactReviewSignals(contact.id, 50),
]);
if (messages.length === 0) continue;
digestParts.push(this.formatContactDigest(contact, messages, topics, signals));
}
if (digestParts.length === 0) {
log.info('No message data for any contacts, skipping', { emoji });
return;
}
const systemPrompt = [
'You are analyzing Quinn\'s messaging patterns to synthesize a persona fragment.',
'',
'Quinn is an independent escort and content creator in the Pacific Northwest.',
'She labels contacts with emoji to categorize them. You are analyzing all',
`conversations with contacts labeled ${emoji} ("${entry.meaning ?? 'no meaning set'}").`,
'',
'Your job: derive a composable voice description from the evidence below.',
'This fragment will be blended with other emoji fragments at draft time —',
`keep it focused on what is unique about how Quinn handles ${emoji} contacts.`,
'',
'Analyze:',
'- How does Quinn open conversations with these people?',
'- What tone does she use? (casual, flirty, professional, guarded?)',
'- What specific phrases or patterns recur in her outgoing messages?',
'- When she edited AI drafts, what did she change and why?',
'- Which conversations led to bookings? What did she do differently there?',
'- Which conversations died? What happened right before?',
].join('\n');
const userPrompt = [
`## Contacts labeled ${emoji} (${contacts.length} total, ${sampled.length} sampled)`,
'',
...digestParts,
].join('\n');
try {
const result = await this.modelBoss.chatJson<PersonaFragmentResult>({
systemPrompt,
messages: [{ role: 'user', content: userPrompt }],
model: this.model,
schema: PERSONA_FRAGMENT_SCHEMA,
schemaName: 'persona_fragment',
parse: parsePersonaFragment,
timeoutMs: 120_000,
});
await this.db.upsertPersonaFragment({
emoji,
voiceFragment: result.voice_fragment,
winningPatterns: result.winning_patterns,
failurePatterns: result.failure_patterns,
negativePrompt: result.negative_prompt,
sampleCount: sampled.length,
modelVersion: this.model,
});
log.info('Persona fragment synthesized', {
emoji,
sampleCount: sampled.length,
winningCount: result.winning_patterns.length,
failureCount: result.failure_patterns.length,
});
} catch (err) {
if (err instanceof ModelBossError) {
throw new Error(`model-boss failed for persona synthesis (${emoji}): ${err.message}`);
}
throw err;
}
}
private formatContactDigest(
contact: { id: string; displayName: string },
messages: Message[],
topics: TopicSummary[],
signals: ReviewQueueSignal[],
): string {
const outgoing = messages.filter((m) => m.direction === 'outgoing');
const incoming = messages.filter((m) => m.direction === 'incoming');
const avgQuinnLen = outgoing.length > 0
? Math.round(outgoing.reduce((s, m) => s + m.text.length, 0) / outgoing.length)
: 0;
const avgTheirLen = incoming.length > 0
? Math.round(incoming.reduce((s, m) => s + m.text.length, 0) / incoming.length)
: 0;
const lines: string[] = [
`### ${contact.displayName}`,
`Messages: ${messages.length} (Quinn: ${outgoing.length}, them: ${incoming.length})`,
`Avg message length — Quinn: ${avgQuinnLen} chars, them: ${avgTheirLen} chars`,
];
if (topics.length > 0) {
const topicSummary = topics
.slice(0, 5)
.map((t) => `${t.kind ?? 'unknown'}(${t.phase ?? '?'})`)
.join(', ');
lines.push(`Topics: ${topicSummary}`);
}
// Recent messages
lines.push('', 'Recent messages:');
for (const m of messages.slice(-20)) {
const who = m.direction === 'outgoing' ? 'Quinn' : 'Them';
const date = m.sentAt.toISOString().slice(0, 16).replace('T', ' ');
const text = m.text.length > 200 ? m.text.slice(0, 200) + '...' : m.text;
lines.push(`[${date}] ${who}: ${text}`);
}
// Review queue signals
const sent = signals.filter((s) => s.status === 'sent');
const edited = sent.filter((s) => s.editedText !== null);
const unchanged = sent.filter((s) => s.editedText === null);
const rejected = signals.filter((s) => s.status === 'rejected');
if (sent.length > 0 || rejected.length > 0) {
lines.push('', 'Review queue signals:');
lines.push(`- Approved unchanged: ${unchanged.length}`);
lines.push(`- Edited: ${edited.length}`);
lines.push(`- Rejected: ${rejected.length}`);
const editExamples = edited.slice(0, MAX_REVIEW_SIGNALS);
for (const e of editExamples) {
const draft = e.draftText.length > 100 ? e.draftText.slice(0, 100) + '...' : e.draftText;
const edit = (e.editedText ?? '').length > 100
? (e.editedText ?? '').slice(0, 100) + '...'
: (e.editedText ?? '');
lines.push(` Edit: "${draft}" → "${edit}"`);
}
const rejectExamples = rejected.slice(0, 3);
for (const r of rejectExamples) {
const draft = r.draftText.length > 100 ? r.draftText.slice(0, 100) + '...' : r.draftText;
lines.push(` Rejected: "${draft}"`);
}
}
lines.push('');
return lines.join('\n');
}
// ---------------------------------------------------------------------------
// Financial backfill — LLM-driven booking event extraction
// ---------------------------------------------------------------------------
private async handleFinancialBackfill(contactId: string): Promise<void> {
const messages = await this.db.loadContactMessages(contactId, MAX_MESSAGES_FOR_FINANCIAL);
if (messages.length < 3) {
log.debug('Too few messages for financial analysis', { contactId, count: messages.length });
return;
}
const systemPrompt = [
'You are analyzing a conversation between Quinn (an independent escort, outgoing',
'messages) and a contact (incoming messages). Your job is to identify every instance',
'where a booking was confirmed and likely completed.',
'',
'Look for these signals:',
'- Rate or price discussed and agreed to (e.g. "$700/hr", "my rate is...")',
'- Date/time confirmed (e.g. "see you saturday", "tomorrow at 7")',
'- Deposit mentioned or confirmed (e.g. "deposit received", "sent you $200")',
'- Quinn confirming she\'s on her way, arrived, or post-session messages',
'- Payment references (Venmo, Cash App, cash, "paid", etc.)',
'- Post-session follow-up ("had a great time", rebooking discussion)',
'',
'For each booking event you identify, extract structured data.',
'If no bookings occurred in this conversation, return an empty events array.',
'Only include events where you have reasonable confidence a booking actually happened.',
'',
'Quinn\'s standard rates for reference:',
'- 1hr: $700, 2hr: $1400, 3hr: $2100, overnight: $2400, 24hr: $4000',
'- Dinner & night: $2800, Duo: $9999',
'- Touring: West Coast $3000, NA $5500, International $7000',
].join('\n');
const conversationText = messages.map((m) => {
const who = m.direction === 'outgoing' ? 'Quinn' : 'Them';
const date = m.sentAt.toISOString().slice(0, 16).replace('T', ' ');
return `[${date}] ${who}: ${m.text}`;
}).join('\n');
try {
const result = await this.modelBoss.chatJson<FinancialEventsResult>({
systemPrompt,
messages: [{ role: 'user', content: conversationText }],
model: this.model,
schema: FINANCIAL_EVENTS_SCHEMA,
schemaName: 'financial_events',
parse: parseFinancialEvents,
timeoutMs: 120_000,
});
let written = 0;
for (const event of result.events) {
if (event.confidence < 0.3) continue;
await this.db.insertRevenueEvent({
contactId,
topicId: null,
bookingDate: event.booking_date,
serviceType: event.service_type,
duration: event.duration,
amount: event.amount,
confidence: event.confidence,
evidence: event.evidence,
source: 'llm-extracted',
});
written++;
}
if (written > 0) {
log.info('Financial events extracted', {
contactId,
total: result.events.length,
written,
});
}
} catch (err) {
if (err instanceof ModelBossError) {
log.warn('model-boss failed for financial backfill', {
contactId,
error: err.message,
});
return;
}
throw err;
}
}
// ---------------------------------------------------------------------------
// Efficiency scoring — pure SQL
// ---------------------------------------------------------------------------
private async handleEfficiencyScoring(contactId: string): Promise<void> {
const metrics = await computeEfficiencyMetrics(this.db, contactId);
if (metrics) {
await this.db.upsertEfficiencyMetrics(contactId, metrics);
log.info('Efficiency metrics computed', {
contactId,
totalMessages: metrics.totalMessages,
bookingCount: metrics.bookingCount,
timeToCloseHours: metrics.timeToCloseHours,
});
} else {
log.debug('No messages found for efficiency scoring', { contactId });
}
}
}
// ---------------------------------------------------------------------------
// Parsers
// ---------------------------------------------------------------------------
function parsePersonaFragment(payload: unknown): PersonaFragmentResult {
const obj = payload as Record<string, unknown>;
if (typeof obj.voice_fragment !== 'string') {
throw new Error('"voice_fragment" must be a string');
}
if (!Array.isArray(obj.winning_patterns)) {
throw new Error('"winning_patterns" must be an array');
}
if (!Array.isArray(obj.failure_patterns)) {
throw new Error('"failure_patterns" must be an array');
}
return {
voice_fragment: obj.voice_fragment,
winning_patterns: obj.winning_patterns as PatternEntry[],
failure_patterns: obj.failure_patterns as PatternEntry[],
negative_prompt: typeof obj.negative_prompt === 'string' ? obj.negative_prompt : null,
};
}
function parseFinancialEvents(payload: unknown): FinancialEventsResult {
const obj = payload as Record<string, unknown>;
if (!Array.isArray(obj.events)) {
throw new Error('"events" must be an array');
}
return { events: obj.events as FinancialEvent[] };
}