424 lines
13 KiB
TypeScript
424 lines
13 KiB
TypeScript
import WebSocket from 'ws'
|
|
import { getDefaultAgentId } from '@/app/api/nova-config'
|
|
import { HTTPClient } from '@/http'
|
|
|
|
/**
 * Shared HTTP client for the Nova REST endpoints called from this module.
 *
 * The base URL, tenant id and access key are read from the environment
 * (NOVA_BASE_URL, NOVA_TENANT_ID, NOVA_ACCESS_KEY) when the module is loaded.
 */
const novaClient = new HTTPClient({
  baseURL: process.env.NOVA_BASE_URL,
  headers: {
    'Content-Type': 'application/json',
    'Tenant-Id': process.env.NOVA_TENANT_ID!,
    Authorization: process.env.NOVA_ACCESS_KEY!,
  },
})
|
|
|
|
/** A file referenced by an assistant event, collected until the task finishes. */
interface BotArtifact {
  /** Path, URL or id reported for the file; empty string when none was provided. */
  path: string
  /** File name as reported by the event. */
  fileName: string
  /** Task id of the event that produced the file, when the event carried one. */
  taskId?: string
}
|
|
|
|
/** Per-user WebSocket session together with the state of its in-flight request. */
interface UserConnection {
  /** Underlying socket to the Nova chat endpoint. */
  ws: WebSocket
  /** Conversation id the socket was opened for. */
  conversationId: string
  /** `Date.now()` timestamp of creation or of the most recent reuse. */
  lastActivity: number
  /** One-shot timer that closes the connection after a period of inactivity. */
  idleTimer: ReturnType<typeof setTimeout> | null
  /**
   * Repeating ping timer. It is created with `setInterval` and cleared with
   * `clearInterval`, so it is typed as an interval handle.
   */
  heartbeatTimer: ReturnType<typeof setInterval> | null
  /** Settles the pending request with the final text; null when nothing is pending. */
  pendingResolve: ((text: string) => void) | null
  /** Fails the pending request; null when nothing is pending. */
  pendingReject: ((error: Error) => void) | null
  /** Assistant text fragments of the current request, keyed by event id. */
  responseTexts: Map<string, string>
  /** Attachments collected from assistant events of the current request. */
  artifacts: BotArtifact[]
}
|
|
|
|
/** Open connections, keyed by the caller-supplied user id. */
const connections = new Map<string, UserConnection>()

// Counters reported by getStats().
let totalMessages = 0
let totalResponseTime = 0
let responseCount = 0

/** Inactivity period (ms) after which a connection is closed. */
const IDLE_TIMEOUT = 5 * 60 * 1000
/** Maximum time (ms) to wait for the final response to a message. */
const RESPONSE_TIMEOUT = 10 * 60 * 1000
/** Delay (ms) between heartbeat pings. */
const HEARTBEAT_INTERVAL = 20 * 1000
|
|
|
|
/**
 * Build the WebSocket URL of the Nova chat-completions endpoint.
 *
 * The scheme of NOVA_BASE_URL is switched from http(s) to ws(s); the access
 * key, tenant id, locale/region and conversation id travel as query
 * parameters. Every dynamic value is percent-encoded so that characters such
 * as spaces, `+`, `&` or `=` cannot corrupt or inject query parameters.
 *
 * @param conversationId Conversation the socket should be attached to.
 * @returns The complete ws:// or wss:// URL.
 */
function buildWssUrl(conversationId: string): string {
  const baseUrl = process.env.NOVA_BASE_URL!
  const wssBase = baseUrl.replace('https://', 'wss://').replace('http://', 'ws://')

  // String() keeps the previous rendering of unset variables ("undefined").
  const authorization = encodeURIComponent(String(process.env.NOVA_ACCESS_KEY))
  const tenantId = encodeURIComponent(String(process.env.NOVA_TENANT_ID))
  const conversation = encodeURIComponent(conversationId)

  return `${wssBase}/v1/super_agent/chat/completions?Authorization=${authorization}&X-Locale=zh&X-Region=CN&Tenant-Id=${tenantId}&conversation_id=${conversation}`
}
|
|
|
|
/**
 * Pull a text string out of a loosely-typed message payload.
 *
 * A string is returned unchanged. For an object, a string `text` property
 * wins; otherwise a truthy `content` property is searched recursively.
 * Anything else yields an empty string.
 *
 * @param obj Payload of unknown shape.
 * @returns The extracted text, or '' when none is found.
 */
