本文章可借鉴学习,不可直接盗用
接入 AI 需要先获取 APPID、APISecret、APIKey 和接口地址。由于这里使用的服务接口类型是 WebSocket,所以要获取对应的 WebSocket(wss://)接口地址(千万不要误复制成 HTTP 接口的地址)。
还要获取 domain:在接口文档中找到对应模型的 domain 值。
安装依赖库:pip install flask websocket-client werkzeug
具体看这篇文章
Django接入 免费的 AI 大模型——讯飞星火(2025年4月最新!!!)_星火大模型 api password-CSDN博客
# coding: utf-8
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
import ssl
from urllib.parse import urlparse, urlencode
from wsgiref.handlers import format_date_time
from time import mktime
import websocket
class Ws_Param:
    """Hold the API credentials and build the signed WebSocket URL.

    Attributes are set from the constructor arguments; ``host`` and ``path``
    are derived from ``gpt_url``.
    """

    def __init__(self, APPID, APIKey, APISecret, gpt_url):
        """Store the credentials and split ``gpt_url`` into host and path.

        Args:
            APPID: Application id.
            APIKey: API key placed in the authorization string.
            APISecret: Secret used as the HMAC-SHA256 key.
            gpt_url: Endpoint URL the signed query string is appended to.
        """
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        # Parse once and reuse both components.
        parsed = urlparse(gpt_url)
        self.host = parsed.netloc
        self.path = parsed.path
        self.gpt_url = gpt_url

    def create_url(self):
        """Return ``gpt_url`` with ``authorization``, ``date`` and ``host`` query parameters.

        The signature is an HMAC-SHA256 over the host, the current date and
        the request line, keyed with ``APISecret``; both the signature and
        the final authorization string are base64-encoded.
        """
        # HTTP-date string for the current moment.
        now = datetime.datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        signature_origin = f"host: {self.host}\ndate: {date}\nGET {self.path} HTTP/1.1"
        signature_sha = hmac.new(
            self.APISecret.encode("utf-8"),
            signature_origin.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode("utf-8")

        authorization_origin = (
            f'api_key="{self.APIKey}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature_sha_base64}"'
        )
        authorization = base64.b64encode(authorization_origin.encode("utf-8")).decode("utf-8")

        url_params = {
            "authorization": authorization,
            "date": date,
            "host": self.host,
        }
        return f"{self.gpt_url}?{urlencode(url_params)}"
def on_error(ws, error):
    """WebSocket error callback: print the error to stdout."""
    print("### 连接错误:", error)
def on_close(ws, close_status_code, close_msg):
    """WebSocket close callback: print a fixed notice; the status and message are ignored."""
    print("### 连接关闭 ###")
def on_open(ws):
    """WebSocket open callback: send the request from a separate thread via ``run``."""
    thread.start_new_thread(run, (ws,))
def run(ws):
    """Serialize the request built from ``ws.appid``, ``ws.query`` and ``ws.domain`` and send it."""
    data = json.dumps(gen_params(ws.appid, ws.query, ws.domain))
    ws.send(data)
def on_message(ws, message):
    """WebSocket message callback: print streamed content and close when done.

    Args:
        ws: The socket object; ``ws.close()`` is called when the final chunk
            (``status == 2``) arrives or when the response carries a
            non-zero error code.
        message: JSON text of one response frame.
    """
    try:
        data = json.loads(message)
        code = data['header']['code']
        if code != 0:
            print(f"请求错误: {code}, 错误信息: {data['header']['message']}")
            # Nothing more will be printed for a failed request, so release
            # the connection instead of leaving run_forever() waiting.
            ws.close()
            return
        choices = data["payload"]["choices"]
        content = choices["text"][0]["content"]
        # Flush so partial answers show up as they stream in.
        print(content, end='', flush=True)
        if choices["status"] == 2:
            print("\n#### 会话结束")
            ws.close()
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # Malformed JSON or an unexpected frame layout.
        print("### 消息解析异常:", e)
def gen_params(appid, query, domain, *, uid="1234", temperature=0.5, max_tokens=4096):
    """Build the request payload for a single-turn chat.

    Args:
        appid: Value for ``header.app_id``.
        query: User text, sent as the only message with role ``user``.
        domain: Value for ``parameter.chat.domain``.
        uid: Value for ``header.uid``.
        temperature: Value for ``parameter.chat.temperature``.
        max_tokens: Value for ``parameter.chat.max_tokens``.

    Returns:
        A dict with ``header``, ``parameter`` and ``payload`` sections.
    """
    return {
        "header": {"app_id": appid, "uid": uid},
        "parameter": {
            "chat": {
                "domain": domain,
                "temperature": temperature,
                "max_tokens": max_tokens,
            }
        },
        "payload": {"message": {"text": [{"role": "user", "content": query}]}},
    }
def main(appid, api_secret, api_key, gpt_url, domain, query):
    """Send one query over a signed WebSocket connection and stream the reply.

    Args:
        appid: Application id (must be 8 characters long).
        api_secret: API secret (must be 32 characters long).
        api_key: API key (must be 32 characters long).
        gpt_url: WebSocket endpoint URL.
        domain: Model domain sent in the request.
        query: User text to send.

    Returns:
        None. Prints an error and returns early when the credential lengths
        do not match the expected format.
    """
    # Validate credential format (APPID length 8, APIKey and APISecret length 32).
    if not (len(appid) == 8 and len(api_key) == 32 and len(api_secret) == 32):
        print("错误: API密钥格式不正确")
        return

    ws_param = Ws_Param(appid, api_key, api_secret, gpt_url)
    ws_url = ws_param.create_url()
    ws = websocket.WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # The callbacks read these attributes off the socket object.
    ws.appid = appid
    ws.query = query
    ws.domain = domain
    # Keep TLS certificate verification enabled: the signed URL carries the
    # API key, so the server's identity must be checked before connecting.
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_REQUIRED})
if __name__ == "__main__":
    # Fill in the WebSocket credentials shown in the console
    # (Spark4.0 Ultra configuration).
    main(
        appid="",  # APPID shown in the console
        api_secret="",  # APISecret (double-check for hidden characters)
        api_key="",  # APIKey
        gpt_url="",  # WebSocket address; must match the console
        domain="4.0Ultra",  # domain for Spark4.0
        query="你叫什么名字"
    )
注意这里要替换成你的密钥,接口地址和domain
运行结果如下
前端templates\index.html
<!-- templates/index.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI 智能助手</title>
<link href="https://cdn.bootcdn.net/ajax/libs/font-awesome/6.4.0/css/all.min.css" rel="stylesheet">
<style>
    /* Theme colors shared by the rules below via var(). */
    :root {
        --primary-color: #4a90e2;
        --secondary-color: #f5f5f5;
        --success-color: #34d399;
        --error-color: #ef4444;
    }
    /* Global reset. */
    * {
        box-sizing: border-box;
        margin: 0;
        padding: 0;
    }
    body {
        font-family: 'Segoe UI', system-ui, -apple-system, sans-serif;
        background: #f0f2f5;
        min-height: 100vh;
    }
    /* Page wrapper. */
    .container {
        max-width: 1200px;
        margin: 0 auto;
        padding: 20px;
    }
    /* Chat card: column layout holding the message list and the input row. */
    .chat-container {
        background: white;
        border-radius: 12px;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
        height: 80vh;
        display: flex;
        flex-direction: column;
    }
    /* Scrollable message list. */
    #chat-box {
        flex: 1;
        overflow-y: auto;
        padding: 20px;
        scroll-behavior: smooth;
    }
    /* One chat row: avatar plus bubble. */
    .message {
        display: flex;
        gap: 12px;
        margin-bottom: 16px;
    }
    /* User rows are mirrored. */
    .message.user {
        flex-direction: row-reverse;
    }
    .avatar {
        width: 40px;
        height: 40px;
        border-radius: 50%;
        background: var(--secondary-color);
        display: flex;
        align-items: center;
        justify-content: center;
        flex-shrink: 0;
    }
    /* Message bubble. */
    .message-content {
        max-width: 70%;
        padding: 12px 16px;
        border-radius: 12px;
        position: relative;
    }
    .user .message-content {
        background: var(--primary-color);
        color: white;
        border-radius: 12px 12px 0 12px;
    }
    .bot .message-content {
        background: var(--secondary-color);
        border-radius: 12px 12px 12px 0;
    }
    /* Upload trigger (the label wrapping the hidden file input). */
    .file-upload {
        background: #f8fafc;
        border: 2px dashed #cbd5e1;
        border-radius: 8px;
        padding: 16px;
        text-align: center;
        margin: 16px 0;
        cursor: pointer;
        transition: all 0.3s ease;
    }
    .file-upload:hover {
        border-color: var(--primary-color);
        background: rgba(74, 144, 226, 0.05);
    }
    /* Bottom row: upload trigger, text input, send button. */
    .input-area {
        border-top: 1px solid #eee;
        padding: 16px;
        display: flex;
        gap: 12px;
        background: white;
    }
    #user-input {
        flex: 1;
        padding: 12px;
        border: 1px solid #e2e8f0;
        border-radius: 8px;
        font-size: 16px;
        transition: all 0.3s ease;
    }
    #user-input:focus {
        outline: none;
        border-color: var(--primary-color);
        box-shadow: 0 0 0 3px rgba(74, 144, 226, 0.2);
    }
    /* Buttons. */
    button {
        padding: 12px 24px;
        background: var(--primary-color);
        color: white;
        border: none;
        border-radius: 8px;
        cursor: pointer;
        transition: all 0.2s ease;
        display: flex;
        align-items: center;
        gap: 8px;
    }
    button:hover {
        background: #357abd;
        transform: translateY(-1px);
    }
    button:disabled {
        background: #94a3b8;
        cursor: not-allowed;
    }
    /* Uploaded-file row inserted into the chat list by the script. */
    .file-item {
        display: flex;
        align-items: center;
        gap: 8px;
        padding: 8px;
        background: white;
        border-radius: 8px;
        margin: 8px 0;
    }
    .file-icon {
        font-size: 20px;
    }
    /* Progress bar track and fill. */
    .progress-bar {
        height: 4px;
        background: #e2e8f0;
        border-radius: 2px;
        overflow: hidden;
        margin-top: 8px;
    }
    .progress {
        width: 0%;
        height: 100%;
        background: var(--primary-color);
        transition: width 0.3s ease;
    }
    /* Narrow screens: tighter padding and wider bubbles. */
    @media (max-width: 768px) {
        .container {
            padding: 10px;
        }
        .message-content {
            max-width: 85%;
        }
    }
</style>
</head>
<body>
<div class="container">
    <h1 style="margin-bottom: 20px; color: #1e293b;">AI 智能助手 <i class="fas fa-robot"></i></h1>
    <div class="chat-container">
        <!-- Message list; the script appends message and file rows here. -->
        <div id="chat-box">
            <!-- Static greeting shown before any exchange. -->
            <div class="message bot">
                <div class="avatar"><i class="fas fa-robot"></i></div>
                <div class="message-content">您好!我是智能助手,可以回答各种问题,也可以帮您分析上传的文件</div>
            </div>
        </div>
        <!-- Input row: file picker, text box, send button. -->
        <div class="input-area">
            <!-- Clicking the label opens the hidden file input. -->
            <label class="file-upload">
                <input type="file" id="file-input" hidden multiple>
                <i class="fas fa-cloud-upload-alt"></i> 点击上传文件
            </label>
            <input type="text" id="user-input" placeholder="输入消息...">
            <button onclick="sendMessage()" id="send-btn">
                <i class="fas fa-paper-plane"></i> 发送
            </button>
        </div>
    </div>
</div>
<script>
// True while a chat request is in flight; read and written by sendMessage().
let isSending = false

// File upload handling: upload each picked file in turn, then reset the picker.
document.getElementById('file-input').addEventListener('change', async function (event) {
    const picked = event.target.files
    for (const entry of picked) {
        // Sequential on purpose: each upload is awaited before the next starts.
        await uploadFile(entry)
    }
    event.target.value = ''
})
/**
 * POST one file to /upload as multipart form data and reflect the outcome
 * in its list row.
 * @param {File} file - The file to upload.
 */
async function uploadFile(file) {
    const payload = new FormData()
    payload.append('file', file)

    try {
        showFileItem(file)
        const res = await fetch('/upload', { method: 'POST', body: payload })
        const body = await res.json()
        if (!res.ok) {
            updateFileStatus(file.name, 'error', body.error)
        } else {
            updateFileStatus(file.name, 'success', body.url)
        }
    } catch (err) {
        // Reached on a fetch failure, a non-JSON response, or an error thrown above.
        updateFileStatus(file.name, 'error', '上传失败')
    }
}
/**
 * Append a pending row (icon, file name, empty progress bar) for `file` to
 * the chat list and scroll to the bottom.
 * @param {File} file - The file being uploaded; only `file.name` is read.
 */
function showFileItem(file) {
    const chatBox = document.getElementById('chat-box')

    const item = document.createElement('div')
    item.className = 'file-item'
    item.id = `file-${Date.now()}`
    // Static markup only; the file name is filled in below as text.
    item.innerHTML = `
        <i class="fas fa-file-alt file-icon"></i>
        <div style="flex:1">
            <div class="file-name"></div>
            <div class="progress-bar"><div class="progress"></div></div>
        </div>
    `
    // A file name can contain HTML metacharacters, so it must never be
    // interpolated into markup.
    item.querySelector('.file-name').textContent = file.name

    chatBox.appendChild(item)
    chatBox.scrollTop = chatBox.scrollHeight
}
/**
 * Replace the content of the row for `filename` with a success or error view.
 * @param {string} filename - Name used to locate the row (matched against its text).
 * @param {string} status - 'success' renders a link; anything else renders an error.
 * @param {string} message - Link URL on success, error text otherwise.
 */
function updateFileStatus(filename, status, message) {
    const rows = Array.from(document.querySelectorAll('.file-item'))
    const matches = rows.filter(el => el.textContent.includes(filename))
    // Prefer a row that still shows a progress bar (not yet updated), so a
    // second upload with the same name does not overwrite the first row.
    const item = matches.find(el => el.querySelector('.progress-bar')) || matches[0]
    if (!item) return

    const succeeded = status === 'success'

    const icon = document.createElement('i')
    icon.className = succeeded ? 'fas fa-check-circle' : 'fas fa-times-circle'
    icon.style.color = succeeded ? 'var(--success-color)' : 'var(--error-color)'

    // File name and server-provided message are inserted as text / properties,
    // never as markup, so they cannot inject HTML.
    const nameEl = document.createElement('div')
    nameEl.textContent = filename

    let detailEl
    if (succeeded) {
        detailEl = document.createElement('a')
        detailEl.href = message
        detailEl.target = '_blank'
        // Keep the opened page from reaching back through window.opener.
        detailEl.rel = 'noopener noreferrer'
        detailEl.style.color = 'var(--primary-color)'
        detailEl.style.textDecoration = 'none'
        detailEl.append('查看文件 ')
        const linkIcon = document.createElement('i')
        linkIcon.className = 'fas fa-external-link-alt'
        detailEl.appendChild(linkIcon)
    } else {
        detailEl = document.createElement('div')
        detailEl.style.color = 'var(--error-color)'
        detailEl.textContent = message
    }

    const body = document.createElement('div')
    body.style.flex = '1'
    body.append(nameEl, detailEl)

    item.textContent = ''
    item.append(icon, body)
}
// Chat message handling
/**
 * Send the trimmed input text to /chat and append the reply (or an error)
 * to the chat list. Does nothing while a request is in flight or when the
 * input is empty.
 */
async function sendMessage() {
    if (isSending) return

    const inputEl = document.getElementById('user-input')
    const text = inputEl.value.trim()
    if (!text) return

    // Toggle the in-flight flag together with the input and send button.
    const setBusy = (busy) => {
        isSending = busy
        inputEl.disabled = busy
        document.getElementById('send-btn').disabled = busy
    }

    setBusy(true)
    try {
        appendMessage(text, 'user')
        inputEl.value = ''

        const res = await fetch('/chat', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ message: text })
        })
        const data = await res.json()

        const reply = res.ok ? data.response : `错误:${data.error}`
        appendMessage(reply, 'bot')
    } catch (err) {
        appendMessage('网络请求失败', 'bot')
    } finally {
        setBusy(false)
        inputEl.focus()
    }
}
/**
 * Append one chat row to the message list and scroll to the bottom.
 * @param {string} message - Text to display; rendered as plain text.
 * @param {string} sender - 'user' for the user's row; anything else is a bot row.
 */
function appendMessage(message, sender) {
    const chatBox = document.getElementById('chat-box')
    const isUser = sender === 'user'

    const messageEl = document.createElement('div')
    messageEl.className = `message ${isUser ? 'user' : 'bot'}`
    // Static markup only; the message text is filled in below.
    messageEl.innerHTML = `
        <div class="avatar">
            ${isUser ? '<i class="fas fa-user"></i>' : '<i class="fas fa-robot"></i>'}
        </div>
        <div class="message-content"></div>
    `
    // Both the user's input and the model's reply are untrusted: inserting
    // them as text prevents them from being parsed as HTML/script.
    messageEl.querySelector('.message-content').textContent = message

    chatBox.appendChild(messageEl)
    chatBox.scrollTop = chatBox.scrollHeight
}
// Send on Enter (Shift+Enter is left alone)
document.getElementById('user-input').addEventListener('keypress', (event) => {
    const plainEnter = event.key === 'Enter' && !event.shiftKey
    if (!plainEnter) return
    event.preventDefault()
    sendMessage()
})
</script>
</body>
</html>
后端:Flask 应用(文件名可自行命名,例如 app.py)
# app.py
import os
from flask import Flask, render_template, request, jsonify, send_from_directory
from werkzeug.utils import secure_filename
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
import ssl
from urllib.parse import urlparse, urlencode
from wsgiref.handlers import format_date_time
from time import mktime
import websocket
# Flask application and upload-related settings.
app = Flask(__name__)
app.config.update(
    UPLOAD_FOLDER='uploads',
    MAX_CONTENT_LENGTH=100 * 1024 * 1024,  # 100MB
    # NOTE(review): executable types ('exe', 'apk') are accepted and served
    # back by the /uploads route — confirm this is intended.
    ALLOWED_EXTENSIONS={'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'zip', 'exe', 'apk'},
)
# Spark API configuration, read from environment variables so that no secret
# is stored in source code. SPARK_APPID, SPARK_API_SECRET and SPARK_API_KEY
# must be set before the chat endpoint can authenticate; the URL and domain
# fall back to the Spark 4.0 Ultra values.
# NOTE(review): credentials that were previously hard-coded here should be
# treated as leaked and rotated.
CONFIG = {
    "APPID": os.environ.get("SPARK_APPID", ""),
    "API_SECRET": os.environ.get("SPARK_API_SECRET", ""),
    "API_KEY": os.environ.get("SPARK_API_KEY", ""),
    "GPT_URL": os.environ.get("SPARK_GPT_URL", "wss://spark-api.xf-yun.com/v4.0/chat"),
    "DOMAIN": os.environ.get("SPARK_DOMAIN", "4.0Ultra"),
}
# Create the upload directory at import time; exist_ok=True makes this a no-op when it already exists.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
class WsParam:
    """Hold the API credentials and build the signed WebSocket URL.

    Attributes are set from the constructor arguments; ``host`` and ``path``
    are derived from ``gpt_url``.
    """

    def __init__(self, appid, api_key, api_secret, gpt_url):
        """Store the credentials and split ``gpt_url`` into host and path.

        Args:
            appid: Application id.
            api_key: API key placed in the authorization string.
            api_secret: Secret used as the HMAC-SHA256 key.
            gpt_url: Endpoint URL the signed query string is appended to.
        """
        self.APPID = appid
        self.APIKey = api_key
        self.APISecret = api_secret
        # Parse once and reuse both components.
        parsed = urlparse(gpt_url)
        self.host = parsed.netloc
        self.path = parsed.path
        self.gpt_url = gpt_url

    def create_url(self):
        """Return ``gpt_url`` with ``authorization``, ``date`` and ``host`` query parameters.

        The signature is an HMAC-SHA256 over the host, the current date and
        the request line, keyed with ``APISecret``; both the signature and
        the final authorization string are base64-encoded.
        """
        # HTTP-date string for the current moment.
        now = datetime.datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        signature_origin = f"host: {self.host}\ndate: {date}\nGET {self.path} HTTP/1.1"
        signature_sha = hmac.new(
            self.APISecret.encode("utf-8"),
            signature_origin.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode("utf-8")

        authorization_origin = (
            f'api_key="{self.APIKey}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature_sha_base64}"'
        )
        authorization = base64.b64encode(authorization_origin.encode("utf-8")).decode("utf-8")

        query = urlencode({'authorization': authorization, 'date': date, 'host': self.host})
        return f"{self.gpt_url}?{query}"
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
def get_spark_response(query):
    """Send ``query`` to the Spark WebSocket API and return the full reply text.

    The streamed chunks are collected until the final frame (``status == 2``)
    arrives. Failures seen inside the WebSocket callbacks are recorded and
    raised after the connection loop ends, so they reach the caller instead
    of depending on how the WebSocket library treats exceptions raised from
    callbacks.

    Args:
        query: User text, sent as a single message with role ``user``.

    Returns:
        The concatenated content of all received chunks.

    Raises:
        RuntimeError: When the API reports a non-zero code, a frame cannot be
            parsed, or the WebSocket layer reports an error.
    """
    response_text = []
    errors = []

    def on_message(ws, message):
        try:
            data = json.loads(message)
            header = data["header"]
            if header["code"] != 0:
                errors.append(f'请求错误: {header["code"]}, {header["message"]}')
                # Nothing more to wait for; end the connection loop.
                ws.close()
                return
            choices = data["payload"]["choices"]
            response_text.append(choices["text"][0]["content"])
            if choices["status"] == 2:
                ws.close()
        except (ValueError, KeyError, IndexError, TypeError) as exc:
            # Malformed JSON or an unexpected frame layout.
            errors.append(f"响应解析失败: {exc}")
            ws.close()

    def on_error(ws, error):
        errors.append(f"WebSocket错误: {error}")

    def on_close(ws, *args):
        pass

    def on_open(ws):
        data = json.dumps({
            "header": {"app_id": CONFIG["APPID"], "uid": "1234"},
            "parameter": {"chat": {"domain": CONFIG["DOMAIN"], "temperature": 0.5, "max_tokens": 4096}},
            "payload": {"message": {"text": [{"role": "user", "content": query}]}}
        })
        ws.send(data)

    ws_param = WsParam(CONFIG["APPID"], CONFIG["API_KEY"], CONFIG["API_SECRET"], CONFIG["GPT_URL"])
    ws_url = ws_param.create_url()
    ws = websocket.WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    # Keep TLS certificate verification enabled: the signed URL carries the
    # API key, so the server's identity must be checked before connecting.
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_REQUIRED})

    if errors:
        # The first recorded failure is the root cause; later ones are follow-ups.
        raise RuntimeError(errors[0])
    return "".join(response_text)
@app.route('/')
def index():
    """Serve the chat page rendered from ``index.html``."""
    return render_template('index.html')
@app.route('/chat', methods=['POST'])
def chat():
    """Forward the posted ``message`` to the model and return its reply as JSON.

    Responses:
        200 ``{"response": ...}`` on success.
        400 ``{"error": ...}`` when the body is not a JSON object or the
            message is missing, empty or not a string.
        500 ``{"error": ...}`` when the model call fails.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so client mistakes are answered with 400 rather than 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "请求格式错误"}), 400

    user_input = payload.get('message')
    if not user_input:
        return jsonify({"error": "空消息"}), 400
    if not isinstance(user_input, str):
        return jsonify({"error": "请求格式错误"}), 400

    try:
        ai_response = get_spark_response(user_input)
    except Exception as e:
        # Top-level boundary: report any upstream failure to the client.
        return jsonify({"error": str(e)}), 500
    return jsonify({"response": ai_response})
@app.route('/upload', methods=['POST'])
def upload_file():
    """Save an uploaded file into UPLOAD_FOLDER and return its name and URL.

    Responses:
        200 ``{"filename": ..., "url": "/uploads/<filename>"}`` on success.
        400 ``{"error": ...}`` when no file was sent or its extension is not
            allowed.
    """
    import uuid  # local import: only needed for the fallback name below

    if 'file' not in request.files:
        return jsonify({"error": "没有选择文件"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "没有选择文件"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": "文件类型不允许"}), 400

    extension = file.filename.rsplit('.', 1)[1].lower()
    filename = secure_filename(file.filename)
    # secure_filename drops non-ASCII characters, so a name such as
    # "文档.pdf" collapses to "pdf" (no extension) or to an empty string.
    # Fall back to a generated name that keeps the validated extension.
    if not allowed_file(filename):
        filename = f"{uuid.uuid4().hex}.{extension}"

    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return jsonify({
        "filename": filename,
        "url": f"/uploads/{filename}"
    })
@app.route('/uploads/<filename>')
def uploaded_file(filename):
    """Serve a previously uploaded file from UPLOAD_FOLDER."""
    return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
if __name__ == '__main__':
    # NOTE(review): debug=True enables Flask's debug mode; keep it for local
    # development only and turn it off before exposing the app.
    app.run(debug=True)
运行结果(未添加结束对话功能和加载进度,待自行开发)