在uniapp里面处理流式请求相对于web端来说有点麻烦,下面我将讲述几种处理流式请求的方式。
1.websocket
WebSocket 是处理实时数据流的最佳选择之一,UniApp 提供了原生的 WebSocket 支持:
<template>
<view class="container">
<scroll-view scroll-y class="data-container" :scroll-top="scrollTop">
<view v-for="(item, index) in messages" :key="index" class="message-item">
{{ item }}
</view>
</scroll-view>
<view class="control-panel">
<button @click="connectSocket" type="primary" :disabled="isConnected">连接</button>
<button @click="closeSocket" type="warn" :disabled="!isConnected">断开</button>
</view>
</view>
</template>
<script>
export default {
data() {
return {
socketTask: null,
isConnected: false,
messages: [],
scrollTop: 0
}
},
methods: {
connectSocket() {
this.socketTask = uni.connectSocket({
url: 'wss://your-websocket-server.com/stream',
success: () => {
console.log('准备连接...')
}
})
this.socketTask.onOpen(() => {
this.isConnected = true
this.addMessage('连接已建立')
// 订阅数据流
this.socketTask.send({
data: JSON.stringify({action: 'subscribe'})
})
})
this.socketTask.onMessage((res) => {
this.addMessage(res.data)
})
this.socketTask.onClose(() => {
this.isConnected = false
this.addMessage('连接已关闭')
})
this.socketTask.onError((res) => {
this.addMessage('错误: ' + JSON.stringify(res))
})
},
closeSocket() {
if (this.socketTask && this.isConnected) {
this.socketTask.close()
}
},
addMessage(msg) {
this.messages.push(typeof msg === 'string' ? msg : JSON.stringify(msg))
this.$nextTick(() => {
this.scrollTop = 99999 // 滚动到底部
})
}
},
onUnload() {
// 页面卸载时关闭连接
this.closeSocket()
}
}
</script>
<style>
.container {
display: flex;
flex-direction: column;
height: 100vh;
padding: 20rpx;
}
.data-container {
flex: 1;
border: 1px solid #eee;
padding: 20rpx;
margin-bottom: 20rpx;
background-color: #f9f9f9;
}
.message-item {
padding: 10rpx;
border-bottom: 1px solid #eee;
word-break: break-all;
}
.control-panel {
display: flex;
justify-content: space-around;
padding: 20rpx 0;
}
</style>
2.uni.request 处理流式请求
这种方式适用于h5页面和小程序,不适用于app
<template>
<view class="container">
<view class="stream-container">
<view v-for="(item, index) in streamData" :key="index" class="stream-item">
{{ item }}
</view>
</view>
<button @click="startStream" type="primary">开始接收流数据</button>
</view>
</template>
<script>
export default {
data() {
return {
streamData: [],
dataBuffer: ''
}
},
methods: {
startStream() {
uni.request({
url: 'https://your-stream-api.com/stream',
method: 'GET',
enableChunked: true,
dataType: 'text',
onChunkReceived: (res) => {
this.handleChunk(res.data);
},
success: (res) => {
uni.showToast({
title: '流数据接收完成',
icon: 'success'
});
},
fail: (err) => {
uni.showModal({
title: '错误',
content: '流数据接收失败: ' + JSON.stringify(err),
showCancel: false
});
}
});
},
handleChunk(chunk) {
// 将接收到的数据添加到缓冲区
this.dataBuffer += chunk;
// 处理可能的换行符分隔的数据
const lines = this.dataBuffer.split('\n');
// 保留最后一个可能不完整的行
this.dataBuffer = lines.pop() || '';
// 处理完整的行
for (const line of lines) {
if (line.trim()) {
try {
// 尝试解析JSON
const data = JSON.parse(line);
this.streamData.push(JSON.stringify(data));
} catch (e) {
// 非JSON数据直接显示
this.streamData.push(line);
}
}
}
}
}
}
</script>
<style>
.container {
padding: 20px;
}
.stream-container {
border: 1px solid #eee;
padding: 10px;
margin-bottom: 20px;
max-height: 300px;
overflow-y: auto;
}
.stream-item {
padding: 5px 0;
border-bottom: 1px solid #f5f5f5;
}
</style>
3.使用 SSE (Server-Sent Events)
虽然 UniApp 没有原生的 SSE API,但可以通过封装 XMLHttpRequest 来实现 SSE:
function createSSEConnection(url) {
// 创建一个标准的XMLHttpRequest对象
const xhr = new XMLHttpRequest()
xhr.open('GET', url, true)
xhr.setRequestHeader('Accept', 'text/event-stream')
xhr.setRequestHeader('Cache-Control', 'no-cache')
// 设置响应类型为文本
xhr.responseType = 'text'
// 数据缓冲区
let buffer = ''
// 处理进度事件
xhr.onprogress = function(e) {
// 获取新数据
const newData = xhr.responseText.substring(buffer.length)
if (newData) {
buffer += newData
// 按行分割数据
const lines = newData.split('\n')
for (const line of lines) {
if (line.startsWith('data:')) {
const eventData = line.substring(5).trim()
// 触发数据处理
handleSSEData(eventData)
}
}
}
}
xhr.onerror = function(e) {
console.error('SSE连接错误:', e)
}
xhr.onreadystatechange = function() {
if (xhr.readyState === 4) {
if (xhr.status === 200) {
console.log('SSE连接完成')
} else {
console.error('SSE连接失败:', xhr.status)
}
}
}
// 发送请求
xhr.send()
return xhr
}
// 处理SSE数据
function handleSSEData(data) {
try {
const parsedData = JSON.parse(data)
console.log('收到SSE数据:', parsedData)
// 处理数据...
} catch (e) {
console.log('收到SSE文本:', data)
// 处理非JSON数据...
}
}
// 使用方法
const sseConnection = createSSEConnection('https://your-sse-endpoint.com/events')
// 关闭连接
function closeSSE() {
if (sseConnection) {
sseConnection.abort()
}
}
4.使用分页和轮询模拟流
对于不支持真正的流式请求的场景,可以使用分页和轮询来模拟流式体验:
// 轮询获取数据
let lastId = 0
let isPolling = false
function startPolling() {
isPolling = true
poll()
}
function stopPolling() {
isPolling = false
}
function poll() {
if (!isPolling) return
uni.request({
url: 'https://your-api.com/data',
data: {
last_id: lastId,
limit: 10
},
success: (res) => {
const data = res.data
if (data && data.items && data.items.length > 0) {
// 处理接收到的数据
processItems(data.items)
// 更新最后ID用于下次请求
lastId = data.items[data.items.length - 1].id
}
// 如果还有更多数据,继续轮询
if (data.has_more) {
setTimeout(poll, 1000) // 1秒后再次轮询
} else {
console.log('所有数据接收完毕')
isPolling = false
}
},
fail: (err) => {
console.error('轮询失败:', err)
// 错误后延迟重试
setTimeout(() => {
if (isPolling) poll()
}, 3000)
}
})
}
5.app端适用renderjs
该方式适用于app端进行流式请求,在app端上面的几种方式我都尝试过除了websocket没有一个能用的,经过反复的查询才找到适用renderjs这种方式。
RenderJS 是 UniApp 提供的一个运行在视图层的 JavaScript 引擎,允许开发者直接操作 DOM 和使用浏览器特有的 API。
<view
class=""
:sseValue="sseValue"
:change:sseValue="renderScript.getSseValue"
:messagesRenderjs="messagesRenderjs"
:change:messagesRenderjs="renderScript.getMessage"
:downSend="downSend"
:change:downSend="renderScript.getDownSend"
:modelchangeValue="modelchangeValue"
:change:modelchangeValue="renderScript.getModel"
></view>
通过这种方式来调用renderjs
<script module="renderScript" lang="renderjs">
export default {
data() {
return {
VITE_AIR14B_url: '',
VITE_AILOCAL:'',
VITE_ANYTHING:'',
modelValue: 'qwen2.5:14b',
messages: [],
downValue: false,
loading: false,
downSentValue: false,
biaoshi: false,
model:'qianwen'
};
},
methods: {
getMessage(val) {
// console.log(val,'message')
this.messages = val;
},
async getSseValue(val) {
console.log(val,'val')
if(this.model=='qianwen'){
await this.changeModel(val);
}else if(this.model=='locel'){
await this.changeModelLocal(val)
}else if(this.model=='water'){
await this.changeModelAnything(val)
}else if(this.model=='sxyd'){
console.log('sxyd')
}
},
// 暂停生成
getDownSend(val) {
console.log(val, 'valvalval');
if (!this.biaoshi) {
this.biaoshi = true;
} else {
this.downSentValue = true;
}
//停止生成的标志
// this.downSentValue=true
},
// 获取模型
getModel(val) {
console.log(val);
this.model=val
},
// 接收流式数据qianwen
async changeModel(val) {
let params = {
model: this.modelValue,
messages: this.messages,
stream: true
};
try {
// 发送请求到 API
const response = await fetch(this.VITE_AIR14B_url + '/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(params)
});
// 获取流式响应体
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8'); // 解码器,将字节流转换为字符串
this.downValue = false;
let value = ''; // 用于拼接收到的每一部分 response 内容
// 获取响应之后关闭loading
// this.$ownerInstance.callMethod('closeLoading', null);
this.messages.push({
role: 'assistant',
content: ''
});
// 逐块读取数据
while (!this.downValue) {
// 读取数据块
const { done: isDone, value: chunk } = await reader.read();
// if (this.onDown.value) {
// this.downValue = true;
// break;
// }
if (this.downSentValue) {
this.downValue = true;
this.downSentValue = false;
this.$ownerInstance.callMethod('setSend', true);
this.messages[this.messages.length - 1].content += '\n\n[已取消生成]';
this.$ownerInstance.callMethod('setMessageValue', this.messages);
break;
}
this.downValue = isDone;
if (this.downValue) {
this.$ownerInstance.callMethod('setSend', true);
} else {
this.$ownerInstance.callMethod('setSend', false);
}
let chunkString = decoder.decode(chunk, { stream: true });
// 将字节流转换为字符串并追加到 value
value += chunkString;
// 打印已接收的部分数据
// console.log('Received chunk:', chunkString);
if (chunkString && this.messages[this.messages.length - 1].role === 'assistant') {
try {
const parsedChunk = JSON.parse(chunkString);
const content = parsedChunk?.message?.content; // 使用可选链避免属性访问错误
if (content === undefined) {
throw new Error('Invalid chunk structure: Missing message.content');
}
// 确保 messages 数组非空
if (this.messages.length === 0) {
throw new Error('Messages array is empty');
}
this.messages[this.messages.length - 1].content += content;
if(this.messages[this.messages.length - 1].content){
// 获取响应之后关闭loading
this.$ownerInstance.callMethod('closeLoading', null);
}
// 将值传递出去
this.$ownerInstance.callMethod('setMessageValue', this.messages);
} catch (error) {
console.error('处理 chunk 时发生错误:', error);
}
}
}
// console.log('Final response:', value);
} catch (error) {
console.error('Error during fetch request:', error);
this.loading = false;
// 显示错误消息
if (this.messages.length > 0 && this.messages[this.messages.length - 1].role === 'assistant') {
this.messages[this.messages.length - 1].content += '\n\n[生成回答时出现错误]';
}
}
},
}
};
</script>