初始化模版工程

This commit is contained in:
Cloud Bot
2026-03-20 07:33:46 +00:00
commit 23717e0ecd
386 changed files with 51675 additions and 0 deletions

View File

@@ -0,0 +1,276 @@
import type { DWClient, DWClientDownStream, RobotMessage } from 'dingtalk-stream'
import { HTTPClient } from '@/http'
import { getConversationId, setConversationId } from './user-store'
import { sendMessageAndWaitResponse } from '@/remote-control/shared/nova-bridge'
import { uploadBotAttachments, type BotAttachment } from '@/remote-control/shared/file-uploader'
import { BotLogger } from '@/remote-control/shared/logger'
import { getDefaultAgentId } from '@/app/api/nova-config'
// Extended message types — dingtalk-stream SDK only types text messages
interface DingTalkMediaContent {
downloadCode: string
fileName?: string
}
interface DingTalkRichTextItem {
text?: string
downloadCode?: string
type?: string
}
type ExtendedMessage = {
msgtype: string
senderStaffId: string
senderNick: string
sessionWebhook: string
robotCode: string
conversationType?: string
text?: { content: string }
content?: DingTalkMediaContent | { richText: DingTalkRichTextItem[] } | string
}
const oapiClient = new HTTPClient({
baseURL: process.env.NOVA_BASE_URL,
headers: {
'Content-Type': 'application/json',
'Tenant-Id': process.env.NOVA_TENANT_ID!,
'Authorization': process.env.NOVA_ACCESS_KEY!,
},
})
const userQueues = new Map<string, Promise<void>>()
async function getOrCreateConversation(staffId: string, senderNick: string): Promise<string> {
const existing = getConversationId(staffId)
if (existing) return existing
const agentId = getDefaultAgentId()
const res = await oapiClient.post<{ conversation_id: string }>(
'/v1/oapi/super_agent/chat/create_conversation',
{
agent_id: agentId,
title: `DingTalk User ${senderNick || staffId}`,
},
)
const conversationId = res.conversation_id
setConversationId(staffId, conversationId, senderNick)
return conversationId
}
async function replyMessage(
client: DWClient,
sessionWebhook: string,
senderStaffId: string,
text: string,
): Promise<void> {
const accessToken = await client.getAccessToken()
const body = {
msgtype: 'markdown',
markdown: {
title: '回复',
text,
},
at: {
atUserIds: [senderStaffId],
isAtAll: false,
},
}
const response = await fetch(sessionWebhook, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-acs-dingtalk-access-token': accessToken,
},
body: JSON.stringify(body),
})
if (!response.ok) {
console.error(`[DingTalk Bot] 回复消息失败: ${response.status} ${response.statusText}`)
}
}
// --- Attachment extraction helpers ---
function extractTextContent(msg: ExtendedMessage): string {
if (msg.msgtype === 'text' && msg.text?.content) {
return msg.text.content.trim()
}
if (msg.msgtype === 'richText' && msg.content && typeof msg.content === 'object' && 'richText' in msg.content) {
return (msg.content as { richText: DingTalkRichTextItem[] }).richText
.filter(item => item.text)
.map(item => item.text)
.join('')
.trim()
}
return ''
}
async function getDingTalkDownloadUrl(
downloadCode: string,
robotCode: string,
accessToken: string,
): Promise<string> {
const response = await fetch('https://api.dingtalk.com/v1.0/robot/messageFiles/download', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-acs-dingtalk-access-token': accessToken,
},
body: JSON.stringify({ downloadCode, robotCode }),
})
if (!response.ok) {
throw new Error(`DingTalk download API failed: ${response.status}`)
}
const data = await response.json() as { downloadUrl: string }
return data.downloadUrl
}
function getFileNameForMsgType(msgtype: string, content: DingTalkMediaContent): string {
if (content.fileName) return content.fileName
const extMap: Record<string, string> = {
picture: '.png',
audio: '.mp3',
video: '.mp4',
file: '',
}
return `dingtalk_${msgtype}_${Date.now()}${extMap[msgtype] || ''}`
}
async function extractAttachments(
msg: ExtendedMessage,
client: DWClient,
): Promise<BotAttachment[]> {
const attachments: BotAttachment[] = []
const accessToken = await client.getAccessToken()
const robotCode = msg.robotCode
// Handle picture/file/audio/video message types
if (['picture', 'file', 'audio', 'video'].includes(msg.msgtype)) {
const content = typeof msg.content === 'string'
? JSON.parse(msg.content) as DingTalkMediaContent
: msg.content as DingTalkMediaContent
if (content?.downloadCode) {
try {
const downloadUrl = await getDingTalkDownloadUrl(content.downloadCode, robotCode, accessToken)
attachments.push({
url: downloadUrl,
fileName: getFileNameForMsgType(msg.msgtype, content),
})
} catch (err) {
console.error('[DingTalk Bot] Failed to get download URL:', err)
}
}
}
// Handle richText messages (may contain inline images)
if (msg.msgtype === 'richText' && msg.content && typeof msg.content === 'object' && 'richText' in msg.content) {
const richText = (msg.content as { richText: DingTalkRichTextItem[] }).richText
for (const item of richText) {
if (item.downloadCode) {
try {
const downloadUrl = await getDingTalkDownloadUrl(item.downloadCode, robotCode, accessToken)
attachments.push({
url: downloadUrl,
fileName: `dingtalk_richtext_${Date.now()}.png`,
})
} catch (err) {
console.error('[DingTalk Bot] Failed to get richText download URL:', err)
}
}
}
}
return attachments
}
// --- Main message processing ---
async function processMessage(client: DWClient, msg: RobotMessage, messageId: string, onMessage?: () => void): Promise<void> {
const extMsg = msg as unknown as ExtendedMessage
const content = extractTextContent(extMsg)
const attachments = await extractAttachments(extMsg, client)
if (!content && attachments.length === 0) {
client.socketCallBackResponse(messageId, { status: 'OK' })
return
}
const staffId = msg.senderStaffId
const senderNick = msg.senderNick || staffId
BotLogger.log({
platform: 'dingtalk',
eventType: 'message_received',
severity: 'info',
message: `收到来自 ${senderNick} 的消息 (${extMsg.msgtype}, ${attachments.length} 附件)`,
details: { staffId, conversationType: msg.conversationType },
})
try {
// Upload attachments if any
let uploadFileIds: string[] | undefined
if (attachments.length > 0) {
uploadFileIds = await uploadBotAttachments(attachments)
if (uploadFileIds.length === 0) uploadFileIds = undefined
}
const conversationId = await getOrCreateConversation(staffId, senderNick)
const response = await sendMessageAndWaitResponse(
`dingtalk:${staffId}`, conversationId, content || '', uploadFileIds,
)
if (!response) {
await replyMessage(client, msg.sessionWebhook, staffId, '没有收到回复,请稍后重试。')
} else {
await replyMessage(client, msg.sessionWebhook, staffId, response)
}
if (onMessage) onMessage()
BotLogger.log({
platform: 'dingtalk',
eventType: 'message_sent',
severity: 'info',
message: `已回复用户 ${senderNick}`,
})
} catch (error) {
const errMsg = error instanceof Error ? error.message : '未知错误'
console.error('[DingTalk Bot] 处理消息失败:', errMsg)
await replyMessage(client, msg.sessionWebhook, staffId, `处理消息时出错: ${errMsg}`).catch(() => {})
BotLogger.log({
platform: 'dingtalk',
eventType: 'error',
severity: 'error',
message: `处理消息失败: ${errMsg}`,
details: { staffId },
})
}
client.socketCallBackResponse(messageId, { status: 'OK' })
}
export function handleRobotCallback(client: DWClient, onMessage?: () => void) {
return async (res: DWClientDownStream) => {
const msg = JSON.parse(res.data) as RobotMessage
const extMsg = msg as unknown as ExtendedMessage
const messageId = res.headers.messageId
const staffId = msg.senderStaffId
console.log(`[DingTalk Bot] 收到消息 from ${msg.senderNick} (${msg.conversationType === '1' ? '单聊' : '群聊'}, type: ${extMsg.msgtype}): ${msg.text?.content?.trim()?.slice(0, 50) || '[非文本消息]'}`)
const currentQueue = userQueues.get(staffId) ?? Promise.resolve()
const newQueue = currentQueue.then(() => processMessage(client, msg, messageId, onMessage))
userQueues.set(staffId, newQueue)
newQueue.finally(() => {
if (userQueues.get(staffId) === newQueue) {
userQueues.delete(staffId)
}
})
}
}