function extractText(obj: unknown): string {
  if (typeof obj === 'string') {
    return obj
  }
  if (typeof obj !== 'object' || obj === null) {
    return ''
  }

  const record = obj as Record<string, unknown>
  if (typeof record.text === 'string') {
    return record.text
  }
  return record.content ? extractText(record.content) : ''
}
|
|
|
|
/** One entry of the directory listing returned by the get_dir_file endpoint. */
interface DirFileItem {
  /** Name of the entry; matched by substring when resolving an artifact. */
  file_name?: string
  /** Path handed to the oss_url endpoint once the entry is matched. */
  path?: string
}
|
|
|
|
/**
 * Normalise a directory-listing response into an array of entries.
 *
 * Accepts either a bare array or an object whose `data` property is an
 * array; every other shape produces an empty list.
 *
 * @param payload Raw response body of unknown shape.
 * @returns The listing entries (elements are not validated).
 */
const extractDirFileList = (payload: unknown): DirFileItem[] => {
  if (Array.isArray(payload)) {
    return payload
  }
  if (payload !== null && typeof payload === 'object') {
    const { data } = payload as Record<string, unknown>
    if (Array.isArray(data)) {
      return data as DirFileItem[]
    }
  }
  return []
}
|
|
|
|
/**
 * Find the path of the directory entry that corresponds to an uploaded file.
 *
 * The file name is trimmed first and a leading `/upload/` is removed; an
 * entry matches when its `file_name` contains `upload/<name>`. Trimming before
 * stripping the prefix matters: slicing the untrimmed string would cut at the
 * wrong offset whenever the name carries surrounding whitespace.
 *
 * @param files Directory entries to search.
 * @param fileName File name, optionally prefixed with `/upload/`.
 * @returns The `path` of the first matching entry, or null when the name is
 *   empty, nothing matches, or the matching entry has no path.
 */
const resolvePathByName = (files: DirFileItem[], fileName?: string): string | null => {
  if (!fileName) return null

  const uploadPrefix = '/upload/'
  const trimmed = fileName.trim()
  const normalizedName = trimmed.startsWith(uploadPrefix) ? trimmed.slice(uploadPrefix.length) : trimmed
  if (!normalizedName) return null

  const expectedSegment = `upload/${normalizedName}`
  const matched = files.find(
    item => typeof item.file_name === 'string' && item.file_name.includes(expectedSegment),
  )

  return matched?.path || null
}
|
|
|
|
/**
 * Return the value when it is a string beginning with `http://` or
 * `https://` (case-sensitive); otherwise return null.
 *
 * @param value Candidate URL of unknown type.
 */
const toAbsoluteHttpUrl = (value: unknown): string | null => {
  if (typeof value !== 'string') {
    return null
  }
  return /^https?:\/\//.test(value) ? value : null
}
|
|
|
|
/**
 * Turn collected artifacts into downloadable URLs.
 *
 * Artifacts are de-duplicated first. For each remaining artifact that has a
 * path, the task's directory listing is consulted to replace the reported
 * path with the listing's path for the same file name; the listing is fetched
 * at most once per task id within a call instead of once per artifact. The
 * (possibly replaced) path is then exchanged for a URL via the oss_url
 * endpoint. If that response is not an absolute http(s) URL, the artifact's
 * own path is used when it already is one; otherwise the artifact is skipped.
 *
 * Failures are logged per artifact and never abort the remaining ones.
 *
 * @param artifacts Artifacts gathered from assistant events.
 * @param defaultTaskId Task id used for artifacts that carry none.
 * @returns One `{ fileName, url }` entry per artifact that could be resolved.
 */
