背景:用户刷新页面或者切换tab页后断链的续流问题
目前有两种方案:
方案一:后端:Mongo + 实时存 (数据库压力大,每个流都要进行一次入库操作) 前端轮询
方案二:后端:Redis + Mongo (推荐, 流的过程中使用Redis,流数据结束后再去入一次库)
前端:调新接口
方案一实现:
public SseEmitter sendinfo (QuestionnaireDTO dto) {
Flux<String> flux = algorithmUtils.code_bswj(dto, webClient);
flux
.doOnError(e -> {
try {
completeEmitter(emitter, e, isCompleted);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
})// 处理客户端断开连接
.doOnComplete(() -> {
try {
completeEmitter(emitter, null, isCompleted);
} catch (IOException e) {
throw new RuntimeException(e);
}
}) // 传null表示正常完成
.subscribe(data -> {
try {
sendDataToEmitter(emitter, data, isCompleted);
} catch (IOException e) {
throw new RuntimeException(e);
}
});// 订阅 Flux 并发送数据到 SseEmitter
return emitter;
}
private void sendDataToEmitter(SseEmitter emitter, String data, AtomicBoolean isCompleted) throws IOException {
if (!isCompleted.get()) {
try {
processChunk(data, questionnaire,isCompleted, traceId);
} catch (IOException | JSONException e) {
if (e instanceof ClientAbortException) {
this.emitter = new SseEmitterUTF8(1000000L);
return;
}
completeEmitter(emitter, e, isCompleted,questionnaire); // 处理发送过程中的异常
}
}
}
private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted) throws IOException, JSONException {
System.out.println("chunk handing before =========>:" + chunk);
if (!StringUtils.hasText(chunk)) return;
JSONObject jsonObject = new JSONObject(chunk);
if (!jsonObject.getBoolean("is_success")) {
emitter.send(jsonObject.getString("err_msg"));
return;
}
String result = jsonObject.getString("results");
JSONObject jsonObjectRes = new JSONObject(result);
String type = jsonObjectRes.getString("type");//answer
String value = jsonObjectRes.getString("value");
JSONObject response = new JSONObject();
JSONObject results = new JSONObject();
if (type.equals("progress_indicator")) {
JSONObject valueJson = new JSONObject(value);
String text = valueJson.getString("text");
if (text.equals("[DONE]")) {
questionnaireRepository.save(questionnaire);
completeEmitter(emitter, null, isCompleted, questionnaire);
return;
}
results.put("type", "progress_indicator");
results.put("data", text);
if (isBase) {
String progressBase = questionnaire.getProgressBase();
progressBase = progressBase == null ? "" : progressBase;
progressBase += text;
questionnaire.setProgressBase(progressBase);
} else {
String progressCustom = questionnaire.getProgressCustom();
progressCustom = progressCustom == null ? "" : progressCustom;
progressCustom += text;
questionnaire.setProgressCustom(progressCustom);
}
questionnaireRepository.save(questionnaire);
} else if (type.equals("survey")) {
JSONObject valueJson = new JSONObject(value);
String customSurvey = valueJson.getString("custom_survey");
questionnaire.setCustomQuestion(customSurvey);
questionnaireRepository.save(questionnaire);
results.put("type", "survey");
results.put("data", customSurvey);
}else if (type.equals("finished_thinking")) {
JSONObject valueJson = new JSONObject(value);
String text = valueJson.getString("text");
results.put("type", "finished_thinking");
results.put("data", text);
if (isBase) {
questionnaire.setStatus("SurveyGenerating");
}else {
questionnaire.setStatus("MapGenerating");
}
questionnaireRepository.save(questionnaire);
}
response.put("results", results);
if (!isCompleted.get()){
System.out.println("后端发送给前端时间:" + TimeUtil.getCurrentTime());
emitter.send(response.toString());
}
}
通过实时插入数据库,前端感知到刷新或者切换tab后,根据状态轮询调用历史记录接口,因为此时后端与算法的流还没有断开,所以是在实时保存的。前端此时轮询历史接口可以伪造出流式输出的形式,使用户无感知。
方案二实现:(代码几乎同上,就是在处理流式的方法里做了改动)
private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted, String traceId) throws IOException, JSONException {
System.out.println("chunk handing before =========>:" + chunk);
if (!StringUtils.hasText(chunk)) return;
JSONObject jsonObject = new JSONObject(chunk);
if (!jsonObject.getBoolean("is_success")) {
emitter.send(jsonObject.getString("err_msg"));
return;
}
String result = jsonObject.getString("results");
JSONObject jsonObjectRes = new JSONObject(result);
String type = jsonObjectRes.getString("type");//answer
String value = jsonObjectRes.getString("value");
JSONObject response = new JSONObject();
JSONObject results = new JSONObject();
if (type.equals("progress_indicator")) {
JSONObject valueJson = new JSONObject(value);
String text = valueJson.getString("text");
if (text.equals("[DONE]")) {
questionnaireRepository.save(questionnaire);
//设置redis中key为questionnaire.getId()的过期时间为5分钟,目前redis中是有这个key的
// redisService.expire(questionnaire.getId(), 1);
completeEmitter(emitter, null, isCompleted, questionnaire);
return;
}
results.put("type", "progress_indicator");
results.put("data", text);
if (isBase) {
String progressBase = questionnaire.getProgressBase();
progressBase = progressBase == null ? "" : progressBase;
progressBase += text;
questionnaire.setProgressBase(progressBase);
} else {
String progressCustom = questionnaire.getProgressCustom();
progressCustom = progressCustom == null ? "" : progressCustom;
progressCustom += text;
questionnaire.setProgressCustom(progressCustom);
}
} else if (type.equals("survey")) {
JSONObject valueJson = new JSONObject(value);
String customSurvey = valueJson.getString("custom_survey");
questionnaire.setCustomQuestion(customSurvey);
questionnaireRepository.save(questionnaire);
accompanyLearningLog.uploadLogByTranceId("processChunk", "后端处理定制问卷", "INFO", JsonUtil.object2Json(customSurvey),traceId);
results.put("type", "survey");
results.put("data", customSurvey);
} else if (type.equals("text")) {
JSONObject valueJson = new JSONObject(value);
String text = valueJson.getString("text");
results.put("type", "text");
results.put("data", text);
questionnaire.setPlanCustom(text);
accompanyLearningLog.uploadLogByTranceId("processChunk", "后端处理多链路地图", "INFO", JsonUtil.object2Json(text),traceId);
questionnaireRepository.save(questionnaire);
}else if (type.equals("finished_thinking")) {
JSONObject valueJson = new JSONObject(value);
String text = valueJson.getString("text");
results.put("type", "finished_thinking");
results.put("data", text);
if (isBase) {
questionnaire.setStatus("SurveyGenerating");
}else {
questionnaire.setStatus("MapGenerating");
}
questionnaireRepository.save(questionnaire);
}
response.put("results", results);
//这里有了一个新的逻辑,把思维链保存的redis的队列中,然后如果前端断开连接,想要做断点续传,就从队列中取出,然后继续,队列的key是questionnaire.getId()
// 把思维链保存到 Redis 队列中
redisService.rightPush(questionnaire.getId(), response.toString());
if (!isCompleted.get()){
System.out.println("后端发送给前端时间:" + TimeUtil.getCurrentTime());
emitter.send(response.toString());
}
}
前端刷新页面或者来回切换了tab后会调用我的新接口,也是一个流式输出的接口
/**
* 断点续传方法,用于从 Redis 中获取数据并通过 SseEmitter 发送给前端。
* 如果数据库中有符合条件的数据则直接处理,否则从 Redis 中获取数据。
*/
@Override
public SseEmitter refreshStream(QuestionnaireDTO dto, String userId, String traceId) {
// 创建一个 SseEmitter 对象,设置超时时间为 1000000 毫秒
SseEmitter emitter = new SseEmitterUTF8(1000000L);
// 先从数据库取数据,如果取到了就不查redis了
/*Questionnaire questionnaire = questionnaireRepository.findByUserId(userId);
if (questionnaire != null) {
try {
if(questionnaire.getStatus().equals("baseCompleted")||questionnaire.getStatus().equals("customCompleted")){
// 若问卷状态为 baseCompleted 或 customCompleted,发送 [DONE] 并完成 SseEmitter
emitter.send("[DONE]");
emitter.complete();
}
return emitter;
} catch (Exception e) {
// 处理异常,将错误信息记录到日志并完成 SseEmitter
emitter.completeWithError(e);
accompanyLearningLog.uploadLogByTranceId("refreshStream", "从数据库获取数据发送失败", "ERROR", JsonUtil.object2Json(e), traceId);
return emitter;
}
}*/
// 获取 Redis 中数据列表的键
String key = dto.getId();
// 从redis中取key为dto.getId()的列表的长度
Long listLength = redisService.size(key);
// 若 Redis 列表长度不为空且大于 0
if (listLength != null && listLength > 0) {
// 记录这个长度
final long[] initialLength = {listLength};
// 从redis中取列表中的数据
List<Object> redisDataList = redisService.range(key, 0, initialLength[0] - 1);
// 用于拼接 type 为 progress_indicator 的 data 数据
StringBuilder progressData = new StringBuilder();
// 标记 SseEmitter 是否已经完成
AtomicBoolean isEmitterCompleted = new AtomicBoolean(false);
// 遍历 Redis 数据列表
for (Object data : redisDataList) {
try {
// 将数据转换为 JSONObject
JSONObject jsonData = new JSONObject(data.toString());
// 获取 results 字段
JSONObject results = jsonData.getJSONObject("results");
// 获取 type 字段
String type = results.getString("type");
if("generate_indicator".equals(type)){
// 若 type 为 generate_indicator,直接发送数据
emitter.send(data.toString());
}else if ("progress_indicator".equals(type)) {
// 若 type 为 progress_indicator,拼接 data 数据
progressData.append(results.getString("data"));
} else {
// 若 SseEmitter 未完成,发送非 progress_indicator 类型的数据
if (!isEmitterCompleted.get()) {
System.out.println("data:" + data.toString());
emitter.send(data.toString());
}
}
} catch (JSONException e) {
}catch (IOException e) {
// 处理发送数据时的 IO 异常,完成 SseEmitter 并记录日志
emitter.completeWithError(e);
}
}
// 以{"results":{"type":"progress_indicator","data":"所有的data"}}格式推给前端
try {
// 若 progressData 不为空
if (progressData.length() > 0) {
// 创建响应 JSON 对象
JSONObject response = new JSONObject();
JSONObject results = new JSONObject();
results.put("type", "progress_indicator");
results.put("data", progressData.toString());
response.put("results", results);
System.out.println("response:"+response.toString());
// 若 SseEmitter 未完成,发送拼接后的 progress_indicator 数据
if (!isEmitterCompleted.get()) {
emitter.send(response.toString());
}
}
// 从记录的长度开始,后面的就不需要拼接了,直接取到后推给前端就可以了
// 使用数组包装 subscription
final Disposable[] subscription = new Disposable[1];
// 每秒检查一次 Redis 列表长度是否有变化
subscription[0] = Flux.interval(Duration.ofSeconds(1))
.subscribe(interval -> {
// 获取当前 Redis 列表长度
Long currentLength = redisService.size(key);
if (currentLength != null && currentLength > initialLength[0]) {
// 获取新增的数据
List<Object> newDataList = redisService.range(key, initialLength[0], currentLength - 1);
for (Object newData : newDataList) {
try {
if(newData.toString().equals("[DONE]")){
// 若数据为 [DONE],发送数据,完成 SseEmitter,取消订阅并删除 Redis 键
if (!isEmitterCompleted.get()) {
emitter.send(newData.toString());
emitter.complete();
isEmitterCompleted.set(true);
// 取消订阅
if (subscription[0] != null) {
subscription[0].dispose();
}
redisService.delete(key);
}
return;
}
System.out.println("newData:"+newData.toString());
// 若 SseEmitter 未完成,发送新增数据
if (!isEmitterCompleted.get()) {
emitter.send(newData.toString());
try {
// 线程休眠 50 毫秒
Thread.sleep(50);
} catch (InterruptedException e) {
// 处理线程休眠中断异常,完成 SseEmitter
Thread.currentThread().interrupt();
emitter.completeWithError(e);
}
}
} catch (IOException e) {
emitter.completeWithError(e);
}
}
// 更新初始长度
initialLength[0] = currentLength;
}
});
} catch (Exception e) {
// 处理异常,完成 SseEmitter 并记录日志
emitter.completeWithError(e);
}
} else {
try {
// 若 Redis 列表长度为空或为 0,完成 SseEmitter
emitter.complete();
} catch (Exception e) {
// 处理异常,完成 SseEmitter 并记录日志
emitter.completeWithError(e);
}
}
return emitter;
}