View File

@@ -0,0 +1,142 @@
import { DWClient, TOPIC_ROBOT, EventAck } from 'dingtalk-stream'
import { handleRobotCallback } from './handlers'
import { closeConnectionsForPlatform } from '@/remote-control/shared/nova-bridge'
import { BotLogger } from '@/remote-control/shared/logger'
import { ConfigManager, type ConfigChangeEvent } from '@/remote-control/config/manager'
let client: DWClient | null = null
let currentClientId: string = ''
let currentClientSecret: string = ''
let startTime: number = 0
let messagesProcessed: number = 0
let lastError: Error | null = null
export interface BotStatus {
platform: string
status: 'connected' | 'disconnected' | 'connecting'
uptime?: number
messagesProcessed: number
error?: string
}
export async function startBot(clientId: string, clientSecret: string): Promise<void> {
if (!clientId || !clientSecret) {
const msg = '钉钉 Client ID 或 Client Secret 为空,跳过启动'
console.warn(`[DingTalk Bot] ${msg}`)
BotLogger.log({ platform: 'dingtalk', eventType: 'error', severity: 'warning', message: msg })
return
}
// 幂等检查:如果已有活跃连接且凭据未变,直接跳过
if (client && currentClientId === clientId && currentClientSecret === clientSecret) {
console.log('[DingTalk Bot] 已有活跃连接,跳过重复启动')
return
}
// 如果已有旧连接(凭据变更),先停止
if (client) {
console.log('[DingTalk Bot] 凭据已变更,先停止旧连接')
await stopBot()
}
currentClientId = clientId
currentClientSecret = clientSecret
try {
const newClient = new DWClient({
clientId,
clientSecret,
debug: false,
})
newClient.registerCallbackListener(TOPIC_ROBOT, handleRobotCallback(newClient, () => { messagesProcessed++ }))
newClient.registerAllEventListener(() => {
return { status: EventAck.SUCCESS }
})
// 先设置 client再 connect保证闭包和模块变量指向同一实例
client = newClient
client.connect()
startTime = Date.now()
lastError = null
console.log('[DingTalk Bot] 已启动,等待消息...')
BotLogger.log({
platform: 'dingtalk',
eventType: 'connection',
severity: 'info',
message: '钉钉 Bot 已启动',
})
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err))
lastError = error
client = null
currentClientId = ''
currentClientSecret = ''
console.error('[DingTalk Bot] 启动失败:', error.message)
BotLogger.log({
platform: 'dingtalk',
eventType: 'error',
severity: 'error',
message: `钉钉 Bot 启动失败: ${error.message}`,
details: { hint: '请检查 Client ID 和 Client Secret 是否正确' },
})
throw error
}
}
export async function stopBot(): Promise<void> {
closeConnectionsForPlatform('dingtalk')
if (client) {
client.disconnect()
client = null
currentClientId = ''
currentClientSecret = ''
startTime = 0
console.log('[DingTalk Bot] 已停止')
BotLogger.log({
platform: 'dingtalk',
eventType: 'disconnection',
severity: 'info',
message: '钉钉 Bot 已停止',
})
}
}
export function getStatus(): BotStatus {
return {
platform: 'dingtalk',
status: client ? 'connected' : 'disconnected',
uptime: client && startTime ? Math.floor((Date.now() - startTime) / 1000) : undefined,
messagesProcessed,
error: lastError?.message,
}
}
const configManager = ConfigManager.getInstance()
configManager.on('config-changed', async (event: ConfigChangeEvent) => {
const { newConfig } = event
const platformConfig = newConfig.dingtalk
try {
if (platformConfig.enabled && !client) {
await startBot(platformConfig.clientId, platformConfig.clientSecret)
} else if (!platformConfig.enabled && client) {
await stopBot()
} else if (platformConfig.enabled && client) {
if (platformConfig.clientId !== currentClientId || platformConfig.clientSecret !== currentClientSecret) {
await stopBot()
await startBot(platformConfig.clientId, platformConfig.clientSecret)
}
}
lastError = null
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
BotLogger.log({
platform: 'dingtalk',
eventType: 'error',
severity: 'error',
message: `配置变更处理失败: ${lastError.message}`,
})
}
})

View File

@@ -0,0 +1,47 @@
import fs from 'node:fs'
import path from 'node:path'
interface UserMapping {
conversationId: string
senderNick: string
createdAt: string
}
type UserStore = Record<string, UserMapping>
const STORE_PATH = path.join(process.cwd(), 'remote-control', 'data', 'dingtalk-users.json')
function readStore(): UserStore {
try {
if (!fs.existsSync(STORE_PATH)) {
return {}
}
const data = fs.readFileSync(STORE_PATH, 'utf-8')
return JSON.parse(data)
} catch {
return {}
}
}
function writeStore(store: UserStore): void {
const dir = path.dirname(STORE_PATH)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
fs.writeFileSync(STORE_PATH, JSON.stringify(store, null, 2), 'utf-8')
}
export function getConversationId(staffId: string): string | null {
const store = readStore()
return store[staffId]?.conversationId ?? null
}
export function setConversationId(staffId: string, conversationId: string, senderNick: string): void {
const store = readStore()
store[staffId] = {
conversationId,
senderNick,
createdAt: new Date().toISOString(),
}
writeStore(store)
}

View File

@@ -0,0 +1,158 @@
import type { Client, Message } from 'discord.js'
import { HTTPClient } from '@/http'
import { getDefaultAgentId } from '@/app/api/nova-config'
import { getConversationId, setConversationId } from './user-store'
import { sendMessageAndWaitResponse } from '@/remote-control/shared/nova-bridge'
import { uploadBotAttachments, type BotAttachment } from '@/remote-control/shared/file-uploader'
import { BotLogger } from '@/remote-control/shared/logger'
const oapiClient = new HTTPClient({
baseURL: process.env.NOVA_BASE_URL,
headers: {
'Content-Type': 'application/json',
'Tenant-Id': process.env.NOVA_TENANT_ID!,
'Authorization': process.env.NOVA_ACCESS_KEY!,
},
})
const userQueues = new Map<string, Promise<void>>()
function splitMessage(text: string, maxLength = 2000): string[] {
if (text.length <= maxLength) return [text]
const parts: string[] = []
let remaining = text
while (remaining.length > 0) {
if (remaining.length <= maxLength) {
parts.push(remaining)
break
}
let splitIndex = remaining.lastIndexOf('\n', maxLength)
if (splitIndex <= 0) {
splitIndex = maxLength
}
parts.push(remaining.slice(0, splitIndex))
remaining = remaining.slice(splitIndex).trimStart()
}
return parts
}
async function getOrCreateConversation(discordUserId: string): Promise<string> {
const existing = getConversationId(discordUserId)
if (existing) return existing
const res = await oapiClient.post<{ conversation_id: string }>(
'/v1/oapi/super_agent/chat/create_conversation',
{
agent_id: getDefaultAgentId(),
title: `Discord User ${discordUserId}`,
},
)
const conversationId = res.conversation_id
setConversationId(discordUserId, conversationId)
return conversationId
}
async function processMessage(message: Message, onMessage?: () => void): Promise<void> {
const content = message.content
.replace(/<@!?\d+>/g, '')
.trim()
// Extract attachments from Discord message
const attachments: BotAttachment[] = Array.from(message.attachments.values()).map(att => ({
url: att.url,
fileName: att.name ?? `discord_file_${Date.now()}`,
mimeType: att.contentType ?? undefined,
size: att.size,
}))
if (!content && attachments.length === 0) return
BotLogger.log({
platform: 'discord',
eventType: 'message_received',
severity: 'info',
message: `收到来自用户 ${message.author.username} 的消息 (${attachments.length} 附件)`,
details: { userId: message.author.id, channelId: message.channelId },
})
let typingInterval: ReturnType<typeof setInterval> | null = null
try {
const channel = message.channel
if ('sendTyping' in channel) {
await channel.sendTyping()
}
typingInterval = setInterval(() => {
if ('sendTyping' in channel) {
channel.sendTyping().catch(() => {})
}
}, 8000)
// Upload attachments if any
let uploadFileIds: string[] | undefined
if (attachments.length > 0) {
uploadFileIds = await uploadBotAttachments(attachments)
if (uploadFileIds.length === 0) uploadFileIds = undefined
}
const conversationId = await getOrCreateConversation(message.author.id)
const response = await sendMessageAndWaitResponse(
`discord:${message.author.id}`,
conversationId,
content || '',
uploadFileIds,
)
if (!response) {
await message.reply('没有收到回复,请稍后重试。')
return
}
const parts = splitMessage(response)
for (const part of parts) {
await message.reply(part)
}
if (onMessage) onMessage()
BotLogger.log({
platform: 'discord',
eventType: 'message_sent',
severity: 'info',
message: `已回复用户 ${message.author.username}`,
})
} catch (error) {
const errMsg = error instanceof Error ? error.message : '未知错误'
console.error('[Discord Bot] 处理消息失败:', errMsg)
await message.reply(`处理消息时出错: ${errMsg}`).catch(() => {})
BotLogger.log({
platform: 'discord',
eventType: 'error',
severity: 'error',
message: `处理消息失败: ${errMsg}`,
details: { userId: message.author.id },
})
} finally {
if (typingInterval) clearInterval(typingInterval)
}
}
export function registerHandlers(client: Client, onMessage?: () => void): void {
client.on('messageCreate', (message: Message) => {
if (message.author.bot) return
if (!client.user || !message.mentions.has(client.user)) return
const userId = message.author.id
const currentQueue = userQueues.get(userId) ?? Promise.resolve()
const newQueue = currentQueue.then(() => processMessage(message, onMessage))
userQueues.set(userId, newQueue)
newQueue.finally(() => {
if (userQueues.get(userId) === newQueue) {
userQueues.delete(userId)
}
})
})
}

