import WebSocket from 'ws' import { getDefaultAgentId } from '@/app/api/nova-config' import { HTTPClient } from '@/http' const novaClient = new HTTPClient({ baseURL: process.env.NOVA_BASE_URL, headers: { 'Content-Type': 'application/json', 'Tenant-Id': process.env.NOVA_TENANT_ID!, 'Authorization': process.env.NOVA_ACCESS_KEY!, }, }) interface BotArtifact { path: string fileName: string taskId?: string } interface UserConnection { ws: WebSocket conversationId: string lastActivity: number idleTimer: ReturnType | null heartbeatTimer: ReturnType | null pendingResolve: ((text: string) => void) | null pendingReject: ((error: Error) => void) | null responseTexts: Map artifacts: BotArtifact[] } const connections = new Map() let totalMessages = 0 let totalResponseTime = 0 let responseCount = 0 const IDLE_TIMEOUT = 5 * 60 * 1000 const RESPONSE_TIMEOUT = 10 * 60 * 1000 const HEARTBEAT_INTERVAL = 20 * 1000 function buildWssUrl(conversationId: string): string { const baseUrl = process.env.NOVA_BASE_URL! const wssBase = baseUrl.replace('https://', 'wss://').replace('http://', 'ws://') const authorization = process.env.NOVA_ACCESS_KEY const tenantId = process.env.NOVA_TENANT_ID return `${wssBase}/v1/super_agent/chat/completions?Authorization=${authorization}&X-Locale=zh&X-Region=CN&Tenant-Id=${tenantId}&conversation_id=${conversationId}` } function extractText(obj: unknown): string { if (typeof obj === 'string') return obj if (obj && typeof obj === 'object') { const o = obj as Record if (typeof o.text === 'string') return o.text if (o.content) return extractText(o.content) } return '' } interface DirFileItem { file_name?: string path?: string } const extractDirFileList = (payload: unknown): DirFileItem[] => { if (Array.isArray(payload)) return payload if (!payload || typeof payload !== 'object') return [] const obj = payload as Record if (Array.isArray(obj.data)) return obj.data as DirFileItem[] return [] } const resolvePathByName = (files: DirFileItem[], fileName?: string): string | null => { if (!fileName) return null const normalizedName = fileName.trim().startsWith('/upload/') ? fileName.slice('/upload/'.length) : fileName.trim() if (!normalizedName) return null const expectedSegment = `upload/${normalizedName}` const matched = files.find(item => { if (typeof item.file_name !== 'string' || !item.file_name) return false return item.file_name.includes(expectedSegment) }) return matched?.path || null } const toAbsoluteHttpUrl = (value: unknown): string | null => { if (typeof value !== 'string' || !value) return null if (value.startsWith('http://') || value.startsWith('https://')) return value return null } async function resolveArtifactUrls(artifacts: BotArtifact[], defaultTaskId: string): Promise<{ fileName: string; url: string }[]> { const results: { fileName: string; url: string }[] = [] // Create a Map with a composite key to keep uniqueness const uniqueArtifacts = new Map() for (const artifact of artifacts) { const key = artifact.path || artifact.fileName if (!uniqueArtifacts.has(key)) { uniqueArtifacts.set(key, artifact) } } for (const artifact of uniqueArtifacts.values()) { try { const taskId = artifact.taskId || defaultTaskId let resolvedPath = artifact.path if (taskId && resolvedPath) { const dirFilesResponse = await novaClient.post( '/v1/super_agent/chat/get_dir_file', { task_id: taskId } ) const dirFiles = extractDirFileList(dirFilesResponse) const matchedPath = resolvePathByName(dirFiles, artifact.fileName) if (matchedPath) { resolvedPath = matchedPath } } const response = await novaClient.post('/v1/super_agent/chat/oss_url', { file_path: resolvedPath, task_id: taskId, }) const signedUrl = toAbsoluteHttpUrl(response) const fallback = toAbsoluteHttpUrl(artifact.path) const finalUrl = signedUrl || fallback if (finalUrl) { results.push({ fileName: artifact.fileName, url: finalUrl }) } } catch (err) { console.error(`[Nova Bridge] 解析附件 URL 失败 (${artifact.fileName}):`, err) } } return results } function startHeartbeat(conn: UserConnection): void { stopHeartbeat(conn) conn.heartbeatTimer = setInterval(() => { if (conn.ws.readyState === WebSocket.OPEN) { conn.ws.send(JSON.stringify({ message_type: 'ping' })) } }, HEARTBEAT_INTERVAL) } function stopHeartbeat(conn: UserConnection): void { if (conn.heartbeatTimer) { clearInterval(conn.heartbeatTimer) conn.heartbeatTimer = null } } function resetIdleTimer(userId: string, conn: UserConnection): void { if (conn.idleTimer) { clearTimeout(conn.idleTimer) } conn.idleTimer = setTimeout(() => { closeConnection(userId) }, IDLE_TIMEOUT) } function closeConnection(userId: string): void { const conn = connections.get(userId) if (!conn) return stopHeartbeat(conn) if (conn.idleTimer) clearTimeout(conn.idleTimer) if (conn.ws.readyState === WebSocket.OPEN || conn.ws.readyState === WebSocket.CONNECTING) { conn.ws.close(1000) } connections.delete(userId) } function createConnection(userId: string, conversationId: string): Promise { return new Promise((resolve, reject) => { const wsUrl = buildWssUrl(conversationId) const ws = new WebSocket(wsUrl) const conn: UserConnection = { ws, conversationId, lastActivity: Date.now(), idleTimer: null, heartbeatTimer: null, pendingResolve: null, pendingReject: null, responseTexts: new Map(), artifacts: [], } const connectTimeout = setTimeout(() => { ws.close() reject(new Error('WebSocket 连接超时')) }, 10000) ws.on('open', () => { clearTimeout(connectTimeout) ws.send(JSON.stringify({ message_type: 'switch_conversation', conversation_id: conversationId, })) connections.set(userId, conn) startHeartbeat(conn) resetIdleTimer(userId, conn) resolve(conn) }) ws.on('message', async (raw: Buffer) => { const data = raw.toString() try { const parsed = JSON.parse(data) if (parsed?.data?.message_type === 'pong' || parsed?.message_type === 'pong') { return } const event = parsed?.data if (!event) return if (event.role === 'assistant' && event.event_type !== 'tool_call') { const text = extractText(event.content) if (text && event.event_id) { conn.responseTexts.set(event.event_id, text) } // 提取附件 if (event.content && typeof event.content === 'object') { const content = event.content as Record const taskId = event.task_id as string | undefined if (content.attachments) { const arr = Array.isArray(content.attachments) ? content.attachments : [content.attachments] arr.forEach((att: any) => { if (att?.file_name) { conn.artifacts.push({ path: att.path || att.file_url || att.file_id || '', fileName: att.file_name, taskId }) } }) } if (content.attachment_files && Array.isArray(content.attachment_files)) { content.attachment_files.forEach((file: any) => { if (file?.file_name) { conn.artifacts.push({ path: file.path || file.url || '', fileName: file.file_name, taskId }) } }) } if (content.files && Array.isArray(content.files)) { content.files.forEach((file: any) => { const fileName = file.name || file.file_name if (fileName) { conn.artifacts.push({ path: file.path || file.url || file.id || '', fileName, taskId }) } }) } } } const taskStatus = (event.task_status ?? event.event_status) as string | undefined if (taskStatus === 'success' || taskStatus === 'completed') { if (conn.pendingResolve) { let fullText = Array.from(conn.responseTexts.values()).join('\n') // 拼接附件 URL 到 Markdown 中 if (conn.artifacts.length > 0) { const resolvedUrls = await resolveArtifactUrls(conn.artifacts, conn.conversationId) const imageExts = new Set(['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg']) const attachmentsText = resolvedUrls.map(att => { const ext = att.fileName.split('.').pop()?.toLowerCase() || '' if (imageExts.has(ext)) { // 图片直接渲染 Markdown 图片语法 return `![${att.fileName}](${att.url})` } else { // 文件渲染为普通链接 return `[${att.fileName}](${att.url})` } }).join('\n\n') if (attachmentsText) { fullText = fullText ? `${fullText}\n\n${attachmentsText}` : attachmentsText } } conn.pendingResolve(fullText) conn.pendingResolve = null conn.pendingReject = null conn.responseTexts.clear() conn.artifacts = [] } } else if (taskStatus === 'failed' || taskStatus === 'error') { if (conn.pendingReject) { conn.pendingReject(new Error('Nova 任务失败')) conn.pendingResolve = null conn.pendingReject = null conn.responseTexts.clear() conn.artifacts = [] } } } catch { // 忽略解析失败的消息 } }) ws.on('error', (err) => { clearTimeout(connectTimeout) console.error(`[Nova Bridge] WebSocket 错误 (用户: ${userId}):`, err.message) if (conn.pendingReject) { conn.pendingReject(new Error('WebSocket 连接错误')) conn.pendingResolve = null conn.pendingReject = null } }) ws.on('close', () => { stopHeartbeat(conn) connections.delete(userId) if (conn.pendingReject) { conn.pendingReject(new Error('WebSocket 连接关闭')) conn.pendingResolve = null conn.pendingReject = null } }) }) } async function getConnection(userId: string, conversationId: string): Promise { const existing = connections.get(userId) if (existing && existing.ws.readyState === WebSocket.OPEN) { existing.lastActivity = Date.now() resetIdleTimer(userId, existing) return existing } if (existing) { closeConnection(userId) } return createConnection(userId, conversationId) } export async function sendMessageAndWaitResponse( userId: string, conversationId: string, content: string, uploadFileIds?: string[], ): Promise { const conn = await getConnection(userId, conversationId) conn.responseTexts.clear() conn.artifacts = [] totalMessages++ const sendTime = Date.now() const agentId = getDefaultAgentId() return new Promise((resolve, reject) => { const timeout = setTimeout(() => { if (conn.pendingReject) { conn.pendingReject(new Error('响应超时')) conn.pendingResolve = null conn.pendingReject = null conn.responseTexts.clear() } }, RESPONSE_TIMEOUT) conn.pendingResolve = (text: string) => { clearTimeout(timeout) totalResponseTime += Date.now() - sendTime responseCount++ resolve(text) } conn.pendingReject = (err: Error) => { clearTimeout(timeout) reject(err) } const message = JSON.stringify({ message_type: 'chat', conversation_id: conversationId, agent_id: agentId, content, upload_file_ids: uploadFileIds, }) try { conn.ws.send(message) } catch (err) { clearTimeout(timeout) conn.pendingResolve = null conn.pendingReject = null reject(err) } }) } export function closeAllConnections(): void { for (const userId of connections.keys()) { closeConnection(userId) } } /** * Close only connections whose key starts with `${platform}:`. * Each bot should call this in stopBot() instead of closeAllConnections() * so that stopping one bot doesn't break other bots' connections. */ export function closeConnectionsForPlatform(platform: string): void { const prefix = `${platform}:` for (const key of connections.keys()) { if (key.startsWith(prefix)) { closeConnection(key) } } } export function getStats() { return { activeConnections: connections.size, totalMessages, averageResponseTime: responseCount > 0 ? Math.round(totalResponseTime / responseCount) : 0, } }