问题描述:当数据生成速度过快,导致 WebSocket 发送不及,数据发送顺序混乱
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import java.util.concurrent.LinkedBlockingQueue;
@EnableAsync
@Slf4j
@Component
@RequiredArgsConstructor
public class WebsocketServiceSendMessage {
private final WebsocketService websocketService;
// 定义一个阻塞队列用于存储消息
private final LinkedBlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
@PostConstruct
private void startMessageProcessingThread() {
Thread messageProcessingThread = new Thread(() -> {
try {
while (true) {
// 从队列中取出消息,如果队列为空则阻塞等待
Message message = messageQueue.take();
try {
// 调用 websocketService 发送消息
websocketService.sendMessage(message);
} catch (Exception e) {
// 记录发送消息时的异常
log.error("Failed to send WebSocket message: {}", message, e);
}
}
} catch (Exception e) {
// 恢复中断状态
Thread.currentThread().interrupt();
}
});
// 设置线程为守护线程
messageProcessingThread.setDaemon(true);
// 启动线程
messageProcessingThread.start();
}
/**
* 发送WebSocket消息
*
* @param type 消息类型
* @param message 消息内容
*/
@SneakyThrows
@Async
public void sendData(String type, String message) {
// 创建消息对象
Message msg = Message.builder().type(type).message(message).build();
// 将消息添加到队列中
messageQueue.put(msg);
}
}