View File

@@ -0,0 +1,147 @@
import { Client, GatewayIntentBits, Events } from 'discord.js'
import { registerHandlers } from './handlers'
import { closeConnectionsForPlatform } from '@/remote-control/shared/nova-bridge'
import { BotLogger } from '@/remote-control/shared/logger'
import { ConfigManager, type ConfigChangeEvent } from '@/remote-control/config/manager'
let client: Client | null = null
let currentToken: string = ''
let startTime: number = 0
let messagesProcessed: number = 0
let lastError: Error | null = null
export interface BotStatus {
platform: string
status: 'connected' | 'disconnected' | 'connecting'
uptime?: number
messagesProcessed: number
error?: string
}
export async function startBot(token: string): Promise<void> {
if (!token) {
const msg = 'Discord Bot Token 为空,跳过启动'
console.warn(`[Discord Bot] ${msg}`)
BotLogger.log({ platform: 'discord', eventType: 'error', severity: 'warning', message: msg })
return
}
currentToken = token
client = new Client({
intents: [
GatewayIntentBits.Guilds,
GatewayIntentBits.GuildMessages,
GatewayIntentBits.MessageContent,
],
})
client.once(Events.ClientReady, (readyClient) => {
startTime = Date.now()
lastError = null
console.log(`[Discord Bot] 已登录: ${readyClient.user.tag}`)
BotLogger.log({
platform: 'discord',
eventType: 'connection',
severity: 'info',
message: `Discord Bot 已连接,用户名: ${readyClient.user.tag}`,
})
})
client.on('error', (err) => {
lastError = err
console.error('[Discord Bot] 运行时错误:', err.message)
BotLogger.log({
platform: 'discord',
eventType: 'error',
severity: 'error',
message: `Discord Bot 运行时错误: ${err.message}`,
})
})
client.on('disconnect', () => {
console.warn('[Discord Bot] 连接断开')
BotLogger.log({
platform: 'discord',
eventType: 'disconnection',
severity: 'warning',
message: 'Discord Bot 连接断开',
})
})
registerHandlers(client, () => { messagesProcessed++ })
try {
await client.login(token)
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err))
lastError = error
client = null
currentToken = ''
console.error('[Discord Bot] 登录失败:', error.message)
BotLogger.log({
platform: 'discord',
eventType: 'error',
severity: 'error',
message: `Discord Bot 登录失败: ${error.message}`,
details: { hint: '请检查 Bot Token 是否正确,以及是否已在 Discord Developer Portal 中启用了 Bot' },
})
throw error
}
}
export async function stopBot(): Promise<void> {
closeConnectionsForPlatform('discord')
if (client) {
client.destroy()
client = null
currentToken = ''
startTime = 0
console.log('[Discord Bot] 已停止')
BotLogger.log({
platform: 'discord',
eventType: 'disconnection',
severity: 'info',
message: 'Discord Bot 已停止',
})
}
}
export function getStatus(): BotStatus {
return {
platform: 'discord',
status: client ? 'connected' : 'disconnected',
uptime: client && startTime ? Math.floor((Date.now() - startTime) / 1000) : undefined,
messagesProcessed,
error: lastError?.message,
}
}
// 监听配置变更
const configManager = ConfigManager.getInstance()
configManager.on('config-changed', async (event: ConfigChangeEvent) => {
const { newConfig } = event
const platformConfig = newConfig.discord
try {
if (platformConfig.enabled && !client) {
await startBot(platformConfig.botToken)
} else if (!platformConfig.enabled && client) {
await stopBot()
} else if (platformConfig.enabled && client) {
if (platformConfig.botToken !== currentToken) {
await stopBot()
await startBot(platformConfig.botToken)
}
}
lastError = null
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
BotLogger.log({
platform: 'discord',
eventType: 'error',
severity: 'error',
message: `配置变更处理失败: ${lastError.message}`,
})
}
})

View File

@@ -0,0 +1,45 @@
import fs from 'node:fs'
import path from 'node:path'
interface UserMapping {
conversationId: string
createdAt: string
}
type UserStore = Record<string, UserMapping>
const STORE_PATH = path.join(process.cwd(), 'remote-control', 'data', 'discord-users.json')
function readStore(): UserStore {
try {
if (!fs.existsSync(STORE_PATH)) {
return {}
}
const data = fs.readFileSync(STORE_PATH, 'utf-8')
return JSON.parse(data)
} catch {
return {}
}
}
function writeStore(store: UserStore): void {
const dir = path.dirname(STORE_PATH)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
fs.writeFileSync(STORE_PATH, JSON.stringify(store, null, 2), 'utf-8')
}
export function getConversationId(discordUserId: string): string | null {
const store = readStore()
return store[discordUserId]?.conversationId ?? null
}
export function setConversationId(discordUserId: string, conversationId: string): void {
const store = readStore()
store[discordUserId] = {
conversationId,
createdAt: new Date().toISOString(),
}
writeStore(store)
}

View File

@@ -0,0 +1,354 @@
import type * as lark from '@larksuiteoapi/node-sdk'
import { HTTPClient } from '@/http'
import { getDefaultAgentId } from '@/app/api/nova-config'
import { getConversationId, setConversationId } from './user-store'
import { sendMessageAndWaitResponse } from '@/remote-control/shared/nova-bridge'
import { uploadBotAttachments, type BotAttachment } from '@/remote-control/shared/file-uploader'
import { BotLogger } from '@/remote-control/shared/logger'
const oapiClient = new HTTPClient({
baseURL: process.env.NOVA_BASE_URL,
headers: {
'Content-Type': 'application/json',
'Tenant-Id': process.env.NOVA_TENANT_ID!,
'Authorization': process.env.NOVA_ACCESS_KEY!,
},
})
const userQueues = new Map<string, Promise<void>>()
const processedMessages = new Set<string>()
function extractTextContent(messageType: string, content: string): string {
try {
const parsed = JSON.parse(content)
if (messageType === 'text') {
return (parsed.text || '').trim()
}
if (messageType === 'post') {
// 富文本:遍历 content 数组提取 text 类型的文本
const contentArray = parsed.content || []
const texts: string[] = []
for (const paragraph of contentArray) {
if (Array.isArray(paragraph)) {
for (const item of paragraph) {
if (item.tag === 'text' && item.text) {
texts.push(item.text)
}
}
}
}
return texts.join(' ').trim()
}
return ''
} catch {
return ''
}
}
function cleanMentions(text: string): string {
// 清除 @_user_N 占位符
return text.replace(/@_user_\d+/g, '').trim()
}
async function extractAttachments(
client: lark.Client,
messageType: string,
content: string,
messageId: string,
): Promise<BotAttachment[]> {
const attachments: BotAttachment[] = []
try {
const parsed = JSON.parse(content)
// 处理 image 类型
if (messageType === 'image' && parsed.image_key) {
const resp = await client.im.messageResource.get({
path: { message_id: messageId, file_key: parsed.image_key },
params: { type: 'image' },
})
const stream = resp.getReadableStream()
const chunks: Buffer[] = []
for await (const chunk of stream) {
chunks.push(chunk)
}
const buffer = Buffer.concat(chunks)
// 将 buffer 转为临时 URL通过 data URI
const dataUri = `data:image/png;base64,${buffer.toString('base64')}`
attachments.push({
url: dataUri,
fileName: `lark_image_${Date.now()}.png`,
})
}
// 处理 file 类型
if (messageType === 'file' && parsed.file_key) {
const resp = await client.im.messageResource.get({
path: { message_id: messageId, file_key: parsed.file_key },
params: { type: 'file' },
})
const stream = resp.getReadableStream()
const chunks: Buffer[] = []
for await (const chunk of stream) {
chunks.push(chunk)
}
const buffer = Buffer.concat(chunks)
const dataUri = `data:application/octet-stream;base64,${buffer.toString('base64')}`
attachments.push({
url: dataUri,
fileName: parsed.file_name || `lark_file_${Date.now()}`,
})
}
// 处理 post 富文本中的图片
if (messageType === 'post' && parsed.content) {
for (const paragraph of parsed.content) {
if (Array.isArray(paragraph)) {
for (const item of paragraph) {
if (item.tag === 'img' && item.image_key) {
const resp = await client.im.messageResource.get({
path: { message_id: messageId, file_key: item.image_key },
params: { type: 'image' },
})
const stream = resp.getReadableStream()
const chunks: Buffer[] = []
for await (const chunk of stream) {
chunks.push(chunk)
}
const buffer = Buffer.concat(chunks)
const dataUri = `data:image/png;base64,${buffer.toString('base64')}`
attachments.push({
url: dataUri,
fileName: `lark_post_image_${Date.now()}.png`,
})
}
}
}
}
}
} catch (err) {
console.error('[Lark Bot] 提取附件失败:', err)
}
return attachments
}
async function getOrCreateConversation(openId: string, name: string): Promise<string> {
const existing = getConversationId(openId)
if (existing) return existing
const agentId = getDefaultAgentId()
const res = await oapiClient.post<{ conversation_id: string }>(
'/v1/oapi/super_agent/chat/create_conversation',
{
agent_id: agentId,
title: `Lark User ${name || openId}`,
},
)
const conversationId = res.conversation_id
setConversationId(openId, conversationId, name)
return conversationId
}
async function replyMessage(
client: lark.Client,
chatId: string,
text: string,
): Promise<void> {
// 检查文本中是否包含 Markdown 图片语法
const imageRegex = /!\[([^\]]*)\]\(([^)]+)\)/g
const images: Array<{ alt: string; url: string }> = []
let match: RegExpExecArray | null
while ((match = imageRegex.exec(text)) !== null) {
images.push({ alt: match[1], url: match[2] })
}
// 如果包含图片,使用富文本消息
if (images.length > 0) {
// 移除 Markdown 图片语法,保留其他文本
const textWithoutImages = text.replace(imageRegex, '').trim()
// 构建富文本内容
const content: any[][] = []
// 添加文本段落
if (textWithoutImages) {
const lines = textWithoutImages.split('\n').filter(line => line.trim())
for (const line of lines) {
content.push([{ tag: 'text', text: line }])
}
}
// 上传图片并添加到富文本中
for (const image of images) {
try {
// 下载图片
const response = await fetch(image.url)
const buffer = Buffer.from(await response.arrayBuffer())
// 上传到飞书
const uploadResp = await client.im.image.create({
data: {
image_type: 'message',
image: buffer,
},
})
const imageKey = uploadResp.image_key || uploadResp.data?.image_key
if (imageKey) {
// 添加图片到富文本
content.push([{ tag: 'img', image_key: imageKey }])
} else {
// 如果没有 image_key添加链接
content.push([{ tag: 'a', text: image.alt || '查看图片', href: image.url }])
}
} catch (err) {
console.error('[Lark Bot] 上传图片失败:', err)
// 上传失败,添加图片链接
content.push([{ tag: 'a', text: image.alt || '查看图片', href: image.url }])
}
}
// 飞书富文本格式需要包含语言标识
await client.im.message.create({
params: { receive_id_type: 'chat_id' },
data: {
receive_id: chatId,
content: JSON.stringify({
zh_cn: {
title: '',
content: content
}
}),
msg_type: 'post',
},
})
} else {
// 纯文本消息
await client.im.message.create({
params: { receive_id_type: 'chat_id' },
data: {
receive_id: chatId,
content: JSON.stringify({ text }),
msg_type: 'text',
},
})
}
}
async function processMessage(
client: lark.Client,
data: any,
onMessage?: () => void,
): Promise<void> {
const message = data.message
const sender = data.sender
const messageId = message.message_id
const openId = sender.sender_id.open_id
const name = sender.sender_id.union_id || openId
const chatId = message.chat_id
const messageType = message.message_type
const content = message.content
// 消息去重
if (processedMessages.has(messageId)) {
return
}
processedMessages.add(messageId)
// 限制 Set 大小,防止内存泄漏
if (processedMessages.size > 10000) {
const firstItem = processedMessages.values().next().value
if (firstItem) {
processedMessages.delete(firstItem)
}
}
// 提取文本和附件
let text = extractTextContent(messageType, content)
text = cleanMentions(text)
const attachments = await extractAttachments(client, messageType, content, messageId)
if (!text && attachments.length === 0) {
return
}
BotLogger.log({
platform: 'lark',
eventType: 'message_received',
severity: 'info',
message: `收到来自 ${name} 的消息 (${messageType}, ${attachments.length} 附件)`,
details: { openId, chatId },
})
try {
// 上传附件
let uploadFileIds: string[] | undefined
if (attachments.length > 0) {
uploadFileIds = await uploadBotAttachments(attachments)
if (uploadFileIds.length === 0) uploadFileIds = undefined
}
const conversationId = await getOrCreateConversation(openId, name)
const response = await sendMessageAndWaitResponse(
`lark:${openId}`,
conversationId,
text || '',
uploadFileIds,
)
if (!response) {
await replyMessage(client, chatId, '没有收到回复,请稍后重试。')
} else {
await replyMessage(client, chatId, response)
}
if (onMessage) onMessage()
BotLogger.log({
platform: 'lark',
eventType: 'message_sent',
severity: 'info',
message: `已回复用户 ${name}`,
})
} catch (error) {
const errMsg = error instanceof Error ? error.message : '未知错误'
console.error('[Lark Bot] 处理消息失败:', errMsg)
await replyMessage(client, chatId, `处理消息时出错: ${errMsg}`).catch(() => {})
BotLogger.log({
platform: 'lark',
eventType: 'error',
severity: 'error',
message: `处理消息失败: ${errMsg}`,
details: { openId },
})
}
}
export function createEventHandler(client: lark.Client, onMessage?: () => void) {
return async (data: any) => {
const openId = data.sender?.sender_id?.open_id
if (!openId) return
// 使用队列确保单用户消息顺序处理
const currentQueue = userQueues.get(openId) ?? Promise.resolve()
const newQueue = currentQueue.then(() => processMessage(client, data, onMessage))
userQueues.set(openId, newQueue)
newQueue.finally(() => {
if (userQueues.get(openId) === newQueue) {
userQueues.delete(openId)
}
})
}
}

