万字长文保姆级教你制作自己的多功能QQ机器人

发布于:2023-01-01 ⋅ 阅读:(708) ⋅ 点赞:(0)
转载请注明出处:小锋学长生活大爆炸( https://xfxuezhang.blog.csdn.net/)
若发现存在部分图片缺失,可以访问原文: 万字长文保姆级教你制作自己的多功能QQ机器人 - 小锋学长生活大爆炸

​​​​​​

目录

前言

功能清单

免费领取轻量应用云服务器

SSH连接服务器

常见Ubuntu软件安装与问题修复

搭建mirai环境

Python控制mirai篇

debug输出封装

交互授权

绑定bot

释放bot

未读消息的数量

获取最新的消息

解析消息内容

向好友发送消息

向群发送消息

向群发送富文本消息

Q群消息转发

类似QMsg酱的消息通知

多功能切换的实现设计

翻译查询

领取腾讯免费翻译API

机器人接入翻译功能

实时天气

领取免费的和风天气API

机器人接入天气功能

实时热搜

领取免费的天行热搜API

机器人接入热搜功能

照片上传

领取腾讯对象存储COS

机器人接入图片上传功能

自行添加小功能函数总结

控制树莓派舵机与屏显

腾讯云服务器搭建MQTT环境

待实现功能

接入控制ESP32(实现智能家居控制)

完整代码整理


前言

        QQ、微信是我们平常使用最多的通讯工具,网上也有很多通过软件去控制QQ/微信的开源工具,通过这些工具,我们可以实现许多有意思的效果,而不仅仅局限于消息聊天。
        自从微信网页版被官方禁用后,微信的软件工具几乎已经失效了,现有的一些是通过hook微信本身来实现,这种很容易被官方检测并封号。另一些是通过注册企业号来控制,但不直观且功能受限。
        这里我们借助相对更开放的QQ来制作我们的机器人,对比几款工具后,最终选择了mirai

先放一张整体结构图:

 

功能清单

        网上现有开源的机器人大多只是实现了类似“自动推送天气、接入图灵机器人自动聊天”等等,大多属于自娱自乐,没有发挥最大用途。
        因此,我们的QQ机器人(暂且取名为“小锋仔”)是根据日常所需而制,包含常用功能且设计得易于扩展。
目前包含的功能有:

  • 类似QMsg酱的消息通知
  • QQ群消息转发
  • 翻译查询
  • 照片上传
  • 实况天气
  • 实时热搜
  • 控制树莓派舵机
  • 控制树莓派屏显
  • ... ...

将来可能包含的功能有:

  • 接入控制ESP32(实现智能家居控制)

        接下来详细介绍如何自己搭建一个这样的QQ机器人。篇幅较长保姆级详细,建议收藏后慢慢看。

免费领取轻量应用云服务器

首先为了 能运行mirai,且随时随地能连接,我们需要有一个 具备公网IP的服务器。这里使用腾讯云的 免费服务器

        对于还不想买的童靴,可以免费领取腾讯云提供的1个月服务器试用套餐。步骤:

  1. 进入官网领取:云产品免费试用;需要选购的进:轻量应用服务器专场;不清楚怎么操作的可以看教程:腾讯云产品免费试用教程
  2. 领取完成后,由于后面需要用到端口,因此这里我们提前开放2个端口:88889966

        这里腾讯云可能有个小特点。如果发现在控制台防火墙放行后,还是无法访问。需要再在服务器里放行一下端口。这里先写着,大家可以在后面一节中连接上了服务器,再回过来这里输入指令。

sudo apt install firewalld -y
sudo firewall-cmd --list-all
sudo firewall-cmd --permanent --zone=public --add-port=8888/tcp && sudo firewall-cmd --reload
sudo firewall-cmd --permanent --zone=public --add-port=9966/tcp && sudo firewall-cmd --reload
sudo systemctl start firewalld.service

SSH连接服务器

        服务器初始化完成后,就可以通过SSH去连接了。这里我们可以直接使用powershell来连接,其他SSH软件我强推mobaxterm!!安装包也已经准备好了:MobaXterm.exe

  1. 搜索打开powershell:

  1. 输入以下命令连接SSH:
ssh 用户名@<公网ip>
  1. 或者使用MobaXterm软件:

  1. 先更新一下软件库:
sudo apt upgrade -y
sudo apt autoremove -y
  1. 一般不建议使用管理员账户,因此我们要自己新建一个账户:
sudo adduser sxf


然后将账户加入sudoers组:

sudo apt install vim
sudo vim /etc/sudoers


        然后退出软件,重新用新建的账号登录即可。
        至此,服务器环境就搭建完成了。

常见Ubuntu软件安装与问题修复

        这篇博客里记录了很多我在使用过程中,常用软件的安装,非常详细且经过亲测,时不时也会更新内容,大家可以收藏以备下次使用。
Ubuntu20.04 + VirtualBox相关_小锋学长生活大爆炸的博客-CSDN博客


搭建mirai环境

        接下来就要在服务器上搭建QQ机器人(mirai)基础环境。搭建完成后,我们就可以远程跟机器人进行交互。
        官方mirai的github仓库:GitHub - mamoe/mirai: 高效率 QQ 机器人支持库
        由于github是国外的,而官方已经不再支持gitee的维护,因此如果大家无法访问上面的连接,可以用我帮大家下载下来的安装包:
        其他的一些文档:Mirai | mirai
        官方论坛:主页 | MiraiForum
下面开始正式安装:

  1. 先SSH连接上服务器,建议不要用root用户登录。
  2. 下载安装包mcl-installer-a02f711-linux-amd64:
mkdir qqbot
cd qqbot
wget http://xfxuezhang.cn/web/share/QQBot/mcl-installer-a02f711-linux-amd64
sudo chmod +x mcl-installer-a02f711-linux-amd64

        此时需要输入密码(在上面选购并装完服务器后会显示,当时要求记下的)。

./mcl-installer-a02f711-linux-amd64

        此时进入安装流程,弹出的几个选项都直接回车选默认即可。

  1. 安装完成后,还需要安装mirai-api-http。在当前页面下,继续输入:
./mcl --update-package net.mamoe:mirai-api-http --channel stable-v2 --type plugin
  1. 编辑_config/net.mamoe.mirai-api-http/setting.yml_配置文件 (没有则自行创建)
## 配置文件中的值,全为默认值

## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
adapters:
  - http
  - ws

## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
## 建议公网连接时开启
enableVerify: true
verifyKey: 1234567890

## 开启一些调式信息
debug: false

## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
## 开启后,接口中任何 sessionKey 不需要传递参数
## 若 console 中有多个 bot 登录,则行为未定义
## 确保 console 中只有一个 bot 登陆时启用
singleMode: false

## 历史消息的缓存大小
## 同时,也是 http adapter 的消息队列容量
cacheSize: 4096

## adapter 的单独配置,键名与 adapters 项配置相同
adapterSettings:
  ## 详情看 http adapter 使用说明 配置
  http:
    # 0.0.0.0是允许远程访问,localhost只能同机器访问
    host: 0.0.0.0
    port: 8888
    cors: ["*"]
    unreadQueueMaxSize: 100
  
  ## 详情看 websocket adapter 使用说明 配置
  ws:
    host: localhost
    port: 8080
    reservedSyncId: -1
  1. 启动mirai即可:
./mcl

        首次启动会自动下载jar包。等待启动完成后,输入"?",可以查看所有支持的mcl命令。

  1. 使用以下命令即可登录QQ号:
/login  [password] 

        如果想要启动mcl后自动登录QQ号,可以用:

/autoLogin add   

        也可以设置不同的设备登录。

/autoLogin setConfig  protocol ANDROID_PAD

        它对应的配置文件其实就在:config/Console/AutoLogin.yml

  1. 现在QQ风控很严了,第一次登录很有可能遇到“需要滑动验证码”的。建议申请小号使用,以免发生不测。并且首次使用时在QQ“账号安全设置”中关闭“安全登录检查”、“陌生设备登录保护”。如果遇到验证码,可以尝试:

    1. 将Captcha link通过另一个QQ,发给待登录的mirai-QQ,手机登录mirai-QQ并点击链接,手动完成滑块验证,然后回到mobaxterm输入回车;

  1. 如果不行,就参考这个链接的方法:GitHub - project-mirai/mirai-login-solver-selenium: SliderCaptcha solver
  2. 还不行,再参考这个链接的方法:mirai-login-solver-selenium | mirai
  3. 还有一个小技巧可以尝试。在手机端先通过手机号登录QQ,如果没问题,再通过手机号在mirai上登录。手机建议先登录上mirai-QQ,有时可能会弹窗提示“是否允许陌生设备登录”等等,要手动点确认的。
  4. 另外,最新申请的QQ号,一般可以成功登录mirai。
  5. 如果以上都不行,目前的终极方案是使用miraiAndroidMiraiAndroid
  • 在手机上的MiraiAndroid登录QQ后导出device.json
  • 将cache目录下的3个文件account.secrets、servers.json、session.bin也复制出来

  • 接下来点击左上角, 再点击“工具”。选择你机器人的账号, 选择 导出 DEVICE.JSON 将其导出。

  • 再次回到服务器端,进入 “bots/<你的QQ号>” 下面, 将导出的 device.json 复制放入。对应的cache文件夹也复制放入。

  • 再次执行 ./mcl 启动 mirai-console 看看效果。
  • 若仍有问题,欢迎加入文末Q群交流。
  1. 至此,mcl就已经能正常接收QQ消息了。而我们的实现代码对mcl的控制,就是通过mirai-api-http插件来实现的。根据上面第4步配置的_setting.yml_文件,再参考官方API文档HttpAdapter文档,即可实现互联互通。(讲起来比较麻烦,no bibi,后面直接show me the code)。

Python控制mirai篇

        当服务器成功运行了mirai后,我们就可以在本地进行Python脚本的编写了。由于最新的mirai-api-http变更过接口规范,因此网上某些一两年前的代码已经失效了。本教程对应的mirai-api-http使用的是最新的2.x版本。
        接下来的操作,都默认已经完成“启动mcl并login了QQ号”
        在上面setting.yml中,有两个配置项值得注意,他是我们脚本可以控制的密钥:

verifyKey: 1234567890
http: port: 8888

debug输出封装

        简单封装下。直接用print也是可以的。

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()

交互授权

        在交互前,脚本需要先向mirai获取一个verifyKey,之后在每个请求时候,都需要带上这个key,也叫session。其中,参数auth_key对应了上面setting.yml里的verifyKey。

auth_key = '1234567890'

def verifySession(self, auth_key):
    """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
    data = {"verifyKey": auth_key}
    url = self.addr+'verify'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        return res['session']
    return None

绑定bot

        使用此方法校验并激活你的Session,同时将Session与一个已登录的Bot绑定。

qq = '121215'             # mirai登录的那个QQ
session = 'grge8484'     # 上面verifySession函数的返回值

def bindSession(self, session, qq):
    """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
    data = {"sessionKey": session, "qq": qq}
    url = self.addr + 'bind'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        self.session = session
        return True
    return False

释放bot

        使用此方式释放session及其相关资源(Bot不会被释放)

def releaseSession(self, session, qq):
    """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
    data = {"sessionKey": session, "qq": qq}
    url = self.addr + 'release'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        return True
    return False

未读消息的数量

        获取当前有多少条未读消息。

def getMessageCount(self, session):
    url = self.addr + 'countMessage?sessionKey='+session
    res = requests.get(url).json()
    if res['code'] == 0:
        return res['data']
    return 0

获取最新的消息

        获取消息后会从队列中移除。

def fetchLatestMessage(self, session):
    url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
    res = requests.get(url).json()
    if res['code'] == 0:
        return res['data']
    return None

解析消息内容

        简单实现了部分消息类型的解析,会有消息丢失,请根据使用需求自行调整。

data = 'xxx'  # 可以是上面getMsgFromGroup函数的返回值

def parseGroupMsg(self, data):
    res = []
    if data is None:
        return res
    for item in data:
        if item['type'] == 'GroupMessage':
            type = item['messageChain'][-1]['type']
            if type == 'Image':
                text = item['messageChain'][-1]['url']
            elif type == 'Plain':
                text = item['messageChain'][-1]['text']
            elif type == 'Face':
                text = item['messageChain'][-1]['faceId']
            else:
                logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
                return res

向好友发送消息

        向指定好友发送消息。

def sendFriendMessage(self, session, qq, msg):
    msg_list = msg.split(r'\n')
    msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]

    data = {
        "sessionKey": session,
        "target": qq,
        "messageChain": msg_chain
    }
    url = self.addr + 'sendFriendMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0

向群发送消息

        也只是简单实现。

def sendMsgToGroup(self, session, group, msg):
    text = msg['text']
    type = msg['type']
    name = msg['name']
    group_id = msg['groupId']
    group_name = msg['groupName']
    content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
        name, group_id, group_name, text)
    content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
        name, group_id, group_name)
    logger.DebugLog(">> 消息类型:" + type)
    if type == 'Plain':
        message = [{"type": type, "text": content1}]
    elif type == 'Image':
        message = [
            {"type": 'Plain', "text": content2},
            {"type": type, "url": text}]
    elif type == 'Face':
        message = [{"type": 'Plain', "text": content2},
                   {"type": type, "faceId": text}]
    else:
        logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
        return 0
    data = {
        "sessionKey": session,
        "group": group,
        "messageChain": message
    }
    logger.DebugLog(">> 消息内容:" + str(data))
    url = self.addr + 'sendGroupMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

向群发送富文本消息

        跟上面的差不多,消息类型变了一下,从而支持类似HTML形式的消息发送。

def sendPlainTextToGroup(self, session, group, msg):
    msg_list = msg.split(r'\n')
    msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    data = {
        "sessionKey": session,
        "group": group,
        "messageChain": msg_chain
    }
    url = self.addr + 'sendGroupMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

        以上就是几个简单、常用的函数。基于这些函数,就已经可以实现蛮多有趣的功能了。

Q群消息转发

        这部分可以直接参考之前的博客:Q群消息转发例程。其实也就是把上面的函数整合一下,放一个完整版:

import requests
from time import sleep

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()

logger = Logger()
class QQBot:
    def __init__(self):
        self.addr = 'http://43.143.12.250:8888/'
        self.session = None

    def verifySession(self, auth_key):
        """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
        data = {"verifyKey": auth_key}
        url = self.addr+'verify'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'bind'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'release'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return True
        return False

    def fetchLatestMessage(self, session):
        url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return None

    def parseGroupMsg(self, data):
        res = []
        if data is None:
            return res
        for item in data:
            if item['type'] == 'GroupMessage':
                type = item['messageChain'][-1]['type']
                if type == 'Image':
                    text = item['messageChain'][-1]['url']
                elif type == 'Plain':
                    text = item['messageChain'][-1]['text']
                elif type == 'Face':
                    text = item['messageChain'][-1]['faceId']
                else:
                    logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                    continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
        return res

    def getMessageCount(self, session):
        url = self.addr + 'countMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def sendPlainTextToGroup(self, session, group, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
        data = {
          "sessionKey": session,
          "group": group,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        text = msg['text']
        type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
            name, group_id, group_name, text)
        content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + type)
        if type == 'Plain':
            message = [{"type": type, "text": content1}]
        elif type == 'Image':
            message = [
                {"type": 'Plain', "text": content2},
                {"type": type, "url": text}]
        elif type == 'Face':
            message = [{"type": 'Plain', "text": content2},
                       {"type": type, "faceId": text}]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
            return 0
        data = {
                "sessionKey": session,
                "group": group,
                "messageChain": message
                }
        logger.DebugLog(">> 消息内容:" + str(data))
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        # 对每条消息进行检查
        for msg in msg_data:
            group_id = msg['groupId']
            # 接收的消息群正确(目前只支持 消息类型)
            if group_id in receive_groups:
                # 依次将消息转发到目标群
                for g in send_groups:
                    logger.DebugLog(">> 当前群:"+g)
                    if g == group_id:
                        logger.DebugLog(">> 跳过此群")
                        continue
                    res = self.sendMsgToGroup(session, g, msg)
                    if res != 0:
                        logger.TraceLog(">> 转发成功!{}".format(g))

    def sendFriendMessage(self, session, qq, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]

        data = {
          "sessionKey": session,
          "target": qq,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendFriendMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0

def qqTransfer():
    with open('conf.json', 'r+', encoding="utf-8") as f:
        content = f.read()
    conf = json.loads(content)

    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']

    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']

    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if cnt:
            logger.DebugLog('>> 有消息了 => {}'.format(cnt))
            logger.DebugLog('获取消息内容')
            data = bot.fetchLatestMessage(session)
            if len(data) == 0:
                logger.DebugLog('消息为空')
                continue
            logger.DebugLog(data)
            logger.DebugLog('解析消息内容')
            data = bot.parseGroupMsg(data)
            logger.DebugLog(data)
            logger.DebugLog('转发消息内容')
            bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
        # else:
        #     logger.DebugLog('空闲')
        sleep(sleep_time)
    bot.releaseSession(session, bind_qq)

        其中,conf.json内容为:

{
  "auth_key": "1234567890",
  "bind_qq":  "123456",                                                     # mirai登录的QQ (复制时记得删我)
  "sleep_time": 1,
  "receive_groups": ["913182235", "977307922"],  # 要接受消息的群 (复制时记得删我)
  "send_groups": ["913182235", "977307922"],         # 要发送消息的群 (复制时记得删我)
  "debug_level": "debug"
}

        下面,我们就先从类似QMsg酱的消息通知开始。

类似QMsg酱的消息通知

设计目标:通过调用指定的URL,小锋仔机器人就会给指定的好友发送指定的消息。
        关于QMsg酱的使用教程可以看:免费的QQ微信消息推送机器人
        前面我们特地开放了9966端口,因此可以使用Flask来监听这个端口。
        本着越简单越好的原则,我们把“发给好友还是群”、“目标好友或群的号”、“发送的内容”三部分都拼接到URL上,因此有:

http://43.143.12.250:9966/QQ/send/friend?target=123&msg=hello
http://43.143.12.250:9966/QQ/send/group?target=123&msg=hello

因此,代码可以写成:

from flask import Flask, request

app = Flask(__name__)

@app.route('/QQ/send/friend', methods=['GET'])
def qqListenMsgToFriend():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World! Friend!'

@app.route('/QQ/send/group', methods=['GET'])
def qqListenMsgToGroup():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendPlainTextToGroup(bot.session, qq, msg)
    return 'Hello World! Group!'

if __name__ == '__main__':
    app.run(port='9966', host='0.0.0.0')

        由于Flask和小锋仔QQBot都要阻塞运行,因此稍微变动一下,让小锋仔以子线程的形式运行即可。

if __name__ == '__main__':
    t = threading.Thread(target=qqTransfer)
    t.setDaemon(True)
    t.start()

    app.run(port='9966', host='0.0.0.0')

        测试一下:

http://localhost:9966/QQ/send/friend?target=1061700625&msg=hello


        如果我们把这个脚本放到服务器上去运行,那么链接就变成了:

http://43.143.12.250:9966/QQ/send/friend?target=1061700625&msg=hello

        当然,能发消息的前提是“先加好友”或“加群”啦。

多功能切换的实现设计

        上面我们进行了简单地尝鲜。
1、从这部分开始,我们涉及的功能比较杂,为了能更好的区分功能,需要设计一个简单的交互协议。

  • 我们发送的内容可以分为:功能选择 与 消息详情
  • 为了区分他俩,可以在选择功能时添加指定前缀,如“CMD+翻译”;
  • 小锋仔接收到后,进入翻译模式准备;
  • 发送指令详情时,就不加前缀。而小锋仔则将收到的消息进行翻译,再把结果返回。

        根据以上内容,小锋仔需要记录的状态信息至少有:

class StatusStore:
    def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
        self.from_qq = from_qq          # 发送者的QQ号
        self.is_cmd = is_cmd            # 是否是指令(选择功能)
        self.func_name = func_name      # 选择的功能的名称
        self.need_second = need_second  # 是否需要经过两步:先发cmd指令,再发详细内容
        self.msg = msg                  # 本次发送的消息内容
    
    def detail(self):
        return self.__dict__

2、并且我们设置,只有从指定QQ发过来消息,才能响应。因此在接收到消息时,需要判断对方的信息。对于好友类型的消息,mirai返回格式如消息类型说明

{
  "type": "FriendMessage",
  "sender": {
    "id": 123,
    "nickname": "",
    "remark": ""
  },
  "messageChain": [] // 数组,内容为下文消息类型
}

因此,我们可以从"type"和 "sender:id"入手判断。
3、我们暂时考虑只有一个主QQ能发送指令的情况。
4、定义一个类来专门管理不同功能的函数,例如:

class MultiFunction:
    """多功能函数集合"""
    def __init__(self) -> None:
        pass

    @staticmethod
    def translate(original:str, convert:str='zh2en') -> str:
        return '假装是翻译结果' 
    
    @staticmethod
    def uploadImage(image_path:str) -> str:
        return '假装是上传结果' 

    @staticmethod
    def weather(city:str) -> str:
        return '假装是天气结果' 
    
    @staticmethod
    def hotNews(status_store:StatusStore) -> str:
        return '假装是热搜结果' 


# 多功能函数的映射
# function: 功能对应函数名
# need_second: 是否需要经过两步:先发cmd指令,再发详细内容
# desc: 需要经过两步时,第一次返回的提示语
function_map = {
    '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'}, 
    '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'}, 
    '热搜': {'function': MultiFunction.hotNews, 'need_second': False}
}

