python+django+transformers模型:实现商品推荐功能(集成nacos配置,数据端mongo)

发布于:2025-02-27 ⋅ 阅读:(11) ⋅ 点赞:(0)

一、环境安装准备

   #创建 虚拟运行环境
    python -m venv myicrplatenv
    #刷新
    source myicrplatenv/bin/activate
    #python Django 集成nacos
    pip install nacos-sdk-python
    #安装 Django
    pip3 install Django==5.1
    #安装 pymysql settings.py 里面需要 
    # 强制用pymysql替代默认的MySQLdb pymysql.install_as_MySQLdb()
    pip install pymysql
    # 安装mongo
    pip install djongo pymongo
    pip install transformers
    pip install torch
    #安装Daphne: 
    pip install daphne
    #项目通过 通过 daphne启动
    daphne icrplat.asgi:application

二、构建项目及app模块

#创建app模块
python3 manage.py startapp cs
icrplat

├── README.md
├── cs
│   ├── __init__.py
│   ├── __pycache__
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   ├── models.py
│   ├── tests.py
│   └── views.py
├── icrplat
│   ├── __init__.py
│   ├── __pycache__
│   ├── asgi.py
│   ├── common
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
├── myicrplatenv
│   ├── bin
│   ├── include
│   ├── lib
│   ├── pyvenv.cfg
│   └── share
├── nacos-data
│   └── snapshot
└── templates

三、准备数据mongo

 db.products.insertMany([
        {'name': '手机', 'description': '最新款智能手机,支持5G网络,高清摄像头'},
        {'name': '无线耳机', 'description': '降噪无线耳机,蓝牙连接,长续航'},
        {'name': '智能手表', 'description': '健康监测,运动记录,支持通知提醒'},
        {'name': '平板电脑', 'description': '轻薄便携,高性能处理器,适合办公和娱乐'},
        {'name': '笔记本电脑', 'description': '高性能笔记本,适合游戏和设计工作'},
        {'name': '相机', 'description': '专业级相机,支持4K视频拍摄'},
        {'name': '耳机', 'description': '高保真音质,舒适佩戴'},
        {'name': '充电宝', 'description': '大容量充电宝,支持快充'},
        {'name': '手机壳', 'description': '防摔手机壳,支持多种机型'},
        {'name': '路由器', 'description': '高速无线路由器,支持千兆网络'},
    ])

四、相关代码

#################################settings.py#######################################

from nacos import NacosClient

from icrplat.common.config.nacos.NacosConfigWatcher import nacos_config_watcher


import pymysql

# 强制用pymysql替代默认的MySQLdb
pymysql.install_as_MySQLdb()


INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'cs'  #模块app 注入
]

# Nacos 配置
NACOS_SERVER = "xx"  # Nacos 服务器地址 ip 换成Nacos对应的IP地址
NACOS_NAMESPACE = "dev"  # Nacos 命名空间
NACOS_GROUP = "MICRO_GROUP"  # Nacos 分组
NACOS_DATA_ID = "aics_config_dev"  # Nacos 配置 ID

# 初始化 Nacos 客户端
nacos_client = NacosClient(NACOS_SERVER, namespace=NACOS_NAMESPACE)

# 从 Nacos 获取 config配置
nacos_config = nacos_client.get_config(NACOS_DATA_ID, NACOS_GROUP)
# 将 JSON 字符串转换为字典
nacos_config = eval(nacos_config)
print(f"nacos_config: {nacos_config}")

#解析 mysql 初始配置
mysql_config = nacos_config.get("mysql", {})
print(f"mysql_config: {mysql_config}")

#解析 mongodb 初始配置
mongo_config = nacos_config.get("mongodb",{})
print(f"mongo_config: {mongo_config}")


