/** * WebSocket 客户端封装 * * 提供自动重连、心跳检测、网络状态监听等功能 */ function latest(value: T) { const ref = { current: value } return ref } export const ReadyState = { Connecting: 0, Open: 1, Closing: 2, Closed: 3, } as const export type ReadyState = (typeof ReadyState)[keyof typeof ReadyState] export interface Result { sendMessage: WebSocket['send'] disconnect: () => void connect: () => void readyState: ReadyState webSocketIns?: WebSocket clearHeartbeat: () => void switchConversation: (conversationId: string) => void cleanup: () => void } export interface HeartbeatOptions { /** 心跳间隔,默认 20000ms */ heartbeatInterval?: number /** 心跳超时,默认 22000ms */ heartbeatTimeout?: number /** 心跳消息,默认 { message_type: 'ping' } */ heartbeatMessage?: string | object | (() => string | object) /** 心跳响应类型,默认 'pong' */ heartbeatResponseType?: string } export interface ApiEvent { event_id: string event_type?: string role?: 'user' | 'assistant' | 'system' content?: { text?: string content?: string [key: string]: unknown } created_at?: string task_id?: string is_display?: boolean metadata?: Record stream?: boolean event_status?: string [key: string]: unknown } // WebSocket 事件类型定义 type WSOpenEvent = Event interface WSCloseEvent { code?: number reason?: string wasClean?: boolean } interface WSMessageEvent { data: string | ArrayBuffer | Blob } type WSErrorEvent = Event export interface Options { /** 重连次数限制,默认 3 */ reconnectLimit?: number /** 重连间隔,默认 3000ms */ reconnectInterval?: number /** 是否手动连接,默认 false */ manual?: boolean /** 连接成功回调 */ onOpen?: (event: WSOpenEvent, instance: WebSocket) => void /** 连接关闭回调 */ onClose?: (event: WSCloseEvent, instance: WebSocket) => void /** 收到消息回调 */ onMessage?: (message: ApiEvent, instance: WebSocket) => void /** 连接错误回调 */ onError?: (event: WSErrorEvent, instance: WebSocket) => void /** WebSocket 协议 */ protocols?: string | string[] /** 心跳配置,传 false 禁用心跳 */ heartbeat?: HeartbeatOptions | boolean /** 是否监听网络状态变化,默认 true */ enableNetworkListener?: boolean /** 是否监听文档可见性变化,默认 true */ enableVisibilityListener?: boolean /** 重连延迟,默认 300ms */ reconnectDelay?: number /** 重连防抖时间,默认 1000ms */ reconnectDebounce?: number /** 获取认证 Token */ getToken?: () => string | undefined /** 获取租户 ID */ getTenantId?: () => string | undefined } /** * 创建 WebSocket 客户端 */ export function createWebSocketClient( socketUrl: string, options: Options = {}, ): Result { const { reconnectLimit = 3, reconnectInterval = 3 * 1000, manual = false, onOpen, onClose, onMessage, onError, protocols, heartbeat: enableHeartbeat = true, enableNetworkListener = true, enableVisibilityListener = true, reconnectDelay = 300, reconnectDebounce = 1000, getToken, getTenantId, } = options const heartbeatOptions: HeartbeatOptions = typeof enableHeartbeat === 'object' ? enableHeartbeat : {} const { heartbeatInterval = 20000, heartbeatTimeout = 22000, heartbeatMessage = { message_type: 'ping' }, heartbeatResponseType = 'pong', } = heartbeatOptions // 提前声明函数,避免使用前未定义 let disconnectFn: () => void = () => { throw new Error('disconnectFn not initialized') } let connectWsFn: () => void = () => { throw new Error('connectWsFn not initialized') } const onOpenRef = latest(onOpen) const onCloseRef = latest(onClose) const onMessageRef = latest(onMessage) const onErrorRef = latest(onError) // 确保 ref 始终指向最新的回调 if (onMessage) { onMessageRef.current = onMessage } const reconnectTimesRef = latest(0) const reconnectTimerRef = latest | undefined>(undefined) const websocketRef = latest(undefined) const readyStateRef = latest(ReadyState.Closed) // 心跳相关 const heartbeatTimerRef = latest | undefined>(undefined) const heartbeatTimeoutTimerRef = latest | undefined>(undefined) const waitingForPongRef = latest(false) // 网络和可见性状态 const isOnlineRef = latest(typeof navigator !== 'undefined' ? navigator.onLine : true) const isVisibleRef = latest(typeof document !== 'undefined' ? !document.hidden : true) // 重连防抖定时器 const reconnectDebounceTimerRef = latest | undefined>(undefined) // 更新 readyState 的辅助函数 const setReadyState = (state: ReadyState) => { readyStateRef.current = state } // 获取当前 readyState const getReadyState = (): ReadyState => { if (websocketRef.current) { const wsState = websocketRef.current.readyState if (wsState === WebSocket.CONNECTING) return ReadyState.Connecting if (wsState === WebSocket.OPEN) return ReadyState.Open if (wsState === WebSocket.CLOSING) return ReadyState.Closing if (wsState === WebSocket.CLOSED) return ReadyState.Closed } return readyStateRef.current } // 清除心跳相关定时器 const clearHeartbeat = () => { if (heartbeatTimerRef.current) { clearTimeout(heartbeatTimerRef.current) heartbeatTimerRef.current = undefined } if (heartbeatTimeoutTimerRef.current) { clearTimeout(heartbeatTimeoutTimerRef.current) heartbeatTimeoutTimerRef.current = undefined } waitingForPongRef.current = false } // 处理心跳超时 const handlePingTimeout = () => { if (!isOnlineRef.current) { waitingForPongRef.current = false clearHeartbeat() return } waitingForPongRef.current = false clearHeartbeat() disconnectFn() } // 发送心跳 const sendHeartbeat = () => { if (!isOnlineRef.current) { return } if (waitingForPongRef.current) { return } if (websocketRef.current && getReadyState() === ReadyState.Open) { try { const message = typeof heartbeatMessage === 'function' ? heartbeatMessage() : heartbeatMessage websocketRef.current.send( typeof message === 'string' ? message : JSON.stringify(message), ) waitingForPongRef.current = true heartbeatTimeoutTimerRef.current = setTimeout( handlePingTimeout, heartbeatTimeout, ) } catch { clearHeartbeat() } } else { clearHeartbeat() } } // 处理心跳响应 const handlePongReceived = () => { if (!waitingForPongRef.current) { return } waitingForPongRef.current = false if (heartbeatTimeoutTimerRef.current) { clearTimeout(heartbeatTimeoutTimerRef.current) heartbeatTimeoutTimerRef.current = undefined } heartbeatTimerRef.current = setTimeout(sendHeartbeat, heartbeatInterval) } // 启动心跳 const startHeartbeat = () => { if (!enableHeartbeat) return clearHeartbeat() heartbeatTimerRef.current = setTimeout(sendHeartbeat, 1000) } // 处理原始消息,检查是否是心跳响应 const handleRawMessage = (messageData: string): boolean => { try { const rawMessage = JSON.parse(messageData) if ( rawMessage.data?.message_type === heartbeatResponseType || rawMessage.message_type === heartbeatResponseType ) { handlePongReceived() return true } return false } catch { return false } } const reconnect = () => { if ( reconnectTimesRef.current < reconnectLimit && getReadyState() !== ReadyState.Open ) { if (reconnectTimerRef.current) { clearTimeout(reconnectTimerRef.current) } reconnectTimerRef.current = setTimeout(() => { connectWsFn() reconnectTimesRef.current++ }, reconnectInterval) } } connectWsFn = () => { if (reconnectTimerRef.current) { clearTimeout(reconnectTimerRef.current) } if (websocketRef.current) { websocketRef.current.close() } // 构建 WebSocket URL const url = new URL(socketUrl) // 添加认证信息 const token = getToken?.() const tenantId = getTenantId?.() if (token) { url.searchParams.set('Authorization', token) } if (tenantId) { url.searchParams.set('Tenant-Id', tenantId) } const ws = new WebSocket(url.toString(), protocols) setReadyState(ReadyState.Connecting) ws.onerror = event => { if (websocketRef.current !== ws) { return } reconnect() onErrorRef.current?.(event, ws) setReadyState(ReadyState.Closed) } ws.onopen = event => { if (websocketRef.current !== ws) { return } onOpenRef.current?.(event, ws) reconnectTimesRef.current = 0 setReadyState(ReadyState.Open) startHeartbeat() } ws.onmessage = (message: WSMessageEvent) => { if (websocketRef.current !== ws) { return } const messageData = typeof message.data === 'string' ? message.data : String(message.data) // 先检查是否是心跳响应 if (enableHeartbeat && handleRawMessage(messageData)) { return } // 解析消息并触发回调 try { const parsedMessage: ApiEvent = JSON.parse(messageData) onMessageRef.current?.(parsedMessage, ws) } catch { // 如果解析失败,尝试作为原始数据传递 console.warn('[WebSocket] Failed to parse message:', messageData) } } ws.onclose = event => { onCloseRef.current?.(event, ws) clearHeartbeat() // closed by server if (websocketRef.current === ws) { reconnect() } // closed by disconnect or closed by server if (!websocketRef.current || websocketRef.current === ws) { setReadyState(ReadyState.Closed) } } websocketRef.current = ws } const sendMessage: WebSocket['send'] = message => { const currentState = getReadyState() if (currentState === ReadyState.Open) { websocketRef.current?.send(message) } else { throw new Error('WebSocket disconnected') } } // 切换会话 const switchConversation = (conversationId: string) => { // 检查网络状态 if (!isOnlineRef.current) { throw new Error('网络连接异常,无法切换会话') } // 检查 WebSocket 连接状态 const currentState = getReadyState() if (!websocketRef.current || currentState !== ReadyState.Open) { throw new Error('WebSocket 未连接,无法切换会话') } try { const message = JSON.stringify({ message_type: 'switch_conversation', conversation_id: conversationId, }) websocketRef.current.send(message) } catch (error) { throw new Error(`切换会话失败: ${error}`) } } const connect = () => { reconnectTimesRef.current = 0 connectWsFn() } disconnectFn = () => { if (reconnectTimerRef.current) { clearTimeout(reconnectTimerRef.current) } if (reconnectDebounceTimerRef.current) { clearTimeout(reconnectDebounceTimerRef.current) } reconnectTimesRef.current = reconnectLimit clearHeartbeat() websocketRef.current?.close(1000, '手动断开') websocketRef.current = undefined setReadyState(ReadyState.Closed) } // 处理网络断开 const handleNetworkOffline = () => { clearHeartbeat() const currentState = getReadyState() if ( currentState === ReadyState.Open || currentState === ReadyState.Connecting ) { disconnectFn() } } // 重连函数 - 统一处理重连逻辑 const attemptReconnect = () => { // 清除之前的防抖定时器 if (reconnectDebounceTimerRef.current) { clearTimeout(reconnectDebounceTimerRef.current) } reconnectDebounceTimerRef.current = setTimeout(() => { const currentState = getReadyState() // 已连接或正在连接时跳过 if ( currentState === ReadyState.Open || currentState === ReadyState.Connecting ) { return } clearHeartbeat() const isClosed = currentState === ReadyState.Closed || currentState === ReadyState.Closing if (isClosed) { connect() } else { disconnectFn() setTimeout(() => { if ( isOnlineRef.current && getReadyState() !== ReadyState.Open && getReadyState() !== ReadyState.Connecting ) { connect() } }, reconnectDelay) } }, reconnectDebounce) } // 网络状态监听 let cleanupNetworkListener: (() => void) | undefined if (enableNetworkListener && typeof window !== 'undefined') { const handleOnline = () => { isOnlineRef.current = true attemptReconnect() } const handleOffline = () => { isOnlineRef.current = false handleNetworkOffline() } window.addEventListener('online', handleOnline) window.addEventListener('offline', handleOffline) cleanupNetworkListener = () => { window.removeEventListener('online', handleOnline) window.removeEventListener('offline', handleOffline) } } // 文档可见性监听 let cleanupVisibilityListener: (() => void) | undefined if (enableVisibilityListener && typeof document !== 'undefined') { const handleVisibilityChange = () => { const isVisible = !document.hidden isVisibleRef.current = isVisible if (isVisible && isOnlineRef.current) { const currentState = getReadyState() if ( currentState === ReadyState.Closed || currentState === ReadyState.Closing ) { attemptReconnect() } } } document.addEventListener('visibilitychange', handleVisibilityChange) cleanupVisibilityListener = () => { document.removeEventListener('visibilitychange', handleVisibilityChange) } } // 自动连接 if (!manual && socketUrl) { connect() } // 清理函数 const cleanup = () => { disconnectFn() cleanupNetworkListener?.() cleanupVisibilityListener?.() } const result: Result = { sendMessage, connect, disconnect: disconnectFn, get readyState() { return getReadyState() }, get webSocketIns() { return websocketRef.current }, clearHeartbeat, switchConversation, cleanup, } return result } export default createWebSocketClient