async function resolveArtifactUrls(artifacts: BotArtifact[], defaultTaskId: string): Promise<{ fileName: string; url: string }[]> {
  const results: { fileName: string; url: string }[] = []

  // De-duplicate by path, falling back to the file name when the path is
  // empty. The first occurrence wins.
  const uniqueArtifacts = new Map<string, BotArtifact>()
  for (const artifact of artifacts) {
    const key = artifact.path || artifact.fileName
    if (!uniqueArtifacts.has(key)) {
      uniqueArtifacts.set(key, artifact)
    }
  }

  // Successful directory listings, keyed by task id. Failed lookups are not
  // cached, so a later artifact of the same task retries the request.
  const dirFilesByTask = new Map<string, DirFileItem[]>()

  for (const artifact of uniqueArtifacts.values()) {
    try {
      const taskId = artifact.taskId || defaultTaskId
      let resolvedPath = artifact.path

      if (taskId && resolvedPath) {
        let dirFiles = dirFilesByTask.get(taskId)
        if (!dirFiles) {
          const dirFilesResponse = await novaClient.post<unknown>(
            '/v1/super_agent/chat/get_dir_file',
            { task_id: taskId },
          )
          dirFiles = extractDirFileList(dirFilesResponse)
          dirFilesByTask.set(taskId, dirFiles)
        }

        const matchedPath = resolvePathByName(dirFiles, artifact.fileName)
        if (matchedPath) {
          resolvedPath = matchedPath
        }
      }

      const response = await novaClient.post<string>('/v1/super_agent/chat/oss_url', {
        file_path: resolvedPath,
        task_id: taskId,
      })

      const finalUrl = toAbsoluteHttpUrl(response) || toAbsoluteHttpUrl(artifact.path)
      if (finalUrl) {
        results.push({ fileName: artifact.fileName, url: finalUrl })
      }
    } catch (err) {
      console.error(`[Nova Bridge] 解析附件 URL 失败 (${artifact.fileName}):`, err)
    }
  }

  return results
}
|
|
|
|
/**
 * (Re)start the periodic ping for a connection.
 *
 * Any previous heartbeat is stopped first. On every tick a
 * `{ message_type: 'ping' }` frame is sent, but only while the socket is open.
 */
function startHeartbeat(conn: UserConnection): void {
  stopHeartbeat(conn)

  const sendPing = (): void => {
    if (conn.ws.readyState !== WebSocket.OPEN) {
      return
    }
    conn.ws.send(JSON.stringify({ message_type: 'ping' }))
  }

  conn.heartbeatTimer = setInterval(sendPing, HEARTBEAT_INTERVAL)
}
|
|
|
|
/** Cancel the connection's heartbeat timer, if one is running, and forget it. */
function stopHeartbeat(conn: UserConnection): void {
  const timer = conn.heartbeatTimer
  if (!timer) {
    return
  }
  clearInterval(timer)
  conn.heartbeatTimer = null
}
|
|
|
|
/**
 * Re-arm the inactivity timer of a connection.
 *
 * A pending timer is cancelled and a new one is scheduled that closes the
 * user's connection after IDLE_TIMEOUT milliseconds.
 */
function resetIdleTimer(userId: string, conn: UserConnection): void {
  const previous = conn.idleTimer
  if (previous) {
    clearTimeout(previous)
  }

  const onIdle = (): void => {
    closeConnection(userId)
  }
  conn.idleTimer = setTimeout(onIdle, IDLE_TIMEOUT)
}
|
|
|
|
/**
 * Tear down the connection registered for a user, if any.
 *
 * Stops the heartbeat, cancels the idle timer, closes the socket with code
 * 1000 when it is open or still connecting, and removes the registry entry.
 */
