uniapp处理流式请求

发布于:2025-03-25 ⋅ 阅读:(13) ⋅ 点赞:(0)

在uniapp里面处理流式请求相对于web端来说有点麻烦,下面我将讲述几种处理流式请求的方式。

1.websocket

WebSocket 是处理实时数据流的最佳选择之一,UniApp 提供了原生的 WebSocket 支持:

<template>
  <view class="container">
    <scroll-view scroll-y class="data-container" :scroll-top="scrollTop">
      <view v-for="(item, index) in messages" :key="index" class="message-item">
        {{ item }}
      </view>
    </scroll-view>
    <view class="control-panel">
      <button @click="connectSocket" type="primary" :disabled="isConnected">连接</button>
      <button @click="closeSocket" type="warn" :disabled="!isConnected">断开</button>
    </view>
  </view>
</template>

<script>
export default {
  data() {
    return {
      socketTask: null,
      isConnected: false,
      messages: [],
      scrollTop: 0
    }
  },
  methods: {
    connectSocket() {
      this.socketTask = uni.connectSocket({
        url: 'wss://your-websocket-server.com/stream',
        success: () => {
          console.log('准备连接...')
        }
      })
      
      this.socketTask.onOpen(() => {
        this.isConnected = true
        this.addMessage('连接已建立')
        // 订阅数据流
        this.socketTask.send({
          data: JSON.stringify({action: 'subscribe'})
        })
      })
      
      this.socketTask.onMessage((res) => {
        this.addMessage(res.data)
      })
      
      this.socketTask.onClose(() => {
        this.isConnected = false
        this.addMessage('连接已关闭')
      })
      
      this.socketTask.onError((res) => {
        this.addMessage('错误: ' + JSON.stringify(res))
      })
    },
    
    closeSocket() {
      if (this.socketTask && this.isConnected) {
        this.socketTask.close()
      }
    },
    
    addMessage(msg) {
      this.messages.push(typeof msg === 'string' ? msg : JSON.stringify(msg))
      this.$nextTick(() => {
        this.scrollTop = 99999 // 滚动到底部
      })
    }
  },
  onUnload() {
    // 页面卸载时关闭连接
    this.closeSocket()
  }
}
</script>

<style>
.container {
  display: flex;
  flex-direction: column;
  height: 100vh;
  padding: 20rpx;
}
.data-container {
  flex: 1;
  border: 1px solid #eee;
  padding: 20rpx;
  margin-bottom: 20rpx;
  background-color: #f9f9f9;
}
.message-item {
  padding: 10rpx;
  border-bottom: 1px solid #eee;
  word-break: break-all;
}
.control-panel {
  display: flex;
  justify-content: space-around;
  padding: 20rpx 0;
}
</style>

2.uni.request 处理流式请求

这种方式适用于h5页面和小程序,不适用于app

<template>
  <view class="container">
    <view class="stream-container">
      <view v-for="(item, index) in streamData" :key="index" class="stream-item">
        {{ item }}
      </view>
    </view>
    <button @click="startStream" type="primary">开始接收流数据</button>
  </view>
</template>

<script>
export default {
  data() {
    return {
      streamData: [],
      dataBuffer: ''
    }
  },
  methods: {
    startStream() {
      uni.request({
        url: 'https://your-stream-api.com/stream',
        method: 'GET',
        enableChunked: true,
        dataType: 'text',
        onChunkReceived: (res) => {
          this.handleChunk(res.data);
        },
        success: (res) => {
          uni.showToast({
            title: '流数据接收完成',
            icon: 'success'
          });
        },
        fail: (err) => {
          uni.showModal({
            title: '错误',
            content: '流数据接收失败: ' + JSON.stringify(err),
            showCancel: false
          });
        }
      });
    },
    
    handleChunk(chunk) {
      // 将接收到的数据添加到缓冲区
      this.dataBuffer += chunk;
      
      // 处理可能的换行符分隔的数据
      const lines = this.dataBuffer.split('\n');
      
      // 保留最后一个可能不完整的行
      this.dataBuffer = lines.pop() || '';
      
      // 处理完整的行
      for (const line of lines) {
        if (line.trim()) {
          try {
            // 尝试解析JSON
            const data = JSON.parse(line);
            this.streamData.push(JSON.stringify(data));
          } catch (e) {
            // 非JSON数据直接显示
            this.streamData.push(line);
          }
        }
      }
    }
  }
}
</script>

