Chapter 4: AI Integration and Machine Learning

4.1 AI Model Integration

Model Loading and Management

import ai.models as models
import ai.config as config

# Configure the AI service providers
config.set_provider("openai", {
    "api_key": "your-api-key",
    "base_url": "https://api.openai.com/v1"
})

config.set_provider("huggingface", {
    "token": "your-hf-token",
    "cache_dir": "./models"
})

# Load models of different types
class ModelManager {
    func __init__() {
        self.models = {}
        self.model_configs = {
            "text_generation": {
                "openai": "gpt-4",
                "huggingface": "microsoft/DialoGPT-medium"
            },
            "text_embedding": {
                "openai": "text-embedding-ada-002",
                "huggingface": "sentence-transformers/all-MiniLM-L6-v2"
            },
            "image_generation": {
                "openai": "dall-e-3",
                "stability": "stable-diffusion-xl"
            }
        }
    }
    
    func load_model(model_type: str, provider: str = "openai") -> any {
        model_key = f"{model_type}_{provider}"
        
        if model_key in self.models {
            return self.models[model_key]
        }
        
        model_name = self.model_configs[model_type][provider]
        model = models.load(model_name, provider=provider)
        self.models[model_key] = model
        
        return model
    }
    
    func unload_model(model_type: str, provider: str = "openai") {
        model_key = f"{model_type}_{provider}"
        if model_key in self.models {
            self.models[model_key].unload()
            del self.models[model_key]
        }
    }
    
    func list_loaded_models() -> list[str] {
        return list(self.models.keys())
    }
}

# Using the model manager
model_manager = ModelManager()

# Load text generation and embedding models
text_model = model_manager.load_model("text_generation", "openai")
embedding_model = model_manager.load_model("text_embedding", "huggingface")
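
The manager caches each model under a composite key, so repeated loads are cheap; `list_loaded_models` and `unload_model` (defined above) handle the housekeeping:

# Inspect which models are currently cached
print(model_manager.list_loaded_models())
# ["text_generation_openai", "text_embedding_huggingface"]

# Release a model that is no longer needed to free memory
model_manager.unload_model("text_embedding", "huggingface")
print(model_manager.list_loaded_models())
# ["text_generation_openai"]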

Model Inference Interface

# A unified inference interface
class AIInference {
    func __init__(model_manager: ModelManager) {
        self.model_manager = model_manager
    }
    
    func generate_text(
        prompt: str, 
        max_tokens: int = 100,
        temperature: float = 0.7,
        provider: str = "openai"
    ) -> str {
        model = self.model_manager.load_model("text_generation", provider)
        
        response = model.generate(
            prompt=prompt,
            max_tokens=max_tokens,
            temperature=temperature
        )
        
        return response.text
    }
    
    func get_embeddings(
        texts: list[str],
        provider: str = "openai"
    ) -> list[list[float]] {
        model = self.model_manager.load_model("text_embedding", provider)
        
        embeddings = []
        for text in texts {
            embedding = model.encode(text)
            embeddings.append(embedding.tolist())
        }
        
        return embeddings
    }
    
    func generate_image(
        prompt: str,
        size: str = "1024x1024",
        quality: str = "standard",
        provider: str = "openai"
    ) -> str {
        model = self.model_manager.load_model("image_generation", provider)
        
        response = model.generate(
            prompt=prompt,
            size=size,
            quality=quality
        )
        
        return response.url
    }
    
    func analyze_sentiment(
        text: str,
        provider: str = "huggingface"
    ) -> dict {
        # Use a pretrained sentiment analysis model
        # (loaded directly from Hugging Face; the provider argument is not used here)
        model = models.load("cardiffnlp/twitter-roberta-base-sentiment-latest")
        
        result = model.predict(text)
        
        return {
            "label": result.label,
            "confidence": result.confidence,
            "scores": result.scores
        }
    }
}

# Using the inference interface
ai_inference = AIInference(model_manager)

# Text generation example
response = ai_inference.generate_text(
    "Explain what machine learning is",
    max_tokens=200,
    temperature=0.7
)
print(response)

# Text embedding example
texts = ["machine learning", "deep learning", "artificial intelligence"]
embeddings = ai_inference.get_embeddings(texts)
print(f"Embedding dimension: {len(embeddings[0])}")

# Sentiment analysis example
sentiment = ai_inference.analyze_sentiment("The weather is great today!")
print(f"Sentiment: {sentiment['label']}, confidence: {sentiment['confidence']:.2f}")

4.2 Data Processing Pipeline

Data Preprocessing

import ai.data as data
import ai.preprocessing as prep
import re  # used by TextCleaner below

class DataPipeline {
    func __init__() {
        self.steps = []
        self.fitted_transformers = {}
    }
    
    func add_step(name: str, transformer: callable) -> self {
        self.steps.append((name, transformer))
        return self
    }
    
    func fit(X: any, y: any? = null) -> self {
        current_X = X
        
        for name, transformer in self.steps {
            if hasattr(transformer, "fit") {
                transformer.fit(current_X, y)
                self.fitted_transformers[name] = transformer
            }
            
            if hasattr(transformer, "transform") {
                current_X = transformer.transform(current_X)
            } else {
                current_X = transformer(current_X)
            }
        }
        
        return self
    }
    
    func transform(X: any) -> any {
        current_X = X
        
        for name, transformer in self.steps {
            if name in self.fitted_transformers {
                current_X = self.fitted_transformers[name].transform(current_X)
            } else {
                current_X = transformer(current_X)
            }
        }
        
        return current_X
    }
    
    func fit_transform(X: any, y: any? = null) -> any {
        return self.fit(X, y).transform(X)
    }
}

# Text preprocessing components
class TextCleaner {
    func __init__(lowercase: bool = true, remove_punctuation: bool = true) {
        self.lowercase = lowercase
        self.remove_punctuation = remove_punctuation
    }
    
    func __call__(texts: list[str]) -> list[str] {
        cleaned_texts = []
        
        for text in texts {
            cleaned = text
            
            if self.lowercase {
                cleaned = cleaned.lower()
            }
            
            if self.remove_punctuation {
                cleaned = re.sub(r'[^\w\s]', '', cleaned)
            }
            
            # Collapse extra whitespace
            cleaned = re.sub(r'\s+', ' ', cleaned).strip()
            cleaned_texts.append(cleaned)
        }
        
        return cleaned_texts
    }
}

class TextTokenizer {
    func __init__(max_length: int = 512, padding: bool = true) {
        self.max_length = max_length
        self.padding = padding
        self.tokenizer = null
    }
    
    func fit(texts: list[str], y: any? = null) {
        # Build the vocabulary (the pipeline passes labels too; they are ignored here)
        vocab = set()
        for text in texts {
            words = text.split()
            vocab.update(words)
        }
        
        self.vocab = {word: i for i, word in enumerate(sorted(vocab))}
        self.vocab["<PAD>"] = len(self.vocab)
        self.vocab["<UNK>"] = len(self.vocab)
    }
    
    func transform(texts: list[str]) -> list[list[int]] {
        tokenized_texts = []
        
        for text in texts {
            words = text.split()
            tokens = [self.vocab.get(word, self.vocab["<UNK>"]) for word in words]
            
            # Truncate or pad
            if len(tokens) > self.max_length {
                tokens = tokens[:self.max_length]
            } elif self.padding and len(tokens) < self.max_length {
                tokens.extend([self.vocab["<PAD>"]] * (self.max_length - len(tokens)))
            }
            
            tokenized_texts.append(tokens)
        }
        
        return tokenized_texts
    }
}

# Using the data pipeline
texts = [
    "Hello, World! This is a test.",
    "Machine Learning is Amazing!!!",
    "AI Script makes everything easier."
]

# Build the preprocessing pipeline
pipeline = DataPipeline()
pipeline.add_step("clean", TextCleaner())
pipeline.add_step("tokenize", TextTokenizer(max_length=10))

# Fit and transform the texts
processed_texts = pipeline.fit_transform(texts)
print(processed_texts)
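
Once fitted, the pipeline can be reused on unseen data with `transform`: the cleaner is applied directly, and the tokenizer reuses the vocabulary learned during `fit`, mapping unknown words to `<UNK>`:

# Reuse the fitted pipeline on new texts
new_texts = ["Machine learning is amazing and easy"]
new_tokens = pipeline.transform(new_texts)
print(new_tokens)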

Feature Engineering

import ai.features as features
import numpy as np

class FeatureExtractor {
    func __init__() {
        self.extractors = {}
    }
    
    func add_extractor(name: str, extractor: callable) {
        self.extractors[name] = extractor
    }
    
    func extract(data: any) -> dict {
        feature_dict = {}
        
        for name, extractor in self.extractors.items() {
            try {
                feature_dict[name] = extractor(data)
            } catch Exception as e {
                print(f"特征提取器 {name} 失败: {e}")
                feature_dict[name] = null
            }
        }
        
        return feature_dict
    }
    
    func extract_batch(data_list: list) -> list[dict] {
        return [self.extract(data) for data in data_list]
    }
}

# Text feature extractor
class TextFeatureExtractor(FeatureExtractor) {
    func __init__() {
        super().__init__()
        self._setup_extractors()
    }
    
    func _setup_extractors() {
        # Basic statistical features
        self.add_extractor("length", lambda text: len(text))
        self.add_extractor("word_count", lambda text: len(text.split()))
        self.add_extractor("sentence_count", lambda text: len(text.split('.')))
        
        # Character-level features
        self.add_extractor("uppercase_ratio", self._uppercase_ratio)
        self.add_extractor("digit_ratio", self._digit_ratio)
        self.add_extractor("punctuation_ratio", self._punctuation_ratio)
        
        # Lexical features
        self.add_extractor("avg_word_length", self._avg_word_length)
        self.add_extractor("unique_word_ratio", self._unique_word_ratio)
    }
    
    func _uppercase_ratio(text: str) -> float {
        if len(text) == 0 {
            return 0.0
        }
        return sum(1 for c in text if c.isupper()) / len(text)
    }
    
    func _digit_ratio(text: str) -> float {
        if len(text) == 0 {
            return 0.0
        }
        return sum(1 for c in text if c.isdigit()) / len(text)
    }
    
    func _punctuation_ratio(text: str) -> float {
        if len(text) == 0 {
            return 0.0
        }
        punctuation = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
        return sum(1 for c in text if c in punctuation) / len(text)
    }
    
    func _avg_word_length(text: str) -> float {
        words = text.split()
        if len(words) == 0 {
            return 0.0
        }
        return sum(len(word) for word in words) / len(words)
    }
    
    func _unique_word_ratio(text: str) -> float {
        words = text.split()
        if len(words) == 0 {
            return 0.0
        }
        return len(set(words)) / len(words)
    }
}

# Using the feature extractor
text_extractor = TextFeatureExtractor()

sample_texts = [
    "Hello, World! This is a test.",
    "MACHINE LEARNING IS AMAZING!!!",
    "AI Script makes everything easier and more efficient."
]

features_list = text_extractor.extract_batch(sample_texts)
for i, features in enumerate(features_list) {
    print(f"Text {i+1} features:")
    for name, value in features.items() {
        print(f"  {name}: {value:.3f}")
    }
    print()
}
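
To feed these features into the training code in the next section, the per-text dicts must become a numeric matrix. A minimal sketch using the `numpy` import above, assuming failed extractors (null values) are mapped to 0.0:

# Convert the list of feature dicts into a numeric feature matrix
feature_names = sorted(features_list[0].keys())
X = np.array([
    [f[name] if f[name] is not null else 0.0 for name in feature_names]
    for f in features_list
])
print(f"Feature matrix shape: {X.shape}")  # (3, 8): 3 texts, 8 features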

4.3 Model Training and Evaluation

Machine Learning Model Training

import ai.ml as ml
import ai.metrics as metrics
import time  # used to measure training time below
from sklearn.model_selection import train_test_split

class MLTrainer {
    func __init__() {
        self.models = {}
        self.training_history = {}
    }
    
    func train_classifier(
        X: any, 
        y: any, 
        model_type: str = "random_forest",
        test_size: float = 0.2,
        **kwargs
    ) -> dict {
        # Split the data into train and test sets
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )
        
        # Create the model
        model = self._create_model(model_type, **kwargs)
        
        # Train the model
        print(f"Training {model_type} model...")
        start_time = time.time()
        
        model.fit(X_train, y_train)
        
        training_time = time.time() - start_time
        print(f"训练完成,耗时: {training_time:.2f}秒")
        
        # Evaluate the model
        train_pred = model.predict(X_train)
        test_pred = model.predict(X_test)
        
        results = {
            "model": model,
            "training_time": training_time,
            "train_accuracy": metrics.accuracy_score(y_train, train_pred),
            "test_accuracy": metrics.accuracy_score(y_test, test_pred),
            "train_f1": metrics.f1_score(y_train, train_pred, average="weighted"),
            "test_f1": metrics.f1_score(y_test, test_pred, average="weighted"),
            "confusion_matrix": metrics.confusion_matrix(y_test, test_pred),
            "classification_report": metrics.classification_report(y_test, test_pred)
        }
        
        # Store the model and results
        model_name = f"{model_type}_{int(time.time())}"
        self.models[model_name] = model
        self.training_history[model_name] = results
        
        return results
    }
    
    func _create_model(model_type: str, **kwargs) {
        match model_type {
            "random_forest" => {
                from sklearn.ensemble import RandomForestClassifier
                return RandomForestClassifier(
                    n_estimators=kwargs.get("n_estimators", 100),
                    max_depth=kwargs.get("max_depth", null),
                    random_state=42
                )
            }
            "svm" => {
                from sklearn.svm import SVC
                return SVC(
                    kernel=kwargs.get("kernel", "rbf"),
                    C=kwargs.get("C", 1.0),
                    random_state=42
                )
            }
            "logistic_regression" => {
                from sklearn.linear_model import LogisticRegression
                return LogisticRegression(
                    max_iter=kwargs.get("max_iter", 1000),
                    random_state=42
                )
            }
            "neural_network" => {
                from sklearn.neural_network import MLPClassifier
                return MLPClassifier(
                    hidden_layer_sizes=kwargs.get("hidden_layer_sizes", (100,)),
                    max_iter=kwargs.get("max_iter", 500),
                    random_state=42
                )
            }
            _ => {
                raise ValueError(f"不支持的模型类型: {model_type}")
            }
        }
    }
    
    func compare_models(X: any, y: any, model_types: list[str]) -> dict {
        results = {}
        
        for model_type in model_types {
            print(f"\n训练 {model_type} 模型...")
            result = self.train_classifier(X, y, model_type)
            results[model_type] = result
        }
        
        # Compare results
        print("\nModel comparison:")
        print(f"{'Model':<20} {'Train Acc':<12} {'Test Acc':<12} {'F1':<10} {'Time':<10}")
        print("-" * 70)
        
        for model_type, result in results.items() {
            print(f"{model_type:<20} {result['train_accuracy']:<12.3f} {result['test_accuracy']:<12.3f} {result['test_f1']:<10.3f} {result['training_time']:<10.2f}s")
        }
        
        return results
    }
}

# Usage example
trainer = MLTrainer()

# Prepare sample data (text classification)
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer

# Load the data
categories = ['alt.atheism', 'soc.religion.christian', 'comp.graphics', 'sci.med']
newsgroups_train = fetch_20newsgroups(subset='train', categories=categories)

# Extract TF-IDF features
vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
X = vectorizer.fit_transform(newsgroups_train.data)
y = newsgroups_train.target

# Train and compare multiple models
model_types = ["random_forest", "svm", "logistic_regression"]
comparison_results = trainer.compare_models(X, y, model_types)
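
The returned dictionary keeps the fitted model objects, so the best performer can be pulled out and applied to new text. A short sketch, reusing the `vectorizer` fitted above (the sample document is illustrative):

# Select the model with the best test accuracy and classify a new document
best_type = max(comparison_results, key=lambda m: comparison_results[m]["test_accuracy"])
best_model = comparison_results[best_type]["model"]

new_doc = vectorizer.transform(["The patient responded well to the new treatment"])
predicted = best_model.predict(new_doc)
print(f"Best model: {best_type}")
print(f"Predicted category: {newsgroups_train.target_names[predicted[0]]}")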

Deep Learning Model Training

import ai.deep_learning as dl
import torch
import torch.nn as nn
import torch.optim as optim

class DeepLearningTrainer {
    func __init__(device: str = "auto") {
        if device == "auto" {
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        } else {
            self.device = torch.device(device)
        }
        
        print(f"使用设备: {self.device}")
    }
    
    func train_neural_network(
        model: nn.Module,
        train_loader: any,
        val_loader: any,
        epochs: int = 10,
        learning_rate: float = 0.001,
        criterion: any = null,
        optimizer: any = null
    ) -> dict {
        # Default loss function and optimizer if none supplied
        if criterion is null {
            criterion = nn.CrossEntropyLoss()
        }
        
        if optimizer is null {
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        }
        
        model.to(self.device)
        
        # Training history
        history = {
            "train_loss": [],
            "train_accuracy": [],
            "val_loss": [],
            "val_accuracy": []
        }
        
        for epoch in range(epochs) {
            # Training phase
            model.train()
            train_loss = 0.0
            train_correct = 0
            train_total = 0
            
            for batch_idx, (data, target) in enumerate(train_loader) {
                data, target = data.to(self.device), target.to(self.device)
                
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item()
                _, predicted = torch.max(output.data, 1)
                train_total += target.size(0)
                train_correct += (predicted == target).sum().item()
            }
            
            # Validation phase
            model.eval()
            val_loss = 0.0
            val_correct = 0
            val_total = 0
            
            with torch.no_grad() {
                for data, target in val_loader {
                    data, target = data.to(self.device), target.to(self.device)
                    output = model(data)
                    loss = criterion(output, target)
                    
                    val_loss += loss.item()
                    _, predicted = torch.max(output.data, 1)
                    val_total += target.size(0)
                    val_correct += (predicted == target).sum().item()
                }
            }
            
            # Compute average loss and accuracy
            avg_train_loss = train_loss / len(train_loader)
            train_accuracy = 100 * train_correct / train_total
            avg_val_loss = val_loss / len(val_loader)
            val_accuracy = 100 * val_correct / val_total
            
            # Record history
            history["train_loss"].append(avg_train_loss)
            history["train_accuracy"].append(train_accuracy)
            history["val_loss"].append(avg_val_loss)
            history["val_accuracy"].append(val_accuracy)
            
            print(f"Epoch {epoch+1}/{epochs}:")
            print(f"  训练损失: {avg_train_loss:.4f}, 训练准确率: {train_accuracy:.2f}%")
            print(f"  验证损失: {avg_val_loss:.4f}, 验证准确率: {val_accuracy:.2f}%")
        }
        
        return history
    }
}

# Define a simple feed-forward neural network
class SimpleNN(nn.Module) {
    func __init__(input_size: int, hidden_size: int, num_classes: int) {
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(hidden_size, num_classes)
    }
    
    func forward(x) {
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x
    }
}

# Using the deep learning trainer
dl_trainer = DeepLearningTrainer()

# Create the model
model = SimpleNN(input_size=784, hidden_size=128, num_classes=10)

# Assume we have training and validation data loaders (a sketch follows below)
# train_loader = ...
# val_loader = ...

# Train the model
# history = dl_trainer.train_neural_network(
#     model=model,
#     train_loader=train_loader,
#     val_loader=val_loader,
#     epochs=20,
#     learning_rate=0.001
# )
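
For completeness, here is one way the commented-out loaders could be built. This is a sketch assuming torchvision is installed; the MNIST images are flattened so they match the input_size=784 the model expects:

# Build MNIST data loaders (sketch; requires torchvision)
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.view(-1))  # flatten 28x28 images to 784
])

train_dataset = datasets.MNIST("./data", train=True, download=True, transform=transform)
val_dataset = datasets.MNIST("./data", train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)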

4.4 Model Deployment and Inference

Model Serialization and Loading

import ai.deployment as deploy
import joblib
import json  # used by the metadata methods below
import time  # used by ModelService below
import torch

class ModelSerializer {
    func save_sklearn_model(model: any, filepath: str) {
        """Save a scikit-learn model."""
        joblib.dump(model, filepath)
        print(f"Model saved to: {filepath}")
    }
    
    func load_sklearn_model(filepath: str) -> any {
        """Load a scikit-learn model."""
        model = joblib.load(filepath)
        print(f"Model loaded from: {filepath}")
        return model
    }
    
    func save_pytorch_model(model: nn.Module, filepath: str) {
        """Save a PyTorch model's weights."""
        torch.save(model.state_dict(), filepath)
        print(f"PyTorch model saved to: {filepath}")
    }
    
    func load_pytorch_model(model_class: type, filepath: str, **kwargs) -> nn.Module {
        """Load a PyTorch model from saved weights."""
        model = model_class(**kwargs)
        model.load_state_dict(torch.load(filepath))
        model.eval()
        print(f"PyTorch model loaded from: {filepath}")
        return model
    }
    
    func save_model_metadata(metadata: dict, filepath: str) {
        """Save model metadata as JSON."""
        with open(filepath, "w") as f {
            json.dump(metadata, f, indent=2)
        }
    }
    
    func load_model_metadata(filepath: str) -> dict {
        """Load model metadata from JSON."""
        with open(filepath, "r") as f {
            return json.load(f)
        }
    }
}
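
A quick round trip through the serializer; this sketch assumes a trained scikit-learn model named trained_model and the SimpleNN model from section 4.3 are available:

# Usage example (assumes trained_model and SimpleNN from earlier sections)
serializer = ModelSerializer()

# scikit-learn round trip
serializer.save_sklearn_model(trained_model, "classifier.joblib")
restored = serializer.load_sklearn_model("classifier.joblib")

# PyTorch round trip; the constructor kwargs must match the original architecture
serializer.save_pytorch_model(model, "simple_nn.pt")
restored_nn = serializer.load_pytorch_model(
    SimpleNN, "simple_nn.pt",
    input_size=784, hidden_size=128, num_classes=10
)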

# Model deployment service
class ModelService {
    func __init__() {
        self.models = {}
        self.serializer = ModelSerializer()
    }
    
    func register_model(
        name: str, 
        model: any, 
        preprocessor: callable? = null,
        postprocessor: callable? = null
    ) {
        """注册模型到服务中"""
        self.models[name] = {
            "model": model,
            "preprocessor": preprocessor,
            "postprocessor": postprocessor,
            "created_at": time.time(),
            "prediction_count": 0
        }
        print(f"模型 '{name}' 已注册")
    }
    
    func predict(name: str, input_data: any) -> any {
        """使用指定模型进行预测"""
        if name not in self.models {
            raise ValueError(f"模型 '{name}' 未找到")
        }
        
        model_info = self.models[name]
        model = model_info["model"]
        
        # Preprocessing
        if model_info["preprocessor"] is not null {
            input_data = model_info["preprocessor"](input_data)
        }
        
        # Run the prediction
        if hasattr(model, "predict") {
            prediction = model.predict(input_data)
        } elif hasattr(model, "forward") {
            with torch.no_grad() {
                prediction = model(input_data)
            }
        } else {
            prediction = model(input_data)
        }
        
        # Postprocessing
        if model_info["postprocessor"] is not null {
            prediction = model_info["postprocessor"](prediction)
        }
        
        # Update usage statistics
        model_info["prediction_count"] += 1
        
        return prediction
    }
    
    func get_model_info(name: str) -> dict {
        """获取模型信息"""
        if name not in self.models {
            raise ValueError(f"模型 '{name}' 未找到")
        }
        
        model_info = self.models[name].copy()
        del model_info["model"]  # 不返回模型对象
        return model_info
    }
    
    func list_models() -> list[str] {
        """列出所有注册的模型"""
        return list(self.models.keys())
    }
}

# Usage example
model_service = ModelService()

# Register a model
# (assuming we have a trained model)
# trained_model = ...
# model_service.register_model(
#     name="text_classifier",
#     model=trained_model,
#     preprocessor=lambda x: vectorizer.transform([x]),
#     postprocessor=lambda x: x[0]
# )

# Make a prediction
# result = model_service.predict("text_classifier", "This is a test text")
# print(f"Prediction: {result}")

API Service Deployment

import ai.api as api
import time
from flask import Flask, request, jsonify

class ModelAPI {
    func __init__(model_service: ModelService) {
        self.app = Flask(__name__)
        self.model_service = model_service
        self._setup_routes()
    }
    
    func _setup_routes() {
        @self.app.route("/health", methods=["GET"])
        func health_check() {
            return jsonify({"status": "healthy", "timestamp": time.time()})
        }
        
        @self.app.route("/models", methods=["GET"])
        func list_models() {
            models = self.model_service.list_models()
            return jsonify({"models": models})
        }
        
        @self.app.route("/models/<model_name>/info", methods=["GET"])
        func get_model_info(model_name) {
            try {
                info = self.model_service.get_model_info(model_name)
                return jsonify(info)
            } catch ValueError as e {
                return jsonify({"error": str(e)}), 404
            }
        }
        
        @self.app.route("/predict/<model_name>", methods=["POST"])
        func predict(model_name) {
            try {
                data = request.get_json()
                
                if "input" not in data {
                    return jsonify({"error": "缺少 'input' 字段"}), 400
                }
                
                input_data = data["input"]
                prediction = self.model_service.predict(model_name, input_data)
                
                return jsonify({
                    "prediction": prediction.tolist() if hasattr(prediction, "tolist") else prediction,
                    "model": model_name,
                    "timestamp": time.time()
                })
                
            } catch ValueError as e {
                return jsonify({"error": str(e)}), 404
            } catch Exception as e {
                return jsonify({"error": f"预测失败: {str(e)}"}), 500
            }
        }
        
        @self.app.route("/batch_predict/<model_name>", methods=["POST"])
        func batch_predict(model_name) {
            try {
                data = request.get_json()
                
                if "inputs" not in data {
                    return jsonify({"error": "缺少 'inputs' 字段"}), 400
                }
                
                inputs = data["inputs"]
                predictions = []
                
                for input_data in inputs {
                    prediction = self.model_service.predict(model_name, input_data)
                    predictions.append(
                        prediction.tolist() if hasattr(prediction, "tolist") else prediction
                    )
                }
                
                return jsonify({
                    "predictions": predictions,
                    "model": model_name,
                    "count": len(predictions),
                    "timestamp": time.time()
                })
                
            } catch ValueError as e {
                return jsonify({"error": str(e)}), 404
            } catch Exception as e {
                return jsonify({"error": f"批量预测失败: {str(e)}"}), 500
            }
        }
    }
    
    func run(host: str = "0.0.0.0", port: int = 5000, debug: bool = false) {
        print(f"启动模型API服务,地址: http://{host}:{port}")
        self.app.run(host=host, port=port, debug=debug)
    }
}

# Deploy the API service
api_service = ModelAPI(model_service)

# Start the service
if __name__ == "__main__" {
    api_service.run(port=8080, debug=true)
}
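
With the service running, clients can call it over HTTP. A client-side sketch using the requests library, assuming the service is reachable at localhost:8080 and a model named "text_classifier" is registered:

# Client-side call (sketch)
import requests

resp = requests.post(
    "http://localhost:8080/predict/text_classifier",
    json={"input": "This is a test text"}
)
print(resp.json())  # {"prediction": ..., "model": "text_classifier", "timestamp": ...}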

4.5 Model Monitoring and Maintenance

Performance Monitoring

import ai.monitoring as monitoring
import time
import statistics

class ModelMonitor {
    func __init__() {
        self.metrics = {
            "prediction_times": [],
            "prediction_counts": {},
            "error_counts": {},
            "accuracy_scores": [],
            "memory_usage": [],
            "cpu_usage": []
        }
        self.start_time = time.time()
    }
    
    func log_prediction(
        model_name: str, 
        prediction_time: float, 
        success: bool = true,
        accuracy: float? = null
    ) {
        # Record prediction latency
        self.metrics["prediction_times"].append(prediction_time)
        
        # Count predictions per model
        if model_name not in self.metrics["prediction_counts"] {
            self.metrics["prediction_counts"][model_name] = 0
        }
        self.metrics["prediction_counts"][model_name] += 1
        
        # Count errors per model
        if not success {
            if model_name not in self.metrics["error_counts"] {
                self.metrics["error_counts"][model_name] = 0
            }
            self.metrics["error_counts"][model_name] += 1
        }
        
        # Record accuracy
        if accuracy is not null {
            self.metrics["accuracy_scores"].append(accuracy)
        }
    }
    
    func log_system_metrics() {
        import psutil
        
        # Record memory usage
        memory_percent = psutil.virtual_memory().percent
        self.metrics["memory_usage"].append(memory_percent)
        
        # Record CPU usage
        cpu_percent = psutil.cpu_percent()
        self.metrics["cpu_usage"].append(cpu_percent)
    }
    
    func get_performance_summary() -> dict {
        summary = {
            "uptime": time.time() - self.start_time,
            "total_predictions": sum(self.metrics["prediction_counts"].values()),
            "total_errors": sum(self.metrics["error_counts"].values())
        }
        
        # Prediction latency statistics
        if self.metrics["prediction_times"] {
            times = self.metrics["prediction_times"]
            summary["prediction_time_stats"] = {
                "mean": statistics.mean(times),
                "median": statistics.median(times),
                "min": min(times),
                "max": max(times),
                "std": statistics.stdev(times) if len(times) > 1 else 0
            }
        }
        
        # Accuracy statistics
        if self.metrics["accuracy_scores"] {
            accuracies = self.metrics["accuracy_scores"]
            summary["accuracy_stats"] = {
                "mean": statistics.mean(accuracies),
                "min": min(accuracies),
                "max": max(accuracies)
            }
        }
        
        # System resource statistics
        if self.metrics["memory_usage"] {
            summary["memory_usage_avg"] = statistics.mean(self.metrics["memory_usage"])
        }
        
        if self.metrics["cpu_usage"] {
            summary["cpu_usage_avg"] = statistics.mean(self.metrics["cpu_usage"])
        }
        
        # Error rate
        if summary["total_predictions"] > 0 {
            summary["error_rate"] = summary["total_errors"] / summary["total_predictions"]
        } else {
            summary["error_rate"] = 0
        }
        
        return summary
    }
    
    func check_alerts() -> list[str] {
        alerts = []
        
        # Check the error rate
        summary = self.get_performance_summary()
        if summary["error_rate"] > 0.1 {
            alerts.append(f"High error rate warning: {summary['error_rate']:.2%}")
        }
        
        # Check prediction latency
        if "prediction_time_stats" in summary {
            avg_time = summary["prediction_time_stats"]["mean"]
            if avg_time > 5.0 {
                alerts.append(f"Slow prediction warning: {avg_time:.2f}s")
            }
        }
        
        # Check memory usage
        if "memory_usage_avg" in summary {
            if summary["memory_usage_avg"] > 80 {
                alerts.append(f"High memory usage warning: {summary['memory_usage_avg']:.1f}%")
            }
        }
        
        # Check CPU usage
        if "cpu_usage_avg" in summary {
            if summary["cpu_usage_avg"] > 80 {
                alerts.append(f"High CPU usage warning: {summary['cpu_usage_avg']:.1f}%")
            }
        }
        
        return alerts
    }
}

# Using the monitor
import random  # used below to simulate prediction outcomes

monitor = ModelMonitor()

# Simulate monitoring data
for i in range(100) {
    # Simulate a prediction
    prediction_time = random.uniform(0.1, 2.0)
    success = random.random() > 0.05  # 95% success rate
    accuracy = random.uniform(0.8, 0.95) if success else null
    
    monitor.log_prediction("text_classifier", prediction_time, success, accuracy)
    
    # Record system metrics every 10 iterations
    if i % 10 == 0 {
        monitor.log_system_metrics()
    }
}

# Get the performance summary
summary = monitor.get_performance_summary()
print("Performance summary:")
for key, value in summary.items() {
    print(f"  {key}: {value}")
}

# Check for alerts
alerts = monitor.check_alerts()
if alerts {
    print("\nAlerts:")
    for alert in alerts {
        print(f"  - {alert}")
    }
} else {
    print("\nSystem running normally")
}
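
To tie the monitor back to the deployment service from section 4.4, predictions can be timed and logged as they happen. A minimal sketch; the monitored_predict helper is illustrative, not part of ModelService:

# Wrap ModelService.predict with monitoring (illustrative helper)
func monitored_predict(service: ModelService, monitor: ModelMonitor, name: str, input_data: any) -> any {
    start = time.time()
    try {
        result = service.predict(name, input_data)
        monitor.log_prediction(name, time.time() - start, success=true)
        return result
    } catch Exception as e {
        monitor.log_prediction(name, time.time() - start, success=false)
        raise e
    }
}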

Chapter Summary

This chapter walked through AI Script's AI integration and machine learning features. After working through it, you should:

  • Know how to load, manage, and run inference with AI models
  • Understand data preprocessing and feature engineering techniques
  • Be able to train machine learning and deep learning models with AI Script
  • Know how to deploy models and expose them through an API service
  • Understand why model monitoring and maintenance matter

Together, these capabilities make AI Script a solid platform for building AI applications.


Exercises

  1. Implement a complete text sentiment analysis workflow, covering data preprocessing, model training, and deployment
  2. Build a training and evaluation system for an image classification model
  3. Design a model A/B testing framework for comparing the performance of different models
  4. Implement a real-time model monitoring system with performance metrics and an alerting mechanism

Next chapter: Chapter 5, Automation Script Development