一、简介
自定义分词库和同义词库是弥合 “标准通用语言” 与 **“特定领域语言”**之间语义鸿沟的关键桥梁。其根本目的是让计算机能像领域专家一样,更精准地理解和处理特定场景下的文本,从而提升搜索、推荐、分析等应用的智能化水平
1.1 分词库 (Custom Segmentation Dictionary)
核心目的:确保特定词语的完整性,避免被错误切分,保障语义的最小单元正确。
如果一个专有名词被错误地切分开,它所代表的完整含义就会丢失,后续的所有分析都将基于错误的基础。
具体应用场景与目的:
- 识别专有名词,保护领域知识完整性
- 企业/产品/品牌名:
- 例: “蓝鲸智云”、“织云平台”、“特斯拉Model Y”
- 目的: 如果不加干预,“蓝鲸智云”可能会被切成 蓝鲸 / 智云,失去了品牌专属性。在做舆情分析、竞品分析时,无法准确捕捉到对特定产品的讨论。
- 行业术语/私域黑话:
- 例: 电商行业的“动销率”、“客单价”;金融行业的“量化对冲”、“滚动收益”;游戏行业的“开黑”、“Gank”。
- 目的: 这些是行业内沟通的基础。错误切分(如 量化 / 对冲)会导致专业文档检索失败,也无法对行业报告、用户评论进行有效的文本挖掘。
- 人名/地名/组织机构名:
- 例: “张三丰”、“朝阳区群众”、“数据智能部”。
- 目的: 保证实体识别(NER)的准确性。例如,在分析客服聊天记录时,需要准确识别出是哪个部门或哪位同事被提及。
- 企业/产品/品牌名:
- 收录网络新词与热词,跟上时代语言变化
- 例: “元宇宙”、“打工人”、“YYDS”、“绝绝子”、“破防了”。
- 目的: 新词热词的出现速度远快于通用词库的更新速度。及时收录这些词,才能准确理解用户在社交媒体、评论区的情绪和讨论焦点,做好舆情监控和用户画像。
- 修正原生分词的常见错误
- 例: “上海市长江大桥”
- 错误切分: 上海 / 市长 / 江大桥
- 正确切分: 上海市 / 长江大桥
- 目的: 提升基础分词的准确率。很多有歧义的词组组合,可以通过自定义词典强制其正确切分,这是提升整体NLP任务效果的基础。
- 例: “上海市长江大桥”
1.2 同义词库 (Custom Synonym Dictionary)
核心目的:将不同说法但含义相同的词语关联起来,统一语义表达,扩大召回并简化分析。
用户在表达同一个意思时,会使用不同的词语。同义词库就是为了抹平这种表达差异,让机器知道“A”和“B”其实在说同一件事。
具体应用场景与目的:
- 提升搜索召回率与用户体验
- 例: 电脑 = 计算机 = PC;笔记本 = 手提电脑;土豆 = 马铃薯 = 洋芋。
- 目的: 当用户搜索“笔记本”时,系统也能将包含“手提电脑”的商品或文章返回给用户,避免因用词不同而搜不到结果,极大提升了搜索的全面性(召回率)。
- 统一公司内部术语与简称
- 例: 人工智能 = AI;北京大学 = 北大;用户增长部 = 增长部。
- 目的: 在知识库或内部系统中,员工可能使用简称进行搜索。通过同义词库,可以确保无论输入全称还是简称,都能定位到同一份文档或数据。
- 兼容用户输入错误与习惯
- 例: 罗技 = 罗计(错别字);帐号 = 账号(异形词)。
- 目的: 增强系统的鲁棒性,即使用户输入了常见的错别字或使用了不同的书写习惯,系统依然能正确理解其意图。
- 统一数据分析与统计口径
- 例: 在客服质检场景中,bug = 缺陷 = 问题 = 故障。
- 目的: 在进行工单分类、问题根因分析时,可以将这些同义词映射为同一个标签(如“产品缺陷”)。这样,在统计“产品缺陷”类问题的数量时,不会因为用词不同而产生遗漏,保证了数据报表的准确性。
1.3 前置知识
1.3.1 基于词典的分词方法
这是最主流和有效的分词范式。其核心思想是:将一个句子与一个足够大的词典进行匹配,切分出所有在词典中存在的词语。因此,词典的质量和覆盖度直接决定了分词效果的上限。
1.3.2 Trie树
- 什么是Trie树? 它是一种树形结构,每个节点代表一个字符,从根节点到某个节点的路径就构成一个词。
- 为什么选择Trie树?
- 极速前缀查询: 它能以 O(L) 的时间复杂度(L为待查字符串长度)判断一个字符串是否存在于词典中或是否为某个词的前缀。这是实现高效分词算法(如最大匹配法)的基础。
- 空间高效: 共享公共前缀的词语可以节省存储空间。
1.3.3 词典的构建与管理
词典文件通常采用简单的文本格式,每一行代表一个词条,可包含以下信息:
词语 词频 词性 (以空格或制表符分隔)
# dict.txt 示例
蓝鲸智云 1000 a_brand
智能运维 800 n
解决方案 1200 n
- 加载与缓存: 首次启动时,程序会读取文本词典来构建Trie树。这个过程可能耗时较长。因此,一个关键的优化是:在首次构建后,将内存中的Trie树 序列化 为一个二进制文件(例如 .trie 缓存)。后续启动时,直接从该二进制文件加载,速度会提升几个数量级。
- 词频对数化: 直接使用原始词频可能导致高频词与低频词权重差距过大。通常会对其进行 对数平滑 处理,使其分布更均匀,便于后续的评分计算。
二、分词
一个完整的分词流程包含 预处理、核心切分、歧义解决 等步骤。
2.1 文本预处理:标准化的第一步
为了消除格式差异,在分词前必须对输入文本进行标准化:
- 大小写转换: 将所有英文字母统一转为小写。
- 全角/半角转换: 将全角字符(如 ,、A)转换为半角(如 ,、A)。
- 繁简转换: 将繁体中文统一转换为简体。
2.2 基础算法:双向最大匹配法 (Bi-directional Maximum Matching)
这是基于词典的最经典的分词算法。
正向最大匹配 (Forward Maximum Matching, FMM): 从左到右扫描句子,每次都试图匹配出当前位置开始的最长词语。
反向最大匹配 (Backward Maximum Matching, BMM): 从右到左扫描句子,每次都试图匹配出当前位置结束的最长词语。
示例: 待分词句为 “北京大学城”,词典中有 “北京”、“大学”、“北京大学”、“大学城”。FMM 结果: 北京大学 / 城
BMM 结果: 北京 / 大学城
当 FMM 和 BMM 的结果不一致时,就产生了 分词歧义。
3. 解决分词歧义:全路径搜索与评分机制
这是分词器最核心、最能体现其“智能”的部分。当检测到歧义时,我们需要一套更复杂的机制来决定哪种切分方式最优。
- 步骤一:识别歧义片段
通过比较双向最大匹配的结果,可以定位出产生分歧的文本片段(如上例中的 “北京大学城”)。 - 步骤二:搜索所有可能的分词路径
针对该歧义片段,使用 深度优先搜索(DFS)或类似算法,并结合Trie树,找出所有可能的分词组合。- 对于 “北京大学城”,可能的路径有:
- [北京, 大学, 城]
- [北京大学, 城]
- [北京, 大学城]
- 对于 “北京大学城”,可能的路径有:
- 步骤三:设计评分函数,选择最佳路径
为每条分词路径计算一个综合得分,选择得分最高的作为最终结果。一个好的评分函数通常会考虑以下因素:- 平均词频: 由更常见(词频更高)的词组成的路径得分更高。
- 词语长度: 倾向于由更长的词组成的路径(例如,选择 北京大学 而不是 北京 和 大学)。这被称为“最少分词数”原则。
- 词语数量惩罚: 对切分得过于零碎的路径进行适当降权,以避免无意义的短词组合。
通过这套机制,分词器能够像人一样,基于统计信息和语言学规则,在多种可能性中做出最合理的选择。
三、同义词
3.1 核心价值:同义词为何如此重要?
在搜索引擎或问答系统中,同义词的核心价值在于弥合用户表达与文档表达之间的“词汇鸿沟”。用户的自然语言表达往往存在多样性,而数据源(如文档、文章、知识库)中的措辞可能与用户用词存在差异,常见的原因包括:
- 同义异形:用户搜索“电脑”,文档中可能使用“计算机”。
- 中英文混杂:用户输入“人工智能”,文档使用“AI”。
- 俗称与学名:用户查询“土豆”,文档为“马铃薯”。
- 概念别称:一个产品或概念存在多个说法,如“自动驾驶”与“无人驾驶”。
- 缩写与全称:如“PM2.5”与“可吸入颗粒物”。
如果不引入同义词扩展,系统仅基于字面匹配,极易导致召回率(Recall)不足,无法覆盖潜在相关的结果。同义词体系可以显著提升检索效果,扩大召回范围的同时兼顾精准度。
3.2 同义词应用策略
为了充分发挥同义词的价值,需要采用了一套分层、加权的策略。总体分成以下三个阶段:
3.2.1 意图切分——从用户问题到核心概念
接收到用户原始问题后,首先进行意图切分,提取最具价值的核心概念。
- 文本预处理
- 中英文分隔:中英文之间添加空格,便于后续 token 处理。
- 标点清理:对特殊字符和标点进行统一处理,如
[ :|\r\n\t,,。??/
!!&^%%(){}<>]+` 统统替换为空格。 - 弱意图词过滤:移除常见疑问修饰词,例如“请问、什么样的、哪家、how、what、do、please”等,以聚焦核心主题。
- 核心概念提取
- 基于空白字符切分词语。
- 对连续英文(非专有名词)进行组合。例如:
用户输入:“我想了解 iPhone 15 Pro 的信息”
核心概念提取结果:“iPhone 15 Pro”
* **推荐权重**:核心概念本身权重设为 **5**,在整体计算时保持核心主导地位。
3.2.2 分层查找—— 区分主次,精细召回
对核心概念执行分层查找,根据同义词的重要性区分主次:
- 第一层:概念级同义词
- 定义:对完整核心概念(如
"iPhone 15 Pro"
)寻找整体替代说法。 - 查找方式:lookup(“iPhone 15 Pro”) → 返回 [“苹果15 Pro”, “苹果15Pro”]。
- 应用目的:整体替换,最大程度捕获用户原始意图的等价表述。
- 推荐权重:0.7
- 定义:对完整核心概念(如
- 第二层:关键词级同义词
- 定义:将核心概念拆分成关键词(
weights
方法内部完成),对每个词寻找同义词。 - 查找方式:
- 定义:将核心概念拆分成关键词(
lookup("iPhone") → ["苹果手机"]
lookup("Pro") → ["专业版"]
- 应用目的:在粒度更细的层面提供召回补充,对文档中存在部分概念匹配时提升召回率。
- 推荐权重:0.2
3.2.3 多维度加权 —— 构建最终召回得分
在最终召回得分计算中,除了同义词层次的基础权重(核心概念、概念同义词、关键词同义词)外,还引入以下补充权重:
- NER(命名实体识别)权重
- 作用:不同实体类别对召回重要性有不同影响,NER 体系赋予差异化加权。
- 词性(POSTag)权重
- 动词、副词、代词等对召回效果贡献较低,设置权重衰减(如
r
,c
,d
→ 0.3)。 - 实词如名词、地名保持较高权重(如
ns
,nt
,n
→ 2-3)。
- 动词、副词、代词等对召回效果贡献较低,设置权重衰减(如
- 词频/逆文档频率(TF/IDF)加权
- 高频词影响力下调,低频但重要词上调。
3.3 综合得分计算公式(示例)
一个词的最终召回得分可简单描述为:
Final_Score = Base_Weight * NER_Weight * POSTag_Weight * IDF_Score
其中:
- Base_Weight 来自不同层次(核心词 5,概念级 0.7,关键词级 0.2)
- NER_Weight, POSTag_Weight 来自词性表和实体类别表
- IDF_Score 在函数中动态计算
四、附录
4.1 需要管理的文件
- 分词库
- huqie.txt (线下维护,文件形式存在代码库中)
- 公司级别 huqie-qifu (线上维护,超管可修改)
- 用户自定义分词库,整个系统共用一份(空间用户可修改)
- ner (命名实体)
- ragflow内置的ner.json (线下维护,文件形式存在代码库中)
- 用户自定义命名实体,整个系统共用一份(空间用户可修改)
- synon (同义词)
- ragflow内置的同义词 (丢弃)
- 用户自定义同义词(空间用户可修改)
- term.freq(术语 频率 )(感觉可以删除掉,跟分词表的词频有点重复)
- 用户自定义术语(空间用户可修改)
五、附录
4.1 分词表格式
ragflow分词表目前用的是huqie,下载地址:https://huggingface.co/InfiniFlow/huqie/tree/main
词频越高,说明这个词越普通,得分越低。
文件格式:分词 词频 词性
金童云商 3 nr
青禾服装 3 nr
救济灾民 3 l
左移 17 nr
低速 176 d
雨果网 3 nr
钢小二 3 nr
词性表:
类型 | 词性 | 权重 | 备注 |
---|---|---|---|
功能词 | r | 0.3 | 代词(pronoun)如:我、你、他 |
c | 连词(conjunction)如:和、但是、而且 | ||
d | 副词(adverb)如:很、非常、不 | ||
名词 | n | 2 | 普通名词(general noun)如:电脑、机器、算法 |
nt | 3 | 机构名(noun of organization) | |
ns | 地名(noun of location) | ||
数字 | [0-9-]+ | 2 | 数字形式(或者编号),视为有信息量的实体。 |
其它 | 1 | 动词、形容词或者其它未知词性 |
词频对应idf得分分布
这里f值范围是1-1000000。第一列f为词库中的原始词频的最小值,实际区间为 [f, next_f)
f(原始词频) | F(词频离散值/分桶) | df(分桶后的词频) | idf(逆文档频率分数) |
---|---|---|---|
1 | -13 | 2 | 6.6021 |
2 | -12 | 6 | 6.1871 |
4 | -11 | 17 | 5.757 |
11 | -10 | 45 | 5.342 |
28 | -9 | 123 | 4.9084 |
75 | -8 | 335 | 4.4744 |
204 | -7 | 912 | 4.0401 |
554 | -6 | 2479 | 3.6066 |
1504 | -5 | 6738 | 3.1741 |
4087 | -4 | 18316 | 2.7443 |
11109 | -3 | 49787 | 2.3219 |
30198 | -2 | 135335 | 1.9185 |
82085 | -1 | 367879 | 1.5585 |
223131 | 0 | 1000000 | 1.2788 |
4.2 命名实体识别格式(NER)
Named Entity Recognition,也可以称为专有名词。以下为ragflow中定义的格式。
文件格式:
{
"上官": "firstnm",
"山子股份": "stock",
"傻逼": "toxic"
}
实体类型:
类型 | 权重 | 含义 |
---|---|---|
toxic | 2 | 敏感/风险词 |
func | 1 | 功能词 / 语法词 如“的”、“是”、“怎么” |
corp | 3 | 公司或组织名 |
loca | 3 | 地点/地名 |
sch | 3 | 学校或教育机构 |
stock | 3 | 股票或证券名称 |
firstnm | 1 | 人名首字母或常见姓名 |
4.3 同义词格式
同义词当前为JSON格式存储,key为用户输入问题中包含的词,值为该词的同义词表
{
"骑士乳业": "832786",
"832786": "骑士乳业",
"官微": ["公众号", "官方号", "官方微信"]
}
4.4 IDF 公式(标准形式)
IDF(Inverse Document Frequency,逆文档频率)是衡量一个词在整个语料库中“稀有程度”的指标,核心思想是:出现频率越低的词越有区分度,应该赋予更高的权重。
IDF(w) = log ( N / (1 + DF(w)) )
N:语料库中的总文档数
DF(w):包含词 w 的文档数(Document Frequency)
1 + DF(w):+1 是平滑处理,防止除以零
简化解释:出现得多(常见词,如“的”,“是”),DF 大 → IDF 小
出现得少(专业词,如“iPhone 15 Pro”,“ChatGPT”),DF 小 → IDF 大
ES6.x之前默认用的是TF-IDF,7.x及之后默认使用了BM25算法,核心思想仍然包含IDF概念,会额外关注词频(某个 term t 在文档 d 中出现的次数)和当前文档d的长度、索引中所有文档的平均长度的影响。
RAGFlow的代码实现中,IDF公式修改成了:
- idf1(权重0.3):math.log10(10 + ((N - freq(t) + 0.5) / (freq(t) + 0.5)))
- t为query分词后的token,N固定为10000000
- freq(t):
- 如果t为2位及以上的数字,则为3
- 如果是词库的词,则计算得分:int(math.exp(F) * 1000000 + 0.5),这是离散化设计的精度损失,本质上是对词频做了分 bin,词频只要落在同一个 log 区间,最终 score 就一样。
- 如果t是字符.空格,则是300
- 对t继续分词,取最小得分/6,最少为30。
- idf2(权重0.7):math.log10(10 + ((N - DF(t) + 0.5) / (DF(t) + 0.5)))
- t为query分词后的token,N固定为1000000000
- df(t):
- [0-9. -]{2,},则为5
- 如果在term.freq(rag/res/)文件中词,取它的词频 + 3
- [a-z. -]+,则为300
- 继续分词,取min(df(sub_t)) / 6,最小值为3
- 其它情况返回3
4.5 分词实现
"""
pip install datrie nltk hanziconv -i https://mirrors.aliyun.com/pypi/simple
Windows平台还需要去安装一个wheel:https://github.com/liuzhenghua/datrie/releases/tag/v202503170859
"""
import logging
import copy
import datrie
import math
import os
import re
import string
from collections import namedtuple
from hanziconv import HanziConv
from nltk import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] - %(message)s')
# 定义词典条目结构,方便访问
DictEntry = namedtuple('DictEntry', ['word', 'frequency', 'pos_tag'])
# 定义分词路径中的一个节点
TokenNode = namedtuple('TokenNode', ['token', 'score', 'pos_tag'])
def is_chinese(char):
"""判断一个字符是否为中文字符"""
return '\u4e00' <= char <= '\u9fa5'
class CustomTokenizer:
"""
一个高性能、可定制的中英混合文本分词器。
该分词器使用Trie树存储词典,并通过结合多种分词策略来解决歧义,
特别适合需要处理领域专有词汇的场景。
"""
# 评分模型中的常数,用于惩罚切分过碎的结果
PATH_LENGTH_BIAS = 30
# 词频计算的基数分母,用于对数平滑
FREQUENCY_DENOMINATOR = 1000000
# 用于切分中英文/数字混合文本的正则表达式
SPLIT_PATTERN = re.compile(r"([a-zA-Z0-9\._+-]+)")
def __init__(self, dict_path: str, debug=False):
"""
初始化分词器。
Args:
dict_path (str): 主词典文件的路径 (例如 'data/dict.txt')。
"""
self.dict_path = dict_path
self.trie = None
self.DEBUG = debug
self.SPLIT_CHAR = r"([ ,\.<>/?;:'\[\]\\`!@#$%^&*\(\)\{\}\|_+=《》,。?、;‘’:“”【】~!¥%……()——-]+|[a-zA-Z0-9,\.-]+)"
self.stemmer = PorterStemmer()
self.lemmatizer = WordNetLemmatizer()
self._initialize()
def _initialize(self):
"""
核心初始化逻辑:加载词典并构建Trie树。
优先从缓存加载,若缓存不存在或加载失败,则从原始文本文件构建。
"""
triecache_path = self.dict_path + ".trie"
# 尝试从缓存文件加载Trie树
if os.path.exists(triecache_path):
try:
logging.info(f"正在从缓存文件加载Trie树: {triecache_path}")
self.trie = datrie.Trie.load(triecache_path)
logging.info("Trie树加载成功。")
return
except Exception as e:
logging.warning(f"从缓存加载Trie树失败: {e}。将从原始词典重新构建。")
# 如果缓存加载失败或不存在,则从头构建
logging.info("未找到Trie缓存或加载失败,开始从原始词典文件构建...")
self.trie = datrie.Trie(string.printable)
self._load_dictionary(self.dict_path)
def _get_key(self, word: str) -> str:
"""将词语转换为Trie树中存储的键(小写UTF-8),encode再转str是为了兼容中文字符,b''需要去除。"""
return str(word.lower().encode("utf-8"))[2:-1]
def _get_rkey(self, word: str) -> str:
"""存入反向key"""
return str(("DD" + word[::-1].lower()).encode("utf-8"))[2:-1]
def _load_dictionary(self, file_path: str):
"""
从文本文件加载词典,构建或更新Trie树,并创建缓存。
"""
logging.info(f"开始从词典文件构建Trie树: {file_path}")
try:
with open(file_path, "r", encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line:
continue
parts = re.split(r"[ \t]", line)
word, freq, pos_tag = parts[0], int(parts[1]), parts[2]
# 对词频进行对数平滑处理,避免悬殊过大
log_freq = int(math.log(max(1, freq) / self.FREQUENCY_DENOMINATOR) + 0.5)
key = self._get_key(word)
rkey = self._get_rkey(word)
# 仅在当前词不存在或新词频率更高时更新
if key not in self.trie or self.trie[key][0] < log_freq:
self.trie[key] = (log_freq, pos_tag)
self.trie[rkey] = 1
# 构建完成后,保存Trie树到缓存文件
triecache_path = file_path + ".trie"
logging.info(f"Trie树构建完成,正在保存缓存至: {triecache_path}")
self.trie.save(triecache_path)
except Exception as e:
logging.error(f"构建Trie树失败: {file_path}", exc_info=e)
def add_dictionary(self, file_path: str):
"""
动态添加用户自定义词典。新词会补充到现有Trie树中。
"""
self._load_dictionary(file_path)
def _preprocess(self, text: str) -> str:
"""对输入文本进行一系列标准化预处理。"""
text = re.sub(r"\W+", " ", text) # 非字母数字替换为空格
# 1. 全角转半角
r_string = ""
for char in text:
inside_code = ord(char)
if inside_code == 0x3000:
inside_code = 0x0020
else:
inside_code -= 0xfee0
if not (0x0020 <= inside_code <= 0x7e):
r_string += char
else:
r_string += chr(inside_code)
text = r_string
# 2. 转换为小写
text = text.lower()
# 3. 繁简转换
text = HanziConv.toSimplified(text)
return text
def _score_path(self, path: list[TokenNode]) -> tuple[list[str], float]:
"""
为一条分词路径计算综合得分。
得分越高,代表该分词路径越优。
评分策略:
1. 平均词频得分 (avg_freq_score): 路径中所有词的平均(对数)词频。
2. 词长分布得分 (long_word_ratio): 路径中长词(长度>1)的比例,鼓励长词。
3. 路径长度惩罚 (length_penalty): 对切分过于零碎(词数太多)的路径进行惩罚。
"""
if not path:
return [], -1.0
num_tokens = len(path)
total_freq_score = sum(node.score for node in path)
num_long_words = sum(1 for node in path if len(node.token) > 1)
avg_freq_score = total_freq_score / num_tokens
long_word_ratio = num_long_words / num_tokens
length_penalty = self.PATH_LENGTH_BIAS / num_tokens
# 最终得分为三者加权(此处为简单相加)
final_score = avg_freq_score + long_word_ratio + length_penalty
tokens = [node.token for node in path]
logging.debug(f"路径: {tokens}, 最终得分: {final_score:.2f} (AvgFreq: {avg_freq_score:.2f}, "
f"LongWordRatio: {long_word_ratio:.2f}, Penalty: {length_penalty:.2f})")
return tokens, final_score
def score_(self, token_freq_tags: list[tuple[str, tuple[int, str]]]) -> tuple[list[str], float]:
"""
计算分词得分,返回分词列表和得分。
得分公式:score = B / num_tokens + long_token_ratio + total_frequency
"""
base_score = 30
total_frequency = 0
long_token_count = 0
tokens = []
for token, (freq, tag) in token_freq_tags:
total_frequency += freq
if len(token) >= 2:
long_token_count += 1
tokens.append(token)
num_tokens = len(tokens)
long_token_ratio = long_token_count / num_tokens if num_tokens > 0 else 0
score = (base_score / num_tokens) + long_token_ratio + total_frequency if num_tokens > 0 else 0
logging.debug(
f"[SC] tokens={tokens}, count={num_tokens}, long_ratio={long_token_ratio:.4f}, freq_sum={total_frequency}, score={score:.4f}")
return tokens, score
def max_forward(self, text):
"""
使用最大正向匹配算法对 text 分词,返回最佳切分及对应得分。
"""
segments = []
start = 0
while start < len(text):
end = start + 1
current_substr = text[start:end]
# 尽可能向右扩展子串,直到不再匹配 trie 前缀
while end < len(text) and self.trie.has_keys_with_prefix(self._get_key(current_substr)):
end += 1
current_substr = text[start:end]
# 回退:如果最后一个子串不在词典里,逐步缩短
while end - 1 > start and self._get_key(current_substr) not in self.trie:
end -= 1
current_substr = text[start:end]
key = self._get_key(current_substr)
if key in self.trie:
score_info = self.trie[key]
else:
score_info = (0, '') # 默认分数为 0,标签为空
segments.append((current_substr, score_info))
start = end # 移动起始指针
# 根据 segments 计算最终得分和分词序列
return self.score_(segments)
def _max_backward(self, text: str) -> tuple[list[str], float]:
"""
使用最大向后匹配进行分词,返回分词结果及其得分。
"""
token_freq_list = []
start_idx = len(text) - 1
while start_idx >= 0:
end_idx = start_idx + 1
substring = text[start_idx:end_idx]
# 尝试向前扩展匹配
while start_idx > 0 and self.trie.has_keys_with_prefix(self._get_rkey(substring)):
start_idx -= 1
substring = text[start_idx:end_idx]
# 如果无法匹配完整词,逐步缩短直到找到匹配
while start_idx + 1 < end_idx and self._get_key(substring) not in self.trie:
start_idx += 1
substring = text[start_idx:end_idx]
# 加入匹配词或单字
if self._get_key(substring) in self.trie:
freq_tag = self.trie[self._get_key(substring)]
else:
freq_tag = (0, '')
token_freq_list.append((substring, freq_tag))
# 移动到下一个分词位置
start_idx -= 1
# 因为是从后往前扫描,最终结果需要反转
tokens, score = self.score_(token_freq_list[::-1])
return tokens, score
def _split_by_lang(self, line):
"""
将输入文本按语言(中文/非中文)分段。
返回一个列表,每个元素是 (文本片段, 是否中文) 的元组。
"""
result = []
blocks = re.split(self.SPLIT_CHAR, line)
for block in blocks:
if not block:
continue
start = 0
end = 1
current_is_chinese = is_chinese(block[start])
while end < len(block):
next_is_chinese = is_chinese(block[end])
if next_is_chinese == current_is_chinese:
end += 1
else:
result.append((block[start:end], current_is_chinese))
start = end
end = start + 1
current_is_chinese = next_is_chinese
# 处理最后一段
if start < len(block):
result.append((block[start:end], current_is_chinese))
return result
def _dfs(self, text, start_idx, current_tokens, all_tokenizations, depth=0, memo_cache=None):
if memo_cache is None:
memo_cache = {}
MAX_DEPTH = 10
if depth > MAX_DEPTH:
if start_idx < len(text):
remaining = "".join(text[start_idx:])
all_tokenizations.append(current_tokens + [(remaining, (-12, ''))])
return start_idx
state_key = (start_idx, len(current_tokens))
if state_key in memo_cache:
return memo_cache[state_key]
if start_idx >= len(text):
all_tokenizations.append(current_tokens)
memo_cache[state_key] = start_idx
return start_idx
max_reach = start_idx
# 检查是否有重复字符块
if start_idx < len(text) - 4:
char = text[start_idx]
if all(text[start_idx + i] == char for i in range(5)):
end_idx = start_idx
while end_idx < len(text) and text[end_idx] == char:
end_idx += 1
mid_idx = start_idx + min(10, end_idx - start_idx)
token = "".join(text[start_idx:mid_idx])
token_key = self._get_key(token)
token_info = self.trie.get(token_key, (-12, ''))
max_reach = max(max_reach, self._dfs(
text, mid_idx, current_tokens + [(token, token_info)],
all_tokenizations, depth + 1, memo_cache
))
memo_cache[state_key] = max_reach
return max_reach
# 启发式:是否可以跳一个字符
next_start = start_idx + 1
if start_idx + 2 <= len(text):
t1 = "".join(text[start_idx:start_idx + 1])
t2 = "".join(text[start_idx:start_idx + 2])
if self.trie.has_keys_with_prefix(self._get_key(t1)) and not self.trie.has_keys_with_prefix(self._get_key(t2)):
next_start = start_idx + 2
# 连续单字合并启发式
if len(current_tokens) >= 3 and all(len(tok[0]) == 1 for tok in current_tokens[-3:]):
t_merge = current_tokens[-1][0] + "".join(text[start_idx:start_idx + 1])
if self.trie.has_keys_with_prefix(self._get_key(t_merge)):
next_start = start_idx + 2
# 正常匹配
for end_idx in range(next_start, len(text) + 1):
token = "".join(text[start_idx:end_idx])
token_key = self._get_key(token)
if end_idx > start_idx + 1 and not self.trie.has_keys_with_prefix(token_key):
break
if token_key in self.trie:
token_info = self.trie[token_key]
max_reach = max(max_reach, self._dfs(
text, end_idx, current_tokens + [(token, token_info)],
all_tokenizations, depth + 1, memo_cache
))
if max_reach > start_idx:
memo_cache[state_key] = max_reach
return max_reach
# fallback单字
single_char = "".join(text[start_idx:start_idx + 1])
token_info = self.trie.get(self._get_key(single_char), (-12, ''))
result = self._dfs(text, start_idx + 1, current_tokens + [(single_char, token_info)],
all_tokenizations, depth + 1, memo_cache)
memo_cache[state_key] = result
return result
def _sort_tokens(self, tokenizations):
"""
对多个分词方案进行打分排序,得分高的优先返回。
"""
scored_results = []
for token_seq in tokenizations:
tokens, score = self.score_(token_seq)
scored_results.append((tokens, score))
# 按照得分从高到低排序
return sorted(scored_results, key=lambda x: x[1], reverse=True)
def freq(self, tk):
k = self._get_key(tk)
if k not in self.trie:
return 0
return int(math.exp(self.trie[k][0]) * self.FREQUENCY_DENOMINATOR + 0.5)
def _merge(self, token_str):
"""
合并分词结果:如果连续token拼接后是高频词,则优先合并。
"""
merged_tokens = []
tokens = re.sub(r"\s+", " ", token_str).strip().split()
start_idx = 0
while start_idx < len(tokens):
end_idx = start_idx + 1
# 尝试扩展窗口到 [start_idx:end_idx)
for i in range(start_idx + 2, min(len(tokens) + 1, start_idx + 5)):
candidate = "".join(tokens[start_idx:i])
if re.search(self.SPLIT_CHAR, candidate) and self.freq(candidate):
end_idx = i
merged_tokens.append("".join(tokens[start_idx:end_idx]))
start_idx = end_idx
# return " ".join(merged_tokens)
return merged_tokens
def tokenize(self, text: str) -> list[str]:
"""
对输入的文本进行分词,返回分词结果列表。
主要流程:
1. 预处理文本。
2. 按中英文/数字块进行切分。
3. 对中文块应用智能分词算法,对其他块进行简单处理。
4. 合并结果。
"""
if not text:
return []
processed_text = self._preprocess(text)
# 根据语言分割文本
lang_segments = self._split_by_lang(processed_text)
tokens = []
for segment, is_chinese in lang_segments:
if not is_chinese:
# 英文分词 + 词形还原 + stemming
english_tokens = word_tokenize(segment)
processed = [self.stemmer.stem(self.lemmatizer.lemmatize(tok)) for tok in english_tokens]
tokens.extend(processed)
continue
# 中文处理
if len(segment) < 2 or re.match(r"[a-z\.-]+$", segment) or re.match(r"[0-9\.-]+$", segment):
tokens.append(segment)
continue
# 中文用正向和反向分词
forward_tokens, forward_score = self.max_forward(segment)
backward_tokens, backward_score = self._max_backward(segment)
if self.DEBUG:
logging.debug("[FW] {} {}".format(forward_tokens, forward_score))
logging.debug("[BW] {} {}".format(backward_tokens, backward_score))
# 对 forward 和 backward 结果进行对齐
f_idx, b_idx, f_base, b_base = 0, 0, 0, 0
common_prefix_len = 0
while (f_idx + common_prefix_len < len(forward_tokens) and
b_idx + common_prefix_len < len(backward_tokens) and
forward_tokens[f_idx + common_prefix_len] == backward_tokens[b_idx + common_prefix_len]):
common_prefix_len += 1
if common_prefix_len > 0:
tokens.append(" ".join(forward_tokens[f_idx: f_idx + common_prefix_len]))
f_base = f_idx + common_prefix_len
b_base = b_idx + common_prefix_len
f_idx = f_base + 1
b_idx = b_base + 1
# 对剩余的部分进行对齐处理
while b_idx < len(backward_tokens) and f_idx < len(forward_tokens):
b_sub = "".join(backward_tokens[b_base:b_idx])
f_sub = "".join(forward_tokens[f_base:f_idx])
if b_sub != f_sub:
# 谁短谁继续扩展
if len(b_sub) > len(f_sub):
f_idx += 1
else:
b_idx += 1
continue
# 如果子串相等,但单词粒度不同
if backward_tokens[b_idx] != forward_tokens[f_idx]:
b_idx += 1
f_idx += 1
continue
# 对不同分词组合进行 DFS 搜索
dfs_candidates = []
self._dfs("".join(forward_tokens[f_base:f_idx]), 0, [], dfs_candidates)
best_sequence = self._sort_tokens(dfs_candidates)[0][0]
tokens.append(" ".join(best_sequence))
# 找相同的前缀部分
same_len = 1
while (b_idx + same_len < len(backward_tokens) and
f_idx + same_len < len(forward_tokens) and
backward_tokens[b_idx + same_len] == forward_tokens[f_idx + same_len]):
same_len += 1
tokens.append(" ".join(forward_tokens[f_idx: f_idx + same_len]))
b_base = b_idx + same_len
f_base = f_idx + same_len
b_idx = b_base + 1
f_idx = f_base + 1
# 收尾:剩余的尾部一致
if b_base < len(backward_tokens):
assert f_base < len(forward_tokens)
assert "".join(backward_tokens[b_base:]) == "".join(forward_tokens[f_base:])
dfs_candidates = []
self._dfs("".join(forward_tokens[f_base:]), 0, [], dfs_candidates)
best_sequence = self._sort_tokens(dfs_candidates)[0][0]
tokens.append(" ".join(best_sequence))
# 合并、去除多余空格
# return tokens
merged_tokens = " ".join(tokens)
# logging.debug("[TKS] {}".format(self.merge_(merged_tokens)))
return self._merge(merged_tokens)
#### A.2 使用示例
if __name__ == '__main__':
# --- 准备环境 ---
# 1. 创建一个临时目录和词典文件用于演示
data_dir = "temp_dict_data"
os.makedirs(data_dir, exist_ok=True)
dict_file_path = os.path.join(data_dir, "main_dict.txt")
# 2. 写入主词典内容
main_dict_content = """
北京 100000 n
大学 80000 n
北京大学 5000 n_org
大学城 3000 n_loc
智能 90000 adj
运维 60000 n
智能运维 6000 n_tech
解决方案 2000 n_prod
平台 70000 n
蓝鲸 100 n_brand
"""
with open(dict_file_path, "w", encoding="utf-8") as f:
f.write(main_dict_content.strip())
# --- 开始使用分词器 ---
print("=" * 20 + " 1. 初始化分词器 " + "=" * 20)
# 首次初始化会从txt文件构建,并生成.trie缓存
tokenizer = CustomTokenizer(dict_path=dict_file_path)
print("\n" + "=" * 20 + " 2. 测试分词(歧义句) " + "=" * 20)
ambiguous_text = "北京大学城"
tokens = tokenizer.tokenize(ambiguous_text)
print(f"输入: '{ambiguous_text}'")
print(f"分词结果: {tokens}") # 期望: ['北京', '大学城'] 或 ['北京大学', '城'] 取决于词频和评分模型
print("\n" + "=" * 20 + " 3. 测试中英混合文本 " + "=" * 20)
mixed_text = "我司的蓝鲸智云平台是领先的AIOps智能运维解决方案"
tokens = tokenizer.tokenize(mixed_text)
print(f"输入: '{mixed_text}'")
print(f"分词结果: {tokens}") # 期望: ['我司的', '蓝鲸', '智云', '平台', '是', '领先', '的', 'aiops', '智能运维', '解决方案']
# --- 动态添加用户词典 ---
print("\n" + "=" * 20 + " 4. 动态添加用户词典 " + "=" * 20)
user_dict_path = os.path.join(data_dir, "user_dict.txt")
user_dict_content = """
蓝鲸智云 9999 n_brand
AIOps 9999 n_tech
我司 100 phrase
"""
with open(user_dict_path, "w", encoding="utf-8") as f:
f.write(user_dict_content.strip())
tokenizer.add_dictionary(user_dict_path)
print("用户词典已添加。")
print("\n" + "=" * 20 + " 5. 再次测试中英混合文本 " + "=" * 20)
tokens_after_add = tokenizer.tokenize(mixed_text)
print(f"输入: '{mixed_text}'")
print(
f"分词结果 (添加词典后): {tokens_after_add}") # 期望: ['我司的', '蓝鲸智云', '平台', '是', '领先', '的', 'aiops', '智能运维', '解决方案']
# --- 清理临时文件 ---
import shutil
shutil.rmtree(data_dir)
print(f"\n演示完成,已清理临时目录: {data_dir}")