View File

@@ -0,0 +1,146 @@
import * as lark from '@larksuiteoapi/node-sdk'
import { createEventHandler } from './handlers'
import { closeConnectionsForPlatform } from '@/remote-control/shared/nova-bridge'
import { BotLogger } from '@/remote-control/shared/logger'
import { ConfigManager, type ConfigChangeEvent } from '@/remote-control/config/manager'
let client: lark.Client | null = null
let wsClient: lark.WSClient | null = null
let currentAppId: string = ''
let currentAppSecret: string = ''
let startTime: number = 0
let messagesProcessed: number = 0
let lastError: Error | null = null
export interface BotStatus {
platform: string
status: 'connected' | 'disconnected' | 'connecting'
uptime?: number
messagesProcessed: number
error?: string
}
export async function startBot(appId: string, appSecret: string): Promise<void> {
if (!appId || !appSecret) {
const msg = '飞书 App ID 或 App Secret 为空,跳过启动'
console.warn(`[Lark Bot] ${msg}`)
BotLogger.log({ platform: 'lark', eventType: 'error', severity: 'warning', message: msg })
return
}
// 幂等检查:如果已有活跃连接且凭据未变,直接跳过
if (wsClient && currentAppId === appId && currentAppSecret === appSecret) {
console.log('[Lark Bot] 已有活跃连接,跳过重复启动')
return
}
// 如果已有旧连接(凭据变更),先停止
if (wsClient) {
console.log('[Lark Bot] 凭据已变更,先停止旧连接')
await stopBot()
}
currentAppId = appId
currentAppSecret = appSecret
try {
client = new lark.Client({ appId, appSecret })
wsClient = new lark.WSClient({
appId,
appSecret,
loggerLevel: lark.LoggerLevel.info,
})
const eventHandler = createEventHandler(client, () => { messagesProcessed++ })
wsClient.start({
eventDispatcher: new lark.EventDispatcher({}).register({
'im.message.receive_v1': eventHandler,
}),
})
startTime = Date.now()
lastError = null
console.log('[Lark Bot] 已启动,等待消息...')
BotLogger.log({
platform: 'lark',
eventType: 'connection',
severity: 'info',
message: '飞书 Bot 已启动',
})
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err))
lastError = error
client = null
wsClient = null
currentAppId = ''
currentAppSecret = ''
console.error('[Lark Bot] 启动失败:', error.message)
BotLogger.log({
platform: 'lark',
eventType: 'error',
severity: 'error',
message: `飞书 Bot 启动失败: ${error.message}`,
details: { hint: '请检查 App ID 和 App Secret 是否正确' },
})
throw error
}
}
export async function stopBot(): Promise<void> {
closeConnectionsForPlatform('lark')
if (wsClient) {
wsClient.close()
wsClient = null
client = null
currentAppId = ''
currentAppSecret = ''
startTime = 0
console.log('[Lark Bot] 已停止')
BotLogger.log({
platform: 'lark',
eventType: 'disconnection',
severity: 'info',
message: '飞书 Bot 已停止',
})
}
}
export function getStatus(): BotStatus {
return {
platform: 'lark',
status: wsClient ? 'connected' : 'disconnected',
uptime: wsClient && startTime ? Math.floor((Date.now() - startTime) / 1000) : undefined,
messagesProcessed,
error: lastError?.message,
}
}
const configManager = ConfigManager.getInstance()
configManager.on('config-changed', async (event: ConfigChangeEvent) => {
const { newConfig } = event
const platformConfig = newConfig.lark
try {
if (platformConfig.enabled && !wsClient) {
await startBot(platformConfig.appId, platformConfig.appSecret)
} else if (!platformConfig.enabled && wsClient) {
await stopBot()
} else if (platformConfig.enabled && wsClient) {
if (platformConfig.appId !== currentAppId || platformConfig.appSecret !== currentAppSecret) {
await stopBot()
await startBot(platformConfig.appId, platformConfig.appSecret)
}
}
lastError = null
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
BotLogger.log({
platform: 'lark',
eventType: 'error',
severity: 'error',
message: `配置变更处理失败: ${lastError.message}`,
})
}
})

View File

@@ -0,0 +1,47 @@
import fs from 'node:fs'
import path from 'node:path'
interface UserMapping {
conversationId: string
name: string
createdAt: string
}
type UserStore = Record<string, UserMapping>
const STORE_PATH = path.join(process.cwd(), 'remote-control', 'data', 'lark-users.json')
function readStore(): UserStore {
try {
if (!fs.existsSync(STORE_PATH)) {
return {}
}
const data = fs.readFileSync(STORE_PATH, 'utf-8')
return JSON.parse(data)
} catch {
return {}
}
}
function writeStore(store: UserStore): void {
const dir = path.dirname(STORE_PATH)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
fs.writeFileSync(STORE_PATH, JSON.stringify(store, null, 2), 'utf-8')
}
export function getConversationId(openId: string): string | null {
const store = readStore()
return store[openId]?.conversationId ?? null
}
export function setConversationId(openId: string, conversationId: string, name: string): void {
const store = readStore()
store[openId] = {
conversationId,
name,
createdAt: new Date().toISOString(),
}
writeStore(store)
}

View File