def choiceFunction(store_obj:StatusStore):
    res = ''
    if function_map.get(store_obj.func_name):
        res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
    return res 

5、大致实现流程的想法是:


        对应代码实现:

def analyzeFriendMsg(self, data):
    if data is None or data['type'] != 'FriendMessage':
        return None, None, None
    sender_id = data['sender']['id']
    msg_type = data['messageChain'][-1]['type']
    if msg_type == 'Plain':
        msg_text = data['messageChain'][-1]['text']
    elif msg_type == 'Image':
        msg_text = data['messageChain'][-1]['url']
    else:
        msg_text = ''
        return sender_id, msg_type, msg_text

        最终的框架就是:

def xiaofengzai():
    auth_key = '1234567890'     # settings.yml中的verifyKey
    bind_qq = '3126229950'      # mirai登录的QQ
    target_qq = '1061700625'    # 我们自己用的主QQ
    target_qq = int(target_qq)  # 接收到的消息里,QQ是int类型的
    sleep_time = 1              # 轮询间隔
    status_store = {}

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if not cnt:
            sleep(sleep_time)
            continue
        logger.DebugLog('>> 有消息了 => {}'.format(cnt))
        logger.DebugLog('获取消息内容')
        data = bot.fetchLatestMessage(session)
        if len(data) == 0:
            logger.DebugLog('消息为空')
            sleep(sleep_time)
            continue
        logger.DebugLog(data)
        logger.DebugLog('解析消息内容')

        sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
        if not sender_id or sender_id != target_qq:
            sleep(sleep_time)
            continue

        if msg_text.strip().lower().startswith('cmd'):
            _, func_name = msg_text.strip().split('\n')[0].split()
            func_name = func_name.strip()
            store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
            # 不需要发两次,直接调用函数返回结果即可
            func_info = function_map.get(func_name)
            if not func_info:
                res = '指令[{}]暂不支持'.format(func_name)
            elif func_info.get('need_second'):
                res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
                # 添加或更新记录
                status_store[sender_id] = store_obj
            else:
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        else:
            res = '请先发送指令哦...'
            store_obj = status_store.get(sender_id)
            if store_obj and store_obj.is_cmd:
                store_obj.msg = msg_text
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        
        bot.sendFriendMessage(session, qq=sender_id, msg=res)

        看一下效果:


        至此,骨架有了,接下来开始填充功能了。

