向量数据库 + 推荐系统
PQ算法,先分多个空间,再在空间里面去找对应的最佳值。
一 HNSW算法
二 推荐算法
三 代码实现推荐系统
3.1 新闻数据集
MIND: Microsoft News Recommendation Dataset
官网:https://msnews.github.io
论文:https://msnews.github.io/assets/doc/ACL2020_MIND.pdf
阿里天池对数据集MIND的介绍:https://tianchi.aliyun.com/dataset/89539
MIcrosoft News Dataset (MIND) 是一个用于新闻推荐研究的大规模数据集。它是从Microsoft新闻网站的匿名行为日志中收集的。MIND的使命是作为新闻推荐的基准数据集,促进新闻推荐和推荐系统领域的研究。
MIND包含约160k篇英文新闻文章和100万用户产生的超过1500万条展示日志。
每篇新闻文章都包含丰富的文本内容,包括标题、摘要、正文、类别和实体。每个展示日志都包含该用户在本次展示前的点击事件、未点击事件和历史新闻点击行为。为了保护用户隐私,当安全地散列到匿名 ID 时,每个用户都会与生产系统取消链接。
- MINDsmall_train.zip
- news.tsv:新闻文章的信息
- behaviors.tsv:用户的点击历史和印象日志
- entity_embedding.vec:从知识图中提取的新闻中的实体嵌入
- lation_embedding.vec:从知识图中提取的实体之间的关系的嵌入
新闻文章的详细信息。有7列:
- News ID 新闻编号
- Category 类别
- SubCategory 子类别
- Title 标题
- Abstract 摘要
- URL 新闻网址
- Title Entities (entities contained in the title of this news)
- 标题实体(本新闻标题中包含的实体)
- Abstract Entities (entites contained in the abstract of this news)
- 摘要实体(本新闻摘要中包含的实体)
import pandas as pd
# 设置显示的列宽为 None,以显示完整文本列内容
pd.set_option('display.max_colwidth', None)
df_news = pd.read_csv(
'./MIND/MINDsmall_train/news.tsv',
names=["news_id", "category", "sub_category", "title", "abstract", "url",
"title_entities", "abstract_entities"],
sep='\t',
header=None
)
df_news.head(2)
df_news.iloc[0]
df_news.info()
df_news.describe()
3.2 推荐代码
# 行为数据
df_behaviors = pd.read_csv('MIND\\MINDsmall_train\\behaviors.tsv',
names=["impression_id", "user_id", "time", "click_history", "impression_lpg"],
sep='\t',
header=None)
df_behaviors.head(2)
df_behaviors.iloc[0]
df_behaviors.info()
df_behaviors.describe()
unique_user_ids = df_behaviors['user_id'].unique()
print(len(unique_user_ids))
unique_user_ids
3.3 vec 文件
entity_embedding.vec 和 lation_embedding.vec
entity_embedding.vec和lation_embedding.vec文件包含通过 TransE方法从子图(从WikiData知识图)获知的实体和关系的100维嵌入。在两个文件中,第一列是实体/关系的ID,其他列是嵌入矢量值。我们希望这些数据可以促进对知识意识新闻推荐的研究。
# 设置.vec 文件路径
vec_file_path = 'MIND\\MINDsmall_train\\entity_embedding.vec'
# 读取 .vec 文件
with open(vec_file_path, 'r', encoding='utf-8') as file:
# 逐行读取词向量
for line in file:
# print(line)
line = line.strip()
parts = line.split('\t')
# print(parts)
word = parts[0]
vector = [float(value) for value in parts[1:]]
print(f'Word: {word}, Vector: {vector}')
# 如果要查找特定单词的向量,可以在这里添加条件
if word == 'Q41':
break
# 设置.vec 文件路径
vec_file_path = 'MIND\\MINDsmall_train\\relation_embedding.vec'
# 读取 .vec 文件
with open(vec_file_path, 'r', encoding='utf-8') as file:
# 逐行读取词向量
for line in file:
# print(line)
line = line.strip()
parts = line.split('\t')
# print(parts)
word = parts[0]
vector = [float(value) for value in parts[1:]]
print(f'Word: {word}, Vector: {vector}')
# 如果要查找特定单词的向量,可以在这里添加条件
if word == 'P31':
break
3.4 系统实现思路
为一个用户做新闻推荐:基于当前用户的行为信息,猜测他的偏好,该系统能推荐与偏好相似的新闻文章。
- 循环 df_behaviors 的每一行,每一行都是一条用户行为记录,想基于用户行为记录信息去做推荐。
- 排序算法
- 基于点击历史、用户画像、候选集,用prompt进行排序
- 点击历史:从用户行为记录中获取当前用户的点击历史,点击历史即新闻ID列表,可以基于新闻ID去 df_news 查询新闻的详细信息。
- 用户画像:基于点击过的新闻详细信息字符串设计一个prompt,llm输出用户画像(例如:用户爱看的主题和关注的地区)
- 候选集:利用召回算法从大量的新闻文章里筛选出来
- 基于点击历史、用户画像、候选集,用prompt进行排序
- 召回算法
- 第一轮筛选:规则筛选,几千篇
- 从用户的点击历史中统计出当前用户的(新闻类别,新闻子类别)偏好组合,只筛选符合该组合的新闻
- 第二轮筛选:向量相似度筛选,20篇
- 用户画像字符串去做embedding作为user_emb
- 大量新闻文章向量化后存入向量库等待被检索,news_emb
- 用户画像得到的user_emb 与 news_emb做相似度计算,只取前20篇
- 第一轮筛选:规则筛选,几千篇
3.5 LLM for 特征工程
论文地址:https://arxiv.org/pdf/2305.06566v4.pdf
该工作(GENRE)在新闻推荐的场景下,用 LLM 构造了三个不同的prompts,分别来进行新闻摘要的改写,用户画像的构建,还有样本增强。
首先可以看到它把新闻的title, abstract 还有category 当作输入,然后要求大语言模型来 生成一个摘要,把这个摘要当作这个新闻的 new feature输入下游。
然后是用户画像,根据用户过去观看过的新闻的标题,尝试去问大语言模型是否知道这个用户的一些感兴趣的topic,也就是用户的喜好和他所在的位置。
另外,因为有一些用户看过的新闻非常少,所以用大语言模型来做一些样本的扩充。这里是把用户看过的一些新闻的category,还有 title 输入到大语言模型里面去,希望大语言模型能够根据他看过的这些新闻,生成出来一些用户并没有看过,但可能感兴趣的“伪新闻”,然后把这些“伪”交互数据也当作训练集的一部分来进行训练。
实验表明这些手段都可以增强原始推荐的效果。
3.6 LLM for 特征编码
评测embedding模型:https://huggingface.co/spaces/mteb/leaderboard
我们选择的embedding模型:https://huggingface.co/DMetaSoul/Dmeta-embedding
3.7 LLM for 打分排序
Chat-REC: https://arxiv.org/pdf/2303.14524.pdf
四 存入向量数据库
# 读取新闻数据 df_news = get_df_news() # 数据预处理 df_news, news_info_list = preprocess_data(df_news) # 指定存储 embeddings 的文件夹路径 embedding_folder = 'embeddings_folder' os.makedirs(embedding_folder, exist_ok=True) # 计算和存储嵌入 compute_and_store_embeddings(news_info_list, embedding_folder, 1000) # 加载嵌入和存储到向量数据库 collection_name = "all_news" load_embeddings_and_save_to_qdrant(collection_name, embedding_folder, 1000)
pd.notna(x)
:确保仅对有效值进行解析,跳过缺失值。
literal_eval(x)
:安全地将字符串转换为结构化数据,避免代码注入风险。将字符串形式的 Python 字面量(如列表、字典、元组)转换为对应的 Python 对象。
协作效果:将字符串列转换为统一的 Python 对象列,处理缺失值,为后续分析(如提取实体、统计频次)奠定基础。
import json
import os
from ast import literal_eval
from utils import *
from db_qdrant import Qdrant
def preprocess_data(df_news):
# 数据预处理块
# 将包含字符串表示的列表转换为实际的列表
# pd.notna(x) 检查x是否为非缺失值(即不是NaN),确保不对缺失值进行转换。
# literal_eval(x) 是一个安全的方式来将字符串转换为相应的Python对象
df_news['title_entities'] = df_news['title_entities'].apply(
lambda x: literal_eval(x) if pd.notna(x) else [])
df_news['abstract_entities'] = df_news['abstract_entities'].apply(
lambda x: literal_eval(x) if pd.notna(x) else [])
# 使用空字符串填充其他列的 NaN 值
df_news = df_news.fillna('')
# 新增 news_info 列,合并`类别、子类别、标题、摘要`字符串
concatenation_order = ["category", "sub_category", "title", "abstract"]
df_news['news_info'] = df_news.apply(lambda row: ' '.join(
f"[{col}]:{row[col]}" for col in concatenation_order), axis=1)
news_info_list = df_news['news_info'].values.tolist()
logger.trace(
f"新增 news_info 列 | len(news_info_list): {len(news_info_list)}")
return df_news, news_info_list
def store_embeddings_to_json(embeddings, ids, payloads, file_path):
# 存储嵌入为 JSON 文件
json_data = {
"batch_ids": ids,
"batch_embeddings": embeddings,
"batch_payloads": payloads
}
with open(file_path, 'w') as json_file:
json.dump(json_data, json_file)
def compute_and_store_embeddings(data_list, embedding_folder, batch_size=1000):
# 嵌入计算和存储块
# 分批次向量化
ids = list(range(1, len(data_list) + 1)) # 生成递增的 ids 列表
for batch_idx, i in enumerate(range(0, len(data_list), batch_size)):
# 获取批次数据 batch_ids、batch_payloads
batch_ids = ids[i:i + batch_size]
df_news_batch = df_news.iloc[i:i + batch_size]
batch_payloads = df_news_batch.to_dict(orient='records')
# 计算嵌入 batch_embeddings
batch_data = data_list[i:i + batch_size]
batch_embeddings = embed_sentences(batch_data)
# 存储为 JSON 文件
file_path = os.path.join(
embedding_folder,
f"batch_{batch_idx + 1}.json")
store_embeddings_to_json(
batch_embeddings,
batch_ids,
batch_payloads,
file_path)
# 打印存储信息
logger.info(f"批次 {batch_idx} 数据存储成功,文件路径: {file_path}")
def load_embeddings_and_save_to_qdrant(
collection_name,
embedding_folder,
batch_size):
# 加载嵌入和存储到向量数据库
qdrant = Qdrant()
# 创建新的集合
if qdrant.create_collection(collection_name):
logger.success(f"创建集合成功 | collection_name: {collection_name}")
else:
logger.error(f"创建集合失败 | collection_name: {collection_name}")
# 分批次存储到向量数据库
for batch_idx, i in enumerate(range(0, len(news_info_list), batch_size)):
# 读取 JSON 文件
file_path = os.path.join(
embedding_folder,
f"batch_{batch_idx + 1}.json")
if os.path.exists(file_path):
with open(file_path, 'r') as json_file:
json_data = json.load(json_file)
batch_ids = json_data["batch_ids"]
batch_embeddings = json_data["batch_embeddings"]
batch_payloads = json_data["batch_payloads"]
# 插入数据到 Qdrant
if qdrant.add_points(
collection_name,
batch_ids,
batch_embeddings,
batch_payloads):
logger.success(f"批次 {batch_idx + 1} 插入成功")
else:
logger.error(f"批次 {batch_idx + 1} 插入失败")
else:
logger.warning(f"文件 {file_path} 不存在,跳过该批次数据的插入。")
logger.info("所有数据插入完成。")
# 读取新闻数据
df_news = get_df_news()
# 数据预处理
df_news, news_info_list = preprocess_data(df_news)
# 指定存储 embeddings 的文件夹路径
embedding_folder = 'embeddings_folder'
os.makedirs(embedding_folder, exist_ok=True)
# 计算和存储嵌入
compute_and_store_embeddings(news_info_list, embedding_folder, 1000)
# 加载嵌入和存储到向量数据库
collection_name = "all_news"
load_embeddings_and_save_to_qdrant(collection_name, embedding_folder, 1000)
执行结果:
五 主要的推荐逻辑代码
main.py
from utils import *
from db_qdrant import Qdrant
from qdrant_client.http import models
# 获取数据
df_behaviors_sample = get_df_behaviors_sample()
df_news = get_df_news()
# 循环 df_behaviors_sample 的每一行
for _, row in df_behaviors_sample.iterrows():
user_id = row['user_id']
click_history = row['click_history'].split()
# 召回
# 生成历史交互字符串 historical_records_str
historical_records = generate_historical_records(df_news, click_history)
historical_records_str = '\n'.join(historical_records)
logger.info(
f"历史交互字符串 | historical_records_str: \n{historical_records_str}")
# 生成用户画像 user_profile
user_profile = generate_user_profile(historical_records_str)
# 向量化用户画像 user_emb
user_emb = embed_sentences([user_profile])[0]
# 过滤条件 query_filter
# 统计出当前用户的(新闻类别,新闻子类别)偏好组合
user_favorite_combinations = get_user_favorite_combinations(
click_history, df_news)
should_items = []
for category, sub_category in user_favorite_combinations:
should_item = models.Filter(
must=[
models.FieldCondition(
key="category",
match=models.MatchValue(
value=category,
)
),
models.FieldCondition(
key="sub_category",
match=models.MatchValue(
value=sub_category,
)
)
]
)
should_items.append(should_item)
query_filter = models.Filter(
should=should_items
)
# 使用 Qdrant 查询与用户画像字符串最相似的 news 列表
qdrant = Qdrant()
scored_point_list = qdrant.search_with_query_filter(
"all_news", user_emb, query_filter, 20)
coarse_top_news = [
scored_point.payload for scored_point in scored_point_list]
logger.info(f"len(top_news): {len(coarse_top_news)}")
if coarse_top_news:
# 排序
coarse_top_news_str = '\n'.join(
[f"{idx}. {news}" for idx, news in enumerate(coarse_top_news)])
fine_top_news = fine_ranking(
user_profile,
historical_records_str,
coarse_top_news_str,
5)
for idx in fine_top_news:
news = coarse_top_news[int(idx)]
logger.success(int(idx))
logger.success(news)
break
urls.py
import sys
from collections import Counter
import pandas as pd
import torch
from loguru import logger
from NewsGPT import NewsGPT
from sentence_transformers import SentenceTransformer, util
logger.remove() # 删去import logger之后自动产生的handler,不删除的话会出现重复输出的现象
logger.add(sys.stderr, level="DEBUG") # 调整日志输出级别: INFO|DEBUG|TRACE
model_name = 'Dmeta-embedding'
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info("Use pytorch device: {}".format(device))
model = SentenceTransformer(model_name, device=device)
def embed_sentences(sentences):
"""
使用预训练的 SentenceTransformer 模型对句子列表进行嵌入。
参数:
- sentences:要嵌入的句子列表。
- model_name:SentenceTransformer 模型的名称。
返回:
- embeddings:句子嵌入的列表。
"""
embeddings = model.encode(sentences, normalize_embeddings=True)
embeddings = embeddings.tolist()
logger.trace(f"向量化 | embeddings: {len(embeddings)}, {type(embeddings)}")
return embeddings
def get_df_behaviors_sample(sample_size=10):
# 读取行为数据
df_behaviors = pd.read_csv(
'MIND/MINDsmall_train/behaviors.tsv',
names=[
"impression_id",
"user_id",
"time",
"click_history",
"impression_lpg"],
sep='\t',
header=None)
# 采样
df_behaviors_sample = df_behaviors.head(sample_size)
logger.info(
f"采样后 | df_behaviors_sample.shape: {df_behaviors_sample.shape}")
return df_behaviors_sample
def get_df_news():
# 读取新闻数据
df_news = pd.read_csv(
'MIND/MINDsmall_train/news.tsv',
names=[
"news_id",
"category",
"sub_category",
"title",
"abstract",
"url",
"title_entities",
"abstract_entities"],
sep='\t',
header=None)
logger.info(f"df_news.shape: {df_news.shape}")
return df_news
def get_user_favorite_combinations(click_history, df_news):
# 从 df_news 中查询并构建用户最喜欢的(category, sub_category)组合
user_favorite_combinations = Counter()
for news_id in click_history:
category = df_news.loc[df_news['news_id']
== news_id, 'category'].values[0]
sub_category = df_news.loc[df_news['news_id']
== news_id, 'sub_category'].values[0]
user_favorite_combinations.update([(category, sub_category)])
logger.info(
f"统计用户偏好类别组合 | user_favorite_combinations: {user_favorite_combinations}")
return user_favorite_combinations
def generate_historical_records(df_news, click_history):
# 根据 click_history 查询每条新闻的详细信息,并组合成字符串
historical_records = []
for idx, news_id in enumerate(click_history, start=1):
# 查询每条新闻的详细信息
record = df_news[df_news['news_id'] == news_id][[
"category", "sub_category", "title", "abstract"]]
# 组合成字符串,添加序号
record_str = f"{idx}. " + ' '.join(
f"[{col}]:{record.iloc[0][col]}" for col in [
"category", "sub_category", "title", "abstract"])
historical_records.append(record_str)
logger.trace(f"历史交互 | historical_records: {historical_records}")
return historical_records
def generate_user_profile(historical_records_str):
# 生成用户画像: 通过理解`用户历史行为序列`,生成`用户感兴趣的话题`以及`用户位置信息`
prompt = f"""Describe user profile based on browsed news list in the following format:
{historical_records_str}
You should describe the related topics and regions in the following format:
[topics]
-[topic1]
[region]
-[region1]
"""
logger.info(f"prompt: \n{prompt}")
# 模拟 NewsGPT 的调用
gpt = NewsGPT()
user_profile = gpt.get_completion(prompt)
# user_profile = '''
# [topics]
# -TV Entertainment
# -Sports
# -Crime
# -Lifestyle
# -Movies
# -Politics
# [regions]
# -United States'''
logger.success(f"用户画像 | user_profile: \n{user_profile}")
return user_profile
def fine_ranking(
user_profile,
historical_records_str,
candidate_list,
top_n=5):
prompt = f"""
I want you to recommend news to a user based on some personal information and historical records of news reading.
User profile: ```
{user_profile}
```
The historical records include news category, subcategory, title, and abstract. You are encouraged to learn his news preferences from the news he has read. Here are some examples:```
{historical_records_str}
```
Here's a list of news that he is likely to like: ```
{candidate_list}
```
Please select the top {top_n} news in the list that is most likely to be liked.
Please only output the order of these news in the form of a numerical list.
Example Output Format: 1,8,2,12,7
Output: """
logger.info(f"prompt: \n{prompt}")
gpt = NewsGPT()
response = gpt.get_completion(prompt)
logger.success(f"response: \n{response}")
top_news = response.strip().split(',')
return top_news