@@ -0,0 +1,285 @@
import type { App, SayFn } from '@slack/bolt'
import { type WebClient } from '@slack/web-api'
import axios from 'axios'
import { HTTPClient } from '@/http'
import { getDefaultAgentId } from '@/app/api/nova-config'
import { getConversationId, setConversationId } from './user-store'
import { sendMessageAndWaitResponse } from '@/remote-control/shared/nova-bridge'
import { uploadBotAttachments, type BotAttachment } from '@/remote-control/shared/file-uploader'
import { BotLogger } from '@/remote-control/shared/logger'
/**
* Download a file from Slack using axios (same HTTP client as @slack/web-api).
* Node's undici fetch strips Authorization headers on cross-origin redirects,
* but axios preserves them properly.
*/
async function downloadSlackFile(url: string, token: string): Promise<Buffer | null> {
try {
const resp = await axios.get(url, {
headers: { Authorization: `Bearer ${token}` },
responseType: 'arraybuffer',
maxRedirects: 5,
validateStatus: (status) => status < 400,
})
// Guard: reject HTML responses (Slack login page on auth failure)
const contentType = resp.headers['content-type'] || ''
if (contentType.includes('text/html')) {
console.error('[Slack Bot] 下载返回 HTMLBot 可能缺少 files:read 权限')
return null
}
return Buffer.from(resp.data)
} catch (err) {
const msg = err instanceof Error ? err.message : String(err)
console.error(`[Slack Bot] 文件下载失败: ${msg}`)
return null
}
}
const oapiClient = new HTTPClient({
baseURL: process.env.NOVA_BASE_URL,
headers: {
'Content-Type': 'application/json',
'Tenant-Id': process.env.NOVA_TENANT_ID!,
'Authorization': process.env.NOVA_ACCESS_KEY!,
},
})
const userQueues = new Map<string, Promise<void>>()
/**
* Convert standard Markdown to Slack mrkdwn format.
* - `[text](url)` -> `<url|text>`
* - Remove `![alt](url)` image syntax (images are handled via Block Kit)
*/
function convertMarkdownToMrkdwn(text: string): string {
// Remove image syntax first
let result = text.replace(/!\[[^\]]*\]\([^)]+\)/g, '')
// Convert links
result = result.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '<$2|$1>')
return result.trim()
}
/**
* Extract image URLs from Markdown `![alt](url)` syntax.
*/
function extractImageUrls(text: string): string[] {
const regex = /!\[[^\]]*\]\(([^)]+)\)/g
const urls: string[] = []
let match: RegExpExecArray | null
while ((match = regex.exec(text)) !== null) {
urls.push(match[1])
}
return urls
}
async function getOrCreateConversation(slackUserId: string, name: string): Promise<string> {
const existing = getConversationId(slackUserId)
if (existing) return existing
const res = await oapiClient.post<{ conversation_id: string }>(
'/v1/oapi/super_agent/chat/create_conversation',
{
agent_id: getDefaultAgentId(),
title: `Slack User ${name || slackUserId}`,
},
)
const conversationId = res.conversation_id
setConversationId(slackUserId, conversationId, name)
return conversationId
}
async function processMessage(
message: any,
say: SayFn,
client: WebClient,
botToken: string,
onMessage?: () => void,
): Promise<void> {
// Filter out bot messages
if (message.bot_id || message.subtype === 'bot_message') return
const userId = message.user as string
if (!userId) return
// Extract text, strip mention markup
const text = ((message.text as string) || '').replace(/<@[A-Z0-9]+>/g, '').trim()
const hasFiles = message.files && Array.isArray(message.files) && message.files.length > 0
// Skip empty messages (no text, no files)
if (!text && !hasFiles) return
// Fetch user info for display name
let userName = userId
try {
const userInfo = await client.users.info({ user: userId })
userName = userInfo.user?.real_name || userInfo.user?.name || userId
} catch {
// fallback to userId
}
// Extract file attachments — download with auth and convert to data URI
const attachments: BotAttachment[] = []
let fileDownloadFailed = false
if (hasFiles) {
for (const file of message.files as any[]) {
const downloadUrl = file.url_private_download || file.url_private
if (!downloadUrl) { console.warn('[Slack Bot] 文件无下载地址:', file.id, file.name); continue }
try {
console.log(`[Slack Bot] 开始下载文件: ${file.name}, URL: ${downloadUrl.substring(0, 80)}...`)
const buffer = await downloadSlackFile(downloadUrl, botToken)
if (!buffer) {
console.error(`[Slack Bot] 下载文件失败: ${file.name},请检查 Bot 是否有 files:read 权限`)
fileDownloadFailed = true
continue
}
const mimeType = file.mimetype || 'application/octet-stream'
const dataUri = `data:${mimeType};base64,${buffer.toString('base64')}`
attachments.push({
url: dataUri,
fileName: file.name ?? `slack_file_${Date.now()}`,
mimeType,
size: buffer.length,
})
} catch (err) {
console.error(`[Slack Bot] 下载文件异常:`, err)
fileDownloadFailed = true
}
}
}
BotLogger.log({
platform: 'slack',
eventType: 'message_received',
severity: 'info',
message: `收到来自 ${userName} 的消息 (${attachments.length} 附件)`,
details: { userId, channel: message.channel },
})
// If only files were sent and all downloads failed, notify user
if (!text && attachments.length === 0 && fileDownloadFailed) {
await say({ text: '文件下载失败,请联系管理员检查 Bot 的 files:read 权限配置。' }).catch(() => {})
BotLogger.log({
platform: 'slack',
eventType: 'error',
severity: 'error',
message: `文件下载失败,可能缺少 files:read 权限`,
details: { userId },
})
return
}
if (!text && attachments.length === 0) return
try {
// Upload attachments if any
let uploadFileIds: string[] | undefined
if (attachments.length > 0) {
uploadFileIds = await uploadBotAttachments(attachments)
if (uploadFileIds.length === 0) uploadFileIds = undefined
}
const conversationId = await getOrCreateConversation(userId, userName)
const response = await sendMessageAndWaitResponse(
`slack:${userId}`,
conversationId,
text || '',
uploadFileIds,
)
if (!response) {
await say({ text: '没有收到回复,请稍后重试。' })
return
}
// Check for images in response
const imageUrls = extractImageUrls(response)
if (imageUrls.length > 0) {
const mrkdwnText = convertMarkdownToMrkdwn(response)
const blocks: any[] = []
// Add text section if there's text content
if (mrkdwnText) {
blocks.push({
type: 'section',
text: {
type: 'mrkdwn',
text: mrkdwnText,
},
})
}
// Add image blocks
for (const url of imageUrls) {
blocks.push({
type: 'image',
image_url: url,
alt_text: 'image',
})
}
await say({
text: mrkdwnText || 'image',
blocks,
})
} else {
await say({ text: convertMarkdownToMrkdwn(response) })
}
if (onMessage) onMessage()
BotLogger.log({
platform: 'slack',
eventType: 'message_sent',
severity: 'info',
message: `已回复用户 ${userName}`,
})
} catch (error) {
const errMsg = error instanceof Error ? error.message : '未知错误'
console.error('[Slack Bot] 处理消息失败:', errMsg)
await say({ text: `处理消息时出错: ${errMsg}` }).catch(() => {})
BotLogger.log({
platform: 'slack',
eventType: 'error',
severity: 'error',
message: `处理消息失败: ${errMsg}`,
details: { userId },
})
}
}
function enqueueMessage(
userId: string,
message: any,
say: SayFn,
client: WebClient,
botToken: string,
onMessage?: () => void,
): void {
const currentQueue = userQueues.get(userId) ?? Promise.resolve()
const newQueue = currentQueue.then(() => processMessage(message, say, client, botToken, onMessage))
userQueues.set(userId, newQueue)
newQueue.finally(() => {
if (userQueues.get(userId) === newQueue) {
userQueues.delete(userId)
}
})
}
export function registerHandlers(app: App, botToken: string, onMessage?: () => void): void {
// Handle direct messages and channel messages
app.message(async ({ message, say, client }) => {
const userId = (message as any).user as string | undefined
if (!userId) return
enqueueMessage(userId, message, say, client, botToken, onMessage)
})
// Handle @mentions
app.event('app_mention', async ({ event, say, client }) => {
const userId = event.user
if (!userId) return
enqueueMessage(userId, event, say, client, botToken, onMessage)
})
}

View File

@@ -0,0 +1,143 @@
import { App, LogLevel } from '@slack/bolt'
import { registerHandlers } from './handlers'
import { closeConnectionsForPlatform } from '@/remote-control/shared/nova-bridge'
import { BotLogger } from '@/remote-control/shared/logger'
import { ConfigManager, type ConfigChangeEvent } from '@/remote-control/config/manager'
let app: App | null = null
let currentBotToken: string = ''
let currentAppToken: string = ''
let startTime: number = 0
let messagesProcessed: number = 0
let lastError: Error | null = null
export interface BotStatus {
platform: string
status: 'connected' | 'disconnected' | 'connecting'
uptime?: number
messagesProcessed: number
error?: string
}
export async function startBot(botToken: string, appToken: string): Promise<void> {
// Trim tokens to avoid ERR_INVALID_CHAR in HTTP headers
botToken = botToken.trim()
appToken = appToken.trim()
if (!botToken || !appToken) {
const msg = 'Slack Bot Token 或 App Token 为空,跳过启动'
console.warn(`[Slack Bot] ${msg}`)
BotLogger.log({ platform: 'slack', eventType: 'error', severity: 'warning', message: msg })
return
}
// Idempotent check
if (app && currentBotToken === botToken && currentAppToken === appToken) {
console.log('[Slack Bot] 已有活跃连接,跳过重复启动')
return
}
// Stop old connection if credentials changed
if (app) {
console.log('[Slack Bot] 凭据已变更,先停止旧连接')
await stopBot()
}
currentBotToken = botToken
currentAppToken = appToken
try {
app = new App({
token: botToken,
socketMode: true,
appToken,
logLevel: LogLevel.INFO,
})
registerHandlers(app, botToken, () => { messagesProcessed++ })
await app.start()
startTime = Date.now()
lastError = null
console.log('[Slack Bot] 已启动,等待消息...')
BotLogger.log({
platform: 'slack',
eventType: 'connection',
severity: 'info',
message: 'Slack Bot 已启动 (Socket Mode)',
})
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err))
lastError = error
app = null
currentBotToken = ''
currentAppToken = ''
console.error('[Slack Bot] 启动失败:', error.message)
BotLogger.log({
platform: 'slack',
eventType: 'error',
severity: 'error',
message: `Slack Bot 启动失败: ${error.message}`,
details: { hint: '请检查 Bot Token 和 App Token 是否正确,以及是否已启用 Socket Mode' },
})
throw error
}
}
export async function stopBot(): Promise<void> {
closeConnectionsForPlatform('slack')
if (app) {
await app.stop()
app = null
currentBotToken = ''
currentAppToken = ''
startTime = 0
console.log('[Slack Bot] 已停止')
BotLogger.log({
platform: 'slack',
eventType: 'disconnection',
severity: 'info',
message: 'Slack Bot 已停止',
})
}
}
export function getStatus(): BotStatus {
return {
platform: 'slack',
status: app ? 'connected' : 'disconnected',
uptime: app && startTime ? Math.floor((Date.now() - startTime) / 1000) : undefined,
messagesProcessed,
error: lastError?.message,
}
}
// Listen for config changes
const configManager = ConfigManager.getInstance()
configManager.on('config-changed', async (event: ConfigChangeEvent) => {
const { newConfig } = event
const platformConfig = newConfig.slack
try {
if (platformConfig.enabled && !app) {
await startBot(platformConfig.botToken, platformConfig.appToken)
} else if (!platformConfig.enabled && app) {
await stopBot()
} else if (platformConfig.enabled && app) {
if (platformConfig.botToken !== currentBotToken || platformConfig.appToken !== currentAppToken) {
await stopBot()
await startBot(platformConfig.botToken, platformConfig.appToken)
}
}
lastError = null
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
BotLogger.log({
platform: 'slack',
eventType: 'error',
severity: 'error',
message: `配置变更处理失败: ${lastError.message}`,
})
}
})