# 配置 MySQL 数据库
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': mysql_config.get("DATABASE_NAME"),
        'USER': mysql_config.get("DATABASE_USER"),
        'PASSWORD': mysql_config.get("DATABASE_PASSWORD"),
        'HOST': mysql_config.get("DATABASE_HOST"),
        'PORT': mysql_config.get("DATABASE_PORT", "8081"),
        'OPTIONS': {
            'charset': 'utf8mb4',  # 支持更广泛的字符集
        },
    },
    'mongodb': {  # MongoDB配置
        'ENGINE': 'djongo',
        'NAME': mongo_config.get('DB_NAME'),
        'ENFORCE_SCHEMA': False,
        'CLIENT': {
            'host': mongo_config.get('DB_HOST'),
            'port': int(mongo_config.get('DB_PORT')),
            'username': mongo_config.get('DB_USERNAME'),
            'password': mongo_config.get('DB_PASSWORD'),
            'authSource': mongo_config.get('AUTH_DB_SOURCE'),
        }
    }
}
#使用多数据库,需要在 settings.py 中设置数据库路由。
DATABASE_ROUTERS = ['icrpPlat.dbconfig.routers.DatabaseRouter']

print(f"更新前 Databases: {DATABASES}")


def update_nacos_config(config):
    """动态更新 MySQL 配置"""
    print("收到 Nacos 配置更新通知")
    print(f"收到更新后 config: {config}")

    # 使用 json.loads 解析 JSON 字符串  content
    config_dict = json.loads(config['content'])

    #获取更新后的 mysql 配置
    mysql_config = config_dict.get("mysql", {})
    print(f"更新后 Databases: {mysql_config}")
    nacos_config_watcher.update_mysql_config(mysql_config)

    #获取更新后的 elasticsearch 配置
    mongo_config = config_dict.get("mongodb",{})
    print(f"更新后 mongodb: {mongo_config}")
    nacos_config_watcher.update_mongodb_config(mongo_config)





# 监听 Nacos 配置变化
nacos_client.add_config_watcher(NACOS_DATA_ID, NACOS_GROUP, update_nacos_config)









#########################nacos_config_watcher.py 方法##############################

import threading

from django.conf import settings


class NacosConfigWatcher:
    def __init__(self):
        # 创建一个锁对象 可以确保在同一时间只有一个线程能够访问某个共享资源,从而避免多线程环境下的数据竞争问题。
        self.lock = threading.Lock()

    """
        更新myslq 配置
        with 语句主要用于上下文管理,通常用于处理资源的管理和释放。它的核心作用是确保在代码块执行完毕后,
        资源能够被正确地关闭或清理,避免资源泄漏。常见的场景包括文件操作、数据库连接、线程锁等。
    """
    def update_mysql_config(self, mysql_config):
        with self.lock:
            settings.DATABASES['default'].update({
                'NAME': mysql_config.get("DATABASE_NAME"),
                'USER': mysql_config.get("DATABASE_USER"),
                'PASSWORD': mysql_config.get("DATABASE_PASSWORD"),
                'HOST': mysql_config.get("DATABASE_HOST"),
                'PORT': mysql_config.get("DATABASE_PORT", "3306"),
            })

    def update_mongodb_config(self, mongo_config):
        with self.lock:
            settings.DATABASES['mongodb'].update({
                'host': mongo_config.get('DB_HOST'),
                'port': int(mongo_config.get('DB_PORT')),
                'username': mongo_config.get('DB_USERNAME'),
                'password': mongo_config.get('DB_PASSWORD'),
                'authSource': mongo_config.get('AUTH_DB_SOURCE'),
            })

#使用模块级别的单例 可以有多种实现单例的形式
nacos_config_watcher = NacosConfigWatcher()
##################################urls.yml####################################
urlpatterns = [
    path('admin/', admin.site.urls),
    path('cs/handleUserRequest',cs_views.handleUserRequest, name='handleUserRequest'),
    path('cs/getProducts',cs_views.getProducts,name='get_product'),
    path('cs/getdata',cs_views.getdata,name='get_data')
]
####################################views.py################################
from django.shortcuts import render

import json
import logging
import torch
from torch.nn.functional import cosine_similarity
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt


from icrplat.common.enum.ResponeCodeEnum import ResponseCodeEnum
from icrplat.common.exception.BusinessException import BusinessException
from icrplat.common.utils.CommonResult import CommonResult
from cs.models import Products

from functools import lru_cache

from icrplat.common.utils.TransformerUtils import TransformerUtils