翻译查询

        根据上面的骨架可知,我们只需要实现MultiFunction类下的translate函数即可。如果想快速测试函数效果,可以使用以下代码,而不用先启动mirai:

res = choiceFunction(StatusStore(func_name='翻译', msg='你好'))
print(res)

领取腾讯免费翻译API

        要做翻译,最方便的就是调用API了(没错,调包侠!)。
        这里使用腾讯的翻译API,可以免费领取:领取腾讯翻译API。点进链接后,往下拖到“云产品体验”专区,选择“人工智能”,下面有“机器翻译”。他的调用量是每月更新,非常的良心了。


        点击“立即体验”,进入控制台界面,虽然上面显示的是“开通付费版”,但不用担心,他是有免费额度的,更何况你账户里又没充余额,哈哈哈。

        支持很多类型的翻译,这次我们先选文本翻译机器翻译 文本翻译-API 文档-文档中心-腾讯云

        我们用SDK的方式,免去了自己封装复杂的加密步骤:

pip install --upgrade tencentcloud-sdk-python

        然后去获取密钥API密钥管理,记下APPID、SecretId、SecretKey


机器人接入翻译功能

        小锋仔bot结合翻译功能,直接上代码:

import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tmt.v20180321 import tmt_client, models

def translate(original:str, convert:str='en'):
    secretId = 'xxx'  # 从API控制台获取
    secretKey = 'xxx' # 从API控制台获取
    AppId = 12123     # 从API控制台获取
    try:
        cred = credential.Credential(secretId, secretKey)
        client = tmt_client.TmtClient(cred, "ap-guangzhou")
        req = models.TextTranslateRequest()
        params = {
            "SourceText": original,
            "Source": "auto",
            "Target": convert,
            "ProjectId": AppId
        }
        req.from_json_string(json.dumps(params))
        resp = client.TextTranslate(req)
        # print(resp.to_json_string())
        return resp.TargetText
    except TencentCloudSDKException as err:
        print(err)
        return ''

        使用测试效果:

