大模型Rag-指令调度

发布于:2025-04-18 ⋅ 阅读:(21) ⋅ 点赞:(0)

本文主要记录根据用户问题指令,基于大模型做Rag,匹配最相关描述集进行指令调度,可用于匹配后端接口以及展示答案及图表等。

1.指令查询处理逻辑

1.实现思路

  • 指令识别:主要根据用户的问题q计算与指令描述集is = [i0, ... , im]和指令关键词集ks = [k0, ... , km]之间的匹配距离sim,选取最匹配的指令,top1/top5

  • 用户指令与指令描述集embedding向量匹配相似度

  • 关键词处理:提取指令中的关键参数据,用户的问题q计算与指令提示词集ps = [p0, ... , pm],从用户问题中抽取指令执行需要的参数集

2.指令查询实现参数介绍

参数示例

{ attribute_ids: "[{"paramName":"项目名称","paramCode":"prejectname","paramPrompt":"请提取出项目名称","paramWeight":"0.4"}]", uni_key: "001", candidate_value: "科技园工人数量", name: "查询科技园工人数量", description: "查询科技园工人数量", tip_word: "", id: 2, keyword: "科技园,工人,数量", instruction_id: 2 }

参数代表含义

"attribute_ids": 属性值

"uni_key": 参数编码 示例:projectname

"candidate_value": 候选值

"name": "查询科技园工人数量"

"description": "查询科技园工人数量"

"tip_word": "",参数不全提示词

"id": 2,

"keyword": 关键词,

"instruction_id": 2 

3.指令查询实现细节

这一部分主要对指令描述集和指令关键词集的处理,针对参数为id,description,keyword

指令识别,主要根据用户的问题q计算与指令描述集is = [i0, ... , im]和指令关键词集ks = [k0, ... , km]之间的匹配距离sim

存储在在self.INSTTS中,记录id,指令描述集description,指令关键词集keyword,示例如下

# 提取内容
ists = jsonobj["data"]
for ist in ists:
    cid = ist["id"]
    cdes = ist["description"]
    ckws = ist["keyword"]
    self.INSTTS.append((cid, cdes, ckws))

