以下是一个完整的 JavaScript 数据流式获取实现方案,模拟类似 DeepSeek 的对话式逐段返回效果。包含前端实现、后端模拟和详细注释:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>流式对话演示</title>
<style>
#output {
border: 1px solid #ccc;
padding: 20px;
margin: 20px;
min-height: 200px;
font-family: monospace;
white-space: pre-wrap;
}
.loading {
display: none;
color: #666;
}
</style>
</head>
<body>
<button onclick="startStream()">开始对话</button>
<div id="output"></div>
<div id="loading" class="loading">思考中...</div>
<script>
// 流式对话核心逻辑
class StreamDialog {
constructor() {
this.output = document.getElementById('output');
this.loading = document.getElementById('loading');
this.controller = null; // 用于中止请求
}
// 开始流式对话
async start() {
try {
this.showLoading();
this.output.textContent = '';
// 创建中止控制器
this.controller = new AbortController();
// 发起 fetch 请求
const response = await fetch('/stream', {
signal: this.controller.signal,
headers: {
'Content-Type': 'application/json'
}
});
if (!response.ok) throw new Error(response.statusText);
if (!response.body) throw new Error("ReadableStream not supported");
// 创建读取器
const reader = response.body.getReader();
const decoder = new TextDecoder();
// 持续读取流数据
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 解析数据块
const chunk = decoder.decode(value);
const lines = chunk.split('\n').filter(line => line.trim());
for (const line of lines) {
try {
const event = JSON.parse(line.replace('data: ', ''));
this.appendContent(event.content);
} catch (e) {
console.error('解析错误:', e);
}
}
}
} catch (error) {
if (error.name !== 'AbortError') {
this.appendContent('\n[对话异常终止]');
}
} finally {
this.hideLoading();
this.controller = null;
}
}
// 中止对话
abort() {
if (this.controller) {
this.controller.abort();
}
}
// 添加内容到输出区
appendContent(text) {
// 模拟打字机效果
let currentIndex = 0;
const animate = () => {
if (currentIndex < text.length) {
this.output.textContent += text[currentIndex];
currentIndex++;
requestAnimationFrame(animate);
this.output.scrollTop = this.output.scrollHeight; // 自动滚动
}
};
animate();
}
showLoading() {
this.loading.style.display = 'block';
}
hideLoading() {
this.loading.style.display = 'none';
}
}
// 初始化对话实例
const dialog = new StreamDialog();
// 启动对话
function startStream() {
dialog.start();
}
// 中止对话
function stopStream() {
dialog.abort();
}
</script>
</body>
</html>
后端 Node.js 实现(Express):
const express = require('express');
const app = express();
const port = 3000;
// 允许跨域
app.use((req, res, next) => {
res.header('Access-Control-Allow-Origin', '*');
next();
});
// 流式端点
app.get('/stream', (req, res) => {
// 设置流式响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 模拟的对话数据
const responseText = [
"你好!我是 DeepSeek 的智能助手。",
"我正在分析您的问题...",
"根据现有数据,建议如下:\n",
"1. 首先检查网络连接\n",
"2. 验证 API 密钥有效性\n",
"3. 查看服务状态面板\n",
"\n需要更详细的帮助吗?"
];
// 发送流式数据
let index = 0;
const sendChunk = () => {
if (index < responseText.length) {
const chunk = {
content: responseText[index],
finished: index === responseText.length - 1
};
// SSE 格式要求
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
index++;
setTimeout(sendChunk, 500); // 控制发送速度
} else {
res.end();
}
};
sendChunk();
// 处理客户端断开连接
req.on('close', () => {
console.log('客户端断开连接');
res.end();
});
});
app.listen(port, () => {
console.log(`服务运行在 http://localhost:${port}`);
});
核心实现原理:
- 前端实现:
- 使用 Fetch API + ReadableStream 处理流式数据
- 支持请求中止(AbortController)
- 实现打字机动画效果
- 自动滚动保持内容可见
- 加载状态指示
- 后端实现:
- 使用 Server-Sent Events (SSE) 协议
- 分块发送 JSON 格式数据
- 模拟真实响应延迟(500ms/块)
- 支持客户端断开检测
- 数据格式:
{
"content": "当前内容片段",
"finished": false
}
优化扩展建议:
- 性能优化:
// 添加节流控制
const throttle = (func, limit) => {
let lastFunc;
let lastRan;
return function(...args) {
if (!lastRan) {
func.apply(this, args);
lastRan = Date.now();
} else {
clearTimeout(lastFunc);
lastFunc = setTimeout(() => {
if ((Date.now() - lastRan) >= limit) {
func.apply(this, args);
lastRan = Date.now();
}
}, limit - (Date.now() - lastRan));
}
};
};
// 在 appendContent 中使用
this.appendContent = throttle(this.appendContent, 50);
- 错误处理增强:
// 前端添加错误处理
async start() {
try {
// ...原有逻辑...
} catch (error) {
if (error.name === 'AbortError') {
this.appendContent('\n[对话已中止]');
} else {
this.appendContent(`\n[错误: ${error.message}]`);
console.error('Stream Error:', error);
}
}
}
// 后端添加错误模拟
app.get('/stream', (req, res) => {
// 10% 概率模拟错误
if (Math.random() < 0.1) {
res.writeHead(500);
res.end();
return;
}
// ...原有逻辑...
});
- 功能扩展:
// 添加 Markdown 支持
appendContent(text) {
// 简单 Markdown 解析
const parsed = text
.replace(/#{3}/g, '<h3>')
.replace(/#{2}/g, '<h2>')
.replace(/#{1}/g, '<h1>')
.replace(/\*\*(.*?)\*\*/g, '<strong>$1</strong>');
// 使用 DocumentFragment 优化渲染
const fragment = document.createDocumentFragment();
const span = document.createElement('span');
span.innerHTML = parsed;
fragment.appendChild(span);
this.output.appendChild(fragment);
}
这个实现方案完整展示了:
- 前后端流式通信的全流程
- 实时内容渲染优化技巧
- 完整的错误处理机制
- 可扩展的架构设计
- 用户体验优化细节
实际部署时,建议:
- 使用 WebSocket 替代 SSE 实现双向通信
- 添加 JWT 鉴权
- 实现速率限制(Rate Limiting)
- 部署到支持 HTTP/2 的服务器
- 添加前端缓存策略