4.1 AI Model Integration
Model Loading and Management
import ai.models as models
import ai.config as config
# Configure AI service providers
config.set_provider("openai", {
"api_key": "your-api-key",
"base_url": "https://api.openai.com/v1"
})
config.set_provider("huggingface", {
"token": "your-hf-token",
"cache_dir": "./models"
})
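# A safer variant (sketch): pull credentials from environment variables
# instead of hard-coding them. Assumes the standard os module; the
# OPENAI_API_KEY variable name is an illustrative choice.
import os
config.set_provider("openai", {
    "api_key": os.environ.get("OPENAI_API_KEY", ""),
    "base_url": "https://api.openai.com/v1"
})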
# Load and manage models of different types
class ModelManager {
func __init__() {
self.models = {}
self.model_configs = {
"text_generation": {
"openai": "gpt-4",
"huggingface": "microsoft/DialoGPT-medium"
},
"text_embedding": {
"openai": "text-embedding-ada-002",
"huggingface": "sentence-transformers/all-MiniLM-L6-v2"
},
"image_generation": {
"openai": "dall-e-3",
"stability": "stable-diffusion-xl"
}
}
}
func load_model(model_type: str, provider: str = "openai") -> any {
model_key = f"{model_type}_{provider}"
if model_key in self.models {
return self.models[model_key]
}
model_name = self.model_configs[model_type][provider]
model = models.load(model_name, provider=provider)
self.models[model_key] = model
return model
}
    func unload_model(model_type: str, provider: str = "openai") {
        model_key = f"{model_type}_{provider}"
        if model_key in self.models {
            self.models[model_key].unload()
            del self.models[model_key]
        }
    }
func list_loaded_models() -> list[str] {
return list(self.models.keys())
}
}
# Using the model manager
model_manager = ModelManager()
# Load text generation and embedding models
text_model = model_manager.load_model("text_generation", "openai")
embedding_model = model_manager.load_model("text_embedding", "huggingface")
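Because load_model caches each model under a `{model_type}_{provider}` key, repeated calls return the cached instance. A short sketch of inspecting the cache and releasing a model that is no longer needed:
# Inspect the cache, then release the embedding model
print(model_manager.list_loaded_models())
# ["text_generation_openai", "text_embedding_huggingface"]
model_manager.unload_model("text_embedding", "huggingface")
print(model_manager.list_loaded_models())
# ["text_generation_openai"]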
Model Inference Interface
# A unified inference interface
class AIInference {
func __init__(model_manager: ModelManager) {
self.model_manager = model_manager
}
func generate_text(
prompt: str,
max_tokens: int = 100,
temperature: float = 0.7,
provider: str = "openai"
) -> str {
model = self.model_manager.load_model("text_generation", provider)
response = model.generate(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature
)
return response.text
}
func get_embeddings(
texts: list[str],
provider: str = "openai"
) -> list[list[float]] {
model = self.model_manager.load_model("text_embedding", provider)
embeddings = []
for text in texts {
embedding = model.encode(text)
embeddings.append(embedding.tolist())
}
return embeddings
}
func generate_image(
prompt: str,
size: str = "1024x1024",
quality: str = "standard",
provider: str = "openai"
) -> str {
model = self.model_manager.load_model("image_generation", provider)
response = model.generate(
prompt=prompt,
size=size,
quality=quality
)
return response.url
}
func analyze_sentiment(
text: str,
provider: str = "huggingface"
) -> dict {
        # Load a pretrained sentiment analysis model directly (the provider argument is unused here)
model = models.load("cardiffnlp/twitter-roberta-base-sentiment-latest")
result = model.predict(text)
return {
"label": result.label,
"confidence": result.confidence,
"scores": result.scores
}
}
}
# Using the inference interface
ai_inference = AIInference(model_manager)
# Text generation example
response = ai_inference.generate_text(
    "Please explain what machine learning is",
    max_tokens=200,
    temperature=0.7
)
print(response)
# Text embedding example
texts = ["machine learning", "deep learning", "artificial intelligence"]
embeddings = ai_inference.get_embeddings(texts)
print(f"Embedding dimension: {len(embeddings[0])}")
# Sentiment analysis example
sentiment = ai_inference.analyze_sentiment("The weather is really nice today!")
print(f"Sentiment: {sentiment['label']}, confidence: {sentiment['confidence']:.2f}")
4.2 Data Processing Pipelines
Data Preprocessing
import ai.data as data
import ai.preprocessing as prep
import re  # used by the text-cleaning step below
class DataPipeline {
func __init__() {
self.steps = []
self.fitted_transformers = {}
}
func add_step(name: str, transformer: callable) -> self {
self.steps.append((name, transformer))
return self
}
func fit(X: any, y: any? = null) -> self {
current_X = X
for name, transformer in self.steps {
if hasattr(transformer, "fit") {
transformer.fit(current_X, y)
self.fitted_transformers[name] = transformer
}
if hasattr(transformer, "transform") {
current_X = transformer.transform(current_X)
} else {
current_X = transformer(current_X)
}
}
return self
}
func transform(X: any) -> any {
current_X = X
for name, transformer in self.steps {
if name in self.fitted_transformers {
current_X = self.fitted_transformers[name].transform(current_X)
} else {
current_X = transformer(current_X)
}
}
return current_X
}
func fit_transform(X: any, y: any? = null) -> any {
return self.fit(X, y).transform(X)
}
}
# Text preprocessing components
class TextCleaner {
func __init__(lowercase: bool = true, remove_punctuation: bool = true) {
self.lowercase = lowercase
self.remove_punctuation = remove_punctuation
}
func __call__(texts: list[str]) -> list[str] {
cleaned_texts = []
for text in texts {
cleaned = text
if self.lowercase {
cleaned = cleaned.lower()
}
if self.remove_punctuation {
cleaned = re.sub(r'[^\w\s]', '', cleaned)
}
            # Collapse extra whitespace
cleaned = re.sub(r'\s+', ' ', cleaned).strip()
cleaned_texts.append(cleaned)
}
return cleaned_texts
}
}
class TextTokenizer {
    func __init__(max_length: int = 512, padding: bool = true) {
        self.max_length = max_length
        self.padding = padding
        self.vocab = null  # built in fit()
    }
    func fit(texts: list[str], y: any? = null) {
        # Build the vocabulary from the training texts
vocab = set()
for text in texts {
words = text.split()
vocab.update(words)
}
self.vocab = {word: i for i, word in enumerate(sorted(vocab))}
self.vocab["<PAD>"] = len(self.vocab)
self.vocab["<UNK>"] = len(self.vocab)
}
func transform(texts: list[str]) -> list[list[int]] {
tokenized_texts = []
for text in texts {
words = text.split()
tokens = [self.vocab.get(word, self.vocab["<UNK>"]) for word in words]
            # Truncate or pad to max_length
if len(tokens) > self.max_length {
tokens = tokens[:self.max_length]
} elif self.padding and len(tokens) < self.max_length {
tokens.extend([self.vocab["<PAD>"]] * (self.max_length - len(tokens)))
}
tokenized_texts.append(tokens)
}
return tokenized_texts
}
}
# Using the data pipeline
texts = [
"Hello, World! This is a test.",
"Machine Learning is Amazing!!!",
"AI Script makes everything easier."
]
# Build the preprocessing pipeline
pipeline = DataPipeline()
pipeline.add_step("clean", TextCleaner())
pipeline.add_step("tokenize", TextTokenizer(max_length=10))
# Fit and transform
processed_texts = pipeline.fit_transform(texts)
print(processed_texts)
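Once fitted, the same pipeline can be reused on new text without refitting: the cleaner runs statelessly, while the tokenizer reuses the vocabulary built during fit and maps unseen words to the <UNK> token:
# Apply the already-fitted pipeline to unseen text
new_texts = ["Hello machine learning world!"]
print(pipeline.transform(new_texts))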
Feature Engineering
import ai.features as features
import numpy as np
class FeatureExtractor {
func __init__() {
self.extractors = {}
}
func add_extractor(name: str, extractor: callable) {
self.extractors[name] = extractor
}
func extract(data: any) -> dict {
feature_dict = {}
for name, extractor in self.extractors.items() {
try {
feature_dict[name] = extractor(data)
} catch Exception as e {
print(f"特征提取器 {name} 失败: {e}")
feature_dict[name] = null
}
}
return feature_dict
}
func extract_batch(data_list: list) -> list[dict] {
return [self.extract(data) for data in data_list]
}
}
# A text feature extractor
class TextFeatureExtractor(FeatureExtractor) {
func __init__() {
super().__init__()
self._setup_extractors()
}
func _setup_extractors() {
        # Basic statistical features
self.add_extractor("length", lambda text: len(text))
self.add_extractor("word_count", lambda text: len(text.split()))
self.add_extractor("sentence_count", lambda text: len(text.split('.')))
        # Character-level features
self.add_extractor("uppercase_ratio", self._uppercase_ratio)
self.add_extractor("digit_ratio", self._digit_ratio)
self.add_extractor("punctuation_ratio", self._punctuation_ratio)
        # Lexical features
self.add_extractor("avg_word_length", self._avg_word_length)
self.add_extractor("unique_word_ratio", self._unique_word_ratio)
}
func _uppercase_ratio(text: str) -> float {
if len(text) == 0 {
return 0.0
}
return sum(1 for c in text if c.isupper()) / len(text)
}
func _digit_ratio(text: str) -> float {
if len(text) == 0 {
return 0.0
}
return sum(1 for c in text if c.isdigit()) / len(text)
}
func _punctuation_ratio(text: str) -> float {
if len(text) == 0 {
return 0.0
}
punctuation = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
return sum(1 for c in text if c in punctuation) / len(text)
}
func _avg_word_length(text: str) -> float {
words = text.split()
if len(words) == 0 {
return 0.0
}
return sum(len(word) for word in words) / len(words)
}
func _unique_word_ratio(text: str) -> float {
words = text.split()
if len(words) == 0 {
return 0.0
}
return len(set(words)) / len(words)
}
}
# Using the feature extractor
text_extractor = TextFeatureExtractor()
sample_texts = [
"Hello, World! This is a test.",
"MACHINE LEARNING IS AMAZING!!!",
"AI Script makes everything easier and more efficient."
]
features_list = text_extractor.extract_batch(sample_texts)
for i, features in enumerate(features_list) {
    print(f"Features for text {i+1}:")
    for name, value in features.items() {
        print(f"  {name}: {value:.3f}")
    }
    print()
}
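Feature dicts are convenient for inspection, but most models expect a numeric matrix. A minimal sketch of flattening them with NumPy (imported above); failed extractors, which FeatureExtractor stores as null, are mapped to 0.0:
# Convert the list of feature dicts into an (n_samples, n_features) matrix
feature_names = sorted(features_list[0].keys())
feature_matrix = np.array([
    [f[name] if f[name] is not null else 0.0 for name in feature_names]
    for f in features_list
])
print(f"Feature matrix shape: {feature_matrix.shape}")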
4.3 Model Training and Evaluation
Machine Learning Model Training
import ai.ml as ml
import ai.metrics as metrics
import time
from sklearn.model_selection import train_test_split
class MLTrainer {
func __init__() {
self.models = {}
self.training_history = {}
}
func train_classifier(
X: any,
y: any,
model_type: str = "random_forest",
test_size: float = 0.2,
**kwargs
) -> dict {
        # Split the data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42
)
        # Create the model
model = self._create_model(model_type, **kwargs)
        # Train the model
        print(f"Training {model_type} model...")
start_time = time.time()
model.fit(X_train, y_train)
training_time = time.time() - start_time
print(f"训练完成,耗时: {training_time:.2f}秒")
        # Evaluate the model
train_pred = model.predict(X_train)
test_pred = model.predict(X_test)
results = {
"model": model,
"training_time": training_time,
"train_accuracy": metrics.accuracy_score(y_train, train_pred),
"test_accuracy": metrics.accuracy_score(y_test, test_pred),
"train_f1": metrics.f1_score(y_train, train_pred, average="weighted"),
"test_f1": metrics.f1_score(y_test, test_pred, average="weighted"),
"confusion_matrix": metrics.confusion_matrix(y_test, test_pred),
"classification_report": metrics.classification_report(y_test, test_pred)
}
        # Save the model and results
model_name = f"{model_type}_{int(time.time())}"
self.models[model_name] = model
self.training_history[model_name] = results
return results
}
func _create_model(model_type: str, **kwargs) {
match model_type {
"random_forest" => {
from sklearn.ensemble import RandomForestClassifier
return RandomForestClassifier(
n_estimators=kwargs.get("n_estimators", 100),
max_depth=kwargs.get("max_depth", null),
random_state=42
)
}
"svm" => {
from sklearn.svm import SVC
return SVC(
kernel=kwargs.get("kernel", "rbf"),
C=kwargs.get("C", 1.0),
random_state=42
)
}
"logistic_regression" => {
from sklearn.linear_model import LogisticRegression
return LogisticRegression(
max_iter=kwargs.get("max_iter", 1000),
random_state=42
)
}
"neural_network" => {
from sklearn.neural_network import MLPClassifier
return MLPClassifier(
hidden_layer_sizes=kwargs.get("hidden_layer_sizes", (100,)),
max_iter=kwargs.get("max_iter", 500),
random_state=42
)
}
_ => {
raise ValueError(f"不支持的模型类型: {model_type}")
}
}
}
func compare_models(X: any, y: any, model_types: list[str]) -> dict {
results = {}
for model_type in model_types {
print(f"\n训练 {model_type} 模型...")
result = self.train_classifier(X, y, model_type)
results[model_type] = result
}
        # Compare the results
        print("\nModel comparison:")
        print(f"{'Model':<20} {'Train acc':<12} {'Test acc':<12} {'Test F1':<10} {'Time':<10}")
print("-" * 70)
for model_type, result in results.items() {
print(f"{model_type:<20} {result['train_accuracy']:<12.3f} {result['test_accuracy']:<12.3f} {result['test_f1']:<10.3f} {result['training_time']:<10.2f}s")
}
return results
}
}
# Usage example
trainer = MLTrainer()
# Prepare sample data (text classification)
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
# Load the data
categories = ['alt.atheism', 'soc.religion.christian', 'comp.graphics', 'sci.med']
newsgroups_train = fetch_20newsgroups(subset='train', categories=categories)
# Extract TF-IDF features
vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
X = vectorizer.fit_transform(newsgroups_train.data)
y = newsgroups_train.target
# Train and compare multiple models
model_types = ["random_forest", "svm", "logistic_regression"]
comparison_results = trainer.compare_models(X, y, model_types)
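The dict returned by compare_models also makes programmatic model selection straightforward, for example by test accuracy:
# Select the model with the highest test accuracy
best_type = max(comparison_results, key=lambda t: comparison_results[t]["test_accuracy"])
best_model = comparison_results[best_type]["model"]
print(f"Best model: {best_type} ({comparison_results[best_type]['test_accuracy']:.3f})")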
Deep Learning Model Training
import ai.deep_learning as dl
import torch
import torch.nn as nn
import torch.optim as optim
class DeepLearningTrainer {
func __init__(device: str = "auto") {
if device == "auto" {
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
} else {
self.device = torch.device(device)
}
print(f"使用设备: {self.device}")
}
func train_neural_network(
model: nn.Module,
train_loader: any,
val_loader: any,
epochs: int = 10,
learning_rate: float = 0.001,
criterion: any = null,
optimizer: any = null
) -> dict {
        # Default loss function and optimizer
if criterion is null {
criterion = nn.CrossEntropyLoss()
}
if optimizer is null {
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
}
model.to(self.device)
        # Training history
history = {
"train_loss": [],
"train_accuracy": [],
"val_loss": [],
"val_accuracy": []
}
for epoch in range(epochs) {
            # Training phase
model.train()
train_loss = 0.0
train_correct = 0
train_total = 0
for batch_idx, (data, target) in enumerate(train_loader) {
data, target = data.to(self.device), target.to(self.device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = torch.max(output.data, 1)
train_total += target.size(0)
train_correct += (predicted == target).sum().item()
}
            # Validation phase
model.eval()
val_loss = 0.0
val_correct = 0
val_total = 0
with torch.no_grad() {
for data, target in val_loader {
data, target = data.to(self.device), target.to(self.device)
output = model(data)
loss = criterion(output, target)
val_loss += loss.item()
_, predicted = torch.max(output.data, 1)
val_total += target.size(0)
val_correct += (predicted == target).sum().item()
}
}
            # Compute average loss and accuracy
avg_train_loss = train_loss / len(train_loader)
train_accuracy = 100 * train_correct / train_total
avg_val_loss = val_loss / len(val_loader)
val_accuracy = 100 * val_correct / val_total
            # Record history
history["train_loss"].append(avg_train_loss)
history["train_accuracy"].append(train_accuracy)
history["val_loss"].append(avg_val_loss)
history["val_accuracy"].append(val_accuracy)
print(f"Epoch {epoch+1}/{epochs}:")
print(f" 训练损失: {avg_train_loss:.4f}, 训练准确率: {train_accuracy:.2f}%")
print(f" 验证损失: {avg_val_loss:.4f}, 验证准确率: {val_accuracy:.2f}%")
}
return history
}
}
# Define a simple feed-forward network
class SimpleNN(nn.Module) {
func __init__(input_size: int, hidden_size: int, num_classes: int) {
super(SimpleNN, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.2)
self.fc2 = nn.Linear(hidden_size, num_classes)
}
func forward(x) {
x = self.fc1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.fc2(x)
return x
}
}
# Using the deep learning trainer
dl_trainer = DeepLearningTrainer()
# Create the model
model = SimpleNN(input_size=784, hidden_size=128, num_classes=10)
# Assume we have training and validation data loaders
# train_loader = ...
# val_loader = ...
# Train the model
# history = dl_trainer.train_neural_network(
# model=model,
# train_loader=train_loader,
# val_loader=val_loader,
# epochs=20,
# learning_rate=0.001
# )
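To run the trainer end to end without a real dataset, you can build loaders from synthetic tensors. A sketch using torch.utils.data; the shapes match the SimpleNN defined above (784 inputs, 10 classes):
from torch.utils.data import TensorDataset, DataLoader
# Random data standing in for flattened 28x28 images with 10 class labels
X_train = torch.randn(1000, 784)
y_train = torch.randint(0, 10, (1000,))
X_val = torch.randn(200, 784)
y_val = torch.randint(0, 10, (200,))
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=64, shuffle=true)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=64)
history = dl_trainer.train_neural_network(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=5,
    learning_rate=0.001
)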
4.4 Model Deployment and Inference
Model Serialization and Loading
import ai.deployment as deploy
import json
import time
import joblib
import torch
import torch.nn as nn
class ModelSerializer {
func save_sklearn_model(model: any, filepath: str) {
"""保存scikit-learn模型"""
joblib.dump(model, filepath)
print(f"模型已保存到: {filepath}")
}
func load_sklearn_model(filepath: str) -> any {
"""加载scikit-learn模型"""
model = joblib.load(filepath)
print(f"模型已从 {filepath} 加载")
return model
}
func save_pytorch_model(model: nn.Module, filepath: str) {
"""保存PyTorch模型"""
torch.save(model.state_dict(), filepath)
print(f"PyTorch模型已保存到: {filepath}")
}
func load_pytorch_model(model_class: type, filepath: str, **kwargs) -> nn.Module {
"""加载PyTorch模型"""
model = model_class(**kwargs)
model.load_state_dict(torch.load(filepath))
model.eval()
print(f"PyTorch模型已从 {filepath} 加载")
return model
}
    func save_model_metadata(metadata: dict, filepath: str) {
        """Save model metadata as JSON."""
        with open(filepath, "w") as f {
            json.dump(metadata, f, indent=2)
        }
    }
func load_model_metadata(filepath: str) -> dict {
"""加载模型元数据"""
with open(filepath, "r") as f {
return json.load(f)
}
}
}
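# A quick usage sketch: persist a model together with its metadata.
# Assumes a trained scikit-learn estimator named `clf` (hypothetical,
# e.g. produced by the MLTrainer in section 4.3).
serializer = ModelSerializer()
serializer.save_sklearn_model(clf, "text_classifier.joblib")
serializer.save_model_metadata({
    "name": "text_classifier",
    "framework": "sklearn",
    "created_at": time.time()
}, "text_classifier.meta.json")
restored_clf = serializer.load_sklearn_model("text_classifier.joblib")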
# Model deployment service
class ModelService {
func __init__() {
self.models = {}
self.serializer = ModelSerializer()
}
func register_model(
name: str,
model: any,
preprocessor: callable? = null,
postprocessor: callable? = null
) {
"""注册模型到服务中"""
self.models[name] = {
"model": model,
"preprocessor": preprocessor,
"postprocessor": postprocessor,
"created_at": time.time(),
"prediction_count": 0
}
print(f"模型 '{name}' 已注册")
}
func predict(name: str, input_data: any) -> any {
"""使用指定模型进行预测"""
if name not in self.models {
raise ValueError(f"模型 '{name}' 未找到")
}
model_info = self.models[name]
model = model_info["model"]
        # Preprocess
if model_info["preprocessor"] is not null {
input_data = model_info["preprocessor"](input_data)
}
        # Predict
if hasattr(model, "predict") {
prediction = model.predict(input_data)
} elif hasattr(model, "forward") {
with torch.no_grad() {
prediction = model(input_data)
}
} else {
prediction = model(input_data)
}
        # Postprocess
if model_info["postprocessor"] is not null {
prediction = model_info["postprocessor"](prediction)
}
        # Update statistics
model_info["prediction_count"] += 1
return prediction
}
func get_model_info(name: str) -> dict {
"""获取模型信息"""
if name not in self.models {
raise ValueError(f"模型 '{name}' 未找到")
}
model_info = self.models[name].copy()
del model_info["model"] # 不返回模型对象
return model_info
}
func list_models() -> list[str] {
"""列出所有注册的模型"""
return list(self.models.keys())
}
}
# Usage example
model_service = ModelService()
# Register a model
# Assume we have a trained model
# trained_model = ...
# model_service.register_model(
#     name="text_classifier",
#     model=trained_model,
#     preprocessor=lambda x: vectorizer.transform([x]),
#     postprocessor=lambda x: x[0]
# )
# Make a prediction
# result = model_service.predict("text_classifier", "This is a test text")
# print(f"Prediction: {result}")
API Service Deployment
import ai.api as api
import time
from flask import Flask, request, jsonify
class ModelAPI {
func __init__(model_service: ModelService) {
self.app = Flask(__name__)
self.model_service = model_service
self._setup_routes()
}
func _setup_routes() {
@self.app.route("/health", methods=["GET"])
func health_check() {
return jsonify({"status": "healthy", "timestamp": time.time()})
}
@self.app.route("/models", methods=["GET"])
func list_models() {
models = self.model_service.list_models()
return jsonify({"models": models})
}
@self.app.route("/models/<model_name>/info", methods=["GET"])
func get_model_info(model_name) {
try {
info = self.model_service.get_model_info(model_name)
return jsonify(info)
} catch ValueError as e {
return jsonify({"error": str(e)}), 404
}
}
@self.app.route("/predict/<model_name>", methods=["POST"])
func predict(model_name) {
try {
data = request.get_json()
if "input" not in data {
return jsonify({"error": "缺少 'input' 字段"}), 400
}
input_data = data["input"]
prediction = self.model_service.predict(model_name, input_data)
return jsonify({
"prediction": prediction.tolist() if hasattr(prediction, "tolist") else prediction,
"model": model_name,
"timestamp": time.time()
})
} catch ValueError as e {
return jsonify({"error": str(e)}), 404
} catch Exception as e {
return jsonify({"error": f"预测失败: {str(e)}"}), 500
}
}
@self.app.route("/batch_predict/<model_name>", methods=["POST"])
func batch_predict(model_name) {
try {
data = request.get_json()
if "inputs" not in data {
return jsonify({"error": "缺少 'inputs' 字段"}), 400
}
inputs = data["inputs"]
predictions = []
for input_data in inputs {
prediction = self.model_service.predict(model_name, input_data)
predictions.append(
prediction.tolist() if hasattr(prediction, "tolist") else prediction
)
}
return jsonify({
"predictions": predictions,
"model": model_name,
"count": len(predictions),
"timestamp": time.time()
})
} catch ValueError as e {
return jsonify({"error": str(e)}), 404
} catch Exception as e {
return jsonify({"error": f"批量预测失败: {str(e)}"}), 500
}
}
}
func run(host: str = "0.0.0.0", port: int = 5000, debug: bool = false) {
print(f"启动模型API服务,地址: http://{host}:{port}")
self.app.run(host=host, port=port, debug=debug)
}
}
# Deploy the API service
api_service = ModelAPI(model_service)
# Start the service
if __name__ == "__main__" {
api_service.run(port=8080, debug=true)
}
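With the service running, any HTTP client can call it. A sketch using the requests library; the URL and payload shape match the routes defined above, and `text_classifier` is assumed to be registered:
import requests
# Call the single-prediction endpoint
resp = requests.post(
    "http://localhost:8080/predict/text_classifier",
    json={"input": "This is a test text"}
)
print(resp.status_code, resp.json())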
4.5 Model Monitoring and Maintenance
Performance Monitoring
import ai.monitoring as monitoring
import random
import statistics
import time
class ModelMonitor {
func __init__() {
self.metrics = {
"prediction_times": [],
"prediction_counts": {},
"error_counts": {},
"accuracy_scores": [],
"memory_usage": [],
"cpu_usage": []
}
self.start_time = time.time()
}
func log_prediction(
model_name: str,
prediction_time: float,
success: bool = true,
accuracy: float? = null
) {
        # Record prediction latency
self.metrics["prediction_times"].append(prediction_time)
        # Count predictions per model
if model_name not in self.metrics["prediction_counts"] {
self.metrics["prediction_counts"][model_name] = 0
}
self.metrics["prediction_counts"][model_name] += 1
        # Count errors per model
if not success {
if model_name not in self.metrics["error_counts"] {
self.metrics["error_counts"][model_name] = 0
}
self.metrics["error_counts"][model_name] += 1
}
        # Record accuracy when available
if accuracy is not null {
self.metrics["accuracy_scores"].append(accuracy)
}
}
func log_system_metrics() {
import psutil
        # Record memory usage
memory_percent = psutil.virtual_memory().percent
self.metrics["memory_usage"].append(memory_percent)
        # Record CPU usage
cpu_percent = psutil.cpu_percent()
self.metrics["cpu_usage"].append(cpu_percent)
}
func get_performance_summary() -> dict {
summary = {
"uptime": time.time() - self.start_time,
"total_predictions": sum(self.metrics["prediction_counts"].values()),
"total_errors": sum(self.metrics["error_counts"].values())
}
        # Prediction latency statistics
if self.metrics["prediction_times"] {
times = self.metrics["prediction_times"]
summary["prediction_time_stats"] = {
"mean": statistics.mean(times),
"median": statistics.median(times),
"min": min(times),
"max": max(times),
"std": statistics.stdev(times) if len(times) > 1 else 0
}
}
        # Accuracy statistics
if self.metrics["accuracy_scores"] {
accuracies = self.metrics["accuracy_scores"]
summary["accuracy_stats"] = {
"mean": statistics.mean(accuracies),
"min": min(accuracies),
"max": max(accuracies)
}
}
        # System resource statistics
if self.metrics["memory_usage"] {
summary["memory_usage_avg"] = statistics.mean(self.metrics["memory_usage"])
}
if self.metrics["cpu_usage"] {
summary["cpu_usage_avg"] = statistics.mean(self.metrics["cpu_usage"])
}
        # Error rate
if summary["total_predictions"] > 0 {
summary["error_rate"] = summary["total_errors"] / summary["total_predictions"]
} else {
summary["error_rate"] = 0
}
return summary
}
func check_alerts() -> list[str] {
alerts = []
        # Check error rate
        summary = self.get_performance_summary()
        if summary["error_rate"] > 0.1 {
            alerts.append(f"High error rate: {summary['error_rate']:.2%}")
}
        # Check prediction latency
if "prediction_time_stats" in summary {
avg_time = summary["prediction_time_stats"]["mean"]
if avg_time > 5.0 {
alerts.append(f"预测时间过长警告: {avg_time:.2f}秒")
}
}
        # Check memory usage
if "memory_usage_avg" in summary {
if summary["memory_usage_avg"] > 80 {
alerts.append(f"内存使用率过高警告: {summary['memory_usage_avg']:.1f}%")
}
}
        # Check CPU usage
if "cpu_usage_avg" in summary {
if summary["cpu_usage_avg"] > 80 {
alerts.append(f"CPU使用率过高警告: {summary['cpu_usage_avg']:.1f}%")
}
}
return alerts
}
}
# Using the monitor
monitor = ModelMonitor()
# Simulate monitoring data
for i in range(100) {
    # Simulate a prediction
    prediction_time = random.uniform(0.1, 2.0)
    success = random.random() > 0.05  # 95% success rate
    accuracy = random.uniform(0.8, 0.95) if success else null
    monitor.log_prediction("text_classifier", prediction_time, success, accuracy)
    # Record system metrics every 10 predictions
    if i % 10 == 0 {
        monitor.log_system_metrics()
    }
}
# Get a performance summary
summary = monitor.get_performance_summary()
print("Performance summary:")
for key, value in summary.items() {
    print(f"  {key}: {value}")
}
# Check for alerts
alerts = monitor.check_alerts()
if alerts {
    print("\nAlerts:")
    for alert in alerts {
        print(f"  - {alert}")
    }
} else {
    print("\nSystem is running normally")
}
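In production you would feed the monitor from the prediction path itself rather than from simulated data. A minimal sketch wrapping ModelService.predict so every call is timed and logged:
# Time each prediction and report the outcome to the monitor
func monitored_predict(service: ModelService, monitor: ModelMonitor, name: str, input_data: any) -> any {
    start = time.time()
    try {
        result = service.predict(name, input_data)
        monitor.log_prediction(name, time.time() - start, success=true)
        return result
    } catch Exception as e {
        monitor.log_prediction(name, time.time() - start, success=false)
        raise
    }
}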
Chapter Summary
This chapter covered AI Script's AI integration and machine learning features in detail. After working through it, you should be able to:
- Load, manage, and run inference against AI models
- Apply data preprocessing and feature engineering techniques
- Train machine learning and deep learning models with AI Script
- Deploy models behind a serialization layer and an API service
- Understand why model monitoring and maintenance matter
Together, these capabilities make AI Script a capable platform for building AI applications.
Exercises
- Implement a complete text sentiment analysis workflow, including data preprocessing, model training, and deployment
- Build a training and evaluation system for an image classification model
- Design a model A/B testing framework to compare the performance of different models
- Implement a real-time model monitoring system with performance metrics and alerting
Next chapter: Chapter 5: Automation Script Development