print(choiceFunction(StatusStore(func_name='翻译', msg='你好')))

# 输出:
{"TargetText": "Hello", "Source": "zh", "Target": "en", "RequestId": "a1b17f47-751e-44cd-89a5-6a22e9f2c444"}
Hello


实时天气

领取免费的和风天气API

        天气部分,我们是用免费的和风天气API:实时天气 - API
        首先也要进行登录并获取KEY,这个步骤官网讲的很详细,图文并茂的,这边就不多写了,大家可以跳转过去(注意我们选的是Web API):创建应用和KEY - RESOURCE

机器人接入天气功能

        同样的,直接上代码:

def weather(city:str) -> str:
    url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
    url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
    weather_key = 'xxxxx'  # 和风天气控制台的key

    # 实况天气
    def getCityId(city_kw):
        url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
        city = requests.get(url_v2).json()['location'][0]
        return city['id']

    city_name = '广州'
    city_id = getCityId(city_name)
    url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
    res = requests.get(url).json()
    text = "<天气信息获取失败>"
    if res['code'] == '200' or res['code'] == 200:
        text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
            city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed']) 
    return text 

        测试效果:


实时热搜

领取免费的天行热搜API

        这部分用的是天行数据,免费会员每天赠送100次调用额度:今日头条新闻API接口 - 天行数据TianAPI。先注册账号,然后点击“申请接口”即可。


        注意,首次注册需要在控制台完成“实名认证”和“邮箱验证”(马上通过,不需要等待审核)。
        对于密钥Key,是在“控制台-数据管理-我的密钥KEY”中。

