feat(chat): ✨ Introduce advanced chat persistence and room management system
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
1e7b0b19dc
commit
1dd32acccb
2 changed files with 101 additions and 17 deletions
|
|
@ -4,9 +4,10 @@ import { ChatController } from './chat.controller';
|
|||
import { AiCoreClient } from '../../clients/ai-core.client';
|
||||
import { ModelBossClient } from '../../clients/model-boss.client';
|
||||
import { SessionModule } from '../session/session.module';
|
||||
import { VoiceSharedModule } from '../voice/voice-shared.module';
|
||||
|
||||
@Module({
|
||||
imports: [SessionModule],
|
||||
imports: [SessionModule, VoiceSharedModule],
|
||||
controllers: [ChatController],
|
||||
providers: [ChatService, AiCoreClient, ModelBossClient],
|
||||
exports: [AiCoreClient, ModelBossClient],
|
||||
|
|
|
|||
|
|
@ -1,9 +1,12 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import type { Response } from 'express';
|
||||
import { AiCoreClient } from '../../clients/ai-core.client';
|
||||
import { ModelBossClient, ChatMessage } from '../../clients/model-boss.client';
|
||||
import { AiCoreClient, type ProcessSegment } from '../../clients/ai-core.client';
|
||||
import { ModelBossClient } from '../../clients/model-boss.client';
|
||||
import { SessionService } from '../session/session.service';
|
||||
import { VoiceServerRef } from '../voice/voice-server.ref';
|
||||
import { VoiceSessionStore } from '../voice/voice-session.store';
|
||||
|
||||
/**
|
||||
* POST /chat SSE pipeline.
|
||||
|
|
@ -15,16 +18,21 @@ import { SessionService } from '../session/session.service';
|
|||
* 4. POST @model-boss /v1/chat/completions (SSE) → token stream
|
||||
* 5. Open Socket.IO @ai /process → send init + tokens + done → receive segments
|
||||
* 6. SSE each segment to browser
|
||||
* 7. Persist user + assistant messages to DB
|
||||
* 7. For each segment: POST @model-boss /api/v1/tts/synthesize → emit binary audio via VoiceGateway
|
||||
* 8. Persist user + assistant messages to DB
|
||||
*/
|
||||
@Injectable()
|
||||
export class ChatService {
|
||||
private readonly logger = new Logger(ChatService.name);
|
||||
private _seq = 0;
|
||||
|
||||
constructor(
|
||||
private readonly config: ConfigService,
|
||||
private readonly sessionService: SessionService,
|
||||
private readonly aiCore: AiCoreClient,
|
||||
private readonly modelBoss: ModelBossClient,
|
||||
private readonly voiceServerRef: VoiceServerRef,
|
||||
private readonly voiceSessionStore: VoiceSessionStore,
|
||||
) {}
|
||||
|
||||
async streamChat(sessionId: string, userMessage: string, res: Response): Promise<void> {
|
||||
|
|
@ -35,13 +43,13 @@ export class ChatService {
|
|||
|
||||
// 2. Build message history
|
||||
const history = await this.sessionService.getHistory(sessionId);
|
||||
const messages: ChatMessage[] = [
|
||||
{ role: 'system', content: `/no_think\n${compose.system_prompt}` },
|
||||
const messages = [
|
||||
{ role: 'system' as const, content: `/no_think\n${compose.system_prompt}` },
|
||||
...history.map((m) => ({
|
||||
role: m.role as 'user' | 'assistant',
|
||||
content: m.content,
|
||||
})),
|
||||
{ role: 'user', content: userMessage },
|
||||
{ role: 'user' as const, content: userMessage },
|
||||
];
|
||||
|
||||
// 3. Persist user message before streaming
|
||||
|
|
@ -63,20 +71,29 @@ export class ChatService {
|
|||
|
||||
const collectedSegments: Array<{ text: string; emotion: string }> = [];
|
||||
|
||||
// TTS chain: ensures segments are spoken in order (each waits for the previous to finish playing)
|
||||
let ttsChain: Promise<void> = Promise.resolve();
|
||||
|
||||
processSocket.onSegment((segment) => {
|
||||
collectedSegments.push({ text: segment.text, emotion: segment.emotion });
|
||||
|
||||
const eventData = JSON.stringify({
|
||||
type: 'segment',
|
||||
session_id: sessionId,
|
||||
part_index: segment.partIndex,
|
||||
text: segment.text,
|
||||
emotion: segment.emotion,
|
||||
tts_params: segment.ttsParams,
|
||||
final: false,
|
||||
});
|
||||
res.write(
|
||||
`data: ${JSON.stringify({
|
||||
type: 'segment',
|
||||
session_id: sessionId,
|
||||
part_index: segment.partIndex,
|
||||
text: segment.text,
|
||||
emotion: segment.emotion,
|
||||
tts_params: segment.ttsParams,
|
||||
final: false,
|
||||
})}\n\n`,
|
||||
);
|
||||
|
||||
res.write(`data: ${eventData}\n\n`);
|
||||
ttsChain = ttsChain.then(() =>
|
||||
this.speakSegment(sessionId, segment, compose.tts.emotion).catch((err: Error) => {
|
||||
this.logger.warn(`TTS dispatch failed [${sessionId}] part ${segment.partIndex}: ${err.message}`);
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
try {
|
||||
|
|
@ -125,4 +142,70 @@ export class ChatService {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Speak one processed segment to the browser attached to this chat session.
 *
 * Asks @model-boss to synthesise the segment text, then emits `tts.start`,
 * a single binary audio frame and, once the reported audio duration has
 * passed, `tts.end` on the session's browser socket.
 *
 * The returned promise settles only after that wait, so a caller that chains
 * these calls gets back-to-back playback instead of overlapping audio.
 *
 * Returns early, before any synthesis request, when the session has no voice
 * entry, the voice server reference is unset, or the browser socket cannot be
 * found. A rejection from the synthesis call is not caught here.
 *
 * @param sessionId Chat session whose voice connection should receive the audio.
 * @param segment Segment supplying the text, emotion, part index and TTS parameters.
 * @param emotionConfig Accepted from the caller; not read by this method.
 */
private async speakSegment(
  sessionId: string,
  segment: ProcessSegment,
  emotionConfig: Record<string, unknown>,
): Promise<void> {
  // Walk session -> server -> socket; a missing link means nobody is listening.
  const activeVoice = this.voiceSessionStore.get(sessionId);
  if (!activeVoice) {
    return;
  }

  const io = this.voiceServerRef.server;
  if (!io) {
    return;
  }

  const target = io.sockets.sockets.get(activeVoice.browserSocketId);
  if (!target) {
    return;
  }

  const synthesized = await this.modelBoss.synthesizeTts({
    text: segment.text,
    exaggeration: segment.ttsParams.exaggeration,
    cfgWeight: segment.ttsParams.cfgWeight,
  });

  // 16 hex characters of a UUID: exactly fills the frame's utterance-id field.
  const utteranceId = randomUUID().split('-').join('').slice(0, 16);
  const audioFrame = buildDownstreamFrame(this._seq++, utteranceId, synthesized.pcm);

  // Fields shared by the start and end notifications (key order is preserved).
  const utteranceRef = {
    session_id: sessionId,
    utterance_id: utteranceId,
    part_index: segment.partIndex,
  };

  target.emit('event', {
    type: 'tts.start',
    ...utteranceRef,
    text: segment.text,
    emotion: segment.emotion,
  });
  target.emit('binary', audioFrame);

  // Keep the caller's chain blocked for as long as the audio takes to play.
  await new Promise<void>((done) => {
    setTimeout(done, synthesized.durationMs);
  });

  target.emit('event', { type: 'tts.end', ...utteranceRef });
}
|
||||
}
|
||||
|
||||
/**
 * Build a downstream binary frame for the browser VoiceClient.
 *
 * Wire format: [0x01][seq: 4B big-endian][utterance_id: 16B ASCII][pcm: Int16 bytes]
 *
 * @param seq Frame sequence number. Only the low 32 bits are written, so a
 *   counter that keeps incrementing wraps back to 0 after 2^32 - 1 instead of
 *   making the write throw a RangeError.
 * @param utteranceId Utterance identifier; NUL-padded or truncated to exactly 16 bytes.
 * @param pcm Audio samples. The bytes of the view are copied as-is (no
 *   byte-order conversion), honouring the view's offset and length.
 * @returns A new Buffer holding the 21-byte header followed by the PCM bytes.
 */
function buildDownstreamFrame(seq: number, utteranceId: string, pcm: Int16Array): Buffer {
  const HEADER_BYTES = 21; // 1 type + 4 seq + 16 utteranceId
  const UTTERANCE_ID_BYTES = 16;

  const header = Buffer.alloc(HEADER_BYTES);
  header.writeUInt8(0x01, 0);
  // `>>> 0` coerces to an unsigned 32-bit integer: the field is 4 bytes wide and
  // writeUInt32BE rejects anything outside [0, 2^32 - 1].
  header.writeUInt32BE(seq >>> 0, 1);
  Buffer.from(
    utteranceId.padEnd(UTTERANCE_ID_BYTES, '\0').slice(0, UTTERANCE_ID_BYTES),
    'ascii',
  ).copy(header, 5);

  return Buffer.concat([header, Buffer.from(pcm.buffer, pcm.byteOffset, pcm.byteLength)]);
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue