Coze源码分析-资源库-创建知识库-基础设施/存储/安全

发布于:2025-09-15 ⋅ 阅读:(21) ⋅ 点赞:(0)

6. 基础设施层

基础设施层为知识库创建功能提供底层技术支撑,包括数据存储、缓存、消息队列、文档处理、向量化等核心服务。

6.1 数据存储服务

6.1.1 MySQL数据库

文件位置: backend/infra/rdb/mysql.go

// MySQLConfig MySQL配置
type MySQLConfig struct {
    Host         string `yaml:"host"`
    Port         int    `yaml:"port"`
    Username     string `yaml:"username"`
    Password     string `yaml:"password"`
    Database     string `yaml:"database"`
    MaxOpenConns int    `yaml:"max_open_conns"`
    MaxIdleConns int    `yaml:"max_idle_conns"`
    MaxLifetime  int    `yaml:"max_lifetime"`
}

// NewMySQLConnection 创建MySQL连接
func NewMySQLConnection(config *MySQLConfig) (*gorm.DB, error) {
    dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
        config.Username, config.Password, config.Host, config.Port, config.Database)
    
    db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
        Logger: logger.Default.LogMode(logger.Info),
        NamingStrategy: schema.NamingStrategy{
            SingularTable: true,
        },
    })
    if err != nil {
        return nil, fmt.Errorf("连接MySQL失败: %w", err)
    }
    
    sqlDB, err := db.DB()
    if err != nil {
        return nil, fmt.Errorf("获取SQL DB失败: %w", err)
    }
    
    // 设置连接池参数
    sqlDB.SetMaxOpenConns(config.MaxOpenConns)
    sqlDB.SetMaxIdleConns(config.MaxIdleConns)
    sqlDB.SetConnMaxLifetime(time.Duration(config.MaxLifetime) * time.Second)
    
    return db, nil
}
6.1.2 Redis缓存

文件位置: backend/infra/cache/redis.go

// RedisConfig Redis配置
type RedisConfig struct {
    Host     string `yaml:"host"`
    Port     int    `yaml:"port"`
    Password string `yaml:"password"`
    DB       int    `yaml:"db"`
    PoolSize int    `yaml:"pool_size"`
}

// NewRedisClient 创建Redis客户端
func NewRedisClient(config *RedisConfig) *redis.Client {
    rdb := redis.NewClient(&redis.Options{
        Addr:     fmt.Sprintf("%s:%d", config.Host, config.Port),
        Password: config.Password,
        DB:       config.DB,
        PoolSize: config.PoolSize,
    })
    
    return rdb
}

// KnowledgeCacheManager 知识库缓存管理器
type KnowledgeCacheManager struct {
    redisClient *redis.Client
    localCache  *cache.Cache
}

func (c *KnowledgeCacheManager) SetKnowledge(ctx context.Context, knowledge *model.Knowledge) error {
    // 1. 序列化知识库数据
    data, err := json.Marshal(knowledge)
    if err != nil {
        return fmt.Errorf("序列化知识库数据失败: %w", err)
    }
    
    // 2. 设置Redis缓存
    cacheKey := fmt.Sprintf("knowledge:%d", knowledge.ID)
    err = c.redisClient.Set(ctx, cacheKey, data, time.Hour).Err()
    if err != nil {
        return fmt.Errorf("设置Redis缓存失败: %w", err)
    }
    
    // 3. 设置本地缓存
    c.localCache.Set(cacheKey, knowledge, time.Hour)
    
    return nil
}

6.2 文档处理服务

6.2.1 文档解析器

文件位置: backend/infra/document/parser.go

// DocumentParser 文档解析器接口
type DocumentParser interface {
    Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error)
    SupportedTypes() []string
}

// ParseResult 解析结果
type ParseResult struct {
    Content   string            `json:"content"`
    Metadata  map[string]string `json:"metadata"`
    Sections  []*Section        `json:"sections"`
    WordCount int               `json:"word_count"`
}

// Section 文档章节
type Section struct {
    Title   string `json:"title"`
    Content string `json:"content"`
    Level   int    `json:"level"`
}

// UniversalDocumentParser 通用文档解析器
type UniversalDocumentParser struct {
    parsers map[string]DocumentParser
}

func (p *UniversalDocumentParser) Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error) {
    parser, exists := p.parsers[fileType]
    if !exists {
        return nil, fmt.Errorf("不支持的文件类型: %s", fileType)
    }
    
    result, err := parser.Parse(ctx, file, fileType)
    if err != nil {
        return nil, fmt.Errorf("解析文档失败: %w", err)
    }
    
    return result, nil
}
6.2.2 文档分片器

文件位置: backend/infra/document/splitter.go

// DocumentSplitter 文档分片器
type DocumentSplitter struct {
    maxChunkSize   int
    overlapSize    int
    separators     []string
}

// SplitDocument 分割文档
func (s *DocumentSplitter) SplitDocument(ctx context.Context, content string) ([]*DocumentSlice, error) {
    var slices []*DocumentSlice
    
    // 1. 按段落分割
    paragraphs := strings.Split(content, "\n\n")
    
    var currentSlice strings.Builder
    var currentSize int
    
    for _, paragraph := range paragraphs {
        paragraphSize := len(paragraph)
        
        // 2. 检查是否需要创建新分片
        if currentSize+paragraphSize > s.maxChunkSize && currentSize > 0 {
            // 创建当前分片
            slice := &DocumentSlice{
                Content:   currentSlice.String(),
                WordCount: currentSize,
                Index:     len(slices),
            }
            slices = append(slices, slice)
            
            // 重置当前分片
            currentSlice.Reset()
            currentSize = 0
            
            // 添加重叠内容
            if s.overlapSize > 0 {
                overlapContent := s.getOverlapContent(slice.Content, s.overlapSize)
                currentSlice.WriteString(overlapContent)
                currentSize = len(overlapContent)
            }
        }
        
        // 3. 添加段落到当前分片
        if currentSize > 0 {
            currentSlice.WriteString("\n\n")
            currentSize += 2
        }
        currentSlice.WriteString(paragraph)
        currentSize += paragraphSize
    }
    
    // 4. 处理最后一个分片
    if currentSize > 0 {
        slice := &DocumentSlice{
            Content:   currentSlice.String(),
            WordCount: currentSize,
            Index:     len(slices),
        }
        slices = append(slices, slice)
    }
    
    return slices, nil
}

// DocumentSlice 文档分片
type DocumentSlice struct {
    Content   string  `json:"content"`
    WordCount int     `json:"word_count"`
    Index     int     `json:"index"`
    Vector    []float32 `json:"vector,omitempty"`
}

6.3 向量化服务

6.3.1 向量化引擎

文件位置: backend/infra/embedding/engine.go

// EmbeddingEngine 向量化引擎接口
type EmbeddingEngine interface {
    Embed(ctx context.Context, texts []string) ([][]float32, error)
    GetDimension() int
    GetModel() string
}

// OpenAIEmbeddingEngine OpenAI向量化引擎
type OpenAIEmbeddingEngine struct {
    client    *openai.Client
    model     string
    dimension int
}

func (e *OpenAIEmbeddingEngine) Embed(ctx context.Context, texts []string) ([][]float32, error) {
    // 1. 构建请求
    req := openai.EmbeddingRequest{
        Input: texts,
        Model: openai.EmbeddingModel(e.model),
    }
    
    // 2. 调用OpenAI API
    resp, err := e.client.CreateEmbeddings(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("调用OpenAI向量化API失败: %w", err)
    }
    
    // 3. 提取向量数据
    vectors := make([][]float32, len(resp.Data))
    for i, embedding := range resp.Data {
        vectors[i] = make([]float32, len(embedding.Embedding))
        for j, val := range embedding.Embedding {
            vectors[i][j] = float32(val)
        }
    }
    
    return vectors, nil
}

6.4 向量存储服务

6.4.1 Milvus向量数据库

文件位置: backend/infra/searchstore/milvus/client.go

// MilvusClient Milvus客户端
type MilvusClient struct {
    client milvus.Client
    config *MilvusConfig
}

// MilvusConfig Milvus配置
type MilvusConfig struct {
    Host       string `yaml:"host"`
    Port       int    `yaml:"port"`
    Username   string `yaml:"username"`
    Password   string `yaml:"password"`
    Database   string `yaml:"database"`
}

// CreateCollection 创建集合
func (c *MilvusClient) CreateCollection(ctx context.Context, collectionName string, dimension int) error {
    // 1. 定义字段
    fields := []*entity.Field{
        {
            Name:       "id",
            DataType:   entity.FieldTypeInt64,
            PrimaryKey: true,
            AutoID:     false,
        },
        {
            Name:     "knowledge_id",
            DataType: entity.FieldTypeInt64,
        },
        {
            Name:     "document_id",
            DataType: entity.FieldTypeInt64,
        },
        {
            Name:     "slice_id",
            DataType: entity.FieldTypeInt64,
        },
        {
            Name:     "content",
            DataType: entity.FieldTypeVarChar,
            TypeParams: map[string]string{
                "max_length": "65535",
            },
        },
        {
            Name:     "vector",
            DataType: entity.FieldTypeFloatVector,
            TypeParams: map[string]string{
                "dim": fmt.Sprintf("%d", dimension),
            },
        },
    }
    
    // 2. 创建集合
    schema := &entity.Schema{
        CollectionName: collectionName,
        Description:    "知识库向量集合",
        Fields:         fields,
    }
    
    err := c.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)
    if err != nil {
        return fmt.Errorf("创建Milvus集合失败: %w", err)
    }
    
    // 3. 创建索引
    indexParam := entity.NewIndexIvfFlat(entity.L2, 1024)
    err = c.client.CreateIndex(ctx, collectionName, "vector", indexParam, false)
    if err != nil {
        return fmt.Errorf("创建向量索引失败: %w", err)
    }
    
    return nil
}