机器人接入热搜功能

        同样的,直接上代码:

def hotNews(status_store:StatusStore) -> str:
    tianxing_key = 'e05966abe0b054686c9f6b7d60e59a8d'
    def common(data):
        url = data + '?key={}'.format(tianxing_key)
        res = requests.get(url).json()
        return res['newslist']
    res = common('http://api.tianapi.com/topnews/index')
    tops = []
    index = 1
    for item in res:
        tops.append(str(index) + '. ' + item['title'])
        index += 1
        return '\n'.join(tops[0:10])

        测试效果:

照片上传

        有时候我们想保存一些照片,但又不想放手机里,那我们可以做个“通过把照片发给小锋仔机器人,让小锋仔再上传到服务器或者COS上”的功能。

领取腾讯对象存储COS

        还是这个链接:云产品体验 - 腾讯云,在“云产品体验-基础-对象存储COS”下面。对象存储不止可以用来存文件,这里我们只用来存图片。

  1. 领取完成后,进入控制台,创建存储桶:

  1. 配置存储桶信息,访问权限设置为“公有读私有写”,这样别人就能看到了,便于分享图片:

  1. 创建完成后,就可以通过Python APi去控制上传了。不过需要先安装下SDK:
pip install -U cos-python-sdk-v5
  1. 上传图片部分,我们现在随便拿一张图测试:
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import os