<style>
.container {
  padding: 20px;
}
.stream-container {
  border: 1px solid #eee;
  padding: 10px;
  margin-bottom: 20px;
  max-height: 300px;
  overflow-y: auto;
}
.stream-item {
  padding: 5px 0;
  border-bottom: 1px solid #f5f5f5;
}
</style>

3.使用 SSE (Server-Sent Events)

虽然 UniApp 没有原生的 SSE API,但可以通过封装 XMLHttpRequest 来实现 SSE:

function createSSEConnection(url) {
  // 创建一个标准的XMLHttpRequest对象
  const xhr = new XMLHttpRequest()
  xhr.open('GET', url, true)
  xhr.setRequestHeader('Accept', 'text/event-stream')
  xhr.setRequestHeader('Cache-Control', 'no-cache')
  
  // 设置响应类型为文本
  xhr.responseType = 'text'
  
  // 数据缓冲区
  let buffer = ''
  
  // 处理进度事件
  xhr.onprogress = function(e) {
    // 获取新数据
    const newData = xhr.responseText.substring(buffer.length)
    if (newData) {
      buffer += newData
      
      // 按行分割数据
      const lines = newData.split('\n')
      for (const line of lines) {
        if (line.startsWith('data:')) {
          const eventData = line.substring(5).trim()
          // 触发数据处理
          handleSSEData(eventData)
        }
      }
    }
  }
  
  xhr.onerror = function(e) {
    console.error('SSE连接错误:', e)
  }
  
  xhr.onreadystatechange = function() {
    if (xhr.readyState === 4) {
      if (xhr.status === 200) {
        console.log('SSE连接完成')
      } else {
        console.error('SSE连接失败:', xhr.status)
      }
    }
  }
  
  // 发送请求
  xhr.send()
  
  return xhr
}

// 处理SSE数据
function handleSSEData(data) {
  try {
    const parsedData = JSON.parse(data)
    console.log('收到SSE数据:', parsedData)
    // 处理数据...
  } catch (e) {
    console.log('收到SSE文本:', data)
    // 处理非JSON数据...
  }
}

// 使用方法
const sseConnection = createSSEConnection('https://your-sse-endpoint.com/events')

// 关闭连接
function closeSSE() {
  if (sseConnection) {
    sseConnection.abort()
  }
}

4.使用分页和轮询模拟流

对于不支持真正的流式请求的场景,可以使用分页和轮询来模拟流式体验:

// 轮询获取数据
let lastId = 0
let isPolling = false

function startPolling() {
  isPolling = true
  poll()
}

function stopPolling() {
  isPolling = false
}

function poll() {
  if (!isPolling) return
  
  uni.request({
    url: 'https://your-api.com/data',
    data: {
      last_id: lastId,
      limit: 10
    },
    success: (res) => {
      const data = res.data
      if (data && data.items && data.items.length > 0) {
        // 处理接收到的数据
        processItems(data.items)
        
        // 更新最后ID用于下次请求
        lastId = data.items[data.items.length - 1].id
      }
      
      // 如果还有更多数据,继续轮询
      if (data.has_more) {
        setTimeout(poll, 1000) // 1秒后再次轮询
      } else {
        console.log('所有数据接收完毕')
        isPolling = false
      }
    },
    fail: (err) => {
      console.error('轮询失败:', err)
      // 错误后延迟重试
      setTimeout(() => {
        if (isPolling) poll()
      }, 3000)
    }
  })
}

5.app端适用renderjs

该方式适用于app端进行流式请求,在app端上面的几种方式我都尝试过除了websocket没有一个能用的,经过反复的查询才找到适用renderjs这种方式。

RenderJS 是 UniApp 提供的一个运行在视图层的 JavaScript 引擎,允许开发者直接操作 DOM 和使用浏览器特有的 API。

<view
			class=""
			:sseValue="sseValue"
			:change:sseValue="renderScript.getSseValue"
			:messagesRenderjs="messagesRenderjs"
			:change:messagesRenderjs="renderScript.getMessage"
			:downSend="downSend"
			:change:downSend="renderScript.getDownSend"
			:modelchangeValue="modelchangeValue"
			:change:modelchangeValue="renderScript.getModel"
		></view>
通过这种方式来调用renderjs
<script module="renderScript" lang="renderjs">
export default {
	data() {
		return {
			VITE_AIR14B_url: '',
			VITE_AILOCAL:'',
			VITE_ANYTHING:'',
			modelValue: 'qwen2.5:14b',
			messages: [],
			downValue: false,
			loading: false,
			downSentValue: false,
			biaoshi: false,
			model:'qianwen'
		};
	},
	methods: {
		getMessage(val) {
			// console.log(val,'message')
			this.messages = val;
		},
		async getSseValue(val) {
			console.log(val,'val')
			if(this.model=='qianwen'){
				await this.changeModel(val);
			}else if(this.model=='locel'){
				await this.changeModelLocal(val)
			}else if(this.model=='water'){
				await this.changeModelAnything(val)
			}else if(this.model=='sxyd'){
				console.log('sxyd')
			}
		},
		// 暂停生成
		getDownSend(val) {
			console.log(val, 'valvalval');
			if (!this.biaoshi) {
				this.biaoshi = true;
			} else {
				this.downSentValue = true;
			}

			//停止生成的标志
			// this.downSentValue=true
		},
		// 获取模型
		getModel(val) {
			console.log(val);
			this.model=val
		},
		// 接收流式数据qianwen
		async changeModel(val) {
			let params = {
				model: this.modelValue,
				messages: this.messages,
				stream: true
			};
			try {
				// 发送请求到 API
				const response = await fetch(this.VITE_AIR14B_url + '/api/chat', {
					method: 'POST',
					headers: {
						'Content-Type': 'application/json'
					},
					body: JSON.stringify(params)
				});
				// 获取流式响应体
				const reader = response.body.getReader();
				const decoder = new TextDecoder('utf-8'); // 解码器,将字节流转换为字符串
				this.downValue = false;
				let value = ''; // 用于拼接收到的每一部分 response 内容
				// 获取响应之后关闭loading
				// this.$ownerInstance.callMethod('closeLoading', null);

				this.messages.push({
					role: 'assistant',
					content: ''
				});
				// 逐块读取数据
				while (!this.downValue) {
					// 读取数据块
					const { done: isDone, value: chunk } = await reader.read();
					// if (this.onDown.value) {
					//     this.downValue = true;
					//     break;
					// }
					if (this.downSentValue) {
						this.downValue = true;
						this.downSentValue = false;
						this.$ownerInstance.callMethod('setSend', true);
						this.messages[this.messages.length - 1].content += '\n\n[已取消生成]';
						this.$ownerInstance.callMethod('setMessageValue', this.messages);
						break;
					}
					this.downValue = isDone;
					if (this.downValue) {
						this.$ownerInstance.callMethod('setSend', true);
					} else {
						this.$ownerInstance.callMethod('setSend', false);
					}
					let chunkString = decoder.decode(chunk, { stream: true });
					// 将字节流转换为字符串并追加到 value
					value += chunkString;
					// 打印已接收的部分数据
					// console.log('Received chunk:', chunkString);
					if (chunkString && this.messages[this.messages.length - 1].role === 'assistant') {
						try {
							const parsedChunk = JSON.parse(chunkString);
							const content = parsedChunk?.message?.content; // 使用可选链避免属性访问错误
							if (content === undefined) {
								throw new Error('Invalid chunk structure: Missing message.content');
							}
							// 确保 messages 数组非空
							if (this.messages.length === 0) {
								throw new Error('Messages array is empty');
							}
							this.messages[this.messages.length - 1].content += content;
							if(this.messages[this.messages.length - 1].content){
								// 获取响应之后关闭loading
							    this.$ownerInstance.callMethod('closeLoading', null);
							}
							// 将值传递出去
							this.$ownerInstance.callMethod('setMessageValue', this.messages);
						} catch (error) {
							console.error('处理 chunk 时发生错误:', error);
						}
					}
				}
				// console.log('Final response:', value);
			} catch (error) {
				console.error('Error during fetch request:', error);
				this.loading = false;
				// 显示错误消息
				if (this.messages.length > 0 && this.messages[this.messages.length - 1].role === 'assistant') {
					this.messages[this.messages.length - 1].content += '\n\n[生成回答时出现错误]';
				}
			}
		},
	}
};
</script>


网站公告

今日签到

点亮在社区的每一天
去签到