助记提要
- 基本搜索原理 反向索引;
- 使用Redis构建索引的方法;
- 交并补计算实现搜索功能;
- 转换查询语句;
- 实现复合排序;
- 处理非数值字符的排序;
- 广告价格的计算;
- 广告定向附加值的计算;
- 实现广告的定向操作;
- 记录用户行为并学习;
- 实现职位搜索功能;
7章 基于搜索的应用程序
- 如何搜索和过滤数据
7.1 使用Redis进行搜索
本机搜索文件的时候,被搜索的文档数量越多,搜索时间也会越长。
但是在Web页面上搜索内容的时候,即使文档的体积和数目都很大,也能很快地搜到。
基本搜索原理
为了让搜索比扫描文档更快,需要先建立反向索引。
反向索引:从被索引文档里提取出的一些词,创建表格来记录每个文章包含了哪些词。
Redis的集合和有序集合都很适合处理反向索引。
基于搜索的问题通常需要使用集合以及有序集合的交集、并集和差集操作。
建索引
给文档构建索引,需要先对文档包含的单词做处理。
从文档提取词的过程通常被称为语法分析和标记化,可以生成的一系列用于标识文档的标记(即单词)。
生成标记的规则依据语言的语法而不同。标记生成后,一般还需要移除其中的非用词。非用词是反复出现在文档中,但是没有太大信息量的单词。
说明 | 数据结构 | 名称 | 内容 |
---|---|---|---|
反向索引 | 集合 | ind:单词 | 该单词出现过的文章id |
文档信息 | 散列 | kb:doc文章id | 文章id、创建时间、更新时间、标题等 |
# 非用词集合,根据语言从互联网获取
STOP_WORDS = set()
# 示例使用的单词提取规则
WORDS_RE = re.compile("[a-z']{2,}")
def tokenize(content):
words = set()
# 找文档中所有单词
for match in WORDS_RE.finditer(content.lower()):
# 单词前后的单引号去掉
word = match.group().strip("'")
# 保留至少有两个字符的单词
if len(word) >= 2:
words.add(word)
return words - STOP_WORDS
def index_document(conn, docid, content):
# 标记化处理
words = tokenize(content)
pipeline = conn.pipeline(True)
for words in words:
# 创建反向索引
pipeline.sadd('idx:' + word, docid)
retrun len(pipeline.execute())
如果文档是随时间变化的,就需要在index_document开始添加一段清除已有索引的代码,然后再重建索引。
搜索操作
可以使用集合命令完成不同搜索功能:
- SINTER和SINTERSTORE:同时包含所有给定词的文档;
- SUNION和SUNIONSTORE:包含给定词中的某个词的文档;
- SDIFF和SDIFFSTORE:包含一些词但不包含另一些词的文档;
def _set_common(conn, method, names, ttl=30, execute=True):
id = str(uuid.uuid4())
pipeline = conn.pipeline(True) if execute else conn
names = ['idx:' + name for name in names]
# 执行相应的操作
getattr(pipeline, method)('idx:' + id, *names)
# 保存操作结果的集合设置过期时间
pipeline.expire('idx:' + id, ttl)
if execute:
pipeline.execute()
# 结果集合的id返回
return id
def intersect(conn, items, ttl=30, _execute=True):
return _set_common(conn, 'sinterstore', items, ttl, _execute)
def union(conn, items, ttl=30, _execute=True):
return _set_common(conn, 'sunionstore', items, ttl, _execute)
def difference(conn, items, ttl=30, _execute=True):
return _set_common(conn, 'sdiffstore', items, ttl, _execute)
查询语句转换
将用户的文本查询语句转换为搜索操作。
对查询语句进行语法分析
# 简要语法规则示例
# 单词前面带减号(-)表示这个单词不包含
# 单词前面带加号(+)表示这个单词是前一个词的同义词
QUERY_RE = re.compile("[+-]?[a-z']{2,}")
def parse(query):
unwated = set() # 存储不想要的单词
all = [] # 需要执行交集计算的单词
current = set() # 存储同义词
for match in QUERY_RE.finditer(query.lower())
word = match.group()
# 处理加号或减号前缀
prefix = word[:1]
if prefix in '+-':
word = word[1:]
else:
prefix = None
word = word.strip("'")
if len(word) < 2 or word in STOP_WORDS:
continue
if prefix == '-':
unwated.add(word)
continue
# 同义词集合非空。出现新的没有加号的单词时,就为它建一个新的同义词集合
if current and not prefix:
all.append(list(current))
current = set()
current.add(word)
# 剩余的单词加到交集计算里面处理
if current:
all.append(list(current))
return all, list(unwated)
分析查询语句并搜索文档
def parse_and_search(conn, query, ttl=30):
all, unwanted = parse(query)
# 仅包含非用词,搜索不到结果
if not all:
return None
to_intersect = []
for syn in all:
# 对同义词执行并集计算
if len(syn) > 1:
to_intersect.append(union(conn, syn, ttl=ttl))
# 只有一个词,直接使用这个词
else:
to_intersect.append(syn[0])
# 并集计算的结果不止一个,就计算交集
if len(to_intersect) > 1:
intersect_result = intersect(conn, to_intersect, ttl=ttl)
else:
intersect_result = to_intersect[0]
# 从交集计算结果中去掉包含这些词的文档
if unwanted:
unwanted.insert(0, intersect_result)
return differenct(conn, unwanted, ttl=ttl)
return intersect_result
搜索结果排序
关联度计算:搜索到多个结果后,还需要根据每个文档的重要性对它们进行排序。
判断一个文章是否比另一个文章有更高关联度的一种方法就是看哪个文档的更新时间更新。
def search_and_sort(conn, query, id=None, ttl=300, sort="-updated", start=0, num=20):
# 是否逆序排
desc = sort.startswith('-')
sort = sort.lstrip('-')
by = "kb:doc:*->" + sort
alpha = sort not in ('updated', 'id', 'created')
# 如果给了已有的可用的搜索结果,就延长它的生存时间
if id and not conn.expire(id, ttl):
id = None
# 没给搜索结果或者结果已过期
if not id:
id = parse_and_search(conn, query, ttl=ttl)
pipeline = conn.pipeline(True)
pipeline.scard('idx:' + id)
# 排序。
pipeline.sort('idx:' + id, by=by, alpha=alpha, desc=desc, start=start, num=num)
results = pipeline.execute()
# 搜索结果的id也返回,方便后续取下一页
return results[0], results[1], id
7.2 有序索引
通过散列进行排序,适合处理排序依据可以用字符串或数字表示的情况,不能处理由多个分值组合而成的排列顺序。
复合排序
示例:需要同时基于文章的投票数和更新时间两个因素对文章进行排序
使用有序集合实现复合条件排序
def search_and_zsort(conn, query, id=None, ttl=300, update=1, vote=0, start=0, num=20, desc=True):
if id and not conn.expire(id, ttl):
id = None
if not id:
id = parse_and_search(conn, query, ttl=ttl)
# 调整投票数和更新时间在文章评分中的比重
scored_search = {
id: 0, # 搜索结果集合
'sort:update': update, # 文章更新时间有序集合
'sort:votes': vote # 文章票数有序集合
}
# 使用辅助函数
id = zintersect(conn, scored_search, ttl)
pipeline = conn.pipeline(True)
pipeline.zcard('idx:' + id)
if desc:
pipeline.zrevrange('idx:' + id, start, start + num - 1)
else:
pipeline.zrange('idx:' + id, start, start + num - 1)
results = pipeline.execute()
return results[0], results[1], id
计算交集、并集的辅助函数
def _zset_common(conn, method, scores, ttl=30, **kw):
id = str(uuid.uuid4())
# scores是字典,用来指定排序用到的参数
# 是否在这里使用事务流水线,默认状态是使用
execute = kw.pop('_execute', True)
pipeline = conn.pipeline(True) if execute else conn
# 对输入的键添加前缀
for key in scores.keys():
scores['idx:' + key] = scores.pop(key)
# 执行操作
getattr(pipeline, method)('idx:' + id, scores, **kw)
pipeline.expire('idx:' + id, ttl)
if execute:
pipeline.execute()
return id
def zintersect(conn, items, ttl=30, **kw):
return _zset_common(conn, 'zinterstore', dict(items), ttl, **kw)
def zunion(conn, items, ttl=30, **kw):
return _zset_common(conn, 'zunionstore', dict(items), ttl, **kw)
非数值排序
将多个条件作为排序依据,当排序的依据不是数值时,必须转为数字才能使用有序集合的分值。
Redis中有序集合的分值是以双精度浮点数存储的,最大支持64个二进制位。字符串转换操作最好用到不超过63位。
def string_to_score(string, ignore_case=False):
# 是否忽略大小写
if ignore_case:
string = string.lower()
# 取前6个字符转换为ASCII值
pieces = list(map(ord, string[:6]))
# 长度小于6的,添加占位符
while len(pieces) < 6:
pieces.append(-1)
score = 0
# 把每个字符转换的值计算到分值里面
for piece in pieces:
score = score * 257 + piece + 1
# 多用一位,用来区分字符串是否刚好是6个字符长
return score * 2 + (len(string) > 6)
计算分值时每个字符的分值都会加1,这样空字符的值是1,而不足6个字符的填充值是0。
字符串映射为分值后,程序可以对字符串的前6个字符进行前缀比较排序。
7.3 广告定向
广告服务器:接收各种信息,并通过这些信息找出能够通过点击、浏览或动作获得最大经济收益的广告。用户访问带广告的页面时,Web服务器和用户的浏览器都会向广告服务器发送请求获取广告。
广告预算,每个广告会拥有一个随时间减少的预算。
数据结构
说明 | 数据结构 | 键名 | 内容 |
---|---|---|---|
位置可匹配的广告 | 集合 | idx:req:位置 | 广告的id |
单词可匹配的广告 | 集合 | idx:单词 | 广告的id |
广告的类型 | 哈希 | type: | 广告id对应广告的类型 |
广告的eCPM | 有序集合 | idx:ad:value: | 成员为广告id,分值为广告的eCPM |
广告的基本价格 | 有序集合 | ad:base:value | 成员为广告id,分值为广告的cpc、cpa或cpm |
广告包含的单词 | 集合 | terms:广告id | 单词 |
定向操作的id | 数值 | ads:served: | 自增值 |
定向操作中匹配的单词 | 集合 | terms:matched:定向id | 单词 |
表示该类型的广告的展示次数 | 数值 | type:广告类型:views: | 自增值 |
广告及其内单词的展示次数 | 有序集合 | views:广告id | 成员是单词,分值是展示数。成员为空字符串时表示广告本身的展示数 |
表示该类型的广告的点击次数 | 数值 | type:广告类型:clicks: | 自增值 |
表示该类型的广告的动作执行次数 | 数值 | type:广告类型:actions: | 自增值 |
广告及其内单词的点击次数 | 数值 | clicks:广告id | 自增值 |
广告及其内单词的动作执行次数 | 数值 | actions:广告id | 自增值 |
广告价格的计算
广告计费方式
CPM:cost per mile,按千次展示计费;
CPC:cost per click,按点击次数计费;
CPA:cost per action,按动作执行次数(购买次数)计费;统一计费方式
把其他类型的广告做转换,变成基于每千次展示进行计算,得到估算的CPM。
对于CPC广告,需要把广告每次点击价格乘以广告的点击通过率,然后乘以1000。
对于CPA广告,把广告的点击通过率、用户在页面上执行动作的概率和被执行动作的价格相乘,然后乘以1000。
def cpc_to_ecpm(views, clicks, cpc):
return 1000. * cpc * clicks / views
def cpa_to_ecpm(views, actions, cpa):
return 1000. * cpa * actions / 1000
广告索引
广告索引需要视定向的具体需求来定。
示例使用位置和内容作为定向的选项。
TO_ECPM = {
'cpc': cpc_to_ecpm,
'cpa': cpa_to_ecpm,
'cpm': lambda *args: args[-1],
}
def index_ad(conn, id, locations, content, type, value):
pipeline = conn.pipeline(True)
# 把广告的id加到所有相关的位置集合里
for location in locations:
pipeline.sadd('idx:req:' + location, id)
words = tokenize(content)
# 对广告包含的单词做索引
for word in words:
pipeline.zadd('idx:' + word, id, 0)
# AVERAGE_PER_1K是每千次展示的平均点击数和平均动作执行次数数
rvalue = TO_ECPM[type](1000, AVERAGE_PER_1K.get(type, 1), value)
# 记录广告类型的哈希
pipeline.hset('type:', id, type)
# 记录广告ecpm的有序集合
pipeline.zadd('idx:ad:value:', id, rvalue)
# 记录广告基本价格的有序集合
pipeline.zadd('ad:base:value:', id, value)
# 记录能够对广告进行定向的单词
pipeline.sadd('terms:' + id, *list(words))
pipeline.execute()
广告定向操作
广告定向就是根据需求匹配广告,找出eCPM最高的一个广告。
广告定向操作的要求:
- 基于位置匹配广告;
- 记录页面内容和广告内容的匹配程度,以及不同匹配程度对广告点击通过率的影响等统计;
- 从用户行为中学习,改善广告投放。
定向操作流程
def target_ads(conn, locations, content):
pipeline = conn.pipeline(True)
# 根据位置参数找所有匹配的广告和广告的eCPM
matched_ads, base_ecpm = match_location(pipeline, locations)
# 基于内容计算附加值
words, targeted_ads = finish_scoring(
pipeline, matched_ads, base_ecpm, content
)
# 创造一个id,用于记录定向操作
pipeline.incr('ads:served:')
# 找eCPM最高的广告
pipeline.zrevrange('idx:' + targeted_ads, 0, 0)
target_id, target_ad = pipeline.execute()[-2:]
if not target_ad:
return None, None
ad_id = targeted_ad[0]
# 记录定向操作的执行结果,用于学习用户行为
record_targeting_result(conn, target_id, ad_id, words)
# 返回定向操作记录的id和广告id
return target_id, ad_id
定向id用于追踪广告引发的点击,用于了解定向操作的哪部分对点击做了贡献。
按位置匹配广告的函数
def match_location(pipe, locations):
# 根据所有位置找出需要计算并集的集合键
required = ['req:' + loc for loc in locations]
# 把和给定位置匹配的广告存到集合
matched_ads = union(pipe, required, ttl=300, _execute=False)
# 被匹配广告的基本eCPM
base_ecpm = zintersect(pipe, {matched_ads: 0, 'ad:value:': 1}, _execute=False)
# 返回两波结果数据的id
return matched_ads, base_ecpm
计算附加值
广告投放的效果是由上下文决定的,因此需要通过页面的单词来寻找更好的广告。
开始展示广告后,系统会记录广告中包含的哪个单词改善或损害了广告的预期效果,然后据此修改各个定向单词的相对价格。
计算附加值就是基于页面内容和广告内容两者之间相匹配的单词,算出应该给广告的eCPM价格加上多少增量。
广告的平均eCPM是广告已知的平均CPM加上单词带来的eCPM附加值。理想情况下,需要每个匹配的单词以匹配次数为权重计算加权的eCPM。
但是Redis无法使用有序集合做加权平均计算,所以实际估算时先算这些单词的最大附加值和最小附加值,它俩求平均值,作为多个匹配单词的附加值。
这样计算会有偏差,但是实现起来简单,而且真实的附加值的确就在最大值和最小值之间。
def finish_scoring(pipe, matched, base, content):
bonus_ecpm = {}
# 标记化处理内容
words = tokenize(content)
for word in words:
# 找到既能匹配到位置,又包含页面中的单词的广告
word_bonus = zintersect(
pipe, {matched: 0, word: 1}, _execute=False
)
bonus_ecpm[word_bonus] = 1
if bonus_ecpm:
# 计算广告的最小eCPM附加值和最大eCPM附加值
minimum = zunion(pipe, bonus_ecpm, aggregate='MIN', _execute=False)
maximum = zunion(pipe, bonus_ecpm, aggregate='MAX', _execute=False)
# 计算广告的eCPM估算值(基本值+最小附加值的一半+最大附加值的一半)
return words, zunion(pipe, {base: 1, minimum: 0.5, maximum: 0.5}, _execute=False)
# 页面中的单词都匹配不到,就返回广告的基本eCPM
return words, base
先执行多次小的交集计算,然后再执行并集计算,这种方式比先并集后交集的整体计算量要小。
记录用户行为并学习
通过记录被匹配单词以及被定向广告的相关信息,可以计算点击通过率、动作执行率和每个单词的eCPM附加值。
前面只是简单的给出单词的eCPM的初始值。后续需要从用户行为中发现为每个被匹配单词独立计算附加值的方法。
记录定向操作执行结果
def record_targeting_result(conn, target_id, ad_id, words):
pipeline = conn.pipeline(True)
# 取广告内的单词
terms = conn.smembers('terms:' + ad_id)
# 广告和内容匹配的单词列表
matched = list(words & terms)
# 记录匹配的单词
if matched:
matched_key = 'terms:matched:%s' % target_id
pipeline.sadd(matched_key, *matched)
pipeline.expire(matched_key, 900)
# 分类型记录广告的展示次数
type = conn.hget('type:', ad_id)
pipeline.incr('type:%s:views:' % type)
# 记录广告包含的单词的展示信息
for word in matched:
pipeline.zincrby('views:%s' % ad_id, word)
# 记录广告本身的展示信息
pipeline.zincrby('views:%s' % ad_id, '')
# 每展示100次,就更新广告的eCPM
if not pipeline.execute()[-1] % 100:
update_cpms(conn, ad_id)
记录点击和动作
def record_click(conn, target_id, ad_id, action=False):
pipeline = conn.pipeline(True)
click_key = 'clicks:%s' % ad_id
match_key = 'terms:matched:%s' % target_id
type = conn.hget('type:', ad_id)
if type == 'cpa':
# 刷新单词集合的过期时间
pipeline.expire(match_key, 900)
if action:
# 需记录动作而非点击
click_key = 'actions:%s' % ad_id
# 广告类型的点击/动作计数器
if action and type == 'cpa':
pipeline.incr('type:%s:actions:' % type)
else:
pipeline.incr('type:%s:clicks:' % type)
# 为广告及其内单词记录本次点击
matched = list(conn.smembers(match_key))
matched.append('')
for word in matched:
pipeline.zincrby(click_key, word)
pipeline.execute()
# 更新单词的eCPM
update_cpms(conn, ad_id)
更新匹配的单词集合的过期时间,是为了在点击发生的15分钟内持续对发生的动作进行计数。
广告每展示100至2000次就会引发一次点击或动作,所以每次记录点击时都会更新eCPM。
更新eCPM
def update_cpms(conn, ad_id):
pipeline = conn.pipeline(True)
# 取广告类型、广告价格、广告包含的单词
pipeline.hget('type:', ad_id)
pipeline.zscore('ad:base_value:', ad_id)
pipeline.smembers('terms:' + ad_id)
type, base_value, words = pipeline.execute()
# eCPM的计算方式是基于点击数还是动作执行次数
which = 'clicks'
if type == 'cpa':
which = 'actions'
# 取该类型广告的展示数和点击数
pipeline.get('type:%s:views:' % type)
pipeline.get('type:%s:%s' % (type, which))
type_views, type_clicks = pipeline.execute()
# 类型广告的点击率写入全局字典
AVERAGE_PER_1K[type] = (1000. * int(type_clicks or '1') / int(type_views or '1'))
# CPM广告的eCPM不需要更新
if type == 'cpm':
return
view_key = 'views:%s' % ad_id
click_key - '%s:%s' % (which, ad_id)
to_ecpm = TO_ECPM[type]
# 取广告展示数和点击数
pipeline.zscore(view_key, '')
pipeline.zscore(click_key, '')
ad_views, ad_clicks = pipeline.execute()
if (ad_clicks or 0) < 1:
# 广告没被点击过,使用之前的eCPM
ad_ecpm = conn.zscore('idx:ad:value:', ad_id)
else:
# 点击过的广告计算eCPM并更新
ad_ecpm = to_ecpm(ad_views or 1, ad_clicks or 0, base_value)
pipeline.zadd('idx:ad:value:', ad_id, ad_ecpm)
for word in words:
# 每个单词的展示数和点击数
pipeline.zscore(view_key, word)
pipeline.zscore(click_key, word)
views, clicks = pipeline.execute()[-2:]
# 广告未点击过,不更新eCPM
if (clicks or 0) < 1:
continue
word_ecpm = to_ecpm(views or 1, clicks or 0, base_value)
# 单词的附加值为单词计算的eCPM减去广告的eCPM
bonus = word_ecpm - ad_ecpm
# 更新单词的附加值
pipeline.zadd('idx:' + word, ad_id, bonus)
pipeline.execute()
优化eCPM计算
以上对广告实现定向的实现仅仅是运作得还行,并不完美。但是能够以此为基础做出改进。
- 定期降低广告的展示次数和点击次数。避免由于基数过大后,新的点击和展示无法对点击率造成影响。
- 学习前一天、前一周发生的点击和动作计数,并基于时间段设置不同权重。
- 大型的广告网络实际上不会直接按固定价格对每千次展示或动作进行收费,而是按照被定向广告的价格排名第二的广告价格进行收费。
- 广告网络会为给定的一系列关键字设置多个广告,这些广告会在价格最高的位置上交替出现,直到关键字的预算被耗尽为止。这意味着价格低的新广告不会展示。可以在一定比例的系统时间内,从前100的广告中按eCPM相对值做选择,而不是只选eCPM最高的广告。
- 新加入的广告最开始用于计算eCPM的信息是很少的。初始化时可以使用同类型广告的平均点击通过率,但是发生过一次点击后就不能了。需要在平均点击通过率和基于点击次数计算的通过率之间构建反线性关系,直到展示次数足够为止(一般2-5千次展示才能确定一个可靠的点击通过率)。
- 广告的展示次数达到2-5千次之前,可以人为提高广告的点击通过率或eCPM,以确保获取足够多的流量来学习广告的真正eCPM。
- 计算单词附加值的方式有更严谨的方法。可以考虑使用贝叶斯统计、神经网络、关联规则学习、聚类计算等。
- 记录广告的展示信息、点击信息等操作会耗费返回广告的时间。能以外部任务的方式执行这些操作。
7.4 职位搜索
使用集合和有序集合实现职位搜索功能,并根据求职者的技能为他们寻找合适的岗位。
逐个查找合适的职位
最简单的实现就是一个个地检查求职者的技能是否符合各个职位的要求:
def add_job(conn, jod_id, required_skills):
# 把职位需要的技能加到职位对应的集合
conn.sadd('job:' + job_id, *required_skills)
def is_qualified(conn, job_id, candidate_skills):
temp = str(uuid.uuid4())
pipeline = conn.pipeline(True)
# 求职者拥有的技能加到临时集合内
pipeline.sadd(temp, *candidate_skills)
pipeline.expire(temp, 5)
# 找出职位要求中求职者不具备的技能,记录到结果集合里
pipeline.sdiff('job:' + job_id, temp)
return not pipeline.execute()[-1]
搜索合适的职位
逐个查找的方式效率低下,职位多时性能会差。
按技能索引职位
def index_job(conn, job_id, skills):
pipeline = conn.pipeline(True)
# 把职位id加到技能集合
for skill in skills:
pipeline.sadd('idx:skill:' + skill, job_id)
# 记录职位所需技能的数量
pipeline.zadd('idx:jobs:req', job_id, len(set(skills)))
pipeline.execute()
找求职者能够胜任的职位
def find_jobs(conn, candidate_skills):
# 用于计算职位得分的字典
skills = {}
for skill in candidate_skills:
skills['skills:' + skill] = 1
# 各个技能集合做并集,得到求职者对每个职位的得分
job_scores = zunion(conn, skills)
# 计算求职者能胜任和不能胜任的职位
final_result = zintersect(conn, {job_scores: -1, 'job:req': 1})
# 分值为0即求职者可胜任职位
return conn.zrangebyscore('idx:' + final_result, 0, 0)
职位需求的总分减去求职者在该岗位的得分,结果为0表示能够胜任。
这样搜索的运行速度,取决于被搜索职位的数量和搜索次数,即使总的职位数量再大也不影响性能。