🚀 Detailed 3-Month Hands-On LLM Learning Plan
📅 Month 1: Karpathy-Style GPT from Scratch
Week 1: Watch the lecture + hand-write the core GPT code
🎯 Goal for the week: understand the Transformer architecture and implement the core GPT code by hand
📚 Learning resources:
- Main course: Let's build GPT: from scratch
- Companion code: nanoGPT
- Supplementary reading: Attention Is All You Need
📝 Daily tasks:
Day 1: Environment setup + theory basics
```bash
# Environment setup
conda create -n gpt_from_scratch python=3.10 -y
conda activate gpt_from_scratch
pip install torch numpy matplotlib tiktoken datasets
```
Study tasks:
- Watch the first 30 minutes of the video: language-model basics
- Read up on what an autoregressive language model is (see the short sketch below)
- Hands-on: download and run the nanoGPT code
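To make "autoregressive" concrete before Day 2, here is a minimal sketch (not from the course) of the idea that a language model only ever predicts the next token from the tokens so far, and that generation is just repeating that step. The tiny bigram table is purely illustrative.

```python
# Minimal illustration of autoregressive generation: p(x1..xT) = prod_t p(x_t | x_<t).
# The "model" below is a toy bigram lookup table, purely for illustration.
import random

bigram_probs = {
    "to": {"be": 0.9, "go": 0.1},
    "be": {"or": 0.7, "happy": 0.3},
    "or": {"not": 1.0},
    "not": {"to": 1.0},
}

def generate(start, steps=5):
    tokens = [start]
    for _ in range(steps):
        dist = bigram_probs.get(tokens[-1])
        if dist is None:
            break
        words, probs = zip(*dist.items())
        # Sample the next token conditioned only on what has been generated so far
        tokens.append(random.choices(words, weights=probs)[0])
    return " ".join(tokens)

print(generate("to"))  # e.g. "to be or not to be"
```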
Day 2: Data processing and tokenization
```python
# Today's coding exercise: data.py
import tiktoken
import torch

class DataLoader:
    def __init__(self, text_file, block_size, batch_size):
        with open(text_file, 'r') as f:
            text = f.read()
        # Use the GPT-2 tokenizer
        enc = tiktoken.get_encoding('gpt2')
        self.tokens = enc.encode(text)
        self.block_size = block_size
        self.batch_size = batch_size

    def get_batch(self):
        # Sample random offsets, then build (input, target) pairs shifted by one token
        ix = torch.randint(len(self.tokens) - self.block_size, (self.batch_size,))
        x = torch.stack([torch.tensor(self.tokens[i:i+self.block_size]) for i in ix])
        y = torch.stack([torch.tensor(self.tokens[i+1:i+self.block_size+1]) for i in ix])
        return x, y

# Test your implementation
loader = DataLoader('shakespeare.txt', block_size=8, batch_size=4)
x, y = loader.get_batch()
print(f"Input shape: {x.shape}, Target shape: {y.shape}")
```
Day 3: Implement multi-head attention
```python
# Today's coding exercise: attention.py
import torch
import torch.nn as nn
import torch.nn.functional as F

class Head(nn.Module):
    """One head of causal self-attention."""
    def __init__(self, head_size, n_embd, block_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)    # (B, T, head_size)
        q = self.query(x)  # (B, T, head_size)
        v = self.value(x)  # (B, T, head_size)
        # Attention scores, scaled by the key dimension (head_size), not the embedding dim
        wei = q @ k.transpose(-2, -1) * (k.shape[-1] ** -0.5)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))  # causal mask
        wei = F.softmax(wei, dim=-1)
        # Weighted aggregation of the values
        out = wei @ v
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size, n_embd, block_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size, n_embd, block_size) for _ in range(num_heads)])
        # Project the concatenated heads (num_heads * head_size) back to the embedding dim
        self.proj = nn.Linear(num_heads * head_size, n_embd)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.proj(out)
        return out

# Test your implementation
mha = MultiHeadAttention(num_heads=4, head_size=16, n_embd=64, block_size=8)
x = torch.randn(4, 8, 64)
out = mha(x)
print(f"Output shape: {out.shape}")
```
Day 4: Implement the Transformer block
```python
# Today's coding exercise: transformer_block.py
import torch
import torch.nn as nn
from attention import MultiHeadAttention

class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    def __init__(self, n_embd, n_head, block_size):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size, n_embd, block_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        # Pre-norm formulation with residual connections
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

# Test
block = Block(n_embd=64, n_head=4, block_size=8)
x = torch.randn(4, 8, 64)
out = block(x)
print(f"Block output shape: {out.shape}")
```
Day 5: Full GPT model implementation
```python
# Today's coding exercise: gpt_model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformer_block import Block

class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size, n_embd, n_head, n_layer, block_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head, block_size) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)
        self.block_size = block_size

    def forward(self, idx, targets=None):
        B, T = idx.shape
        # Token + position embeddings
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device))
        x = tok_emb + pos_emb
        # Transformer blocks
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            # Crop the context to the last block_size tokens
            idx_cond = idx[:, -self.block_size:]
            logits, loss = self(idx_cond)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

# Instantiate the model
model = GPTLanguageModel(
    vocab_size=50257,  # GPT-2 vocab size
    n_embd=384,
    n_head=6,
    n_layer=6,
    block_size=256
)
print(f"Model parameters: {sum(p.numel() for p in model.parameters())/1e6:.2f}M")
```
Day 6: Implement the training loop
```python
# Today's coding exercise: train.py
import torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from data import DataLoader
from gpt_model import GPTLanguageModel

def train_model():
    # Hyperparameters
    batch_size = 32
    block_size = 256
    max_iters = 5000
    eval_interval = 500
    learning_rate = 3e-4
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # Data loading
    loader = DataLoader('shakespeare.txt', block_size, batch_size)

    # Model and optimizer
    model = GPTLanguageModel(vocab_size=50257, n_embd=384, n_head=6, n_layer=6, block_size=block_size)
    model = model.to(device)
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

    # Training loop
    writer = SummaryWriter('runs/gpt_training')
    for step in range(max_iters):
        # Evaluation
        if step % eval_interval == 0:
            model.eval()
            with torch.no_grad():
                x, y = loader.get_batch()
                x, y = x.to(device), y.to(device)
                logits, loss = model(x, y)
                print(f"step {step}: loss {loss.item():.4f}")
                writer.add_scalar('Loss/eval', loss.item(), step)

        # Training step
        model.train()
        x, y = loader.get_batch()
        x, y = x.to(device), y.to(device)
        logits, loss = model(x, y)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        if step % 100 == 0:
            writer.add_scalar('Loss/train', loss.item(), step)

    # Save the model
    torch.save(model.state_dict(), 'gpt_model.pth')
    writer.close()

# Run training
if __name__ == "__main__":
    train_model()
```
Day 7: Generate text + weekly review
```python
# Today's coding exercise: generate.py
import torch
import tiktoken
from gpt_model import GPTLanguageModel

def generate_text(model_path, prompt="", max_tokens=100):
    # Load the model
    model = GPTLanguageModel(vocab_size=50257, n_embd=384, n_head=6, n_layer=6, block_size=256)
    model.load_state_dict(torch.load(model_path))
    model.eval()

    # Encode the prompt
    enc = tiktoken.get_encoding('gpt2')
    tokens = enc.encode(prompt)
    tokens = torch.tensor(tokens, dtype=torch.long)[None, ...]

    # Generate
    with torch.no_grad():
        generated = model.generate(tokens, max_new_tokens=max_tokens)

    # Decode
    generated_text = enc.decode(generated[0].tolist())
    return generated_text

# Test generation
text = generate_text('gpt_model.pth', prompt="To be or not to be", max_tokens=50)
print(text)
```
📊 Week 1 deliverables:
- Complete GPT model implementation
- A trained small model (on the Shakespeare data)
- Text-generation demo
- Study notes: understanding of the Transformer architecture
Week 2: Train a small model on your own GPU
🎯 Goal for the week: train the model on a real GPU and understand the training process
📝 Daily tasks:
Day 1: Optimize the training setup
```bash
# Use your GPU 0
CUDA_VISIBLE_DEVICES=0 python train.py

# Monitor training
tensorboard --logdir runs/gpt_training
```
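Beyond launching the run, a few standard PyTorch switches usually speed up single-GPU training. The snippet below is a hedged sketch of optional additions to train.py (TF32 matmuls, torch.compile, a fused optimizer); whether each one helps depends on your GPU and PyTorch version, so treat them as options to benchmark, not requirements.

```python
# Optional speed-ups, demonstrated on a tiny stand-in model; apply the same calls to GPTLanguageModel in train.py.
import torch
import torch.nn as nn

torch.backends.cuda.matmul.allow_tf32 = True   # allow TF32 matmuls on Ampere+ GPUs
torch.backends.cudnn.allow_tf32 = True

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = nn.Sequential(nn.Linear(256, 1024), nn.ReLU(), nn.Linear(1024, 256)).to(device)

if hasattr(torch, 'compile'):                  # PyTorch 2.x: compile the model (first step is slower)
    model = torch.compile(model)

# Fused AdamW kernels need a recent CUDA build; fall back if unavailable
try:
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, fused=True)
except (RuntimeError, TypeError):
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

x = torch.randn(32, 256, device=device)
loss = model(x).pow(2).mean()
loss.backward()
optimizer.step()
print("one optimization step ran, loss:", loss.item())
```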
Day 2-3: Scale up the dataset
```python
# Today's coding exercise: data_processing.py
import tiktoken
from datasets import load_dataset

# Use a larger dataset (here only 1% of OpenWebText)
dataset = load_dataset("openwebtext", split="train[:1%]")

def process_dataset(examples):
    enc = tiktoken.get_encoding('gpt2')
    all_tokens = []
    for text in examples['text']:
        tokens = enc.encode(text)
        if len(tokens) > 50:  # filter out texts that are too short
            all_tokens.append(tokens)
    return {'tokens': all_tokens}

# Tokenize the data; drop the raw text column so the output rows stay consistent
processed = dataset.map(process_dataset, batched=True, batch_size=1000,
                        remove_columns=dataset.column_names)
```
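The processed dataset still has to reach the Week 1 DataLoader, which expects one long token stream. A common follow-up step (a sketch, not part of the original plan) is to flatten the token lists into a single uint16 array on disk and memory-map it during training:

```python
# Flatten the tokenized dataset into one binary file of uint16 token ids (GPT-2 ids fit in 16 bits).
import numpy as np

def save_token_stream(processed_dataset, out_path="train.bin"):
    all_ids = []
    for row in processed_dataset:
        all_ids.extend(row['tokens'])
    arr = np.array(all_ids, dtype=np.uint16)
    arr.tofile(out_path)
    return len(arr)

# Later, training code can memory-map the file instead of re-tokenizing:
# tokens = np.memmap("train.bin", dtype=np.uint16, mode="r")
```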
Day 4-5: Experiment with different model configurations
```python
# Today's coding exercise: experiments.py
from gpt_model import GPTLanguageModel

configs = [
    {"n_embd": 256, "n_head": 4, "n_layer": 4, "name": "small"},
    {"n_embd": 384, "n_head": 6, "n_layer": 6, "name": "medium"},
    {"n_embd": 512, "n_head": 8, "n_layer": 8, "name": "large"},
]

for config in configs:
    print(f"Training {config['name']} model...")
    model = GPTLanguageModel(
        vocab_size=50257,
        n_embd=config['n_embd'],
        n_head=config['n_head'],
        n_layer=config['n_layer'],
        block_size=256
    )
    # Train and save (train_and_save wraps the Day 6 training loop; see the sketch below)
    train_and_save(model, f"gpt_{config['name']}.pth")
```
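train_and_save is referenced above but never defined in the plan. A minimal sketch, assuming the Week 1 DataLoader and Shakespeare data, could look like this:

```python
# Hypothetical helper for experiments.py: train a model for a fixed number of steps and save the weights.
import torch
from data import DataLoader

def train_and_save(model, out_path, max_iters=2000, batch_size=16, block_size=256, lr=3e-4):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)
    loader = DataLoader('shakespeare.txt', block_size, batch_size)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    for step in range(max_iters):
        x, y = loader.get_batch()
        x, y = x.to(device), y.to(device)
        _, loss = model(x, y)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        if step % 200 == 0:
            print(f"{out_path}: step {step}, loss {loss.item():.4f}")
    torch.save(model.state_dict(), out_path)
```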
Day 6-7: Evaluation and comparison
```python
# Today's coding exercise: evaluation.py
import torch

def calculate_perplexity(model, test_data):
    model.eval()
    total_loss = 0.0
    total_tokens = 0
    with torch.no_grad():
        for batch in test_data:
            x, y = batch
            logits, loss = model(x, y)
            # loss is the mean per-token cross-entropy, so weight it by the token count
            total_loss += loss.item() * x.numel()
            total_tokens += x.numel()
    avg_loss = total_loss / total_tokens
    # Perplexity is the exponential of the average per-token loss
    perplexity = torch.exp(torch.tensor(avg_loss))
    return perplexity.item()

# Compare the different models
models = ['small', 'medium', 'large']
for model_name in models:
    model = load_model(f"gpt_{model_name}.pth")  # see the load_model sketch below
    ppl = calculate_perplexity(model, test_loader)
    print(f"{model_name} model perplexity: {ppl:.2f}")
```
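load_model and test_loader are used above without being defined. A minimal load_model sketch, assuming each checkpoint's architecture matches the configs table from Day 4-5:

```python
# Hypothetical helper: rebuild the architecture from its known config, then load the saved weights.
import torch
from gpt_model import GPTLanguageModel

MODEL_CONFIGS = {
    "small":  {"n_embd": 256, "n_head": 4, "n_layer": 4},
    "medium": {"n_embd": 384, "n_head": 6, "n_layer": 6},
    "large":  {"n_embd": 512, "n_head": 8, "n_layer": 8},
}

def load_model(path, block_size=256, vocab_size=50257):
    # Infer the config name from the filename, e.g. "gpt_small.pth" -> "small"
    name = path.split('_')[-1].replace('.pth', '')
    cfg = MODEL_CONFIGS[name]
    model = GPTLanguageModel(vocab_size=vocab_size, block_size=block_size, **cfg)
    model.load_state_dict(torch.load(path, map_location='cpu'))
    model.eval()
    return model
```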
📊 Week 2 deliverables:
- Models trained on a larger dataset
- Performance comparison across model configurations
- Perplexity evaluation report
- Training monitoring dashboard
Week 3: Extend the project (more data, hyperparameter tuning)
🎯 Goal for the week: optimize model performance and add advanced features
📝 Daily tasks:
Day 1-2: Implement learning-rate scheduling
```python
# Today's coding exercise: scheduler.py
import math

class CosineWarmupScheduler:
    def __init__(self, optimizer, warmup_steps, max_steps, min_lr=1e-5):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.max_steps = max_steps
        self.min_lr = min_lr
        self.base_lr = optimizer.param_groups[0]['lr']

    def step(self, step):
        if step < self.warmup_steps:
            # Linear warmup from 0 to base_lr
            lr = self.base_lr * step / self.warmup_steps
        else:
            # Cosine decay from base_lr down to min_lr
            progress = (step - self.warmup_steps) / (self.max_steps - self.warmup_steps)
            lr = self.min_lr + (self.base_lr - self.min_lr) * 0.5 * (1 + math.cos(math.pi * progress))
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr
        return lr

# Use it in the training loop
scheduler = CosineWarmupScheduler(optimizer, warmup_steps=1000, max_steps=10000)
```
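Before wiring the scheduler into training, it is worth checking the curve it produces. A small self-contained check (using a throwaway parameter and optimizer, purely for illustration) that prints the learning rate at a few milestones:

```python
# Quick sanity check of the warmup + cosine shape, independent of the real model.
import torch

param = torch.nn.Parameter(torch.zeros(1))
optimizer = torch.optim.AdamW([param], lr=3e-4)
scheduler = CosineWarmupScheduler(optimizer, warmup_steps=1000, max_steps=10000)

for step in [0, 500, 1000, 5000, 10000]:
    lr = scheduler.step(step)
    print(f"step {step:>6}: lr = {lr:.2e}")
# Expected: lr rises linearly to 3e-4 by step 1000, then decays toward 1e-5 at step 10000.
```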
Day 3-4: Add gradient clipping and other optimizations
```python
# Today's coding exercise: advanced_training.py
# (a fragment: model, optimizer, scheduler, max_iters and the batch (x, y) are assumed to come from train.py)
import torch
import torch.nn.utils as utils

def train_with_optimizations():
    # Gradient clipping threshold
    max_grad_norm = 1.0
    # Mixed-precision training
    scaler = torch.cuda.amp.GradScaler()
    for step in range(max_iters):
        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            logits, loss = model(x, y)
        scaler.scale(loss).backward()
        # Unscale before clipping so the norm is computed on the true gradients
        scaler.unscale_(optimizer)
        utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        scaler.step(optimizer)
        scaler.update()
        scheduler.step(step)
```
Day 5-6: Implement text-generation strategies
```python
# Today's coding exercise: generation_strategies.py
import torch
import torch.nn.functional as F

def generate_with_strategies(model, prompt, strategy="greedy", **kwargs):
    """Dispatch to one of several decoding strategies."""
    tokens = encode(prompt)  # encode() = your tokenizer helper, e.g. tiktoken as in generate.py
    if strategy == "greedy":
        return greedy_generate(model, tokens, **kwargs)
    elif strategy == "top_k":
        return top_k_generate(model, tokens, **kwargs)
    elif strategy == "top_p":
        return top_p_generate(model, tokens, **kwargs)
    elif strategy == "temperature":
        return temperature_generate(model, tokens, **kwargs)

def top_k_generate(model, tokens, k=10, max_tokens=50):
    for _ in range(max_tokens):
        logits, _ = model(tokens)
        logits = logits[:, -1, :]
        # Top-k filtering: sample only from the k highest-probability tokens
        top_k_logits, top_k_indices = torch.topk(logits, k)
        probs = F.softmax(top_k_logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)
        next_token = top_k_indices.gather(-1, next_token)
        tokens = torch.cat([tokens, next_token], dim=1)
    return tokens

def top_p_generate(model, tokens, p=0.9, max_tokens=50):
    for _ in range(max_tokens):
        logits, _ = model(tokens)
        logits = logits[:, -1, :]
        # Top-p (nucleus) sampling: keep the smallest set of tokens whose cumulative probability exceeds p
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
        sorted_indices_to_remove = cumulative_probs > p
        # Shift right so the first token above the threshold is still kept
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0
        # Map the mask from sorted order back to vocabulary order (works for batched inputs)
        indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
        logits = logits.masked_fill(indices_to_remove, float('-inf'))
        probs = F.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)
        tokens = torch.cat([tokens, next_token], dim=1)
    return tokens

# Try the different strategies (greedy_generate and temperature_generate are sketched below)
strategies = ['greedy', 'top_k', 'top_p', 'temperature']
for strategy in strategies:
    text = generate_with_strategies(model, "Once upon a time", strategy=strategy)
    print(f"{strategy}: {text}")
```
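The dispatcher above also refers to greedy_generate and temperature_generate, which the plan does not show. Minimal sketches of both, following the same (model, tokens) convention as the other helpers:

```python
# Hypothetical implementations of the two remaining strategies from generation_strategies.py.
import torch
import torch.nn.functional as F

def greedy_generate(model, tokens, max_tokens=50):
    for _ in range(max_tokens):
        logits, _ = model(tokens)
        # Always pick the single most likely next token
        next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)
        tokens = torch.cat([tokens, next_token], dim=1)
    return tokens

def temperature_generate(model, tokens, temperature=0.8, max_tokens=50):
    for _ in range(max_tokens):
        logits, _ = model(tokens)
        # Lower temperature sharpens the distribution, higher temperature flattens it
        logits = logits[:, -1, :] / temperature
        probs = F.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)
        tokens = torch.cat([tokens, next_token], dim=1)
    return tokens
```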
Day 7: Performance tuning and benchmarking
```python
# Today's coding exercise: benchmarks.py
import time
import torch

def benchmark_model(model, batch_sizes=[1, 4, 8, 16]):
    """Benchmark throughput and GPU memory for different batch sizes."""
    results = {}
    for batch_size in batch_sizes:
        # Prepare random data
        x = torch.randint(0, 50257, (batch_size, 256)).cuda()
        y = torch.randint(0, 50257, (batch_size, 256)).cuda()
        # Warm-up
        for _ in range(10):
            model(x, y)
        # Reset the peak-memory counter so each batch size is measured independently
        torch.cuda.reset_peak_memory_stats()
        # Timed runs
        torch.cuda.synchronize()
        start_time = time.time()
        for _ in range(100):
            model(x, y)
        torch.cuda.synchronize()
        end_time = time.time()
        # Metrics
        avg_time = (end_time - start_time) / 100
        throughput = batch_size / avg_time
        results[batch_size] = {
            'avg_time': avg_time,
            'throughput': throughput,
            'gpu_memory': torch.cuda.max_memory_allocated() / 1024**3  # GB
        }
    return results

# Run the benchmark
benchmark_results = benchmark_model(model)
print("Benchmark Results:")
for batch_size, metrics in benchmark_results.items():
    print(f"Batch Size {batch_size}: {metrics['throughput']:.2f} samples/sec, {metrics['gpu_memory']:.2f} GB")
```
📊 Week 3 deliverables:
- Optimized training code
- Multiple text-generation strategies
- Performance benchmark report
- Model performance comparison and analysis
Week 4: Deploy as a web application
🎯 Goal for the week: deploy the trained model as a web application
📝 Daily tasks:
Day 1-2: Build a Gradio interface
```python
# Today's coding exercise: web_app.py
import gradio as gr
import torch
import torch.nn.functional as F
import tiktoken
from model import GPTLanguageModel

class GPTWebApp:
    def __init__(self, model_path):
        self.model = GPTLanguageModel(vocab_size=50257, n_embd=384, n_head=6, n_layer=6, block_size=256)
        self.model.load_state_dict(torch.load(model_path))
        self.model.eval()
        self.enc = tiktoken.get_encoding('gpt2')

    def generate_text(self, prompt, max_tokens, temperature, top_k, top_p):
        try:
            tokens = self.enc.encode(prompt)
            tokens = torch.tensor(tokens, dtype=torch.long)[None, ...]
            with torch.no_grad():
                # Cast slider values: Gradio sliders return floats
                generated = self.generate_with_params(
                    tokens, int(max_tokens), temperature, int(top_k), top_p
                )
            result = self.enc.decode(generated[0].tolist())
            return result
        except Exception as e:
            return f"Error: {str(e)}"

    def generate_with_params(self, tokens, max_tokens, temperature, top_k, top_p):
        for _ in range(max_tokens):
            logits, _ = self.model(tokens)
            logits = logits[:, -1, :] / temperature
            # Top-k filtering: keep only the k most likely tokens
            if top_k > 0:
                top_k_logits, top_k_indices = torch.topk(logits, top_k)
                logits = torch.full_like(logits, float('-inf'))
                logits.scatter_(1, top_k_indices, top_k_logits)
            # Top-p (nucleus) filtering
            if top_p < 1.0:
                sorted_logits, sorted_indices = torch.sort(logits, descending=True)
                cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
                sorted_indices_to_remove[..., 0] = 0
                # Map the mask back to vocabulary order before applying it
                indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
                logits = logits.masked_fill(indices_to_remove, float('-inf'))
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            tokens = torch.cat([tokens, next_token], dim=1)
        return tokens

# Create the web app
app = GPTWebApp('gpt_model.pth')

# Gradio interface
with gr.Blocks(title="My GPT Model") as demo:
    gr.Markdown("# 🤖 My GPT Text Generator")
    with gr.Row():
        with gr.Column():
            prompt = gr.Textbox(label="Prompt", placeholder="Enter your prompt here...")
            max_tokens = gr.Slider(minimum=1, maximum=200, value=50, label="Max Tokens")
            temperature = gr.Slider(minimum=0.1, maximum=2.0, value=0.8, label="Temperature")
            top_k = gr.Slider(minimum=0, maximum=100, value=50, label="Top K")
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, label="Top P")
            generate_btn = gr.Button("Generate")
        with gr.Column():
            output = gr.Textbox(label="Generated Text", lines=10)
    generate_btn.click(
        fn=app.generate_text,
        inputs=[prompt, max_tokens, temperature, top_k, top_p],
        outputs=output
    )
    # Examples
    gr.Examples(
        examples=[
            ["Once upon a time", 50, 0.8, 50, 0.9],
            ["The future of AI is", 100, 0.7, 40, 0.8],
            ["In a world where", 75, 0.9, 60, 0.95]
        ],
        inputs=[prompt, max_tokens, temperature, top_k, top_p]
    )

if __name__ == "__main__":
    demo.launch(share=True, server_port=7860)
```
Day 3-4: Add a model-comparison feature
```python
# Today's coding exercise: model_comparison.py
import gradio as gr
from web_app import GPTWebApp

class ModelComparison:
    def __init__(self):
        # NOTE: GPTWebApp hard-codes one architecture (384/6/6); these checkpoints must match it,
        # or GPTWebApp should be extended to take the model config as an argument.
        self.models = {
            'small': GPTWebApp('gpt_small.pth'),
            'medium': GPTWebApp('gpt_medium.pth'),
            'large': GPTWebApp('gpt_large.pth')
        }

    def compare_models(self, prompt, max_tokens, temperature):
        results = {}
        for name, model in self.models.items():
            results[name] = model.generate_text(prompt, max_tokens, temperature, 50, 0.9)
        return results['small'], results['medium'], results['large']

# Comparison interface
comparison = ModelComparison()

with gr.Blocks() as comparison_demo:
    gr.Markdown("# 🔍 Model Comparison")
    with gr.Row():
        prompt = gr.Textbox(label="Prompt")
        max_tokens = gr.Slider(1, 100, 50, label="Max Tokens")
        temperature = gr.Slider(0.1, 2.0, 0.8, label="Temperature")
        compare_btn = gr.Button("Compare Models")
    with gr.Row():
        small_output = gr.Textbox(label="Small Model", lines=5)
        medium_output = gr.Textbox(label="Medium Model", lines=5)
        large_output = gr.Textbox(label="Large Model", lines=5)
    compare_btn.click(
        fn=comparison.compare_models,
        inputs=[prompt, max_tokens, temperature],
        outputs=[small_output, medium_output, large_output]
    )

comparison_demo.launch()
```
Day 5-6: Deploy an API service
```python
# Today's coding exercise: api_server.py
from flask import Flask, request, jsonify
import torch
import tiktoken
from model import GPTLanguageModel

app = Flask(__name__)

# Load the model once at startup
model = GPTLanguageModel(vocab_size=50257, n_embd=384, n_head=6, n_layer=6, block_size=256)
model.load_state_dict(torch.load('gpt_model.pth'))
model.eval()
enc = tiktoken.get_encoding('gpt2')

@app.route('/generate', methods=['POST'])
def generate():
    try:
        data = request.json
        prompt = data.get('prompt', '')
        max_tokens = int(data.get('max_tokens', 50))
        # temperature is accepted but unused: the Week 1 model.generate() does not take a temperature argument
        temperature = data.get('temperature', 0.8)

        tokens = enc.encode(prompt)
        tokens = torch.tensor(tokens, dtype=torch.long)[None, ...]
        with torch.no_grad():
            generated = model.generate(tokens, max_new_tokens=max_tokens)
        result = enc.decode(generated[0].tolist())

        return jsonify({
            'success': True,
            'generated_text': result,
            'prompt': prompt
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
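```

Once the server is running, it helps to test it from a separate client process. A small example using the requests library (the URL assumes the server is on localhost:5000 as configured above):

```python
# Minimal API client for api_server.py; run it while the Flask server is up.
import requests

resp = requests.post(
    "http://localhost:5000/generate",
    json={"prompt": "To be or not to be", "max_tokens": 40},
    timeout=60,
)
data = resp.json()
if data.get("success"):
    print(data["generated_text"])
else:
    print("Request failed:", data.get("error"))
```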
Day 7: Performance monitoring and logging
```python
# Today's coding exercise: monitoring.py
import logging
import time
import psutil
import GPUtil
from functools import wraps

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)

def monitor_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        start_memory = psutil.virtual_memory().percent
        # GPU usage before the call
        gpus = GPUtil.getGPUs()
        start_gpu_memory = gpus[0].memoryUsed if gpus else 0

        result = func(*args, **kwargs)

        end_time = time.time()
        end_memory = psutil.virtual_memory().percent
        # Re-query the GPUs so the reading reflects usage after the call
        gpus = GPUtil.getGPUs()
        end_gpu_memory = gpus[0].memoryUsed if gpus else 0

        logging.info(f"Function {func.__name__} executed in {end_time - start_time:.2f}s")
        logging.info(f"Memory usage change: {end_memory - start_memory:.2f}%")
        logging.info(f"GPU memory change: {end_gpu_memory - start_gpu_memory} MB")
        return result
    return wrapper

# Use the monitoring decorator
@monitor_performance
def generate_text_monitored(prompt, max_tokens=50):
    # your generation code here
    pass

# Add it to the Flask app (this decorated handler replaces the earlier /generate definition)
@app.route('/generate', methods=['POST'])
@monitor_performance
def generate():
    # your generation code here
    pass
```
📊 Week 4 deliverables:
- Complete web application interface
- Model-comparison feature
- REST API service
- Performance monitoring system
- Deployment documentation
📅 Month 2: Hugging Face Project Set
Week 1: Sentiment analysis + Gradio deployment
🎯 Goal for the week: get comfortable with the Hugging Face ecosystem and build a sentiment-analysis app
📚 Learning resources:
📝 Daily tasks:
Day 1: Hugging Face basics
```python
# Today's coding exercise: hf_basics.py
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch

# Use a pretrained pipeline
classifier = pipeline("sentiment-analysis")
result = classifier("I love this movie!")
print(result)

# Load a model manually
model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# Run inference manually
text = "This product is amazing!"
inputs = tokenizer(text, return_tensors="pt")
outputs = model(**inputs)
predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
print(predictions)
```
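The manual-inference path above prints raw probabilities without saying which class each column is. The model's config carries an id2label mapping, so a short follow-up (a sketch continuing from the same model and predictions as above) makes the output readable:

```python
# Map each probability to its class name using the model's own id2label mapping.
probs = predictions[0].tolist()
for idx, p in enumerate(probs):
    print(f"{model.config.id2label[idx]}: {p:.3f}")
# For this model the labels are negative / neutral / positive.
```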
Day 2-3: Build the sentiment-analysis app
```python
# Today's coding exercise: sentiment_app.py
import gradio as gr
from transformers import pipeline
import matplotlib.pyplot as plt

class SentimentAnalyzer:
    def __init__(self):
        self.classifier = pipeline("sentiment-analysis",
                                   model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                                   return_all_scores=True)

    def analyze_sentiment(self, text):
        if not text.strip():
            return "Please enter some text", None
        results = self.classifier(text)[0]
        # Extract labels and scores
        labels = [result['label'] for result in results]
        scores = [result['score'] for result in results]
        # Build the visualization (this model uses lowercase labels: negative / neutral / positive)
        fig, ax = plt.subplots(figsize=(10, 6))
        colors = ['red' if label.lower() == 'negative' else 'green' if label.lower() == 'positive' else 'gray'
                  for label in labels]
        bars = ax.bar(labels, scores, color=colors, alpha=0.7)
        ax.set_ylabel('Confidence Score')
        ax.set_title('Sentiment Analysis Results')
        ax.set_ylim(0, 1)
        # Annotate each bar with its value
        for bar, score in zip(bars, scores):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                    f'{score:.3f}', ha='center', va='bottom')
        plt.tight_layout()
        # Text summary of the top label
        max_result = max(results, key=lambda x: x['score'])
        result_text = f"Sentiment: {max_result['label']} (Confidence: {max_result['score']:.3f})"
        return result_text, fig

    def batch_analyze(self, texts):
        """Analyze several texts at once; returns rows for the results table."""
        rows = []
        for text in texts.split('\n'):
            if text.strip():
                result = self.classifier(text)[0]
                max_result = max(result, key=lambda x: x['score'])
                rows.append([text, max_result['label'], round(max_result['score'], 3)])
        return rows

# Create the web app
analyzer = SentimentAnalyzer()

with gr.Blocks(title="Sentiment Analysis App") as demo:
    gr.Markdown("# 😊 Sentiment Analysis Tool")
    with gr.Tab("Single Text Analysis"):
        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(
                    label="Enter text to analyze",
                    placeholder="Type your text here...",
                    lines=3
                )
                analyze_btn = gr.Button("Analyze Sentiment")
            with gr.Column():
                result_text = gr.Textbox(label="Result", lines=2)
                result_plot = gr.Plot(label="Confidence Scores")
        analyze_btn.click(
            fn=analyzer.analyze_sentiment,
            inputs=text_input,
            outputs=[result_text, result_plot]
        )
    with gr.Tab("Batch Analysis"):
        batch_input = gr.Textbox(
            label="Enter multiple texts (one per line)",
            lines=5,
            placeholder="Text 1\nText 2\nText 3..."
        )
        batch_btn = gr.Button("Analyze All")
        batch_output = gr.Dataframe(
            headers=["Text", "Sentiment", "Confidence"],
            label="Results"
        )
        batch_btn.click(
            fn=analyzer.batch_analyze,
            inputs=batch_input,
            outputs=batch_output
        )
    with gr.Tab("Examples"):
        gr.Examples(
            examples=[
                ["I absolutely love this product! It's amazing!"],
                ["This is the worst experience I've ever had."],
                ["The weather is okay today, nothing special."],
                ["I'm not sure how I feel about this movie."]
            ],
            inputs=text_input
        )

if __name__ == "__main__":
    demo.launch(share=True, server_port=7861)
```
Day 4-5: Add multilingual support
```python
# Today's coding exercise: multilingual_sentiment.py
from transformers import pipeline
import gradio as gr

class MultilingualSentimentAnalyzer:
    def __init__(self):
        self.analyzers = {
            'English': pipeline("sentiment-analysis",
                                model="cardiffnlp/twitter-roberta-base-sentiment-latest"),
            'Chinese': pipeline("sentiment-analysis",
                                model="uer/roberta-base-finetuned-dianping-chinese"),
            'Spanish': pipeline("sentiment-analysis",
                                model="nlptown/bert-base-multilingual-uncased-sentiment"),
            'French': pipeline("sentiment-analysis",
                               model="nlptown/bert-base-multilingual-uncased-sentiment")
        }

    def analyze_multilingual(self, text, language):
        try:
            analyzer = self.analyzers[language]
            result = analyzer(text)
            if isinstance(result, list):
                result = result[0]
            return f"Language: {language}\nSentiment: {result['label']}\nConfidence: {result['score']:.3f}"
        except Exception as e:
            return f"Error analyzing {language} text: {str(e)}"

# Multilingual interface
multi_analyzer = MultilingualSentimentAnalyzer()

with gr.Blocks() as multi_demo:
    gr.Markdown("# 🌍 Multilingual Sentiment Analysis")
    with gr.Row():
        text_input = gr.Textbox(label="Text", lines=3)
        language_dropdown = gr.Dropdown(
            choices=list(multi_analyzer.analyzers.keys()),
            value="English",
            label="Language"
        )
    analyze_btn = gr.Button("Analyze")
    result_output = gr.Textbox(label="Result", lines=3)
    analyze_btn.click(
        fn=multi_analyzer.analyze_multilingual,
        inputs=[text_input, language_dropdown],
        outputs=result_output
    )

multi_demo.launch()
```
Day 6-7: Model fine-tuning
```python
# Today's coding exercise: fine_tune_sentiment.py
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer
from datasets import Dataset

def create_custom_dataset():
    """Create a tiny custom dataset (replace with your own labeled data)."""
    texts = [
        "I love this product!", "This is terrible", "Pretty good overall",
        "Not bad", "Absolutely amazing", "Could be better"
    ]
    labels = [1, 0, 1, 1, 1, 0]  # 1: positive, 0: negative
    return Dataset.from_dict({
        'text': texts,
        'labels': labels
    })

def tokenize_function(examples):
    return tokenizer(examples['text'], truncation=True, padding=True)

# Load the pretrained model
model_name = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Prepare the data
dataset = create_custom_dataset()
tokenized_dataset = dataset.map(tokenize_function, batched=True)

# Training configuration
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=64,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
)

# Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
)

# Start training
trainer.train()

# Save the model
model.save_pretrained('./fine_tuned_sentiment')
tokenizer.save_pretrained('./fine_tuned_sentiment')
```
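After saving, the quickest sanity check is to reload the fine-tuned weights through a pipeline and run a couple of predictions. A short sketch, assuming the ./fine_tuned_sentiment directory produced above:

```python
# Reload the fine-tuned checkpoint and run it as a pipeline.
from transformers import pipeline

clf = pipeline("sentiment-analysis", model="./fine_tuned_sentiment", tokenizer="./fine_tuned_sentiment")
for sentence in ["I would buy this again", "Never again"]:
    print(sentence, "->", clf(sentence)[0])
# Note: with num_labels=2 and no label names configured, outputs appear as LABEL_0 / LABEL_1.
```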
📊 Week 1 deliverables:
- Complete sentiment-analysis web application
- Multilingual support
- A fine-tuned sentiment-analysis model
- Batch analysis feature
- Visualization of results
Week 2: Question answering + RAG implementation
🎯 Goal for the week: build a question-answering system and implement RAG (retrieval-augmented generation)
📝 Daily tasks:
Day 1-2: Basic question-answering system
```python
# Today's coding exercise: qa_system.py
from transformers import pipeline
import gradio as gr

class QASystem:
    def __init__(self):
        self.qa_pipeline = pipeline("question-answering",
                                    model="distilbert-base-cased-distilled-squad")

    def answer_question(self, question, context):
        if not question.strip() or not context.strip():
            return "Please provide both question and context."
        try:
            result = self.qa_pipeline(question=question, context=context)
            confidence = result['score']
            answer = result['answer']
            start = result['start']
            end = result['end']
            # Highlight where the answer appears in the context
            highlighted_context = (
                context[:start] +
                f"**{context[start:end]}**" +
                context[end:]
            )
            return f"**Answer:** {answer}\n\n**Confidence:** {confidence:.3f}\n\n**Context with highlighted answer:**\n{highlighted_context}"
        except Exception as e:
            return f"Error: {str(e)}"

    def batch_qa(self, questions, context):
        """Answer several questions against the same context; returns rows for the results table."""
        rows = []
        for question in questions.split('\n'):
            if question.strip():
                result = self.qa_pipeline(question=question, context=context)
                rows.append([question, result['answer'], round(result['score'], 3)])
        return rows

# Build the QA app
qa_system = QASystem()

with gr.Blocks(title="Question Answering System") as qa_demo:
    gr.Markdown("# 🤔 Question Answering System")
    with gr.Tab("Single Question"):
        with gr.Row():
            with gr.Column():
                context_input = gr.Textbox(
                    label="Context",
                    placeholder="Enter the context/passage here...",
                    lines=8
                )
                question_input = gr.Textbox(
                    label="Question",
                    placeholder="What would you like to know?",
                    lines=2
                )
                ask_btn = gr.Button("Ask Question")
            with gr.Column():
                answer_output = gr.Markdown(label="Answer")
        ask_btn.click(
            fn=qa_system.answer_question,
            inputs=[question_input, context_input],
            outputs=answer_output
        )
    with gr.Tab("Multiple Questions"):
        context_batch = gr.Textbox(label="Context", lines=6)
        questions_batch = gr.Textbox(
            label="Questions (one per line)",
            lines=4,
            placeholder="Question 1\nQuestion 2\nQuestion 3..."
        )
        batch_btn = gr.Button("Answer All")
        batch_results = gr.Dataframe(
            headers=["Question", "Answer", "Confidence"],
            label="Results"
        )
        batch_btn.click(
            fn=qa_system.batch_qa,
            inputs=[questions_batch, context_batch],
            outputs=batch_results
        )
    with gr.Tab("Examples"):
        gr.Examples(
            examples=[
                [
                    "What is the capital of France?",
                    "France is a country in Western Europe. Its capital and largest city is Paris, located in the north-central part of the country. Paris is known for its museums, architecture, and cultural significance."
                ],
                [
                    "When was the Eiffel Tower built?",
                    "The Eiffel Tower is an iron lattice tower located in Paris, France. It was constructed from 1887 to 1889 as the entrance arch for the 1889 World's Fair. The tower was designed by Gustave Eiffel and stands 324 meters tall."
                ]
            ],
            inputs=[question_input, context_input]
        )

qa_demo.launch()
```
Day 3-4: Implement RAG
```python
# Today's coding exercise: rag_system.py
from transformers import pipeline, AutoTokenizer, AutoModel
import torch
import numpy as np
import faiss
import pickle
import gradio as gr

class RAGSystem:
    def __init__(self):
        self.qa_pipeline = pipeline("question-answering")
        self.embedding_model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
        self.tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
        self.documents = []
        self.embeddings = None
        self.index = None

    def get_embeddings(self, texts):
        """Embed a list of texts."""
        embeddings = []
        for text in texts:
            inputs = self.tokenizer(text, return_tensors='pt',
                                    truncation=True, padding=True, max_length=512)
            with torch.no_grad():
                outputs = self.embedding_model(**inputs)
            # Use the [CLS] token embedding (mean pooling is the more common choice for this model)
            embedding = outputs.last_hidden_state[:, 0, :].numpy()
            embeddings.append(embedding[0])
        return np.array(embeddings)

    def build_index(self, documents):
        """Build the document index."""
        self.documents = documents
        self.embeddings = self.get_embeddings(documents)
        # Build a FAISS index
        dimension = self.embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dimension)  # inner-product index
        # Normalize embeddings so inner product equals cosine similarity
        normalized_embeddings = self.embeddings / np.linalg.norm(self.embeddings, axis=1, keepdims=True)
        self.index.add(normalized_embeddings.astype('float32'))

    def retrieve_documents(self, query, top_k=3):
        """Retrieve the most relevant documents."""
        query_embedding = self.get_embeddings([query])
        query_embedding = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True)
        # Search for the most similar documents (cast top_k in case it comes from a slider)
        scores, indices = self.index.search(query_embedding.astype('float32'), int(top_k))
        retrieved_docs = []
        for i, idx in enumerate(indices[0]):
            retrieved_docs.append({
                'document': self.documents[idx],
                'score': scores[0][i]
            })
        return retrieved_docs

    def rag_answer(self, question, top_k=3):
        """Answer a question with retrieval-augmented generation."""
        if not self.documents:
            return "No documents loaded. Please upload documents first."
        # Retrieve relevant documents
        retrieved_docs = self.retrieve_documents(question, top_k)
        # Concatenate the retrieved documents as the context
        context = "\n\n".join([doc['document'] for doc in retrieved_docs])
        # Generate the answer with the QA model
        try:
            result = self.qa_pipeline(question=question, context=context)
            answer = result['answer']
            confidence = result['score']
            # Show which documents were retrieved
            retrieved_info = "**Retrieved Documents:**\n"
            for i, doc in enumerate(retrieved_docs):
                retrieved_info += f"{i+1}. (Score: {doc['score']:.3f}) {doc['document'][:200]}...\n\n"
            return f"**Answer:** {answer}\n\n**Confidence:** {confidence:.3f}\n\n{retrieved_info}"
        except Exception as e:
            return f"Error generating answer: {str(e)}"

    def save_index(self, filename):
        """Save the index to disk."""
        with open(filename, 'wb') as f:
            pickle.dump({
                'documents': self.documents,
                'embeddings': self.embeddings
            }, f)

    def load_index(self, filename):
        """Load the index from disk."""
        with open(filename, 'rb') as f:
            data = pickle.load(f)
        self.documents = data['documents']
        self.embeddings = data['embeddings']
        self.build_index(self.documents)

# Build the RAG app
rag_system = RAGSystem()

def upload_documents(files):
    """Read uploaded files and index them."""
    documents = []
    for file in files:
        with open(file.name, 'r', encoding='utf-8') as f:
            content = f.read()
        # Split each document into paragraphs
        paragraphs = content.split('\n\n')
        documents.extend([p.strip() for p in paragraphs if p.strip()])
    rag_system.build_index(documents)
    return f"Uploaded and indexed {len(documents)} document segments."

with gr.Blocks(title="RAG Question Answering") as rag_demo:
    gr.Markdown("# 📚 RAG Question Answering System")
    with gr.Tab("Upload Documents"):
        file_upload = gr.Files(label="Upload Text Files", file_types=['.txt'])
        upload_btn = gr.Button("Build Index")
        upload_status = gr.Textbox(label="Status")
        upload_btn.click(
            fn=upload_documents,
            inputs=file_upload,
            outputs=upload_status
        )
    with gr.Tab("Ask Questions"):
        question_input = gr.Textbox(label="Question", lines=2)
        top_k_slider = gr.Slider(
            minimum=1, maximum=10, value=3, step=1,
            label="Number of documents to retrieve"
        )
        ask_btn = gr.Button("Ask")
        answer_output = gr.Markdown(label="Answer")
        ask_btn.click(
            fn=rag_system.rag_answer,
            inputs=[question_input, top_k_slider],
            outputs=answer_output
        )

rag_demo.launch()
```
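Because all the retrieval logic lives in the RAGSystem class, it can also be exercised without the Gradio UI, which is handy for debugging. A minimal programmatic check (the toy sentences are purely illustrative):

```python
# Quick end-to-end check of RAGSystem without the web UI.
rag = RAGSystem()
rag.build_index([
    "Paris is the capital of France.",
    "The Eiffel Tower was completed in 1889.",
    "Python is a popular programming language.",
])
print(rag.rag_answer("When was the Eiffel Tower completed?", top_k=2))
```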
Day 5-6: Integrate a vector database
```python
# Today's coding exercise: vector_db_rag.py
import chromadb
from chromadb.utils import embedding_functions
import gradio as gr
from transformers import pipeline

class VectorDBRAG:
    def __init__(self):
        self.client = chromadb.Client()
        self.collection = None
        self.qa_pipeline = pipeline("question-answering")
        # Use ChromaDB's default embedding function
        self.embedding_function = embedding_functions.DefaultEmbeddingFunction()

    def create_collection(self, name="documents"):
        """Create a collection in the vector database."""
        try:
            self.collection = self.client.create_collection(
                name=name,
                embedding_function=self.embedding_function
            )
            return f"Collection '{name}' created successfully."
        except Exception:
            # If the collection already exists, reuse it
            self.collection = self.client.get_collection(name)
            return f"Using existing collection '{name}'."

    def add_documents(self, documents):
        """Add documents to the vector database."""
        if not self.collection:
            self.create_collection()
        # Prepare document ids
        ids = [f"doc_{i}" for i in range(len(documents))]
        # Add the documents
        self.collection.add(
            documents=documents,
            ids=ids
        )
        return f"Added {len(documents)} documents to the collection."

    def query_documents(self, query, n_results=3):
        """Query for similar documents."""
        if not self.collection:
            return []
        results = self.collection.query(
            query_texts=[query],
            n_results=int(n_results)
        )
        return results['documents'][0] if results['documents'] else []

    def rag_answer(self, question, n_results=3):
        """Answer a question with retrieval-augmented generation."""
        if not self.collection:
            return "No documents loaded. Please upload documents first."
        # Retrieve relevant documents
        retrieved_docs = self.query_documents(question, n_results)
        if not retrieved_docs:
            return "No relevant documents found."
        # Concatenate the documents as the context
        context = "\n\n".join(retrieved_docs)
        try:
            result = self.qa_pipeline(question=question, context=context)
            answer = result['answer']
            confidence = result['score']
            # Show which documents were retrieved
            retrieved_info = "**Retrieved Documents:**\n"
            for i, doc in enumerate(retrieved_docs):
                retrieved_info += f"{i+1}. {doc[:300]}...\n\n"
            return f"**Answer:** {answer}\n\n**Confidence:** {confidence:.3f}\n\n{retrieved_info}"
        except Exception as e:
            return f"Error: {str(e)}"

# Build the vector-database RAG system
vector_rag = VectorDBRAG()

def process_uploaded_files(files):
    """Process uploaded files and add them to the collection."""
    documents = []
    for file in files:
        with open(file.name, 'r', encoding='utf-8') as f:
            content = f.read()
        # Split into paragraphs
        paragraphs = content.split('\n\n')
        documents.extend([p.strip() for p in paragraphs if p.strip()])
    status = vector_rag.add_documents(documents)
    return status

with gr.Blocks(title="Vector DB RAG") as vector_demo:
    gr.Markdown("# 🔍 Vector Database RAG System")
    with gr.Tab("Setup"):
        collection_name = gr.Textbox(label="Collection Name", value="documents")
        create_btn = gr.Button("Create Collection")
        creation_status = gr.Textbox(label="Status")
        create_btn.click(
            fn=vector_rag.create_collection,
            inputs=collection_name,
            outputs=creation_status
        )
        gr.Markdown("---")
        file_upload = gr.Files(label="Upload Documents", file_types=['.txt'])
        upload_btn = gr.Button("Add to Collection")
        upload_status = gr.Textbox(label="Upload Status")
        upload_btn.click(
            fn=process_uploaded_files,
            inputs=file_upload,
            outputs=upload_status
        )
    with gr.Tab("Query"):
        question_input = gr.Textbox(label="Question", lines=2)
        n_results_slider = gr.Slider(
            minimum=1, maximum=10, value=3, step=1,
            label="Number of documents to retrieve"
        )
        query_btn = gr.Button("Ask Question")
        answer_output = gr.Markdown(label="Answer")
        query_btn.click(
            fn=vector_rag.rag_answer,
            inputs=[question_input, n_results_slider],
            outputs=answer_output
        )

vector_demo.launch()
```
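One caveat with chromadb.Client(): it is an in-memory client, so the collection disappears when the process exits. If you want the index to survive restarts, recent chromadb versions offer a persistent client; a hedged sketch (the path and collection name below are arbitrary):

```python
# Persist the collection on disk instead of in memory (requires a chromadb version that ships PersistentClient).
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection(name="documents")
collection.add(documents=["Paris is the capital of France."], ids=["doc_0"])
print(collection.count(), "documents stored on disk")
```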
Day 7: Advanced RAG features
```python
# Today's coding exercise: advanced_rag.py
from datetime import datetime
from transformers import pipeline
import gradio as gr

class AdvancedRAG:
    def __init__(self):
        self.qa_pipeline = pipeline("question-answering")
        self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
        # Named-entity recognition
        self.ner_pipeline = pipeline("ner",
                                     model="dbmdz/bert-large-cased-finetuned-conll03-english",
                                     aggregation_strategy="simple")
        self.documents = []
        self.document_metadata = []

    def preprocess_document(self, text, metadata=None):
        """Preprocess a document: extract entities and generate a summary."""
        # Extract entities
        entities = self.ner_pipeline(text)
        # Summarize long documents
        if len(text) > 1000:
            summary = self.summarizer(text, max_length=130, min_length=30, do_sample=False)
            summary_text = summary[0]['summary_text']
        else:
            summary_text = text
        processed_doc = {
            'text': text,
            'summary': summary_text,
            'entities': entities,
            'metadata': metadata or {},
            'timestamp': datetime.now().isoformat()
        }
        return processed_doc

    def smart_chunking(self, text, chunk_size=500, overlap=50):
        """Chunk a document on paragraph boundaries, with overlap between chunks."""
        # Split on paragraphs
        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = ""
        for paragraph in paragraphs:
            if len(current_chunk) + len(paragraph) <= chunk_size:
                current_chunk += paragraph + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = paragraph + "\n\n"
        if current_chunk:
            chunks.append(current_chunk.strip())
        # Add overlap between consecutive chunks
        overlapped_chunks = []
        for i, chunk in enumerate(chunks):
            if i > 0:
                # Prepend the tail of the previous chunk
                prev_end = chunks[i-1][-overlap:]
                chunk = prev_end + " " + chunk
            overlapped_chunks.append(chunk)
        return overlapped_chunks

    def context_aware_retrieval(self, query, conversation_history=None):
        """Retrieve documents by entity overlap, optionally conditioned on the conversation."""
        # If there is conversation history, fold it into the query
        if conversation_history:
            enhanced_query = f"Previous context: {conversation_history[-1]}\nCurrent query: {query}"
        else:
            enhanced_query = query
        # Extract entities from the query
        query_entities = self.ner_pipeline(enhanced_query)
        # Match documents by entity overlap
        relevant_docs = []
        for doc in self.documents:
            doc_entities = [entity['word'] for entity in doc['entities']]
            query_entity_words = [entity['word'] for entity in query_entities]
            # Count the overlapping entities
            entity_overlap = len(set(doc_entities) & set(query_entity_words))
            if entity_overlap > 0:
                relevant_docs.append((doc, entity_overlap))
        # Sort by entity overlap
        relevant_docs.sort(key=lambda x: x[1], reverse=True)
        return [doc[0] for doc in relevant_docs[:3]]

    def generate_contextual_answer(self, question, retrieved_docs, conversation_history=None):
        """Generate an answer that takes the conversation history into account."""
        # Concatenate the retrieved documents
        context_parts = []
        for doc in retrieved_docs:
            context_parts.append(doc['text'])
        context = "\n\n".join(context_parts)
        # Prepend the conversation history, if any
        if conversation_history:
            context = f"Previous conversation:\n{conversation_history}\n\nRelevant documents:\n{context}"
        # Generate the answer
        try:
            result = self.qa_pipeline(question=question, context=context)
            answer = result['answer']
            confidence = result['score']
            # Extract entities from the answer
            answer_entities = self.ner_pipeline(answer)
            formatted_answer = f"**Answer:** {answer}\n\n**Confidence:** {confidence:.3f}\n\n"
            if answer_entities:
                formatted_answer += "**Key Entities in Answer:**\n"
                for entity in answer_entities:
                    formatted_answer += f"- {entity['word']} ({entity['entity_group']})\n"
            return formatted_answer
        except Exception as e:
            return f"Error generating answer: {str(e)}"

# Build the advanced RAG app
advanced_rag = AdvancedRAG()

def add_document_with_metadata(text, title="", author="", date=""):
    """Add a document together with its metadata."""
    metadata = {
        'title': title,
        'author': author,
        'date': date
    }
    processed_doc = advanced_rag.preprocess_document(text, metadata)
    advanced_rag.documents.append(processed_doc)
    return f"Document '{title}' added successfully with {len(processed_doc['entities'])} entities extracted."

def contextual_qa(question, conversation_history=""):
    """Context-aware question answering."""
    if not advanced_rag.documents:
        return "No documents loaded."
    # Parse the conversation history
    history = conversation_history.split('\n') if conversation_history else None
    # Retrieve relevant documents
    retrieved_docs = advanced_rag.context_aware_retrieval(question, history)
    if not retrieved_docs:
        return "No relevant documents found."
    # Generate the answer
    answer = advanced_rag.generate_contextual_answer(question, retrieved_docs, conversation_history)
    return answer

with gr.Blocks(title="Advanced RAG") as advanced_demo:
    gr.Markdown("# 🚀 Advanced RAG System")
    with gr.Tab("Add Documents"):
        doc_text = gr.Textbox(label="Document Text", lines=10)
        doc_title = gr.Textbox(label="Title")
        doc_author = gr.Textbox(label="Author")
        doc_date = gr.Textbox(label="Date")
        add_btn = gr.Button("Add Document")
        add_status = gr.Textbox(label="Status")
        add_btn.click(
            fn=add_document_with_metadata,
            inputs=[doc_text, doc_title, doc_author, doc_date],
            outputs=add_status
        )
    with gr.Tab("Contextual Q&A"):
        question_input = gr.Textbox(label="Question", lines=2)
        history_input = gr.Textbox(
            label="Conversation History (optional)",
            lines=5,
            placeholder="Previous questions and answers..."
        )
        ask_btn = gr.Button("Ask Question")
        answer_output = gr.Markdown(label="Answer")
        ask_btn.click(
            fn=contextual_qa,
            inputs=[question_input, history_input],
            outputs=answer_output
        )

advanced_demo.launch()
```
📊 Week 2 deliverables:
- Basic question-answering system
- RAG implementation
- Vector-database integration
- Advanced RAG features (entity recognition, context awareness)
- Complete document retrieval system
(Due to length limits, only the plan through Month 2, Week 2 is shown in this much detail. The full 3-month plan covers all 12 weeks. Would you like me to continue with the remaining weeks?)
Would you like to start with the Week 1 GPT build first, or should I keep fleshing out the rest of the detailed plan?