大模型架构记录6-推荐算法

发布于:2025-03-15 ⋅ 阅读:(14) ⋅ 点赞:(0)

向量数据库 + 推荐系统

PQ算法,先分多个空间,再在空间里面去找对应的最佳值。

一 HNSW算法 

二 推荐算法

三 代码实现推荐系统

3.1 新闻数据集

MIND: Microsoft News Recommendation Dataset

官网:https://msnews.github.io

论文:https://msnews.github.io/assets/doc/ACL2020_MIND.pdf

阿里天池对数据集MIND的介绍:https://tianchi.aliyun.com/dataset/89539

MIcrosoft News Dataset (MIND) 是一个用于新闻推荐研究的大规模数据集。它是从Microsoft新闻网站的匿名行为日志中收集的。MIND的使命是作为新闻推荐的基准数据集,促进新闻推荐和推荐系统领域的研究。

MIND包含约160k篇英文新闻文章和100万用户产生的超过1500万条展示日志
每篇新闻文章都包含丰富的文本内容,包括标题、摘要、正文、类别和实体。

每个展示日志都包含该用户在本次展示前的点击事件、未点击事件和历史新闻点击行为。为了保护用户隐私,当安全地散列到匿名 ID 时,每个用户都会与生产系统取消链接。

  • MINDsmall_train.zip
    • news.tsv:新闻文章的信息
    • behaviors.tsv:用户的点击历史和印象日志
    • entity_embedding.vec:从知识图中提取的新闻中的实体嵌入
    • lation_embedding.vec:从知识图中提取的实体之间的关系的嵌入

新闻文章的详细信息。有7列:

  • News ID 新闻编号
  • Category 类别
  • SubCategory 子类别
  • Title 标题
  • Abstract 摘要
  • URL 新闻网址
  • Title Entities (entities contained in the title of this news)
  • 标题实体(本新闻标题中包含的实体)
  • Abstract Entities (entites contained in the abstract of this news)
  • 摘要实体(本新闻摘要中包含的实体)
import pandas as pd
# 设置显示的列宽为 None,以显示完整文本列内容
pd.set_option('display.max_colwidth', None)
df_news = pd.read_csv(
    './MIND/MINDsmall_train/news.tsv',
    names=["news_id", "category", "sub_category", "title", "abstract", "url", 
           "title_entities", "abstract_entities"],
    sep='\t',
    header=None
)
df_news.head(2)

df_news.iloc[0]

df_news.info()

df_news.describe()

3.2 推荐代码

# 行为数据
df_behaviors = pd.read_csv('MIND\\MINDsmall_train\\behaviors.tsv', 
                           names=["impression_id", "user_id", "time", "click_history", "impression_lpg"],
                           sep='\t', 
                           header=None)
df_behaviors.head(2)

df_behaviors.iloc[0]

df_behaviors.info()

df_behaviors.describe()

unique_user_ids = df_behaviors['user_id'].unique()
print(len(unique_user_ids))
unique_user_ids

3.3 vec 文件

entity_embedding.vec  lation_embedding.vec

entity_embedding.vec和lation_embedding.vec文件包含通过 TransE方法从子图(从WikiData知识图)获知的实体和关系的100维嵌入。在两个文件中,第一列是实体/关系的ID,其他列是嵌入矢量值。我们希望这些数据可以促进对知识意识新闻推荐的研究。

# 设置.vec 文件路径
vec_file_path = 'MIND\\MINDsmall_train\\entity_embedding.vec'

# 读取 .vec 文件
with open(vec_file_path, 'r', encoding='utf-8') as file:
    
    # 逐行读取词向量
    for line in file:
        # print(line)
        line = line.strip()
        parts = line.split('\t')
        # print(parts)
        word = parts[0]
        vector = [float(value) for value in parts[1:]]
        print(f'Word: {word}, Vector: {vector}')
        
        # 如果要查找特定单词的向量,可以在这里添加条件
        if word == 'Q41':
            break

# 设置.vec 文件路径
vec_file_path = 'MIND\\MINDsmall_train\\relation_embedding.vec'