# import os
# os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"


# Create your views here.
@csrf_exempt  # 如果需要关闭CSRF验证,可以加上这个装饰器
def handleUserRequest(request):
    try:
        if request.method == 'POST':
            json_data = request.body
            data = json.loads(json_data)
            # 假设你需要处理的数据结构如下
            message = data.get('message')
            logging.info(f"data:{data}")
        else:
            raise BusinessException(ResponseCodeEnum.METHOD_ERROR.status_code, ResponseCodeEnum.METHOD_ERROR.status_message)
        return JsonResponse(CommonResult.success_data(None), json_dumps_params={'ensure_ascii': False})
    except BusinessException as e:
        return JsonResponse(CommonResult.error(e.code, e.message), json_dumps_params={'ensure_ascii': False})



def getProducts(request):
    try:
        if request.method == 'GET':
            products_mongo = Products.objects.using('mongodb').all()
        else:
            raise BusinessException(ResponseCodeEnum.METHOD_ERROR.status_code,
                                    ResponseCodeEnum.METHOD_ERROR.status_message)
        # 将查询结果转化为字典或列表,确保可以被序列化为 JSON
        # products_data = [{'name': product.name, 'description': product.description} for product in products_mongo]
        # 使用列表推导式调用 to_dict 方法,将查询结果转化为字典
        #注意点: 需要通过实例来调用,而不是通过类直接调用
        products_data = [product.to_dict() for product in products_mongo]
        return JsonResponse(CommonResult.success_data(products_data), json_dumps_params={'ensure_ascii': False})
    except BusinessException as e:
        return JsonResponse(CommonResult.error(e.code, e.message), json_dumps_params={'ensure_ascii': False})



@csrf_exempt
def getdata(request):
    try:
        if request.method == 'GET':
            description = request.GET.get("description")
            if not description:
                raise BusinessException(ResponseCodeEnum.PARAM_ERROR.status_code, "描述不能为空")

            products_mongo = Products.objects.using('mongodb').all()
            products_data = [product.to_dict() for product in products_mongo]

            # 生成商品嵌入向量
            product_embeddings = {product['name']: get_cached_bert_embeddings(product['description']) for product in products_data}

            # 生成用户查询的嵌入向量
            user_embeddings = get_cached_bert_embeddings(description)

            # 获取 计算相似度后的结果 商品product_embeddings 向量 用户user_embeddings 向量
            top_products = TransformerUtils.similarities(product_embeddings,user_embeddings)

            if not top_products:
                return JsonResponse({"message": "未找到相关商品"}, json_dumps_params={'ensure_ascii': False})

            result_with_description = {}
            for product_name, similarity in top_products.items():
                product_description = next(
                    (product['description'] for product in products_data if product['name'] == product_name),
                    "暂无描述")
                result_with_description[product_name] = {
                    "相似度": similarity,
                    "描述": product_description
                }

            return JsonResponse(CommonResult.success_data(result_with_description), json_dumps_params={'ensure_ascii': False})

        else:
            raise BusinessException(ResponseCodeEnum.METHOD_ERROR.status_code,
                                   ResponseCodeEnum.METHOD_ERROR.status_message)
    except BusinessException as e:
        return JsonResponse(CommonResult.error(e.code, e.message), json_dumps_params={'ensure_ascii': False})




"""
    在代码中使用 get_cached_bert_embeddings 替代 get_bert_embeddings,以减少重复计算。
"""
@lru_cache(maxsize=1000)
def get_cached_bert_embeddings(text):
    return TransformerUtils.get_bert_embeddings(text)

#################################TransformerUtils.py##########################




import logging

from transformers import BertTokenizer, BertModel
import torch
from torch.nn.functional import cosine_similarity


import os


from transformers.utils.hub import TRANSFORMERS_CACHE

