使用消息队列解决webSoket推送数据无序的问题

发布于:2025-02-12 ⋅ 阅读:(15) ⋅ 点赞:(0)

问题描述:当数据生成速度过快,导致 WebSocket 发送不及,数据发送顺序混乱 

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;

import java.util.concurrent.LinkedBlockingQueue;

@EnableAsync
@Slf4j
@Component
@RequiredArgsConstructor
public class WebsocketServiceSendMessage {

    private final WebsocketService websocketService;

    // 定义一个阻塞队列用于存储消息
    private final LinkedBlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();


    @PostConstruct
    private void startMessageProcessingThread() {
        Thread messageProcessingThread = new Thread(() -> {
            try {
                while (true) {
                    // 从队列中取出消息,如果队列为空则阻塞等待
                    Message message = messageQueue.take();
                    try {
                        // 调用 websocketService 发送消息
                        websocketService.sendMessage(message);
                    } catch (Exception e) {
                        // 记录发送消息时的异常
                        log.error("Failed to send WebSocket message: {}", message, e);
                    }
                }
            } catch (Exception e) {
                // 恢复中断状态
                Thread.currentThread().interrupt();
            }
        });
        // 设置线程为守护线程
        messageProcessingThread.setDaemon(true);
        // 启动线程
        messageProcessingThread.start();
    }

    /**
     * 发送WebSocket消息
     *
     * @param type    消息类型
     * @param message 消息内容
     */
    @SneakyThrows
    @Async
    public void sendData(String type, String message) {
        // 创建消息对象
        Message msg = Message.builder().type(type).message(message).build();
        // 将消息添加到队列中
        messageQueue.put(msg);
    }
}