消息队列导致数据库数据读取不一致解决方案

发布于:2025-03-20 ⋅ 阅读:(19) ⋅ 点赞:(0)

我使用的是在数据库添加一个版本字段,记录版本,保证版本一致性,就能保证每次读取的是需要的内容。

具体问题:使用消息队列时,发送方给接收方发送消息,接收方修改了数据库的同时发送方查询数据库,由于是异步操作,会导致发送方读取的数据是修改前的数据,导致数据不一致的问题。

代码:

发送方

// 创建查询条件
        QueryWrapper<QuestionTest> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("userId", loginUser.getId()).eq("questionId", questionId);
// 查询是否存在记录
        QuestionTest existingRecord = questionTestMapper.selectOne(queryWrapper);
 插入一条新记录

        existingRecord.setQuestionId(questionId);
        existingRecord.setUserId(loginUser.getId());
        if (existingRecord.getId() == null) {
            existingRecord.setVersion(Long.valueOf(0));
            questionTestMapper.insert(existingRecord);
        }
        Long id = existingRecord.getId(); // 获取新插入记录的id
        Long version = existingRecord.getVersion();
        existingRecord.setVersion(version);
        Map<String, Object> taskMessage = new HashMap<>();
        taskMessage.put("id", id);
        taskMessage.put("questionId", questionId);
        taskMessage.put("userId", loginUser.getId());
        taskMessage.put("codeLanguage", questionSubmitAddRequest.getCodeLanguage());
        taskMessage.put("userCode", questionSubmitAddRequest.getUserCode());
        taskMessage.put("input", questionSubmitAddRequest.getUserJudgeCase().getInput());
        taskMessage.put("answer", question.getAnswer());
        taskMessage.put("version", version);
//        System.out.println(taskMessage);
        String message = JSONUtil.toJsonStr(taskMessage);
//        System.out.println(message);
        myMessageProducer.sendMessage("code_exchange", "my_routingKey_test", message);
        // 等待接收方处理完毕
//        Long newVersion = null;
        while (!existingRecord.getVersion().equals(version + 1)) {
            existingRecord = questionTestMapper.selectById(id);
        }
        return existingRecord;

 接收方:

Map<String, Object> taskMessage = JSONUtil.toBean(message, Map.class);
            Long questionId = Long.valueOf(taskMessage.get("questionId") != null ? String.valueOf(taskMessage.get("questionId")) : "0");
            Long id = Long.valueOf(taskMessage.get("id") != null ? String.valueOf(taskMessage.get("id")) : "0");
            Long userId = Long.valueOf(taskMessage.get("userId") != null ? String.valueOf(taskMessage.get("userId")) : "0");
            String codeLanguage = String.valueOf(taskMessage.get("codeLanguage"));
            String userCode = String.valueOf(taskMessage.get("userCode"));
            String input = String.valueOf(taskMessage.get("input"));
            String answer = String.valueOf(taskMessage.get("answer"));
            Long version = Long.valueOf(taskMessage.get("version") != null ? String.valueOf(taskMessage.get("version")) : "0");
//            System.out.println(answer);
            // 使用正确代码生成预期结果
            String output = judgeService.dojudgeTestCode(answer, input, codeLanguage);
            // 使用用户代码生成实际结果
            String predicted = judgeService.dojudgeTestCode(userCode, input, codeLanguage);
            JudgeCase currentJudgeCase = new JudgeCase();
            currentJudgeCase.setInput(input);
            currentJudgeCase.setOutput(output);
            JudgeCase userJudgeCase = new JudgeCase();
            userJudgeCase.setInput(input);
            userJudgeCase.setOutput(predicted);
            log.info("测试任务处理成功,Id = {}", id);
            // 5. 将结果存储到数据库或缓存中
            // 修保存到数据库中的测试结果
            version = version + 1;
            QuestionTest questionTest = new QuestionTest();
            questionTest.setVersion(version);
            questionTest.setId(id);
            questionTest.setQuestionId(questionId);
            questionTest.setUserId(userId);
            questionTest.setUserCode(userCode);
            questionTest.setCodeLanguage(codeLanguage);
            questionTest.setCurrentJudgeCase(JSONUtil.toJsonStr(currentJudgeCase));
            questionTest.setUserJudgeCase(JSONUtil.toJsonStr(userJudgeCase));
//            System.out.println("QuestionTest:"+questionTest);
            questionFeignClient.updateQuestionTestById(questionTest);
            // 6. 手动确认消息
            channel.basicAck(deliveryTag, false);