class TransformerUtils:
    """
        这个函数的主要功能是通过 BERT 模型生成文本的嵌入向量。
        如果在加载模型或生成嵌入的过程中遇到任何问题,函数会返回一个默认的零向量,
        确保程序能够继续运行,而不会因为异常而中断。
    """

    @staticmethod
    def get_bert_embeddings(texts):
        # 输出 BERT 模型的默认缓存路径。通常,预训练模型和分词器会从缓存中加载,以减少重复下载的时间。
        logging.info(f"Default cache path: {TRANSFORMERS_CACHE}")

        """ 
            如果输入的 texts 为空或者不是字符串类型,函数会返回一个长度为 768 的零向量。
            BERT 模型的嵌入向量通常是 768 维的,所以这里返回一个零向量作为默认值。 0.0
            返回一个长度为 768 的列表,所有元素均为浮点数 0.0,例如:[0.0, 0.0, ..., 0.0]  # 共768个元素  
        """
        if not texts or not isinstance(texts, str):
            return [0.0] * 768

        # 指定模型路径
        path = "/Users/jiajiamao/soft/python/space/bert-base-chinese"

        logging.info(f"模型路径指定 path:{path}")
        try:
            # 检查路径是否存在
            if not os.path.exists(path):
                logging.info(f"模型路径 不存在!")
                # 这里指定了 BERT 模型的路径,并检查该路径是否存在。如果路径不存在,会抛出一个 FileNotFoundError 异常
                raise FileNotFoundError(f"Model path {path} does not exist.")

            """
                加载分词器:
                    使用 BertTokenizer.from_pretrained 方法加载分词器。
                        输入‌:"高性能"  转变成 [高,性,能]
                    分词器的作用是将输入文本转换为模型能够理解的 token 序列。
                        词向量转换(tokens)  # 转为数字矩阵
            """
            tokenizer = BertTokenizer.from_pretrained(path)
            logging.info(f"********************加载分词器:Tokenizer loaded successfully")

            """
                加载模型:
                    使用 BertModel.from_pretrained 方法加载 BERT 模型。
                    加载成功后,model 变量将包含预训练的 BERT 模型。
            """
            model = BertModel.from_pretrained(path)
            logging.info(f"********************加载模型:BertModel loaded successfully")

            # 确保加载成功
            if tokenizer is None:
                raise ValueError("Tokenizer failed to load.")
            if model is None:
                raise ValueError("Model failed to load.")

            """
                # 原理实现步骤:
                # 1. 添加特殊标记 → [CLS] 我 在 学习 BERT 模型 [SEP]
                # 2. 分词 → ['降', '噪', '无', '线', '耳', '机', ',', '蓝', '牙', '连', '接', ',', '长', '续', '航']
                # 3. 转换为ID → [ 101, 7360, 1692, 3187, 5296, 5455, 3322, 8024, 5905, 4280, 6825, 2970,
                #          8024, 7270, 5330, 5661,  102] 方便输入模型
            """
            inputs = tokenizer(texts, return_tensors='pt', max_length=512, truncation=True, padding=True)
            logging.info(f"********************tokenizer 分词:{tokenizer.tokenize(texts)}")
            logging.info(f"********************input:{inputs}")

            """
                with torch.no_grad(): 表示在推理过程中不计算梯度,以提高效率。
                把分好词的句子变成一系列向量,每个词对应一个向量。
                每句话里的每个词都会得到一个最终的向量表示
                模型被调用时它会自动执行自注意力机制的计算。BERT由多个Transformer编码层堆叠而成,每一层都包含一个自注意力模块:
                    # 每个词生成3个向量:
                        Q(查询向量):用来询问其他词和它的关系。
                        K(钥匙向量):用来衡量其他词和它的相关性。
                        V(值向量):用来表示这个词的实际内容。
                        
                    # 计算"高"对每个词的关注度:
                        attention_score = Q_高 ⋅ [K_高, K_性, K_能]  
                        # → 得到 [0.8, 0.1, 0.1](更关注自己)
                        用“高”的查询向量(Q_高)去分别与“高”、“性”、“能”的钥匙向量(K_高, K_性, K_能)做点积运算。
                        点积的结果就是注意力分数,比如这里得到 [0.8, 0.1, 0.1]。
                        这表示“高”更关注自己(0.8),而对“性”和“能”的关注度较低。
                            
                    # 加权求和:
                        用注意力分数对“高”、“性”、“能”的值向量(V_高, V_性, V_能)进行加权求和。
                        比如,新的“高”的表示 = 0.8 * V_高 + 0.1 * V_性 + 0.1 * V_能。
                        
                这个过程的意义在于,通过计算每个词对其他词的关注度,让模型能够更好地理解上下文关系。
                比如“高”这个词在这里更关注自己,所以它会更多地保留自己的信息,而稍微融入一点“性”和“能”的信息。
            """
            with torch.no_grad():
                outputs = model(**inputs)

            """
               1.从BERT的输出中提取最后一层的隐藏状态(last_hidden_state)。
               2.对所有词的向量做平均,得到整个句子的向量表示(mean(dim=1))。
               3.去掉多余的维度(squeeze())。
               4.把Tensor转换成NumPy数组(.numpy())。
               5.最后把数组转换成列表(.tolist())
            """
            embeddings = outputs.last_hidden_state.mean(dim=1).squeeze().numpy().tolist()
            logging.info(f"********************原内容Text: {texts}")
            logging.info(f"********************BERT生成一个768维的向量: {embeddings[:10]}")  # 只打印前10维,方便检查
            return embeddings
        except Exception as e:
            print(f"Error in get_bert_embeddings: {e}")
            return [0.0] * 768  # 出现异常时返回默认值




    """
        计算相似度
            通过分析用户的兴趣(用户向量)和商品的特点(商品向量),计算它们之间的匹配度。
            然后根据匹配度排序,筛选出最相关的前 5 个商品。
            最后,只保留那些匹配度足够高(大于阈值)的商品,确保推荐的内容对用户来说是有意义的。
            
            @:param product_embeddings 商品向量 
            @:param user_embeddings 用户向量
    """
    @staticmethod
    def similarities(product_embeddings,user_embeddings):
        #将用户向量转换为张量形式
        user_embeds_tensor = torch.tensor(user_embeddings, dtype=torch.float32).unsqueeze(0)
        # 计算相似度
        similarities = {}
        for product_name, embedding in product_embeddings.items():
            #将商品的向量转换为张量形式,并调整其维度以匹配用户向量的计算
            product_embeds_tensor = torch.tensor(embedding, dtype=torch.float32).unsqueeze(0)
            #计算用户向量与商品向量之间的余弦相似度,值范围为 [-1, 1],值越接近 1 表示相似度越高。
            similarity = cosine_similarity(user_embeds_tensor, product_embeds_tensor).item()
            #将结果存储在一个字典中,键为商品名称,值为相似度值
            similarities[product_name] = similarity
            print(f"Product: {product_name}, Similarity: {similarity}")

        # 根据相似度排序(从高到低)
        sorted_products = sorted(similarities.items(), key=lambda item: item[1], reverse=True)
        print(f"Sorted Products: {sorted_products}")

        # 设置相似度阈值,并返回相似度最高的前5个商品(设置阈值并筛选商品)
        threshold = 0.6
        top_products = {k: v for k, v in sorted_products[:5] if v > threshold}
        return top_products