function closeConnection(userId: string): void {
  const conn = connections.get(userId)
  if (!conn) {
    return
  }

  stopHeartbeat(conn)
  if (conn.idleTimer) {
    clearTimeout(conn.idleTimer)
  }

  const socket = conn.ws
  const closable = socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING
  if (closable) {
    socket.close(1000)
  }

  connections.delete(userId)
}
|
|
|
|
/** File extensions that are rendered as inline Markdown images. */
const IMAGE_EXTENSIONS = new Set(['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg'])

/** Append every attachment referenced by an assistant event's content to `conn.artifacts`. */
function collectEventArtifacts(conn: UserConnection, event: Record<string, any>): void {
  if (!event.content || typeof event.content !== 'object') return

  const content = event.content as Record<string, any>
  const taskId = event.task_id as string | undefined

  if (content.attachments) {
    // `attachments` may be a single object or an array of objects.
    const attachments = Array.isArray(content.attachments) ? content.attachments : [content.attachments]
    attachments.forEach((att: any) => {
      if (att?.file_name) {
        conn.artifacts.push({ path: att.path || att.file_url || att.file_id || '', fileName: att.file_name, taskId })
      }
    })
  }

  if (Array.isArray(content.attachment_files)) {
    content.attachment_files.forEach((file: any) => {
      if (file?.file_name) {
        conn.artifacts.push({ path: file.path || file.url || '', fileName: file.file_name, taskId })
      }
    })
  }

  if (Array.isArray(content.files)) {
    content.files.forEach((file: any) => {
      const fileName = file?.name || file?.file_name
      if (fileName) {
        conn.artifacts.push({ path: file.path || file.url || file.id || '', fileName, taskId })
      }
    })
  }
}

/** Render resolved attachments as Markdown: images inline, everything else as plain links. */
function renderAttachmentsMarkdown(attachments: { fileName: string; url: string }[]): string {
  return attachments
    .map(att => {
      const ext = att.fileName.split('.').pop()?.toLowerCase() || ''
      return IMAGE_EXTENSIONS.has(ext)
        ? `![${att.fileName}](${att.url})`
        : `[${att.fileName}](${att.url})`
    })
    .join('\n\n')
}

/**
 * Open a WebSocket for a user and wire up all of its event handlers.
 *
 * The promise resolves once the socket is open: a `switch_conversation` frame
 * has been sent, the connection is registered under `userId`, and the
 * heartbeat and idle timers are running. It rejects when the socket does not
 * open within 10 seconds, or when it errors or closes before opening.
 *
 * While open, incoming frames are handled as follows:
 * - `pong` frames and frames without a `data` member are ignored;
 * - assistant events other than tool calls contribute text (keyed by event
 *   id) and attachments to the connection state;
 * - a `success`/`completed` status settles the pending request with the
 *   joined text plus Markdown for the resolved attachments;
 * - a `failed`/`error` status rejects the pending request.
 *
 * A socket error or close rejects the pending request, if any.
 *
 * @param userId Registry key for the connection.
 * @param conversationId Conversation the socket is opened for.
 */