# 读取 .vec 文件
with open(vec_file_path, 'r', encoding='utf-8') as file:
    
    # 逐行读取词向量
    for line in file:
        # print(line)
        line = line.strip()
        parts = line.split('\t')
        # print(parts)
        word = parts[0]
        vector = [float(value) for value in parts[1:]]
        print(f'Word: {word}, Vector: {vector}')
        
        # 如果要查找特定单词的向量,可以在这里添加条件
        if word == 'P31':
            break

3.4 系统实现思路

为一个用户做新闻推荐:基于当前用户的行为信息,猜测他的偏好,该系统能推荐与偏好相似的新闻文章。

  • 循环 df_behaviors 的每一行,每一行都是一条用户行为记录,想基于用户行为记录信息去做推荐。
  • 排序算法
    • 基于点击历史、用户画像、候选集,用prompt进行排序
      • 点击历史:从用户行为记录中获取当前用户的点击历史,点击历史即新闻ID列表,可以基于新闻ID去 df_news 查询新闻的详细信息。
      • 用户画像:基于点击过的新闻详细信息字符串设计一个prompt,llm输出用户画像(例如:用户爱看的主题和关注的地区)
      • 候选集:利用召回算法从大量的新闻文章里筛选出来
  • 召回算法
    • 第一轮筛选:规则筛选,几千篇
      • 从用户的点击历史中统计出当前用户的(新闻类别,新闻子类别)偏好组合,只筛选符合该组合的新闻
    • 第二轮筛选:向量相似度筛选,20篇
      • 用户画像字符串去做embedding作为user_emb
      • 大量新闻文章向量化后存入向量库等待被检索,news_emb
      • 用户画像得到的user_emb 与 news_emb做相似度计算,只取前20篇

3.5 LLM for 特征工程

论文地址:https://arxiv.org/pdf/2305.06566v4.pdf

该工作(GENRE)在新闻推荐的场景下,用 LLM 构造了三个不同的prompts,分别来进行新闻摘要的改写,用户画像的构建,还有样本增强

首先可以看到它把新闻的title, abstract 还有category 当作输入,然后要求大语言模型来 生成一个摘要,把这个摘要当作这个新闻的 new feature输入下游

然后是用户画像,根据用户过去观看过的新闻的标题,尝试去问大语言模型是否知道这个用户的一些感兴趣的topic,也就是用户的喜好和他所在的位置。

另外,因为有一些用户看过的新闻非常少,所以用大语言模型来做一些样本的扩充。这里是把用户看过的一些新闻的category,还有 title 输入到大语言模型里面去,希望大语言模型能够根据他看过的这些新闻,生成出来一些用户并没有看过,但可能感兴趣的“伪新闻,然后把这些“伪”交互数据也当作训练集的一部分来进行训练。

实验表明这些手段都可以增强原始推荐的效果。

3.6 LLM for 特征编码

评测embedding模型:https://huggingface.co/spaces/mteb/leaderboard

我们选择的embedding模型:https://huggingface.co/DMetaSoul/Dmeta-embedding

3.7 LLM for 打分排序

Chat-REC: https://arxiv.org/pdf/2303.14524.pdf

四 存入向量数据库

# 读取新闻数据
df_news = get_df_news()

# 数据预处理
df_news, news_info_list = preprocess_data(df_news)

# 指定存储 embeddings 的文件夹路径
embedding_folder = 'embeddings_folder'
os.makedirs(embedding_folder, exist_ok=True)

# 计算和存储嵌入
compute_and_store_embeddings(news_info_list, embedding_folder, 1000)

# 加载嵌入和存储到向量数据库
collection_name = "all_news"
load_embeddings_and_save_to_qdrant(collection_name, embedding_folder, 1000)

pd.notna(x):确保仅对有效值进行解析,跳过缺失值

literal_eval(x)安全地将字符串转换为结构化数据,避免代码注入风险。将字符串形式的 Python 字面量(如列表、字典、元组)转换为对应的 Python 对象