###############################ResponseCodeEnum###############################

from enum import Enum



"""
    返回通用响应类
"""
class ResponseCodeEnum(Enum):
    ###################################公共响应#############################
    SUCCESS = (200, "操作成功!")
    PARAMS_ERROR = (400, "参数解析失败,请核对参数!")
    UNAUTHORIZED = (401, "未认证(签名错误)")
    FORBIDDEN = (402, "请求错误")  # 返回失败业务公共code
    MEDIA_TYPE_ERROR = (403, "不支持的媒体异常,请核对contentType!")
    URL_REQ_NULL = (404, "请求路径不存在")
    METHOD_ERROR = (405, "不支持当前请求方法,请核对请求方法!")
    TIMEOUT_EXPIRE_ERROR = (406, "token登录过期!")
    TOKEN_ILLEGAL = (407, "非法token!")
    INTERNAL_SERVER_ERROR = (500, "服务器内部错误!")  # 系统异常公共code

    ###################################相关业务返回#############################
    RESOURCES_IP_EXIST = (1001,"资源IP已存在!")


    """
        在枚举值初始化时,将元组中的code和message分别赋值给实例属性self.code和self.message。
        这样每个枚举值都有独立的code和message属性。
    """
    def __init__(self, code, message):
        self.code = code
        self.message = message

    """
        @property 是 Python 中用于将类的方法转换为属性访问的装饰器。
        使用 @property 装饰器,你可以像访问属性一样访问方法,而不需要调用它
        示例:ResponseCodeEnum.SUCCESS.status_code      
    """
    @property
    def status_code(self):
        return self.code

    @property
    def status_message(self):
        return self.message




 