function createConnection(userId: string, conversationId: string): Promise<UserConnection> {
  return new Promise((resolve, reject) => {
    const ws = new WebSocket(buildWssUrl(conversationId))

    const conn: UserConnection = {
      ws,
      conversationId,
      lastActivity: Date.now(),
      idleTimer: null,
      heartbeatTimer: null,
      pendingResolve: null,
      pendingReject: null,
      responseTexts: new Map(),
      artifacts: [],
    }

    /** Reject the in-flight request (if any) and release the pending slot. */
    const failPending = (message: string): void => {
      const rejectPending = conn.pendingReject
      if (!rejectPending) return
      conn.pendingResolve = null
      conn.pendingReject = null
      rejectPending(new Error(message))
    }

    const connectTimeout = setTimeout(() => {
      // Reject first so the timeout message wins over any error that closing
      // a still-connecting socket may raise.
      reject(new Error('WebSocket 连接超时'))
      ws.close()
    }, 10000)

    ws.on('open', () => {
      clearTimeout(connectTimeout)
      ws.send(JSON.stringify({
        message_type: 'switch_conversation',
        conversation_id: conversationId,
      }))
      connections.set(userId, conn)
      startHeartbeat(conn)
      resetIdleTimer(userId, conn)
      resolve(conn)
    })

    /** Process one decoded frame. */
    const handleFrame = async (parsed: any): Promise<void> => {
      if (parsed?.data?.message_type === 'pong' || parsed?.message_type === 'pong') {
        return
      }

      const event = parsed?.data
      if (!event) return

      if (event.role === 'assistant' && event.event_type !== 'tool_call') {
        const text = extractText(event.content)
        if (text && event.event_id) {
          conn.responseTexts.set(event.event_id, text)
        }
        collectEventArtifacts(conn, event)
      }

      const taskStatus = (event.task_status ?? event.event_status) as string | undefined

      if (taskStatus === 'success' || taskStatus === 'completed') {
        const resolvePending = conn.pendingResolve
        if (!resolvePending) return

        let fullText = Array.from(conn.responseTexts.values()).join('\n')

        // Append attachment URLs to the Markdown text.
        if (conn.artifacts.length > 0) {
          const resolvedUrls = await resolveArtifactUrls(conn.artifacts, conn.conversationId)

          // While the URLs were being resolved the request may have timed
          // out, failed, been settled by another frame, or been replaced.
          if (conn.pendingResolve !== resolvePending) return

          const attachmentsText = renderAttachmentsMarkdown(resolvedUrls)
          if (attachmentsText) {
            fullText = fullText ? `${fullText}\n\n${attachmentsText}` : attachmentsText
          }
        }

        conn.pendingResolve = null
        conn.pendingReject = null
        conn.responseTexts.clear()
        conn.artifacts = []
        resolvePending(fullText)
      } else if (taskStatus === 'failed' || taskStatus === 'error') {
        if (conn.pendingReject) {
          failPending('Nova 任务失败')
          conn.responseTexts.clear()
          conn.artifacts = []
        }
      }
    }

    ws.on('message', async (raw: Buffer) => {
      let parsed: unknown
      try {
        parsed = JSON.parse(raw.toString())
      } catch {
        // Frames that are not valid JSON are ignored.
        return
      }

      try {
        await handleFrame(parsed)
      } catch (err) {
        // Report handler failures instead of hiding them with parse errors.
        console.error(`[Nova Bridge] 处理消息失败 (用户: ${userId}):`, err)
      }
    })

    ws.on('error', (err) => {
      clearTimeout(connectTimeout)
      console.error(`[Nova Bridge] WebSocket 错误 (用户: ${userId}):`, err.message)
      failPending('WebSocket 连接错误')
      // Settles the connect promise when the socket never opened; a no-op
      // once the promise has already been resolved or rejected.
      reject(new Error('WebSocket 连接错误'))
    })

    ws.on('close', () => {
      clearTimeout(connectTimeout)
      stopHeartbeat(conn)

      // Disarm the idle timer: left running it would later call
      // closeConnection(userId) and close whichever connection is registered
      // for this user by then.
      if (conn.idleTimer) {
        clearTimeout(conn.idleTimer)
        conn.idleTimer = null
      }

      // Only drop the registry entry when it still refers to this connection.
      if (connections.get(userId) === conn) {
        connections.delete(userId)
      }

      failPending('WebSocket 连接关闭')
      // No-op unless the socket closed before it ever opened.
      reject(new Error('WebSocket 连接关闭'))
    })
  })
}
|
|
|
|
/**
 * Return an open connection for the user, creating one when necessary.
 *
 * An existing open connection is reused: its activity timestamp and idle
 * timer are refreshed. An existing connection in any other state is closed
 * and replaced by a new one.
 *
 * NOTE(review): a reused connection keeps the conversation id it was opened
 * with; `conversationId` is not compared against it here. Confirm that
 * reusing a socket across conversations is intended.
 */