View File

@@ -0,0 +1,47 @@
import fs from 'node:fs'
import path from 'node:path'
interface UserMapping {
conversationId: string
name: string
createdAt: string
}
type UserStore = Record<string, UserMapping>
const STORE_PATH = path.join(process.cwd(), 'remote-control', 'data', 'slack-users.json')
function readStore(): UserStore {
try {
if (!fs.existsSync(STORE_PATH)) {
return {}
}
const data = fs.readFileSync(STORE_PATH, 'utf-8')
return JSON.parse(data)
} catch {
return {}
}
}
function writeStore(store: UserStore): void {
const dir = path.dirname(STORE_PATH)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
fs.writeFileSync(STORE_PATH, JSON.stringify(store, null, 2), 'utf-8')
}
export function getConversationId(slackUserId: string): string | null {
const store = readStore()
return store[slackUserId]?.conversationId ?? null
}
export function setConversationId(slackUserId: string, conversationId: string, name: string): void {
const store = readStore()
store[slackUserId] = {
conversationId,
name,
createdAt: new Date().toISOString(),
}
writeStore(store)
}

View File

@@ -0,0 +1,251 @@
import type { Bot, Context } from 'grammy'
import { InputFile } from 'grammy'
import { HTTPClient } from '@/http'
import { getDefaultAgentId } from '@/app/api/nova-config'
import { getConversationId, setConversationId } from './user-store'
import { sendMessageAndWaitResponse } from '@/remote-control/shared/nova-bridge'
import { uploadBotAttachments, type BotAttachment } from '@/remote-control/shared/file-uploader'
import { BotLogger } from '@/remote-control/shared/logger'
const oapiClient = new HTTPClient({
baseURL: process.env.NOVA_BASE_URL,
headers: {
'Content-Type': 'application/json',
'Tenant-Id': process.env.NOVA_TENANT_ID!,
'Authorization': process.env.NOVA_ACCESS_KEY!,
},
})
const userQueues = new Map<string, Promise<void>>()
function splitMessage(text: string, maxLength = 4096): string[] {
if (text.length <= maxLength) return [text]
const parts: string[] = []
let remaining = text
while (remaining.length > 0) {
if (remaining.length <= maxLength) {
parts.push(remaining)
break
}
let splitIndex = remaining.lastIndexOf('\n', maxLength)
if (splitIndex <= 0) {
splitIndex = maxLength
}
parts.push(remaining.slice(0, splitIndex))
remaining = remaining.slice(splitIndex).trimStart()
}
return parts
}
async function getOrCreateConversation(telegramUserId: string, name: string): Promise<string> {
const existing = getConversationId(telegramUserId)
if (existing) return existing
const agentId = getDefaultAgentId()
const res = await oapiClient.post<{ conversation_id: string }>(
'/v1/oapi/super_agent/chat/create_conversation',
{
agent_id: agentId,
title: `Telegram User ${name || telegramUserId}`,
},
)
const conversationId = res.conversation_id
setConversationId(telegramUserId, conversationId, name)
return conversationId
}
async function processMessage(
ctx: Context,
botToken: string,
onMessage?: () => void,
): Promise<void> {
const message = ctx.message
if (!message) return
const userId = String(message.from?.id ?? '')
const userName = message.from?.username || message.from?.first_name || userId
// Extract text
const text = (message.text || message.caption || '').trim()
// Extract attachments
const attachments: BotAttachment[] = []
// Handle photos - take the last one (largest size)
if (message.photo && message.photo.length > 0) {
const largestPhoto = message.photo[message.photo.length - 1]
try {
const file = await ctx.api.getFile(largestPhoto.file_id)
if (file.file_path) {
const downloadUrl = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`
attachments.push({
url: downloadUrl,
fileName: `telegram_photo_${Date.now()}.jpg`,
mimeType: 'image/jpeg',
size: largestPhoto.file_size,
})
}
} catch (err) {
console.error('[Telegram Bot] 获取图片文件失败:', err)
}
}
// Handle documents
if (message.document) {
try {
const file = await ctx.api.getFile(message.document.file_id)
if (file.file_path) {
const downloadUrl = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`
attachments.push({
url: downloadUrl,
fileName: message.document.file_name || `telegram_file_${Date.now()}`,
mimeType: message.document.mime_type ?? undefined,
size: message.document.file_size,
})
}
} catch (err) {
console.error('[Telegram Bot] 获取文档文件失败:', err)
}
}
if (!text && attachments.length === 0) return
BotLogger.log({
platform: 'telegram',
eventType: 'message_received',
severity: 'info',
message: `收到来自用户 ${userName} 的消息 (${attachments.length} 附件)`,
details: { userId, chatId: String(message.chat.id) },
})
try {
// Send typing action
await ctx.api.sendChatAction(message.chat.id, 'typing').catch(() => {})
const typingInterval = setInterval(() => {
ctx.api.sendChatAction(message.chat.id, 'typing').catch(() => {})
}, 5000)
try {
// Upload attachments if any
let uploadFileIds: string[] | undefined
if (attachments.length > 0) {
uploadFileIds = await uploadBotAttachments(attachments)
if (uploadFileIds.length === 0) uploadFileIds = undefined
}
const conversationId = await getOrCreateConversation(userId, userName)
const response = await sendMessageAndWaitResponse(
`telegram:${userId}`,
conversationId,
text || '',
uploadFileIds,
)
if (!response) {
await ctx.reply('没有收到回复,请稍后重试。')
return
}
// Check for Markdown image syntax in response
const imageRegex = /!\[([^\]]*)\]\(([^)]+)\)/g
const images: Array<{ alt: string; url: string }> = []
let match: RegExpExecArray | null
while ((match = imageRegex.exec(response)) !== null) {
images.push({ alt: match[1], url: match[2] })
}
if (images.length > 0) {
// Remove image markdown from text
const textWithoutImages = response.replace(imageRegex, '').trim()
// Send images with caption
for (let i = 0; i < images.length; i++) {
const image = images[i]
try {
if (i === 0 && textWithoutImages) {
// First image with caption (Telegram caption limit: 1024 chars)
if (textWithoutImages.length <= 1024) {
await ctx.replyWithPhoto(image.url, {
caption: textWithoutImages,
parse_mode: 'HTML',
})
} else {
// Caption too long - send photo first, then text separately
await ctx.replyWithPhoto(image.url)
const textParts = splitMessage(textWithoutImages)
for (const part of textParts) {
await ctx.reply(part)
}
}
} else {
await ctx.replyWithPhoto(image.url)
}
} catch {
// If photo sending fails, send as link
await ctx.reply(`${image.alt || '图片'}: ${image.url}`)
}
}
// If no caption was sent with the first image and there's text
if (images.length > 0 && textWithoutImages && textWithoutImages.length > 1024) {
// Already handled above
}
} else {
// Plain text response
const parts = splitMessage(response)
for (const part of parts) {
await ctx.reply(part)
}
}
if (onMessage) onMessage()
BotLogger.log({
platform: 'telegram',
eventType: 'message_sent',
severity: 'info',
message: `已回复用户 ${userName}`,
})
} finally {
clearInterval(typingInterval)
}
} catch (error) {
const errMsg = error instanceof Error ? error.message : '未知错误'
console.error('[Telegram Bot] 处理消息失败:', errMsg)
await ctx.reply(`处理消息时出错: ${errMsg}`).catch(() => {})
BotLogger.log({
platform: 'telegram',
eventType: 'error',
severity: 'error',
message: `处理消息失败: ${errMsg}`,
details: { userId },
})
}
}
export function registerHandlers(bot: Bot, botToken: string, onMessage?: () => void): void {
const handleMessage = (ctx: Context) => {
const userId = String(ctx.message?.from?.id ?? '')
if (!userId) return
// Filter bot's own messages
if (ctx.message?.from?.is_bot) return
const currentQueue = userQueues.get(userId) ?? Promise.resolve()
const newQueue = currentQueue.then(() => processMessage(ctx, botToken, onMessage))
userQueues.set(userId, newQueue)
newQueue.finally(() => {
if (userQueues.get(userId) === newQueue) {
userQueues.delete(userId)
}
})
}
bot.on('message:text', handleMessage)
bot.on('message:photo', handleMessage)
bot.on('message:document', handleMessage)
}

