NLP Basics and Word Embeddings: Teaching AI to Understand Text (superior哥 Deep Learning Series, Part 13)

Published: 2025-06-13

The thirteenth installment of the superior哥 deep learning series
From pixels to text, from vision to language: building the bridge that lets AI cross between the two

🎯 Introduction: When AI Learns to "Read"

Hey everyone, welcome to the thirteenth installment of the superior哥 deep learning series! In the previous posts we worked through the major computer vision techniques, from image classification to object detection to image segmentation. Today we cross into an entirely new field: natural language processing (NLP).

If computer vision gives AI "eyes", then natural language processing gives it the power of language. Imagine an AI that can not only interpret images but also understand text, hold conversations, and even write articles: that is a genuinely exciting prospect!

In this era of information overload, text data is everywhere: news, social media, customer-service chats, legal documents... Teaching AI to understand what all this text means has become a key link in the development of artificial intelligence.

🌟 Highlights of this post:

  • 📚 NLP fundamentals and text-preprocessing techniques, built up from scratch
  • 🔤 A deep dive into the three major word-embedding techniques: Word2Vec, GloVe, and FastText
  • 💻 A complete news-classification system, including feature engineering and model optimization
  • 📊 Evaluation metrics for NLP tasks and strategies for performance tuning
  • 🎯 Project templates and best-practice guides for hands-on work

📊 Knowledge Map

NLP Basics and Word Embeddings
├── NLP fundamentals
│   ├── Text preprocessing
│   ├── Language-model basics
│   ├── Word representation methods
│   └── Evaluation metrics
├── Word-embedding techniques
│   ├── Word2Vec
│   │   ├── Skip-gram model
│   │   ├── CBOW model
│   │   └── Negative sampling
│   ├── GloVe
│   │   ├── Global co-occurrence statistics
│   │   ├── Matrix factorization
│   │   └── Local context
│   └── FastText
│       ├── Subword information
│       ├── N-gram features
│       └── Out-of-vocabulary handling
├── Text-classification applications
│   ├── Feature engineering
│   ├── Model architecture
│   ├── Training tricks
│   └── Performance optimization
└── Hands-on projects
    ├── News classification system
    ├── Sentiment analysis application
    ├── Text similarity computation
    └── Word semantics analysis

🧠 Chapter 1: NLP Fundamentals and Text Preprocessing

1.1 What Is Natural Language Processing?

Natural Language Processing (NLP) sits at the intersection of artificial intelligence and linguistics, and aims to enable computers to understand, process, and generate human language.

🎯 The Core Challenges of NLP
import re
import jieba
import pandas as pd
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False   # 用来正常显示负号

class NLPChallengesDemo:
    """NLP核心挑战演示"""
    
    def __init__(self):
        self.challenges = {
            '歧义性': {
                '词汇歧义': ['银行', '苹果', '春天'],
                '句法歧义': ['我看见了望远镜里的人', '飞机上的人很多'],
                '语义歧义': ['时间过得真快', '这个苹果很甜']
            },
            '多样性': {
                '同义词': ['快乐-高兴-愉悦', '汽车-轿车-车辆'],
                '方言差异': ['你好-您好-你好呀'],
                '语言风格': ['正式-非正式-网络语言']
            },
            '上下文依赖': {
                '代词指代': ['他很聪明', '这个很好'],
                '省略现象': ['去吗?', '好的'],
                '语境理解': ['反话', '暗示', '隐喻']
            }
        }
    
    def demonstrate_ambiguity(self):
        """演示语言歧义性"""
        print("=== NLP的核心挑战:语言歧义性 ===\n")
        
        # 词汇歧义示例
        ambiguous_examples = {
            '银行': ['金融机构', '河岸边缘'],
            '苹果': ['水果', '科技公司'],
            '春天': ['季节', '青春年华', '人名'],
            '打球': ['运动', '制作球体'],
            '花': ['植物', '花费', '花纹']
        }
        
        print("🔍 词汇歧义示例:")
        for word, meanings in ambiguous_examples.items():
            print(f"'{word}' 可能的含义:{' | '.join(meanings)}")
        
        # 句法歧义示例
        syntactic_examples = [
            {
                '句子': '我看见了望远镜里的人',
                '理解1': '我通过望远镜看见了人',
                '理解2': '我看见了在望远镜里面的人'
            },
            {
                '句子': '漂亮的女孩的衣服',
                '理解1': '(漂亮的女孩)的衣服',
                '理解2': '漂亮的(女孩的衣服)'
            }
        ]
        
        print(f"\n🔍 句法歧义示例:")
        for example in syntactic_examples:
            print(f"句子:{example['句子']}")
            print(f"  理解1:{example['理解1']}")
            print(f"  理解2:{example['理解2']}")
            print()
    
    def demonstrate_diversity(self):
        """演示语言多样性"""
        print("=== NLP的核心挑战:语言多样性 ===\n")
        
        # 同义词表达
        synonyms_groups = [
            ['快乐', '高兴', '愉悦', '欢喜', '开心', '兴奋'],
            ['汽车', '轿车', '车辆', '座驾', '车子', '代步工具'],
            ['房子', '住宅', '房屋', '居所', '家', '住所'],
            ['食物', '食品', '美食', '吃的', '食材', '餐食']
        ]
        
        print("🔍 同义词表达多样性:")
        for i, group in enumerate(synonyms_groups, 1):
            print(f"组{i}{' | '.join(group)}")
        
        # 语言风格差异
        style_examples = {
            '正式': '尊敬的客户,您好!感谢您对我们产品的关注。',
            '非正式': '亲,你好!感谢你关注我们的产品哦~',
            '网络语言': '宝贝,你好!谢谢关注我们产品,么么哒!',
            '古文风格': '客官,有礼了!感谢垂青敝司产品。'
        }
        
        print(f"\n🔍 语言风格多样性:")
        for style, example in style_examples.items():
            print(f"{style}{example}")
    
    def demonstrate_context_dependency(self):
        """演示上下文依赖"""
        print("=== NLP的核心挑战:上下文依赖 ===\n")
        
        # 代词指代示例
        pronoun_examples = [
            {
                '上下文': '小明和小红一起去图书馆。',
                '句子': '他很喜欢看书。',
                '分析': '"他"指代"小明",需要上下文才能确定'
            },
            {
                '上下文': '这款手机很好用,那款手机也不错。',
                '句子': '这个更便宜一些。',
                '分析': '"这个"指代前面提到的"这款手机"'
            }
        ]
        
        print("🔍 代词指代示例:")
        for example in pronoun_examples:
            print(f"上下文:{example['上下文']}")
            print(f"句子:{example['句子']}")
            print(f"分析:{example['分析']}")
            print()
        
        # 语境理解示例
        context_examples = [
            {
                '场景': '朋友迟到了',
                '话语': '你可真准时啊!',
                '真实含义': '反话,表示不满'
            },
            {
                '场景': '夏天很热',
                '话语': '今天天气真好啊!',
                '真实含义': '反话,表示抱怨'
            }
        ]
        
        print("🔍 语境理解示例:")
        for example in context_examples:
            print(f"场景:{example['场景']}")
            print(f"话语:{example['话语']}")
            print(f"真实含义:{example['真实含义']}")
            print()

# 运行NLP挑战演示
challenges_demo = NLPChallengesDemo()
challenges_demo.demonstrate_ambiguity()
challenges_demo.demonstrate_diversity()
challenges_demo.demonstrate_context_dependency()

1.2 Text Preprocessing: The Art of Data Cleaning

Text preprocessing is the first step of any NLP pipeline, as essential as washing vegetables before cooking. Raw text is full of noise and inconsistent formatting, and preprocessing is how we raise the data quality before modeling.
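
Before the full toolkit class below, here is a minimal sketch of the basic order of operations on a single sentence: clean, segment, then filter stop words. It assumes jieba is installed, and the exact segmentation may vary with jieba's dictionary version.

```python
# Minimal clean -> segment -> filter sketch; the TextPreprocessor class below
# wraps the same steps with more options (URL/HTML removal, POS tagging, ...).
import re
import jieba

raw = "今天天气不错!适合出去走走~ 😄"

# 1. Clean: keep only Chinese characters, letters, and whitespace
cleaned = re.sub(r'[^\u4e00-\u9fa5a-zA-Z\s]', '', raw)

# 2. Segment: Chinese has no spaces between words, so a tokenizer is required
tokens = jieba.lcut(cleaned)

# 3. Filter: drop whitespace tokens and (a tiny, illustrative set of) stop words
stop_words = {'的', '了', '很'}
tokens = [t for t in tokens if t.strip() and t not in stop_words]

print(tokens)  # roughly ['今天', '天气', '不错', '适合', '出去', '走走']
```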

🧹 A Text-Preprocessing Toolkit
import string
import re
from collections import Counter
import jieba
import jieba.posseg as pseg
from zhon.hanzi import punctuation as zh_punctuation

class TextPreprocessor:
    """文本预处理工具包"""
    
    def __init__(self):
        self.stop_words = self._load_stop_words()
        self.punctuation = string.punctuation + zh_punctuation
        
    def _load_stop_words(self):
        """加载停用词表"""
        # 常用中文停用词
        stop_words = {
            '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这', '那', '它', '他', '她', '我们', '你们', '他们', '这个', '那个', '这些', '那些', '这样', '那样', '什么', '怎么', '为什么', '因为', '所有', '可以', '如果', '虽然', '但是', '然而', '而且', '还有', '或者', '以及', '等等', '比如', '例如', '包括', '关于', '对于', '由于', '根据', '按照', '通过', '经过', '进行', '实现', '完成', '开始', '结束', '继续', '保持', '成为', '变成', '属于', '位于', '来自', '用于', '作为', '当做', '称为', '叫做'
        }
        return stop_words
    
    def clean_text(self, text):
        """基础文本清洗"""
        if not isinstance(text, str):
            return ""
        
        # 移除多余空白字符
        text = re.sub(r'\s+', ' ', text)
        
        # 移除特殊字符和数字(保留中英文)
        text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z\s]', '', text)
        
        # 转为小写(英文)
        text = text.lower()
        
        # 去除首尾空格
        text = text.strip()
        
        return text
    
    def advanced_clean(self, text):
        """高级文本清洗"""
        if not isinstance(text, str):
            return ""
        
        # 移除URL
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
        
        # 移除邮箱
        text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', text)
        
        # 移除HTML标签
        text = re.sub(r'<[^>]+>', '', text)
        
        # 移除表情符号
        text = re.sub(r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF\U00002702-\U000027B0\U000024C2-\U0001F251]+', '', text)
        
        # 标准化空白字符
        text = re.sub(r'\s+', ' ', text)
        
        return text.strip()
    
    def tokenize_chinese(self, text):
        """中文分词"""
        if not text:
            return []
        
        # 使用jieba进行分词
        tokens = jieba.lcut(text)
        
        # 过滤掉空字符串和单字符
        tokens = [token for token in tokens if len(token) > 1]
        
        return tokens
    
    def remove_stopwords(self, tokens):
        """移除停用词"""
        if not tokens:
            return []
        
        filtered_tokens = [token for token in tokens if token not in self.stop_words]
        return filtered_tokens
    
    def pos_tagging(self, text):
        """词性标注"""
        if not text:
            return []
        
        # 使用jieba进行词性标注
        words = pseg.lcut(text)
        
        # 返回词汇和词性的元组列表
        result = [(word, flag) for word, flag in words if len(word) > 1]
        
        return result
    
    def extract_keywords(self, tokens, top_k=10):
        """提取关键词"""
        if not tokens:
            return []
        
        # 计算词频
        word_freq = Counter(tokens)
        
        # 返回频率最高的top_k个词汇
        keywords = word_freq.most_common(top_k)
        
        return keywords
    
    def preprocess_pipeline(self, text, include_pos=False):
        """完整的预处理管道"""
        if not text:
            return []
        
        # 步骤1:高级清洗
        cleaned_text = self.advanced_clean(text)
        
        # 步骤2:分词
        tokens = self.tokenize_chinese(cleaned_text)
        
        # 步骤3:移除停用词
        filtered_tokens = self.remove_stopwords(tokens)
        
        # 步骤4:词性标注(可选)
        if include_pos:
            pos_result = self.pos_tagging(cleaned_text)
            return {
                'tokens': filtered_tokens,
                'pos_tags': pos_result
            }
        
        return filtered_tokens
    
    def analyze_text_statistics(self, texts):
        """分析文本统计信息"""
        if not texts:
            return {}
        
        # 确保texts是列表格式
        if isinstance(texts, str):
            texts = [texts]
        
        all_tokens = []
        char_counts = []
        word_counts = []
        
        for text in texts:
            tokens = self.preprocess_pipeline(text)
            all_tokens.extend(tokens)
            char_counts.append(len(text))
            word_counts.append(len(tokens))
        
        # 计算统计信息
        stats = {
            'total_documents': len(texts),
            'total_characters': sum(char_counts),
            'total_words': sum(word_counts),
            'avg_char_per_doc': np.mean(char_counts),
            'avg_word_per_doc': np.mean(word_counts),
            'vocabulary_size': len(set(all_tokens)),
            'most_common_words': Counter(all_tokens).most_common(10)
        }
        
        return stats

# 演示文本预处理
preprocessor = TextPreprocessor()

# 示例文本
sample_texts = [
    "这是一个很好的例子!我们来看看NLP的文本预处理效果。😊",
    "今天天气不错,适合出去走走。但是工作还有很多。",
    "人工智能技术正在快速发展,特别是在自然语言处理领域。",
    "机器学习和深度学习是AI的重要分支。",
    "文本分类、情感分析、机器翻译等都是NLP的经典应用。"
]

print("=== 文本预处理演示 ===\n")

# 对每个文本进行预处理
for i, text in enumerate(sample_texts, 1):
    print(f"原始文本 {i}: {text}")
    
    # 基础清洗
    cleaned = preprocessor.clean_text(text)
    print(f"清洗后: {cleaned}")
    
    # 分词
    tokens = preprocessor.tokenize_chinese(cleaned)
    print(f"分词结果: {tokens}")
    
    # 移除停用词
    filtered = preprocessor.remove_stopwords(tokens)
    print(f"移除停用词: {filtered}")
    
    # 词性标注
    pos_tags = preprocessor.pos_tagging(cleaned)
    print(f"词性标注: {pos_tags[:5]}...")  # 只显示前5个
    
    print("-" * 50)

# 统计分析
stats = preprocessor.analyze_text_statistics(sample_texts)
print("\n=== 文本统计分析 ===")
for key, value in stats.items():
    print(f"{key}: {value}")

1.3 Language-Model Basics

A language model is one of the core ideas in NLP: it estimates the probability of a whole sentence, or, given the preceding words, the probability of the next word.
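
Formally, a language model factorizes the probability of a sentence $w_1, \dots, w_T$ with the chain rule, and an n-gram model approximates each conditional probability by looking only at the previous $n-1$ words. The maximum-likelihood estimate used in the code below is just a ratio of counts, and perplexity (also computed below) is the standard way to evaluate such a model:

$$P(w_1, \dots, w_T) = \prod_{t=1}^{T} P(w_t \mid w_1, \dots, w_{t-1}) \approx \prod_{t=1}^{T} P(w_t \mid w_{t-n+1}, \dots, w_{t-1})$$

$$P(w_t \mid w_{t-n+1}, \dots, w_{t-1}) = \frac{\text{count}(w_{t-n+1}, \dots, w_t)}{\text{count}(w_{t-n+1}, \dots, w_{t-1})}, \qquad \text{PPL} = 2^{-\frac{1}{T}\sum_{t=1}^{T}\log_2 P(w_t \mid \text{context})}$$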

📊 N-gram Language Models
import math
from collections import defaultdict, Counter

class NgramLanguageModel:
    """N-gram语言模型实现"""
    
    def __init__(self, n=2):
        self.n = n
        self.ngram_counts = defaultdict(int)
        self.context_counts = defaultdict(int)
        self.vocabulary = set()
        
    def train(self, texts):
        """训练N-gram模型"""
        print(f"开始训练 {self.n}-gram 语言模型...")
        
        for text in texts:
            # 文本预处理
            tokens = preprocessor.preprocess_pipeline(text)
            
            if len(tokens) < self.n:
                continue
                
            # 添加开始和结束标记
            tokens = ['<START>'] * (self.n - 1) + tokens + ['<END>']
            
            # 更新词汇表
            self.vocabulary.update(tokens)
            
            # 统计n-gram
            for i in range(len(tokens) - self.n + 1):
                ngram = tuple(tokens[i:i + self.n])
                context = ngram[:-1] if self.n > 1 else ()
                
                self.ngram_counts[ngram] += 1
                if context:
                    self.context_counts[context] += 1
        
        print(f"训练完成!词汇表大小: {len(self.vocabulary)}")
        print(f"N-gram数量: {len(self.ngram_counts)}")
    
    def get_probability(self, ngram):
        """计算n-gram概率"""
        if isinstance(ngram, str):
            ngram = tuple(ngram.split())
        
        if len(ngram) != self.n:
            return 0.0
        
        ngram_count = self.ngram_counts[ngram]
        
        if self.n == 1:
            # Unigram概率
            total_count = sum(self.ngram_counts.values())
            return ngram_count / total_count if total_count > 0 else 0.0
        else:
            # 条件概率 P(w_n | w_1, ..., w_{n-1})
            context = ngram[:-1]
            context_count = self.context_counts[context]
            return ngram_count / context_count if context_count > 0 else 0.0
    
    def predict_next_word(self, context, top_k=5):
        """预测下一个词"""
        if isinstance(context, str):
            context = tuple(context.split())
        
        if len(context) != self.n - 1:
            return []
        
        # 找出所有以该上下文开始的n-gram
        candidates = []
        for ngram, count in self.ngram_counts.items():
            if ngram[:-1] == context:
                word = ngram[-1]
                prob = self.get_probability(ngram)
                candidates.append((word, prob))
        
        # 按概率排序
        candidates.sort(key=lambda x: x[1], reverse=True)
        
        return candidates[:top_k]
    
    def calculate_perplexity(self, test_texts):
        """计算困惑度"""
        total_log_prob = 0.0
        total_words = 0
        
        for text in test_texts:
            tokens = preprocessor.preprocess_pipeline(text)
            if len(tokens) < self.n:
                continue
            
            tokens = ['<START>'] * (self.n - 1) + tokens + ['<END>']
            
            for i in range(len(tokens) - self.n + 1):
                ngram = tuple(tokens[i:i + self.n])
                prob = self.get_probability(ngram)
                
                if prob > 0:
                    total_log_prob += math.log2(prob)
                    total_words += 1
                else:
                    # 处理未见过的n-gram(简单平滑)
                    total_log_prob += math.log2(1e-10)
                    total_words += 1
        
        if total_words == 0:
            return float('inf')
        
        avg_log_prob = total_log_prob / total_words
        perplexity = 2 ** (-avg_log_prob)
        
        return perplexity
    
    def generate_text(self, start_context, max_length=20):
        """生成文本"""
        if isinstance(start_context, str):
            start_context = start_context.split()
        
        if len(start_context) != self.n - 1:
            return "上下文长度不匹配"
        
        generated = list(start_context)
        current_context = tuple(start_context)
        
        for _ in range(max_length):
            predictions = self.predict_next_word(current_context, top_k=3)
            if not predictions or predictions[0][0] == '<END>':
                break
            
            # 选择概率最高的词(可以改为随机选择)
            next_word = predictions[0][0]
            generated.append(next_word)
            
            # 更新上下文
            current_context = tuple(generated[-(self.n-1):])
        
        return ' '.join(generated)

# 训练和测试N-gram语言模型
training_texts = [
    "机器学习是人工智能的重要分支",
    "深度学习正在改变世界",
    "自然语言处理让计算机理解文本",
    "词嵌入技术将词汇转换为向量",
    "神经网络模型在语言任务中表现出色",
    "文本分类是自然语言处理的基础任务",
    "情感分析可以判断文本的情感倾向",
    "机器翻译帮助人们跨越语言障碍",
    "语言模型能够预测下一个词汇",
    "预训练模型在各种NLP任务中都很有效"
]

# 训练不同的N-gram模型
models = {}
for n in [1, 2, 3]:
    print(f"\n=== 训练 {n}-gram 模型 ===")
    model = NgramLanguageModel(n=n)
    model.train(training_texts)
    models[n] = model
    
    # 测试概率计算
    if n == 2:
        test_bigrams = [
            ('机器', '学习'),
            ('自然', '语言'),
            ('深度', '学习'),
            ('文本', '分类')
        ]
        
        print(f"\n{n}-gram 概率示例:")
        for bigram in test_bigrams:
            prob = model.get_probability(bigram)
            print(f"P({bigram[1]} | {bigram[0]}) = {prob:.4f}")
    
    # 测试文本生成
    if n == 2:
        print(f"\n{n}-gram 文本生成示例:")
        start_contexts = ['机器', '自然', '深度']
        for context in start_contexts:
            generated = model.generate_text([context], max_length=10)
            print(f"起始: {context} -> {generated}")

# 计算困惑度比较
test_texts = [
    "人工智能技术快速发展",
    "机器学习算法不断进步",
    "自然语言处理应用广泛"
]

print(f"\n=== 模型困惑度比较 ===")
for n, model in models.items():
    perplexity = model.calculate_perplexity(test_texts)
    print(f"{n}-gram 模型困惑度: {perplexity:.2f}")

With this first chapter we have laid out the basic theory of NLP, seen why text preprocessing matters, and implemented a simple N-gram language model. Next we turn to word embeddings, one of the core techniques of modern NLP.

🔤 Chapter 2: Word Embeddings in Depth

Word embeddings map words to dense vector representations and are the bedrock of modern NLP. They overcome the sparsity of traditional one-hot encoding and let models capture semantic relationships between words.
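
Throughout this chapter, the "closeness" of two word vectors is measured with cosine similarity, which all of the code below computes as a normalized dot product:

$$\cos(\mathbf{u}, \mathbf{v}) = \frac{\mathbf{u} \cdot \mathbf{v}}{\lVert \mathbf{u} \rVert \, \lVert \mathbf{v} \rVert} \in [-1, 1]$$

For one-hot vectors, any two different words have cosine similarity exactly 0, which is precisely the limitation the demo below makes visible.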

2.1 From One-hot Encoding to Word Embeddings

🎯 Limitations of Traditional Word Representations
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

class WordRepresentationEvolution:
    """词汇表示方法演进演示"""
    
    def __init__(self):
        self.vocab = ['国王', '王后', '男人', '女人', '苹果', '橙子', '汽车', '飞机']
        self.vocab_size = len(self.vocab)
        
    def demonstrate_onehot(self):
        """演示One-hot编码"""
        print("=== One-hot编码演示 ===\n")
        
        # 创建One-hot编码
        onehot_matrix = np.eye(self.vocab_size)
        
        print("词汇表:", self.vocab)
        print("\nOne-hot编码矩阵:")
        print("词汇    ", end="")
        for i in range(self.vocab_size):
            print(f"{i:3d}", end="")
        print()
        
        for i, word in enumerate(self.vocab):
            print(f"{word:4s} [{' '.join(f'{int(x):2d}' for x in onehot_matrix[i])}]")
        
        # 计算词汇间的相似度
        print(f"\n词汇间余弦相似度(One-hot):")
        print("所有词汇对的相似度都是 0.0(正交)")
        
        # One-hot的问题
        problems = [
            "1. 稀疏性:每个向量只有一个1,其余都是0",
            "2. 高维度:词汇表大小等于向量维度",
            "3. 无语义:无法表示词汇间的语义关系",
            "4. 存储效率低:大量零值占用空间"
        ]
        
        print(f"\nOne-hot编码的问题:")
        for problem in problems:
            print(f"  {problem}")
            
        return onehot_matrix
    
    def create_sample_embeddings(self):
        """创建示例词嵌入"""
        # 手工设计的词嵌入(体现语义关系)
        embeddings = {
            '国王': [0.8, 0.2, 0.1, 0.9],    # 男性、权力
            '王后': [0.2, 0.8, 0.1, 0.9],    # 女性、权力  
            '男人': [0.9, 0.1, 0.5, 0.3],    # 男性、普通
            '女人': [0.1, 0.9, 0.5, 0.3],    # 女性、普通
            '苹果': [0.5, 0.5, 0.9, 0.1],    # 中性、食物
            '橙子': [0.5, 0.5, 0.8, 0.1],    # 中性、食物
            '汽车': [0.6, 0.4, 0.2, 0.8],    # 略男性、交通工具
            '飞机': [0.5, 0.5, 0.1, 0.9]     # 中性、交通工具
        }
        
        return embeddings
    
    def demonstrate_embeddings(self):
        """演示词嵌入的优势"""
        print("\n=== 词嵌入演示(4维示例)===\n")
        
        embeddings = self.create_sample_embeddings()
        
        print("维度含义: [男性度, 女性度, 生活性, 权力度]")
        print("\n词汇嵌入向量:")
        for word, vector in embeddings.items():
            print(f"{word:4s} [{', '.join(f'{x:.1f}' for x in vector)}]")
        
        # 计算相似度
        def cosine_similarity(v1, v2):
            dot_product = np.dot(v1, v2)
            norm1 = np.linalg.norm(v1)
            norm2 = np.linalg.norm(v2)
            return dot_product / (norm1 * norm2)
        
        print(f"\n词汇间余弦相似度(词嵌入):")
        
        # 展示一些有趣的相似度对比
        interesting_pairs = [
            ('国王', '王后'),
            ('男人', '女人'), 
            ('苹果', '橙子'),
            ('汽车', '飞机'),
            ('国王', '男人'),
            ('王后', '女人'),
            ('国王', '苹果')
        ]
        
        for word1, word2 in interesting_pairs:
            sim = cosine_similarity(embeddings[word1], embeddings[word2])
            print(f"{word1}{word2}: {sim:.3f}")
        
        # 词汇关系分析
        print(f"\n语义关系分析:")
        
        # 国王 - 男人 + 女人 ≈ 王后
        king_vec = np.array(embeddings['国王'])
        man_vec = np.array(embeddings['男人'])
        woman_vec = np.array(embeddings['女人'])
        queen_vec = np.array(embeddings['王后'])
        
        analogy_result = king_vec - man_vec + woman_vec
        similarity_to_queen = cosine_similarity(analogy_result, queen_vec)
        
        print(f"国王 - 男人 + 女人 ≈ 王后")
        print(f"计算结果与王后的相似度: {similarity_to_queen:.3f}")
        
        return embeddings
    
    def visualize_embeddings(self, embeddings):
        """可视化词嵌入"""
        # 提取词汇和向量
        words = list(embeddings.keys())
        vectors = np.array(list(embeddings.values()))
        
        # PCA降维到2D
        pca = PCA(n_components=2)
        vectors_2d = pca.fit_transform(vectors)
        
        # 创建可视化
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
        
        # 原始4维向量的前两个维度
        ax1.scatter(vectors[:, 0], vectors[:, 1], s=100, alpha=0.7)
        for i, word in enumerate(words):
            ax1.annotate(word, (vectors[i, 0], vectors[i, 1]), 
                        xytext=(5, 5), textcoords='offset points',
                        fontsize=12, ha='left')
        ax1.set_xlabel('维度1 (男性度)')
        ax1.set_ylabel('维度2 (女性度)')
        ax1.set_title('词嵌入可视化 - 原始维度')
        ax1.grid(True, alpha=0.3)
        
        # PCA降维后的2D可视化
        ax2.scatter(vectors_2d[:, 0], vectors_2d[:, 1], s=100, alpha=0.7)
        for i, word in enumerate(words):
            ax2.annotate(word, (vectors_2d[i, 0], vectors_2d[i, 1]), 
                        xytext=(5, 5), textcoords='offset points',
                        fontsize=12, ha='left')
        ax2.set_xlabel('PCA第1主成分')
        ax2.set_ylabel('PCA第2主成分')
        ax2.set_title('词嵌入可视化 - PCA降维')
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('word_embeddings_visualization.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        # 创建交互式3D可视化
        if vectors.shape[1] >= 3:
            fig_3d = go.Figure(data=[go.Scatter3d(
                x=vectors[:, 0],
                y=vectors[:, 1], 
                z=vectors[:, 2],
                mode='markers+text',
                text=words,
                textposition="middle right",
                marker=dict(
                    size=10,
                    color=np.arange(len(words)),
                    colorscale='viridis',
                    showscale=True
                )
            )])
            
            fig_3d.update_layout(
                title='词嵌入3D可视化',
                scene=dict(
                    xaxis_title='维度1 (男性度)',
                    yaxis_title='维度2 (女性度)',
                    zaxis_title='维度3 (生活性)'
                )
            )
            
            fig_3d.show()

# 演示词汇表示的演进
demo = WordRepresentationEvolution()

print("词汇表示方法演进演示")
print("=" * 50)

# One-hot编码演示
onehot_matrix = demo.demonstrate_onehot()

# 词嵌入演示
embeddings = demo.demonstrate_embeddings()

# 可视化对比
demo.visualize_embeddings(embeddings)

2.2 Word2Vec: The Start of the Word-Embedding Era

Word2Vec, proposed by Google in 2013, learns distributed word representations with a shallow neural network and comes in two main architectures: Skip-gram and CBOW.
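
The from-scratch Skip-gram implementation below optimizes the standard negative-sampling objective: for a (center, context) pair $(w, c)$ and $k$ randomly sampled negative words $n_1, \dots, n_k$, each training step pushes up

$$\log \sigma(\mathbf{v}_w \cdot \mathbf{u}_c) + \sum_{i=1}^{k} \log \sigma(-\mathbf{v}_w \cdot \mathbf{u}_{n_i}), \qquad \sigma(x) = \frac{1}{1 + e^{-x}},$$

where $\mathbf{v}$ are the input embeddings (W_input in the code) and $\mathbf{u}$ the output embeddings (W_output). The loss printed during training is the negative of this quantity.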

⚡ A Skip-gram Implementation
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, Counter
import random
from sklearn.manifold import TSNE
import warnings
warnings.filterwarnings('ignore')

class Word2VecSkipGram:
    """Word2Vec Skip-gram模型实现"""
    
    def __init__(self, embedding_dim=100, window_size=5, negative_samples=5, learning_rate=0.025):
        self.embedding_dim = embedding_dim
        self.window_size = window_size
        self.negative_samples = negative_samples
        self.learning_rate = learning_rate
        
        # 词汇表和映射
        self.word2idx = {}
        self.idx2word = {}
        self.vocab_size = 0
        
        # 嵌入矩阵
        self.W_input = None   # 输入词嵌入矩阵
        self.W_output = None  # 输出词嵌入矩阵
        
        # 训练统计
        self.loss_history = []
        
    def build_vocabulary(self, texts):
        """构建词汇表"""
        print("构建词汇表...")
        
        # 统计词频
        word_counts = Counter()
        for text in texts:
            tokens = preprocessor.preprocess_pipeline(text)
            word_counts.update(tokens)
        
        # 过滤低频词
        min_count = 2
        filtered_words = [word for word, count in word_counts.items() if count >= min_count]
        
        # 创建词汇映射
        self.word2idx = {word: idx for idx, word in enumerate(filtered_words)}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.vocab_size = len(filtered_words)
        
        print(f"词汇表大小: {self.vocab_size}")
        print(f"最高频词汇: {word_counts.most_common(10)}")
        
        return word_counts
    
    def initialize_embeddings(self):
        """初始化嵌入矩阵"""
        print("初始化嵌入矩阵...")
        
        # Xavier初始化
        bound = np.sqrt(6.0 / (self.vocab_size + self.embedding_dim))
        self.W_input = np.random.uniform(-bound, bound, (self.vocab_size, self.embedding_dim))
        self.W_output = np.random.uniform(-bound, bound, (self.vocab_size, self.embedding_dim))
        
        print(f"输入嵌入矩阵形状: {self.W_input.shape}")
        print(f"输出嵌入矩阵形状: {self.W_output.shape}")
    
    def generate_training_data(self, texts):
        """生成训练数据(中心词,上下文词)对"""
        print("生成训练数据...")
        
        training_data = []
        
        for text in texts:
            tokens = preprocessor.preprocess_pipeline(text)
            # 过滤掉不在词汇表中的词
            tokens = [token for token in tokens if token in self.word2idx]
            
            if len(tokens) < 2:
                continue
            
            # 为每个词生成上下文
            for i, center_word in enumerate(tokens):
                # 定义上下文窗口
                start = max(0, i - self.window_size)
                end = min(len(tokens), i + self.window_size + 1)
                
                # 收集上下文词
                for j in range(start, end):
                    if i != j:  # 排除中心词本身
                        context_word = tokens[j]
                        training_data.append((center_word, context_word))
        
        print(f"生成训练样本数: {len(training_data)}")
        return training_data
    
    def negative_sampling(self, positive_word_idx):
        """负采样"""
        negative_samples = []
        
        # 简单随机采样(实际应用中应该按词频采样)
        while len(negative_samples) < self.negative_samples:
            neg_idx = random.randint(0, self.vocab_size - 1)
            if neg_idx != positive_word_idx:
                negative_samples.append(neg_idx)
        
        return negative_samples
    
    def sigmoid(self, x):
        """Sigmoid激活函数"""
        # 防止溢出
        x = np.clip(x, -500, 500)
        return 1.0 / (1.0 + np.exp(-x))
    
    def train_step(self, center_word, context_word):
        """单步训练"""
        center_idx = self.word2idx[center_word]
        context_idx = self.word2idx[context_word]
        
        # 获取中心词和上下文词的嵌入向量(复制一份,避免原地更新带来的别名问题)
        center_embedding = self.W_input[center_idx].copy()
        context_output = self.W_output[context_idx].copy()
        
        # 正样本训练
        score = np.dot(center_embedding, context_output)
        prob = self.sigmoid(score)
        
        # 正样本梯度
        grad = (1 - prob) * self.learning_rate
        
        # 更新中心词嵌入
        self.W_input[center_idx] += grad * context_output
        # 更新上下文词输出嵌入
        self.W_output[context_idx] += grad * center_embedding
        
        # 负样本训练
        negative_samples = self.negative_sampling(context_idx)
        neg_probs = []
        for neg_idx in negative_samples:
            neg_output = self.W_output[neg_idx].copy()
            neg_score = np.dot(center_embedding, neg_output)
            neg_prob = self.sigmoid(neg_score)
            neg_probs.append(neg_prob)
            
            # 负样本梯度
            neg_grad = -neg_prob * self.learning_rate
            
            # 更新嵌入
            self.W_input[center_idx] += neg_grad * neg_output
            self.W_output[neg_idx] += neg_grad * center_embedding
        
        # 计算损失(基于更新前的概率,简化版负采样损失)
        loss = -np.log(prob + 1e-10) - sum(np.log(1 - p + 1e-10) for p in neg_probs)
        
        return loss
    
    def train(self, texts, epochs=10):
        """训练Word2Vec模型"""
        print("开始训练Word2Vec模型...")
        
        # 构建词汇表
        word_counts = self.build_vocabulary(texts)
        
        # 初始化嵌入矩阵
        self.initialize_embeddings()
        
        # 生成训练数据
        training_data = self.generate_training_data(texts)
        
        # 训练循环
        for epoch in range(epochs):
            total_loss = 0.0
            random.shuffle(training_data)
            
            for i, (center_word, context_word) in enumerate(training_data):
                loss = self.train_step(center_word, context_word)
                total_loss += loss
                
                # 打印进度
                if i % 1000 == 0:
                    print(f"Epoch {epoch+1}/{epochs}, Step {i}/{len(training_data)}, "
                          f"Loss: {loss:.4f}")
            
            avg_loss = total_loss / len(training_data)
            self.loss_history.append(avg_loss)
            print(f"Epoch {epoch+1} 完成,平均损失: {avg_loss:.4f}")
            
            # 动态调整学习率
            self.learning_rate *= 0.9
    
    def get_word_vector(self, word):
        """获取词向量"""
        if word in self.word2idx:
            return self.W_input[self.word2idx[word]]
        else:
            return None
    
    def find_similar_words(self, word, top_k=10):
        """找到相似词汇"""
        if word not in self.word2idx:
            return []
        
        word_vector = self.get_word_vector(word)
        similarities = []
        
        for other_word in self.word2idx:
            if other_word != word:
                other_vector = self.get_word_vector(other_word)
                # 计算余弦相似度
                similarity = np.dot(word_vector, other_vector) / (
                    np.linalg.norm(word_vector) * np.linalg.norm(other_vector)
                )
                similarities.append((other_word, similarity))
        
        # 按相似度排序
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return similarities[:top_k]
    
    def word_analogy(self, word_a, word_b, word_c, top_k=5):
        """词汇类比:A之于B,如C之于?"""
        if not all(word in self.word2idx for word in [word_a, word_b, word_c]):
            return []
        
        # 计算类比向量:B - A + C
        vec_a = self.get_word_vector(word_a)
        vec_b = self.get_word_vector(word_b)
        vec_c = self.get_word_vector(word_c)
        
        analogy_vector = vec_b - vec_a + vec_c
        
        # 找到最相似的词
        similarities = []
        for word in self.word2idx:
            if word not in [word_a, word_b, word_c]:
                word_vector = self.get_word_vector(word)
                similarity = np.dot(analogy_vector, word_vector) / (
                    np.linalg.norm(analogy_vector) * np.linalg.norm(word_vector)
                )
                similarities.append((word, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]
    
    def visualize_embeddings(self, words_to_plot=None):
        """可视化词嵌入"""
        if words_to_plot is None:
            # 选择一些高频词进行可视化
            words_to_plot = list(self.word2idx.keys())[:20]
        
        # 过滤存在的词
        words_to_plot = [word for word in words_to_plot if word in self.word2idx]
        
        if len(words_to_plot) < 2:
            print("没有足够的词汇进行可视化")
            return
        
        # 获取词向量
        vectors = np.array([self.get_word_vector(word) for word in words_to_plot])
        
        # 使用t-SNE降维
        tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(words_to_plot)-1))
        vectors_2d = tsne.fit_transform(vectors)
        
        # 绘制可视化图
        plt.figure(figsize=(12, 8))
        plt.scatter(vectors_2d[:, 0], vectors_2d[:, 1], alpha=0.7, s=100)
        
        for i, word in enumerate(words_to_plot):
            plt.annotate(word, (vectors_2d[i, 0], vectors_2d[i, 1]), 
                        xytext=(5, 5), textcoords='offset points',
                        fontsize=10, ha='left')
        
        plt.title('Word2Vec词嵌入可视化 (t-SNE)')
        plt.xlabel('t-SNE维度1')
        plt.ylabel('t-SNE维度2')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig('word2vec_visualization.png', dpi=300, bbox_inches='tight')
        plt.show()
    
    def plot_training_loss(self):
        """绘制训练损失曲线"""
        if not self.loss_history:
            print("没有训练历史数据")
            return
        
        plt.figure(figsize=(10, 6))
        plt.plot(self.loss_history)
        plt.title('Word2Vec训练损失曲线')
        plt.xlabel('Epoch')
        plt.ylabel('Average Loss')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig('word2vec_training_loss.png', dpi=300, bbox_inches='tight')
        plt.show()

# 使用示例
print("=== Word2Vec Skip-gram 模型训练 ===\n")

# 准备训练数据
training_texts = [
    "机器学习是人工智能的重要分支,它让计算机能够自动学习和改进",
    "深度学习是机器学习的一个子领域,使用神经网络模型",
    "自然语言处理让计算机能够理解和生成人类语言",
    "词嵌入技术将词汇转换为数值向量表示",
    "卷积神经网络在图像识别任务中表现出色",
    "循环神经网络适合处理序列数据",
    "注意力机制提高了模型的性能",
    "预训练模型在各种任务中都很有效",
    "文本分类是自然语言处理的基础任务",
    "情感分析可以判断文本的情感倾向",
    "机器翻译帮助人们跨越语言障碍",
    "语音识别将声音转换为文本",
    "计算机视觉让机器能够理解图像",
    "数据挖掘从大量数据中发现有用信息",
    "人工智能正在改变我们的世界",
    "算法优化提高了计算效率",
    "数据科学结合了统计学和计算机科学",
    "机器人技术集成了多个学科的知识",
    "云计算提供了强大的计算资源",
    "大数据分析帮助企业做出更好的决策"
]

# 创建和训练模型
model = Word2VecSkipGram(embedding_dim=50, window_size=3, negative_samples=3, learning_rate=0.1)
model.train(training_texts, epochs=20)

# 绘制训练损失
model.plot_training_loss()

# 测试词汇相似度
test_words = ['机器', '学习', '神经', '网络', '数据']
print("\n=== 词汇相似度测试 ===")
for word in test_words:
    if word in model.word2idx:
        similar_words = model.find_similar_words(word, top_k=5)
        print(f"\n与 '{word}' 最相似的词汇:")
        for similar_word, similarity in similar_words:
            print(f"  {similar_word}: {similarity:.4f}")

# 测试词汇类比
print("\n=== 词汇类比测试 ===")
analogy_tests = [
    ('机器', '学习', '深度'),
    ('计算机', '视觉', '自然'),
    ('数据', '挖掘', '文本')
]

for word_a, word_b, word_c in analogy_tests:
    results = model.word_analogy(word_a, word_b, word_c, top_k=3)
    if results:
        print(f"\n{word_a} 之于 {word_b},如 {word_c} 之于:")
        for result_word, similarity in results:
            print(f"  {result_word}: {similarity:.4f}")

# 可视化词嵌入
print("\n开始可视化词嵌入...")
model.visualize_embeddings()

print("\n✅ Word2Vec Skip-gram 模型训练完成!")

🎯 A CBOW Implementation

CBOW (Continuous Bag of Words) is the mirror image of Skip-gram: instead of predicting the context from the center word, it predicts the center word from its surrounding context words.

class Word2VecCBOW:
    """Word2Vec CBOW模型实现"""
    
    def __init__(self, embedding_dim=100, window_size=5, negative_samples=5, learning_rate=0.025):
        self.embedding_dim = embedding_dim
        self.window_size = window_size
        self.negative_samples = negative_samples
        self.learning_rate = learning_rate
        
        # 继承Skip-gram的基础结构
        self.word2idx = {}
        self.idx2word = {}
        self.vocab_size = 0
        self.W_input = None
        self.W_output = None
        self.loss_history = []
        
    def build_vocabulary(self, texts):
        """构建词汇表(与Skip-gram相同)"""
        print("构建词汇表...")
        
        word_counts = Counter()
        for text in texts:
            tokens = preprocessor.preprocess_pipeline(text)
            word_counts.update(tokens)
        
        min_count = 2
        filtered_words = [word for word, count in word_counts.items() if count >= min_count]
        
        self.word2idx = {word: idx for idx, word in enumerate(filtered_words)}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.vocab_size = len(filtered_words)
        
        print(f"词汇表大小: {self.vocab_size}")
        return word_counts
    
    def initialize_embeddings(self):
        """初始化嵌入矩阵"""
        bound = np.sqrt(6.0 / (self.vocab_size + self.embedding_dim))
        self.W_input = np.random.uniform(-bound, bound, (self.vocab_size, self.embedding_dim))
        self.W_output = np.random.uniform(-bound, bound, (self.vocab_size, self.embedding_dim))
    
    def generate_training_data(self, texts):
        """生成CBOW训练数据(上下文词列表,中心词)"""
        print("生成CBOW训练数据...")
        
        training_data = []
        
        for text in texts:
            tokens = preprocessor.preprocess_pipeline(text)
            tokens = [token for token in tokens if token in self.word2idx]
            
            if len(tokens) < 3:
                continue
            
            for i, center_word in enumerate(tokens):
                # 收集上下文词
                start = max(0, i - self.window_size)
                end = min(len(tokens), i + self.window_size + 1)
                
                context_words = []
                for j in range(start, end):
                    if i != j:
                        context_words.append(tokens[j])
                
                if len(context_words) >= 2:  # 确保有足够的上下文
                    training_data.append((context_words, center_word))
        
        print(f"生成CBOW训练样本数: {len(training_data)}")
        return training_data
    
    def sigmoid(self, x):
        """Sigmoid激活函数"""
        x = np.clip(x, -500, 500)
        return 1.0 / (1.0 + np.exp(-x))
    
    def train_step(self, context_words, center_word):
        """CBOW单步训练"""
        # 获取上下文词的索引
        context_indices = [self.word2idx[word] for word in context_words if word in self.word2idx]
        center_idx = self.word2idx[center_word]
        
        if len(context_indices) == 0:
            return 0.0
        
        # 计算上下文向量的平均值
        context_embeddings = self.W_input[context_indices]
        avg_context = np.mean(context_embeddings, axis=0)
        
        # 正样本训练
        center_output = self.W_output[center_idx]
        score = np.dot(avg_context, center_output)
        prob = self.sigmoid(score)
        
        # 计算梯度
        grad = (1 - prob) * self.learning_rate
        
        # 更新上下文词嵌入
        for ctx_idx in context_indices:
            self.W_input[ctx_idx] += grad * center_output / len(context_indices)
        
        # 更新中心词输出嵌入
        self.W_output[center_idx] += grad * avg_context
        
        # 负采样
        negative_samples = self.negative_sampling(center_idx)
        for neg_idx in negative_samples:
            neg_output = self.W_output[neg_idx]
            neg_score = np.dot(avg_context, neg_output)
            neg_prob = self.sigmoid(neg_score)
            
            neg_grad = -neg_prob * self.learning_rate
            
            # 更新嵌入
            for ctx_idx in context_indices:
                self.W_input[ctx_idx] += neg_grad * neg_output / len(context_indices)
            self.W_output[neg_idx] += neg_grad * avg_context
        
        # 计算损失
        loss = -np.log(prob) - sum(np.log(1 - self.sigmoid(np.dot(avg_context, self.W_output[neg_idx]))) 
                                 for neg_idx in negative_samples)
        
        return loss
    
    def negative_sampling(self, positive_word_idx):
        """负采样"""
        negative_samples = []
        while len(negative_samples) < self.negative_samples:
            neg_idx = random.randint(0, self.vocab_size - 1)
            if neg_idx != positive_word_idx:
                negative_samples.append(neg_idx)
        return negative_samples
    
    def train(self, texts, epochs=10):
        """训练CBOW模型"""
        print("开始训练CBOW模型...")
        
        self.build_vocabulary(texts)
        self.initialize_embeddings()
        training_data = self.generate_training_data(texts)
        
        for epoch in range(epochs):
            total_loss = 0.0
            random.shuffle(training_data)
            
            for i, (context_words, center_word) in enumerate(training_data):
                loss = self.train_step(context_words, center_word)
                total_loss += loss
                
                if i % 1000 == 0:
                    print(f"CBOW Epoch {epoch+1}/{epochs}, Step {i}/{len(training_data)}, "
                          f"Loss: {loss:.4f}")
            
            avg_loss = total_loss / len(training_data)
            self.loss_history.append(avg_loss)
            print(f"CBOW Epoch {epoch+1} 完成,平均损失: {avg_loss:.4f}")
            
            self.learning_rate *= 0.9
    
    def get_word_vector(self, word):
        """获取词向量"""
        if word in self.word2idx:
            return self.W_input[self.word2idx[word]]
        return None
    
    def find_similar_words(self, word, top_k=10):
        """找到相似词汇"""
        if word not in self.word2idx:
            return []
        
        word_vector = self.get_word_vector(word)
        similarities = []
        
        for other_word in self.word2idx:
            if other_word != word:
                other_vector = self.get_word_vector(other_word)
                similarity = np.dot(word_vector, other_vector) / (
                    np.linalg.norm(word_vector) * np.linalg.norm(other_vector)
                )
                similarities.append((other_word, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

# 比较Skip-gram和CBOW
print("=== Skip-gram vs CBOW 对比训练 ===\n")

# 训练CBOW模型
cbow_model = Word2VecCBOW(embedding_dim=50, window_size=3, negative_samples=3, learning_rate=0.1)
cbow_model.train(training_texts, epochs=15)

# 比较两种模型的效果
test_word = '学习'
if test_word in model.word2idx and test_word in cbow_model.word2idx:
    print(f"\n=== 模型对比:'{test_word}' 的相似词 ===")
    
    print("\nSkip-gram模型结果:")
    skipgram_similar = model.find_similar_words(test_word, top_k=5)
    for word, sim in skipgram_similar:
        print(f"  {word}: {sim:.4f}")
    
    print("\nCBOW模型结果:")
    cbow_similar = cbow_model.find_similar_words(test_word, top_k=5)
    for word, sim in cbow_similar:
        print(f"  {word}: {sim:.4f}")

print("\n✅ CBOW模型训练完成!")

2.3 GloVe: Global Vectors for Word Representation

GloVe (Global Vectors for Word Representation) combines global co-occurrence statistics with local context information, learning word vectors by factorizing a word co-occurrence matrix.
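
Concretely, let $X_{ij}$ be the (distance-weighted) number of times word $j$ occurs in the context window of word $i$. GloVe then minimizes a weighted least-squares objective over all non-zero co-occurrences, which is exactly what the training loop below implements:

$$J = \sum_{X_{ij} > 0} f(X_{ij}) \left( \mathbf{w}_i^{\top} \tilde{\mathbf{w}}_j + b_i + \tilde{b}_j - \log X_{ij} \right)^2, \qquad f(x) = \begin{cases} (x / x_{\max})^{\alpha} & x < x_{\max} \\ 1 & \text{otherwise} \end{cases}$$

Here $\mathbf{w}$ and $\tilde{\mathbf{w}}$ are the main and context vectors (W_main and W_context in the code), $b$ and $\tilde{b}$ their biases, and $x_{\max}$, $\alpha$ the parameters of the weighting function.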

🌐 A GloVe Implementation
import numpy as np
from scipy.sparse import coo_matrix
from collections import defaultdict
import matplotlib.pyplot as plt

class GloVeModel:
    """GloVe模型实现"""
    
    def __init__(self, embedding_dim=100, x_max=100, alpha=0.75, learning_rate=0.05):
        self.embedding_dim = embedding_dim
        self.x_max = x_max  # 权重函数的截止值
        self.alpha = alpha  # 权重函数的指数
        self.learning_rate = learning_rate
        
        # 词汇表
        self.word2idx = {}
        self.idx2word = {}
        self.vocab_size = 0
        
        # 共现矩阵
        self.cooccurrence_matrix = None
        
        # 嵌入矩阵
        self.W_main = None      # 主词向量
        self.W_context = None   # 上下文词向量
        self.b_main = None      # 主词偏置
        self.b_context = None   # 上下文词偏置
        
        # 训练历史
        self.loss_history = []
    
    def build_vocabulary(self, texts):
        """构建词汇表"""
        print("构建GloVe词汇表...")
        
        word_counts = Counter()
        for text in texts:
            tokens = preprocessor.preprocess_pipeline(text)
            word_counts.update(tokens)
        
        # 过滤低频词
        min_count = 2
        filtered_words = [word for word, count in word_counts.items() if count >= min_count]
        
        self.word2idx = {word: idx for idx, word in enumerate(filtered_words)}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.vocab_size = len(filtered_words)
        
        print(f"GloVe词汇表大小: {self.vocab_size}")
        return word_counts
    
    def build_cooccurrence_matrix(self, texts, window_size=5):
        """构建共现矩阵"""
        print("构建共现矩阵...")
        
        # 使用字典存储共现计数
        cooccur_counts = defaultdict(float)
        
        for text in texts:
            tokens = preprocessor.preprocess_pipeline(text)
            tokens = [token for token in tokens if token in self.word2idx]
            
            if len(tokens) < 2:
                continue
            
            # 计算共现
            for i, center_word in enumerate(tokens):
                center_idx = self.word2idx[center_word]
                
                # 定义窗口
                start = max(0, i - window_size)
                end = min(len(tokens), i + window_size + 1)
                
                for j in range(start, end):
                    if i != j:
                        context_word = tokens[j]
                        context_idx = self.word2idx[context_word]
                        
                        # 距离权重(离中心词越远权重越小)
                        distance = abs(i - j)
                        weight = 1.0 / distance
                        
                        cooccur_counts[(center_idx, context_idx)] += weight
        
        # 转换为稀疏矩阵格式
        rows, cols, data = [], [], []
        for (i, j), count in cooccur_counts.items():
            rows.append(i)
            cols.append(j)
            data.append(count)
        
        self.cooccurrence_matrix = coo_matrix((data, (rows, cols)), 
                                            shape=(self.vocab_size, self.vocab_size))
        
        print(f"共现矩阵非零元素数: {len(data)}")
        print(f"共现矩阵密度: {len(data) / (self.vocab_size ** 2):.6f}")
    
    def weight_function(self, x):
        """GloVe权重函数"""
        if x < self.x_max:
            return (x / self.x_max) ** self.alpha
        else:
            return 1.0
    
    def initialize_parameters(self):
        """初始化模型参数"""
        print("初始化GloVe参数...")
        
        # Xavier初始化
        bound = 0.5 / self.embedding_dim
        
        self.W_main = np.random.uniform(-bound, bound, (self.vocab_size, self.embedding_dim))
        self.W_context = np.random.uniform(-bound, bound, (self.vocab_size, self.embedding_dim))
        
        # 偏置项初始化
        self.b_main = np.random.uniform(-bound, bound, self.vocab_size)
        self.b_context = np.random.uniform(-bound, bound, self.vocab_size)
        
        print(f"参数矩阵形状: {self.W_main.shape}")
    
    def train(self, texts, epochs=50, window_size=5):
        """训练GloVe模型"""
        print("开始训练GloVe模型...")
        
        # 构建词汇表和共现矩阵
        self.build_vocabulary(texts)
        self.build_cooccurrence_matrix(texts, window_size)
        self.initialize_parameters()
        
        # 将稀疏矩阵转换为COO格式以便迭代
        cooccur_coo = self.cooccurrence_matrix.tocoo()
        
        for epoch in range(epochs):
            total_loss = 0.0
            
            # 遍历所有非零共现对
            for idx in range(len(cooccur_coo.data)):
                i = cooccur_coo.row[idx]
                j = cooccur_coo.col[idx]
                x_ij = cooccur_coo.data[idx]
                
                # 计算权重
                weight = self.weight_function(x_ij)
                
                # 前向传播
                dot_product = np.dot(self.W_main[i], self.W_context[j])
                log_x_ij = np.log(x_ij)
                
                # 计算误差
                error = dot_product + self.b_main[i] + self.b_context[j] - log_x_ij
                loss = weight * (error ** 2)
                total_loss += loss
                
                # 计算梯度
                grad_factor = weight * error * self.learning_rate
                
                # 先用更新前的参数计算两个梯度,避免前一次更新影响后一次
                w_main_grad = grad_factor * self.W_context[j]
                w_context_grad = grad_factor * self.W_main[i]
                
                # 更新主词向量和上下文词向量
                self.W_main[i] -= w_main_grad
                self.W_context[j] -= w_context_grad
                
                # 更新偏置
                self.b_main[i] -= grad_factor
                self.b_context[j] -= grad_factor
            
            avg_loss = total_loss / len(cooccur_coo.data)
            self.loss_history.append(avg_loss)
            
            if epoch % 10 == 0:
                print(f"GloVe Epoch {epoch+1}/{epochs}, 平均损失: {avg_loss:.4f}")
            
            # 学习率衰减
            self.learning_rate *= 0.99
        
        print(f"GloVe训练完成!最终损失: {self.loss_history[-1]:.4f}")
    
    def get_word_vector(self, word):
        """获取词向量(主向量和上下文向量的和)"""
        if word in self.word2idx:
            idx = self.word2idx[word]
            return self.W_main[idx] + self.W_context[idx]
        return None
    
    def find_similar_words(self, word, top_k=10):
        """找到相似词汇"""
        if word not in self.word2idx:
            return []
        
        word_vector = self.get_word_vector(word)
        similarities = []
        
        for other_word in self.word2idx:
            if other_word != word:
                other_vector = self.get_word_vector(other_word)
                similarity = np.dot(word_vector, other_vector) / (
                    np.linalg.norm(word_vector) * np.linalg.norm(other_vector)
                )
                similarities.append((other_word, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]
    
    def word_analogy(self, word_a, word_b, word_c, top_k=5):
        """词汇类比"""
        if not all(word in self.word2idx for word in [word_a, word_b, word_c]):
            return []
        
        vec_a = self.get_word_vector(word_a)
        vec_b = self.get_word_vector(word_b)
        vec_c = self.get_word_vector(word_c)
        
        analogy_vector = vec_b - vec_a + vec_c
        
        similarities = []
        for word in self.word2idx:
            if word not in [word_a, word_b, word_c]:
                word_vector = self.get_word_vector(word)
                similarity = np.dot(analogy_vector, word_vector) / (
                    np.linalg.norm(analogy_vector) * np.linalg.norm(word_vector)
                )
                similarities.append((word, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]
    
    def plot_training_loss(self):
        """绘制训练损失曲线"""
        plt.figure(figsize=(10, 6))
        plt.plot(self.loss_history)
        plt.title('GloVe训练损失曲线')
        plt.xlabel('Epoch')
        plt.ylabel('Average Loss')
        plt.grid(True, alpha=0.3)
        plt.yscale('log')  # 使用对数刻度
        plt.tight_layout()
        plt.savefig('glove_training_loss.png', dpi=300, bbox_inches='tight')
        plt.show()
    
    def analyze_cooccurrence(self, top_k=10):
        """分析共现统计"""
        print(f"\n=== 共现矩阵分析 ===")
        
        # 转换为COO格式
        coo = self.cooccurrence_matrix.tocoo()
        
        # 找出共现频率最高的词对
        cooccur_pairs = []
        for idx in range(len(coo.data)):
            i, j, count = coo.row[idx], coo.col[idx], coo.data[idx]
            word_i = self.idx2word[i]
            word_j = self.idx2word[j]
            cooccur_pairs.append(((word_i, word_j), count))
        
        # 按共现频率排序
        cooccur_pairs.sort(key=lambda x: x[1], reverse=True)
        
        print(f"共现频率最高的{top_k}个词对:")
        for (word_i, word_j), count in cooccur_pairs[:top_k]:
            print(f"  ({word_i}, {word_j}): {count:.2f}")

# 训练GloVe模型
print("=== GloVe模型训练 ===\n")

glove_model = GloVeModel(embedding_dim=50, x_max=100, alpha=0.75, learning_rate=0.05)
glove_model.train(training_texts, epochs=100, window_size=5)

# 绘制训练损失
glove_model.plot_training_loss()

# 分析共现矩阵
glove_model.analyze_cooccurrence()

# 测试GloVe模型效果
test_words = ['机器', '学习', '深度', '神经', '网络']
print(f"\n=== GloVe模型词汇相似度测试 ===")
for word in test_words:
    if word in glove_model.word2idx:
        similar_words = glove_model.find_similar_words(word, top_k=5)
        print(f"\n与 '{word}' 最相似的词汇 (GloVe):")
        for similar_word, similarity in similar_words:
            print(f"  {similar_word}: {similarity:.4f}")

# GloVe词汇类比测试
print(f"\n=== GloVe词汇类比测试 ===")
analogy_tests = [
    ('机器', '学习', '深度'),
    ('计算机', '视觉', '自然'),
    ('数据', '挖掘', '文本')
]

for word_a, word_b, word_c in analogy_tests:
    results = glove_model.word_analogy(word_a, word_b, word_c, top_k=3)
    if results:
        print(f"\nGloVe: {word_a} 之于 {word_b},如 {word_c} 之于:")
        for result_word, similarity in results:
            print(f"  {result_word}: {similarity:.4f}")

print("\n✅ GloVe模型训练完成!")

2.4 FastText: Word Embeddings with Subword Information

🚀 FastText's Core Innovation

FastText is a word-embedding model developed at Facebook. Its core innovation is that it models the **internal structure of words**: unlike Word2Vec and GloVe, it uses not only word-level information but also **character-level n-grams**, which lets it (see the short gensim sketch after this list):

1. **Handle out-of-vocabulary (OOV) words**: infer the meaning of unseen words from their subwords
2. **Handle morphological variation**: capture word roots, prefixes, and suffixes
3. **Improve rare-word representations**: strengthen rare words with shared subword information
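
As a quick illustration of the OOV point, here is a hedged sketch using gensim's FastText (a library example, not this post's own code, and on a tiny English toy corpus so the character n-grams are easy to see): a word never seen in training still gets a vector, assembled from its n-grams.

```python
# Minimal gensim FastText sketch (assumes gensim >= 4.0); min_n/max_n set the
# character n-gram lengths, mirroring the from-scratch model implemented below.
from gensim.models import FastText

sentences = [
    ["machine", "learning", "is", "fun"],
    ["deep", "learning", "uses", "neural", "networks"],
    ["fasttext", "uses", "subword", "information"],
]

model = FastText(sentences, vector_size=50, window=3, min_count=1, min_n=3, max_n=6, epochs=30)

print("learning" in model.wv.key_to_index)   # True: seen during training
print("learnings" in model.wv.key_to_index)  # False: never seen...
print(model.wv["learnings"][:5])             # ...yet it still gets a vector from its n-grams
```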

🧠 A FastText Implementation

```python
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, Counter
import re
from typing import List, Tuple, Dict, Set
import jieba
import random

class FastTextModel:
    """FastText词嵌入模型实现"""
    
    def __init__(self, embedding_dim=100, window_size=5, min_count=5, 
                 min_n=3, max_n=6, negative_samples=5, learning_rate=0.025):
        """
        初始化FastText模型
        
        Args:
            embedding_dim: 词向量维度
            window_size: 上下文窗口大小
            min_count: 最小词频
            min_n: 最小n-gram长度
            max_n: 最大n-gram长度
            negative_samples: 负采样数量
            learning_rate: 学习率
        """
        self.embedding_dim = embedding_dim
        self.window_size = window_size
        self.min_count = min_count
        self.min_n = min_n
        self.max_n = max_n
        self.negative_samples = negative_samples
        self.learning_rate = learning_rate
        
        # 初始化词汇表和映射
        self.word2idx = {}
        self.idx2word = {}
        self.word_freq = Counter()
        
        # 初始化子词映射
        self.subword2idx = {}
        self.idx2subword = {}
        
        # 词向量和子词向量
        self.word_vectors = None
        self.subword_vectors = None
        
        # 训练历史
        self.training_history = {'loss': []}
        
    def get_subwords(self, word: str) -> List[str]:
        """
        获取单词的子词列表
        
        Args:
            word: 输入单词
            
        Returns:
            子词列表
        """
        # 添加词边界标记
        word = f"<{word}>"
        subwords = []
        
        # 生成n-gram子词
        for n in range(self.min_n, min(len(word) + 1, self.max_n + 1)):
            for i in range(len(word) - n + 1):
                subword = word[i:i + n]
                subwords.append(subword)
        
        return subwords
        
    def build_vocab(self, texts: List[str]):
        """构建词汇表和子词表"""
        print("🔧 构建词汇表...")
        
        # 统计词频
        for text in texts:
            words = list(jieba.cut(text))
            for word in words:
                if len(word.strip()) > 0:
                    self.word_freq[word] += 1
        
        # 过滤低频词
        filtered_words = {word for word, freq in self.word_freq.items() 
                         if freq >= self.min_count}
        
        # 构建词汇映射
        self.word2idx = {word: idx for idx, word in enumerate(filtered_words)}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        
        # 构建子词表
        all_subwords = set()
        for word in filtered_words:
            subwords = self.get_subwords(word)
            all_subwords.update(subwords)
        
        self.subword2idx = {subword: idx for idx, subword in enumerate(all_subwords)}
        self.idx2subword = {idx: subword for subword, idx in self.subword2idx.items()}
        
        # 初始化词向量和子词向量
        vocab_size = len(self.word2idx)
        subword_size = len(self.subword2idx)
        
        # Xavier初始化
        self.word_vectors = np.random.normal(0, 0.1, (vocab_size, self.embedding_dim))
        self.subword_vectors = np.random.normal(0, 0.1, (subword_size, self.embedding_dim))
        
        print(f"📊 词汇表大小: {vocab_size}")
        print(f"📊 子词表大小: {subword_size}")
        
    def get_word_vector(self, word: str) -> np.ndarray:
        """
        获取单词的向量表示(结合词向量和子词向量)
        
        Args:
            word: 输入单词
            
        Returns:
            单词向量
        """
        vector = np.zeros(self.embedding_dim)
        count = 0
        
        # 如果词在词汇表中,添加词向量
        if word in self.word2idx:
            word_idx = self.word2idx[word]
            vector += self.word_vectors[word_idx]
            count += 1
        
        # 添加子词向量
        subwords = self.get_subwords(word)
        for subword in subwords:
            if subword in self.subword2idx:
                subword_idx = self.subword2idx[subword]
                vector += self.subword_vectors[subword_idx]
                count += 1
        
        # 平均化
        if count > 0:
            vector /= count
            
        return vector
        
    def negative_sampling(self, target_word: str, context_words: List[str]) -> List[str]:
        """负采样"""
        # 基于词频的负采样概率
        word_probs = np.array([self.word_freq[word] ** 0.75 
                              for word in self.word2idx.keys()])
        word_probs = word_probs / word_probs.sum()
        
        negative_words = []
        words_list = list(self.word2idx.keys())
        
        while len(negative_words) < self.negative_samples:
            neg_word = np.random.choice(words_list, p=word_probs)
            if neg_word not in context_words and neg_word != target_word:
                negative_words.append(neg_word)
                
        return negative_words
        
    def sigmoid(self, x):
        """Sigmoid激活函数"""
        x = np.clip(x, -500, 500)  # 防止溢出
        return 1 / (1 + np.exp(-x))
        
    def train_pair(self, target_word: str, context_word: str, label: int):
        """训练单个词对"""
        # 获取向量
        target_vector = self.get_word_vector(target_word)
        context_vector = self.get_word_vector(context_word)
        
        # 计算预测值
        dot_product = np.dot(target_vector, context_vector)
        prediction = self.sigmoid(dot_product)
        
        # 计算梯度
        error = label - prediction
        gradient = error * self.learning_rate
        
        # 更新向量
        target_grad = gradient * context_vector
        context_grad = gradient * target_vector
        
        # 更新目标词的词向量和子词向量
        if target_word in self.word2idx:
            target_idx = self.word2idx[target_word]
            self.word_vectors[target_idx] += target_grad
            
        target_subwords = self.get_subwords(target_word)
        for subword in target_subwords:
            if subword in self.subword2idx:
                subword_idx = self.subword2idx[subword]
                self.subword_vectors[subword_idx] += target_grad / len(target_subwords)
        
        # 更新上下文词的向量
        if context_word in self.word2idx:
            context_idx = self.word2idx[context_word]
            self.word_vectors[context_idx] += context_grad
            
        context_subwords = self.get_subwords(context_word)
        for subword in context_subwords:
            if subword in self.subword2idx:
                subword_idx = self.subword2idx[subword]
                self.subword_vectors[subword_idx] += context_grad / len(context_subwords)
        
        return -np.log(prediction + 1e-8) if label == 1 else -np.log(1 - prediction + 1e-8)
        
    def train(self, texts: List[str], epochs: int = 5):
        """训练FastText模型"""
        print("🚀 开始训练FastText模型...")
        
        # 构建词汇表
        self.build_vocab(texts)
        
        # 准备训练数据
        training_pairs = []
        for text in texts:
            words = [word for word in jieba.cut(text) 
                    if word.strip() and word in self.word2idx]
            
            # 生成正样本
            for i, target_word in enumerate(words):
                for j in range(max(0, i - self.window_size), 
                              min(len(words), i + self.window_size + 1)):
                    if i != j:
                        context_word = words[j]
                        training_pairs.append((target_word, context_word, 1))
        
        print(f"📊 训练样本数: {len(training_pairs)}")
        
        # 训练
        for epoch in range(epochs):
            epoch_loss = 0
            random.shuffle(training_pairs)
            
            for target_word, context_word, label in training_pairs:
                # 正样本训练
                loss = self.train_pair(target_word, context_word, 1)
                epoch_loss += loss
                
                # 负采样训练
                negative_words = self.negative_sampling(target_word, [context_word])
                for neg_word in negative_words:
                    loss = self.train_pair(target_word, neg_word, 0)
                    epoch_loss += loss
            
            avg_loss = epoch_loss / (len(training_pairs) * (1 + self.negative_samples))
            self.training_history['loss'].append(avg_loss)
            
            # 学习率衰减
            self.learning_rate *= 0.98
            
            print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}, LR: {self.learning_rate:.6f}")
        
        print("✅ FastText模型训练完成!")
        
    def find_similar_words(self, word: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """寻找相似词汇"""
        if self.word_vectors is None:
            return []
            
        word_vector = self.get_word_vector(word)
        similarities = []
        
        for other_word in self.word2idx.keys():
            if other_word != word:
                other_vector = self.get_word_vector(other_word)
                # 计算余弦相似度
                norm_product = np.linalg.norm(word_vector) * np.linalg.norm(other_vector)
                if norm_product > 0:
                    similarity = np.dot(word_vector, other_vector) / norm_product
                    similarities.append((other_word, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]
        
    def word_analogy(self, word_a: str, word_b: str, word_c: str, 
                    top_k: int = 5) -> List[Tuple[str, float]]:
        """词汇类比:A之于B,如C之于?"""
        try:
            vec_a = self.get_word_vector(word_a)
            vec_b = self.get_word_vector(word_b)
            vec_c = self.get_word_vector(word_c)
            
            # 计算目标向量:vec_b - vec_a + vec_c
            target_vector = vec_b - vec_a + vec_c
            
            similarities = []
            for word in self.word2idx.keys():
                if word not in [word_a, word_b, word_c]:
                    word_vector = self.get_word_vector(word)
                    norm_product = np.linalg.norm(target_vector) * np.linalg.norm(word_vector)
                    if norm_product > 0:
                        similarity = np.dot(target_vector, word_vector) / norm_product
                        similarities.append((word, similarity))
            
            similarities.sort(key=lambda x: x[1], reverse=True)
            return similarities[:top_k]
            
        except Exception as e:
            print(f"词汇类比计算出错: {e}")
            return []
            
    def plot_training_history(self):
        """绘制训练历史"""
        if not self.training_history['loss']:
            print("没有训练历史数据")
            return
            
        plt.figure(figsize=(12, 4))
        
        # 训练损失
        plt.subplot(1, 2, 1)
        plt.plot(self.training_history['loss'], 'b-', linewidth=2)
        plt.title('FastText训练损失', fontsize=14, fontweight='bold')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.grid(True, alpha=0.3)
        
        # 学习率变化
        plt.subplot(1, 2, 2)
        initial_lr = self.initial_learning_rate
        decay_rate = 0.98  # 与train()中的衰减率保持一致
        epochs = len(self.training_history['loss'])
        lr_values = [initial_lr * (decay_rate ** epoch) for epoch in range(epochs)]
        plt.plot(lr_values, 'r-', linewidth=2)
        plt.title('学习率衰减', fontsize=14, fontweight='bold')
        plt.xlabel('Epoch')
        plt.ylabel('Learning Rate')
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def analyze_subwords(self, word: str):
        """分析单词的子词结构"""
        if word not in self.word2idx:
            print(f"词汇 '{word}' 不在词汇表中,其向量将完全由子词向量组合得到")
            
        subwords = self.get_subwords(word)
        print(f"\n🔍 单词 '{word}' 的子词分析:")
        print(f"原始单词: {word}")
        print(f"添加边界标记: <{word}>")
        print(f"子词列表 ({len(subwords)}个):")
        
        for i, subword in enumerate(subwords, 1):
            in_vocab = "✓" if subword in self.subword2idx else "✗"
            print(f"  {i:2d}. {subword:10s} {in_vocab}")
            
        # 计算子词贡献的向量
        word_vector = self.get_word_vector(word)
        print(f"\n📊 最终词向量维度: {word_vector.shape}")
        print(f"📊 词向量范数: {np.linalg.norm(word_vector):.4f}")

# 演示FastText模型
print("=" * 60)
print("🚀 FastText词嵌入模型演示")
print("=" * 60)

# 准备训练数据
training_texts = [
    "机器学习是人工智能的核心技术",
    "深度学习使用神经网络进行训练",
    "自然语言处理研究人机交互",
    "计算机视觉分析图像和视频",
    "数据挖掘从大数据中发现模式",
    "算法优化提高计算效率",
    "模式识别用于分类和预测",
    "人工智能改变世界的发展",
    "机器人技术结合多种学科",
    "智能系统具有自主决策能力",
    "深度神经网络学习复杂特征",
    "卷积神经网络处理图像数据",
    "递归神经网络处理序列数据",
    "强化学习通过奖励机制训练",
    "无监督学习发现数据结构",
    "监督学习使用标记数据训练",
    "特征工程提取有用信息",
    "模型评估验证算法性能",
    "交叉验证避免过拟合问题",
    "正则化技术控制模型复杂度"
]

# 创建和训练FastText模型
fasttext_model = FastTextModel(
    embedding_dim=50, 
    window_size=3, 
    min_count=2,
    min_n=2,
    max_n=4,
    negative_samples=3,
    learning_rate=0.01
)

fasttext_model.train(training_texts, epochs=10)

# 绘制训练历史
fasttext_model.plot_training_history()

# 分析子词结构
print(f"\n=== FastText子词分析 ===")
test_subword_words = ['机器学习', '深度', '神经网络']
for word in test_subword_words:
    fasttext_model.analyze_subwords(word)

# 测试FastText模型效果
test_words = ['机器', '学习', '深度', '神经', '网络']
print(f"\n=== FastText模型词汇相似度测试 ===")
for word in test_words:
    similar_words = fasttext_model.find_similar_words(word, top_k=5)
    if similar_words:
        print(f"\n与 '{word}' 最相似的词汇 (FastText):")
        for similar_word, similarity in similar_words:
            print(f"  {similar_word}: {similarity:.4f}")

# FastText词汇类比测试
print(f"\n=== FastText词汇类比测试 ===")
analogy_tests = [
    ('机器', '学习', '深度'),
    ('计算机', '视觉', '自然'),
    ('数据', '挖掘', '文本')
]

for word_a, word_b, word_c in analogy_tests:
    results = fasttext_model.word_analogy(word_a, word_b, word_c, top_k=3)
    if results:
        print(f"\nFastText: {word_a} 之于 {word_b},如 {word_c} 之于:")
        for result_word, similarity in results:
            print(f"  {result_word}: {similarity:.4f}")

# 测试未登录词处理能力
print(f"\n=== FastText未登录词处理测试 ===")
oov_words = ['深度学', '机器习', '智能化']
for oov_word in oov_words:
    vector = fasttext_model.get_word_vector(oov_word)
    print(f"未登录词 '{oov_word}' 向量范数: {np.linalg.norm(vector):.4f}")
    
    # 寻找相似词
    similar_words = fasttext_model.find_similar_words(oov_word, top_k=3)
    if similar_words:
        print(f"  与 '{oov_word}' 最相似的词汇:")
        for similar_word, similarity in similar_words:
            print(f"    {similar_word}: {similarity:.4f}")

print("\n✅ FastText模型演示完成!")

### 2.5 三种词嵌入方法对比分析

#### 📊 模型特点对比

```python
import pandas as pd

def compare_embedding_methods():
    """对比三种词嵌入方法"""
    
    # 创建对比表格
    comparison_data = {
        '特征': ['训练方式', '向量维度', '语义理解', '计算复杂度', '内存占用', 
                '未登录词', '训练速度', '适用场景'],
        'Word2Vec': [
            '局部上下文', '可调节', '较好', '中等', '中等', 
            '无法处理', '较快', '通用文本'
        ],
        'GloVe': [
            '全局+局部', '可调节', '很好', '较高', '较高', 
            '无法处理', '较慢', '大规模语料'
        ],
        'FastText': [
            '子词+上下文', '可调节', '较好', '较高', '最高', 
            '可以处理', '中等', '形态丰富语言'
        ]
    }
    
    df = pd.DataFrame(comparison_data)
    print("📊 三种词嵌入方法对比:")
    print(df.to_string(index=False))
    
    # 优缺点分析
    methods_analysis = {
        'Word2Vec': {
            '优点': [
                '训练速度快',
                '模型简单易理解',
                '效果稳定可靠',
                '支持Skip-gram和CBOW两种架构'
            ],
            '缺点': [
                '无法处理未登录词',
                '忽略了全局统计信息',
                '对稀有词效果不佳'
            ]
        },
        'GloVe': {
            '优点': [
                '结合全局和局部信息',
                '语义表示更丰富',
                '理论基础扎实',
                '在词汇类比任务上表现优秀'
            ],
            '缺点': [
                '训练时间长',
                '内存消耗大',
                '无法处理未登录词'
            ]
        },
        'FastText': {
            '优点': [
                '可处理未登录词',
                '适合形态变化丰富的语言',
                '对稀有词效果好',
                '考虑子词信息'
            ],
            '缺点': [
                '模型复杂度高',
                '内存占用最大',
                '训练时间较长'
            ]
        }
    }
    
    print(f"\n🎯 详细优缺点分析:")
    for method, analysis in methods_analysis.items():
        print(f"\n{method}:")
        print(f"  ✅ 优点:")
        for advantage in analysis['优点']:
            print(f"    • {advantage}")
        print(f"  ❌ 缺点:")
        for disadvantage in analysis['缺点']:
            print(f"    • {disadvantage}")

# 运行对比分析
compare_embedding_methods()

# 性能测试对比
def performance_comparison():
    """性能对比测试"""
    print(f"\n🏃‍♂️ 性能对比测试:")
    
    # 模拟性能数据
    performance_data = {
        '指标': ['训练时间(秒)', '内存占用(MB)', '相似度准确率(%)', '类比准确率(%)', '未登录词处理'],
        'Word2Vec': [120, 256, 78.5, 65.2, '❌'],
        'GloVe': [280, 512, 82.3, 71.8, '❌'],
        'FastText': [180, 768, 79.1, 67.5, '✅']
    }
    
    df_performance = pd.DataFrame(performance_data)
    print(df_performance.to_string(index=False))
    
    # 推荐使用场景
    print(f"\n💡 使用场景推荐:")
    scenarios = {
        'Word2Vec': [
            '快速原型开发',
            '计算资源有限',
            '英文文本处理',
            '通用语义理解任务'
        ],
        'GloVe': [
            '大规模语料库',
            '需要高质量词向量',
            '词汇类比任务',
            '学术研究项目'
        ],
        'FastText': [
            '多语言文本处理',
            '形态变化丰富的语言',
            '需要处理未登录词',
            '专业领域文本'
        ]
    }
    
    for method, uses in scenarios.items():
        print(f"\n{method} 适用场景:")
        for use_case in uses:
            print(f"  🎯 {use_case}")

performance_comparison()
```

🎯 第三章:文本分类实战项目

3.1 新闻分类系统设计

🏗️ 系统架构设计

在前面我们学习了各种词嵌入技术,现在让我们将这些技术应用到实际项目中。我们将构建一个新闻分类系统,能够自动将新闻文章分类到不同的类别中。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB
import jieba
import jieba.posseg as pseg
from collections import Counter, defaultdict
import warnings
warnings.filterwarnings('ignore')

class NewsClassificationSystem:
    """新闻分类系统"""
    
    def __init__(self, embedding_model=None, classifier_type='logistic'):
        """
        初始化新闻分类系统
        
        Args:
            embedding_model: 词嵌入模型 (Word2Vec, GloVe, FastText等)
            classifier_type: 分类器类型 ('logistic', 'rf', 'nb')
        """
        self.embedding_model = embedding_model
        self.classifier_type = classifier_type
        self.classifier = None
        self.label_encoder = {}
        self.feature_names = []
        
        # 分类器映射
        self.classifiers = {
            'logistic': LogisticRegression(random_state=42, max_iter=1000),
            'rf': RandomForestClassifier(random_state=42, n_estimators=100),
            'nb': MultinomialNB()
        }
        
        # 训练历史
        self.training_history = {
            'accuracy': [],
            'precision': [],
            'recall': [],
            'f1_score': []
        }
        
    def preprocess_text(self, text: str) -> str:
        """文本预处理"""
        if not isinstance(text, str):
            return ""
            
        # 去除特殊字符,保留中文、英文和数字
        import re
        text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s]', '', text)
        
        # 分词
        words = jieba.cut(text)
        
        # 去除停用词(简化版)
        stop_words = {'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', 
                     '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', 
                     '你', '会', '着', '没有', '看', '好', '自己', '这'}
        
        processed_words = [word for word in words 
                          if len(word.strip()) > 1 and word not in stop_words]
        
        return ' '.join(processed_words)
        
    def extract_features(self, texts: list) -> np.ndarray:
        """
        特征提取
        
        Args:
            texts: 文本列表
            
        Returns:
            特征矩阵
        """
        features = []
        
        for text in texts:
            processed_text = self.preprocess_text(text)
            words = processed_text.split()
            
            if self.embedding_model and hasattr(self.embedding_model, 'get_word_vector'):
                # 使用词嵌入特征
                word_vectors = []
                for word in words:
                    try:
                        vector = self.embedding_model.get_word_vector(word)
                        if np.linalg.norm(vector) > 0:  # 确保向量非零
                            word_vectors.append(vector)
                    except:
                        continue
                
                if word_vectors:
                    # 平均词向量作为文档向量
                    doc_vector = np.mean(word_vectors, axis=0)
                else:
                    # 没有有效词向量时退化为零向量,维度从嵌入模型读取(读不到则默认50)
                    doc_vector = np.zeros(getattr(self.embedding_model, 'embedding_dim', 50))
                    
                features.append(doc_vector)
                
            else:
                # 使用TF-IDF特征(简化版)
                feature_dict = self.compute_tfidf_features(words)
                features.append(feature_dict)
        
        if self.embedding_model:
            return np.array(features)
        else:
            # 转换为矩阵形式
            return self.dict_features_to_matrix(features)
            
    def compute_tfidf_features(self, words: list) -> dict:
        """计算TF-IDF特征(简化版)"""
        word_count = Counter(words)
        total_words = len(words)
        
        # TF (Term Frequency)
        tf_features = {word: count/total_words for word, count in word_count.items()}
        
        return tf_features
        
    def dict_features_to_matrix(self, features_list: list) -> np.ndarray:
        """将字典特征转换为矩阵(训练时确定特征列,预测时复用同一套特征)"""
        # 仅在首次(训练)时收集特征名,预测时沿用,保证训练和预测的特征维度一致
        if not self.feature_names:
            all_features = set()
            for features in features_list:
                all_features.update(features.keys())
            self.feature_names = sorted(all_features)
        
        # 构建特征矩阵
        matrix = np.zeros((len(features_list), len(self.feature_names)))
        for i, features in enumerate(features_list):
            for j, feature_name in enumerate(self.feature_names):
                matrix[i, j] = features.get(feature_name, 0)
                
        return matrix
        
    def encode_labels(self, labels: list) -> np.ndarray:
        """标签编码"""
        unique_labels = sorted(list(set(labels)))
        self.label_encoder = {label: idx for idx, label in enumerate(unique_labels)}
        encoded_labels = np.array([self.label_encoder[label] for label in labels])
        return encoded_labels
        
    def decode_labels(self, encoded_labels: np.ndarray) -> list:
        """标签解码"""
        idx_to_label = {idx: label for label, idx in self.label_encoder.items()}
        return [idx_to_label[idx] for idx in encoded_labels]
        
    def train(self, texts: list, labels: list, test_size: float = 0.2):
        """训练分类模型"""
        print("🚀 开始训练新闻分类系统...")
        
        # 特征提取
        print("📊 提取特征...")
        X = self.extract_features(texts)
        y = self.encode_labels(labels)
        
        print(f"特征矩阵形状: {X.shape}")
        print(f"标签分布: {Counter(labels)}")
        
        # 数据分割
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )
        
        # 选择并训练分类器
        self.classifier = self.classifiers[self.classifier_type]
        
        print(f"🔧 使用 {self.classifier_type} 分类器训练...")
        self.classifier.fit(X_train, y_train)
        
        # 评估模型
        y_pred = self.classifier.predict(X_test)
        
        # 计算评估指标
        accuracy = accuracy_score(y_test, y_pred)
        precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average='weighted')
        
        # 记录训练历史
        self.training_history['accuracy'].append(accuracy)
        self.training_history['precision'].append(precision)
        self.training_history['recall'].append(recall)
        self.training_history['f1_score'].append(f1)
        
        print(f"✅ 训练完成!")
        print(f"📊 测试集准确率: {accuracy:.4f}")
        print(f"📊 精确率: {precision:.4f}")
        print(f"📊 召回率: {recall:.4f}")
        print(f"📊 F1分数: {f1:.4f}")
        
        # 绘制混淆矩阵
        self.plot_confusion_matrix(y_test, y_pred)
        
        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1
        }
        
    def predict(self, texts: list) -> list:
        """预测文本类别"""
        if self.classifier is None:
            raise ValueError("模型尚未训练!请先调用train方法。")
            
        X = self.extract_features(texts)
        y_pred_encoded = self.classifier.predict(X)
        y_pred = self.decode_labels(y_pred_encoded)
        
        return y_pred
        
    def predict_proba(self, texts: list) -> np.ndarray:
        """预测类别概率"""
        if self.classifier is None:
            raise ValueError("模型尚未训练!请先调用train方法。")
            
        X = self.extract_features(texts)
        
        if hasattr(self.classifier, 'predict_proba'):
            return self.classifier.predict_proba(X)
        else:
            # 对于不支持概率预测的分类器,返回0/1预测
            predictions = self.classifier.predict(X)
            proba = np.zeros((len(predictions), len(self.label_encoder)))
            for i, pred in enumerate(predictions):
                proba[i, pred] = 1.0
            return proba
            
    def plot_confusion_matrix(self, y_true: np.ndarray, y_pred: np.ndarray):
        """绘制混淆矩阵"""
        # 计算混淆矩阵
        cm = confusion_matrix(y_true, y_pred)
        
        # 获取类别标签
        labels = self.decode_labels(sorted(set(y_true)))
        
        # 绘制热力图
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                   xticklabels=labels, yticklabels=labels)
        plt.title(f'{self.classifier_type} 分类器混淆矩阵', fontsize=16, fontweight='bold')
        plt.xlabel('预测标签')
        plt.ylabel('真实标签')
        plt.tight_layout()
        plt.show()
        
    def analyze_feature_importance(self, top_k: int = 20):
        """分析特征重要性"""
        if self.classifier is None:
            print("模型尚未训练!")
            return
            
        if hasattr(self.classifier, 'feature_importances_'):
            # 随机森林等基于树的模型
            importances = self.classifier.feature_importances_
            feature_names = self.feature_names if self.feature_names else [f'feature_{i}' for i in range(len(importances))]
            
        elif hasattr(self.classifier, 'coef_'):
            # 线性模型:多分类时对各类别的系数取绝对值后求平均
            importances = np.mean(np.abs(self.classifier.coef_), axis=0)
            feature_names = self.feature_names if self.feature_names else [f'feature_{i}' for i in range(len(importances))]
            
        else:
            print(f"{self.classifier_type} 分类器不支持特征重要性分析")
            return
            
        # 排序并选择top_k
        indices = np.argsort(importances)[::-1][:top_k]
        top_importances = importances[indices]
        top_features = [feature_names[i] for i in indices]
        
        # 绘制特征重要性
        plt.figure(figsize=(12, 8))
        bars = plt.barh(range(len(top_features)), top_importances)
        plt.yticks(range(len(top_features)), top_features)
        plt.xlabel('重要性分数')
        plt.title(f'Top {top_k} 重要特征 ({self.classifier_type})', fontsize=16, fontweight='bold')
        plt.gca().invert_yaxis()
        
        # 添加数值标签
        for i, (bar, importance) in enumerate(zip(bars, top_importances)):
            plt.text(bar.get_width() + 0.001, bar.get_y() + bar.get_height()/2, 
                    f'{importance:.4f}', ha='left', va='center')
        
        plt.tight_layout()
        plt.show()
        
        return list(zip(top_features, top_importances))

# 创建模拟新闻数据
def create_sample_news_data():
    """创建示例新闻数据"""
    news_data = {
        '科技': [
            "人工智能技术在医疗诊断领域取得重大突破,深度学习算法能够准确识别癌症细胞",
            "最新的量子计算机芯片问世,计算能力比传统芯片提升千倍",
            "自动驾驶汽车通过机器学习算法,在复杂路况下实现安全驾驶",
            "区块链技术在金融领域应用广泛,提供更安全的交易环境",
            "5G网络建设加速推进,为物联网发展奠定基础",
            "机器人在制造业中的应用越来越广泛,提高生产效率",
            "云计算平台为企业提供强大的数据处理能力",
            "虚拟现实技术在教育培训中展现出巨大潜力"
        ],
        '体育': [
            "足球世界杯决赛今晚举行,两支强队将展开激烈角逐",
            "篮球联赛总决赛第六场比赛,主队以微弱优势获胜",
            "奥运会游泳比赛中,运动员打破世界纪录创造历史",
            "网球公开赛男单决赛,顶级选手上演精彩对决",
            "马拉松比赛在雨中进行,参赛者展现出顽强意志",
            "乒乓球世锦赛团体赛,中国队成功卫冕冠军",
            "羽毛球超级赛决赛,选手发挥出色赢得观众喝彩",
            "田径锦标赛跳高比赛,运动员刷新个人最好成绩"
        ],
        '财经': [
            "股市今日大幅上涨,科技股表现尤为亮眼,投资者信心增强",
            "央行宣布降准政策,为市场释放流动性,促进经济发展",
            "房地产市场调控政策出台,房价涨幅有所放缓",
            "新能源汽车销量创新高,相关产业链公司股价上涨",
            "数字货币监管政策明确,行业发展更加规范化",
            "银行业净利润稳步增长,资产质量持续改善",
            "保险行业保费收入增加,市场竞争日趋激烈",
            "基金管理规模扩大,为投资者提供更多选择"
        ],
        '娱乐': [
            "知名导演新电影上映,票房表现超出预期,观众好评如潮",
            "流行歌手发布新专辑,音乐风格独特获得粉丝喜爱",
            "电视剧收视率创新高,剧情精彩演员表演出色",
            "综艺节目创新形式,嘉宾互动有趣引发热议",
            "音乐节现场气氛热烈,歌手与观众互动频繁",
            "时尚周活动精彩纷呈,设计师展示最新作品",
            "网络直播平台用户增长,内容创作者收入提升",
            "游戏行业发展迅速,新游戏上线受到玩家追捧"
        ]
    }
    
    # 展平数据
    texts = []
    labels = []
    for category, articles in news_data.items():
        texts.extend(articles)
        labels.extend([category] * len(articles))
    
    return texts, labels

# 演示新闻分类系统
print("=" * 60)
print("🚀 新闻分类系统演示")
print("=" * 60)

# 创建示例数据
texts, labels = create_sample_news_data()
print(f"📊 数据集大小: {len(texts)} 篇新闻")
print(f"📊 类别分布: {Counter(labels)}")

# 测试不同的分类器
classifiers_to_test = ['logistic', 'rf', 'nb']
results = {}

for classifier_type in classifiers_to_test:
    print(f"\n{'='*40}")
    print(f"🔧 测试 {classifier_type} 分类器")
    print(f"{'='*40}")
    
    # 创建分类系统
    news_classifier = NewsClassificationSystem(
        embedding_model=None,  # 使用TF-IDF特征
        classifier_type=classifier_type
    )
    
    # 训练模型
    result = news_classifier.train(texts, labels, test_size=0.3)
    results[classifier_type] = result
    
    # 分析特征重要性
    if classifier_type != 'nb':  # 朴素贝叶斯不支持特征重要性
        news_classifier.analyze_feature_importance(top_k=15)

# 结果对比
print(f"\n{'='*60}")
print("📊 分类器性能对比")
print(f"{'='*60}")

comparison_df = pd.DataFrame(results).T
comparison_df.columns = ['准确率', '精确率', '召回率', 'F1分数']
print(comparison_df.round(4))

# 绘制性能对比图
plt.figure(figsize=(12, 8))
metrics = ['准确率', '精确率', '召回率', 'F1分数']
x = np.arange(len(classifiers_to_test))
width = 0.2

metric_keys = ['accuracy', 'precision', 'recall', 'f1_score']
for i, metric in enumerate(metrics):
    values = [results[clf][metric_keys[i]] for clf in classifiers_to_test]
    plt.bar(x + i*width, values, width, label=metric)

plt.xlabel('分类器类型')
plt.ylabel('分数')
plt.title('不同分类器性能对比', fontsize=16, fontweight='bold')
plt.xticks(x + width*1.5, classifiers_to_test)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 测试预测功能
print(f"\n{'='*40}")
print("🔍 新闻分类预测测试")
print(f"{'='*40}")

# 使用表现最好的分类器
best_classifier = max(results.keys(), key=lambda x: results[x]['accuracy'])
print(f"🏆 最佳分类器: {best_classifier} (准确率: {results[best_classifier]['accuracy']:.4f})")

# 重新训练最佳分类器
best_news_classifier = NewsClassificationSystem(classifier_type=best_classifier)
best_news_classifier.train(texts, labels, test_size=0.2)

# 测试新闻
test_news = [
    "苹果公司发布最新款智能手机,搭载先进的人工智能芯片,拍照功能显著提升",
    "今晚的足球比赛非常精彩,两队在九十分钟内打成平局,最终通过点球决出胜负",
    "股票市场今日收盘时大幅下跌,投资者对经济前景表示担忧,纷纷抛售股票",
    "著名演员主演的新电影即将上映,预告片发布后引发网友热烈讨论和期待"
]

predictions = best_news_classifier.predict(test_news)
probabilities = best_news_classifier.predict_proba(test_news)

print(f"\n📰 测试新闻分类结果:")
for i, (news, pred) in enumerate(zip(test_news, predictions)):
    print(f"\n新闻 {i+1}: {news[:50]}...")
    print(f"预测类别: {pred}")
    
    # 显示所有类别的概率
    label_names = best_news_classifier.decode_labels(range(len(best_news_classifier.label_encoder)))
    print("各类别概率:")
    for j, (label, prob) in enumerate(zip(label_names, probabilities[i])):
        print(f"  {label}: {prob:.4f}")

print("\n✅ 新闻分类系统演示完成!")

3.2 特征工程与模型优化

🔧 高级特征工程技术
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.preprocessing import StandardScaler
import numpy as np
import matplotlib.pyplot as plt
import jieba
import jieba.posseg as pseg
from collections import Counter

class AdvancedFeatureExtractor:
    """高级特征提取器"""
    
    def __init__(self, use_embedding=True, use_tfidf=True, use_ngrams=True):
        """
        初始化特征提取器
        
        Args:
            use_embedding: 是否使用词嵌入特征
            use_tfidf: 是否使用TF-IDF特征
            use_ngrams: 是否使用N-gram特征
        """
        self.use_embedding = use_embedding
        self.use_tfidf = use_tfidf
        self.use_ngrams = use_ngrams
        
        # 特征提取器
        self.tfidf_vectorizer = None
        self.scaler = StandardScaler()
        self.pca = None
        
        # 特征统计
        self.feature_stats = {}
        
    def extract_linguistic_features(self, text: str) -> dict:
        """提取语言学特征"""
        features = {}
        
        # 基本统计特征
        features['text_length'] = len(text)
        words = jieba.lcut(text)  # 中文需先分词,直接按空格切分几乎没有意义
        features['word_count'] = len(words)
        features['avg_word_length'] = np.mean([len(word) for word in words]) if words else 0
        features['sentence_count'] = len([s for s in text.split('。') if s.strip()])
        
        # 词性特征
        words_with_pos = pseg.cut(text)
        pos_counts = Counter()
        for word, pos in words_with_pos:
            pos_counts[pos] += 1
        
        total_pos = sum(pos_counts.values())
        if total_pos > 0:
            features['noun_ratio'] = pos_counts['n'] / total_pos
            features['verb_ratio'] = pos_counts['v'] / total_pos
            features['adj_ratio'] = pos_counts['a'] / total_pos
            features['adv_ratio'] = pos_counts['d'] / total_pos
        else:
            features['noun_ratio'] = features['verb_ratio'] = 0
            features['adj_ratio'] = features['adv_ratio'] = 0
        
        # 字符特征
        features['chinese_char_ratio'] = len([c for c in text if '\u4e00' <= c <= '\u9fff']) / len(text) if text else 0
        features['digit_ratio'] = len([c for c in text if c.isdigit()]) / len(text) if text else 0
        features['punctuation_ratio'] = len([c for c in text if c in ',。!?;:']) / len(text) if text else 0
        
        return features
        
    def extract_tfidf_features(self, texts: list, max_features: int = 5000) -> np.ndarray:
        """提取TF-IDF特征"""
        if self.tfidf_vectorizer is None:
            self.tfidf_vectorizer = TfidfVectorizer(
                tokenizer=jieba.lcut,   # 中文文本需要显式指定分词器,否则默认分词基本失效
                token_pattern=None,
                max_features=max_features,
                ngram_range=(1, 2) if self.use_ngrams else (1, 1),
                min_df=2,
                max_df=0.95,
                stop_words=None  # 停用词已在预处理阶段处理
            )
            tfidf_matrix = self.tfidf_vectorizer.fit_transform(texts)
        else:
            tfidf_matrix = self.tfidf_vectorizer.transform(texts)
            
        return tfidf_matrix.toarray()
        
    def extract_embedding_features(self, texts: list, embedding_model) -> np.ndarray:
        """提取词嵌入特征"""
        features = []
        
        for text in texts:
            words = text.split()
            word_vectors = []
            
            for word in words:
                try:
                    if hasattr(embedding_model, 'get_word_vector'):
                        vector = embedding_model.get_word_vector(word)
                    else:
                        # 假设是预训练的词向量字典
                        vector = embedding_model.get(word, np.zeros(50))
                    
                    if np.linalg.norm(vector) > 0:
                        word_vectors.append(vector)
                except:
                    continue
            
            if word_vectors:
                # 计算多种聚合统计量
                word_vectors = np.array(word_vectors)
                doc_vector = np.concatenate([
                    np.mean(word_vectors, axis=0),  # 平均值
                    np.max(word_vectors, axis=0),   # 最大值
                    np.min(word_vectors, axis=0),   # 最小值
                    np.std(word_vectors, axis=0)    # 标准差
                ])
            else:
                # 零向量填充
                embedding_dim = 50  # 假设词向量维度
                doc_vector = np.zeros(embedding_dim * 4)
                
            features.append(doc_vector)
            
        return np.array(features)
        
    def combine_features(self, texts: list, embedding_model=None) -> np.ndarray:
        """组合多种特征"""
        all_features = []
        
        # 1. 语言学特征
        linguistic_features = []
        for text in texts:
            ling_feat = self.extract_linguistic_features(text)
            linguistic_features.append(list(ling_feat.values()))
        linguistic_features = np.array(linguistic_features)
        all_features.append(linguistic_features)
        print(f"📊 语言学特征维度: {linguistic_features.shape}")
        
        # 2. TF-IDF特征
        if self.use_tfidf:
            tfidf_features = self.extract_tfidf_features(texts)
            all_features.append(tfidf_features)
            print(f"📊 TF-IDF特征维度: {tfidf_features.shape}")
        
        # 3. 词嵌入特征
        if self.use_embedding and embedding_model:
            embedding_features = self.extract_embedding_features(texts, embedding_model)
            all_features.append(embedding_features)
            print(f"📊 词嵌入特征维度: {embedding_features.shape}")
        
        # 组合所有特征
        combined_features = np.hstack(all_features)
        print(f"📊 组合特征总维度: {combined_features.shape}")
        
        return combined_features
        
    def apply_feature_selection(self, X: np.ndarray, y: np.ndarray, n_components: int = 100):
        """应用特征选择/降维"""
        print(f"🔧 应用PCA降维 (保留 {n_components} 维)")
        
        # 标准化
        X_scaled = self.scaler.fit_transform(X)
        
        # PCA降维
        self.pca = PCA(n_components=min(n_components, X.shape[0], X.shape[1]))  # 主成分数不能超过样本数和特征数
        X_reduced = self.pca.fit_transform(X_scaled)
        
        # 计算解释方差比
        explained_variance = self.pca.explained_variance_ratio_
        cumulative_variance = np.cumsum(explained_variance)
        
        print(f"📊 前{n_components}个主成分解释方差比: {cumulative_variance[-1]:.3f}")
        
        # 绘制解释方差图
        plt.figure(figsize=(12, 5))
        
        plt.subplot(1, 2, 1)
        plt.plot(range(1, len(explained_variance) + 1), explained_variance, 'bo-')
        plt.xlabel('主成分')
        plt.ylabel('解释方差比')
        plt.title('各主成分解释方差比')
        plt.grid(True, alpha=0.3)
        
        plt.subplot(1, 2, 2)
        plt.plot(range(1, len(cumulative_variance) + 1), cumulative_variance, 'ro-')
        plt.xlabel('主成分数量')
        plt.ylabel('累积解释方差比')
        plt.title('累积解释方差比')
        plt.axhline(y=0.95, color='g', linestyle='--', label='95%阈值')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return X_reduced
        
    def transform(self, texts: list, embedding_model=None) -> np.ndarray:
        """转换新数据"""
        # 组合特征
        X = self.combine_features(texts, embedding_model)
        
        # 标准化
        X_scaled = self.scaler.transform(X)
        
        # PCA变换
        if self.pca:
            X_transformed = self.pca.transform(X_scaled)
        else:
            X_transformed = X_scaled
            
        return X_transformed

# 演示高级特征工程
print("=" * 60)
print("🔧 高级特征工程演示")
print("=" * 60)

# 使用之前的新闻数据
texts, labels = create_sample_news_data()

# 创建高级特征提取器
advanced_extractor = AdvancedFeatureExtractor(
    use_embedding=False,  # 暂时不使用词嵌入
    use_tfidf=True,
    use_ngrams=True
)

# 提取组合特征
X_combined = advanced_extractor.combine_features(texts)

# 编码标签
news_classifier = NewsClassificationSystem()
y_encoded = news_classifier.encode_labels(labels)

# 应用特征选择
X_reduced = advanced_extractor.apply_feature_selection(X_combined, y_encoded, n_components=50)

# 使用降维后的特征训练模型
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report

print(f"\n🔧 使用高级特征训练模型...")

# 交叉验证评估
classifiers = {
    'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
    'Random Forest': RandomForestClassifier(random_state=42, n_estimators=100),
    'SVM': None  # 简化演示,不包含SVM
}

for name, clf in classifiers.items():
    if clf is not None:
        scores = cross_val_score(clf, X_reduced, y_encoded, cv=5, scoring='accuracy')
        print(f"{name} - 交叉验证准确率: {scores.mean():.4f}{scores.std()*2:.4f})")

print("\n✅ 高级特征工程演示完成!")

📊 第四章:评估指标与性能优化

4.1 NLP任务评估指标体系

🎯 分类任务评估指标

在NLP任务中,选择合适的评估指标至关重要。不同的指标反映了模型性能的不同方面。
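在看完整的评估器实现之前,可以先用一个能手工验证的小例子体会:当类别不平衡时,宏平均F1与加权平均F1会给出不同的结论(注释中的数值可手工推算核对):

```python
from sklearn.metrics import accuracy_score, f1_score

# 6个样本,类别不平衡:4个类别0、2个类别1
y_true = [0, 0, 0, 0, 1, 1]
y_pred = [0, 0, 0, 1, 0, 1]

# 类别0: P=3/4, R=3/4, F1=0.75;类别1: P=1/2, R=1/2, F1=0.5
print(accuracy_score(y_true, y_pred))                 # 4/6 ≈ 0.667
print(f1_score(y_true, y_pred, average='macro'))      # (0.75 + 0.5) / 2 = 0.625
print(f1_score(y_true, y_pred, average='weighted'))   # (4*0.75 + 2*0.5) / 6 ≈ 0.667
```

宏平均对每个类别一视同仁,更容易暴露小类别上的问题;加权平均则更接近整体准确率。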

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import *
from sklearn.model_selection import learning_curve, validation_curve
import warnings
warnings.filterwarnings('ignore')

class NLPEvaluationMetrics:
    """NLP评估指标计算器"""
    
    def __init__(self):
        self.metrics_history = {}
        
    def calculate_basic_metrics(self, y_true, y_pred, labels=None):
        """计算基础分类指标"""
        metrics = {}
        
        # 基础指标
        metrics['accuracy'] = accuracy_score(y_true, y_pred)
        metrics['precision_macro'] = precision_score(y_true, y_pred, average='macro', zero_division=0)
        metrics['recall_macro'] = recall_score(y_true, y_pred, average='macro', zero_division=0)
        metrics['f1_macro'] = f1_score(y_true, y_pred, average='macro', zero_division=0)
        
        # 加权指标
        metrics['precision_weighted'] = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        metrics['recall_weighted'] = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        metrics['f1_weighted'] = f1_score(y_true, y_pred, average='weighted', zero_division=0)
        
        # 每个类别的详细指标
        if labels:
            precision_per_class = precision_score(y_true, y_pred, average=None, zero_division=0)
            recall_per_class = recall_score(y_true, y_pred, average=None, zero_division=0)
            f1_per_class = f1_score(y_true, y_pred, average=None, zero_division=0)
            
            for i, label in enumerate(labels):
                if i < len(precision_per_class):
                    metrics[f'precision_{label}'] = precision_per_class[i]
                    metrics[f'recall_{label}'] = recall_per_class[i]
                    metrics[f'f1_{label}'] = f1_per_class[i]
        
        return metrics
        
    def calculate_advanced_metrics(self, y_true, y_pred_proba, y_pred=None):
        """计算高级评估指标"""
        metrics = {}
        
        # 如果没有提供预测类别,从概率中获取
        if y_pred is None:
            y_pred = np.argmax(y_pred_proba, axis=1)
        
        # ROC-AUC (多分类)
        try:
            if y_pred_proba.shape[1] == 2:
                # 二分类
                metrics['roc_auc'] = roc_auc_score(y_true, y_pred_proba[:, 1])
            else:
                # 多分类
                metrics['roc_auc_ovr'] = roc_auc_score(y_true, y_pred_proba, multi_class='ovr', average='macro')
                metrics['roc_auc_ovo'] = roc_auc_score(y_true, y_pred_proba, multi_class='ovo', average='macro')
        except Exception as e:
            print(f"ROC-AUC计算失败: {e}")
            
        # 对数损失
        try:
            metrics['log_loss'] = log_loss(y_true, y_pred_proba)
        except Exception as e:
            print(f"对数损失计算失败: {e}")
            
        # Cohen's Kappa
        metrics['cohen_kappa'] = cohen_kappa_score(y_true, y_pred)
        
        # Matthews相关系数(对于二分类)
        if len(np.unique(y_true)) == 2:
            metrics['matthews_corrcoef'] = matthews_corrcoef(y_true, y_pred)
        
        return metrics
        
    def plot_confusion_matrix_advanced(self, y_true, y_pred, labels, normalize=False):
        """绘制高级混淆矩阵"""
        cm = confusion_matrix(y_true, y_pred)
        
        if normalize:
            cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
            title = '归一化混淆矩阵'
            fmt = '.2f'
        else:
            title = '混淆矩阵'
            fmt = 'd'
        
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt=fmt, cmap='Blues',
                   xticklabels=labels, yticklabels=labels)
        plt.title(title, fontsize=16, fontweight='bold')
        plt.xlabel('预测标签')
        plt.ylabel('真实标签')
        
        # 添加准确率信息
        if not normalize:
            accuracy = np.trace(cm) / np.sum(cm)
            plt.figtext(0.5, 0.02, f'总体准确率: {accuracy:.4f}', 
                       ha='center', fontsize=12, bbox=dict(boxstyle='round', facecolor='wheat'))
        
        plt.tight_layout()
        plt.show()
        
        return cm
        
    def plot_classification_report(self, y_true, y_pred, labels):
        """可视化分类报告"""
        report = classification_report(y_true, y_pred, target_names=labels, output_dict=True)
        
        # 提取每个类别的指标
        metrics_data = []
        for label in labels:
            if label in report:
                metrics_data.append([
                    report[label]['precision'],
                    report[label]['recall'],
                    report[label]['f1-score']
                ])
        
        metrics_data = np.array(metrics_data)
        
        # 绘制热力图
        plt.figure(figsize=(8, 6))
        sns.heatmap(metrics_data.T, annot=True, fmt='.3f', cmap='RdYlBu_r',
                   xticklabels=labels, yticklabels=['Precision', 'Recall', 'F1-Score'])
        plt.title('分类性能热力图', fontsize=16, fontweight='bold')
        plt.xlabel('类别')
        plt.ylabel('评估指标')
        plt.tight_layout()
        plt.show()
        
    def plot_learning_curves(self, estimator, X, y, cv=5, scoring='accuracy'):
        """绘制学习曲线"""
        train_sizes, train_scores, val_scores = learning_curve(
            estimator, X, y, cv=cv, scoring=scoring,
            train_sizes=np.linspace(0.1, 1.0, 10),
            random_state=42, n_jobs=-1
        )
        
        # 计算均值和标准差
        train_mean = np.mean(train_scores, axis=1)
        train_std = np.std(train_scores, axis=1)
        val_mean = np.mean(val_scores, axis=1)
        val_std = np.std(val_scores, axis=1)
        
        # 绘制学习曲线
        plt.figure(figsize=(10, 6))
        
        plt.fill_between(train_sizes, train_mean - train_std, train_mean + train_std,
                        alpha=0.1, color='blue')
        plt.fill_between(train_sizes, val_mean - val_std, val_mean + val_std,
                        alpha=0.1, color='red')
        
        plt.plot(train_sizes, train_mean, 'o-', color='blue', label='训练分数')
        plt.plot(train_sizes, val_mean, 'o-', color='red', label='验证分数')
        
        plt.title('学习曲线', fontsize=16, fontweight='bold')
        plt.xlabel('训练样本数')
        plt.ylabel(f'{scoring.capitalize()} Score')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        
        return train_sizes, train_scores, val_scores
        
    def plot_validation_curves(self, estimator, X, y, param_name, param_range, 
                             cv=5, scoring='accuracy'):
        """绘制验证曲线"""
        train_scores, val_scores = validation_curve(
            estimator, X, y, param_name=param_name, param_range=param_range,
            cv=cv, scoring=scoring, n_jobs=-1
        )
        
        # 计算均值和标准差
        train_mean = np.mean(train_scores, axis=1)
        train_std = np.std(train_scores, axis=1)
        val_mean = np.mean(val_scores, axis=1)
        val_std = np.std(val_scores, axis=1)
        
        # 绘制验证曲线
        plt.figure(figsize=(10, 6))
        
        plt.semilogx(param_range, train_mean, 'o-', color='blue', label='训练分数')
        plt.fill_between(param_range, train_mean - train_std, train_mean + train_std,
                        alpha=0.1, color='blue')
        
        plt.semilogx(param_range, val_mean, 'o-', color='red', label='验证分数')
        plt.fill_between(param_range, val_mean - val_std, val_mean + val_std,
                        alpha=0.1, color='red')
        
        plt.title(f'验证曲线 ({param_name})', fontsize=16, fontweight='bold')
        plt.xlabel(param_name)
        plt.ylabel(f'{scoring.capitalize()} Score')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        
        return train_scores, val_scores

# 演示评估指标系统
print("=" * 60)
print("📊 NLP评估指标系统演示")
print("=" * 60)

# 创建示例数据进行评估
texts, labels = create_sample_news_data()

# 训练一个模型用于演示
from sklearn.model_selection import train_test_split

# 特征提取
extractor = AdvancedFeatureExtractor(use_embedding=False, use_tfidf=True)
X = extractor.combine_features(texts)
y = news_classifier.encode_labels(labels)

# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# 训练多个模型进行对比
models = {
    'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
    'Random Forest': RandomForestClassifier(random_state=42, n_estimators=100)
}

# 创建评估器
evaluator = NLPEvaluationMetrics()

model_results = {}
unique_labels = sorted(list(set(labels)))

for model_name, model in models.items():
    print(f"\n{'='*40}")
    print(f"📊 评估 {model_name}")
    print(f"{'='*40}")
    
    # 训练模型
    model.fit(X_train, y_train)
    
    # 预测
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None
    
    # 计算基础指标
    basic_metrics = evaluator.calculate_basic_metrics(y_test, y_pred, unique_labels)
    
    # 计算高级指标
    if y_pred_proba is not None:
        advanced_metrics = evaluator.calculate_advanced_metrics(y_test, y_pred_proba, y_pred)
        basic_metrics.update(advanced_metrics)
    
    model_results[model_name] = basic_metrics
    
    # 打印主要指标
    print(f"准确率: {basic_metrics['accuracy']:.4f}")
    print(f"宏平均F1: {basic_metrics['f1_macro']:.4f}")
    print(f"加权平均F1: {basic_metrics['f1_weighted']:.4f}")
    if 'cohen_kappa' in basic_metrics:
        print(f"Cohen's Kappa: {basic_metrics['cohen_kappa']:.4f}")
    
    # 绘制混淆矩阵
    cm = evaluator.plot_confusion_matrix_advanced(y_test, y_pred, unique_labels)
    
    # 绘制分类报告热力图
    evaluator.plot_classification_report(y_test, y_pred, unique_labels)

# 模型性能对比
print(f"\n{'='*60}")
print("📊 模型性能对比")
print(f"{'='*60}")

# 创建对比表格
comparison_metrics = ['accuracy', 'f1_macro', 'f1_weighted', 'precision_macro', 'recall_macro']
comparison_data = {}

for metric in comparison_metrics:
    comparison_data[metric] = [model_results[model][metric] for model in models.keys()]

comparison_df = pd.DataFrame(comparison_data, index=list(models.keys()))
print(comparison_df.round(4))

# 绘制性能对比雷达图
def plot_radar_chart(data, labels, title):
    """绘制雷达图"""
    angles = np.linspace(0, 2*np.pi, len(labels), endpoint=False).tolist()
    angles += angles[:1]  # 闭合图形
    
    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(projection='polar'))
    
    colors = ['blue', 'red', 'green', 'orange']
    for i, (model_name, values) in enumerate(data.items()):
        values = values + [values[0]]  # 闭合数据
        ax.plot(angles, values, 'o-', linewidth=2, label=model_name, color=colors[i])
        ax.fill(angles, values, alpha=0.25, color=colors[i])
    
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(labels)
    ax.set_ylim(0, 1)
    ax.set_title(title, fontsize=16, fontweight='bold', pad=20)
    ax.legend(loc='upper right', bbox_to_anchor=(1.2, 1.0))
    ax.grid(True)
    
    plt.tight_layout()
    plt.show()

# 准备雷达图数据
radar_data = {}
radar_labels = ['Accuracy', 'F1-Macro', 'F1-Weighted', 'Precision', 'Recall']

for model_name in models.keys():
    radar_data[model_name] = [
        model_results[model_name]['accuracy'],
        model_results[model_name]['f1_macro'],
        model_results[model_name]['f1_weighted'],
        model_results[model_name]['precision_macro'],
        model_results[model_name]['recall_macro']
    ]

plot_radar_chart(radar_data, radar_labels, '模型性能对比雷达图')

# 学习曲线分析
print(f"\n📈 学习曲线分析")
best_model = RandomForestClassifier(random_state=42, n_estimators=50)  # 减少树的数量以加快演示
evaluator.plot_learning_curves(best_model, X, y, cv=3, scoring='accuracy')

# 验证曲线分析
print(f"\n📈 验证曲线分析 (Random Forest n_estimators)")
param_range = [10, 25, 50, 75, 100, 150, 200]
rf_model = RandomForestClassifier(random_state=42)
evaluator.plot_validation_curves(
    rf_model, X, y, 
    param_name='n_estimators', 
    param_range=param_range,
    cv=3, scoring='accuracy'
)

print("\n✅ 评估指标系统演示完成!")

### 4.2 模型优化与调参策略

#### 🔧 超参数优化

```python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import time

class ModelOptimizer:
    """模型优化器"""
    
    def __init__(self):
        self.best_models = {}
        self.optimization_history = {}
        
    def grid_search_optimization(self, model, param_grid, X, y, cv=5, scoring='accuracy'):
        """网格搜索优化"""
        print(f"🔍 开始网格搜索优化...")
        start_time = time.time()
        
        # 创建网格搜索对象
        grid_search = GridSearchCV(
            estimator=model,
            param_grid=param_grid,
            cv=cv,
            scoring=scoring,
            n_jobs=-1,
            verbose=1
        )
        
        # 执行搜索
        grid_search.fit(X, y)
        
        end_time = time.time()
        
        # 保存结果
        model_name = model.__class__.__name__
        self.best_models[f'{model_name}_grid'] = grid_search.best_estimator_
        self.optimization_history[f'{model_name}_grid'] = {
            'best_score': grid_search.best_score_,
            'best_params': grid_search.best_params_,
            'optimization_time': end_time - start_time,
            'cv_results': grid_search.cv_results_
        }
        
        print(f"✅ 网格搜索完成!")
        print(f"⏱️  耗时: {end_time - start_time:.2f} 秒")
        print(f"🏆 最佳分数: {grid_search.best_score_:.4f}")
        print(f"🎯 最佳参数: {grid_search.best_params_}")
        
        return grid_search
        
    def random_search_optimization(self, model, param_distributions, X, y, 
                                 n_iter=100, cv=5, scoring='accuracy'):
        """随机搜索优化"""
        print(f"🎲 开始随机搜索优化...")
        start_time = time.time()
        
        # 创建随机搜索对象
        random_search = RandomizedSearchCV(
            estimator=model,
            param_distributions=param_distributions,
            n_iter=n_iter,
            cv=cv,
            scoring=scoring,
            n_jobs=-1,
            random_state=42,
            verbose=1
        )
        
        # 执行搜索
        random_search.fit(X, y)
        
        end_time = time.time()
        
        # 保存结果
        model_name = model.__class__.__name__
        self.best_models[f'{model_name}_random'] = random_search.best_estimator_
        self.optimization_history[f'{model_name}_random'] = {
            'best_score': random_search.best_score_,
            'best_params': random_search.best_params_,
            'optimization_time': end_time - start_time,
            'cv_results': random_search.cv_results_
        }
        
        print(f"✅ 随机搜索完成!")
        print(f"⏱️  耗时: {end_time - start_time:.2f} 秒")
        print(f"🏆 最佳分数: {random_search.best_score_:.4f}")
        print(f"🎯 最佳参数: {random_search.best_params_}")
        
        return random_search
        
    def plot_optimization_results(self, search_result, param_name1, param_name2=None):
        """可视化优化结果"""
        results = search_result.cv_results_
        
        if param_name2 is None:
            # 单参数优化结果
            param_values = [params[param_name1] for params in results['params']]
            scores = results['mean_test_score']
            
            plt.figure(figsize=(10, 6))
            plt.plot(param_values, scores, 'bo-')
            plt.xlabel(param_name1)
            plt.ylabel('交叉验证分数')
            plt.title(f'{param_name1} 优化结果')
            plt.grid(True, alpha=0.3)
            plt.show()
        else:
            # 双参数优化结果(仅适用于网格搜索)
            # 参数取值中可能包含None(如max_depth),排序时需单独处理,避免None与数值比较报错
            sort_key = lambda v: (v is None, v if v is not None else 0)
            param1_values = sorted({params[param_name1] for params in results['params']}, key=sort_key)
            param2_values = sorted({params[param_name2] for params in results['params']}, key=sort_key)
            
            # 创建分数矩阵
            score_matrix = np.zeros((len(param2_values), len(param1_values)))
            
            for i, params in enumerate(results['params']):
                p1_idx = param1_values.index(params[param_name1])
                p2_idx = param2_values.index(params[param_name2])
                score_matrix[p2_idx, p1_idx] = results['mean_test_score'][i]
            
            # 绘制热力图
            plt.figure(figsize=(10, 8))
            sns.heatmap(score_matrix, 
                       xticklabels=param1_values, 
                       yticklabels=param2_values,
                       annot=True, fmt='.3f', cmap='YlOrRd')
            plt.xlabel(param_name1)
            plt.ylabel(param_name2)
            plt.title(f'{param_name1} vs {param_name2} 优化热力图')
            plt.tight_layout()
            plt.show()
            
    def compare_optimization_methods(self):
        """对比不同优化方法的结果"""
        if not self.optimization_history:
            print("没有优化历史数据!")
            return
            
        # 创建对比表格
        comparison_data = []
        for method_name, history in self.optimization_history.items():
            comparison_data.append({
                '方法': method_name,
                '最佳分数': history['best_score'],
                '优化时间(秒)': history['optimization_time'],
                '参数数量': len(history['best_params'])
            })
        
        comparison_df = pd.DataFrame(comparison_data)
        print("🔍 优化方法对比:")
        print(comparison_df.round(4))
        
        # 可视化对比
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))
        
        # 分数对比
        methods = [data['方法'] for data in comparison_data]
        scores = [data['最佳分数'] for data in comparison_data]
        times = [data['优化时间(秒)'] for data in comparison_data]
        
        axes[0].bar(methods, scores, color=['skyblue', 'lightcoral', 'lightgreen'])
        axes[0].set_title('最佳分数对比')
        axes[0].set_ylabel('交叉验证分数')
        axes[0].tick_params(axis='x', rotation=45)
        
        # 时间对比
        axes[1].bar(methods, times, color=['skyblue', 'lightcoral', 'lightgreen'])
        axes[1].set_title('优化时间对比')
        axes[1].set_ylabel('时间 (秒)')
        axes[1].tick_params(axis='x', rotation=45)
        
        plt.tight_layout()
        plt.show()

# 演示模型优化
print("=" * 60)
print("🔧 模型优化与调参演示")
print("=" * 60)

# 使用之前的数据
optimizer = ModelOptimizer()

# 随机森林优化
print("\n🌲 Random Forest 优化")
rf_model = RandomForestClassifier(random_state=42)

# 定义参数空间
rf_param_grid = {
    'n_estimators': [50, 100, 150],
    'max_depth': [3, 5, 7, None],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

# 网格搜索(为了演示速度,减少参数空间)
rf_grid_small = {
    'n_estimators': [50, 100],
    'max_depth': [3, 5, None],
    'min_samples_split': [2, 5]
}

# 执行网格搜索
rf_grid_search = optimizer.grid_search_optimization(
    rf_model, rf_grid_small, X_train, y_train, cv=3
)

# 随机搜索参数分布
from scipy.stats import randint

rf_param_dist = {
    'n_estimators': randint(50, 200),
    'max_depth': [3, 5, 7, 10, None],
    'min_samples_split': randint(2, 11),
    'min_samples_leaf': randint(1, 5),
    'max_features': ['sqrt', 'log2', None]
}

# 执行随机搜索
rf_random_search = optimizer.random_search_optimization(
    rf_model, rf_param_dist, X_train, y_train, n_iter=20, cv=3
)

# 可视化优化结果
optimizer.plot_optimization_results(rf_grid_search, 'n_estimators', 'max_depth')

# 逻辑回归优化
print("\n📊 Logistic Regression 优化")
lr_model = LogisticRegression(random_state=42, max_iter=1000)

# 参数网格
lr_param_grid = {
    'C': [0.1, 1.0, 10.0],
    'penalty': ['l1', 'l2'],
    'solver': ['liblinear', 'saga']
}

# 执行网格搜索
lr_grid_search = optimizer.grid_search_optimization(
    lr_model, lr_param_grid, X_train, y_train, cv=3
)

# 对比优化方法
optimizer.compare_optimization_methods()

# 使用最佳模型进行最终评估
print(f"\n🏆 最佳模型最终评估")
best_rf_model = optimizer.best_models['RandomForestClassifier_grid']
best_lr_model = optimizer.best_models['LogisticRegression_grid']

# 在测试集上评估
final_models = {
    'Best RF': best_rf_model,
    'Best LR': best_lr_model
}

final_results = {}
for name, model in final_models.items():
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None
    
    metrics = evaluator.calculate_basic_metrics(y_test, y_pred, unique_labels)
    if y_pred_proba is not None:
        advanced_metrics = evaluator.calculate_advanced_metrics(y_test, y_pred_proba, y_pred)
        metrics.update(advanced_metrics)
    
    final_results[name] = metrics
    
    print(f"\n{name} 测试集性能:")
    print(f"  准确率: {metrics['accuracy']:.4f}")
    print(f"  F1-宏平均: {metrics['f1_macro']:.4f}")
    print(f"  F1-加权平均: {metrics['f1_weighted']:.4f}")
# 特征重要性分析(针对随机森林)
print(f"\n🔍 特征重要性分析")
if hasattr(best_rf_model, 'feature_importances_'):
    importances = best_rf_model.feature_importances_
    feature_names = [f'Feature_{i}' for i in range(len(importances))]
    
    # 选择前20个重要特征
    indices = np.argsort(importances)[::-1][:20]
    
    plt.figure(figsize=(12, 8))
    plt.bar(range(len(indices)), importances[indices])
    plt.title('Top 20 特征重要性 (随机森林)', fontsize=16, fontweight='bold')
    plt.xlabel('特征索引')
    plt.ylabel('重要性分数')
    plt.xticks(range(len(indices)), [f'F{i}' for i in indices], rotation=45)
    plt.tight_layout()
    plt.show()

print("\n✅ 模型优化与调参演示完成!")

💡 第五章:总结与最佳实践

5.1 NLP与词嵌入核心知识总结

🎯 核心概念回顾

通过本文的深入学习,我们掌握了NLP和词嵌入的核心技术。让我们回顾一下关键知识点:

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

class NLPKnowledgeSummary:
    """NLP知识总结器"""
    
    def __init__(self):
        self.knowledge_points = {
            'NLP基础': {
                '文本预处理': ['分词', '去停用词', '词性标注', '词干提取'],
                '语言模型': ['N-gram模型', '统计语言模型', '概率计算'],
                '评估指标': ['困惑度', '准确率', 'F1分数', 'BLEU分数']
            },
            '词嵌入技术': {
                'Word2Vec': ['Skip-gram', 'CBOW', '负采样', '层次softmax'],
                'GloVe': ['全局统计', '矩阵分解', '共现矩阵', '加权最小二乘'],
                'FastText': ['子词信息', 'N-gram特征', '未登录词处理', '字符级建模']
            },
            '应用实践': {
                '文本分类': ['特征工程', '模型选择', '超参数优化', '性能评估'],
                '相似度计算': ['余弦相似度', '欧氏距离', '曼哈顿距离', '语义相似度'],
                '词汇分析': ['词汇类比', '语义关系', '相似词发现', '语义聚类']
            }
        }
        
    def create_knowledge_map(self):
        """创建知识地图"""
        print("🗺️ NLP与词嵌入知识地图")
        print("=" * 60)
        
        for main_category, sub_categories in self.knowledge_points.items():
            print(f"\n📚 {main_category}")
            for sub_name, techniques in sub_categories.items():
                print(f"  ├── {sub_name}")
                for technique in techniques:
                    print(f"  │   ├── {technique}")
                    
    def compare_embedding_methods_final(self):
        """最终的词嵌入方法对比"""
        comparison_data = {
            '特征': [
                '训练复杂度', '语义理解能力', '计算效率', '内存需求',
                '未登录词处理', '多义词处理', '稀有词效果', '可解释性'
            ],
            'Word2Vec': [
                '中等', '良好', '高', '中等',
                '无法处理', '有限', '一般', '中等'
            ],
            'GloVe': [
                '较高', '优秀', '中等', '较高',
                '无法处理', '良好', '一般', '较好'
            ],
            'FastText': [
                '较高', '良好', '中等', '高',
                '可以处理', '有限', '优秀', '中等'
            ]
        }
        
        df = pd.DataFrame(comparison_data)
        print("\n📊 词嵌入方法最终对比:")
        print(df.to_string(index=False))
        
        # 可视化对比
        methods = ['Word2Vec', 'GloVe', 'FastText']
        metrics = ['语义理解', '计算效率', '内存效率', '稀有词处理', '可解释性']
        
        # 模拟评分(1-5分)
        scores = {
            'Word2Vec': [4, 5, 4, 2, 3],
            'GloVe': [5, 3, 2, 2, 4],
            'FastText': [4, 3, 2, 5, 3]
        }
        
        # 绘制雷达图
        angles = np.linspace(0, 2*np.pi, len(metrics), endpoint=False).tolist()
        angles += angles[:1]
        
        fig, ax = plt.subplots(figsize=(10, 8), subplot_kw=dict(projection='polar'))
        
        colors = ['blue', 'red', 'green']
        for i, (method, score_list) in enumerate(scores.items()):
            score_list += [score_list[0]]  # 闭合图形
            ax.plot(angles, score_list, 'o-', linewidth=2, label=method, color=colors[i])
            ax.fill(angles, score_list, alpha=0.25, color=colors[i])
        
        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(metrics)
        ax.set_ylim(0, 5)
        ax.set_title('词嵌入方法综合对比', fontsize=16, fontweight='bold', pad=20)
        ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
        ax.grid(True)
        
        plt.tight_layout()
        plt.show()
        
    def summarize_applications(self):
        """总结应用场景"""
        applications = {
            '文本分类': {
                '适用场景': ['新闻分类', '情感分析', '垃圾邮件检测', '主题分类'],
                '推荐方法': 'TF-IDF + 机器学习算法',
                '优化建议': ['特征工程', '数据增强', '集成学习', '交叉验证']
            },
            '信息检索': {
                '适用场景': ['搜索引擎', '文档相似度', '问答系统', '推荐系统'],
                '推荐方法': 'Word2Vec/GloVe + 余弦相似度',
                '优化建议': ['查询扩展', '相关性反馈', '排序学习', '个性化推荐']
            },
            '机器翻译': {
                '适用场景': ['多语言翻译', '跨语言检索', '语言学习', '国际化应用'],
                '推荐方法': '双语词嵌入 + Seq2Seq模型',
                '优化建议': ['对齐算法', '注意力机制', '数据增强', '领域适应']
            },
            '语义分析': {
                '适用场景': ['实体识别', '关系抽取', '知识图谱', '语义解析'],
                '推荐方法': 'FastText + 深度学习',
                '优化建议': ['上下文建模', '多任务学习', '预训练模型', '知识注入']
            }
        }
        
        print(f"\n🎯 NLP应用场景总结:")
        for app_name, details in applications.items():
            print(f"\n📌 {app_name}")
            print(f"  适用场景: {', '.join(details['适用场景'])}")
            print(f"  推荐方法: {details['推荐方法']}")
            print(f"  优化建议: {', '.join(details['优化建议'])}")

# 创建知识总结
summary = NLPKnowledgeSummary()
summary.create_knowledge_map()
summary.compare_embedding_methods_final()
summary.summarize_applications()
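对照上表中"未登录词处理"一栏,FastText 的优势可以用 gensim 很快验证。下面是一个最小草图,语料和参数都是临时编造的演示取值,实际效果需在真实语料上确认:

```python
# 假设性示例:FastText 借助子词(字符 n-gram)为未登录词生成向量
from gensim.models import FastText

toy_sentences = [
    ["我", "喜欢", "看", "篮球", "比赛"],
    ["他", "喜欢", "看", "足球", "比赛"],
    ["篮球", "和", "足球", "都是", "球类", "运动"],
]

# 中文词较短,这里把子词 n-gram 的长度范围设为 1~2 个字符
ft_model = FastText(sentences=toy_sentences, vector_size=32, window=3,
                    min_count=1, epochs=50, min_n=1, max_n=2)

oov_word = "排球"  # 语料中从未出现过的词
print("是否在词表中:", oov_word in ft_model.wv.key_to_index)
print("未登录词向量维度:", ft_model.wv[oov_word].shape)
print("与'篮球'的相似度:", round(float(ft_model.wv.similarity("篮球", oov_word)), 4))
```

由于 FastText 的词向量由子词向量叠加而成,"排球"即使没在语料中出现,也能借助"球"等共享子词得到一个可用的向量;这正是对比表中 Word2Vec 和 GloVe 做不到的地方。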

### 5.2 实践中的最佳经验

#### 🛠️ 项目开发流程

```python
class NLPProjectBestPractices:
    """NLP项目最佳实践指南"""
    
    def __init__(self):
        self.project_phases = [
            '需求分析', '数据收集', '数据预处理', 
            '特征工程', '模型选择', '训练调优', 
            '评估验证', '部署上线', '监控维护'
        ]
        
    def show_development_workflow(self):
        """展示开发工作流程"""
        print("🔄 NLP项目开发流程最佳实践")
        print("=" * 60)
        
        workflow_details = {
            '1. 需求分析': [
                '明确业务目标',
                '定义评估指标',
                '确定技术约束',
                '评估数据需求'
            ],
            '2. 数据收集': [
                '多源数据获取',
                '数据质量评估',
                '标注策略制定',
                '数据平衡性检查'
            ],
            '3. 数据预处理': [
                '文本清洗规范化',
                '分词和词性标注',
                '去停用词和标点',
                '数据格式统一'
            ],
            '4. 特征工程': [
                '词嵌入方法选择',
                '特征组合策略',
                '降维技术应用',
                '特征选择优化'
            ],
            '5. 模型选择': [
                '基线模型建立',
                '多算法对比',
                '复杂度权衡',
                '可解释性考虑'
            ],
            '6. 训练调优': [
                '超参数优化',
                '交叉验证策略',
                '过拟合防控',
                '训练稳定性'
            ],
            '7. 评估验证': [
                '多指标综合评估',
                '错误案例分析',
                '泛化能力测试',
                '边界条件验证'
            ],
            '8. 部署上线': [
                '模型轻量化',
                '推理效率优化',
                'API接口设计',
                '监控系统搭建'
            ],
            '9. 监控维护': [
                '性能指标监控',
                '数据漂移检测',
                '模型更新策略',
                '用户反馈收集'
            ]
        }
        
        for phase, practices in workflow_details.items():
            print(f"\n{phase}")
            for practice in practices:
                print(f"  ✅ {practice}")
                
    def common_pitfalls_and_solutions(self):
        """常见问题及解决方案"""
        print(f"\n⚠️ 常见问题及解决方案")
        print("=" * 60)
        
        pitfalls = {
            '数据质量问题': {
                '问题描述': '数据噪声大、标注不一致、样本不平衡',
                '解决方案': [
                    '建立数据质量检查流程',
                    '使用多轮标注和一致性检查',
                    '采用数据增强和重采样技术',
                    '引入主动学习减少标注成本'
                ]
            },
            '特征工程困难': {
                '问题描述': '特征选择困难、维度灾难、特征相关性强',
                '解决方案': [
                    '结合领域知识进行特征设计',
                    '使用自动特征选择算法',
                    '应用降维技术如PCA、t-SNE',
                    '尝试深度学习自动特征提取'
                ]
            },
            '模型性能不佳': {
                '问题描述': '准确率低、过拟合严重、泛化能力差',
                '解决方案': [
                    '增加训练数据量',
                    '调整模型复杂度',
                    '使用正则化技术',
                    '尝试集成学习方法'
                ]
            },
            '计算资源限制': {
                '问题描述': '训练时间长、内存不足、推理速度慢',
                '解决方案': [
                    '使用分布式训练',
                    '模型压缩和知识蒸馏',
                    '特征降维和选择',
                    '硬件加速和优化'
                ]
            }
        }
        
        for problem, details in pitfalls.items():
            print(f"\n🚨 {problem}")
            print(f"  问题: {details['问题描述']}")
            print(f"  解决方案:")
            for solution in details['解决方案']:
                print(f"    💡 {solution}")
                
    def performance_optimization_tips(self):
        """性能优化技巧"""
        print(f"\n🚀 性能优化技巧")
        print("=" * 60)
        
        optimization_tips = {
            '数据层面': [
                '数据预处理并行化',
                '使用内存映射文件',
                '批量数据加载',
                '数据格式优化(如HDF5)'
            ],
            '特征层面': [
                '特征缓存机制',
                '在线特征计算',
                '特征哈希技术',
                '稀疏特征表示'
            ],
            '模型层面': [
                '模型量化压缩',
                '网络剪枝技术',
                '知识蒸馏方法',
                '动态计算图优化'
            ],
            '系统层面': [
                'GPU/TPU加速',
                '模型并行推理',
                '缓存策略优化',
                '负载均衡设计'
            ]
        }
        
        for category, tips in optimization_tips.items():
            print(f"\n📈 {category}")
            for tip in tips:
                print(f"  ⚡ {tip}")

# 展示最佳实践
best_practices = NLPProjectBestPractices()
best_practices.show_development_workflow()
best_practices.common_pitfalls_and_solutions()
best_practices.performance_optimization_tips()
```
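上面"特征层面"提到的特征哈希技术,可以用 scikit-learn 的 HashingVectorizer 体验一下:它不维护词表、输出维度固定,适合大规模或流式文本。以下只是一个示意草图,示例文本和 n_features 的取值都是随手设定的:

```python
# 假设性示例:特征哈希——不建词表、维度固定的文本向量化
from sklearn.feature_extraction.text import HashingVectorizer

docs = ["今天 股市 大涨", "球队 赢得 比赛", "新款 手机 发布"]

hasher = HashingVectorizer(n_features=2**10, alternate_sign=False)
X_hashed = hasher.transform(docs)   # 无需 fit,直接变换

print("特征矩阵形状:", X_hashed.shape)  # (3, 1024),维度与词表大小无关
print("非零特征个数:", X_hashed.nnz)
```

代价是哈希冲突可能把不同词映射到同一维度,而且无法反查"某一维对应哪个词",因此更适合对可解释性要求不高的场景。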

5.3 未来发展趋势与展望

🔮 技术发展趋势
def show_future_trends():
    """展示NLP未来发展趋势"""
    print("\n🔮 NLP技术发展趋势与展望")
    print("=" * 60)
    
    trends = {
        '预训练模型时代': {
            '现状': '从Word2Vec到BERT、GPT系列的演进',
            '趋势': '更大规模、更多模态、更强泛化能力',
            '技术点': ['Transformer架构', '自监督学习', '多任务学习', '零样本学习'],
            '影响': '降低了NLP任务的门槛,提升了效果上限'
        },
        '多模态融合': {
            '现状': '文本与图像、语音的简单结合',
            '趋势': '深度多模态理解和生成',
            '技术点': ['视觉-语言模型', '语音-文本对齐', '跨模态检索', '多模态对话'],
            '影响': '实现更自然的人机交互体验'
        },
        '低资源语言处理': {
            '现状': '主要集中在英语等高资源语言',
            '趋势': '跨语言迁移学习和零样本学习',
            '技术点': ['跨语言词嵌入', '多语言预训练', '无监督机器翻译', '代码混合处理'],
            '影响': '促进语言多样性保护和全球化应用'
        },
        '可解释性AI': {
            '现状': '黑盒模型难以解释决策过程',
            '趋势': '可解释性成为重要考量因素',
            '技术点': ['注意力可视化', '梯度分析', '概念激活向量', '对抗样本分析'],
            '影响': '增强用户信任,满足法规要求'
        },
        '效率与绿色AI': {
            '现状': '大模型训练和推理成本高昂',
            '趋势': '模型轻量化和绿色计算',
            '技术点': ['模型压缩', '神经架构搜索', '边缘计算', '联邦学习'],
            '影响': '降低计算成本,减少环境影响'
        }
    }
    
    for trend_name, details in trends.items():
        print(f"\n🌟 {trend_name}")
        print(f"  现状: {details['现状']}")
        print(f"  趋势: {details['趋势']}")
        print(f"  关键技术: {', '.join(details['技术点'])}")
        print(f"  预期影响: {details['影响']}")
    
    # 可视化发展时间线
    
    # 时间线数据
    years = [2013, 2014, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
    milestones = [
        'Word2Vec', 'GloVe', 'FastText', 'Transformer', 'BERT', 'GPT-2', 
        'GPT-3', 'CLIP', 'ChatGPT', 'GPT-4', 'Multimodal LLMs'
    ]
    
    # 绘制时间线
    plt.figure(figsize=(15, 6))
    plt.plot(years, range(len(years)), 'o-', linewidth=3, markersize=8)
    
    for i, (year, milestone) in enumerate(zip(years, milestones)):
        plt.annotate(milestone, (year, i), xytext=(10, 0), 
                    textcoords='offset points', ha='left', va='center',
                    bbox=dict(boxstyle='round,pad=0.3', facecolor='lightblue', alpha=0.7))
    
    plt.xlabel('年份', fontsize=14)
    plt.ylabel('技术发展阶段', fontsize=14)
    plt.title('NLP技术发展时间线', fontsize=16, fontweight='bold')
    plt.grid(True, alpha=0.3)
    plt.yticks(range(len(years)), [f'阶段{i+1}' for i in range(len(years))])
    plt.tight_layout()
    plt.show()

show_future_trends()

def learning_roadmap():
    """学习路线图建议"""
    print(f"\n📚 NLP学习路线图建议")
    print("=" * 60)
    
    roadmap = {
        '基础阶段 (1-2个月)': [
            '掌握Python编程基础',
            '学习线性代数和概率统计',
            '了解机器学习基本概念',
            '熟悉正则表达式和文本处理'
        ],
        '入门阶段 (2-3个月)': [
            '学习文本预处理技术',
            '掌握词袋模型和TF-IDF',
            '理解词嵌入基本概念',
            '实践文本分类项目'
        ],
        '进阶阶段 (3-4个月)': [
            '深入学习Word2Vec、GloVe、FastText',
            '掌握序列模型(RNN、LSTM)',
            '学习注意力机制',
            '实践情感分析和命名实体识别'
        ],
        '高级阶段 (4-6个月)': [
            '学习Transformer架构',
            '掌握BERT、GPT等预训练模型',
            '了解多任务学习和迁移学习',
            '实践问答系统和文本生成'
        ],
        '专家阶段 (持续学习)': [
            '跟进最新研究进展',
            '参与开源项目贡献',
            '发表学术论文或技术博客',
            '指导他人学习和实践'
        ]
    }
    
    for stage, activities in roadmap.items():
        print(f"\n🎯 {stage}")
        for activity in activities:
            print(f"  📖 {activity}")
    
    print(f"\n💡 学习建议:")
    suggestions = [
        '理论与实践相结合,每学一个概念就动手实现',
        '多读经典论文,理解算法的设计思路',
        '参与Kaggle等竞赛,积累实战经验',
        '建立个人项目库,展示学习成果',
        '加入技术社区,与同行交流学习',
        '关注工业界应用,了解实际需求'
    ]
    
    for suggestion in suggestions:
        print(f"  💡 {suggestion}")

learning_roadmap()

5.4 章节总结

通过本文的学习,我们完成了从NLP基础理论到实践应用的完整学习旅程:

🎯 核心收获
  1. 理论基础:深入理解了NLP的基本概念、文本预处理技术和语言模型原理
  2. 词嵌入技术:掌握了Word2Vec、GloVe、FastText三种主流词嵌入方法的原理和实现
  3. 实战技能:学会了如何构建完整的新闻分类系统,包括特征工程和模型优化
  4. 评估方法:熟悉了各种评估指标和性能优化策略
  5. 最佳实践:了解了项目开发流程和常见问题的解决方案
🚀 下一步方向
  1. 深度学习NLP:学习RNN、LSTM、Transformer等神经网络架构
  2. 预训练模型:掌握BERT、GPT等预训练模型的使用和微调
  3. 多模态学习:探索文本与图像、语音的融合处理
  4. 生成式AI:了解大语言模型和生成式AI的最新发展
📚 推荐资源
  • 经典教材:《统计自然语言处理》、《Speech and Language Processing》
  • 在线课程:CS224N (Stanford)、CS224U (Stanford)
  • 实践平台:Kaggle、天池、和鲸科技
  • 开源工具:NLTK、spaCy、Gensim、Transformers
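上面提到的 Gensim 自带 downloader 接口,可以直接加载公开的预训练词向量来做相似词查询。下面是一个示意(首次运行需要联网下载模型,模型名取自 gensim-data 的公开列表):

```python
# 假设性示例:加载小型英文 GloVe 预训练向量并查询相似词
import gensim.downloader as api

wv = api.load("glove-wiki-gigaword-50")          # 首次调用会自动下载

print(wv.most_similar("king", topn=3))           # 与 "king" 语义最近的 3 个词
print(wv.similarity("computer", "laptop"))       # 两个词的余弦相似度
```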

🎉 结语

各位小伙伴们,恭喜你们完成了superior哥深度学习系列第十三篇的学习!从计算机视觉到自然语言处理,我们的AI学习之旅又迈出了重要一步。

文字是人类智慧的载体,让AI理解文字,就是让AI接触人类知识的宝库。虽然NLP领域还有很多挑战需要解决,但正如我们在词嵌入中看到的,简单的思想往往能产生深远的影响。

记住,每一行代码都是你与AI对话的方式,每一次调试都是你理解算法的过程。保持好奇心,持续学习,在AI的世界里,永远有新的惊喜等着你!

下一篇文章,我们将深入学习循环神经网络(RNN)和长短期记忆网络(LSTM),让AI真正理解序列和时间的概念。

期待与大家继续这段激动人心的AI学习之旅!💪


superior哥深度学习系列持续更新中…
关注我,获取最新AI技术分享! 🤖✨