#####################################CommonResult.py###############################




from typing import Any, Optional

from icrplat.common.enum.ResponeCodeEnum import ResponseCodeEnum


class CommonResult:
    """
       :param res: 类型为 ResponseCodeEnum,用于存储响应状态码和消息
       :param data: 类型为 Any,用于存储返回的数据
       :param pagination: 类型为 Optional[dict],是一个可选的参数,用于存储分页信息(默认为 None)
    """
    def __init__(self, res: ResponseCodeEnum, data: Any, pagination: Optional[dict] = None):
        self.ResponseCodeEnum = ResponseCodeEnum
        self.data = data
        self.pagination = pagination


    """
         这是一个装饰器,用于定义静态方法。
         静态方法不需要访问类实例(self)或类本身(cls),可以直接通过类名调用。
         CommonResult.success_pagination
    """
    @staticmethod
    def success_pagination(data: Any, pagination: Optional[dict] = None):
        """
           :param data: Any: 表示返回的数据,类型为 Any(可以是任意类型)。
           :param pagination: 表示可选的分页信息,类型为字典(dict),默认值为 None。
           :return:
                        返回一个字典,包含以下键值对:
                        code: 状态码,取自 ResponseCodeEnum.SUCCESS.status_code。
                        message: 响应消息,取自 ResponseCodeEnum.SUCCESS.message。
                        data: 传入的 data 数据。
                        pagination: 传入的分页信息(如果未传入则为 None)
         """
        return {
            'code': ResponseCodeEnum.SUCCESS.code,
            'message': ResponseCodeEnum.SUCCESS.message,
            'data': data,  # 将 QuerySet 序列化为 JSON,
            'pagination': pagination
        }


    @staticmethod
    def success_data(data: Any):
        return {
            'code': ResponseCodeEnum.SUCCESS.code,
            'message': ResponseCodeEnum.SUCCESS.message,
            'data': data,  # 将 QuerySet 序列化为 JSON,
        }

    @staticmethod
    def error(code,message):
        return {
            'code': code,
            'message': message
        }

    @staticmethod
    def error():
        return {
            'code':ResponseCodeEnum.INTERNAL_SERVER_ERROR.code,
            'message':ResponseCodeEnum.INTERNAL_SERVER_ERROR.message
        }
###############################BusinessException.py###############################



class BusinessException(Exception):
   def __init__(self,message,code):
       self.message = message
       self.code = code
       super().__init__(message)
#####################################Products.py##################################


from django.db import models

# Create your models here.


from django.db import models


class Products(models.Model):
    # 定义 name 字段
    name = models.CharField(max_length=100)
    # 定义 description 字段
    description = models.TextField()

    class Meta:
        app_label = 'mongodb'
        db_table = 'products'  #


    """
        自定义 to_dict 实例方法
    """
    def to_dict(self):
        """将模型实例转化为字典"""
        return {
            'name': self.name,
            'description': self.description
        }

 

五、nacos配置

#换成自己的地址



{
  "mysql": {
    "DATABASE_NAME": "micro_aics",
    "DATABASE_USER": "xx",
    "DATABASE_PASSWORD": "xx",
    "DATABASE_HOST": "xx",
    "DATABASE_PORT": "8081"
  },
  "mongodb": {
    "DB_NAME": "action_log",
    "DB_HOST": "xx",
    "DB_PORT": "xx",
    "DB_USERNAME": "xx",
    "DB_PASSWORD": "xx",
    "AUTH_DB_SOURCE": "xx"
  }
}

六、启动项目

#通过 daphne 启动应用
daphne icrplat.asgi:application

七、测试