添加 WebSocket 的依赖
<!-- SpringBoot Websocket -->
<!-- No <version> is declared in this snippet; presumably it is managed by the project's Spring Boot parent/BOM (verify against the enclosing pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
修改默认的数据传输大小及会话超时
/**
 * WebSocket container configuration.
 *
 * <p>Declares the {@link ServerEndpointExporter} bean and overrides the container's
 * message buffer sizes and idle-session timeout.
 */
@Configuration
public class WebSocketConfig {

    /** Buffer size used for both text and binary messages: 1 MiB. */
    private static final int MESSAGE_BUFFER_SIZE = 1024 * 1024;

    /** Maximum time a session may stay idle: 30 minutes, expressed in milliseconds. */
    private static final long SESSION_IDLE_TIMEOUT = 1000 * 60 * 30L;

    /**
     * Builds the container factory carrying the custom buffer and timeout limits.
     *
     * @return the configured factory bean
     */
    @Bean
    public ServletServerContainerFactoryBean servletServerContainerFactoryBean() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(MESSAGE_BUFFER_SIZE);
        container.setMaxBinaryMessageBufferSize(MESSAGE_BUFFER_SIZE);
        container.setMaxSessionIdleTimeout(SESSION_IDLE_TIMEOUT);
        return container;
    }

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
WebSocket 服务端
@Slf4j
@Component
@ServerEndpoint(value = "/websocket/{clientId}", encoders = {WebSocketObjectEncoder.class})
public class WebSocketHandler {
private RedisService redisService = SpringUtils.getBean(RedisService.class);
private static final ConcurrentMap<String, Session> SESSION_MAP = new ConcurrentHashMap<>();
private String clientId;
@OnOpen
public void onOpen(Session session, @PathParam("clientId") String clientId) throws IOException {
String sessionId = session.getId();
log.info("onOpen sessionId: {}, clientId: {}", sessionId, clientId);
this.clientId = clientId;
// 增加权限校验
boolean res = this.validateToken(session.getRequestParameterMap());
if (!res) {
// 未校验通过直接断开
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "非法请求"));
return;
}
SESSION_MAP.put(clientId, session);
this.sendText(session, String.format("客户端【%s】已连接", clientId));
}
@OnMessage
public void onMessage(Session session, @PathParam("clientId") String clientId, String message) {
String sessionId = session.getId();
log.info("onMessage sessionId:{}, clientId:{}, message:{}", sessionId, clientId, message);
this.sendText(session, String.format("客户端【%s】消息已收到", clientId));
}
@OnClose
public void onClose(Session session) {
log.info("sessionId-{} onClose ...", session.getId());
SESSION_MAP.remove(this.clientId);
}
@OnError
public void onError(Session session, Throwable throwable) throws IOException {
log.error("Error for session " + session.getId() + " : " + throwable.getMessage());
if (session.isOpen()) {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
}
SESSION_MAP.remove(this.clientId);
}
/**
* 检验客户端身份
*
* @param params
* @return
*/
private boolean validateToken(Map<String, List<String>> params) {
try {
boolean res = true;
String token = params.getOrDefault("token", new ArrayList<>()).get(0);
if (StringUtils.isBlank(token)) {
res = false;
}
Claims claims = JwtUtils.parseToken(token);
String jwt_claims_key = "user_id";
if (claims == null || !claims.containsKey(jwt_claims_key)) {
res = false;
}
// RedisKey.USER_LOGIN_TOKEN = "user_login_token:%s"
String redisKey = String.format(RedisKey.USER_LOGIN_TOKEN, JwtUtils.getUserId(claims));
if(!redisService.hasKey(redisKey)){
res = false;
}
return res;
} catch (Exception e) {
log.info("WebSocket token validate error: {}", e.getMessage());
}
return false;
}
/**
* 发送消息,需要对Object进行序列化,所以WebSocketObjectEncoder是必须的
* 或者直接在这里使用JSONObject.toJSONString(obj) 也是可以的
*
* @param clientId
* @param data
*/
public void sendMessage(String clientId, Object data) {
try {
if (SESSION_MAP.containsKey(clientId)) {
SESSION_MAP.get(clientId).getBasicRemote().sendObject(data);
}
} catch (Exception e) {
log.error("sendMessage error:{}", e.getMessage());
}
}
/**
* 发送文本消息
*
* @param session
* @param message
*/
private void sendText(Session session, String message) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
序列化工具 WebSocketObjectEncoder
/**
 * Text encoder that turns any outgoing object into its JSON string form via
 * {@code JSONObject.toJSONString}.
 */
public class WebSocketObjectEncoder implements Encoder.Text<Object> {

    /** No initialisation is performed. */
    @Override
    public void init(EndpointConfig endpointConfig) {
        // intentionally empty
    }

    /**
     * Serializes the given object to JSON text.
     *
     * @param obj the object to encode
     * @return the JSON string produced for {@code obj}
     * @throws EncodeException declared by the encoder contract
     */
    @Override
    public String encode(Object obj) throws EncodeException {
        final String json = JSONObject.toJSONString(obj);
        return json;
    }

    /** No resources are held, so nothing is released. */
    @Override
    public void destroy() {
        // intentionally empty
    }
}
在gateway中配置路由
spring:
  cloud:
    gateway:
      discovery:
        locator:
          lowerCaseServiceId: true
          enabled: false
      routes:
        # WebSocket service: forward /websocket/** to the "websocket" service over ws
        - id: websocket
          uri: lb:ws://websocket
          predicates:
            - Path=/websocket/**
# Security settings
# NOTE(review): the original nesting was lost; "security" is assumed to be a top-level
# key read by the project's own gateway filter - confirm against the real config file.
security:
  # Paths on this whitelist are not checked
  ignore:
    whites:
      - /websocket/**
测试
使用 Postman:点击左上角的 Menu - File - New...,找到 WebSocket。
在 url 中填写 ws://localhost/websocket/{clientId}?token={token};如果是使用 SSL 证书的域名,则填写 wss://www.xxx.com/websocket/{clientId}?token={token}。
如果能正常接收到服务端返回的消息说明连接成功
如果 token 校验错误则会立即断开连接,点击右边的箭头可以查看具体异常信息。