协作效果:将字符串列转换为统一的 Python 对象列,处理缺失值,为后续分析(如提取实体、统计频次)奠定基础。


import json
import os
from ast import literal_eval

from utils import *
from db_qdrant import Qdrant


def preprocess_data(df_news):
    # 数据预处理块

    # 将包含字符串表示的列表转换为实际的列表
    # pd.notna(x) 检查x是否为非缺失值(即不是NaN),确保不对缺失值进行转换。
    # literal_eval(x) 是一个安全的方式来将字符串转换为相应的Python对象
    df_news['title_entities'] = df_news['title_entities'].apply(
        lambda x: literal_eval(x) if pd.notna(x) else [])
    df_news['abstract_entities'] = df_news['abstract_entities'].apply(
        lambda x: literal_eval(x) if pd.notna(x) else [])

    # 使用空字符串填充其他列的 NaN 值
    df_news = df_news.fillna('')

    # 新增 news_info 列,合并`类别、子类别、标题、摘要`字符串
    concatenation_order = ["category", "sub_category", "title", "abstract"]
    df_news['news_info'] = df_news.apply(lambda row: ' '.join(
        f"[{col}]:{row[col]}" for col in concatenation_order), axis=1)
    news_info_list = df_news['news_info'].values.tolist()
    logger.trace(
        f"新增 news_info 列 | len(news_info_list): {len(news_info_list)}")
    return df_news, news_info_list


def store_embeddings_to_json(embeddings, ids, payloads, file_path):
    # 存储嵌入为 JSON 文件
    json_data = {
        "batch_ids": ids,
        "batch_embeddings": embeddings,
        "batch_payloads": payloads
    }
    with open(file_path, 'w') as json_file:
        json.dump(json_data, json_file)


def compute_and_store_embeddings(data_list, embedding_folder, batch_size=1000):
    # 嵌入计算和存储块

    # 分批次向量化
    ids = list(range(1, len(data_list) + 1))  # 生成递增的 ids 列表

    for batch_idx, i in enumerate(range(0, len(data_list), batch_size)):
        # 获取批次数据 batch_ids、batch_payloads
        batch_ids = ids[i:i + batch_size]
        df_news_batch = df_news.iloc[i:i + batch_size]
        batch_payloads = df_news_batch.to_dict(orient='records')

        # 计算嵌入 batch_embeddings
        batch_data = data_list[i:i + batch_size]
        batch_embeddings = embed_sentences(batch_data)

        # 存储为 JSON 文件
        file_path = os.path.join(
            embedding_folder,
            f"batch_{batch_idx + 1}.json")
        store_embeddings_to_json(
            batch_embeddings,
            batch_ids,
            batch_payloads,
            file_path)

        # 打印存储信息
        logger.info(f"批次 {batch_idx} 数据存储成功,文件路径: {file_path}")


def load_embeddings_and_save_to_qdrant(
        collection_name,
        embedding_folder,
        batch_size):
    # 加载嵌入和存储到向量数据库

    qdrant = Qdrant()

    # 创建新的集合
    if qdrant.create_collection(collection_name):
        logger.success(f"创建集合成功 | collection_name: {collection_name}")
    else:
        logger.error(f"创建集合失败 | collection_name: {collection_name}")

    # 分批次存储到向量数据库
    for batch_idx, i in enumerate(range(0, len(news_info_list), batch_size)):
        # 读取 JSON 文件
        file_path = os.path.join(
            embedding_folder,
            f"batch_{batch_idx + 1}.json")
        if os.path.exists(file_path):
            with open(file_path, 'r') as json_file:
                json_data = json.load(json_file)

                batch_ids = json_data["batch_ids"]
                batch_embeddings = json_data["batch_embeddings"]
                batch_payloads = json_data["batch_payloads"]

                # 插入数据到 Qdrant
                if qdrant.add_points(
                        collection_name,
                        batch_ids,
                        batch_embeddings,
                        batch_payloads):
                    logger.success(f"批次 {batch_idx + 1} 插入成功")
                else:
                    logger.error(f"批次 {batch_idx + 1} 插入失败")
        else:
            logger.warning(f"文件 {file_path} 不存在,跳过该批次数据的插入。")

    logger.info("所有数据插入完成。")