View File

@@ -0,0 +1,185 @@
import { Bot } from 'grammy'
import { registerHandlers } from './handlers'
import { closeConnectionsForPlatform } from '@/remote-control/shared/nova-bridge'
import { BotLogger } from '@/remote-control/shared/logger'
import { ConfigManager, type ConfigChangeEvent } from '@/remote-control/config/manager'
const CONNECTION_TIMEOUT = 30_000 // 30s to establish first connection
let bot: Bot | null = null
let currentToken: string = ''
let isConnected: boolean = false
let connectTimer: ReturnType<typeof setTimeout> | null = null
let startTime: number = 0
let messagesProcessed: number = 0
let lastError: Error | null = null
export interface BotStatus {
platform: string
status: 'connected' | 'disconnected' | 'connecting'
uptime?: number
messagesProcessed: number
error?: string
}
export async function startBot(token: string): Promise<void> {
if (!token) {
const msg = 'Telegram Bot Token 为空,跳过启动'
console.warn(`[Telegram Bot] ${msg}`)
BotLogger.log({ platform: 'telegram', eventType: 'error', severity: 'warning', message: msg })
return
}
// Idempotent check
if (bot && currentToken === token) {
console.log('[Telegram Bot] 已有活跃连接,跳过重复启动')
return
}
// If token changed, stop old bot first
if (bot) {
console.log('[Telegram Bot] Token 已变更,先停止旧连接')
await stopBot()
}
currentToken = token
try {
bot = new Bot(token)
bot.catch((err) => {
lastError = err.error instanceof Error ? err.error : new Error(String(err.error))
console.error('[Telegram Bot] 运行时错误:', lastError.message)
BotLogger.log({
platform: 'telegram',
eventType: 'error',
severity: 'error',
message: `Telegram Bot 运行时错误: ${lastError.message}`,
})
})
registerHandlers(bot, token, () => { messagesProcessed++ })
// Connection timeout — if onStart is not called within CONNECTION_TIMEOUT,
// stop the bot and mark as disconnected
connectTimer = setTimeout(() => {
if (!isConnected && bot) {
const msg = '连接超时,请检查 Token 是否正确以及网络是否可达'
lastError = new Error(msg)
console.error(`[Telegram Bot] ${msg}`)
BotLogger.log({ platform: 'telegram', eventType: 'error', severity: 'error', message: msg })
void stopBot()
}
}, CONNECTION_TIMEOUT)
// Start long polling (non-blocking).
// bot.start() returns a promise that resolves when bot.stop() is called,
// so we must NOT await it. Connection errors surface via bot.catch().
bot.start({
onStart: (botInfo) => {
if (connectTimer) { clearTimeout(connectTimer); connectTimer = null }
isConnected = true
startTime = Date.now()
lastError = null
console.log(`[Telegram Bot] 已启动: @${botInfo.username}`)
BotLogger.log({
platform: 'telegram',
eventType: 'connection',
severity: 'info',
message: `Telegram Bot 已连接,用户名: @${botInfo.username}`,
})
},
}).catch((err) => {
// Long polling loop ended with error (e.g. invalid token, network issue)
if (connectTimer) { clearTimeout(connectTimer); connectTimer = null }
const error = err instanceof Error ? err : new Error(String(err))
if (error.message === 'Aborted delay') return // Normal stop, ignore
lastError = error
isConnected = false
console.error('[Telegram Bot] 长轮询异常退出:', error.message)
BotLogger.log({
platform: 'telegram',
eventType: 'error',
severity: 'error',
message: `Telegram Bot 长轮询异常: ${error.message}`,
})
})
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err))
lastError = error
bot = null
currentToken = ''
isConnected = false
console.error('[Telegram Bot] 启动失败:', error.message)
BotLogger.log({
platform: 'telegram',
eventType: 'error',
severity: 'error',
message: `Telegram Bot 启动失败: ${error.message}`,
details: { hint: '请检查 Bot Token 是否正确,以及是否已在 @BotFather 中创建了 Bot' },
})
throw error
}
}
export async function stopBot(): Promise<void> {
if (connectTimer) { clearTimeout(connectTimer); connectTimer = null }
closeConnectionsForPlatform('telegram')
if (bot) {
try {
await bot.stop()
} catch {
// "Aborted delay" is expected when stopping long polling — ignore
}
bot = null
currentToken = ''
isConnected = false
startTime = 0
console.log('[Telegram Bot] 已停止')
BotLogger.log({
platform: 'telegram',
eventType: 'disconnection',
severity: 'info',
message: 'Telegram Bot 已停止',
})
}
}
export function getStatus(): BotStatus {
return {
platform: 'telegram',
status: isConnected ? 'connected' : bot ? 'connecting' : 'disconnected',
uptime: isConnected && startTime ? Math.floor((Date.now() - startTime) / 1000) : undefined,
messagesProcessed,
error: lastError?.message,
}
}
// Listen for config changes
const configManager = ConfigManager.getInstance()
configManager.on('config-changed', async (event: ConfigChangeEvent) => {
const { newConfig } = event
const platformConfig = newConfig.telegram
try {
if (platformConfig.enabled && !bot) {
await startBot(platformConfig.botToken)
} else if (!platformConfig.enabled && bot) {
await stopBot()
} else if (platformConfig.enabled && bot) {
if (platformConfig.botToken !== currentToken) {
await stopBot()
await startBot(platformConfig.botToken)
}
}
lastError = null
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
BotLogger.log({
platform: 'telegram',
eventType: 'error',
severity: 'error',
message: `配置变更处理失败: ${lastError.message}`,
})
}
})

View File

@@ -0,0 +1,47 @@
import fs from 'node:fs'
import path from 'node:path'
interface UserMapping {
conversationId: string
name: string
createdAt: string
}
type UserStore = Record<string, UserMapping>
const STORE_PATH = path.join(process.cwd(), 'remote-control', 'data', 'telegram-users.json')
function readStore(): UserStore {
try {
if (!fs.existsSync(STORE_PATH)) {
return {}
}
const data = fs.readFileSync(STORE_PATH, 'utf-8')
return JSON.parse(data)
} catch {
return {}
}
}
function writeStore(store: UserStore): void {
const dir = path.dirname(STORE_PATH)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
fs.writeFileSync(STORE_PATH, JSON.stringify(store, null, 2), 'utf-8')
}
export function getConversationId(telegramUserId: string): string | null {
const store = readStore()
return store[telegramUserId]?.conversationId ?? null
}
export function setConversationId(telegramUserId: string, conversationId: string, name: string): void {
const store = readStore()
store[telegramUserId] = {
conversationId,
name,
createdAt: new Date().toISOString(),
}
writeStore(store)
}