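Note 8: Backend Service Development
1. Introduction
The backend service is the technical foundation of an AI agent system: it handles business logic, state management, and external integrations. This note covers API design and service architecture theory, analyzes the design characteristics of the OpenHands backend, and then builds a backend service module that simulates OpenHands' core functionality.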
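2. API Design and Service Architecture Theory
2.1 API design principles
- RESTful design: resource-oriented URLs and correct HTTP method semantics
- GraphQL: declarative data queries that reduce the number of round trips
- API versioning: by path, by header, or by query parameter
- Status codes: use HTTP status codes that accurately express the outcome
- Idempotency: repeating the same request yields the same result, with no additional side effects
- Security: authentication, authorization, and input validation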
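The idempotency principle above is often implemented with an idempotency key: the client sends a unique key with each logical request, and the server replays the stored result when the key is seen again. The following is a minimal in-memory sketch; `IdempotencyStore` and `createSession` are hypothetical names used only for illustration, and a real service would persist keys with an expiry.

```javascript
// Hypothetical helper: caches the outcome of an operation per idempotency key.
class IdempotencyStore {
  constructor() {
    this.results = new Map(); // key -> Promise of the result
  }

  // Runs `operation` at most once per key; repeats receive the same promise.
  run(key, operation) {
    if (!this.results.has(key)) {
      // Store the promise itself so concurrent duplicates share one execution.
      this.results.set(key, Promise.resolve().then(operation));
    }
    return this.results.get(key);
  }
}

// Usage: the side effect happens once even if the request is retried.
let created = 0;
const store = new IdempotencyStore();
const createSession = () => ({ id: `session-${++created}` });

const demo = Promise.all([
  store.run('req-123', createSession),
  store.run('req-123', createSession),
]);
```

Note that this sketch also caches failures; whether a failed request should be retryable under the same key is a design decision left open here.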
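2.2 Service architecture patterns
Monolithic architecture:
- All functionality lives in a single service
- Simple to deploy; suits small applications
Microservice architecture:
- Services are decomposed by business capability
- Independently deployed and scaled
- Allows a heterogeneous technology stack
Serverless architecture:
- Event-driven
- Scales automatically
- Pay-per-use
Layered architecture:
- Presentation, business logic, and data access layers
- Separation of concerns
- Easier to maintain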
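2.3 Choosing a backend technology stack
| Category | Common choices | Characteristics |
|---|---|---|
| Language | Node.js, Python, Go | Each has its own performance and ecosystem strengths |
| Web framework | Express, FastAPI, Gin | Provide routing, middleware, and other core features |
| Database | PostgreSQL, MongoDB, Redis | Relational vs. document vs. key-value |
| Message queue | RabbitMQ, Kafka | Asynchronous task processing and system decoupling |
| Authentication | JWT, OAuth2, API keys | Different mechanisms for different scenarios |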
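2.4 Key backend techniques
- Dependency injection: decouples components and simplifies testing
- Middleware: a request-processing pipeline for cross-cutting concerns
- ORM/ODM: object-relational (or object-document) mapping that simplifies database access
- Caching: improves performance and reduces database load
- Logging and monitoring: observability into the system's runtime state
- Exception handling: handles errors gracefully and improves stability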
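To illustrate the middleware idea from the list above, here is a minimal sketch of a request pipeline in plain JavaScript. The `compose` function and the three middlewares are hypothetical names written for this note; frameworks such as Express ship their own equivalent mechanism.

```javascript
// Compose middlewares into one function; each receives (ctx, next).
function compose(middlewares) {
  return function run(ctx) {
    let index = -1;
    function dispatch(i) {
      if (i <= index) {
        return Promise.reject(new Error('next() called multiple times'));
      }
      index = i;
      const fn = middlewares[i];
      if (!fn) return Promise.resolve();
      try {
        return Promise.resolve(fn(ctx, () => dispatch(i + 1)));
      } catch (err) {
        return Promise.reject(err);
      }
    }
    return dispatch(0);
  };
}

// Cross-cutting concerns (logging, auth) wrap the business handler.
const requestLogger = async (ctx, next) => {
  ctx.trace.push('log:start');
  await next();
  ctx.trace.push('log:end');
};

const auth = async (ctx, next) => {
  if (!ctx.apiKey) {
    ctx.status = 401;
    return; // short-circuit: the handler never runs
  }
  await next();
};

const handler = async (ctx) => {
  ctx.status = 200;
  ctx.trace.push('handler');
};

const pipeline = compose([requestLogger, auth, handler]);
```

Because each middleware decides whether to call `next()`, concerns such as authentication can stop a request before it reaches the business logic, while logging still observes both the start and the end of the request.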
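3. Analysis of the OpenHands Backend Service
Based on README_CN.md, we can infer that the OpenHands backend service has the following characteristics:
3.1 Characteristics of the OpenHands backend architecture
Docker-based:
- Containerized deployment keeps environments consistent
- The main container and the sandbox container are separated
State persistence:
- Conversation history is saved
- Configuration is stored
- Workspace state is maintained
Multiple run modes:
- Web UI mode
- Headless mode
- CLI mode
- GitHub Action integration
LLM integration:
- Support for multiple LLM providers
- API key management
- Optimization of model calls
3.2 Speculated OpenHands API endpoints
Judging from the functional requirements, OpenHands may expose API endpoints such as the following (this list is speculation, not taken from the OpenHands source):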
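- /api/chat: handles conversations between the user and the agent
- /api/execute: executes commands
- /api/files: file operations
- /api/config: configuration management
- /api/tools: tool registration and invocation
- /api/models: LLM model management
- /api/sessions: session state management
- /api/projects: project management (for the GitHub Action integration)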
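As a rough illustration of how endpoints like these could be dispatched, the sketch below resolves a method and URL path against a small route table. Everything here is an assumption made for illustration: the handler names, the `:id` path parameter, and the `resolveRoute` function are hypothetical and do not describe the real OpenHands API.

```javascript
// Hypothetical route table covering a few of the speculated endpoints.
const routes = [
  { method: 'POST', path: '/api/chat', handler: 'chat.send' },
  { method: 'POST', path: '/api/execute', handler: 'tools.executeCommand' },
  { method: 'GET', path: '/api/sessions', handler: 'sessions.list' },
  { method: 'GET', path: '/api/sessions/:id', handler: 'sessions.get' },
  { method: 'DELETE', path: '/api/sessions/:id', handler: 'sessions.remove' },
];

// Returns { handler, params } for the first matching route, or null.
function resolveRoute(method, url) {
  const urlParts = url.split('/').filter(Boolean);
  for (const route of routes) {
    if (route.method !== method) continue;
    const routeParts = route.path.split('/').filter(Boolean);
    if (routeParts.length !== urlParts.length) continue;
    const params = {};
    const matched = routeParts.every((part, i) => {
      if (part.startsWith(':')) {
        // Path parameter: capture the segment under the parameter's name.
        params[part.slice(1)] = decodeURIComponent(urlParts[i]);
        return true;
      }
      return part === urlParts[i];
    });
    if (matched) return { handler: route.handler, params };
  }
  return null;
}
```

A caller that receives `null` would typically respond with 404, which ties back to using status codes that accurately express the outcome.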
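3.3 Data flow analysis
In OpenHands, the main data flows through the backend service are probably as follows:
- User input → frontend UI → backend API → LLM interpretation
- LLM decision → tool call selection → tool execution
- Tool execution result → LLM analysis → response generation → frontend display
- State change → persistent storage → session recovery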
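The first three flows above amount to a loop: call the model, run any tools it requests, feed the results back, and stop when it answers in plain text. The sketch below shows that loop with a stubbed model and a single stubbed tool; `fakeLlm`, `tools`, and `runTurn` are hypothetical stand-ins, not OpenHands code.

```javascript
// Stubbed LLM: requests a tool first, then answers once it sees a tool result.
async function fakeLlm(messages) {
  const last = messages[messages.length - 1];
  if (last.role === 'tool') {
    return { content: `Done: ${last.content}`, toolCalls: [] };
  }
  return { content: '', toolCalls: [{ name: 'echo', arguments: { text: 'hi' } }] };
}

// Stubbed tool registry.
const tools = {
  echo: async ({ text }) => text.toUpperCase(),
};

// One user turn: loop until the model stops requesting tools.
async function runTurn(userInput, llm, toolset, maxSteps = 5) {
  const messages = [{ role: 'user', content: userInput }];
  for (let step = 0; step < maxSteps; step++) {
    const reply = await llm(messages);
    messages.push({ role: 'assistant', content: reply.content });
    if (reply.toolCalls.length === 0) {
      return { answer: reply.content, messages };
    }
    for (const call of reply.toolCalls) {
      const tool = toolset[call.name];
      const result = tool
        ? await tool(call.arguments)
        : `unknown tool: ${call.name}`;
      messages.push({ role: 'tool', content: String(result) });
    }
  }
  throw new Error('Step limit reached without a final answer');
}
```

The step limit is a design choice in this sketch: it keeps a misbehaving model from looping forever on tool calls.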
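4. Practical Project: Implementing the Core OpenHands Backend Services
4.1 Project structure design
src/
├── config/ # Configuration files
│ ├── config.js # Main configuration
│ └── logger.js # Logging configuration
├── controllers/ # Controllers
│ ├── chatController.js # Chat controller
│ ├── toolController.js # Tool controller
│ └── sessionController.js # Session controller
├── middlewares/ # Middleware
│ ├── auth.js # Authentication middleware
│ ├── errorHandler.js # Error handling
│ └── requestLogger.js # Request logging
├── models/ # Data models
│ ├── session.js # Session model
│ └── toolExecution.js # Tool execution record model
├── routes/ # Route definitions
│ ├── chatRoutes.js # Chat routes
│ ├── toolRoutes.js # Tool routes
│ └── configRoutes.js # Configuration routes
├── services/ # Business services
│ ├── agentService.js # Core agent service
│ ├── sessionManager.js # Session management service
│ ├── llmService.js # LLM integration service
│ ├── toolService.js # Tool management service
│ └── dockerService.js # Docker integration service
├── utils/ # Utility functions
│ ├── promiseUtils.js # Promise utilities
│ ├── validationUtils.js # Validation utilities
│ └── fsUtils.js # File system utilities
└── app.js # Application entry point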
4.2 Implementing the core service layer
agentService.js - core agent service:
// src/services/agentService.js
const EventEmitter = require('events');
const { v4: uuidv4 } = require('uuid');
const logger = require('../config/logger');
const llmService = require('./llmService');
const toolService = require('./toolService');
const dockerService = require('./dockerService');
const { SessionManager } = require('./sessionManager');
class AgentService extends EventEmitter {
constructor() {
super();
this.sessions = new SessionManager();
this.activeAgents = new Map();
this.defaultSystemPrompt = `你是OpenHands AI开发助手,一个专业的软件开发代理。你能够编写代码、运行命令、分析错误并提供解决方案。`;
}
/**
* 初始化代理服务
*/
async initialize() {
try {
// 初始化LLM服务
await llmService.initialize();
// 初始化工具服务
await toolService.initialize();
// 初始化Docker服务
await dockerService.initialize();
// Initialize the session manager (loads persisted sessions and starts periodic auto-save)
await this.sessions.initialize();
logger.info('Agent service initialized successfully');
return true;
} catch (error) {
logger.error('Failed to initialize agent service:', error);
throw error;
}
}
/**
* 创建新的代理会话
* @param {Object} options - 会话选项
* @returns {String} - 会话ID
*/
async createSession(options = {}) {
const sessionId = options.sessionId || uuidv4();
// 创建新会话
const session = this.sessions.createSession(sessionId, {
createdAt: new Date(),
lastActive: new Date(),
options: {
// Spread caller options first so the defaults below are not overwritten by undefined values
...options,
llmProvider: options.llmProvider || llmService.getDefaultProvider(),
systemPrompt: options.systemPrompt || this.defaultSystemPrompt
},
history: [],
state: 'idle'
});
// 如果配置了沙箱环境,初始化沙箱
if (options.useSandbox !== false) {
try {
const sandboxId = await dockerService.createSandbox(sessionId);
session.sandboxId = sandboxId;
logger.info(`Created sandbox ${sandboxId} for session ${sessionId}`);
} catch (error) {
logger.error(`Failed to create sandbox for session ${sessionId}:`, error);
// 继续,即使没有沙箱也允许会话创建
}
}
logger.info(`Created new agent session: ${sessionId}`);
this.emit('sessionCreated', sessionId);
return sessionId;
}
/**
* 处理用户输入
* @param {String} sessionId - 会话ID
* @param {String} input - 用户输入
* @param {Object} options - 处理选项
* @returns {Promise<Object>} - 处理结果
*/
async processInput(sessionId, input, options = {}) {
// 验证会话存在
if (!this.sessions.hasSession(sessionId)) {
throw new Error(`Session not found: ${sessionId}`);
}
const session = this.sessions.getSession(sessionId);
session.lastActive = new Date();
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'thinking');
this.emit('stateChanged', { sessionId, state: 'thinking' });
try {
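// Snapshot the context before recording the new message: the LLM handlers
// append `input` as the final user message themselves, so it must not
// also appear in the history they receive
const context = this._prepareContext(session);
context.history = [...session.history];
// Add the user message to the history
this.sessions.addMessageToHistory(sessionId, {
role: 'user',
content: input,
timestamp: new Date()
});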
// 调用LLM
logger.info(`Processing input for session ${sessionId}: "${input.substring(0, 50)}..."`);
const llmResponse = await llmService.completeChat(
session.options.llmProvider,
input,
context,
session.options
);
// 处理工具调用
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
return await this._handleToolCalls(sessionId, llmResponse);
}
// 处理标准响应
const response = {
type: 'text',
content: llmResponse.content,
sessionId
};
// 添加助手响应到历史
this.sessions.addMessageToHistory(sessionId, {
role: 'assistant',
content: llmResponse.content,
timestamp: new Date()
});
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'idle');
this.emit('stateChanged', { sessionId, state: 'idle' });
return response;
} catch (error) {
// 处理错误
logger.error(`Error processing input for session ${sessionId}:`, error);
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'error');
this.emit('stateChanged', { sessionId, state: 'error' });
throw error;
}
}
/**
* 处理工具调用
* @private
* @param {String} sessionId - 会话ID
* @param {Object} llmResponse - LLM响应
* @returns {Promise<Object>} - 处理结果
*/
async _handleToolCalls(sessionId, llmResponse) {
const session = this.sessions.getSession(sessionId);
const toolCalls = llmResponse.toolCalls;
const toolResults = [];
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'executing');
this.emit('stateChanged', { sessionId, state: 'executing' });
// 添加思考消息到历史
const assistantMessage = {
role: 'assistant',
content: llmResponse.content || '我将执行一些操作...',
timestamp: new Date(),
toolCalls: toolCalls.map(call => ({
tool: call.name,
args: call.arguments,
status: 'pending'
}))
};
this.sessions.addMessageToHistory(sessionId, assistantMessage);
// 执行工具调用
for (let i = 0; i < toolCalls.length; i++) {
const call = toolCalls[i];
const toolCallIndex = i;
try {
logger.info(`Executing tool ${call.name} for session ${sessionId}`);
// 更新工具状态为执行中
this._updateToolCallStatus(sessionId, assistantMessage, toolCallIndex, 'running');
// 执行工具
const result = await toolService.executeTool(
call.name,
call.arguments,
{ sessionId, sandboxId: session.sandboxId }
);
// 更新工具状态为成功
this._updateToolCallStatus(
sessionId,
assistantMessage,
toolCallIndex,
'success',
{ result }
);
toolResults.push({
tool: call.name,
status: 'success',
result
});
} catch (error) {
// 更新工具状态为失败
this._updateToolCallStatus(
sessionId,
assistantMessage,
toolCallIndex,
'error',
{ error: error.message }
);
toolResults.push({
tool: call.name,
status: 'error',
error: error.message
});
logger.error(`Error executing tool ${call.name}:`, error);
}
}
// 如果有工具执行失败,可以询问LLM如何处理
if (toolResults.some(result => result.status === 'error')) {
// 添加工具执行结果到历史
this.sessions.addMessageToHistory(sessionId, {
role: 'system',
content: `工具执行结果: ${JSON.stringify(toolResults)}`,
timestamp: new Date()
});
// 请求LLM处理错误
const errorInput = `有些工具执行失败了,请处理这些错误: ${JSON.stringify(toolResults)}`;
const context = this._prepareContext(session);
const errorHandlingResponse = await llmService.completeChat(
session.options.llmProvider,
errorInput,
context,
session.options
);
// 添加错误处理响应到历史
this.sessions.addMessageToHistory(sessionId, {
role: 'assistant',
content: errorHandlingResponse.content,
timestamp: new Date()
});
}
// 返回结果
const response = {
type: 'tool_execution',
content: llmResponse.content,
toolResults,
sessionId
};
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'idle');
this.emit('stateChanged', { sessionId, state: 'idle' });
return response;
}
/**
* 更新工具调用状态
* @private
* @param {String} sessionId - 会话ID
* @param {Object} message - 消息对象
* @param {Number} index - 工具调用索引
* @param {String} status - 新状态
* @param {Object} data - 附加数据
*/
_updateToolCallStatus(sessionId, message, index, status, data = {}) {
// 获取历史中最后一条助手消息
const history = this.sessions.getSessionHistory(sessionId);
const messageIndex = history.findIndex(msg =>
msg.role === 'assistant' &&
msg.toolCalls &&
msg.timestamp.getTime() === message.timestamp.getTime()
);
if (messageIndex !== -1 && history[messageIndex].toolCalls[index]) {
history[messageIndex].toolCalls[index] = {
...history[messageIndex].toolCalls[index],
status,
...data
};
// 触发工具状态更新事件
this.emit('toolStatusChanged', {
sessionId,
messageIndex,
toolCallIndex: index,
status,
data
});
}
}
/**
* 准备LLM上下文
* @private
* @param {Object} session - 会话对象
* @returns {Object} - 上下文对象
*/
_prepareContext(session) {
return {
systemPrompt: session.options.systemPrompt,
history: session.history,
availableTools: toolService.getAvailableTools()
};
}
/**
* 执行命令
* @param {String} sessionId - 会话ID
* @param {String} command - 命令字符串
* @returns {Promise<Object>} - 执行结果
*/
async executeCommand(sessionId, command) {
// 验证会话存在
if (!this.sessions.hasSession(sessionId)) {
throw new Error(`Session not found: ${sessionId}`);
}
const session = this.sessions.getSession(sessionId);
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'executing');
this.emit('stateChanged', { sessionId, state: 'executing' });
try {
// 添加命令执行记录
const commandMessage = {
role: 'user',
content: `执行命令: ${command}`,
timestamp: new Date()
};
this.sessions.addMessageToHistory(sessionId, commandMessage);
// 执行命令
const result = await toolService.executeTool(
'execute_command',
{ command },
{ sessionId, sandboxId: session.sandboxId }
);
// 添加执行结果到历史
const resultMessage = {
role: 'system',
content: `命令执行结果:\n${result.stdout || ''}\n${result.stderr || ''}`,
timestamp: new Date(),
command,
result
};
this.sessions.addMessageToHistory(sessionId, resultMessage);
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'idle');
this.emit('stateChanged', { sessionId, state: 'idle' });
return {
type: 'command_execution',
command,
result,
sessionId
};
} catch (error) {
// 处理错误
logger.error(`Error executing command for session ${sessionId}:`, error);
// 添加错误到历史
this.sessions.addMessageToHistory(sessionId, {
role: 'system',
content: `命令执行错误: ${error.message}`,
timestamp: new Date(),
command,
error: error.message
});
// 更新会话状态
this.sessions.updateSessionState(sessionId, 'error');
this.emit('stateChanged', { sessionId, state: 'error' });
throw error;
}
}
/**
* 获取会话历史
* @param {String} sessionId - 会话ID
* @returns {Array} - 会话历史消息
*/
getSessionHistory(sessionId) {
return this.sessions.getSessionHistory(sessionId);
}
/**
* 获取会话状态
* @param {String} sessionId - 会话ID
* @returns {Object} - 会话状态
*/
getSessionState(sessionId) {
if (!this.sessions.hasSession(sessionId)) {
throw new Error(`Session not found: ${sessionId}`);
}
const session = this.sessions.getSession(sessionId);
return {
id: sessionId,
state: session.state,
createdAt: session.createdAt,
lastActive: session.lastActive,
historyLength: session.history.length,
sandboxId: session.sandboxId
};
}
/**
* 列出所有会话
* @returns {Array} - 会话列表
*/
listSessions() {
return this.sessions.listSessions().map(session => ({
id: session.id,
state: session.state,
createdAt: session.createdAt,
lastActive: session.lastActive,
historyLength: session.history.length
}));
}
/**
* 删除会话
* @param {String} sessionId - 会话ID
* @returns {Boolean} - 是否成功删除
*/
async deleteSession(sessionId) {
if (!this.sessions.hasSession(sessionId)) {
return false;
}
const session = this.sessions.getSession(sessionId);
// 如果有沙箱,删除沙箱
if (session.sandboxId) {
try {
await dockerService.removeSandbox(session.sandboxId);
logger.info(`Removed sandbox ${session.sandboxId} for session ${sessionId}`);
} catch (error) {
logger.error(`Error removing sandbox for session ${sessionId}:`, error);
}
}
// Delete the session (SessionManager.deleteSession is async, so it must be awaited)
const result = await this.sessions.deleteSession(sessionId);
if (result) {
this.emit('sessionDeleted', sessionId);
logger.info(`Deleted session: ${sessionId}`);
}
return result;
}
/**
* 关闭服务
*/
async shutdown() {
try {
// Shut down the session manager (stops any auto-save timer and saves all sessions)
await this.sessions.shutdown();
// 关闭所有沙箱
const allSessions = this.sessions.listSessions();
for (const session of allSessions) {
if (session.sandboxId) {
try {
await dockerService.removeSandbox(session.sandboxId);
logger.info(`Removed sandbox ${session.sandboxId} during shutdown`);
} catch (error) {
logger.error(`Error removing sandbox ${session.sandboxId} during shutdown:`, error);
}
}
}
// 关闭其他服务
await llmService.shutdown();
await toolService.shutdown();
await dockerService.shutdown();
logger.info('Agent service shut down successfully');
} catch (error) {
logger.error('Error during agent service shutdown:', error);
throw error;
}
}
}
// 创建单例实例
const agentService = new AgentService();
module.exports = agentService;
sessionManager.js - session management service:
// src/services/sessionManager.js
const fs = require('fs').promises;
const path = require('path');
const logger = require('../config/logger');
const { ensureDirectoryExists } = require('../utils/fsUtils');
class SessionManager {
constructor(options = {}) {
this.sessionsDir = options.sessionsDir || path.join(process.cwd(), '.openhands', 'sessions');
this.sessions = new Map();
this.maxHistory = options.maxHistory || 200;
this.saveInterval = options.saveInterval || 60000; // 1分钟
this.saveTimer = null;
}
/**
* 初始化会话管理器
*/
async initialize() {
// 确保会话目录存在
await ensureDirectoryExists(this.sessionsDir);
// 加载会话
await this.loadSessions();
// 设置定期保存
this.saveTimer = setInterval(() => {
this.saveSessions().catch(err => {
logger.error('Error auto-saving sessions:', err);
});
}, this.saveInterval);
}
/**
* 加载所有会话
*/
async loadSessions() {
try {
await ensureDirectoryExists(this.sessionsDir);
const files = await fs.readdir(this.sessionsDir);
const sessionFiles = files.filter(file => file.endsWith('.json'));
for (const file of sessionFiles) {
try {
const sessionId = path.basename(file, '.json');
const sessionData = await fs.readFile(path.join(this.sessionsDir, file), 'utf8');
const parsedSession = JSON.parse(sessionData);
// 确保日期字段正确解析
if (parsedSession.createdAt) {
parsedSession.createdAt = new Date(parsedSession.createdAt);
}
if (parsedSession.lastActive) {
parsedSession.lastActive = new Date(parsedSession.lastActive);
}
// 处理历史记录中的日期
if (parsedSession.history && Array.isArray(parsedSession.history)) {
parsedSession.history = parsedSession.history.map(msg => ({
...msg,
timestamp: msg.timestamp ? new Date(msg.timestamp) : new Date()
}));
}
this.sessions.set(sessionId, parsedSession);
logger.debug(`Loaded session: ${sessionId}`);
} catch (error) {
logger.error(`Error loading session from ${file}:`, error);
}
}
logger.info(`Loaded ${this.sessions.size} sessions`);
} catch (error) {
logger.error('Error loading sessions:', error);
throw error;
}
}
/**
* 保存所有会话
*/
async saveSessions() {
try {
await ensureDirectoryExists(this.sessionsDir);
const savePromises = [];
for (const [sessionId, session] of this.sessions.entries()) {
const sessionFile = path.join(this.sessionsDir, `${sessionId}.json`);
const sessionJson = JSON.stringify(session, null, 2);
savePromises.push(fs.writeFile(sessionFile, sessionJson, 'utf8'));
}
await Promise.all(savePromises);
logger.debug(`Saved ${this.sessions.size} sessions`);
return true;
} catch (error) {
logger.error('Error saving sessions:', error);
throw error;
}
}
/**
* 创建新会话
* @param {String} sessionId - 会话ID
* @param {Object} sessionData - 会话数据
* @returns {Object} - 创建的会话
*/
createSession(sessionId, sessionData = {}) {
const session = {
id: sessionId,
createdAt: new Date(),
lastActive: new Date(),
state: 'idle',
history: [],
...sessionData
};
this.sessions.set(sessionId, session);
return session;
}
/**
* 获取会话
* @param {String} sessionId - 会话ID
* @returns {Object|null} - 会话对象或null
*/
getSession(sessionId) {
return this.sessions.get(sessionId) || null;
}
/**
* 检查会话是否存在
* @param {String} sessionId - 会话ID
* @returns {Boolean} - 会话是否存在
*/
hasSession(sessionId) {
return this.sessions.has(sessionId);
}
/**
* 更新会话状态
* @param {String} sessionId - 会话ID
* @param {String} state - 新状态
* @returns {Boolean} - 是否成功更新
*/
updateSessionState(sessionId, state) {
const session = this.sessions.get(sessionId);
if (!session) return false;
session.state = state;
session.lastActive = new Date();
return true;
}
/**
* 添加消息到会话历史
* @param {String} sessionId - 会话ID
* @param {Object} message - 消息对象
* @returns {Boolean} - 是否成功添加
*/
addMessageToHistory(sessionId, message) {
const session = this.sessions.get(sessionId);
if (!session) return false;
// 确保timestamp是Date对象
if (!message.timestamp) {
message.timestamp = new Date();
}
// 添加消息到历史
session.history.push(message);
// 限制历史长度
if (session.history.length > this.maxHistory) {
session.history = session.history.slice(session.history.length - this.maxHistory);
}
session.lastActive = new Date();
return true;
}
/**
* 获取会话历史
* @param {String} sessionId - 会话ID
* @returns {Array} - 会话历史
*/
getSessionHistory(sessionId) {
const session = this.sessions.get(sessionId);
return session ? session.history : [];
}
/**
* 列出所有会话
* @returns {Array} - 会话列表
*/
listSessions() {
return Array.from(this.sessions.values());
}
/**
* 删除会话
* @param {String} sessionId - 会话ID
* @returns {Boolean} - 是否成功删除
*/
async deleteSession(sessionId) {
if (!this.sessions.has(sessionId)) return false;
// 从内存中删除
this.sessions.delete(sessionId);
// 从磁盘中删除
try {
const sessionFile = path.join(this.sessionsDir, `${sessionId}.json`);
await fs.unlink(sessionFile);
} catch (error) {
logger.error(`Error deleting session file for ${sessionId}:`, error);
// 即使文件删除失败,我们仍认为会话删除成功
}
return true;
}
/**
* 清理过期会话
* @param {Number} maxAge - 最大会话年龄(毫秒)
* @returns {Number} - 清理的会话数量
*/
async cleanupSessions(maxAge = 7 * 24 * 60 * 60 * 1000) { // 默认7天
const now = new Date();
let cleanedCount = 0;
for (const [sessionId, session] of this.sessions.entries()) {
const age = now - session.lastActive;
if (age > maxAge) {
await this.deleteSession(sessionId);
cleanedCount++;
}
}
return cleanedCount;
}
/**
* 关闭会话管理器
*/
async shutdown() {
// 取消自动保存
if (this.saveTimer) {
clearInterval(this.saveTimer);
this.saveTimer = null;
}
// 保存所有会话
await this.saveSessions();
}
}
module.exports = { SessionManager };
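The session manager above persists sessions as JSON, which is why `loadSessions` has to rebuild `Date` objects: `JSON.stringify` writes dates as ISO strings and `JSON.parse` does not turn them back. The round trip can be sketched in isolation as follows; `serializeSession` and `reviveSession` are hypothetical helper names, not part of the listing.

```javascript
// Serialize a session; Date fields become ISO-8601 strings.
function serializeSession(session) {
  return JSON.stringify(session);
}

// Parse a session and convert the known date fields back into Date objects.
function reviveSession(json) {
  const data = JSON.parse(json);
  const toDate = (value) => (value ? new Date(value) : value);
  return {
    ...data,
    createdAt: toDate(data.createdAt),
    lastActive: toDate(data.lastActive),
    history: (data.history || []).map((message) => ({
      ...message,
      timestamp: toDate(message.timestamp),
    })),
  };
}

// Usage: after a round trip the dates are Date objects again.
const original = {
  id: 's1',
  createdAt: new Date('2024-01-02T03:04:05.000Z'),
  lastActive: new Date('2024-01-02T04:00:00.000Z'),
  history: [
    { role: 'user', content: 'hi', timestamp: new Date('2024-01-02T03:05:00.000Z') },
  ],
};
const revived = reviveSession(serializeSession(original));
```

Without the revival step, code such as `msg.timestamp.getTime()` would fail on reloaded sessions, because the timestamp would be a string.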
dockerService.js - Docker integration service:
// src/services/dockerService.js
const { execFile } = require('child_process');
const { promisify } = require('util');
const logger = require('../config/logger');
const config = require('../config/config');
const execFileAsync = promisify(execFile);
class DockerService {
constructor() {
this.sandboxImage = config.sandbox.image || 'docker.all-hands.dev/all-hands-ai/runtime:0.47-nikolaik';
this.activeSandboxes = new Map();
}
/**
* 初始化Docker服务
*/
async initialize() {
try {
// 检查Docker是否可用
await this._checkDockerAvailability();
// 拉取沙箱镜像
await this._pullSandboxImage();
logger.info('Docker service initialized successfully');
return true;
} catch (error) {
logger.error('Failed to initialize Docker service:', error);
throw error;
}
}
/**
* 检查Docker是否可用
* @private
*/
async _checkDockerAvailability() {
try {
const { stdout } = await execFileAsync('docker', ['version', '--format', '{{.Server.Version}}']);
logger.info(`Docker available, version: ${stdout.trim()}`);
return true;
} catch (error) {
logger.error('Docker not available:', error);
throw new Error('Docker is not available. Please make sure Docker is installed and running.');
}
}
/**
* 拉取沙箱镜像
* @private
*/
async _pullSandboxImage() {
try {
logger.info(`Pulling sandbox image: ${this.sandboxImage}`);
const { stdout, stderr } = await execFileAsync('docker', ['pull', this.sandboxImage]);
if (stderr && !stderr.includes('up to date')) {
logger.warn(`Warning while pulling image: ${stderr}`);
}
logger.info(`Sandbox image pulled: ${this.sandboxImage}`);
return true;
} catch (error) {
logger.error(`Failed to pull sandbox image ${this.sandboxImage}:`, error);
throw error;
}
}
/**
* 创建沙箱容器
* @param {String} sessionId - 关联的会话ID
* @returns {String} - 沙箱容器ID
*/
async createSandbox(sessionId) {
try {
const containerName = `openhands-sandbox-${sessionId.substring(0, 8)}`;
// 创建工作目录
const workdirVolume = `openhands-workdir-${sessionId.substring(0, 8)}`;
// 创建数据卷
await execFileAsync('docker', ['volume', 'create', workdirVolume]);
// 启动容器
const args = [
'run',
'-d', // 后台运行
'--name', containerName,
// 资源限制
'--cpus', config.sandbox.cpus || '1',
'--memory', config.sandbox.memory || '1g',
'--pids-limit', config.sandbox.pidsLimit || '100',
// 安全限制
'--security-opt', 'no-new-privileges',
'--cap-drop', 'ALL',
'--cap-add', 'CHOWN',
'--cap-add', 'DAC_OVERRIDE',
'--cap-add', 'SETGID',
'--cap-add', 'SETUID',
// 挂载工作目录
'-v', `${workdirVolume}:/workspace`,
// 网络设置
'--network', config.sandbox.network || 'bridge',
// 环境变量
'-e', 'OPENHANDS_SANDBOX=true',
'-e', `SESSION_ID=${sessionId}`,
// 镜像
this.sandboxImage
];
// 执行docker run命令
const { stdout } = await execFileAsync('docker', args);
const containerId = stdout.trim();
// 记录沙箱信息
this.activeSandboxes.set(containerId, {
sessionId,
containerName,
workdirVolume,
createdAt: new Date()
});
logger.info(`Created sandbox container ${containerId} for session ${sessionId}`);
return containerId;
} catch (error) {
logger.error(`Failed to create sandbox for session ${sessionId}:`, error);
throw error;
}
}
/**
* 执行沙箱内命令
* @param {String} sandboxId - 沙箱容器ID
* @param {String} command - 要执行的命令
* @param {Object} options - 执行选项
* @returns {Object} - 执行结果
*/
async execInSandbox(sandboxId, command, options = {}) {
try {
if (!this.activeSandboxes.has(sandboxId)) {
throw new Error(`Sandbox ${sandboxId} not found`);
}
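const { workingDir = '/workspace', timeout = 30000 } = options;
// Build the exec command. `docker exec` has no timeout flag, so the time
// limit is enforced on the client process through execFile's `timeout` option
const args = [
'exec',
// Set the working directory
'-w', workingDir,
// Container ID
sandboxId,
// Command to run
'bash', '-c', command
];
logger.debug(`Executing command in sandbox ${sandboxId}: ${command}`);
// Run the command; the docker client is killed after `timeout` ms
// (note: this does not by itself guarantee the process inside the container stops)
const { stdout, stderr } = await execFileAsync('docker', args, { timeout });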
return {
stdout,
stderr,
exitCode: 0
};
} catch (error) {
// 处理执行错误
logger.error(`Error executing command in sandbox ${sandboxId}:`, error);
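return {
stdout: error.stdout || '',
stderr: error.stderr || error.message,
// error.code is the exit status when numeric; it can also be a string such as 'ENOENT'
exitCode: typeof error.code === 'number' ? error.code : 1
};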
}
}
/**
* 复制文件到沙箱
* @param {String} sandboxId - 沙箱容器ID
* @param {String} localPath - 本地路径
* @param {String} containerPath - 容器内路径
* @returns {Boolean} - 是否成功
*/
async copyToSandbox(sandboxId, localPath, containerPath) {
try {
if (!this.activeSandboxes.has(sandboxId)) {
throw new Error(`Sandbox ${sandboxId} not found`);
}
// docker cp命令
await execFileAsync('docker', ['cp', localPath, `${sandboxId}:${containerPath}`]);
logger.debug(`Copied ${localPath} to sandbox ${sandboxId}:${containerPath}`);
return true;
} catch (error) {
logger.error(`Error copying to sandbox ${sandboxId}:`, error);
throw error;
}
}
/**
* 从沙箱复制文件
* @param {String} sandboxId - 沙箱容器ID
* @param {String} containerPath - 容器内路径
* @param {String} localPath - 本地路径
* @returns {Boolean} - 是否成功
*/
async copyFromSandbox(sandboxId, containerPath, localPath) {
try {
if (!this.activeSandboxes.has(sandboxId)) {
throw new Error(`Sandbox ${sandboxId} not found`);
}
// docker cp命令
await execFileAsync('docker', ['cp', `${sandboxId}:${containerPath}`, localPath]);
logger.debug(`Copied from sandbox ${sandboxId}:${containerPath} to ${localPath}`);
return true;
} catch (error) {
logger.error(`Error copying from sandbox ${sandboxId}:`, error);
throw error;
}
}
/**
* 删除沙箱
* @param {String} sandboxId - 沙箱容器ID
* @returns {Boolean} - 是否成功
*/
async removeSandbox(sandboxId) {
try {
if (!this.activeSandboxes.has(sandboxId)) {
logger.warn(`Sandbox ${sandboxId} not found, might have been removed already`);
return false;
}
const sandboxInfo = this.activeSandboxes.get(sandboxId);
// 停止并删除容器
try {
await execFileAsync('docker', ['stop', sandboxId]);
await execFileAsync('docker', ['rm', sandboxId]);
logger.debug(`Removed container ${sandboxId}`);
} catch (error) {
logger.error(`Error removing container ${sandboxId}:`, error);
}
// 删除卷
try {
await execFileAsync('docker', ['volume', 'rm', sandboxInfo.workdirVolume]);
logger.debug(`Removed volume ${sandboxInfo.workdirVolume}`);
} catch (error) {
logger.error(`Error removing volume ${sandboxInfo.workdirVolume}:`, error);
}
// 从记录中删除
this.activeSandboxes.delete(sandboxId);
logger.info(`Removed sandbox ${sandboxId}`);
return true;
} catch (error) {
logger.error(`Error removing sandbox ${sandboxId}:`, error);
throw error;
}
}
/**
* 关闭所有沙箱
*/
async shutdown() {
logger.info(`Shutting down Docker service, removing ${this.activeSandboxes.size} sandboxes...`);
const removePromises = [];
for (const [sandboxId] of this.activeSandboxes) {
removePromises.push(this.removeSandbox(sandboxId).catch(error => {
logger.error(`Error removing sandbox ${sandboxId} during shutdown:`, error);
}));
}
await Promise.all(removePromises);
logger.info('Docker service shutdown complete');
}
}
// 创建单例实例
const dockerService = new DockerService();
module.exports = dockerService;
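One detail worth isolating from the Docker service is how a time limit on sandbox commands can be enforced. The Docker CLI's `exec` subcommand does not provide a timeout option, so a common approach is to limit the calling process instead, using the `timeout` option of Node's `child_process.execFile`. The sketch below shows this under stated assumptions: `buildExecArgs` and `runInContainer` are hypothetical helpers, and the container is assumed to have `bash` available.

```javascript
const { execFile } = require('child_process');
const { promisify } = require('util');

const execFileAsync = promisify(execFile);

// Hypothetical helper: builds the argument list for `docker exec`.
function buildExecArgs(containerId, command, workingDir = '/workspace') {
  return ['exec', '-w', workingDir, containerId, 'bash', '-c', command];
}

// Hypothetical helper: runs a command with a client-side time limit.
async function runInContainer(containerId, command, { workingDir, timeoutMs = 30000 } = {}) {
  try {
    const { stdout, stderr } = await execFileAsync(
      'docker',
      buildExecArgs(containerId, command, workingDir),
      { timeout: timeoutMs } // the docker client is killed after timeoutMs
    );
    return { stdout, stderr, exitCode: 0, timedOut: false };
  } catch (error) {
    return {
      stdout: error.stdout || '',
      stderr: error.stderr || error.message,
      // error.code is the exit status when numeric, or a string such as 'ENOENT'
      exitCode: typeof error.code === 'number' ? error.code : 1,
      timedOut: error.killed === true,
    };
  }
}
```

Passing the command as a separate argument to `bash -c` through `execFile` avoids a host-side shell, although the command is still interpreted by the shell inside the container. Killing the client does not necessarily stop the process in the container, so a stricter design might also wrap the command with a tool such as `timeout` inside the sandbox, if the image provides one.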
llmService.js - LLM integration service:
// src/services/llmService.js
const axios = require('axios');
const { v4: uuidv4 } = require('uuid');
const logger = require('../config/logger');
const config = require('../config/config');
class LLMService {
constructor() {
this.providers = new Map();
this.defaultProvider = 'anthropic';
}
/**
* 初始化LLM服务
*/
async initialize() {
try {
// 注册默认提供商
this.registerProvider('anthropic', {
name: 'Anthropic',
baseUrl: 'https://api.anthropic.com/v1',
defaultModel: 'claude-sonnet-4-20250514',
apiKeyName: 'ANTHROPIC_API_KEY',
apiKey: process.env.ANTHROPIC_API_KEY || config.llm?.anthropic?.apiKey || '',
requestHandler: this._anthropicRequestHandler.bind(this)
});
this.registerProvider('openai', {
name: 'OpenAI',
baseUrl: 'https://api.openai.com/v1',
defaultModel: 'gpt-4o',
apiKeyName: 'OPENAI_API_KEY',
apiKey: process.env.OPENAI_API_KEY || config.llm?.openai?.apiKey || '',
requestHandler: this._openaiRequestHandler.bind(this)
});
logger.info('LLM service initialized successfully');
return true;
} catch (error) {
logger.error('Failed to initialize LLM service:', error);
throw error;
}
}
/**
* 注册LLM提供商
* @param {String} id - 提供商ID
* @param {Object} config - 提供商配置
*/
registerProvider(id, config) {
this.providers.set(id, config);
logger.debug(`Registered LLM provider: ${id}`);
}
/**
* 获取默认提供商
* @returns {String} - 默认提供商ID
*/
getDefaultProvider() {
return this.defaultProvider;
}
/**
* 设置默认提供商
* @param {String} providerId - 提供商ID
*/
setDefaultProvider(providerId) {
if (!this.providers.has(providerId)) {
throw new Error(`LLM provider ${providerId} not registered`);
}
this.defaultProvider = providerId;
}
/**
* 设置提供商API密钥
* @param {String} providerId - 提供商ID
* @param {String} apiKey - API密钥
*/
setApiKey(providerId, apiKey) {
if (!this.providers.has(providerId)) {
throw new Error(`LLM provider ${providerId} not registered`);
}
const provider = this.providers.get(providerId);
provider.apiKey = apiKey;
}
/**
* 获取可用提供商列表
* @returns {Array} - 提供商列表
*/
getAvailableProviders() {
return Array.from(this.providers.entries()).map(([id, config]) => ({
id,
name: config.name,
defaultModel: config.defaultModel,
hasApiKey: !!config.apiKey
}));
}
/**
* 执行聊天完成
* @param {String} providerId - 提供商ID
* @param {String} input - 用户输入
* @param {Object} context - 上下文
* @param {Object} options - 选项
* @returns {Promise<Object>} - 响应对象
*/
async completeChat(providerId, input, context, options = {}) {
// Resolve the provider ID once so error messages name the provider actually used
const resolvedId = providerId || this.defaultProvider;
const provider = this.providers.get(resolvedId);
if (!provider) {
throw new Error(`LLM provider ${resolvedId} not found`);
}
if (!provider.apiKey) {
throw new Error(`API key not set for provider ${resolvedId}`);
}
// 准备请求参数
const requestParams = {
model: options.model || provider.defaultModel,
input,
context,
options
};
try {
// 调用提供商特定的处理器
logger.debug(`Calling LLM provider ${providerId} for completion`);
const response = await provider.requestHandler(requestParams);
return response;
} catch (error) {
logger.error(`Error calling LLM provider ${providerId}:`, error);
throw error;
}
}
/**
* Anthropic请求处理器
* @private
* @param {Object} params - 请求参数
* @returns {Promise<Object>} - 响应对象
*/
async _anthropicRequestHandler(params) {
const provider = this.providers.get('anthropic');
// Build the message history. The Messages API accepts only `user` and
// `assistant` roles in `messages`; the system prompt is passed through the
// top-level `system` field instead
const messages = [];
// Add history messages
if (params.context.history && Array.isArray(params.context.history)) {
for (const message of params.context.history) {
if (message.role === 'user' || message.role === 'assistant') {
messages.push({
role: message.role,
content: message.content
});
}
}
}
// Add tool definitions
const tools = [];
if (params.context.availableTools && Array.isArray(params.context.availableTools)) {
for (const tool of params.context.availableTools) {
tools.push({
name: tool.name,
description: tool.description,
input_schema: tool.schema
});
}
}
// Add the current user message
messages.push({
role: 'user',
content: params.input
});
// Build the request body (`??` keeps an explicit 0 instead of replacing it with the default)
const requestBody = {
model: params.model,
messages,
max_tokens: params.options.maxTokens || 2048,
temperature: params.options.temperature ?? 0.7,
top_p: params.options.topP ?? 0.95
};
// Pass the system prompt as a top-level field
if (params.context.systemPrompt) {
requestBody.system = params.context.systemPrompt;
}
// If tools are defined, add them to the request
if (tools.length > 0) {
requestBody.tools = tools;
}
// 发送请求
try {
const response = await axios({
method: 'post',
url: `${provider.baseUrl}/messages`,
headers: {
'Content-Type': 'application/json',
'x-api-key': provider.apiKey,
'anthropic-version': '2023-06-01'
},
data: requestBody
});
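// Parse the response: `content` is an array of blocks. Concatenate the text
// blocks and collect every `tool_use` block (it is not necessarily the first block)
const blocks = response.data.content || [];
const content = blocks
.filter(block => block.type === 'text')
.map(block => block.text)
.join('');
// Check for tool calls; name and input sit directly on the `tool_use` block
const toolCalls = blocks
.filter(block => block.type === 'tool_use')
.map(block => ({
name: block.name,
arguments: block.input
}));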
return {
content,
toolCalls,
model: response.data.model,
usage: response.data.usage
};
} catch (error) {
if (error.response) {
logger.error('Anthropic API error:', error.response.data);
throw new Error(`Anthropic API error: ${error.response.data.error?.message || error.message}`);
}
throw error;
}
}
/**
* OpenAI请求处理器
* @private
* @param {Object} params - 请求参数
* @returns {Promise<Object>} - 响应对象
*/
async _openaiRequestHandler(params) {
const provider = this.providers.get('openai');
// 构建消息历史
const messages = [];
// 添加系统消息
if (params.context.systemPrompt) {
messages.push({
role: 'system',
content: params.context.systemPrompt
});
}
// 添加历史消息
if (params.context.history && Array.isArray(params.context.history)) {
for (const message of params.context.history) {
if (message.role === 'user' || message.role === 'assistant' || message.role === 'system') {
messages.push({
role: message.role,
content: message.content
});
}
}
}
// 添加当前用户消息
messages.push({
role: 'user',
content: params.input
});
// 构建工具定义
const tools = [];
if (params.context.availableTools && Array.isArray(params.context.availableTools)) {
for (const tool of params.context.availableTools) {
tools.push({
type: 'function',
function: {
name: tool.name,
description: tool.description,
parameters: tool.schema
}
});
}
}
// 构建请求体
const requestBody = {
model: params.model,
messages,
max_tokens: params.options.maxTokens || 2048,
temperature: params.options.temperature || 0.7,
top_p: params.options.topP || 0.95
};
// 如果有工具定义,添加到请求
if (tools.length > 0) {
requestBody.tools = tools;
requestBody.tool_choice = 'auto';
}
// 发送请求
try {
const response = await axios({
method: 'post',
url: `${provider.baseUrl}/chat/completions`,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${provider.apiKey}`
},
data: requestBody
});
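// Parse the response
const content = response.data.choices[0]?.message?.content || '';
// Collect tool calls, skipping any whose arguments are not valid JSON
const toolCalls = [];
if (response.data.choices[0]?.message?.tool_calls) {
for (const toolCall of response.data.choices[0].message.tool_calls) {
if (toolCall.type === 'function') {
try {
toolCalls.push({
name: toolCall.function.name,
arguments: JSON.parse(toolCall.function.arguments || '{}')
});
} catch (parseError) {
logger.warn(`Skipping tool call ${toolCall.function.name}: arguments are not valid JSON`);
}
}
}
}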
return {
content,
toolCalls,
model: response.data.model,
usage: response.data.usage
};
} catch (error) {
if (error.response) {
logger.error('OpenAI API error:', error.response.data);
throw new Error(`OpenAI API error: ${error.response.data.error?.message || error.message}`);
}
throw error;
}
}
/**
* 关闭LLM服务
*/
async shutdown() {
logger.info('LLM service shutdown');
}
}
// 创建单例实例
const llmService = new LLMService();
module.exports = llmService;
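OpenAI-style responses deliver tool arguments as a JSON string, so a malformed payload from the model makes `JSON.parse` throw and fails the whole request. The sketch below shows one way to separate usable calls from broken ones. `parseToolCalls` is a hypothetical helper, not part of the `LLMService` above, and the sample message is invented for illustration.

```javascript
// Hypothetical helper (not in the original service): split tool calls into
// successfully parsed calls and calls whose arguments were not valid JSON.
function parseToolCalls(message) {
  const calls = [];
  const errors = [];
  for (const toolCall of message?.tool_calls ?? []) {
    if (toolCall.type !== 'function') continue;
    try {
      calls.push({
        name: toolCall.function.name,
        arguments: JSON.parse(toolCall.function.arguments || '{}')
      });
    } catch (err) {
      errors.push({ name: toolCall.function.name, reason: err.message });
    }
  }
  return { calls, errors };
}

// Made-up sample: the second call has truncated JSON arguments.
const sample = {
  tool_calls: [
    { type: 'function', function: { name: 'file_system', arguments: '{"operation":"read","path":"a.txt"}' } },
    { type: 'function', function: { name: 'execute_command', arguments: '{"command": "ls' } }
  ]
};
const parsed = parseToolCalls(sample);
console.log(parsed.calls.length, parsed.errors.length); // 1 1
```

Returning the failures instead of throwing lets the caller decide whether to report them back to the model or drop them.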
toolService.js - tool management service:
// src/services/toolService.js
const EventEmitter = require('events');
const fs = require('fs').promises;
const path = require('path');
const logger = require('../config/logger');
const { ensureDirectoryExists } = require('../utils/fsUtils');
const dockerService = require('./dockerService');
class ToolService extends EventEmitter {
constructor() {
super();
this.toolsDir = path.join(process.cwd(), 'src', 'tools');
this.tools = new Map();
this.disabledTools = new Set();
}
/**
* 初始化工具服务
*/
async initialize() {
try {
// 确保工具目录存在
await ensureDirectoryExists(this.toolsDir);
// 加载工具
await this._loadBuiltInTools();
logger.info(`Tool service initialized with ${this.tools.size} tools`);
return true;
} catch (error) {
logger.error('Failed to initialize tool service:', error);
throw error;
}
}
/**
* 加载内置工具
* @private
*/
async _loadBuiltInTools() {
// 文件系统工具
this.registerTool('file_system', {
name: 'file_system',
description: '执行文件系统操作,如读取、写入、列出文件',
schema: {
type: 'object',
properties: {
operation: {
type: 'string',
enum: ['read', 'write', 'list', 'exists', 'delete'],
description: '要执行的文件操作'
},
path: {
type: 'string',
description: '文件或目录路径'
},
content: {
type: 'string',
description: '写入操作的文件内容'
}
},
required: ['operation', 'path']
},
execute: this._executeFileSystemTool.bind(this)
});
// 命令执行工具
this.registerTool('execute_command', {
name: 'execute_command',
description: '在沙箱环境中执行Shell命令',
schema: {
type: 'object',
properties: {
command: {
type: 'string',
description: '要执行的命令'
},
workingDir: {
type: 'string',
description: '命令执行的工作目录'
}
},
required: ['command']
},
execute: this._executeCommandTool.bind(this)
});
// HTTP请求工具
this.registerTool('http_request', {
name: 'http_request',
description: '发送HTTP请求',
schema: {
type: 'object',
properties: {
url: {
type: 'string',
description: '请求URL'
},
method: {
type: 'string',
enum: ['GET', 'POST', 'PUT', 'DELETE'],
default: 'GET',
description: 'HTTP方法'
},
headers: {
type: 'object',
description: 'HTTP请求头'
},
data: {
type: 'object',
description: '请求体数据'
}
},
required: ['url']
},
execute: this._executeHttpRequestTool.bind(this)
});
// 工作区工具
this.registerTool('workspace', {
name: 'workspace',
description: '管理项目工作区',
schema: {
type: 'object',
properties: {
operation: {
type: 'string',
enum: ['init', 'status', 'save', 'restore'],
description: '工作区操作'
},
name: {
type: 'string',
description: '工作区名称'
}
},
required: ['operation']
},
execute: this._executeWorkspaceTool.bind(this)
});
}
/**
* 注册工具
* @param {String} id - 工具ID
* @param {Object} tool - 工具配置
*/
registerTool(id, tool) {
this.tools.set(id, tool);
this.emit('tool_registered', id);
logger.debug(`Registered tool: ${id}`);
}
/**
* 获取可用工具列表
* @returns {Array} - 工具列表
*/
getAvailableTools() {
return Array.from(this.tools.entries())
.filter(([id]) => !this.disabledTools.has(id))
.map(([id, tool]) => ({
name: tool.name,
description: tool.description,
schema: tool.schema
}));
}
/**
* 获取工具定义
* @param {String} toolId - 工具ID
* @returns {Object|null} - 工具定义
*/
getToolDefinition(toolId) {
const tool = this.tools.get(toolId);
if (!tool) return null;
return {
name: tool.name,
description: tool.description,
schema: tool.schema
};
}
/**
* 执行工具
* @param {String} toolId - 工具ID
* @param {Object} args - 工具参数
* @param {Object} context - 执行上下文
* @returns {Promise<Object>} - 执行结果
*/
async executeTool(toolId, args, context = {}) {
const tool = this.tools.get(toolId);
if (!tool) {
throw new Error(`Tool ${toolId} not found`);
}
if (this.disabledTools.has(toolId)) {
throw new Error(`Tool ${toolId} is disabled`);
}
// 记录工具调用
this.emit('tool_execution_start', { toolId, args, context });
logger.debug(`Executing tool ${toolId} with args:`, args);
try {
// 执行工具
const result = await tool.execute(args, context);
// 记录执行结果
this.emit('tool_execution_complete', { toolId, args, context, result });
return result;
} catch (error) {
// 记录执行错误
this.emit('tool_execution_error', { toolId, args, context, error });
logger.error(`Error executing tool ${toolId}:`, error);
throw error;
}
}
/**
* 执行文件系统工具
* @private
* @param {Object} args - 工具参数
* @param {Object} context - 执行上下文
* @returns {Promise<Object>} - 执行结果
*/
async _executeFileSystemTool(args, context) {
const { operation, path: filePath, content } = args;
const { sandboxId } = context;
if (!sandboxId) {
throw new Error('Sandbox ID is required for file system operations');
}
// Single-quote the path for the shell so quotes, spaces, or $(...) in it cannot inject commands
const quotedPath = "'" + String(filePath).replace(/'/g, "'\\''") + "'";
switch (operation) {
case 'read': {
// Read the file inside the sandbox
const readResult = await dockerService.execInSandbox(
sandboxId,
`cat -- ${quotedPath}`,
{ workingDir: '/workspace' }
);
if (readResult.exitCode !== 0) {
throw new Error(`Failed to read file: ${readResult.stderr}`);
}
return { content: readResult.stdout };
}
case 'write': {
// Stage the content in a private temporary directory on the host
const tempDir = await fs.mkdtemp(path.join(require('os').tmpdir(), 'openhands-'));
const tempFile = path.join(tempDir, 'content');
try {
await fs.writeFile(tempFile, content ?? '');
// Copy the staged file into the sandbox
await dockerService.copyToSandbox(sandboxId, tempFile, filePath);
} finally {
// Remove the temporary directory even if the copy fails
await fs.rm(tempDir, { recursive: true, force: true });
}
return { success: true };
}
case 'list': {
// List the directory inside the sandbox
const listResult = await dockerService.execInSandbox(
sandboxId,
`ls -la -- ${quotedPath}`,
{ workingDir: '/workspace' }
);
if (listResult.exitCode !== 0) {
throw new Error(`Failed to list directory: ${listResult.stderr}`);
}
return { output: listResult.stdout };
}
case 'exists': {
// Check whether the file exists
const existsResult = await dockerService.execInSandbox(
sandboxId,
`test -e ${quotedPath} && echo "exists" || echo "not exists"`,
{ workingDir: '/workspace' }
);
return { exists: existsResult.stdout.trim() === 'exists' };
}
case 'delete': {
// Delete the file or directory
const deleteResult = await dockerService.execInSandbox(
sandboxId,
`rm -rf -- ${quotedPath}`,
{ workingDir: '/workspace' }
);
if (deleteResult.exitCode !== 0) {
throw new Error(`Failed to delete file: ${deleteResult.stderr}`);
}
return { success: true };
}
default:
throw new Error(`Unsupported file system operation: ${operation}`);
}
}
/**
* 执行命令工具
* @private
* @param {Object} args - 工具参数
* @param {Object} context - 执行上下文
* @returns {Promise<Object>} - 执行结果
*/
async _executeCommandTool(args, context) {
const { command, workingDir = '/workspace' } = args;
const { sandboxId } = context;
if (!sandboxId) {
throw new Error('Sandbox ID is required for command execution');
}
// 在沙箱中执行命令
const result = await dockerService.execInSandbox(
sandboxId,
command,
{ workingDir }
);
return {
stdout: result.stdout,
stderr: result.stderr,
exitCode: result.exitCode
};
}
/**
* 执行HTTP请求工具
* @private
* @param {Object} args - 工具参数
* @param {Object} context - 执行上下文
* @returns {Promise<Object>} - 执行结果
*/
async _executeHttpRequestTool(args, context) {
const { url, method = 'GET', headers = {}, data } = args;
// 使用axios发送请求
const axios = require('axios');
try {
const response = await axios({
method,
url,
headers,
data,
timeout: 10000,
maxRedirects: 3
});
return {
status: response.status,
statusText: response.statusText,
headers: response.headers,
data: response.data
};
} catch (error) {
if (error.response) {
return {
error: true,
status: error.response.status,
statusText: error.response.statusText,
headers: error.response.headers,
data: error.response.data
};
}
throw error;
}
}
/**
* 执行工作区工具
* @private
* @param {Object} args - 工具参数
* @param {Object} context - 执行上下文
* @returns {Promise<Object>} - 执行结果
*/
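async _executeWorkspaceTool(args, context) {
const { operation, name = 'default' } = args;
const { sandboxId, sessionId } = context;
if (!sandboxId) {
throw new Error('Sandbox ID is required for workspace operations');
}
// The name becomes part of a host file path, so reject anything that could escape the workspaces directory
if (!/^[A-Za-z0-9_-]+$/.test(name)) {
throw new Error(`Invalid workspace name: ${name}`);
}
const workspacesDir = path.join(process.cwd(), '.openhands', 'workspaces');
await ensureDirectoryExists(workspacesDir);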
switch (operation) {
case 'init':
// 初始化工作区
await dockerService.execInSandbox(
sandboxId,
'mkdir -p /workspace',
{}
);
return { success: true };
case 'status':
// 获取工作区状态
const statusResult = await dockerService.execInSandbox(
sandboxId,
'find /workspace -type f | wc -l',
{}
);
return {
fileCount: Number.parseInt(statusResult.stdout.trim(), 10) || 0,
sandbox: sandboxId,
session: sessionId
};
case 'save':
// 保存工作区
const workspaceFile = path.join(workspacesDir, `${name}.tar`);
// 在沙箱中创建归档
await dockerService.execInSandbox(
sandboxId,
'tar -cf /tmp/workspace.tar -C /workspace .',
{}
);
// 从沙箱复制归档
await dockerService.copyFromSandbox(
sandboxId,
'/tmp/workspace.tar',
workspaceFile
);
return {
success: true,
name,
path: workspaceFile
};
case 'restore':
// 恢复工作区
const restoreFile = path.join(workspacesDir, `${name}.tar`);
// 检查归档是否存在
try {
await fs.access(restoreFile);
} catch (error) {
throw new Error(`Workspace ${name} does not exist`);
}
// 复制归档到沙箱
await dockerService.copyToSandbox(
sandboxId,
restoreFile,
'/tmp/workspace.tar'
);
// 在沙箱中解压归档
await dockerService.execInSandbox(
sandboxId,
'tar -xf /tmp/workspace.tar -C /workspace',
{}
);
return {
success: true,
name
};
default:
throw new Error(`Unsupported workspace operation: ${operation}`);
}
}
/**
* 启用工具
* @param {String} toolId - 工具ID
* @returns {Boolean} - 是否成功
*/
enableTool(toolId) {
if (!this.tools.has(toolId)) {
return false;
}
this.disabledTools.delete(toolId);
this.emit('tool_enabled', toolId);
logger.debug(`Enabled tool: ${toolId}`);
return true;
}
/**
* 禁用工具
* @param {String} toolId - 工具ID
* @returns {Boolean} - 是否成功
*/
disableTool(toolId) {
if (!this.tools.has(toolId)) {
return false;
}
this.disabledTools.add(toolId);
this.emit('tool_disabled', toolId);
logger.debug(`Disabled tool: ${toolId}`);
return true;
}
/**
* 关闭工具服务
*/
async shutdown() {
logger.info('Tool service shutdown');
}
}
// 创建单例实例
const toolService = new ToolService();
module.exports = toolService;
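The file system tool builds shell commands from paths supplied by the model, so those paths need to be constrained before they reach the sandbox. The sketch below shows two small guards under the assumption that `dockerService.execInSandbox` hands its command string to a POSIX shell (that service is not shown in this section). `shellQuote` and `resolveInWorkspace` are hypothetical helpers, not part of the original `ToolService`.

```javascript
const path = require('path');

// Hypothetical helper: wrap a value in single quotes for a POSIX shell.
// An embedded single quote is closed, escaped, and reopened: ' becomes '\''
function shellQuote(value) {
  return "'" + String(value).replace(/'/g, "'\\''") + "'";
}

// Hypothetical helper: resolve a user-supplied path against the workspace
// root and reject anything that ends up outside of it.
function resolveInWorkspace(userPath, root = '/workspace') {
  const resolved = path.posix.resolve(root, userPath);
  if (resolved !== root && !resolved.startsWith(root + '/')) {
    throw new Error(`Path escapes workspace: ${userPath}`);
  }
  return resolved;
}

const target = resolveInWorkspace("notes/it's here.txt");
console.log(`cat -- ${shellQuote(target)}`);
// cat -- '/workspace/notes/it'\''s here.txt'
```

Quoting stops command injection, while the root check stops `../` traversal. They address different problems, so a tool would normally apply both.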
4.3 Implementing the Controller Layer
chatController.js - conversation controller:
// src/controllers/chatController.js
const { validationResult } = require('express-validator');
const logger = require('../config/logger');
const agentService = require('../services/agentService');
/**
* 发送消息
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.sendMessage = async (req, res, next) => {
try {
const errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({ errors: errors.array() });
}
const { message, conversationId } = req.body;
// 如果没有会话ID,创建新会话
let sessionId = conversationId;
if (!sessionId) {
sessionId = await agentService.createSession();
}
// 处理用户输入
const response = await agentService.processInput(sessionId, message);
res.json({
success: true,
sessionId,
content: response.content,
type: response.type,
toolCalls: response.toolCalls,
toolResults: response.toolResults
});
} catch (error) {
logger.error('Error in sendMessage controller:', error);
next(error);
}
};
/**
* 执行命令
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.executeCommand = async (req, res, next) => {
try {
const errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({ errors: errors.array() });
}
const { command, conversationId } = req.body;
if (!conversationId) {
return res.status(400).json({
success: false,
error: 'Conversation ID is required'
});
}
// 执行命令
const result = await agentService.executeCommand(conversationId, command);
res.json({
success: true,
sessionId: conversationId,
command,
output: result.result.stdout,
error: result.result.stderr,
exitCode: result.result.exitCode
});
} catch (error) {
logger.error('Error in executeCommand controller:', error);
next(error);
}
};
/**
* 获取对话历史
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.getHistory = async (req, res, next) => {
try {
const { conversationId } = req.params;
if (!conversationId) {
return res.status(400).json({
success: false,
error: 'Conversation ID is required'
});
}
// 获取会话历史
const history = agentService.getSessionHistory(conversationId);
res.json({
success: true,
sessionId: conversationId,
history
});
} catch (error) {
logger.error('Error in getHistory controller:', error);
next(error);
}
};
/**
* 创建新对话
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.createConversation = async (req, res, next) => {
try {
const options = req.body || {};
// 创建新会话
const sessionId = await agentService.createSession(options);
res.json({
success: true,
sessionId
});
} catch (error) {
logger.error('Error in createConversation controller:', error);
next(error);
}
};
/**
* 删除对话
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.deleteConversation = async (req, res, next) => {
try {
const { conversationId } = req.params;
if (!conversationId) {
return res.status(400).json({
success: false,
error: 'Conversation ID is required'
});
}
// 删除会话
const result = await agentService.deleteSession(conversationId);
res.json({
success: result
});
} catch (error) {
logger.error('Error in deleteConversation controller:', error);
next(error);
}
};
/**
* 获取对话列表
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.listConversations = async (req, res, next) => {
try {
// 获取会话列表
const sessions = agentService.listSessions();
res.json({
success: true,
sessions
});
} catch (error) {
logger.error('Error in listConversations controller:', error);
next(error);
}
};
toolController.js - tool controller:
// src/controllers/toolController.js
const { validationResult } = require('express-validator');
const logger = require('../config/logger');
const toolService = require('../services/toolService');
/**
* 获取可用工具列表
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.getAvailableTools = async (req, res, next) => {
try {
// 获取可用工具列表
const tools = toolService.getAvailableTools();
res.json({
success: true,
tools
});
} catch (error) {
logger.error('Error in getAvailableTools controller:', error);
next(error);
}
};
/**
* 获取工具定义
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.getToolDefinition = async (req, res, next) => {
try {
const { toolId } = req.params;
if (!toolId) {
return res.status(400).json({
success: false,
error: 'Tool ID is required'
});
}
// 获取工具定义
const definition = toolService.getToolDefinition(toolId);
if (!definition) {
return res.status(404).json({
success: false,
error: 'Tool not found'
});
}
res.json({
success: true,
tool: definition
});
} catch (error) {
logger.error('Error in getToolDefinition controller:', error);
next(error);
}
};
/**
* 执行工具
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.executeTool = async (req, res, next) => {
try {
const errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({ errors: errors.array() });
}
const { toolId } = req.params;
const { args, context } = req.body;
if (!toolId) {
return res.status(400).json({
success: false,
error: 'Tool ID is required'
});
}
// 执行工具
const result = await toolService.executeTool(toolId, args, context);
res.json({
success: true,
toolId,
result
});
} catch (error) {
logger.error('Error in executeTool controller:', error);
next(error);
}
};
/**
* 启用工具
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.enableTool = async (req, res, next) => {
try {
const { toolId } = req.params;
if (!toolId) {
return res.status(400).json({
success: false,
error: 'Tool ID is required'
});
}
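// Enable the tool; enableTool returns false when the tool ID is unknown
const result = toolService.enableTool(toolId);
if (!result) {
return res.status(404).json({
success: false,
error: 'Tool not found'
});
}
res.json({
success: true
});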
} catch (error) {
logger.error('Error in enableTool controller:', error);
next(error);
}
};
/**
* 禁用工具
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.disableTool = async (req, res, next) => {
try {
const { toolId } = req.params;
if (!toolId) {
return res.status(400).json({
success: false,
error: 'Tool ID is required'
});
}
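// Disable the tool; disableTool returns false when the tool ID is unknown
const result = toolService.disableTool(toolId);
if (!result) {
return res.status(404).json({
success: false,
error: 'Tool not found'
});
}
res.json({
success: true
});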
} catch (error) {
logger.error('Error in disableTool controller:', error);
next(error);
}
};
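The `executeTool` controller forwards `args` to the tool without comparing them against the tool's `schema`. A minimal sketch of such a check follows. `validateToolArgs` is a hypothetical helper that covers only `required`, `enum`, and the `string`/`object` types used by the built-in tools. It is not a full JSON Schema validator.

```javascript
// Hypothetical helper: return a list of human-readable problems (empty when valid).
function validateToolArgs(schema, args) {
  if (args === null || typeof args !== 'object' || Array.isArray(args)) {
    return ['args must be an object'];
  }
  const problems = [];
  for (const key of schema.required ?? []) {
    if (args[key] === undefined) problems.push(`missing required field: ${key}`);
  }
  for (const [key, rule] of Object.entries(schema.properties ?? {})) {
    const value = args[key];
    if (value === undefined) continue;
    if (rule.type === 'string' && typeof value !== 'string') {
      problems.push(`${key} must be a string`);
    }
    if (rule.type === 'object' && (value === null || typeof value !== 'object' || Array.isArray(value))) {
      problems.push(`${key} must be an object`);
    }
    if (rule.enum && !rule.enum.includes(value)) {
      problems.push(`${key} must be one of: ${rule.enum.join(', ')}`);
    }
  }
  return problems;
}

// Schema copied from the file_system tool registration (descriptions omitted).
const fileSystemSchema = {
  type: 'object',
  properties: {
    operation: { type: 'string', enum: ['read', 'write', 'list', 'exists', 'delete'] },
    path: { type: 'string' },
    content: { type: 'string' }
  },
  required: ['operation', 'path']
};

console.log(validateToolArgs(fileSystemSchema, { operation: 'chmod' }));
// [ 'missing required field: path',
//   'operation must be one of: read, write, list, exists, delete' ]
```

A controller could respond with status 400 and this list when it is non-empty, before calling `toolService.executeTool`.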
configController.js - configuration controller:
// src/controllers/configController.js
const { validationResult } = require('express-validator');
const logger = require('../config/logger');
const llmService = require('../services/llmService');
const config = require('../config/config');
const fs = require('fs').promises;
const path = require('path');
/**
* 获取配置信息
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.getConfig = async (req, res, next) => {
try {
// 获取可用LLM提供商
const providers = llmService.getAvailableProviders();
const defaultProvider = llmService.getDefaultProvider();
// 过滤敏感信息
const safeConfig = {
version: config.version,
environment: config.environment,
sandbox: {
enabled: config.sandbox.enabled,
image: config.sandbox.image
},
llm: {
providers,
defaultProvider
}
};
res.json({
success: true,
config: safeConfig
});
} catch (error) {
logger.error('Error in getConfig controller:', error);
next(error);
}
};
/**
* 更新API密钥
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.updateApiKey = async (req, res, next) => {
try {
const errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({ errors: errors.array() });
}
const { provider, apiKey } = req.body;
if (!provider) {
return res.status(400).json({
success: false,
error: 'Provider is required'
});
}
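// Update the API key in memory
llmService.setApiKey(provider, apiKey);
// Persist it to the config file
const configDir = path.join(process.cwd(), '.openhands', 'config');
await fs.mkdir(configDir, { recursive: true });
const keysFile = path.join(configDir, 'api-keys.json');
let keys = {};
try {
const keysData = await fs.readFile(keysFile, 'utf8');
keys = JSON.parse(keysData);
} catch (error) {
// A missing file is expected on first use; any other failure means previously stored keys are discarded
if (error.code !== 'ENOENT') {
logger.warn(`Could not read ${keysFile}, starting from an empty key store: ${error.message}`);
}
}
keys[provider] = apiKey;
// Keys are stored in plain text, so restrict a newly created file to the current user
await fs.writeFile(keysFile, JSON.stringify(keys, null, 2), { encoding: 'utf8', mode: 0o600 });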
res.json({
success: true,
provider
});
} catch (error) {
logger.error('Error in updateApiKey controller:', error);
next(error);
}
};
/**
* 设置默认LLM提供商
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
exports.setDefaultProvider = async (req, res, next) => {
try {
const errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({ errors: errors.array() });
}
const { provider } = req.body;
if (!provider) {
return res.status(400).json({
success: false,
error: 'Provider is required'
});
}
// 设置默认提供商
llmService.setDefaultProvider(provider);
res.json({
success: true,
defaultProvider: provider
});
} catch (error) {
logger.error('Error in setDefaultProvider controller:', error);
next(error);
}
};
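`getConfig` filters out sensitive values by omitting API keys entirely. If a later version needs to show which key is configured for a provider, one option is to return a masked form instead. The sketch below assumes that requirement. `maskApiKey` is a hypothetical helper and the sample key is made up.

```javascript
// Hypothetical helper: keep only the first and last four characters of a key.
function maskApiKey(apiKey) {
  if (typeof apiKey !== 'string' || apiKey.length === 0) return '';
  // Short keys are masked completely so nothing meaningful leaks.
  if (apiKey.length <= 8) return '*'.repeat(apiKey.length);
  return `${apiKey.slice(0, 4)}...${apiKey.slice(-4)}`;
}

console.log(maskApiKey('sk-abcdefghijklmnop')); // sk-a...mnop
console.log(maskApiKey('short'));               // *****
```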
4.4 Implementing the Routing Layer
chatRoutes.js - conversation routes:
// src/routes/chatRoutes.js
const express = require('express');
const { body, param } = require('express-validator');
const chatController = require('../controllers/chatController');
const router = express.Router();
/**
* @route POST /api/chat
* @desc 发送消息
* @access Public
*/
router.post(
'/',
[
body('message').notEmpty().withMessage('Message is required')
],
chatController.sendMessage
);
/**
* @route POST /api/chat/execute
* @desc 执行命令
* @access Public
*/
router.post(
'/execute',
[
body('command').notEmpty().withMessage('Command is required'),
body('conversationId').notEmpty().withMessage('Conversation ID is required')
],
chatController.executeCommand
);
/**
* @route GET /api/chat/conversations
* @desc 获取对话列表
* @access Public
*/
router.get(
'/conversations',
chatController.listConversations
);
/**
* @route POST /api/chat/conversations
* @desc 创建新对话
* @access Public
*/
router.post(
'/conversations',
chatController.createConversation
);
/**
* @route GET /api/chat/conversations/:conversationId
* @desc 获取对话历史
* @access Public
*/
router.get(
'/conversations/:conversationId',
[
param('conversationId').notEmpty().withMessage('Conversation ID is required')
],
chatController.getHistory
);
/**
* @route DELETE /api/chat/conversations/:conversationId
* @desc 删除对话
* @access Public
*/
router.delete(
'/conversations/:conversationId',
[
param('conversationId').notEmpty().withMessage('Conversation ID is required')
],
chatController.deleteConversation
);
module.exports = router;
toolRoutes.js - tool routes:
// src/routes/toolRoutes.js
const express = require('express');
const { body, param } = require('express-validator');
const toolController = require('../controllers/toolController');
const router = express.Router();
/**
* @route GET /api/tools
* @desc 获取可用工具列表
* @access Public
*/
router.get(
'/',
toolController.getAvailableTools
);
/**
* @route GET /api/tools/:toolId
* @desc 获取工具定义
* @access Public
*/
router.get(
'/:toolId',
[
param('toolId').notEmpty().withMessage('Tool ID is required')
],
toolController.getToolDefinition
);
/**
* @route POST /api/tools/:toolId/execute
* @desc 执行工具
* @access Public
*/
router.post(
'/:toolId/execute',
[
param('toolId').notEmpty().withMessage('Tool ID is required'),
body('args').notEmpty().withMessage('Arguments are required')
],
toolController.executeTool
);
/**
* @route PUT /api/tools/:toolId/enable
* @desc 启用工具
* @access Public
*/
router.put(
'/:toolId/enable',
[
param('toolId').notEmpty().withMessage('Tool ID is required')
],
toolController.enableTool
);
/**
* @route PUT /api/tools/:toolId/disable
* @desc 禁用工具
* @access Public
*/
router.put(
'/:toolId/disable',
[
param('toolId').notEmpty().withMessage('Tool ID is required')
],
toolController.disableTool
);
module.exports = router;
configRoutes.js - configuration routes:
// src/routes/configRoutes.js
const express = require('express');
const { body } = require('express-validator');
const configController = require('../controllers/configController');
const router = express.Router();
/**
* @route GET /api/config
* @desc 获取配置信息
* @access Public
*/
router.get(
'/',
configController.getConfig
);
/**
* @route POST /api/config/api-key
* @desc 更新API密钥
* @access Public
*/
router.post(
'/api-key',
[
body('provider').notEmpty().withMessage('Provider is required'),
body('apiKey').notEmpty().withMessage('API key is required')
],
configController.updateApiKey
);
/**
* @route POST /api/config/default-provider
* @desc 设置默认LLM提供商
* @access Public
*/
router.post(
'/default-provider',
[
body('provider').notEmpty().withMessage('Provider is required')
],
configController.setDefaultProvider
);
module.exports = router;
4.5 Implementing the Application Entry Point
app.js - application entry point:
// src/app.js
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const compression = require('compression');
const morgan = require('morgan');
const path = require('path');
const fs = require('fs').promises;
const config = require('./config/config');
const logger = require('./config/logger');
const errorHandler = require('./middlewares/errorHandler');
const requestLogger = require('./middlewares/requestLogger');
const agentService = require('./services/agentService');
const chatRoutes = require('./routes/chatRoutes');
const toolRoutes = require('./routes/toolRoutes');
const configRoutes = require('./routes/configRoutes');
// 创建Express应用
const app = express();
// 初始化中间件
app.use(helmet());
app.use(compression());
app.use(cors());
app.use(express.json({ limit: '10mb' }));
app.use(express.urlencoded({ extended: true, limit: '10mb' }));
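// Request logging
// Registered synchronously so that morgan sits ahead of the routes in the middleware chain
if (config.environment === 'production') {
// In production, append access logs to a file.
// The top-level `fs` is the promise API, which has no createWriteStream, so load the core module here.
const fsSync = require('fs');
const logDir = path.join(process.cwd(), 'logs');
try {
fsSync.mkdirSync(logDir, { recursive: true });
const accessLogStream = fsSync.createWriteStream(
path.join(logDir, 'access.log'),
{ flags: 'a' }
);
app.use(morgan('combined', { stream: accessLogStream }));
} catch (err) {
logger.error('Failed to create logs directory:', err);
app.use(morgan('combined'));
}
} else {
// In development, use the concise log format
app.use(morgan('dev'));
}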
// 请求ID和日志中间件
app.use(requestLogger);
// 挂载API路由
app.use('/api/chat', chatRoutes);
app.use('/api/tools', toolRoutes);
app.use('/api/config', configRoutes);
// 静态文件服务
app.use(express.static(path.join(__dirname, 'public')));
// 错误处理
app.use(errorHandler);
// 启动服务器
const port = config.port || 3000;
let server;
// 优雅启动
async function startServer() {
try {
// 初始化代理服务
await agentService.initialize();
// 启动HTTP服务器
server = app.listen(port, () => {
logger.info(`Server running on port ${port}`);
});
// Handle uncaught exceptions: log and exit, because the process state is no longer reliable
process.on('uncaughtException', (error) => {
logger.error('Uncaught Exception:', error);
process.exit(1);
});
// 处理未处理的Promise拒绝
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled Promise Rejection:', reason);
});
// 处理进程终止信号
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
} catch (error) {
logger.error('Failed to start server:', error);
process.exit(1);
}
}
// 优雅关闭
async function gracefulShutdown() {
logger.info('Shutting down server...');
if (server) {
server.close(async () => {
logger.info('HTTP server closed');
// 关闭代理服务
try {
await agentService.shutdown();
logger.info('Agent service shut down successfully');
} catch (error) {
logger.error('Error during agent service shutdown:', error);
}
process.exit(0);
});
// 如果在10秒内没有关闭,强制退出
setTimeout(() => {
logger.error('Forcing server shutdown after timeout');
process.exit(1);
}, 10000);
} else {
process.exit(0);
}
}
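// Start the server only when this file is run directly, so that importing `app` (for example in tests) has no side effects.
// startServer handles its own errors, so no extra catch is needed here.
if (require.main === module) {
startServer();
}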
module.exports = app;
config.js - configuration file:
// src/config/config.js
const path = require('path');
const dotenv = require('dotenv');
// 加载环境变量
dotenv.config();
const config = {
// 应用信息
name: 'OpenHands Backend',
version: '1.0.0',
description: 'OpenHands AI开发助手后端服务',
// 环境配置
environment: process.env.NODE_ENV || 'development',
port: parseInt(process.env.PORT || '3000', 10),
// 日志配置
logs: {
level: process.env.LOG_LEVEL || 'info',
file: process.env.LOG_FILE || path.join(process.cwd(), 'logs', 'app.log')
},
// 沙箱配置
sandbox: {
enabled: process.env.SANDBOX_ENABLED !== 'false',
image: process.env.SANDBOX_IMAGE || 'docker.all-hands.dev/all-hands-ai/runtime:0.47-nikolaik',
cpus: process.env.SANDBOX_CPUS || '1',
memory: process.env.SANDBOX_MEMORY || '1g',
pidsLimit: parseInt(process.env.SANDBOX_PIDS_LIMIT || '100', 10),
network: process.env.SANDBOX_NETWORK || 'bridge'
},
// LLM配置
llm: {
anthropic: {
apiKey: process.env.ANTHROPIC_API_KEY || '',
model: process.env.ANTHROPIC_MODEL || 'claude-sonnet-4-20250514'
},
openai: {
apiKey: process.env.OPENAI_API_KEY || '',
model: process.env.OPENAI_MODEL || 'gpt-4o'
}
},
// 会话配置
session: {
maxHistory: parseInt(process.env.MAX_HISTORY || '200', 10),
saveInterval: parseInt(process.env.SAVE_INTERVAL || '60000', 10)
},
// 工具配置
tools: {
disabledTools: (process.env.DISABLED_TOOLS || '').split(',').filter(Boolean)
}
};
module.exports = config;
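One caveat in the configuration above: `parseInt(process.env.PORT || '3000', 10)` yields `NaN` when the variable is set to a non-numeric value, because the fallback only applies when the variable is empty or unset. The sketch below shows helpers that fall back in both cases. `intFromEnv` and `listFromEnv` are hypothetical names, not part of the original config module.

```javascript
// Hypothetical helper: parse an integer, using the fallback for unset or invalid values.
function intFromEnv(value, fallback) {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

// Hypothetical helper: parse a comma-separated list, trimming blanks and dropping empty items.
function listFromEnv(value) {
  return (value ?? '')
    .split(',')
    .map((item) => item.trim())
    .filter(Boolean);
}

console.log(intFromEnv('8080', 3000));     // 8080
console.log(intFromEnv('not-a-port', 3000)); // 3000
console.log(listFromEnv(' http_request, ,workspace,')); // [ 'http_request', 'workspace' ]
```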
logger.js - logger configuration:
// src/config/logger.js
const winston = require('winston');
const path = require('path');
const config = require('./config');
// 确保日志目录存在
const fs = require('fs');
const logDir = path.dirname(config.logs.file);
if (!fs.existsSync(logDir)) {
fs.mkdirSync(logDir, { recursive: true });
}
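// Build the log format
// Upper-case the level before colorize() runs; upper-casing a colorized level would corrupt its ANSI escape codes
const upperCaseLevel = winston.format(info => {
info.level = info.level.toUpperCase();
return info;
});
// Render one log line; extra metadata (such as requestId) is appended as JSON instead of being dropped
const printLine = winston.format.printf(({ timestamp, level, message, stack, ...meta }) => {
let logMsg = `${timestamp} [${level}]: ${message}`;
if (Object.keys(meta).length > 0) {
try {
logMsg += ` ${JSON.stringify(meta)}`;
} catch (err) {
// Metadata that cannot be serialized (for example circular objects) is skipped
}
}
if (stack) {
logMsg += `\n${stack}`;
}
return logMsg;
});
const baseFormat = winston.format.combine(
winston.format.timestamp({
format: 'YYYY-MM-DD HH:mm:ss'
}),
winston.format.errors({ stack: true }),
upperCaseLevel()
);
// Create the logger instance
const logger = winston.createLogger({
level: config.logs.level,
format: baseFormat,
transports: [
// Console output, with colorized levels
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
printLine
)
}),
// File output
new winston.transports.File({
filename: config.logs.file,
format: printLine,
maxsize: 5242880, // 5MB
maxFiles: 5
})
]
});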
module.exports = logger;
errorHandler.js - error-handling middleware:
// src/middlewares/errorHandler.js
const logger = require('../config/logger');
/**
* 错误处理中间件
* @param {Error} err - 错误对象
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
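const errorHandler = (err, req, res, next) => {
// Log the error
logger.error(`Error on ${req.method} ${req.path}: ${err.message}`, {
requestId: req.requestId,
error: err
});
// If the response has already started, hand the error to Express's default handler
if (res.headersSent) {
return next(err);
}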
// 判断错误类型
if (err.name === 'ValidationError') {
return res.status(400).json({
success: false,
error: 'Validation Error',
details: err.details || err.message
});
}
if (err.name === 'UnauthorizedError') {
return res.status(401).json({
success: false,
error: 'Unauthorized',
details: err.message
});
}
if (err.name === 'ForbiddenError') {
return res.status(403).json({
success: false,
error: 'Forbidden',
details: err.message
});
}
if (err.name === 'NotFoundError') {
return res.status(404).json({
success: false,
error: 'Not Found',
details: err.message
});
}
// 默认为服务器错误
return res.status(500).json({
success: false,
error: 'Internal Server Error',
details: process.env.NODE_ENV === 'production' ? 'Something went wrong' : err.message
});
};
module.exports = errorHandler;
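The middleware above selects a status code from `err.name`, but the code shown in this section never creates errors with those names, so in practice every failure falls through to 500. The sketch below shows how a service could raise a typed error that the middleware would recognize. `NotFoundError` and `statusFor` are hypothetical and only mirror the name-to-status mapping used above.

```javascript
// Hypothetical error class whose name matches one of the branches in errorHandler.
class NotFoundError extends Error {
  constructor(message) {
    super(message);
    this.name = 'NotFoundError';
  }
}

// The same name-to-status mapping as the middleware, expressed as a lookup table.
const STATUS_BY_ERROR_NAME = new Map([
  ['ValidationError', 400],
  ['UnauthorizedError', 401],
  ['ForbiddenError', 403],
  ['NotFoundError', 404]
]);

function statusFor(err) {
  return STATUS_BY_ERROR_NAME.get(err.name) ?? 500;
}

console.log(statusFor(new NotFoundError('Tool demo not found'))); // 404
console.log(statusFor(new Error('Tool demo not found')));         // 500
```

With a class like this, `executeTool` in the tool service could throw `NotFoundError` for an unknown tool and the client would receive a 404 instead of a 500.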
requestLogger.js - request logging middleware:
// src/middlewares/requestLogger.js
const { v4: uuidv4 } = require('uuid');
const logger = require('../config/logger');
/**
* 请求日志中间件
* @param {Object} req - 请求对象
* @param {Object} res - 响应对象
* @param {Function} next - 下一个中间件
*/
const requestLogger = (req, res, next) => {
// 生成请求ID
req.requestId = req.headers['x-request-id'] || uuidv4();
res.setHeader('X-Request-ID', req.requestId);
// 记录请求开始
logger.debug(`Request started: ${req.method} ${req.path}`, {
requestId: req.requestId,
method: req.method,
path: req.originalUrl || req.url,
ip: req.ip,
userAgent: req.get('user-agent')
});
// 记录请求结束
const start = Date.now();
res.on('finish', () => {
const duration = Date.now() - start;
const level = res.statusCode >= 400 ? 'warn' : 'debug';
logger[level](`Request completed: ${req.method} ${req.path} ${res.statusCode} (${duration}ms)`, {
requestId: req.requestId,
method: req.method,
path: req.originalUrl || req.url,
statusCode: res.statusCode,
duration
});
});
next();
};
module.exports = requestLogger;
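The middleware above trusts whatever the client sends in `X-Request-ID` and copies it into logs and the response header. A minimal sketch of a stricter variant follows, which accepts only short identifiers made of safe characters and otherwise generates a new one. `resolveRequestId` is a hypothetical helper, and the allowed character set and the 64-character limit are arbitrary choices for illustration. It uses Node's built-in `crypto.randomUUID` instead of the `uuid` package.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical helper: reuse a client-supplied ID only when it looks harmless.
function resolveRequestId(headerValue) {
  const isSafe =
    typeof headerValue === 'string' && /^[A-Za-z0-9._-]{1,64}$/.test(headerValue);
  return isSafe ? headerValue : randomUUID();
}

console.log(resolveRequestId('trace-42')); // trace-42
console.log(resolveRequestId('bad value\nwith newline')); // a freshly generated UUID
```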
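5. Summary and Reflections
5.1 Key Points of the Backend Service Architecture
This note built a backend service that simulates OpenHands functionality. Its key points are:
- Layered architecture: controllers, services, and models are separated so that each layer has a single concern
- Event-driven pattern: services extend EventEmitter and publish events, which keeps them loosely coupled
- Sandbox isolation: commands run inside Docker containers rather than on the host
- Plugin-style tool system: tools are registered and executed through a common, extensible interface
- Session state management: sessions can be created, persisted, and restored
- Error-handling strategy: a single error-handling middleware produces consistent error responses
- Configuration management: defaults can be overridden through environment variables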
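The event-driven point can be illustrated with a stripped-down version of the tool service. In the sketch below, `MiniToolRegistry` is a hypothetical, synchronous stand-in for `ToolService` that reuses its event names. The audit listener knows nothing about how tools run, which is the loose coupling the list refers to.

```javascript
const EventEmitter = require('events');

// Hypothetical, simplified registry: synchronous tools only, no disable list.
class MiniToolRegistry extends EventEmitter {
  constructor() {
    super();
    this.tools = new Map();
  }

  register(id, execute) {
    this.tools.set(id, execute);
    this.emit('tool_registered', id);
  }

  run(id, args) {
    const execute = this.tools.get(id);
    if (!execute) throw new Error(`Tool ${id} not found`);
    this.emit('tool_execution_start', { toolId: id, args });
    const result = execute(args);
    this.emit('tool_execution_complete', { toolId: id, args, result });
    return result;
  }
}

// A listener that records activity without touching the registry's internals.
const registry = new MiniToolRegistry();
const auditLog = [];
registry.on('tool_execution_start', (event) => auditLog.push(`start:${event.toolId}`));
registry.on('tool_execution_complete', (event) => auditLog.push(`done:${event.toolId}`));

registry.register('echo', (args) => args.text);
console.log(registry.run('echo', { text: 'hi' })); // hi
console.log(auditLog); // [ 'start:echo', 'done:echo' ]
```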
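5.2 Comparison with the Actual OpenHands Architecture
This implementation is inferred from OpenHands' documented features rather than from its source code, so it may differ from the real system in the following ways:
- Scale and complexity: the real system probably has more features and a much larger codebase
- Performance optimization: a production system probably adds measures such as caching and queues
- Security: the real system probably enforces stricter access control and sandbox isolation
- Distributed architecture: it may be split into multiple services, whereas this implementation is a single monolithic application
- Plugin system: it may offer a more flexible plugin mechanism and a richer set of tools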
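5.3 Technical Decisions Made During Implementation
Several technical decisions made along the way are worth noting:
- Node.js: its non-blocking I/O and event-driven model suit a responsive AI agent system
- Docker integration: running generated code in containers is a key safety measure for an AI agent system
- Session persistence: the file system is used instead of a database, which simplifies deployment and is adequate for small to medium workloads
- Modular design: well-defined interfaces let each component be tested and replaced independently
- Unified error handling: a centralized strategy gives clients consistent error responses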
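The file-based session persistence decision can be sketched as follows. This is an illustration under assumptions, not the session service from this note: `saveSession` and `loadSession` are hypothetical names, the JSON layout is invented, and the synchronous `fs` calls are used only to keep the example short (a server would normally use the promise API). Writing to a temporary file and renaming it avoids leaving a half-written session file behind if the process stops mid-write.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: write the session to a temp file, then rename it into place.
function saveSession(dir, session) {
  fs.mkdirSync(dir, { recursive: true });
  const target = path.join(dir, `${session.id}.json`);
  const temp = `${target}.${process.pid}.tmp`;
  fs.writeFileSync(temp, JSON.stringify(session, null, 2), 'utf8');
  fs.renameSync(temp, target);
  return target;
}

// Hypothetical helper: return the stored session, or null when it does not exist.
function loadSession(dir, id) {
  try {
    return JSON.parse(fs.readFileSync(path.join(dir, `${id}.json`), 'utf8'));
  } catch (err) {
    if (err.code === 'ENOENT') return null;
    throw err;
  }
}

// Demo in a throwaway directory under the OS temp folder.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'sessions-'));
saveSession(demoDir, { id: 'demo', history: [{ role: 'user', content: 'hello' }] });
console.log(loadSession(demoDir, 'demo').history.length); // 1
console.log(loadSession(demoDir, 'missing')); // null
```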
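6. Next Steps
This note implemented a simulation of the core OpenHands backend features. The following directions are worth exploring further:
- Database integration: replace file-system storage with MongoDB or PostgreSQL to improve scalability
- Authentication and authorization: implement JWT authentication and role-based access control
- WebSocket support: add real-time communication to improve the user experience
- Task queue: use Redis or RabbitMQ for asynchronous task processing
- Service monitoring: integrate Prometheus and Grafana for system monitoring
- API documentation: generate API documentation automatically with Swagger
- Test coverage: write unit and integration tests to improve code quality
- Containerized deployment: create a Docker Compose configuration to simplify deployment
- CI/CD pipeline: automate testing and deployment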