# 读取新闻数据
df_news = get_df_news()

# 数据预处理
df_news, news_info_list = preprocess_data(df_news)

# 指定存储 embeddings 的文件夹路径
embedding_folder = 'embeddings_folder'
os.makedirs(embedding_folder, exist_ok=True)

# 计算和存储嵌入
compute_and_store_embeddings(news_info_list, embedding_folder, 1000)

# 加载嵌入和存储到向量数据库
collection_name = "all_news"
load_embeddings_and_save_to_qdrant(collection_name, embedding_folder, 1000)

执行结果:

五 主要的推荐逻辑代码

main.py


from utils import *
from db_qdrant import Qdrant
from qdrant_client.http import models

# 获取数据
df_behaviors_sample = get_df_behaviors_sample()
df_news = get_df_news()

# 循环 df_behaviors_sample 的每一行
for _, row in df_behaviors_sample.iterrows():
    user_id = row['user_id']
    click_history = row['click_history'].split()

    # 召回

    # 生成历史交互字符串 historical_records_str
    historical_records = generate_historical_records(df_news, click_history)
    historical_records_str = '\n'.join(historical_records)
    logger.info(
        f"历史交互字符串 | historical_records_str: \n{historical_records_str}")

    # 生成用户画像 user_profile
    user_profile = generate_user_profile(historical_records_str)

    # 向量化用户画像 user_emb
    user_emb = embed_sentences([user_profile])[0]

    # 过滤条件 query_filter
    # 统计出当前用户的(新闻类别,新闻子类别)偏好组合
    user_favorite_combinations = get_user_favorite_combinations(
        click_history, df_news)

    should_items = []
    for category, sub_category in user_favorite_combinations:
        should_item = models.Filter(
            must=[
                models.FieldCondition(
                    key="category",
                    match=models.MatchValue(
                        value=category,
                    )
                ),
                models.FieldCondition(
                    key="sub_category",
                    match=models.MatchValue(
                        value=sub_category,
                    )
                )
            ]
        )

        should_items.append(should_item)

    query_filter = models.Filter(
        should=should_items
    )

    # 使用 Qdrant 查询与用户画像字符串最相似的 news 列表
    qdrant = Qdrant()
    scored_point_list = qdrant.search_with_query_filter(
        "all_news", user_emb, query_filter, 20)
    coarse_top_news = [
        scored_point.payload for scored_point in scored_point_list]
    logger.info(f"len(top_news): {len(coarse_top_news)}")

    if coarse_top_news:
        # 排序
        coarse_top_news_str = '\n'.join(
            [f"{idx}. {news}" for idx, news in enumerate(coarse_top_news)])
        fine_top_news = fine_ranking(
            user_profile,
            historical_records_str,
            coarse_top_news_str,
            5)

        for idx in fine_top_news:
            news = coarse_top_news[int(idx)]
            logger.success(int(idx))
            logger.success(news)
    break

urls.py


import sys
from collections import Counter

import pandas as pd
import torch
from loguru import logger

from NewsGPT import NewsGPT
from sentence_transformers import SentenceTransformer, util

logger.remove()  # 删去import logger之后自动产生的handler,不删除的话会出现重复输出的现象
logger.add(sys.stderr, level="DEBUG")  # 调整日志输出级别: INFO|DEBUG|TRACE


model_name = 'Dmeta-embedding'
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info("Use pytorch device: {}".format(device))
model = SentenceTransformer(model_name, device=device)


def embed_sentences(sentences):
    """
    使用预训练的 SentenceTransformer 模型对句子列表进行嵌入。

    参数:
    - sentences:要嵌入的句子列表。
    - model_name:SentenceTransformer 模型的名称。

    返回:
    - embeddings:句子嵌入的列表。
    """
    embeddings = model.encode(sentences, normalize_embeddings=True)
    embeddings = embeddings.tolist()
    logger.trace(f"向量化 | embeddings: {len(embeddings)}, {type(embeddings)}")
    return embeddings