def uploadImage(image_path:str) -> str:
    bucket_id = 'image-1253093297'  # 存储桶的名称
    secret_id = 'xxx'
    secret_key = 'xxx'
    region = 'ap-guangzhou'  # 存储桶的地区
    token = None              
    scheme = 'https'          
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
    client = CosS3Client(config)
    # 本地文件形式上传
    # response = client.upload_file(
    #     Bucket=bucket_id,
    #     LocalFilePath=image_path,
    #     Key=image_path.split(os.sep)[-1],
    #     PartSize=1,
    #     MAXThread=10,
    #     EnableMD5=False
    # )

    # 网络文件形式上传
    file_keyname = image_path.split('/')[-2] + '.jpg'
    stream = requests.get(image_path)
    response = client.put_object(
        Bucket=bucket_id,
        Body=stream,
        Key=file_keyname
    )
    print(response['ETag'])
    img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(bucket_id, region, file_keyname)
    return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
  1. 上传完成后,在控制台就可以看到了。

  1. 更多cos操作可看官方文档:对象存储 快速入门-SDK 文档-文档中心-腾讯云

机器人接入图片上传功能

        通过mirai文档可知,图片消息格式为:

[{'type': 'FriendMessage', 'messageChain': [{'type': 'Source', 'id': 55312, 'time': 1662048857}, {'type': 'Image', 'imageId': '{DCAD8B29-D606-B354-117D-F39479C14FE3}.jpg', 'url': 'http://c2cpicdw.qpic.cn/offpic_new/1061700625//1061700625-141936558-DCAD8B29D606B354117DF39479C14FE3/0?term=2', 'path': None, 'base64': None}], 'sender': {'id': 1061700625, 'nickname': '热心市民', 'remark': '热心市民'}}]

因此只需要拿到里面的URL就行,而我们的analyzeFriendMsg函数就已经提取了URl了,因此啥都不用多改!!(结构好,就是方便呀~)
直接测试:

自行添加小功能函数总结

        通过上面几个小功能,不难发现我们的程序在功能上很方便扩展,总结一下,就2步

  1. MultiFunction类中添加功能函数的实现,入参尽量为字符串型,返回也为字符串型
  2. function_map中添加函数信息;

        下面提供几个好玩的接口,给大家留个作业,自己集成到机器人中去:

# 疫情信息
def getYiQing():
    url = 'https://c.m.163.com/ug/api/wuhan/app/data/list-total?t={}'.format(329091037164)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/96.0.4664.110 Safari/537.36 '
    }
    res = requests.get(url, headers=headers).json()
    total = res['data']['chinaTotal']['total']
    today = res['data']['chinaTotal']['today']
    a = 99
    symbol_today = '+' if today['confirm'] >= 0 else ''
    symbol_total = '+' if today['storeConfirm'] >= 0 else ''
    symbol_input = '+' if today['input'] >= 0 else ''
    confirmTotal = '累计确诊:{},较昨日:{}{}'.format(total['confirm'], symbol_today, today['confirm'])
    confirmToday = '现有确诊:{},较昨日:{}{}'.format(total['confirm'] - total['dead'] - total['heal'], symbol_total, today['storeConfirm'])
    inputs = '境外输入:{},较昨日:{}{}'.format(total['input'], symbol_input, today['input'])
    return inputs + '\n' + confirmToday + '\n' + confirmTotal

# 历史上的今天
def getHistoryToday():
    url = 'https://api.oick.cn/lishi/api.php'
    res = requests.get(url).json()
    historyToday = []
    for item in res['result']:
        historyToday.append(item['date'] + ', ' + item['title'])
    return '\n'.join(random.choices(historyToday, k=3))

# 一言
def dailysentence():
    url = 'https://res.abeim.cn/api-text_yiyan'
    res = requests.get(url).json()
    return res['content']

# 天行api
def common(data):
    tianxing_key = ''  # 天行key
    url = data + '?key={}'.format(tianxing_key)
    res = requests.get(url).json()
    return res['newslist']

# 天行api - 小窍门
def dailyTips():
    res = common('http://api.tianapi.com/qiaomen/index')
    tipsArray = res[0]
    return tipsArray['content']

# 天行api - 健康小知识
def healthTips():
    res = common('http://api.tianapi.com/healthtip/index')
    tipsArray = res[0]
    return tipsArray['content']

如果以后功能越来越多,我们很容易记不住关键词是啥,因此,稍稍变动一下,让我们可以知道功能清单。在xiaofengzai函数这个位置添加一段代码:

if msg_text.strip() == '功能清单':
    res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())



控制树莓派舵机与屏显

        这部分摘自我前面的博客:

        与树莓派的主要交互,这里主要有两种方式:

  • 树莓派上也运行mirai。通过设置不同的protocol,是可以实现同时在线的。
  • 通过MQTT通信。这个比较好用,是个物联网协议,广泛适用于IoT场景,推荐。

        我的另一个大型项目“基于树莓派的智能魔镜”,它里面树莓派与手机的通信,就是通过MQTT实现的。很贴心的,B站还有配套的视频教程,欢迎来踩,哈哈哈~小锋学长生活大爆炸的个人空间


腾讯云服务器搭建MQTT环境

        树莓派由于不在身边,因此这部分暂时先略过,大家可以通过上面几篇博客自学一下,他们也都是使用到了mirai的。这里讲一下MQTT的安装,也可以参考安装EMQX MQTT

  1. SSH进入我们的服务器后,输入指令:
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get update
sudo apt-get install mosquitto
sudo apt-get install mosquitto-clients
sudo apt clean
  1. 然后去腾讯云控制台开放下1883端口。
  2. 再在防火墙软件上也放行下1883端口
sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp && sudo firewall-cmd --reload
sudo systemctl start firewalld.service
  1. 通过Python调用MQTT的示例可以参考:Python MQTT
  2. 想了解学习MQTT概念的可以参考:MQTT V3.1协议规范
  3. 更多MQTT使用示例可以参考:

    1. Ubuntu18和Raspbian搭建LAMP环境+部署图片上传网页+安装Mosquitto
    2. 纯JavaScript实现的MQTT智能门锁
    3. Qt搭建MQTT环境

