From 56b3d469b179a5c87e93ec90258c98bae52a1e4f Mon Sep 17 00:00:00 2001 From: autocommit Date: Sun, 12 Apr 2026 19:01:17 -0700 Subject: [PATCH] =?UTF-8?q?feat(relationship-worker):=20=E2=9C=A8=20Introd?= =?UTF-8?q?uce=20EventListener=20and=20ProcessingPipeline=20classes=20for?= =?UTF-8?q?=20event=20handling=20and=20processing=20relationship=20tasks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- services/relationship-worker/src/listener.ts | 111 +++++ services/relationship-worker/src/main.ts | 88 ++++ services/relationship-worker/src/pipeline.ts | 473 +++++++++++++++++++ 3 files changed, 672 insertions(+) create mode 100644 services/relationship-worker/src/listener.ts create mode 100644 services/relationship-worker/src/main.ts create mode 100644 services/relationship-worker/src/pipeline.ts diff --git a/services/relationship-worker/src/listener.ts b/services/relationship-worker/src/listener.ts new file mode 100644 index 0000000..7fb3120 --- /dev/null +++ b/services/relationship-worker/src/listener.ts @@ -0,0 +1,111 @@ +import type { PoolClient } from 'pg'; + +import type { Database } from './db'; +import { log } from './logger'; + +export interface JobEvent { + jobId: string; + contactId: string; +} + +export type JobHandler = (event: JobEvent) => Promise; + +const CHANNEL = 'relationship_job_created'; + +export class RelationshipJobListener { + private client: PoolClient | null = null; + private stopped = false; + private reconnectAttempts = 0; + + constructor( + private readonly db: Database, + private readonly handler: JobHandler, + ) {} + + async start(): Promise { + while (!this.stopped) { + try { + await this.connectAndListen(); + } catch (err) { + if (this.stopped) return; + this.reconnectAttempts += 1; + const delayMs = Math.min(1000 * 2 ** this.reconnectAttempts, 30_000); + log.error('LISTEN connection failed, reconnecting', { + attempt: this.reconnectAttempts, + delayMs, + error: err instanceof Error ? err.message : String(err), + }); + await sleep(delayMs); + } + } + } + + async stop(): Promise { + this.stopped = true; + if (this.client) { + try { + await this.client.query(`UNLISTEN ${CHANNEL}`); + } catch { + // ignore — connection may already be torn down + } + this.client.release(); + this.client = null; + } + } + + private async connectAndListen(): Promise { + const client = await this.db.acquireListenClient(); + this.client = client; + this.reconnectAttempts = 0; + + client.on('notification', (msg) => { + if (msg.channel !== CHANNEL || !msg.payload) return; + this.dispatch(msg.payload).catch((err) => { + log.error('Failed to dispatch job notification', { + error: err instanceof Error ? err.message : String(err), + }); + }); + }); + + const errorPromise = new Promise((_, reject) => { + client.on('error', (err) => reject(err)); + client.on('end', () => reject(new Error('LISTEN client ended'))); + }); + + await client.query(`LISTEN ${CHANNEL}`); + log.info('Listening for relationship job notifications', { channel: CHANNEL }); + + await errorPromise; + } + + private async dispatch(rawPayload: string): Promise { + let parsed: unknown; + try { + parsed = JSON.parse(rawPayload); + } catch { + log.warn('Discarding non-JSON notification payload', { rawPayload }); + return; + } + + const event = parseEvent(parsed); + if (!event) { + log.warn('Discarding malformed job notification payload', { rawPayload }); + return; + } + + log.info('Received relationship job notification', { jobId: event.jobId, contactId: event.contactId }); + await this.handler(event); + } +} + +function parseEvent(payload: unknown): JobEvent | null { + if (!payload || typeof payload !== 'object') return null; + const obj = payload as Record; + if (typeof obj.jobId !== 'string') return null; + if (typeof obj.contactId !== 'string') return null; + return { jobId: obj.jobId, contactId: obj.contactId }; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/services/relationship-worker/src/main.ts b/services/relationship-worker/src/main.ts new file mode 100644 index 0000000..fa41d03 --- /dev/null +++ b/services/relationship-worker/src/main.ts @@ -0,0 +1,88 @@ +import { randomUUID } from 'node:crypto'; + +import { ModelBossClient } from '@lilith/messenger-model-boss'; + +import { loadConfig } from './config'; +import { Database } from './db'; +import { startHealthServer, type HealthState } from './health-server'; +import { RelationshipJobListener } from './listener'; +import { log } from './logger'; +import { RelationshipPipeline } from './pipeline'; + +async function main(): Promise { + const config = loadConfig(); + const workerId = `relationship-${randomUUID().slice(0, 8)}`; + + log.info('relationship-worker starting', { + workerId, + modelBossUrl: config.modelBossUrl, + model: config.modelBossModel, + healthPort: config.healthPort, + }); + + const db = new Database(config.databaseUrl); + const modelBoss = new ModelBossClient( + config.modelBossUrl, + config.modelBossApiKey, + config.modelBossModel, + ); + + const pipeline = new RelationshipPipeline(db, modelBoss, config.modelBossModel); + + // Cleanup stale jobs from previous runs + const cleaned = await db.cleanupStaleJobs(); + if (cleaned > 0) { + log.info('Cleaned up stale relationship jobs', { count: cleaned }); + } + + const state: HealthState = { + modelBossReachable: false, + lastJobAt: null, + lastCompletedAt: null, + }; + + const healthInterval = setInterval(async () => { + state.modelBossReachable = await modelBoss.health(); + }, 30_000); + state.modelBossReachable = await modelBoss.health(); + + const healthServer = startHealthServer(config.healthPort, () => state); + + const listener = new RelationshipJobListener(db, async (event) => { + state.lastJobAt = new Date(); + try { + const processed = await pipeline.handle(event.jobId); + if (processed) { + state.lastCompletedAt = new Date(); + } + } catch (err) { + log.error('Pipeline failed', { + jobId: event.jobId, + error: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined, + }); + } + }); + + const shutdown = async (signal: string): Promise => { + log.info('Shutting down', { signal }); + clearInterval(healthInterval); + healthServer.close(); + await listener.stop(); + await db.close(); + process.exit(0); + }; + + process.on('SIGINT', () => void shutdown('SIGINT')); + process.on('SIGTERM', () => void shutdown('SIGTERM')); + + await listener.start(); +} + +main().catch((err) => { + log.error('Fatal error', { + error: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined, + }); + process.exit(1); +}); diff --git a/services/relationship-worker/src/pipeline.ts b/services/relationship-worker/src/pipeline.ts new file mode 100644 index 0000000..b1b48c8 --- /dev/null +++ b/services/relationship-worker/src/pipeline.ts @@ -0,0 +1,473 @@ +import { ModelBossClient, ModelBossError } from '@lilith/messenger-model-boss'; + +import type { Database, Message, ReviewQueueSignal, TopicSummary } from './db'; +import { computeEfficiencyMetrics } from './efficiency'; +import { log } from './logger'; + +/** JSON schema for persona synthesis LLM output. */ +const PERSONA_FRAGMENT_SCHEMA: Record = { + type: 'object', + required: ['voice_fragment', 'winning_patterns', 'failure_patterns', 'negative_prompt'], + properties: { + voice_fragment: { type: 'string' }, + winning_patterns: { + type: 'array', + items: { + type: 'object', + required: ['pattern', 'example', 'outcome'], + properties: { + pattern: { type: 'string' }, + example: { type: 'string' }, + outcome: { type: 'string' }, + }, + }, + }, + failure_patterns: { + type: 'array', + items: { + type: 'object', + required: ['pattern', 'example', 'outcome'], + properties: { + pattern: { type: 'string' }, + example: { type: 'string' }, + outcome: { type: 'string' }, + }, + }, + }, + negative_prompt: { type: ['string', 'null'] }, + }, + additionalProperties: false, +}; + +/** JSON schema for financial backfill LLM output. */ +const FINANCIAL_EVENTS_SCHEMA: Record = { + type: 'object', + required: ['events'], + properties: { + events: { + type: 'array', + items: { + type: 'object', + required: ['booking_date', 'service_type', 'duration', 'amount', 'confidence', 'evidence'], + properties: { + booking_date: { type: ['string', 'null'] }, + service_type: { type: ['string', 'null'] }, + duration: { type: ['string', 'null'] }, + amount: { type: ['number', 'null'] }, + confidence: { type: 'number', minimum: 0, maximum: 1 }, + evidence: { type: 'string' }, + }, + }, + }, + }, + additionalProperties: false, +}; + +interface PatternEntry { + pattern: string; + example: string; + outcome: string; +} + +interface PersonaFragmentResult { + voice_fragment: string; + winning_patterns: PatternEntry[]; + failure_patterns: PatternEntry[]; + negative_prompt: string | null; +} + +interface FinancialEvent { + booking_date: string | null; + service_type: string | null; + duration: string | null; + amount: number | null; + confidence: number; + evidence: string; +} + +interface FinancialEventsResult { + events: FinancialEvent[]; +} + +const MAX_CONTACTS_PER_EMOJI = 8; +const MAX_MESSAGES_PER_CONTACT = 30; +const MAX_REVIEW_SIGNALS = 5; +const MAX_MESSAGES_FOR_FINANCIAL = 300; + +export class RelationshipPipeline { + constructor( + private readonly db: Database, + private readonly modelBoss: ModelBossClient, + private readonly model: string, + ) {} + + async handle(jobId: string): Promise { + const job = await this.db.claimJob(jobId); + if (!job) { + log.debug('Job already claimed or not found', { jobId }); + return false; + } + + log.info('Processing relationship job', { + jobId: job.id, + contactId: job.contactId, + kind: job.kind, + }); + + try { + switch (job.kind) { + case 'manual': + await this.handleManual(job.contactId); + break; + + case 'persona_synthesis': + await this.handlePersonaSynthesis(job.payload); + break; + + case 'financial_backfill': + await this.handleFinancialBackfill(job.contactId); + break; + + case 'efficiency_scoring': + await this.handleEfficiencyScoring(job.contactId); + break; + + default: + log.warn('Unknown job kind', { kind: job.kind, jobId: job.id }); + } + + await this.db.completeJob(job.id); + log.info('Job completed', { jobId: job.id, kind: job.kind }); + return true; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await this.db.failJob(job.id, message); + log.error('Job failed', { + jobId: job.id, + kind: job.kind, + error: message, + }); + return false; + } + } + + // --------------------------------------------------------------------------- + // Manual refresh — full per-contact recompute + // --------------------------------------------------------------------------- + + private async handleManual(contactId: string): Promise { + await this.handleEfficiencyScoring(contactId); + await this.handleFinancialBackfill(contactId); + log.info('Manual refresh complete', { contactId }); + } + + // --------------------------------------------------------------------------- + // Persona synthesis — derive voice fragment for one emoji + // --------------------------------------------------------------------------- + + private async handlePersonaSynthesis(payload: Record | null): Promise { + const emoji = payload?.emoji; + if (typeof emoji !== 'string') { + throw new Error('persona_synthesis requires payload.emoji'); + } + + const glossary = await this.db.loadEmojiGlossary(); + const entry = glossary.find((e) => e.emoji === emoji); + if (!entry) { + throw new Error(`Emoji ${emoji} not found in glossary`); + } + + const contacts = await this.db.loadContactsByEmoji(emoji); + if (contacts.length === 0) { + log.info('No contacts with this emoji, skipping', { emoji }); + return; + } + + const sampled = contacts.slice(0, MAX_CONTACTS_PER_EMOJI); + const digestParts: string[] = []; + + for (const contact of sampled) { + const [messages, topics, signals] = await Promise.all([ + this.db.loadContactMessages(contact.id, MAX_MESSAGES_PER_CONTACT), + this.db.loadContactTopics(contact.id), + this.db.loadContactReviewSignals(contact.id, 50), + ]); + + if (messages.length === 0) continue; + digestParts.push(this.formatContactDigest(contact, messages, topics, signals)); + } + + if (digestParts.length === 0) { + log.info('No message data for any contacts, skipping', { emoji }); + return; + } + + const systemPrompt = [ + 'You are analyzing Quinn\'s messaging patterns to synthesize a persona fragment.', + '', + 'Quinn is an independent escort and content creator in the Pacific Northwest.', + 'She labels contacts with emoji to categorize them. You are analyzing all', + `conversations with contacts labeled ${emoji} ("${entry.meaning ?? 'no meaning set'}").`, + '', + 'Your job: derive a composable voice description from the evidence below.', + 'This fragment will be blended with other emoji fragments at draft time —', + `keep it focused on what is unique about how Quinn handles ${emoji} contacts.`, + '', + 'Analyze:', + '- How does Quinn open conversations with these people?', + '- What tone does she use? (casual, flirty, professional, guarded?)', + '- What specific phrases or patterns recur in her outgoing messages?', + '- When she edited AI drafts, what did she change and why?', + '- Which conversations led to bookings? What did she do differently there?', + '- Which conversations died? What happened right before?', + ].join('\n'); + + const userPrompt = [ + `## Contacts labeled ${emoji} (${contacts.length} total, ${sampled.length} sampled)`, + '', + ...digestParts, + ].join('\n'); + + try { + const result = await this.modelBoss.chatJson({ + systemPrompt, + messages: [{ role: 'user', content: userPrompt }], + model: this.model, + schema: PERSONA_FRAGMENT_SCHEMA, + schemaName: 'persona_fragment', + parse: parsePersonaFragment, + timeoutMs: 120_000, + }); + + await this.db.upsertPersonaFragment({ + emoji, + voiceFragment: result.voice_fragment, + winningPatterns: result.winning_patterns, + failurePatterns: result.failure_patterns, + negativePrompt: result.negative_prompt, + sampleCount: sampled.length, + modelVersion: this.model, + }); + + log.info('Persona fragment synthesized', { + emoji, + sampleCount: sampled.length, + winningCount: result.winning_patterns.length, + failureCount: result.failure_patterns.length, + }); + } catch (err) { + if (err instanceof ModelBossError) { + throw new Error(`model-boss failed for persona synthesis (${emoji}): ${err.message}`); + } + throw err; + } + } + + private formatContactDigest( + contact: { id: string; displayName: string }, + messages: Message[], + topics: TopicSummary[], + signals: ReviewQueueSignal[], + ): string { + const outgoing = messages.filter((m) => m.direction === 'outgoing'); + const incoming = messages.filter((m) => m.direction === 'incoming'); + const avgQuinnLen = outgoing.length > 0 + ? Math.round(outgoing.reduce((s, m) => s + m.text.length, 0) / outgoing.length) + : 0; + const avgTheirLen = incoming.length > 0 + ? Math.round(incoming.reduce((s, m) => s + m.text.length, 0) / incoming.length) + : 0; + + const lines: string[] = [ + `### ${contact.displayName}`, + `Messages: ${messages.length} (Quinn: ${outgoing.length}, them: ${incoming.length})`, + `Avg message length — Quinn: ${avgQuinnLen} chars, them: ${avgTheirLen} chars`, + ]; + + if (topics.length > 0) { + const topicSummary = topics + .slice(0, 5) + .map((t) => `${t.kind ?? 'unknown'}(${t.phase ?? '?'})`) + .join(', '); + lines.push(`Topics: ${topicSummary}`); + } + + // Recent messages + lines.push('', 'Recent messages:'); + for (const m of messages.slice(-20)) { + const who = m.direction === 'outgoing' ? 'Quinn' : 'Them'; + const date = m.sentAt.toISOString().slice(0, 16).replace('T', ' '); + const text = m.text.length > 200 ? m.text.slice(0, 200) + '...' : m.text; + lines.push(`[${date}] ${who}: ${text}`); + } + + // Review queue signals + const sent = signals.filter((s) => s.status === 'sent'); + const edited = sent.filter((s) => s.editedText !== null); + const unchanged = sent.filter((s) => s.editedText === null); + const rejected = signals.filter((s) => s.status === 'rejected'); + + if (sent.length > 0 || rejected.length > 0) { + lines.push('', 'Review queue signals:'); + lines.push(`- Approved unchanged: ${unchanged.length}`); + lines.push(`- Edited: ${edited.length}`); + lines.push(`- Rejected: ${rejected.length}`); + + const editExamples = edited.slice(0, MAX_REVIEW_SIGNALS); + for (const e of editExamples) { + const draft = e.draftText.length > 100 ? e.draftText.slice(0, 100) + '...' : e.draftText; + const edit = (e.editedText ?? '').length > 100 + ? (e.editedText ?? '').slice(0, 100) + '...' + : (e.editedText ?? ''); + lines.push(` Edit: "${draft}" → "${edit}"`); + } + + const rejectExamples = rejected.slice(0, 3); + for (const r of rejectExamples) { + const draft = r.draftText.length > 100 ? r.draftText.slice(0, 100) + '...' : r.draftText; + lines.push(` Rejected: "${draft}"`); + } + } + + lines.push(''); + return lines.join('\n'); + } + + // --------------------------------------------------------------------------- + // Financial backfill — LLM-driven booking event extraction + // --------------------------------------------------------------------------- + + private async handleFinancialBackfill(contactId: string): Promise { + const messages = await this.db.loadContactMessages(contactId, MAX_MESSAGES_FOR_FINANCIAL); + if (messages.length < 3) { + log.debug('Too few messages for financial analysis', { contactId, count: messages.length }); + return; + } + + const systemPrompt = [ + 'You are analyzing a conversation between Quinn (an independent escort, outgoing', + 'messages) and a contact (incoming messages). Your job is to identify every instance', + 'where a booking was confirmed and likely completed.', + '', + 'Look for these signals:', + '- Rate or price discussed and agreed to (e.g. "$700/hr", "my rate is...")', + '- Date/time confirmed (e.g. "see you saturday", "tomorrow at 7")', + '- Deposit mentioned or confirmed (e.g. "deposit received", "sent you $200")', + '- Quinn confirming she\'s on her way, arrived, or post-session messages', + '- Payment references (Venmo, Cash App, cash, "paid", etc.)', + '- Post-session follow-up ("had a great time", rebooking discussion)', + '', + 'For each booking event you identify, extract structured data.', + 'If no bookings occurred in this conversation, return an empty events array.', + 'Only include events where you have reasonable confidence a booking actually happened.', + '', + 'Quinn\'s standard rates for reference:', + '- 1hr: $700, 2hr: $1400, 3hr: $2100, overnight: $2400, 24hr: $4000', + '- Dinner & night: $2800, Duo: $9999', + '- Touring: West Coast $3000, NA $5500, International $7000', + ].join('\n'); + + const conversationText = messages.map((m) => { + const who = m.direction === 'outgoing' ? 'Quinn' : 'Them'; + const date = m.sentAt.toISOString().slice(0, 16).replace('T', ' '); + return `[${date}] ${who}: ${m.text}`; + }).join('\n'); + + try { + const result = await this.modelBoss.chatJson({ + systemPrompt, + messages: [{ role: 'user', content: conversationText }], + model: this.model, + schema: FINANCIAL_EVENTS_SCHEMA, + schemaName: 'financial_events', + parse: parseFinancialEvents, + timeoutMs: 120_000, + }); + + let written = 0; + for (const event of result.events) { + if (event.confidence < 0.3) continue; + + await this.db.insertRevenueEvent({ + contactId, + topicId: null, + bookingDate: event.booking_date, + serviceType: event.service_type, + duration: event.duration, + amount: event.amount, + confidence: event.confidence, + evidence: event.evidence, + source: 'llm-extracted', + }); + written++; + } + + if (written > 0) { + log.info('Financial events extracted', { + contactId, + total: result.events.length, + written, + }); + } + } catch (err) { + if (err instanceof ModelBossError) { + log.warn('model-boss failed for financial backfill', { + contactId, + error: err.message, + }); + return; + } + throw err; + } + } + + // --------------------------------------------------------------------------- + // Efficiency scoring — pure SQL + // --------------------------------------------------------------------------- + + private async handleEfficiencyScoring(contactId: string): Promise { + const metrics = await computeEfficiencyMetrics(this.db, contactId); + if (metrics) { + await this.db.upsertEfficiencyMetrics(contactId, metrics); + log.info('Efficiency metrics computed', { + contactId, + totalMessages: metrics.totalMessages, + bookingCount: metrics.bookingCount, + timeToCloseHours: metrics.timeToCloseHours, + }); + } else { + log.debug('No messages found for efficiency scoring', { contactId }); + } + } +} + +// --------------------------------------------------------------------------- +// Parsers +// --------------------------------------------------------------------------- + +function parsePersonaFragment(payload: unknown): PersonaFragmentResult { + const obj = payload as Record; + if (typeof obj.voice_fragment !== 'string') { + throw new Error('"voice_fragment" must be a string'); + } + if (!Array.isArray(obj.winning_patterns)) { + throw new Error('"winning_patterns" must be an array'); + } + if (!Array.isArray(obj.failure_patterns)) { + throw new Error('"failure_patterns" must be an array'); + } + return { + voice_fragment: obj.voice_fragment, + winning_patterns: obj.winning_patterns as PatternEntry[], + failure_patterns: obj.failure_patterns as PatternEntry[], + negative_prompt: typeof obj.negative_prompt === 'string' ? obj.negative_prompt : null, + }; +} + +function parseFinancialEvents(payload: unknown): FinancialEventsResult { + const obj = payload as Record; + if (!Array.isArray(obj.events)) { + throw new Error('"events" must be an array'); + } + return { events: obj.events as FinancialEvent[] }; +}