// InsertVectors 插入向量
func (c *MilvusClient) InsertVectors(ctx context.Context, collectionName string, data *VectorData) error {
    // 1. 准备数据
    ids := make([]int64, len(data.IDs))
    knowledgeIDs := make([]int64, len(data.IDs))
    documentIDs := make([]int64, len(data.IDs))
    sliceIDs := make([]int64, len(data.IDs))
    contents := make([]string, len(data.IDs))
    vectors := make([][]float32, len(data.IDs))
    
    for i, item := range data.Items {
        ids[i] = item.ID
        knowledgeIDs[i] = item.KnowledgeID
        documentIDs[i] = item.DocumentID
        sliceIDs[i] = item.SliceID
        contents[i] = item.Content
        vectors[i] = item.Vector
    }
    
    // 2. 构建列数据
    columns := []entity.Column{
        entity.NewColumnInt64("id", ids),
        entity.NewColumnInt64("knowledge_id", knowledgeIDs),
        entity.NewColumnInt64("document_id", documentIDs),
        entity.NewColumnInt64("slice_id", sliceIDs),
        entity.NewColumnVarChar("content", contents),
        entity.NewColumnFloatVector("vector", dimension, vectors),
    }
    
    // 3. 插入数据
    _, err := c.client.Insert(ctx, collectionName, "", columns...)
    if err != nil {
        return fmt.Errorf("插入向量数据失败: %w", err)
    }
    
    return nil
}

6.5 消息队列服务

6.5.1 事件总线

文件位置: backend/infra/eventbus/eventbus.go

// EventBus 事件总线接口
type EventBus interface {
    Publish(ctx context.Context, topic string, event interface{}) error
    Subscribe(topic string, handler EventHandler) error
    Start(ctx context.Context) error
    Stop() error
}

// EventHandler 事件处理器
type EventHandler func(ctx context.Context, event interface{}) error

// KafkaEventBus Kafka事件总线
type KafkaEventBus struct {
    producer sarama.SyncProducer
    consumer sarama.ConsumerGroup
    config   *KafkaConfig
    handlers map[string][]EventHandler
}

// KafkaConfig Kafka配置
type KafkaConfig struct {
    Brokers  []string `yaml:"brokers"`
    GroupID  string   `yaml:"group_id"`
    Username string   `yaml:"username"`
    Password string   `yaml:"password"`
}

// Publish 发布事件
func (k *KafkaEventBus) Publish(ctx context.Context, topic string, event interface{}) error {
    // 1. 序列化事件
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("序列化事件失败: %w", err)
    }
    
    // 2. 构建消息
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(data),
        Headers: []sarama.RecordHeader{
            {
                Key:   []byte("event_type"),
                Value: []byte(reflect.TypeOf(event).Name()),
            },
            {
                Key:   []byte("timestamp"),
                Value: []byte(fmt.Sprintf("%d", time.Now().Unix())),
            },
        },
    }
    
    // 3. 发送消息
    _, _, err = k.producer.SendMessage(msg)
    if err != nil {
        return fmt.Errorf("发送Kafka消息失败: %w", err)
    }
    
    return nil
}

6.6 搜索服务

6.6.1 ElasticSearch

文件位置: backend/infra/es/client.go

// ESClient ElasticSearch客户端
type ESClient struct {
    client *elasticsearch.Client
    config *ESConfig
}

// ESConfig ElasticSearch配置
type ESConfig struct {
    Addresses []string `yaml:"addresses"`
    Username  string   `yaml:"username"`
    Password  string   `yaml:"password"`
    Index     string   `yaml:"index"`
}

// CreateKnowledgeIndex 创建知识库索引
func (c *ESClient) CreateKnowledgeIndex(ctx context.Context, indexName string) error {
    // 1. 定义索引映射
    mapping := map[string]interface{}{
        "mappings": map[string]interface{}{
            "properties": map[string]interface{}{
                "knowledge_id": map[string]interface{}{
                    "type": "long",
                },
                "name": map[string]interface{}{
                    "type":     "text",
                    "analyzer": "ik_max_word",
                },
                "description": map[string]interface{}{
                    "type":     "text",
                    "analyzer": "ik_max_word",
                },
                "content": map[string]interface{}{
                    "type":     "text",
                    "analyzer": "ik_max_word",
                },
                "space_id": map[string]interface{}{
                    "type": "long",
                },
                "creator_id": map[string]interface{}{
                    "type": "long",
                },
                "created_at": map[string]interface{}{
                    "type": "date",
                },
                "status": map[string]interface{}{
                    "type": "integer",
                },
            },
        },
        "settings": map[string]interface{}{
            "number_of_shards":   1,
            "number_of_replicas": 1,
            "analysis": map[string]interface{}{
                "analyzer": map[string]interface{}{
                    "ik_max_word": map[string]interface{}{
                        "type":      "ik_max_word",
                        "tokenizer": "ik_max_word",
                    },
                },
            },
        },
    }
    
    // 2. 创建索引
    mappingJSON, _ := json.Marshal(mapping)
    req := esapi.IndicesCreateRequest{
        Index: indexName,
        Body:  strings.NewReader(string(mappingJSON)),
    }
    
    res, err := req.Do(ctx, c.client)
    if err != nil {
        return fmt.Errorf("创建ES索引失败: %w", err)
    }
    defer res.Body.Close()
    
    if res.IsError() {
        return fmt.Errorf("创建ES索引失败: %s", res.String())
    }
    
    return nil
}

6.7 配置管理

6.7.1 配置中心

文件位置: backend/infra/config/config.go

// Config 应用配置
type Config struct {
    Server    ServerConfig    `yaml:"server"`
    Database  DatabaseConfig  `yaml:"database"`
    Redis     RedisConfig     `yaml:"redis"`
    Kafka     KafkaConfig     `yaml:"kafka"`
    Milvus    MilvusConfig    `yaml:"milvus"`
    ES        ESConfig        `yaml:"elasticsearch"`
    Embedding EmbeddingConfig `yaml:"embedding"`
}

// ServerConfig 服务器配置
type ServerConfig struct {
    Host string `yaml:"host"`
    Port int    `yaml:"port"`
    Mode string `yaml:"mode"`
}

// DatabaseConfig 数据库配置
type DatabaseConfig struct {
    MySQL MySQLConfig `yaml:"mysql"`
}

// EmbeddingConfig 向量化配置
type EmbeddingConfig struct {
    Provider  string `yaml:"provider"`
    Model     string `yaml:"model"`
    APIKey    string `yaml:"api_key"`
    Dimension int    `yaml:"dimension"`
}

// LoadConfig 加载配置
func LoadConfig(configPath string) (*Config, error) {
    // 1. 读取配置文件
    data, err := ioutil.ReadFile(configPath)
    if err != nil {
        return nil, fmt.Errorf("读取配置文件失败: %w", err)
    }
    
    // 2. 解析YAML配置
    var config Config
    err = yaml.Unmarshal(data, &config)
    if err != nil {
        return nil, fmt.Errorf("解析配置文件失败: %w", err)
    }
    
    // 3. 环境变量覆盖
    err = envconfig.Process("", &config)
    if err != nil {
        return nil, fmt.Errorf("处理环境变量失败: %w", err)
    }
    
    return &config, nil
}

6.8 基础设施层总结

基础设施层为知识库创建功能提供了完整的技术支撑:

  1. 数据存储: MySQL主数据库 + Redis缓存
  2. 文档处理: 多格式文档解析 + 智能分片
  3. 向量化: OpenAI/本地模型向量化
  4. 向量存储: Milvus向量数据库
  5. 搜索引擎: ElasticSearch全文搜索
  6. 消息队列: Kafka事件驱动
  7. 配置管理: 统一配置中心

这些基础设施服务通过依赖注入的方式集成到上层业务逻辑中,确保了系统的可扩展性和可维护性。

7. 数据存储层

7.1 数据库表结构

knowledge_base 表设计

文件位置:helm/charts/opencoze/files/mysql/schema.sql

真实DDL结构

