需求:
最近在对接一个物联网里设备,他的通信方式是 websocket 。所以我需要在 springboot框架中集成websocket 依赖,从而实现与设备实时通信!
框架:springboot2.7
java版本:java8
好了,还是直接上代码
第一步:引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
第二步写配置:
package com.agentai.base.config;
import com.agentai.base.yumou.webSocket.YuMouDeviceWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
/**
* WebSocket配置类
* 负责配置WebSocket服务器和注册WebSocket处理器
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注册WebSocket处理器,
// 允许所有来源的跨域请求
registry.addHandler(deviceWebSocketHandler(), "/linker-dev")
.setAllowedOrigins("*");
}
@Bean
public YuMouDeviceWebSocketHandler deviceWebSocketHandler() {
return new YuMouDeviceWebSocketHandler();
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 设置消息缓冲区大小
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
// 设置会话超时时间(毫秒)
container.setMaxSessionIdleTimeout(60000L);
return container;
}
}
第三方:WebSocket会话管理器
package com.agentai.base.yumou.webSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* WebSocket会话管理器
* 负责管理所有WebSocket会话,包括会话状态跟踪、心跳检测和清理过期会话
*/
@Slf4j
public class WebSocketSessionManager {
// 心跳超时限制(毫秒)
private static final long HEARTBEAT_TIMEOUT = 30000;
// 心跳检查间隔(毫秒)
private static final long HEARTBEAT_CHECK_INTERVAL = 10000;
// 心跳消息内容
private static final String HEARTBEAT_MESSAGE = "{\"type\":\"ping\"}";
// 会话信息,包含WebSocket会话和最后活动时间
private static class SessionInfo {
WebSocketSession session;
long lastActiveTime;
SessionInfo(WebSocketSession session) {
this.session = session;
this.lastActiveTime = Instant.now().toEpochMilli();
}
void updateLastActiveTime() {
this.lastActiveTime = Instant.now().toEpochMilli();
}
}
// 保存所有会话信息
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public WebSocketSessionManager() {
// 启动心跳检查任务
scheduler.scheduleAtFixedRate(this::checkHeartbeats,
HEARTBEAT_CHECK_INTERVAL, HEARTBEAT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}
/**
* 添加新的会话
* @param session 新的WebSocket会话
*/
public void addSession(WebSocketSession session) {
sessions.put(session.getId(), new SessionInfo(session));
log.info("新会话已添加: {}", session.getId());
}
/**
* 移除会话
* @param sessionId 会话ID
*/
public void removeSession(String sessionId) {
sessions.remove(sessionId);
log.info("会话已移除: {}", sessionId);
}
/**
* 更新会话最后活动时间
* @param sessionId 会话ID
*/
public void updateSessionActivity(String sessionId) {
SessionInfo info = sessions.get(sessionId);
if (info != null) {
info.updateLastActiveTime();
}
}
/**
* 发送消息到指定会话
* @param sessionId 会话ID
* @param message 消息内容
* @return 是否发送成功
*/
public boolean sendMessage(String sessionId, String message) {
SessionInfo info = sessions.get(sessionId);
if (info != null && info.session.isOpen()) {
try {
info.session.sendMessage(new TextMessage(message));
return true;
} catch (IOException e) {
log.error("发送消息到会话[{}]失败: {}", sessionId, e.getMessage());
}
}
return false;
}
/**
* 广播消息到所有会话
* @param message 消息内容
*/
public void broadcastMessage(String message) {
sessions.forEach((sessionId, info) -> {
if (info.session.isOpen()) {
try {
info.session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("广播消息到会话[{}]失败: {}", sessionId, e.getMessage());
}
}
});
}
/**
* 检查心跳并清理过期会话
*/
private void checkHeartbeats() {
long now = Instant.now().toEpochMilli();
sessions.forEach((sessionId, info) -> {
if (now - info.lastActiveTime > HEARTBEAT_TIMEOUT) {
try {
// 发送心跳消息
info.session.sendMessage(new TextMessage(HEARTBEAT_MESSAGE));
log.debug("发送心跳到会话: {}", sessionId);
} catch (IOException e) {
// 如果发送失败,关闭并移除会话
log.warn("会话[{}]心跳检测失败,关闭会话: {}", sessionId, e.getMessage());
try {
info.session.close();
} catch (IOException ex) {
log.error("关闭会话[{}]失败: {}", sessionId, ex.getMessage());
}
removeSession(sessionId);
}
}
});
}
/**
* 关闭会话管理器
*/
public void shutdown() {
scheduler.shutdown();
sessions.forEach((sessionId, info) -> {
try {
info.session.close();
} catch (IOException e) {
log.error("关闭会话[{}]失败: {}", sessionId, e.getMessage());
}
});
sessions.clear();
}
}
第四步:设备WebSocket处理器
package com.agentai.base.yumou.webSocket;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
/**
* 设备WebSocket处理器
* 负责处理设备的WebSocket连接、消息接收和断开连接
*/
@Slf4j
public class YuMouDeviceWebSocketHandler extends TextWebSocketHandler {
private final WebSocketSessionManager sessionManager;
// 构造函数,初始化会话管理器
public YuMouDeviceWebSocketHandler() {
this.sessionManager = new WebSocketSessionManager();
}
/**
* WebSocket连接建立后的处理
* @param session WebSocket会话
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
// 将新会话添加到会话管理器
String sessionId = session.getId();
sessionManager.addSession(session);
log.info("WebSocket连接已建立: {}", sessionId);
}
@Autowired
YuMouService yuMouService;
/**
* 处理接收到的文本消息
* @param session 当前会话
* @param message 接收到的文本消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
String sessionId = session.getId();
try {
// 更新会话的活动时间
sessionManager.updateSessionActivity(sessionId);
log.info("接收到设备[{}]的文本消息: {}", sessionId, payload);
JSONObject jsonObject = JSONObject.parseObject(payload);
log.info("数据:", jsonObject );
// 处理其他业务消息
// TODO: 添加具体的业务消息处理逻辑
} catch (Exception e) {
log.error("处理设备[{}]消息时发生错误: {}", sessionId, e.getMessage());
}
}
/**
* 处理接收到的二进制消息
* @param session 当前会话
* @param message 接收到的二进制消息
*/
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
byte[] payload = message.getPayload().array();
String sessionId = session.getId();
log.info("接收到设备[{}]的二进制消息,长度: {} 字节", sessionId, payload.length);
// 目前只打印消息长度,可以根据需求处理二进制数据
// TODO: 添加二进制消息处理逻辑
}
/**
* 处理传输错误
* @param session 当前会话
* @param exception 错误异常
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
String sessionId = session.getId();
log.error("设备[{}]连接传输错误: {}", sessionId, exception.getMessage());
}
/**
* WebSocket连接关闭后的处理
* @param session 当前会话
* @param status 关闭状态
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String sessionId = session.getId();
sessionManager.removeSession(sessionId);
log.info("设备[{}]WebSocket连接已关闭,状态码: {}", sessionId, status.getCode());
}
/**
* 发送消息到指定会话
* @param sessionId 会话ID
* @param message 消息内容
* @return 是否发送成功
*/
public boolean sendMessage(String sessionId, String message) {
return sessionManager.sendMessage(sessionId, message);
}
/**
* 广播消息到所有连接的会话
* @param message 消息内容
*/
public void broadcastMessage(String message) {
sessionManager.broadcastMessage(message);
}
/**
* 关闭WebSocket处理器,清理资源
*/
public void shutdown() {
sessionManager.shutdown();
}
}