# self.INSTTS值
self.INSTTS = [(2, '查询科技园工人数量', '科技园,工人,数量'), (4, '查询3月份营业额', 营业额'), (5, '打开百度首页', '打开,百度'), (6, '查询某项目某地区某段时间的营业额', '查询,项目,地区,营业额'),  (7, '请帮我打开智慧工地指挥中心', '打开,智慧工地,指挥中心'), (8, '打开数据标注', '打开,数据标注'), (9, '测试表格指令', '输出,测试表格'), (10, '查询七日气温', '查询,气温'), (12, '测试折线图', '测试折线图'), (13, '测试饼图', '测试饼图'), (14, '测试柱状图', '测试柱状图')]

指令描述集向量化,记录id2des,id2kws

self.id2key:记录编号和id,用于后续查询

self.id2des: 记录id和des指令描述,用于向量化

self.id2kws: 记录id和kws关键词,用于计算用户query和关键词相似度

num0 = 0
for record in self.INSTTS:
    _id = record[0]
    _des = record[1]
    _kws = record[2]

    d_embeddings = self.model.encode([self.instruction + _des], normalize_embeddings=True)
    self.index.add(d_embeddings)

    self.i2key[num0] = _id
    self.id2des[_id] = _des
    self.id2kws[_id] = _kws

    num0 += 1

4.属性查询实现细节

这一部分主要针对指令,对应参数为tip_word:参数不全提示词,attribute_ids:属性值的处理,其中attribute_ids的参数示例如下

attribute_ids: "[{"paramName":"项目名称","paramCode":"prejectname","paramPrompt":"请提取出项目名称","paramWeight":"0.4"}]",

参数代表意思解析

attribute_ids: "[{ "paramName":"项目名称", "paramCode":"prejectname", "paramPrompt":"请提取出项目名称", # 需要提取的内容 "paramWeight":"0.4"}]" # 参数权重

参数的存储,记录TIPSINFOS,cvalue,PARAMINFOS

self.TIPSINFOS:记录id和tip_word,后续用于根据id提取对应参数

cvalue: 记录paramName, paramCode, paramPrompt, paramWeight

self.PARAMINFOS:记录id和cvalue

jsonobj = json.loads(response.text)
params = jsonobj["data"]
for param in params:
    print(param)
    cid = param["id"]
    if "tip_word" in param:
        tip_word = param["tip_word"]
        self.TIPSINFOS[cid] = tip_word
    cvalue = []
    if "attribute_ids" in param and len(param["attribute_ids"])>0:
        attrs = json.loads(param["attribute_ids"])
        for attr in attrs:
            paramName = attr["paramName"]
            paramCode = attr["paramCode"]
            paramPrompt = attr["paramPrompt"]
            paramWeight = attr["paramWeight"]
            if len(paramName)>0 and len(paramCode)>0 and len(paramPrompt)>0 and len(paramWeight)>0 :
                cvalue.append((paramName, paramCode, paramPrompt, paramWeight))
                print(cid, paramName, paramCode, paramPrompt, paramWeight)
                
    self.PARAMINFOS[cid] = cvalue

2.用户指令与指令描述集处理

1.计算用户指令与指令描述集相似度,选取topk

  • 整体代码

question:用户指令问题

k:topk选取

theata:相似度阈值

def find_vec2(self, question, k, theata):
    print(question)
    q_embeddings = self.model.encode([self.instruction + question], normalize_embeddings=True)
    # print(q_embeddings)
    D, I = self.index.search(q_embeddings, k)  # v为待检索向量,返回的I为每个待检索query最相似TopK的索引list,D为其对应的距离

    result = []
    for i in range(k):
        sort_k = i + 1
        sim_i = I[0][i]
        uuid = self.i2key.get(sim_i, "none")
        sim_v = D[0][i]
        midkws = self.id2kws.get(uuid, "none")
        score = self.calculate_score(sim_v, question, midkws)  # 计算得分
 
        if score < theata:
            doc = {}
            doc["score"] = score
            doc["action_id"] = uuid
            result.append(doc)
    return result

uuid = self.i2key.get(sim_i, "none"): 根据编号查询id

midkws = self.id2kws.get(uuid, "none"): 根据id查询kws关键词

  • 进一步计算相似度

score = self.calculate_score(sim_v, question, midkws) :根据sim_v向量得分,question用户指令问题,midkws,kws关键词进一步计算匹配度得分

def calculate_score(self, score, question, kws):
    if kws == "none" or kws == "":
        return int(score * 1000)
    else:
        ckws = kws.split(",")
        chit = 0
        for kw in ckws:
            if question.find(kw) > -1:      # 检测问题中是否包含关键词
                chit += 1
        # print(score, chit, question, kws)
        return int(score * 1000) - 100 * chit

2.加载指令候选参数

如果命中指令,提取指令候选参数

self.TIPSINFOS: 参数不全提示词

self.PARAMINFOS:参数属性信息

optstr = self.TIPSINFOS.get(action_id, "指令参数不全。")   # 通过action_id获取参数不全提示词
cur_parms = self.PARAMINFOS.get(action_id, [])  # 通过action_id获取属性信息

3.依据大模型判断是否参数不全

没有参数不全提取任务

如果cur_parms的长度为0,证明没有判断参数不全的任务,直接返回结果

self.SUCCESS_SIGN:参数提前标志,默认为1,代表成功(没有参数提取任务也设置为1)

self.EMPTY_PARMS: 缺失参数字符,默认为空字符串

self.MISS_VALUE:缺失字符,默认为空字符串

if len(cur_parms) == 0:
    return self.SUCCESS_SIGN, self.EMPTY_PARMS, self.MISS_VALUE

有参数不全提取任务

  • datestr = time.strftime("%Y%m%d", time.localtime())
    parmnames = []
    for cp in cur_parms:
        parmnames.append(cp[0])
    parmnames_str = "、".join(parmnames)
    newquestion1 = "问题:'" + "我想查看" + question + "'\n" + \
              "假设今天是" + datestr + ",请根据以上问题,提取"+parmnames_str+",要求:\n"
    for n1 in range(len(cur_parms)):
        record = str(n1+1) + "、" + cur_parms[n1][2] + ";\n"
        newquestion1 = newquestion1 + record
    
    newquestion1 = newquestion1 + "请以JSON格式输出,要求:\n"
    for n1 in range(len(cur_parms)):
        record = str(n1+1) + "、" + cur_parms[n1][0] + "用" + cur_parms[n1][1] + "表示;\n"
        newquestion1 = newquestion1 + record
    
    print(newquestion1)
    
    rststr = self.excute_prompt(str(newquestion1)) # 访问星火大模型

    封装提示词给大模型

示例 newquestion1值

这个示例中,问题为我想查看打开百度,经过属性信息将要提取的参数封装到prompt提示词中

"问题:'我想查看打开百度' 假设今天是20250417,请根据以上问题,提取项目名称,要求: 1、请提取出项目名称; 请以JSON格式输出,要求: 1、项目名称用projectname表示; "

访问大模型之后的rststr值

'```json

{

"projectname": "查看打开百度"

}

```'

参数不全处理逻辑

默认设置参数缺失

这里cur_parms=[('项目名称', 'projectname', '请提取出项目名称', '0.4')],

默认设置miss_value中每一个参数都是缺失的

keywords_value = []
for cp in cur_parms:
    keywords_value.append(cp[1])
miss_value = {key: '1' for key in keywords_value}

4.参数不全处理

这里使用的星火大模型,模型返回的结果中,如果参数不全,会出现"由于问题中"的提示字符,这里通过它进行判断,如果要提取的参数key在大模型返回的结果中,则设置miss_value[key] = '0',表示该参数key不缺失,例如miss_value={'projectname': '0'}代表projectname不缺失。

if "由于问题中" in rststr:
    # rst = {key: '' for key in keywords_value}
    rststr = self.excute_prompt(str(newquestion1))
    if "由于问题中" in rststr:
        rststr = self.excute_prompt(str(newquestion1))
        if "由于问题中" in rststr:
            return self.PARAMTER_PARTIAL, optstr, miss_value
        else:
            for key in keywords_value:
                if key in rststr:
                    miss_value[key] = '0'
    else:
        for key in keywords_value:
            if key in rststr:
                miss_value[key] = '0'
else:
    for key in keywords_value:
        if key in rststr:
            miss_value[key] = '0'
# 打印结果
print(miss_value)

5.解析大模型返回的参数

sindex = rststr.find("{")
eindex = rststr.find("}")
print(sindex, eindex)
if sindex == -1 and eindex == -1:
    return self.PARAMTER_PARTIAL, optstr, miss_value

rststrjson = rststr[sindex:eindex + 1]
print(rststrjson)
rststrjson = rststrjson.replace("'", "\"")
print(rststrjson)

rst = {}
rstjson = json.loads(rststrjson)
n = len(cur_parms)
# rst = json.dumps(rst)

6.计算相似度

  • 根据大模型返回参数和配置的参数计算相似度

    for n1 in range(len(cur_parms)):
        midcode = cur_parms[n1][1]
        midtheta = 0.4
        try:
            midtheta = float(cur_parms[n1][3])
        except:
            pass
        print(midcode, midtheta)
        if midcode in rstjson:
            midvalue = rstjson[midcode]
            if midtheta > 0.0:
                msim, mval = self.search.match_parameter(action_id, midcode, midvalue, theata=midtheta)
                if msim > 0:
                    rst[midcode] = mval
                else:  # 查询的项目名称与参数完全不同
                    print("msim is 0.0.")
                    rst[midcode] = ''
                    miss_value[midcode] = '1'
                    n-=1
                    continue
                    # return self.FAILED_SIGN, json.dumps(rst), miss_value
            else:
                print("midtheta is 0.0.")
                rst[midcode] = midvalue
        else:
            print(midcode, " not in ", rstjson)
            rst["optstr"] = optstr
            return self.FAILED_SIGN, json.dumps(rst), miss_value
    print(rst)
    if n != len(cur_parms):
        rst["optstr"] = optstr
        return self.FAILED_SIGN, json.dumps(rst), miss_value
    return self.SUCCESS_SIGN, json.dumps(rst), miss_value
  • 参数相似度计算,找到最相似的参数

    def match_parameter(self, action_id, uni_key, parm, theata=0.4):
        midkey = str(action_id) + "_" + uni_key
        # 存储参数parm中的元素
        cwds = {}
        for i in range(len(parm)):
            cwds[parm[i]] = ""
    
        rstsim = 0.0    # 初始化相似度结果为0.0
        rstsss = ""     # 初始化最匹配的候选值为空字符串
        """
        # 遍历 self.PARAMS 字典中以 midkey 为键的所有候选值及其拆分后的字典表示
        # sss 表示候选值,wds 表示候选值拆分后的字典
        """
        for sss, wds in self.PARAMS[midkey].items():
            # print(sss, wds)
            csim = self.sim_parameter(cwds, wds)    # 计算输入的cwds 与 库里的 wds相似度
            # 如果相似度csim大于预先设定的theata阈值,并且相似度csim大于之前设定的最大相似度rstsim
            if csim >= theata and csim > rstsim:
                # 更新最大相似度为当前相似度
                rstsim = csim
                # 更新最匹配的候选值为当前候选值
                rstsss = sss
        # 打印匹配结果,包括输入参数、相似度(保留三位小数)、最匹配的候选值和阈值
        print("match_parameter --- ", parm, round(rstsim, 3), rstsss, theata)
        # 返回最大相似度和最匹配的候选值
        return rstsim, rstsss
  • 计算交并比

def sim_parameter(self, parm1, parm2):
    union_ws = {}
    join_ws = {}
    
    for w1 in parm1.keys():
        union_ws[w1] = ""
        if w1 in parm2:
            join_ws[w1] = ""

    for w2 in parm2.keys():
        union_ws[w2] = ""
        
    return 1.0 * len(join_ws.keys()) / len(union_ws.keys())


网站公告

今日签到

点亮在社区的每一天
去签到