async function getConnection(userId: string, conversationId: string): Promise<UserConnection> {
  const cached = connections.get(userId)

  if (!cached) {
    return createConnection(userId, conversationId)
  }

  if (cached.ws.readyState !== WebSocket.OPEN) {
    closeConnection(userId)
    return createConnection(userId, conversationId)
  }

  cached.lastActivity = Date.now()
  resetIdleTimer(userId, cached)
  return cached
}
|
|
|
|
/**
 * Send a chat message over the user's connection and wait for the final reply.
 *
 * The connection is obtained (or created) first and its accumulated response
 * state is reset. The returned promise settles when the connection's message
 * handler reports completion or failure, when the socket errors or closes,
 * when sending throws, or after RESPONSE_TIMEOUT milliseconds.
 *
 * The timeout rejects this call's own promise directly and only releases the
 * connection's pending slot when that slot still belongs to this call, so a
 * stale timer can never reject a request that was started later.
 *
 * NOTE(review): a connection tracks a single pending request. Starting a
 * second request for the same user before the first has settled replaces the
 * first one's handlers; the first then ends via its own timeout. Confirm
 * whether callers can overlap requests per user.
 *
 * @param userId Key of the user's connection.
 * @param conversationId Conversation the message belongs to.
 * @param content Message text.
 * @param uploadFileIds Optional ids of previously uploaded files.
 * @returns The response text, with Markdown for any attachments appended.
 */
export async function sendMessageAndWaitResponse(
  userId: string,
  conversationId: string,
  content: string,
  uploadFileIds?: string[],
): Promise<string> {
  const conn = await getConnection(userId, conversationId)

  conn.responseTexts.clear()
  conn.artifacts = []
  totalMessages++
  const sendTime = Date.now()

  const agentId = getDefaultAgentId()

  return new Promise<string>((resolve, reject) => {
    const onResolve = (text: string): void => {
      clearTimeout(timeout)
      totalResponseTime += Date.now() - sendTime
      responseCount++
      resolve(text)
    }

    const onReject = (err: Error): void => {
      clearTimeout(timeout)
      reject(err)
    }

    /** Release the pending slot and buffers, but only if this call still owns them. */
    const releaseSlot = (): void => {
      if (conn.pendingReject !== onReject) return
      conn.pendingResolve = null
      conn.pendingReject = null
      conn.responseTexts.clear()
      conn.artifacts = []
    }

    const timeout = setTimeout(() => {
      releaseSlot()
      reject(new Error('响应超时'))
    }, RESPONSE_TIMEOUT)

    conn.pendingResolve = onResolve
    conn.pendingReject = onReject

    const message = JSON.stringify({
      message_type: 'chat',
      conversation_id: conversationId,
      agent_id: agentId,
      content,
      upload_file_ids: uploadFileIds,
    })

    try {
      conn.ws.send(message)
    } catch (err) {
      clearTimeout(timeout)
      releaseSlot()
      reject(err)
    }
  })
}
|
|
|
|
/** Close every registered connection, regardless of which user it belongs to. */
export function closeAllConnections(): void {
  Array.from(connections.keys()).forEach(userId => {
    closeConnection(userId)
  })
}
|
|
|
|
/**
 * Close only the connections whose key begins with `${platform}:`.
 *
 * Bots should call this from stopBot() rather than closeAllConnections(), so
 * that stopping one bot leaves the connections of other bots intact.
 *
 * @param platform Platform name used as the key prefix (without the colon).
 */
export function closeConnectionsForPlatform(platform: string): void {
  const prefix = `${platform}:`

  Array.from(connections.keys())
    .filter(key => key.startsWith(prefix))
    .forEach(key => {
      closeConnection(key)
    })
}
|
|
|
|
/**
 * Snapshot of the bridge's counters.
 *
 * @returns The number of registered connections, the number of messages
 *   sent, and the mean response time in milliseconds rounded to an integer
 *   (0 while no response has been recorded).
 */
export function getStats() {
  const averageResponseTime = responseCount > 0
    ? Math.round(totalResponseTime / responseCount)
    : 0

  return {
    activeConnections: connections.size,
    totalMessages,
    averageResponseTime,
  }
}
|