def get_df_behaviors_sample(sample_size=10):

    # 读取行为数据
    df_behaviors = pd.read_csv(
        'MIND/MINDsmall_train/behaviors.tsv',
        names=[
            "impression_id",
            "user_id",
            "time",
            "click_history",
            "impression_lpg"],
        sep='\t',
        header=None)

    # 采样
    df_behaviors_sample = df_behaviors.head(sample_size)
    logger.info(
        f"采样后 | df_behaviors_sample.shape: {df_behaviors_sample.shape}")

    return df_behaviors_sample


def get_df_news():

    # 读取新闻数据
    df_news = pd.read_csv(
        'MIND/MINDsmall_train/news.tsv',
        names=[
            "news_id",
            "category",
            "sub_category",
            "title",
            "abstract",
            "url",
            "title_entities",
            "abstract_entities"],
        sep='\t',
        header=None)
    logger.info(f"df_news.shape: {df_news.shape}")

    return df_news


def get_user_favorite_combinations(click_history, df_news):

    # 从 df_news 中查询并构建用户最喜欢的(category, sub_category)组合
    user_favorite_combinations = Counter()
    for news_id in click_history:
        category = df_news.loc[df_news['news_id']
                               == news_id, 'category'].values[0]
        sub_category = df_news.loc[df_news['news_id']
                                   == news_id, 'sub_category'].values[0]
        user_favorite_combinations.update([(category, sub_category)])

    logger.info(
        f"统计用户偏好类别组合 | user_favorite_combinations: {user_favorite_combinations}")
    return user_favorite_combinations


def generate_historical_records(df_news, click_history):
    # 根据 click_history 查询每条新闻的详细信息,并组合成字符串
    historical_records = []
    for idx, news_id in enumerate(click_history, start=1):
        # 查询每条新闻的详细信息
        record = df_news[df_news['news_id'] == news_id][[
            "category", "sub_category", "title", "abstract"]]
        # 组合成字符串,添加序号
        record_str = f"{idx}. " + ' '.join(
            f"[{col}]:{record.iloc[0][col]}" for col in [
                "category", "sub_category", "title", "abstract"])
        historical_records.append(record_str)
    logger.trace(f"历史交互 | historical_records: {historical_records}")
    return historical_records


def generate_user_profile(historical_records_str):
    # 生成用户画像: 通过理解`用户历史行为序列`,生成`用户感兴趣的话题`以及`用户位置信息`
    prompt = f"""Describe user profile based on browsed news list in the following format:

{historical_records_str}

You should describe the related topics and regions in the following format:

[topics]
-[topic1]

[region]
-[region1]
"""
    logger.info(f"prompt: \n{prompt}")

    # 模拟 NewsGPT 的调用
    gpt = NewsGPT()
    user_profile = gpt.get_completion(prompt)
#     user_profile = '''
# [topics]
# -TV Entertainment
# -Sports
# -Crime
# -Lifestyle
# -Movies
# -Politics
# [regions]
# -United States'''
    logger.success(f"用户画像 | user_profile: \n{user_profile}")
    return user_profile


def fine_ranking(
        user_profile,
        historical_records_str,
        candidate_list,
        top_n=5):
    prompt = f"""
I want you to recommend news to a user based on some personal information and historical records of news reading.

User profile: ```
{user_profile}
```

The historical records include news category, subcategory, title, and abstract. You are encouraged to learn his news preferences from the news he has read. Here are some examples:```
{historical_records_str}
```

Here's a list of news that he is likely to like: ```
{candidate_list}
```

Please select the top {top_n} news in the list that is most likely to be liked.
Please only output the order of these news in the form of a numerical list.
Example Output Format: 1,8,2,12,7

Output: """
    logger.info(f"prompt: \n{prompt}")
    gpt = NewsGPT()
    response = gpt.get_completion(prompt)
    logger.success(f"response: \n{response}")

    top_news = response.strip().split(',')
    return top_news