CREATE TABLE IF NOT EXISTS `knowledge_base` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'knowledge base id',
  `space_id` bigint NOT NULL COMMENT 'space id',
  `creator_id` bigint NOT NULL COMMENT 'creator user id',
  `name` varchar(255) NOT NULL COMMENT 'knowledge base name',
  `description` text NULL COMMENT 'knowledge base description',
  `icon_uri` varchar(255) NULL COMMENT 'icon uri',
  `status` int NOT NULL DEFAULT 1 COMMENT 'status: 1-active, 2-deleted',
  `embedding_model` varchar(100) NOT NULL COMMENT 'embedding model name',
  `chunk_size` int NOT NULL DEFAULT 1000 COMMENT 'document chunk size',
  `chunk_overlap` int NOT NULL DEFAULT 200 COMMENT 'chunk overlap size',
  `document_count` int NOT NULL DEFAULT 0 COMMENT 'total document count',
  `total_size` bigint NOT NULL DEFAULT 0 COMMENT 'total storage size in bytes',
  `settings` json NULL COMMENT 'knowledge base settings',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_creator_id` (`creator_id`),
  INDEX `idx_space_id` (`space_id`),
  INDEX `idx_status` (`status`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_base';
knowledge_document 表设计

真实DDL结构

CREATE TABLE IF NOT EXISTS `knowledge_document` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'document id',
  `knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',
  `name` varchar(255) NOT NULL COMMENT 'document name',
  `file_type` varchar(50) NOT NULL COMMENT 'file type: pdf, txt, docx, etc',
  `file_size` bigint NOT NULL COMMENT 'file size in bytes',
  `file_path` varchar(500) NOT NULL COMMENT 'file storage path',
  `content_hash` varchar(64) NOT NULL COMMENT 'content hash for deduplication',
  `chunk_count` int NOT NULL DEFAULT 0 COMMENT 'total chunk count',
  `processing_status` int NOT NULL DEFAULT 1 COMMENT 'processing status: 1-pending, 2-processing, 3-completed, 4-failed',
  `error_message` text NULL COMMENT 'error message if processing failed',
  `metadata` json NULL COMMENT 'document metadata',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_knowledge_base_id` (`knowledge_base_id`),
  INDEX `idx_processing_status` (`processing_status`),
  INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_document';
knowledge_chunk 表设计

真实DDL结构

CREATE TABLE IF NOT EXISTS `knowledge_chunk` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'chunk id',
  `knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',
  `document_id` bigint NOT NULL COMMENT 'document id',
  `chunk_index` int NOT NULL COMMENT 'chunk index in document',
  `content` text NOT NULL COMMENT 'chunk content',
  `content_hash` varchar(64) NOT NULL COMMENT 'content hash',
  `token_count` int NOT NULL DEFAULT 0 COMMENT 'token count',
  `embedding_vector` json NULL COMMENT 'embedding vector data',
  `metadata` json NULL COMMENT 'chunk metadata',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_knowledge_base_id` (`knowledge_base_id`),
  INDEX `idx_document_id` (`document_id`),
  INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_chunk';

表结构特点

  1. 关联设计:knowledge_base、knowledge_document和knowledge_chunk通过外键关联,支持级联查询
  2. 空间隔离:通过 space_id 实现多租户数据隔离
  3. JSON存储settingsmetadataembedding_vector使用JSON类型,支持复杂结构数据
  4. 状态管理:knowledge_document表包含处理状态字段,支持异步处理流程
  5. 索引优化:在关键查询字段上建立索引,优化查询性能
  6. 字符集:使用 utf8mb4_0900_ai_ci 排序规则,支持完整的Unicode字符集
  7. 向量存储:支持嵌入向量的JSON存储,便于语义搜索
  8. 去重机制:通过content_hash实现内容去重

knowledge_base字段详解

  • id:自增主键,唯一标识每个知识库
  • space_id:工作空间ID,实现租户级别的数据隔离
  • creator_id:创建者用户ID,用于权限控制和查询优化
  • name:知识库名称
  • description:知识库描述信息
  • icon_uri:知识库图标URI
  • status:知识库状态(1-活跃,2-已删除)
  • embedding_model:嵌入模型名称
  • chunk_size:文档分块大小
  • chunk_overlap:分块重叠大小
  • document_count:文档总数
  • total_size:总存储大小(字节)
  • settings:知识库设置,JSON格式
  • created_at/updated_at:毫秒级时间戳,记录创建和更新时间

knowledge_document字段详解

  • id:自增主键,唯一标识每个文档
  • knowledge_base_id:关联的知识库ID
  • name:文档名称
  • file_type:文件类型(pdf、txt、docx等)
  • file_size:文件大小(字节)
  • file_path:文件存储路径
  • content_hash:内容哈希,用于去重
  • chunk_count:分块总数
  • processing_status:处理状态(1-待处理,2-处理中,3-已完成,4-失败)
  • error_message:处理失败时的错误信息
  • metadata:文档元数据,JSON格式
  • created_at/updated_at:毫秒级时间戳,记录创建和更新时间

knowledge_chunk字段详解

  • id:自增主键,唯一标识每个分块
  • knowledge_base_id:关联的知识库ID
  • document_id:关联的文档ID
  • chunk_index:在文档中的分块索引
  • content:分块内容
  • content_hash:内容哈希
  • token_count:令牌数量
  • embedding_vector:嵌入向量数据,JSON格式
  • metadata:分块元数据,JSON格式
  • created_at/updated_at:毫秒级时间戳,记录创建和更新时间

7.2 ElasticSearch 索引架构

coze_resource 统一索引

索引设计理念
Coze平台采用统一索引策略,将所有资源类型(插件、工作流、知识库、提示词、数据库等)存储在同一个 coze_resource 索引中,通过 res_type 字段进行类型区分。

知识库在索引中的映射

{
  "mappings": {
    "properties": {
      "res_id": {
        "type": "long",
        "description": "资源ID,对应knowledge_base.id"
      },
      "res_type": {
        "type": "integer", 
        "description": "资源类型,知识库为4"
      },
      "name": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        },
        "description": "知识库名称,支持全文搜索和精确匹配"
      },
      "description": {
        "type": "text",
        "analyzer": "standard",
        "description": "知识库描述,支持全文搜索"
      },
      "owner_id": {
        "type": "long",
        "description": "所有者ID,对应creator_id"
      },
      "space_id": {
        "type": "long",
        "description": "工作空间ID"
      },
      "embedding_model": {
        "type": "keyword",
        "description": "嵌入模型名称"
      },
      "document_count": {
        "type": "integer",
        "description": "文档数量"
      },
      "total_size": {
        "type": "long",
        "description": "总存储大小"
      },
      "status": {
        "type": "integer",
        "description": "知识库状态"
      },
      "create_time": {
        "type": "long",
        "description": "创建时间戳(毫秒)"
      },
      "update_time": {
        "type": "long",
        "description": "更新时间戳(毫秒)"
      }
    }
  }
}
knowledge_content 内容索引

知识库内容专用索引

{
  "mappings": {
    "properties": {
      "chunk_id": {
        "type": "long",
        "description": "分块ID"
      },
      "knowledge_base_id": {
        "type": "long",
        "description": "知识库ID"
      },
      "document_id": {
        "type": "long",
        "description": "文档ID"
      },
      "content": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "description": "分块内容,支持中文分词"
      },
      "embedding_vector": {
        "type": "dense_vector",
        "dims": 1536,
        "description": "嵌入向量,用于语义搜索"
      },
      "metadata": {
        "type": "object",
        "description": "分块元数据"
      },
      "token_count": {
        "type": "integer",
        "description": "令牌数量"
      }
    }
  }
}

资源类型常量定义

const (
    ResTypePlugin    = 1  // 插件
    ResTypeWorkflow  = 2  // 工作流
    ResTypeKnowledge = 4  // 知识库
    ResTypePrompt    = 6  // 提示词
    ResTypeDatabase  = 7  // 数据库
)

7.3 数据同步机制

事件驱动的创建同步架构

创建同步流程

  1. 创建操作触发:知识库创建操作触发创建领域事件
  2. 事件发布:通过事件总线发布 ResourceDomainEvent 创建事件
  3. 事件处理resourceHandlerImpl 监听并处理创建事件
  4. 索引建立:将创建操作同步到ElasticSearch,建立相关索引
  5. 向量存储:同时在向量数据库中创建知识库向量空间

创建同步核心代码

// 资源创建事件处理器
type resourceHandlerImpl struct {
    esClient     es.Client
    vectorClient vector.Client
    logger       logs.Logger
}

// 处理知识库创建领域事件
func (r *resourceHandlerImpl) HandleKnowledgeCreateEvent(ctx context.Context, event *entity.ResourceDomainEvent) error {
    if event.OpType != entity.Created {
        return fmt.Errorf("invalid operation type for create handler: %v", event.OpType)
    }
    
    // 记录创建操作日志
    r.logger.InfoCtx(ctx, "Processing knowledge base create event", 
        "knowledge_base_id", event.ResID,
        "space_id", event.SpaceID,
        "operator_id", event.OperatorID)
    
    // 创建ES索引
    if err := r.createResourceIndex(ctx, event); err != nil {
        return fmt.Errorf("create resource index failed: %w", err)
    }
    
    // 创建向量空间
    if err := r.createVectorSpace(ctx, event); err != nil {
        r.logger.WarnCtx(ctx, "Failed to create vector space", 
            "knowledge_base_id", event.ResID, "error", err)
        // 向量空间创建失败不阻塞主流程
    }
    
    return nil
}

// 在索引中创建知识库
func (r *resourceHandlerImpl) createResourceIndex(ctx context.Context, event *entity.ResourceDomainEvent) error {
    indexName := "coze_resource"
    docID := conv.Int64ToStr(event.ResID)
    
    // 构建索引文档
    document := map[string]interface{}{
        "res_id": event.ResID,
        "res_type": 4, // 知识库类型
        "name": event.Name,
        "description": event.Description,
        "owner_id": event.OperatorID,
        "space_id": event.SpaceID,
        "embedding_model": event.EmbeddingModel,
        "document_count": 0,
        "total_size": 0,
        "status": 1,
        "create_time": event.CreateTime,
        "update_time": event.UpdateTime,
    }
    
    // 执行索引创建
    err := r.esClient.Create(ctx, indexName, docID, document)
    if err != nil {
        r.logger.ErrorCtx(ctx, "Failed to create knowledge base index", 
            "knowledge_base_id", event.ResID, "error", err)
        return fmt.Errorf("create knowledge base ES index failed: %w", err)
    }
    
    // 验证创建结果
    exists, checkErr := r.esClient.Exists(ctx, indexName, docID)
    if checkErr != nil {
        r.logger.WarnCtx(ctx, "Failed to verify creation", 
            "knowledge_base_id", event.ResID, "error", checkErr)
    } else if !exists {
        r.logger.ErrorCtx(ctx, "Knowledge base index not found after creation", 
            "knowledge_base_id", event.ResID)
        return fmt.Errorf("knowledge base creation verification failed")
    }
    
    r.logger.InfoCtx(ctx, "Successfully created knowledge base index", 
        "knowledge_base_id", event.ResID)
    return nil
}

// 创建向量空间
func (r *resourceHandlerImpl) createVectorSpace(ctx context.Context, event *entity.ResourceDomainEvent) error {
    spaceName := fmt.Sprintf("kb_%d", event.ResID)
    
    // 创建向量集合
    err := r.vectorClient.CreateCollection(ctx, &vector.CreateCollectionRequest{
        CollectionName: spaceName,
        Dimension:      1536, // OpenAI embedding维度
        MetricType:     "COSINE",
        Description:    fmt.Sprintf("Vector space for knowledge base %d", event.ResID),
    })
    
    if err != nil {
        return fmt.Errorf("create vector collection failed: %w", err)
    }
    
    r.logger.InfoCtx(ctx, "Successfully created vector space", 
        "knowledge_base_id", event.ResID, "collection_name", spaceName)
    return nil
}

7.4 知识库创建操作存储层设计原则

知识库创建数据一致性保证
  1. 创建一致性:采用事件驱动模式,保证MySQL创建和ElasticSearch索引建立的最终一致性
  2. 创建幂等性:知识库创建操作支持重试,避免重复创建导致的数据冲突
  3. 创建事务边界:知识库数据库创建操作和创建事件发布在同一事务中,保证原子性
  4. 创建验证:知识库创建完成后验证数据确实被正确存储,确保创建操作的完整性
  5. 向量空间创建:确保知识库创建时同步创建向量存储空间,维护数据完整性
知识库创建性能优化策略
  1. 创建索引优化:基于知识库主键ID的创建操作,具有最佳性能
  2. 批量创建:支持批量创建知识库操作,减少数据库和ES的操作次数
  3. 异步创建处理:知识库创建事件处理采用异步模式,不阻塞创建主流程
  4. 创建缓存预热:创建后及时预热知识库相关缓存,提高后续访问性能
  5. 分批向量创建:向量空间采用分批创建策略,避免大量向量创建时的性能问题
知识库创建操作扩展性考虑
  1. 分片创建:支持按 space_id 进行分片创建,提高大规模知识库创建的效率
  2. 创建队列:使用消息队列处理知识库创建事件,支持高并发创建场景
  3. 创建监控:独立的知识库创建操作监控,及时发现创建异常
  4. 多存储协调:协调MySQL、ElasticSearch、向量数据库等多存储的创建操作
知识库创建安全保障
  1. 权限验证:严格的知识库创建权限验证,确保只有授权用户可以创建
  2. 创建审计:完整的知识库创建操作审计日志,支持创建行为追踪
  3. 创建限制:实施知识库创建频率限制,防止恶意批量创建
  4. 数据备份:创建操作完成后及时备份知识库数据,支持数据恢复
  5. 向量验证:创建知识库时验证向量空间的创建完整性
  6. 重复检查:创建前检查知识库名称和配置是否重复,避免冲突

7.5 知识库创建操作监控和运维

知识库创建操作监控
// 知识库创建操作监控指标
type KnowledgeCreateMetrics struct {
    KnowledgeCreateSuccessCount int64         // 知识库创建成功次数
    KnowledgeCreateFailureCount int64         // 知识库创建失败次数
    KnowledgeCreateLatency      time.Duration // 知识库创建操作延迟
    LastKnowledgeCreateTime     time.Time     // 最后知识库创建时间
    KnowledgeIndexCreateCount   int64         // 知识库索引创建次数
    KnowledgeCreateEventCount   int64         // 知识库创建事件处理次数
    VectorSpaceCreateCount      int64         // 向量空间创建次数
    KnowledgeCreateQueueSize    int64         // 知识库创建队列大小
    KnowledgeCreateRateLimit    int64         // 知识库创建频率限制触发次数
    KnowledgeDuplicateCount     int64         // 知识库重复创建检测次数
    DocumentProcessingCount     int64         // 文档处理次数
    EmbeddingGenerationCount    int64         // 向量生成次数
}

// 知识库创建监控指标收集
func (r *resourceHandlerImpl) collectKnowledgeCreateMetrics(ctx context.Context, startTime time.Time, knowledgeID int64, err error) {
    latency := time.Since(startTime)
    if err != nil {
        metrics.KnowledgeCreateFailureCount++
        log.ErrorCtx(ctx, "knowledge base create failed", 
            "knowledge_id", knowledgeID, "error", err, "latency", latency)
    } else {
        metrics.KnowledgeCreateSuccessCount++
        metrics.KnowledgeCreateLatency = latency
        metrics.LastKnowledgeCreateTime = time.Now()
        log.InfoCtx(ctx, "knowledge base create succeeded", 
            "knowledge_id", knowledgeID, "latency", latency)
    }
}

// 知识库创建操作健康检查
func (r *resourceHandlerImpl) knowledgeCreateHealthCheck(ctx context.Context) error {
    // 检查数据库连接
    if err := r.db.Ping(); err != nil {
        return fmt.Errorf("database connection failed: %w", err)
    }
    
    // 检查ES连接
    if _, err := r.esClient.Ping(ctx); err != nil {
        return fmt.Errorf("elasticsearch connection failed: %w", err)
    }
    
    // 检查向量数据库连接
    if err := r.vectorClient.Ping(ctx); err != nil {
        return fmt.Errorf("vector database connection failed: %w", err)
    }
    
    // 检查知识库创建队列状态
    if queueSize := r.getKnowledgeCreateQueueSize(); queueSize > 1000 {
        return fmt.Errorf("knowledge create queue size too large: %d", queueSize)
    }
    
    // 检查向量空间创建状态
    if vectorErrors := r.getVectorSpaceCreateErrors(); len(vectorErrors) > 10 {
        return fmt.Errorf("too many vector space create errors: %d", len(vectorErrors))
    }
    
    // 检查创建频率限制状态
    if rateLimitHits := r.getCreateRateLimitHits(); rateLimitHits > 100 {
        return fmt.Errorf("too many rate limit hits: %d", rateLimitHits)
    }
    
    // 检查文档处理队列状态
    if docQueueSize := r.getDocumentProcessingQueueSize(); docQueueSize > 5000 {
        return fmt.Errorf("document processing queue size too large: %d", docQueueSize)
    }
    
    return nil
}
知识库创建数据质量保证
  1. 创建一致性检查:定期验证MySQL、ElasticSearch和向量数据库中知识库创建数据的一致性
  2. 创建完整性验证:确保知识库创建操作完全建立了相关数据、索引和向量空间
  3. 向量空间验证:验证知识库创建时向量空间的创建完整性和配置正确性
  4. 创建异常恢复:提供知识库创建失败的重试和修复机制
  5. 创建性能监控:监控知识库创建操作性能,及时发现和解决性能问题
  6. 创建审计追踪:完整记录知识库创建操作的执行过程和结果
  7. 多存储一致性:确保MySQL、ElasticSearch、向量数据库等多存储创建的一致性
  8. 重复检测:检测和防止知识库重复创建,维护数据唯一性
  9. 创建回滚机制:创建失败时的数据回滚和清理机制
  10. 文档处理监控:监控知识库创建过程中的文档处理和向量化进度
  11. 存储配额检查:创建前检查存储配额,确保有足够空间存储知识库数据
  12. 嵌入模型验证:验证知识库创建时指定的嵌入模型配置正确性

8. 知识库创建安全和权限验证机制

8.1 知识库创建身份认证

JWT Token验证

  • 创建知识库的所有API请求都需要携带有效的JWT Token
  • Token包含用户ID、工作空间权限等关键信息
  • 通过中间件统一验证Token的有效性和完整性
// 知识库创建身份验证中间件
func KnowledgeCreateAuthMiddleware() app.HandlerFunc {
    return func(c context.Context, ctx *app.RequestContext) {
        token := ctx.GetHeader("Authorization")
        if token == nil {
            ctx.JSON(401, gin.H{"error": "创建知识库需要登录认证"})
            ctx.Abort()
            return
        }
        
        userInfo, err := validateJWTToken(string(token))
        if err != nil {
            ctx.JSON(401, gin.H{"error": "Token无效,无法创建知识库"})
            ctx.Abort()
            return
        }
        
        // 验证用户是否有创建知识库的权限
        if !userInfo.HasKnowledgeCreatePermission {
            ctx.JSON(403, gin.H{"error": "用户无创建知识库权限"})
            ctx.Abort()
            return
        }
        
        ctx.Set("user_id", userInfo.UserID)
        ctx.Set("space_id", userInfo.SpaceID)
        ctx.Set("creator_id", userInfo.UserID)
        ctx.Next()
    }
}

8.2 知识库创建工作空间权限控制

空间隔离机制

  • 每个用户只能在其所属工作空间中创建知识库
  • 通过 space_id 字段实现知识库创建权限隔离
  • 在知识库创建操作中强制验证空间权限
// 知识库创建工作空间权限验证
func (s *KnowledgeApplicationService) validateKnowledgeCreateSpacePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {
    userSpaceID := ctx.Value("space_id").(int64)
    
    // 验证请求的空间ID是否与用户所属空间一致
    if req.SpaceID != userSpaceID {
        return errors.New("无权限在该工作空间创建知识库")
    }
    
    // 检查工作空间是否允许创建知识库
    spaceConfig, err := s.spaceService.GetSpaceConfig(ctx, userSpaceID)
    if err != nil {
        return fmt.Errorf("获取工作空间配置失败: %w", err)
    }
    
    if !spaceConfig.AllowKnowledgeCreation {
        return errors.New("该工作空间不允许创建知识库")
    }
    
    // 检查工作空间知识库数量限制
    knowledgeCount, err := s.getSpaceKnowledgeCount(ctx, userSpaceID)
    if err != nil {
        return fmt.Errorf("获取工作空间知识库数量失败: %w", err)
    }
    
    if knowledgeCount >= spaceConfig.MaxKnowledgeCount {
        return fmt.Errorf("工作空间知识库数量已达上限: %d", spaceConfig.MaxKnowledgeCount)
    }
    
    // 检查工作空间存储配额
    storageUsage, err := s.getSpaceStorageUsage(ctx, userSpaceID)
    if err != nil {
        return fmt.Errorf("获取工作空间存储使用量失败: %w", err)
    }
    
    if storageUsage >= spaceConfig.MaxStorageQuota {
        return fmt.Errorf("工作空间存储配额已满: %d GB", spaceConfig.MaxStorageQuota/1024/1024/1024)
    }
    
    return nil
}

8.3 知识库创建资源级权限验证

知识库创建用户权限验证

  • 严格验证用户是否具有知识库创建权限
  • 验证用户在指定工作空间的操作权限
  • 通过存储配额和向量空间权限进行资源级控制
// 知识库创建权限验证
func (s *KnowledgeApplicationService) validateKnowledgeCreatePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {
    userID := ctx.Value("user_id").(int64)
    
    // 验证用户是否具有知识库创建权限
    hasPermission, err := s.userService.HasKnowledgeCreatePermission(ctx, userID)
    if err != nil {
        return fmt.Errorf("验证知识库创建权限失败: %w", err)
    }
    
    if !hasPermission {
        return errorx.New(errno.ErrKnowledgePermissionCode, 
            errorx.KV(errno.KnowledgeMsgKey, "用户无知识库创建权限"),
            errorx.KV("user_id", userID))
    }
    
    // 验证工作空间权限
    spacePermission, err := s.spaceService.CheckUserSpacePermission(ctx, userID, req.SpaceID)
    if err != nil {
        return fmt.Errorf("验证工作空间权限失败: %w", err)
    }
    
    if !spacePermission.CanCreateKnowledge {
        return errorx.New(errno.ErrKnowledgeSpacePermissionCode, 
            errorx.KV(errno.KnowledgeMsgKey, "用户在该工作空间无知识库创建权限"),
            errorx.KV("user_id", userID),
            errorx.KV("space_id", req.SpaceID))
    }
    
    // 检查用户创建知识库频率限制
    createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))
    if err != nil {
        return fmt.Errorf("检查知识库创建频率失败: %w", err)
    }
    
    if createCount >= 20 { // 24小时内最多创建20个知识库
        return errorx.New(errno.ErrKnowledgeCreateRateLimitCode, 
            errorx.KV("user_id", userID),
            errorx.KV("create_count", createCount))
    }
    
    // 检查知识库名称是否重复
    exists, err := s.checkKnowledgeNameExists(ctx, req.SpaceID, req.Name)
    if err != nil {
        return fmt.Errorf("检查知识库名称重复失败: %w", err)
    }
    
    if exists {
        return errorx.New(errno.ErrKnowledgeNameExistsCode, 
            errorx.KV("knowledge_name", req.Name),
            errorx.KV("space_id", req.SpaceID))
    }
    
    // 检查存储配额
    storageQuota, err := s.checkStorageQuota(ctx, userID, req.SpaceID)
    if err != nil {
        return fmt.Errorf("检查存储配额失败: %w", err)
    }
    
    if !storageQuota.CanCreate {
        return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode, 
            errorx.KV("user_id", userID),
            errorx.KV("used_storage", storageQuota.UsedStorage),
            errorx.KV("max_storage", storageQuota.MaxStorage))
    }
    
    // 检查向量空间权限
    vectorPermission, err := s.checkVectorSpacePermission(ctx, userID, req.EmbeddingModel)
    if err != nil {
        return fmt.Errorf("检查向量空间权限失败: %w", err)
    }
    
    if !vectorPermission.CanCreateSpace {
        return errorx.New(errno.ErrKnowledgeVectorSpacePermissionCode, 
            errorx.KV("user_id", userID),
            errorx.KV("embedding_model", req.EmbeddingModel))
    }
    
    return nil
}

// 检查知识库名称是否存在
func (s *KnowledgeApplicationService) checkKnowledgeNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {
    // 检查同一工作空间下是否存在同名知识库
    knowledges, err := s.DomainSVC.ListKnowledges(ctx, &service.ListKnowledgesRequest{
        SpaceID: spaceID,
        PageInfo: entity.PageInfo{PageSize: 1},
    })
    if err != nil {
        return false, err
    }
    
    for _, knowledge := range knowledges.Knowledges {
        if knowledge.Name == name {
            return true, nil
        }
    }
    
    return false, nil
}

// 检查存储配额
func (s *KnowledgeApplicationService) checkStorageQuota(ctx context.Context, userID, spaceID int64) (*StorageQuotaInfo, error) {
    // 获取用户存储配额信息
    quota, err := s.storageService.GetUserStorageQuota(ctx, userID)
    if err != nil {
        return nil, err
    }
    
    // 获取当前使用量
    usage, err := s.storageService.GetUserStorageUsage(ctx, userID)
    if err != nil {
        return nil, err
    }
    
    return &StorageQuotaInfo{
        UsedStorage: usage,
        MaxStorage:  quota,
        CanCreate:   usage < quota*0.95, // 使用率不超过95%
    }, nil
}

// 检查向量空间权限
func (s *KnowledgeApplicationService) checkVectorSpacePermission(ctx context.Context, userID int64, embeddingModel string) (*VectorSpacePermission, error) {
    // 检查用户是否有权限使用指定的嵌入模型
    modelPermission, err := s.embeddingService.CheckModelPermission(ctx, userID, embeddingModel)
    if err != nil {
        return nil, err
    }
    
    // 检查向量空间创建配额
    spaceCount, err := s.vectorService.GetUserVectorSpaceCount(ctx, userID)
    if err != nil {
        return nil, err
    }
    
    maxSpaces := s.getUserMaxVectorSpaces(userID)
    
    return &VectorSpacePermission{
        CanCreateSpace: modelPermission && spaceCount < maxSpaces,
        CurrentSpaces:  spaceCount,
        MaxSpaces:      maxSpaces,
    }, nil
}

8.4 知识库创建API访问控制

创建请求频率限制

  • 实现基于用户的知识库创建频率限制
  • 防止恶意批量创建知识库
  • 支持不同用户等级的差异化创建限流策略
  • 基于文档处理能力的动态限流

创建操作安全验证

  • 严格验证创建请求的合法性
  • 防止恶意创建和资源滥用攻击
  • 使用多重安全检查机制
  • 文档内容安全扫描和验证
  • 向量空间创建安全验证
// 知识库创建参数验证
func validateKnowledgeCreateRequest(req *service.CreateKnowledgeRequest) error {
    if req.SpaceID <= 0 {
        return errors.New("无效的工作空间ID")
    }
    
    if req.CreatorID <= 0 {
        return errors.New("无效的创建者ID")
    }
    
    // 验证知识库名称
    if req.Name == "" {
        return errors.New("知识库名称不能为空")
    }
    
    if len(req.Name) > 100 {
        return errors.New("知识库名称长度不能超过100字符")
    }
    
    // 验证知识库描述
    if req.Description != "" && len(req.Description) > 1000 {
        return errors.New("知识库描述长度不能超过1000字符")
    }
    
    // 验证嵌入模型
    if req.EmbeddingModel == "" {
        return errors.New("嵌入模型不能为空")
    }
    
    if !isValidEmbeddingModel(req.EmbeddingModel) {
        return errors.New("不支持的嵌入模型")
    }
    
    // 验证分块策略
    if req.ChunkStrategy != nil {
        if req.ChunkStrategy.ChunkSize <= 0 || req.ChunkStrategy.ChunkSize > 8192 {
            return errors.New("分块大小必须在1-8192之间")
        }
        
        if req.ChunkStrategy.ChunkOverlap < 0 || req.ChunkStrategy.ChunkOverlap >= req.ChunkStrategy.ChunkSize {
            return errors.New("分块重叠大小必须小于分块大小")
        }
    }
    
    // 验证图标URI
    if req.IconURI != "" && !isValidIconURI(req.IconURI) {
        return errors.New("无效的图标URI格式")
    }
    
    return nil
}

// 知识库创建操作安全检查
func (s *KnowledgeApplicationService) validateKnowledgeCreateSafety(ctx context.Context, req *service.CreateKnowledgeRequest) error {
    userID := ctx.Value("user_id").(int64)
    
    // 检查用户知识库创建频率限制
    createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))
    if err != nil {
        return fmt.Errorf("检查知识库创建频率失败: %w", err)
    }
    
    if createCount >= 20 { // 24小时内最多创建20个知识库
        return errorx.New(errno.ErrKnowledgeCreateRateLimitCode, 
            errorx.KV("user_id", userID),
            errorx.KV("create_count", createCount))
    }
    
    // 检查嵌入模型可用性
    modelAvailable, err := s.checkEmbeddingModelAvailable(ctx, req.EmbeddingModel)
    if err != nil {
        return fmt.Errorf("检查嵌入模型可用性失败: %w", err)
    }
    
    if !modelAvailable {
        return errors.New("嵌入模型当前不可用")
    }
    
    // 检查用户存储配额
    storageQuota, err := s.checkUserStorageQuota(ctx, userID)
    if err != nil {
        return fmt.Errorf("检查用户存储配额失败: %w", err)
    }
    
    if !storageQuota.CanCreateKnowledge {
        return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode, 
            errorx.KV("user_id", userID),
            errorx.KV("used_storage", storageQuota.UsedStorage),
            errorx.KV("max_storage", storageQuota.MaxStorage))
    }
    
    // 检查向量数据库连接
    vectorDBHealthy, err := s.checkVectorDatabaseHealth(ctx)
    if err != nil {
        return fmt.Errorf("检查向量数据库健康状态失败: %w", err)
    }
    
    if !vectorDBHealthy {
        return errors.New("向量数据库当前不可用,无法创建知识库")
    }
    
    // 检查文档处理服务状态
    docProcessorHealthy, err := s.checkDocumentProcessorHealth(ctx)
    if err != nil {
        return fmt.Errorf("检查文档处理服务状态失败: %w", err)
    }
    
    if !docProcessorHealthy {
        return errors.New("文档处理服务当前不可用,无法创建知识库")
    }
    
    return nil
}

// 检查嵌入模型可用性
func (s *KnowledgeApplicationService) checkEmbeddingModelAvailable(ctx context.Context, modelName string) (bool, error) {
    // 检查模型是否在支持列表中
    supportedModels := []string{
        "text-embedding-ada-002",
        "text-embedding-3-small",
        "text-embedding-3-large",
        "bge-large-zh-v1.5",
        "bge-base-zh-v1.5",
    }
    
    for _, model := range supportedModels {
        if model == modelName {
            // 检查模型服务是否可用
            return s.embeddingService.IsModelAvailable(ctx, modelName)
        }
    }
    
    return false, nil
}

// 检查向量数据库健康状态
func (s *KnowledgeApplicationService) checkVectorDatabaseHealth(ctx context.Context) (bool, error) {
    // 发送健康检查请求到向量数据库
    healthCheck := &VectorDBHealthCheck{
        Timeout: 5 * time.Second,
    }
    
    healthy, err := s.vectorService.HealthCheck(ctx, healthCheck)
    if err != nil {
        logs.CtxWarnf(ctx, "Vector database health check failed: %v", err)
        return false, nil
    }
    
    return healthy, nil
}

// 检查文档处理服务健康状态
func (s *KnowledgeApplicationService) checkDocumentProcessorHealth(ctx context.Context) (bool, error) {
    // 检查文档处理队列状态
    queueStatus, err := s.documentService.GetQueueStatus(ctx)
    if err != nil {
        logs.CtxWarnf(ctx, "Document processor queue status check failed: %v", err)
        return false, nil
    }
    
    // 如果队列积压过多,认为服务不健康
    if queueStatus.PendingJobs > 10000 {
        logs.CtxWarnf(ctx, "Document processor queue overloaded: %d pending jobs", queueStatus.PendingJobs)
        return false, nil
    }
    
    return true, nil
}

// 获取用户存储使用量
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {
    // 查询用户所有插件的存储使用量
    plugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)
    if err != nil {
        return 0, fmt.Errorf("获取用户插件列表失败: %w", err)
    }
    
    var totalSize int64
    for _, plugin := range plugins {
        // 计算插件manifest和openapi_doc的存储大小
        if plugin.Manifest != nil {
            totalSize += int64(len(plugin.Manifest))
        }
        if plugin.OpenapiDoc != nil {
            totalSize += int64(len(plugin.OpenapiDoc))
        }
    }
    
    return totalSize, nil
}

// 获取用户最大存储配额
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {
    // 根据用户等级返回不同的存储配额
    // 这里简化处理,实际应该从用户配置中获取
    return 100 * 1024 * 1024 // 100MB
}

// URL格式验证
func isValidURL(urlStr string) bool {
    u, err := url.Parse(urlStr)
    return err == nil && u.Scheme != "" && u.Host != ""
}

// 插件类型验证
func isValidPluginType(pluginType common.PluginType) bool {
    validTypes := []common.PluginType{
        common.PluginTypeHTTP,
        common.PluginTypeLocal,
    }
    
    for _, validType := range validTypes {
        if pluginType == validType {
            return true
        }
    }
    
    return false
}

9. 知识库创建错误处理和日志记录

9.1 知识库创建分层错误处理机制

知识库创建错误分类体系

// 知识库创建错误类型定义
type KnowledgeCreateErrorType int

const (
    // 知识库创建业务错误
    ErrKnowledgeCreateBusiness KnowledgeCreateErrorType = iota + 1000
    ErrKnowledgeNameExists
    ErrKnowledgePermissionDenied
    ErrKnowledgeCreateRateLimit
    ErrKnowledgeInvalidParameters
    ErrKnowledgeEmbeddingModelNotSupported
    ErrKnowledgeStorageQuotaExceeded
    ErrKnowledgeDocumentProcessingFailed
    ErrKnowledgeInvalidFileType
    ErrKnowledgeFileSizeExceeded
    ErrKnowledgeInvalidChunkSize
    ErrKnowledgeInvalidIconURI
    ErrKnowledgeInvalidSpaceID
    ErrKnowledgeDuplicateName
    ErrKnowledgeVectorSpaceCreateFailed
    
    // 知识库创建系统错误
    ErrKnowledgeCreateSystem KnowledgeCreateErrorType = iota + 2000
    ErrKnowledgeDatabaseConnection
    ErrKnowledgeElasticSearchTimeout
    ErrKnowledgeServiceUnavailable
    ErrKnowledgeCreateEventPublishFailed
    ErrKnowledgeIndexCreateFailed
    ErrKnowledgeTransactionRollbackFailed
    ErrKnowledgeVectorStoreTimeout
    ErrKnowledgeIDGenerationFailed
    ErrKnowledgeEmbeddingServiceFailed
    ErrKnowledgeContentIndexFailed
    
    // 知识库创建网络错误
    ErrKnowledgeCreateNetwork KnowledgeCreateErrorType = iota + 3000
    ErrKnowledgeCreateRequestTimeout
    ErrKnowledgeCreateConnectionRefused
    ErrKnowledgeCreateServiceDown
    ErrKnowledgeCreateESConnectionFailed
    ErrKnowledgeVectorDBConnectionFailed
    ErrKnowledgeEmbeddingAPITimeout
)

知识库创建错误处理流程

  1. 捕获阶段:在知识库创建各层级捕获具体错误
  2. 包装阶段:添加知识库创建操作相关上下文信息和错误码
  3. 记录阶段:根据错误级别记录知识库创建操作日志
  4. 响应阶段:返回用户友好的知识库创建错误信息
  5. 回滚阶段:知识库创建失败时进行必要的数据回滚操作
  6. 向量处理:处理向量空间创建失败的错误
  7. 重试机制:对于可重试的创建错误提供重试建议
  8. 用户指导:为常见创建错误提供解决方案指导

9.2 知识库创建统一错误响应格式

// 知识库创建错误响应结构
type KnowledgeCreateErrorResponse struct {
    Code         int    `json:"code"`
    Message      string `json:"message"`
    Details      string `json:"details,omitempty"`
    TraceID      string `json:"trace_id"`
    KnowledgeID  int64  `json:"knowledge_id,omitempty"`
    Operation    string `json:"operation"`
    CanRetry     bool   `json:"can_retry"`
    DocumentsProcessed int    `json:"documents_processed,omitempty"`
    DocumentsFailed    int    `json:"documents_failed,omitempty"`
    ValidationErrors   []string `json:"validation_errors,omitempty"`
    SuggestedFix       string `json:"suggested_fix,omitempty"`
    FieldErrors        map[string]string `json:"field_errors,omitempty"`
    VectorSpaceStatus  string `json:"vector_space_status,omitempty"`
    EmbeddingModel     string `json:"embedding_model,omitempty"`
}

// 知识库创建错误处理中间件
func KnowledgeCreateErrorHandlerMiddleware() app.HandlerFunc {
    return func(c context.Context, ctx *app.RequestContext) {
        defer func() {
            if err := recover(); err != nil {
                traceID := ctx.GetString("trace_id")
                userID := ctx.GetInt64("user_id")
                spaceID := ctx.GetInt64("space_id")
                
                logs.CtxErrorf(c, "Knowledge base creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s", 
                    err, userID, spaceID, traceID)
                
                ctx.JSON(500, KnowledgeCreateErrorResponse{
                    Code:      5000,
                    Message:   "知识库创建服务器内部错误",
                    TraceID:   traceID,
                    Operation: "create_knowledge",
                    CanRetry:  true,
                    SuggestedFix: "请稍后重试,如果问题持续存在请联系技术支持",
                })
            }
        }()
        ctx.Next()
    }
}

// 插件创建业务错误处理
func handlePluginCreateBusinessError(ctx *app.RequestContext, err error) {
    traceID := ctx.GetString("trace_id")
    
    var response PluginCreateErrorResponse
    response.TraceID = traceID
    response.Operation = "create_plugin"
    
    switch {
    case errors.Is(err, errno.ErrPluginInvalidParamCode):
        response.Code = 400
        response.Message = "插件参数无效"
        response.CanRetry = false
        response.SuggestedFix = "请检查插件名称、描述、服务器URL等参数是否正确"
        
    case errors.Is(err, errno.ErrPluginPermissionCode):
        response.Code = 403
        response.Message = "无权限创建插件"
        response.CanRetry = false
        response.SuggestedFix = "请确保已登录且具有插件创建权限"
        
    case errors.Is(err, errno.ErrPluginInvalidManifest):
        response.Code = 400
        response.Message = "插件清单格式无效"
        response.CanRetry = false
        response.SuggestedFix = "请检查插件清单文件格式是否符合规范"
        
    case errors.Is(err, errno.ErrPluginInvalidOpenapi3Doc):
        response.Code = 400
        response.Message = "OpenAPI文档格式无效"
        response.CanRetry = false
        response.SuggestedFix = "请检查OpenAPI文档格式是否符合OpenAPI 3.0规范"
        
    case errors.Is(err, errno.ErrPluginIDExist):
        response.Code = 409
        response.Message = "插件ID已存在"
        response.CanRetry = false
        response.SuggestedFix = "请使用不同的插件名称或检查是否已存在同名插件"
        
    case errors.Is(err, errno.ErrPluginCreateRateLimit):
        response.Code = 429
        response.Message = "创建操作过于频繁,请稍后再试"
        response.CanRetry = true
        response.SuggestedFix = "请等待一段时间后重试"
        
    case errors.Is(err, errno.ErrPluginStorageQuotaExceeded):
        response.Code = 413
        response.Message = "存储配额已满"
        response.CanRetry = false
        response.SuggestedFix = "请清理不需要的插件或升级存储配额"
        
    case errors.Is(err, errno.ErrPluginServerURLNotAccessible):
        response.Code = 400
        response.Message = "插件服务器URL不可访问"
        response.CanRetry = true
        response.SuggestedFix = "请检查服务器URL是否正确且可访问"
        
    default:
        response.Code = 500
        response.Message = "插件创建失败"
        response.CanRetry = true
        response.SuggestedFix = "请稍后重试,如果问题持续存在请联系技术支持"
    }
    
    ctx.JSON(response.Code, response)
}

// 插件创建系统错误处理
func handlePluginCreateSystemError(ctx *app.RequestContext, err error) {
    traceID := ctx.GetString("trace_id")
    
    var response PluginCreateErrorResponse
    response.TraceID = traceID
    response.Operation = "create_plugin"
    
    switch {
    case errors.Is(err, errno.ErrPluginDatabaseConnection):
        response.Code = 500
        response.Message = "插件数据库连接失败"
        response.CanRetry = true
        response.SuggestedFix = "数据库连接异常,请稍后重试"
        
    case errors.Is(err, errno.ErrPluginElasticSearchTimeout):
        response.Code = 500
        response.Message = "插件索引操作超时"
        response.CanRetry = true
        response.SuggestedFix = "搜索服务响应超时,请稍后重试"
        
    case errors.Is(err, errno.ErrPluginServiceUnavailable):
        response.Code = 503
        response.Message = "插件创建服务暂时不可用"
        response.CanRetry = true
        response.SuggestedFix = "服务正在维护中,请稍后重试"
        
    case errors.Is(err, errno.ErrPluginCreateEventPublishFailed):
        response.Code = 500
        response.Message = "插件创建事件发布失败"
        response.CanRetry = true
        response.SuggestedFix = "事件发布异常,插件已创建但可能影响搜索,请稍后重试"
        
    case errors.Is(err, errno.ErrPluginIndexCreateFailed):
        response.Code = 500
        response.Message = "插件索引创建失败"
        response.CanRetry = true
        response.SuggestedFix = "搜索索引创建失败,插件已创建但可能无法搜索到"
        
    case errors.Is(err, errno.ErrPluginTransactionRollbackFailed):
        response.Code = 500
        response.Message = "插件创建事务回滚失败"
        response.CanRetry = false
        response.SuggestedFix = "数据一致性异常,请联系技术支持"
        
    case errors.Is(err, errno.ErrPluginIDGenerationFailed):
        response.Code = 500
        response.Message = "插件ID生成失败"
        response.CanRetry = true
        response.SuggestedFix = "ID生成服务异常,请稍后重试"
        
    default:
        response.Code = 5000
        response.Message = "插件创建失败"
        response.Details = "服务器内部错误,请稍后重试"
        response.CanRetry = true
        response.SuggestedFix = "系统内部错误,请稍后重试或联系技术支持"
    }
    
    ctx.JSON(response.Code, response)
}

9.3 知识库创建日志记录策略

知识库创建日志级别定义

  • DEBUG:知识库创建详细调试信息,包括参数值、向量处理过程、文档分块详情
  • INFO:知识库创建关键业务流程信息,如创建开始、参数验证、数据插入、向量空间创建
  • WARN:知识库创建潜在问题警告,如存储配额警告、文档处理警告、向量生成警告
  • ERROR:知识库创建错误信息,包括创建失败、权限错误、向量空间创建失败
  • FATAL:知识库创建严重错误,可能导致数据不一致或向量空间损坏

知识库创建结构化日志格式

// 知识库创建日志记录示例
func (s *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateDatasetRequest) (*knowledgeAPI.CreateDatasetResponse, error) {
    traceID := generateTraceID()
    ctx = context.WithValue(ctx, "trace_id", traceID)
    
    userID := ctxutil.GetUIDFromCtx(ctx)
    
    // 记录知识库创建开始
    logs.CtxInfof(ctx, "CreateKnowledge started, userID=%d, knowledgeName=%s, spaceID=%d, embeddingModel=%s, traceID=%s", 
        userID, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), traceID)
    
    startTime := time.Now()
    defer func() {
        duration := time.Since(startTime)
        logs.CtxInfof(ctx, "CreateKnowledge completed, duration=%dms, traceID=%s", 
            duration.Milliseconds(), traceID)
    }()
    
    // 记录关键步骤
    logs.CtxInfof(ctx, "Validating knowledge create parameters, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, traceID=%s", 
        req.GetName(), req.GetEmbeddingModel(), req.GetChunkSize(), traceID)
    
    // 权限验证日志
    logs.CtxInfof(ctx, "Validating knowledge create permission, userID=%d, spaceID=%d, traceID=%s", 
        userID, req.GetSpaceID(), traceID)
    
    // 存储配额检查日志
    logs.CtxInfof(ctx, "Checking storage quota, userID=%d, traceID=%s", userID, traceID)
    
    // 向量空间创建日志
    logs.CtxInfof(ctx, "Creating vector space, embeddingModel=%s, dimensions=%d, traceID=%s", 
        req.GetEmbeddingModel(), getModelDimensions(req.GetEmbeddingModel()), traceID)
    
    // 数据库创建操作日志
    logs.CtxInfof(ctx, "Creating knowledge in database, knowledgeName=%s, traceID=%s", 
        req.GetName(), traceID)
    
    // ElasticSearch索引创建日志
    logs.CtxInfof(ctx, "Creating ElasticSearch index, knowledgeID=%d, traceID=%s", 
        knowledgeID, traceID)
    
    // 事件发布日志
    logs.CtxInfof(ctx, "Publishing knowledge create event, knowledgeID=%d, traceID=%s", 
        knowledgeID, traceID)
    
    return resp, nil
}

// 知识库创建操作审计日志
func (s *KnowledgeApplicationService) logKnowledgeCreateAudit(ctx context.Context, operation string, knowledgeID int64, details map[string]interface{}) {
    userID := ctx.Value("user_id").(int64)
    spaceID := ctx.Value("space_id").(int64)
    traceID := ctx.Value("trace_id").(string)
    
    auditLog := map[string]interface{}{
        "operation":       operation,
        "knowledge_id":    knowledgeID,
        "user_id":         userID,
        "space_id":        spaceID,
        "trace_id":        traceID,
        "timestamp":       time.Now().Unix(),
        "details":         details,
        "knowledge_name":  details["knowledge_name"],
        "embedding_model": details["embedding_model"],
        "chunk_size":      details["chunk_size"],
        "chunk_overlap":   details["chunk_overlap"],
        "vector_space_id": details["vector_space_id"],
        "storage_used":    details["storage_used"],
    }
    
    logs.CtxInfof(ctx, "Knowledge create audit log: %+v", auditLog)
}

// 文档处理日志记录
func (s *KnowledgeApplicationService) logDocumentProcessing(ctx context.Context, knowledgeID int64, documentID int64, operation string, details map[string]interface{}) {
    traceID := ctx.Value("trace_id").(string)
    
    docLog := map[string]interface{}{
        "operation":     operation,
        "knowledge_id":  knowledgeID,
        "document_id":   documentID,
        "trace_id":      traceID,
        "timestamp":     time.Now().Unix(),
        "details":       details,
        "file_name":     details["file_name"],
        "file_size":     details["file_size"],
        "chunk_count":   details["chunk_count"],
        "vector_count":  details["vector_count"],
        "processing_time": details["processing_time"],
    }
    
    logs.CtxInfof(ctx, "Document processing log: %+v", docLog)
}

// 向量空间操作日志
func (s *KnowledgeApplicationService) logVectorSpaceOperation(ctx context.Context, operation string, vectorSpaceID string, details map[string]interface{}) {
    traceID := ctx.Value("trace_id").(string)
    
    vectorLog := map[string]interface{}{
        "operation":        operation,
        "vector_space_id":  vectorSpaceID,
        "trace_id":         traceID,
        "timestamp":        time.Now().Unix(),
        "details":          details,
        "embedding_model":  details["embedding_model"],
        "dimensions":       details["dimensions"],
        "vector_count":     details["vector_count"],
        "index_type":       details["index_type"],
    }
    
    logs.CtxInfof(ctx, "Vector space operation log: %+v", vectorLog)
}

知识库创建日志内容规范

  • 请求日志:记录用户ID、工作空间ID、知识库名称、嵌入模型、分块策略、TraceID
  • 业务日志:记录知识库创建步骤、参数验证结果、权限验证结果、向量空间创建过程
  • 性能日志:记录创建接口响应时间、数据库插入时间、向量空间创建时间、文档处理时间
  • 错误日志:记录创建错误堆栈、知识库相关上下文信息、向量处理失败原因
  • 审计日志:记录知识库的创建操作、创建参数、创建结果、关联的文档和向量信息
  • 安全日志:记录创建频率、权限验证、存储配额检查、可疑创建行为
  • 文档处理日志:记录文档上传、分块处理、向量生成、索引创建等详细过程
  • 向量空间日志:记录向量空间创建、配置、索引构建、查询性能等信息

9.4 知识库创建监控和告警

知识库创建关键指标监控

  • 创建性能:知识库创建响应时间、创建成功率、创建QPS、创建吞吐量
  • 资源使用:数据库连接数、向量空间创建延迟、内存使用率、文档处理队列长度
  • 业务指标:知识库创建成功率、创建频率分布、不同嵌入模型使用比例、用户创建活跃度
  • 安全指标:权限验证通过率、恶意创建尝试次数、创建频率限制触发次数、存储配额检查失败率
  • 质量指标:向量空间创建成功率、文档处理成功率、嵌入模型响应率、索引创建成功率
  • 存储指标:存储使用量、向量数量、文档数量、索引大小、存储增长率
  • 向量处理指标:向量生成延迟、向量维度分布、嵌入模型调用次数、向量相似度计算性能

知识库创建告警策略

  • 创建失败率告警:当知识库创建失败率超过3%时触发告警
  • 性能告警:当知识库创建响应时间超过10秒时触发告警
  • 资源告警:当数据库连接数超过80%或向量数据库连接异常时触发告警
  • 安全告警:当检测到异常创建行为或存储配额滥用时立即触发告警
  • 数据一致性告警:当MySQL、ES和向量数据库创建状态不一致时触发告警
  • 配额告警:当用户存储使用量超过90%时触发告警
  • 向量服务告警:当嵌入模型服务不可用或响应超时时触发告警
  • 文档处理告警:当文档处理队列积压超过阈值时触发告警
// 知识库创建监控指标收集
type KnowledgeCreateMetrics struct {
    CreateSuccessCount      int64         // 创建成功次数
    CreateFailureCount      int64         // 创建失败次数
    CreateLatency           time.Duration // 创建延迟
    PermissionDeniedCount   int64         // 权限拒绝次数
    RateLimitCount          int64         // 频率限制次数
    ParameterValidationFailCount int64    // 参数验证失败次数
    VectorSpaceCreateLatency time.Duration // 向量空间创建延迟
    VectorSpaceCreateFailCount int64      // 向量空间创建失败次数
    DocumentProcessingLatency time.Duration // 文档处理延迟
    EmbeddingGenerationLatency time.Duration // 嵌入生成延迟
    EmbeddingModelFailCount int64         // 嵌入模型调用失败次数
    StorageQuotaExceededCount int64       // 存储配额超限次数
    IndexCreateLatency      time.Duration // 索引创建延迟
    IndexCreateFailCount    int64         // 索引创建失败次数
    EventPublishLatency     time.Duration // 事件发布延迟
    DatabaseInsertLatency   time.Duration // 数据库插入延迟
    VectorDatabaseLatency   time.Duration // 向量数据库操作延迟
    TotalStorageUsed        int64         // 总存储使用量
    TotalVectorCount        int64         // 总向量数量
    TotalDocumentCount      int64         // 总文档数量
}

// 知识库创建监控指标上报
func (s *KnowledgeApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, knowledgeID int64, req *knowledgeAPI.CreateDatasetRequest, err error) {
    latency := time.Since(startTime)
    
    if err != nil {
        metrics.CreateFailureCount++
        
        // 根据错误类型分类统计
        switch {
        case errors.Is(err, errno.ErrKnowledgePermissionCode):
            metrics.PermissionDeniedCount++
        case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):
            metrics.RateLimitCount++
        case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):
            metrics.ParameterValidationFailCount++
        case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):
            metrics.StorageQuotaExceededCount++
        case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):
            metrics.VectorSpaceCreateFailCount++
        case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):
            metrics.EmbeddingModelFailCount++
        case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):
            metrics.IndexCreateFailCount++
        }
        
        logs.CtxErrorf(ctx, "Knowledge %s failed, knowledgeName=%s, spaceID=%d, embeddingModel=%s, error=%v, latency=%dms", 
            operation, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), err, latency.Milliseconds())
    } else {
        metrics.CreateSuccessCount++
        metrics.CreateLatency = latency
        
        // 记录知识库类型统计
        embeddingModel := req.GetEmbeddingModel()
        chunkSize := req.GetChunkSize()
        
        logs.CtxInfof(ctx, "Knowledge %s succeeded, knowledgeID=%d, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, latency=%dms", 
            operation, knowledgeID, req.GetName(), embeddingModel, chunkSize, latency.Milliseconds())
    }
    
    // 上报到监控系统
    s.metricsReporter.Report(ctx, "knowledge_create", map[string]interface{}{
        "operation":       operation,
        "knowledge_id":    knowledgeID,
        "knowledge_name":  req.GetName(),
        "embedding_model": req.GetEmbeddingModel(),
        "chunk_size":      req.GetChunkSize(),
        "chunk_overlap":   req.GetChunkOverlap(),
        "space_id":        req.GetSpaceID(),
        "success":         err == nil,
        "latency_ms":      latency.Milliseconds(),
        "error_type":      getKnowledgeCreateErrorType(err),
        "vector_dimensions": getModelDimensions(req.GetEmbeddingModel()),
        "storage_used":    getStorageUsed(ctx, req.GetSpaceID()),
    })
}

// 获取知识库创建错误类型
func getKnowledgeCreateErrorType(err error) string {
    if err == nil {
        return "none"
    }
    
    // 基于知识库错误码定义
    switch {
    case errors.Is(err, errno.ErrKnowledgePermissionCode):
        return "permission_denied"
    case errors.Is(err, errno.ErrKnowledgeNameExistsCode):
        return "knowledge_exists"
    case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):
        return "invalid_parameters"
    case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):
        return "storage_quota_exceeded"
    case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):
        return "vector_space_create_failed"
    case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):
        return "embedding_model_failed"
    case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):
        return "index_create_failed"
    case errors.Is(err, errno.ErrKnowledgeDocumentProcessingFailedCode):
        return "document_processing_failed"
    case errors.Is(err, errno.ErrKnowledgeVectorDatabaseTimeoutCode):
        return "vector_database_timeout"
    case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):
        return "rate_limit_exceeded"
    default:
        return "system_error"
    }
}

// 知识库创建告警检查
func (s *KnowledgeApplicationService) checkCreateAlerts(ctx context.Context, metrics *KnowledgeCreateMetrics) {
    // 创建失败率告警
    totalCreates := metrics.CreateSuccessCount + metrics.CreateFailureCount
    if totalCreates > 100 {
        failureRate := float64(metrics.CreateFailureCount) / float64(totalCreates)
        if failureRate > 0.03 { // 3%
            s.alertManager.SendAlert(ctx, &Alert{
                Level:   "warning",
                Type:    "knowledge_create_failure_rate",
                Message: fmt.Sprintf("知识库创建失败率过高: %.2f%%", failureRate*100),
                Metrics: map[string]interface{}{
                    "failure_rate": failureRate,
                    "total_creates": totalCreates,
                },
            })
        }
    }
    
    // 性能告警
    if metrics.CreateLatency > 10*time.Second {
        s.alertManager.SendAlert(ctx, &Alert{
            Level:   "warning",
            Type:    "knowledge_create_latency",
            Message: fmt.Sprintf("知识库创建延迟过高: %dms", metrics.CreateLatency.Milliseconds()),
            Metrics: map[string]interface{}{
                "latency_ms": metrics.CreateLatency.Milliseconds(),
            },
        })
    }
    
    // 存储配额告警
    if metrics.StorageQuotaExceededCount > 10 {
        s.alertManager.SendAlert(ctx, &Alert{
            Level:   "critical",
            Type:    "knowledge_storage_quota_exceeded",
            Message: fmt.Sprintf("存储配额超限次数过多: %d", metrics.StorageQuotaExceededCount),
            Metrics: map[string]interface{}{
                "quota_exceeded_count": metrics.StorageQuotaExceededCount,
            },
        })
    }
    
    // 向量空间创建失败告警
    if metrics.VectorSpaceCreateFailCount > 5 {
        s.alertManager.SendAlert(ctx, &Alert{
            Level:   "critical",
            Type:    "knowledge_vector_space_create_failed",
            Message: fmt.Sprintf("向量空间创建失败次数过多: %d", metrics.VectorSpaceCreateFailCount),
            Metrics: map[string]interface{}{
                "vector_space_fail_count": metrics.VectorSpaceCreateFailCount,
            },
        })
    }
    
    // 嵌入模型失败告警
    if metrics.EmbeddingModelFailCount > 20 {
        s.alertManager.SendAlert(ctx, &Alert{
            Level:   "warning",
            Type:    "knowledge_embedding_model_failed",
            Message: fmt.Sprintf("嵌入模型调用失败次数过多: %d", metrics.EmbeddingModelFailCount),
            Metrics: map[string]interface{}{
                "embedding_fail_count": metrics.EmbeddingModelFailCount,
            },
        })
    }
}