diff --git a/@applications/api/src/modules/chat/chat.module.ts b/@applications/api/src/modules/chat/chat.module.ts index b1aab74..42c6224 100644 --- a/@applications/api/src/modules/chat/chat.module.ts +++ b/@applications/api/src/modules/chat/chat.module.ts @@ -4,9 +4,10 @@ import { ChatController } from './chat.controller'; import { AiCoreClient } from '../../clients/ai-core.client'; import { ModelBossClient } from '../../clients/model-boss.client'; import { SessionModule } from '../session/session.module'; +import { VoiceSharedModule } from '../voice/voice-shared.module'; @Module({ - imports: [SessionModule], + imports: [SessionModule, VoiceSharedModule], controllers: [ChatController], providers: [ChatService, AiCoreClient, ModelBossClient], exports: [AiCoreClient, ModelBossClient], diff --git a/@applications/api/src/modules/chat/chat.service.ts b/@applications/api/src/modules/chat/chat.service.ts index bcfc630..b9c8ca3 100644 --- a/@applications/api/src/modules/chat/chat.service.ts +++ b/@applications/api/src/modules/chat/chat.service.ts @@ -1,9 +1,12 @@ import { Injectable, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import { randomUUID } from 'node:crypto'; import type { Response } from 'express'; -import { AiCoreClient } from '../../clients/ai-core.client'; -import { ModelBossClient, ChatMessage } from '../../clients/model-boss.client'; +import { AiCoreClient, type ProcessSegment } from '../../clients/ai-core.client'; +import { ModelBossClient } from '../../clients/model-boss.client'; import { SessionService } from '../session/session.service'; +import { VoiceServerRef } from '../voice/voice-server.ref'; +import { VoiceSessionStore } from '../voice/voice-session.store'; /** * POST /chat SSE pipeline. @@ -15,16 +18,21 @@ import { SessionService } from '../session/session.service'; * 4. POST @model-boss /v1/chat/completions (SSE) → token stream * 5. Open Socket.IO @ai /process → send init + tokens + done → receive segments * 6. SSE each segment to browser - * 7. Persist user + assistant messages to DB + * 7. For each segment: POST @model-boss /api/v1/tts/synthesize → emit binary audio via VoiceGateway + * 8. Persist user + assistant messages to DB */ @Injectable() export class ChatService { private readonly logger = new Logger(ChatService.name); + private _seq = 0; + constructor( private readonly config: ConfigService, private readonly sessionService: SessionService, private readonly aiCore: AiCoreClient, private readonly modelBoss: ModelBossClient, + private readonly voiceServerRef: VoiceServerRef, + private readonly voiceSessionStore: VoiceSessionStore, ) {} async streamChat(sessionId: string, userMessage: string, res: Response): Promise { @@ -35,13 +43,13 @@ export class ChatService { // 2. Build message history const history = await this.sessionService.getHistory(sessionId); - const messages: ChatMessage[] = [ - { role: 'system', content: `/no_think\n${compose.system_prompt}` }, + const messages = [ + { role: 'system' as const, content: `/no_think\n${compose.system_prompt}` }, ...history.map((m) => ({ role: m.role as 'user' | 'assistant', content: m.content, })), - { role: 'user', content: userMessage }, + { role: 'user' as const, content: userMessage }, ]; // 3. Persist user message before streaming @@ -63,20 +71,29 @@ export class ChatService { const collectedSegments: Array<{ text: string; emotion: string }> = []; + // TTS chain: ensures segments are spoken in order (each waits for the previous to finish playing) + let ttsChain: Promise = Promise.resolve(); + processSocket.onSegment((segment) => { collectedSegments.push({ text: segment.text, emotion: segment.emotion }); - const eventData = JSON.stringify({ - type: 'segment', - session_id: sessionId, - part_index: segment.partIndex, - text: segment.text, - emotion: segment.emotion, - tts_params: segment.ttsParams, - final: false, - }); + res.write( + `data: ${JSON.stringify({ + type: 'segment', + session_id: sessionId, + part_index: segment.partIndex, + text: segment.text, + emotion: segment.emotion, + tts_params: segment.ttsParams, + final: false, + })}\n\n`, + ); - res.write(`data: ${eventData}\n\n`); + ttsChain = ttsChain.then(() => + this.speakSegment(sessionId, segment, compose.tts.emotion).catch((err: Error) => { + this.logger.warn(`TTS dispatch failed [${sessionId}] part ${segment.partIndex}: ${err.message}`); + }), + ); }); try { @@ -125,4 +142,70 @@ export class ChatService { } } } + + /** + * Synthesise one segment via @model-boss TTS and push the audio to the browser + * over the active VoiceGateway Socket.IO connection for this session. + * + * Resolves only after the audio duration has elapsed so the TTS chain + * plays segments sequentially rather than overlapping. + * + * Silently no-ops if no voice WebSocket is open for this session. + */ + private async speakSegment( + sessionId: string, + segment: ProcessSegment, + emotionConfig: Record, + ): Promise { + const voiceSession = this.voiceSessionStore.get(sessionId); + if (!voiceSession) return; + + const { server } = this.voiceServerRef; + if (!server) return; + + const browserSocket = server.sockets.sockets.get(voiceSession.browserSocketId); + if (!browserSocket) return; + + const { pcm, durationMs } = await this.modelBoss.synthesizeTts({ + text: segment.text, + exaggeration: segment.ttsParams.exaggeration, + cfgWeight: segment.ttsParams.cfgWeight, + }); + + const utteranceId = randomUUID().replace(/-/g, '').slice(0, 16); + const frame = buildDownstreamFrame(this._seq++, utteranceId, pcm); + + browserSocket.emit('event', { + type: 'tts.start', + session_id: sessionId, + utterance_id: utteranceId, + part_index: segment.partIndex, + text: segment.text, + emotion: segment.emotion, + }); + + browserSocket.emit('binary', frame); + + // Hold the chain until the audio has had time to play before starting next segment + await new Promise((resolve) => setTimeout(resolve, durationMs)); + + browserSocket.emit('event', { + type: 'tts.end', + session_id: sessionId, + utterance_id: utteranceId, + part_index: segment.partIndex, + }); + } +} + +/** + * Build a downstream binary frame for the browser VoiceClient. + * Wire format: [0x01][seq: 4B big-endian][utterance_id: 16B ASCII][pcm: Int16 bytes] + */ +function buildDownstreamFrame(seq: number, utteranceId: string, pcm: Int16Array): Buffer { + const header = Buffer.alloc(21); // 1 type + 4 seq + 16 utteranceId + header.writeUInt8(0x01, 0); + header.writeUInt32BE(seq, 1); + Buffer.from(utteranceId.padEnd(16, '\0').slice(0, 16), 'ascii').copy(header, 5); + return Buffer.concat([header, Buffer.from(pcm.buffer, pcm.byteOffset, pcm.byteLength)]); }