待实现功能

接入控制ESP32(实现智能家居控制)

        ESP32是一块可以链接WIFI的嵌入式开发板,支持MQTT协议。这样一来,只要通过跟我们的机器人互相订阅Topic,在通过设计一套通信协议,就可以实现远程交互了。进一步地,给ESP32接入外设,就可以很容易的实现一个智能家居,而我们则可以通过QQ机器人来实现对智能家居的控制。

完整代码整理

        为了方便,我们把所有需要修改的变量,都统一提取到了最前面。大家在复制过程中,务必记得都填上自己的!!
![Q073C@O[}BS4ON(A]H$C_YX.png](https://cdn.nlark.com/yuque/0/2022/png/21876370/1662051010396-4111cadd-c0f7-444e-88bf-f5ab77abd9c1.png#clientId=u1cf12aa3-c8f3-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=547&id=u1448e26e&margin=%5Bobject%20Object%5D&name=Q073C%40O%5B%7DBS4ON%28A%5DH%24C_YX.png&originHeight=903&originWidth=1013&originalType=binary&ratio=1&rotation=0&showTitle=false&size=46750&status=done&style=none&taskId=ud48045de-328e-4049-870b-8561bc1feb4&title=&width=614)
        最后,贴上完整代码。由于水平有限,写的可能不是很好。也欢迎大家DIY魔改成自己的。如果有问题,欢迎加入文末Q群一起交流~~~

import json
import os

import requests
from flask import Flask, request
from time import sleep
import threading
import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tmt.v20180321 import tmt_client, models
import requests
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client


# ---------------------------- 变量定义区 ---------------------------- #
# 自己运行mirai的服务器IP和mirai-api-http的监听端口
mirai_server_url = 'http://43.143.12.250:8888/'
# settings.yml中的verifyKey
auth_key = '1234567890'     
# mirai登录的QQ
bind_qq = 'xxx'      
# 我们自己用的主QQ。接收到的消息里,QQ是int类型的
target_qq = 123123

# 腾讯COS存储桶的名称
tencent_cos_bucket_id = 'xxx'  
# 腾讯COS存储桶的地区
tencent_cos_region = 'ap-guangzhou'  
# 腾讯控制台的SecretId
tencent_secret_id = 'xxx'
# 腾讯控制台的SecretKey
tencent_secret_key = 'xxx'
# 腾讯机器翻译的appid
tencent_translate_AppId = xxx
# 和风天气API的key
weather_key = 'xxx'
# 天行API的key
tianxing_key = 'xxx'
# ------------------------------------------------------------------ #


class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()


class QQBot:
    def __init__(self):
        self.addr = mirai_server_url
        self.session = None

    def verifySession(self, auth_key):
        """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
        data = {"verifyKey": auth_key}
        url = self.addr+'verify'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'bind'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'release'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return True
        return False

    def fetchLatestMessage(self, session):
        url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return None


    def parseGroupMsg(self, data):
        res = []
        if data is None:
            return res
        for item in data:
            if item['type'] == 'GroupMessage':
                type = item['messageChain'][-1]['type']
                if type == 'Image':
                    text = item['messageChain'][-1]['url']
                elif type == 'Plain':
                    text = item['messageChain'][-1]['text']
                elif type == 'Face':
                    text = item['messageChain'][-1]['faceId']
                else:
                    logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                    continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
        return res

    def getMessageCount(self, session):
        url = self.addr + 'countMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def peekMessage(self, session):
        url = self.addr + 'peekMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def sendPlainTextToGroup(self, session, group, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
        data = {
          "sessionKey": session,
          "group": group,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        text = msg['text']
        type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
            name, group_id, group_name, text)
        content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + type)
        if type == 'Plain':
            message = [{"type": type, "text": content1}]
        elif type == 'Image':
            message = [
                {"type": 'Plain', "text": content2},
                {"type": type, "url": text}]
        elif type == 'Face':
            message = [{"type": 'Plain', "text": content2},
                       {"type": type, "faceId": text}]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
            return 0
        data = {
                "sessionKey": session,
                "group": group,
                "messageChain": message
                }
        logger.DebugLog(">> 消息内容:" + str(data))
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        # 对每条消息进行检查
        for msg in msg_data:
            group_id = msg['groupId']
            # 接收的消息群正确(目前只支持 消息类型)
            if group_id in receive_groups:
                # 依次将消息转发到目标群
                for g in send_groups:
                    logger.DebugLog(">> 当前群:"+g)
                    if g == group_id:
                        logger.DebugLog(">> 跳过此群")
                        continue
                    res = self.sendMsgToGroup(session, g, msg)
                    if res != 0:
                        logger.TraceLog(">> 转发成功!{}".format(g))

    def sendFriendMessage(self, session, qq, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]

        data = {
          "sessionKey": session,
          "target": qq,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendFriendMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0


    def analyzeFriendMsg(self, data):
        if data is None or data['type'] != 'FriendMessage':
            return None, None, None
        sender_id = data['sender']['id']
        msg_type = data['messageChain'][-1]['type']
        if msg_type == 'Plain':
            msg_text = data['messageChain'][-1]['text']
        elif msg_type == 'Image':
            msg_text = data['messageChain'][-1]['url']
        else:
            msg_text = ''
        return sender_id, msg_type, msg_text




logger = Logger()
bot = QQBot()
app = Flask(__name__)

def qqTransfer():
    with open('conf.json', 'r+', encoding="utf-8") as f:
        content = f.read()
    conf = json.loads(content)

    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']

    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']

    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if cnt:
            logger.DebugLog('>> 有消息了 => {}'.format(cnt))
            logger.DebugLog('获取消息内容')
            data = bot.fetchLatestMessage(session)
            if len(data) == 0:
                logger.DebugLog('消息为空')
                continue
            logger.DebugLog(data)
            logger.DebugLog('解析消息内容')
            data = bot.parseGroupMsg(data)
            logger.DebugLog(data)
            logger.DebugLog('转发消息内容')
            bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
        # else:
        #     logger.DebugLog('空闲')
        sleep(sleep_time)
    bot.releaseSession(session, bind_qq)





class StatusStore:
    def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
        self.from_qq = from_qq          # 发送者的QQ号
        self.is_cmd = is_cmd            # 是否是指令(选择功能)
        self.func_name = func_name      # 选择的功能的名称
        self.need_second = need_second  # 是否需要经过两步:先发cmd指令,再发详细内容
        self.msg = msg                  # 本次发送的消息内容
    
    def detail(self):
        return self.__dict__

class MultiFunction:
    """多功能函数集合"""
    def __init__(self) -> None:
        pass

    @staticmethod
    def translate(original:str, convert:str='en'):
        try:
            cred = credential.Credential(tencent_secret_id, tencent_secret_key)
            client = tmt_client.TmtClient(cred, "ap-guangzhou")
            req = models.TextTranslateRequest()
            params = {
                "SourceText": original,
                "Source": "auto",
                "Target": convert,
                "ProjectId": tencent_translate_AppId
            }
            req.from_json_string(json.dumps(params))
            resp = client.TextTranslate(req)
            # print(resp.to_json_string())
            return resp.TargetText
        except TencentCloudSDKException as err:
            print(err)
        return ''
    
    @staticmethod
    def uploadImage(image_path:str) -> str:
        token = None              
        scheme = 'https'          
        config = CosConfig(Region=tencent_cos_region, SecretId=tencent_secret_id, SecretKey=tencent_secret_key, Token=token, Scheme=scheme)
        client = CosS3Client(config)
        # 本地文件形式上传
        # response = client.upload_file(
        #     Bucket=bucket_id,
        #     LocalFilePath=image_path,
        #     Key=image_path.split(os.sep)[-1],
        #     PartSize=1,
        #     MAXThread=10,
        #     EnableMD5=False
        # )

        # 网络文件形式上传
        file_keyname = image_path.split('/')[-2] + '.jpg'
        stream = requests.get(image_path)
        response = client.put_object(
            Bucket=tencent_cos_bucket_id,
            Body=stream,
            Key=file_keyname
        )
        print(response['ETag'])
        img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(tencent_cos_bucket_id, tencent_cos_region, file_keyname)
        return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)

    @staticmethod
    def weather(city_name:str='广州') -> str:
        url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
        url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
        
        # 实况天气
        def getCityId(city_kw):
            url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
            city = requests.get(url_v2).json()['location'][0]
            return city['id']

        city_id = getCityId(city_name)
        url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
        res = requests.get(url).json()
        text = "<天气信息获取失败>"
        print(res)
        if res['code'] == '200' or res['code'] == 200:
            text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
                city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed']) 
        return text 
    
    @staticmethod
    def hotNews(status_store:StatusStore) -> str:
        def common(data):
            url = data + '?key={}'.format(tianxing_key)
            res = requests.get(url).json()
            return res['newslist']
        res = common('http://api.tianapi.com/topnews/index')
        tops = []
        index = 1
        for item in res:
            tops.append(str(index) + '. ' + item['title'])
            index += 1
        return '\n'.join(tops[0:10])






# 多功能函数的映射
# function: 功能对应函数名
# need_second: 是否需要经过两步:先发cmd指令,再发详细内容
# desc: 需要经过两步时,第一次返回的提示语
function_map = {
    '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'}, 
    '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'}, 
    '热搜': {'function': MultiFunction.hotNews, 'need_second': False},
    '上传图片': {'function': MultiFunction.uploadImage, 'need_second': True, 'desc': '请发送图片过来吧~'},
}
def choiceFunction(store_obj:StatusStore):
    res = ''
    if function_map.get(store_obj.func_name):
        res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
    return res 






def xiaofengzai():
    sleep_time = 1              # 轮询间隔
    status_store = {}

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if not cnt:
            sleep(sleep_time)
            continue
        logger.DebugLog('>> 有消息了 => {}'.format(cnt))
        logger.DebugLog('获取消息内容')
        data = bot.fetchLatestMessage(session)
        if len(data) == 0:
            logger.DebugLog('消息为空')
            sleep(sleep_time)
            continue
        logger.DebugLog(data)
        logger.DebugLog('解析消息内容')

        sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
        if not sender_id or sender_id != target_qq:
            sleep(sleep_time)
            continue

        if msg_text.strip() == '功能清单':
            res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())
        elif msg_text.strip().lower().startswith('cmd'):
            _, func_name = msg_text.strip().split('\n')[0].split()
            func_name = func_name.strip()
            store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
            # 不需要发两次,直接调用函数返回结果即可
            func_info = function_map.get(func_name)
            if not func_info:
                res = '指令[{}]暂不支持'.format(func_name)
            elif func_info.get('need_second'):
                res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
                # 添加或更新记录
                status_store[sender_id] = store_obj
            else:
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        else:
            res = '请先发送指令哦...'
            store_obj = status_store.get(sender_id)
            if store_obj and store_obj.is_cmd:
                store_obj.msg = msg_text
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        
        bot.sendFriendMessage(session, qq=sender_id, msg=res)

        


@app.route('/QQ/send', methods=['GET'])
def qqListenMsg():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World!'

@app.route('/QQ/send/friend', methods=['GET'])
def qqListenMsgToFriend():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World! Friend!'

@app.route('/QQ/send/group', methods=['GET'])
def qqListenMsgToGroup():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendPlainTextToGroup(bot.session, qq, msg)
    return 'Hello World! Group!'


if __name__ == '__main__':
    t = threading.Thread(target=xiaofengzai)
    t.setDaemon(True)
    t.start()

    # t = threading.Thread(target=qqTransfer)
    # t.setDaemon(True)
    # t.start()

    app.run(port='9966', host='0.0.0.0')