智能体服务封装

发布于:2025-07-28 ⋅ 阅读:(14) ⋅ 点赞:(0)

import asyncio
import logging
from typing import Dict, List, Optional, Any, AsyncIterator, Union
from third.TranssionAgentClient import agent_client_instance
from constants.conversation_constants import AgentConstants

logger = logging.getLogger(name)

class AgentService:
“”“智能体服务封装类”“”

def __init__(self):
    self.client = agent_client_instance

async def chat(self, messages: List[Dict[str, str]], 
               options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    智能体普通对话
    
    Args:
        messages: 消息列表,格式: [{"role": "user", "content": "消息内容"}]
        options: 可选参数,支持:
            - dellE3Size: 如果用的是dellE3,生成图片的尺寸大小,必须是 1792x1024、1024x1024 或 1024x1792
            - userId: 如果是传神云文档问答需要在智库授权,并需要传用户工号
    
    Returns:
        Dict包含以下字段:
        - code: 响应码
        - data: 原始模型返回的结构体
        - dataContent: 返回的消息内容(且加工后的)
        - message: 响应消息
        - success: 是否成功
    
    Example:
        result = await agent_service.chat(
            [{"role": "user", "content": "你好"}]
        )
        content = result.get("dataContent", "")
    """
    try:
        if not self.client.is_configured():
            raise ValueError("智能体客户端未正确配置,请检查 AGENT_APP_ID、AGENT_APP_SECRET 和 AGENT_ASSISTANT_CODE")
        
        # 验证消息格式
        if not messages or not isinstance(messages, list):
            raise ValueError("messages 参数必须是非空列表")
        
        for msg in messages:
            if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
                raise ValueError("消息格式错误,每个消息必须包含 role 和 content 字段")
            if msg["role"] not in ["user", "assistant"]:
                raise ValueError("消息角色必须是 'user' 或 'assistant'")
        
        logger.info(f"[AgentService.chat] 调用智能体普通对话")
        result = await self.client.chat(messages, options)
        
        # 统一响应格式处理
        if result.get(AgentConstants.FIELD_CODE) == AgentConstants.HTTP_SUCCESS and result.get(AgentConstants.FIELD_SUCCESS):
            logger.info(f"[AgentService.chat] 调用成功,响应内容长度: {len(result.get('dataContent', ''))}")
        else:
            logger.warning(f"[AgentService.chat] 调用失败,错误信息: {result.get(AgentConstants.FIELD_MESSAGE, '未知错误')}")
        
        return result
        
    except Exception as e:
        logger.error(f"[AgentService.chat] 智能体对话失败: {str(e)}")
        return {
            AgentConstants.FIELD_CODE: AgentConstants.HTTP_SERVER_ERROR,
            AgentConstants.FIELD_MESSAGE: f"智能体调用失败: {str(e)}",
            AgentConstants.FIELD_DATA: None,
            "dataContent": "",
            AgentConstants.FIELD_SUCCESS: False
        }

async def stream_chat(self, messages: List[Dict[str, str]], 
                     options: Optional[Dict[str, Any]] = None) -> AsyncIterator[Dict[str, Any]]:
    """
    智能体流式对话
    
    Args:
        messages: 消息列表
        options: 可选参数
    
    Yields:
        Dict包含以下字段:
        - data: 当前的状态
        - dataContent: 返回的消息内容(且加工后的)
        - dataObject: 原始的gpt返回对象
        - errorMsg: 错误异常消息
        - errorCode: 错误码
    
    Example:
        async for chunk in agent_service.stream_chat(
            [{"role": "user", "content": "写一篇文章"}]
        ):
            content = chunk.get("dataContent", "")
            if content:
                print(content, end="", flush=True)
    """
    try:
        if not self.client.is_configured():
            yield {
                "data": "error",
                "dataContent": "",
                "dataObject": None,
                "errorMsg": "智能体客户端未正确配置,请检查 AGENT_APP_ID、AGENT_APP_SECRET 和 AGENT_ASSISTANT_CODE",
                "errorCode": 500
            }
            return
        
        # 验证消息格式
        if not messages or not isinstance(messages, list):
            yield {
                "data": "error", 
                "dataContent": "",
                "dataObject": None,
                "errorMsg": "messages 参数必须是非空列表",
                "errorCode": 400
            }
            return
        
        for msg in messages:
            if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
                yield {
                    "data": "error",
                    "dataContent": "",
                    "dataObject": None,
                    "errorMsg": "消息格式错误,每个消息必须包含 role 和 content 字段",
                    "errorCode": 400
                }
                return
            if msg["role"] not in ["user", "assistant"]:
                yield {
                    "data": "error",
                    "dataContent": "",
                    "dataObject": None,
                    "errorMsg": "消息角色必须是 'user' 或 'assistant'",
                    "errorCode": 400
                }
                return
        
        logger.info(f"[AgentService.stream_chat] 调用智能体流式对话")
        
        async for chunk in self.client.stream_chat(messages, options):
            yield chunk
            
    except Exception as e:
        logger.error(f"[AgentService.stream_chat] 智能体流式对话失败: {str(e)}")
        yield {
            "data": "error",
            "dataContent": "",
            "dataObject": None,
            "errorMsg": f"智能体流式调用失败: {str(e)}",
            "errorCode": 500
        }


async def close(self):
    """关闭服务连接"""
    await self.client.close()

def is_available(self) -> bool:
    """检查服务是否可用"""
    return self.client.is_configured()

创建全局服务实例

agent_service = AgentService()


网站公告

今日签到

点亮在社区的每一天
去签到