入门websocket的基础应该掌握一下问题:
1、什么是握手?
2、什么是websocket?
3、websocket和http的区别,应用场景
4、websocket的使用
5、使用vue+elementui打造简单聊天室
6、使用websocket进行信息实时推送,使用mysql数据库进行存储(若依Ruoyi+websocket)
7、【进阶】使用STOMP(websocket的高级封装)-配合Spring-security服用
8、【进阶】整合rabbitMQ进行信息处理
1、什么是握手?
“握手”在计算机网络中是一个比喻性的术语,用来描述两个设备或程序在通信开始前,互相确认彼此身份、能力和准备状态的过程,就像现实中人们见面时通过“握手”表示友好和确认一样。
简单来说就是:
握手 = 建立通信前的“问好+确认”步骤
目的:确保双方都准备好、安全、可靠地通信。
什么是TCP的三次握手?
“三次握手”是指在建立 TCP连接 时,客户端与服务器之间进行的三个步骤,用于确保双方都能正常发送和接收数据。这是 TCP 协议中非常基础而重要的概念。
为什么要三次握手?
主要目的是为了 确保双方都具备发送和接收能力,并为数据传输建立可靠的连接。
三次握手的详细解析
可以将客户端理解成浏览器,服务器就是后端
客户端 服务器
| |
| --------------------- SYN, Seq = 100 -----------> | 第一次握手
| |
| <--- SYN+ACK, Seq = 200, Ack = 101 ---> | 第二次握手
| |
| ------ ACK, Seq = 101, Ack = 201 -----------> | 第三次握手
| |
模拟的三次抓包详细
第一次握手:客户端 → 服务器
客户端向服务器发送一个 SYN(同步)包,表示希望建立连接。
包含客户端的初始序列号(
Seq = 100
)。
第二次握手:服务器 → 客户端
服务器收到 SYN 后,回复一个 SYN+ACK 包。
表示“我同意建立连接”,并告诉客户端自己的初始序列号(
Seq = 200
)。同时对客户端的 SYN 进行确认(
Ack = 101
)。
第三次握手:客户端 → 服务器
客户端收到 SYN+ACK 后,再发送一个 ACK 包,表示连接建立完成。
确认服务器的序列号(Seq=101,
Ack = 201
)。
现实生活中的例子
小陈:你好,我是 小陈,能听见我吗?(SYN)
小霜:你好 小陈,我是 小霜,能听见你,我这边也能说话,你能听我吗?(SYN+ACK)
小陈:能听见,咱们开始说话吧!(ACK)
2、什么是websocket?
websocket简介
WebSocket 是一种在 单个 TCP 连接上进行全双工通信的协议,它被设计为在客户端(通常是浏览器)和服务器之间建立持久连接,从而实现实时通信。
websocket应用场景
场景:当我们想做一个聊天室功能,建立一个聊天信息表,发信息(新增)的时候,都会执行一次查询聊天信息表,如果想获取到别人发的信息,是不是就要再查询一次数据库,获取到最新的信息?使用普通的http请求来做的话,要么点刷新、要么就做轮询(每几秒查询一次数据库执行查询操作)这样显得非常繁琐,且极大消耗服务器资源(反复的发请求会消耗带宽)。这时候我们就需要websocket来进行全双工通信。
websocket工作原理
WebSocket 使用 HTTP 协议进行 一次性握手,然后升级为 WebSocket 协议(握手升级协议)
客户端(浏览器)发送握手请求
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务器(后端)相应握手成功:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
3、websocket和http的区别
对比维度 | HTTP | WebSocket |
---|---|---|
协议层 | 应用层协议(基于 TCP) | 应用层协议(基于 TCP) |
设计目的 | 客户端发起请求,获取数据 | 实时、低延迟、双向通信 |
通信模式 | 单向通信(客户端发起,请求-响应) | 双向通信(全双工,任意一方可主动发送) |
连接方式 | 短连接(默认),可使用 Keep-Alive | 长连接(握手成功后持续存在) |
连接建立 | 直接建立 HTTP 请求 | 先通过 HTTP 握手,然后升级协议 |
协议升级 | 不支持 | 通过 HTTP 101 Switching Protocols 升级 |
数据传输格式 | 文本(HTML、JSON、XML),需携带完整头部 | 轻量帧结构,支持文本和二进制(Blob、ArrayBuffer) |
头部开销 | 每次请求都发送完整 HTTP Header(较大) | 初始握手有一次 HTTP Header,之后数据帧头部极小 |
服务端主动推送 | 不支持 | 支持(服务端可随时向客户端发送消息) |
实时性 | 差(依赖轮询或长轮询) | 高(事件驱动、低延迟) |
资源消耗 | 轻量(请求结束即释放连接) | 高(每个客户端都占用一个 TCP 连接) |
并发能力 | 强,连接瞬时断开 | 并发多时需优化连接管理与线程池 |
跨域支持 | 同源策略限制较多 | 可跨域连接,需配置服务端 |
安全传输 | HTTPS(HTTP over TLS) | WSS(WebSocket over TLS) |
认证机制 | 基于 Cookie、Token、HTTP Header | 通常通过 URL 参数或连接后的自定义认证逻辑 |
浏览器支持 | 所有浏览器 | 现代浏览器均已支持(IE10+、Chrome、Firefox、Safari 等) |
常见应用场景 | 表单提交、文件下载、网页加载、RESTful API | 聊天室、推送通知、在线游戏、实时数据监控、协作编辑等 |
示例协议头 | GET /api/data HTTP/1.1 |
Upgrade: websocket Connection: Upgrade |
4、websocket的使用
一、maven引入websocket
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
二、新建websocketHandler(websocket的处理类)
websocketHandler的使用通常有两种用法
一种是实现 WebSocketHandler,一种是继承 TextWebSocketHandler
WebSocketHandler 是一个底层接口,你必须实现所有的方法,一般是实现对websocket的细节有更强的控制需求。
@Component
public class ConnectWebsocketHandler implements WebSocketHandler {
@Override
// 连接建立时触发
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("连接建立成功:"+session.getId());
}
@Override
// 接收到消息时触发
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
System.out.println("收到消息: " + message.getPayload());
}
@Override
// 出现异常时触发
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("发生错误: " + exception.getMessage());
}
@Override
// 连接关闭时触发
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
System.out.println("连接关闭: " + session.getId());
}
@Override
// 是否支持部分消息
public boolean supportsPartialMessages() {
return false;//默认返回false
}
}
TextWebSocketHandler 是一个简化版的实现类,适用于只处理文本信息的场景
public class ConnectWebsocketHandler extends TextWebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) {
// 连接建立后
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
// 只处理文本消息
}
}
应用场景对比
特性 | WebSocketHandler (接口) |
TextWebSocketHandler (类) |
---|---|---|
复杂度 | 高,需要实现全部方法 | 低,继承并重写想用的方法 |
消息支持 | 文本、二进制、分片都可以 | 仅支持文本消息 |
推荐场景 | 高级控制,处理多类型消息 | 聊天系统、通知推送等 |
三、配置websocket
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//连接websocket测试
registry.addHandler(new ConnectWebsocketHandler(), "/websocket")
.setAllowedOrigins("*"); // 允许跨域
// //聊天室
// registry.addHandler(new ChatRoomWebsocketHandler(), "/chatRoom")
// .setAllowedOrigins("*"); // 允许跨域
//
// //消息推送
// registry.addHandler(new PushMessageWebsocketHandler(), "/pushMessage")
// .setAllowedOrigins("*"); // 允许跨域
}
}
给websocket注册处理器,处理的接口为 "/websocket"。
ConnectWebsocketHandler 是使用了第一种方法,实现了websocketHandler,方便理解,如果没有复杂业务,直接继承TextWebSocketHandler就可以。
@Component
public class ConnectWebsocketHandler implements WebSocketHandler {
@Override
// 连接建立时触发
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("连接建立成功:"+session.getId());
}
@Override
// 接收到消息时触发
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
System.out.println("收到消息: " + message.getPayload());
}
@Override
// 出现异常时触发
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("发生错误: " + exception.getMessage());
}
@Override
// 连接关闭时触发
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
System.out.println("连接关闭: " + session.getId());
}
@Override
// 是否支持部分消息
public boolean supportsPartialMessages() {
return false;//默认返回false
}
}
项目结构目录截图
前端代码
<template>
<div style="padding: 20px">
<el-input v-model="message" placeholder="输入消息" style="width: 300px" @keyup.enter.native="send" />
<el-button type="primary" @click="send">发送</el-button>
<el-button type="success" @click="connect" >连接websocket</el-button>
<el-button type="danger" @click="disconnect">断开websocket</el-button>
<el-divider>消息记录</el-divider>
<div v-for="(msg, index) in messages" :key="index">{{ msg }}</div>
</div>
</template>
<script>
export default {
data() {
return {
ws: null,
message: '',
messages: []
};
},
mounted() {
this.connect();
},
beforeDestroy() {
if (this.ws) this.ws.close();
},
methods: {
connect() {
this.ws = new WebSocket('ws://localhost:8080/websocket');
this.ws.onopen = () => {
this.messages.push('WebSocket连接成功');
};
this.ws.onmessage = (event) => {
this.messages.push('收到:' + event.data);
};
this.ws.onclose = () => {
this.messages.push('WebSocket连接关闭');
};
this.ws.onerror = (err) => {
this.messages.push('WebSocket连接异常');
console.error(err);
};
},
disconnect() {
if (this.ws) {
this.ws.close(1000, "用户主动断开");
this.ws = null;
}
},
send() {
if (this.ws && this.message) {
this.ws.send(this.message);
this.messages.push('发送:' + this.message);
this.message = '';
}else{
alert("未连接websocket或消息为空")
}
}
}
};
</script>
运行结果
后台执行结果
5、实现聊天室功能
效果图:
为了更深的了解websocket,接下来准备使用websocket常用的方法来实现下面的功能清单
功能清单:
功能模块 | 描述 |
---|---|
🧑🤝🧑 在线用户列表 | 实时显示当前在线用户 |
💬 公聊(群聊) | 所有人共享一个频道,任何人发送的消息都广播给所有人 |
💌 私聊(点对点聊天) | 用户之间一对一聊天,消息只发送给指定接收者 |
🛎️ 通知广播 | 系统广播(如:某人上线、下线、踢人) |
🔒 用户鉴权 | 用户登录后建立 WebSocket 连接,鉴权校验(如 token) |
🔁 重连机制 | 断线后自动重连,保持体验流畅 |
❤️ 心跳机制 | 保活机制,防止连接超时关闭 |
🧾 消息已读/未读 | 显示消息状态,比如是否已读、已送达 |
需要用到的机制:
WebSocket机制 | 用于支撑哪些功能 |
---|---|
onopen |
建立连接后触发,通常用于用户上线通知、鉴权 |
onmessage |
收发消息的核心方法(群聊、私聊、系统通知) |
onclose |
用户下线、主动退出、断线等清理操作 |
onerror |
连接异常处理 |
send() |
客户端主动发送消息(聊天、心跳) |
close() |
主动断开连接(退出聊天室) |
广播推送机制 | 服务端将消息同时发送给所有连接的用户 |
用户会话管理 | 服务端需要保存每个用户的 WebSocketSession ,支持点对点通信 |
心跳机制 | 定时 ping/pong 保持连接(客户端或服务端主动发心跳) |
有两个必须要理解和掌握的接口和类:WebSocketSession 和 TextMessage
我们查看源码:
WebSokcetSession
解析:
方法/属性 | 类型 | 说明 |
---|---|---|
getId() |
String |
会话的唯一标识符,Spring 自动生成(如 "1a2b3c..." ) |
getUri() |
URI |
客户端连接时的 URI,通常是 /chat 这类路径 |
getPrincipal() |
Principal |
获取当前用户的认证信息(如登录用户)— 需要集成 Spring Security |
getAttributes() |
Map<String, Object> |
存放连接时设置的自定义属性,可在 HandshakeInterceptor 中初始化 |
getHandshakeHeaders() |
HttpHeaders |
获取握手时的请求头信息(例如 Token、Cookie) |
isOpen() |
boolean |
当前连接是否打开 |
sendMessage(WebSocketMessage<?> message) |
void |
向客户端发送消息(常用) |
close() |
void |
主动关闭连接 |
getRemoteAddress() |
InetSocketAddress |
获取客户端的 IP 地址和端口 |
getLocalAddress() |
InetSocketAddress |
获取本地服务端地址 |
getTextMessageSizeLimit() |
int |
文本消息大小限制(单位:字节) |
setTextMessageSizeLimit(int limit) |
void |
设置文本消息大小限制 |
getBinaryMessageSizeLimit() |
int |
二进制消息大小限制 |
setBinaryMessageSizeLimit(int limit) |
void |
设置二进制消息大小限制 |
在WebSocketConfig中新增配置
//聊天室 -- sessionId版
registry.addHandler(new ChatRoomSessionIdWebsocketHandler(), "/websocket/chatRoomSessionId")
.setAllowedOrigins("*"); // 允许跨域
新增handler类ChatRoomSessionIdWebsocketHandler
@Component
public class ChatRoomSessionIdWebsocketHandler extends TextWebSocketHandler {
Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();
@Override
// 连接建立时触发
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
//连接成功后,加入集合
WebSocketSessionUtil.addSession(session);
System.out.println("连接建立成功:"+session.getId());
System.out.println("当前在线人数:"+WebSocketSessionUtil.getOnlineCount());
WebSocketSessionUtil.getAllSessions().forEach(s -> {
System.out.println("在线SessionID:"+s.getId());
});
broadcastMessage("【广播信息】"+"["+session.getId()+"]"+"-连接成功");
// 推送最新用户列表
broadcastUserList();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 断开连接时移除
WebSocketSessionUtil.removeSession(session);
System.out.println("连接关闭:" + session.getId());
System.out.println("当前在线人数:"+WebSocketSessionUtil.getOnlineCount());
WebSocketSessionUtil.getAllSessions().forEach(s -> {
System.out.println("在线SessionID:"+s.getId());
});
broadcastMessage("【广播信息】"+"["+session.getId()+"]"+"-退出连接");
// 推送最新用户列表
broadcastUserList();
}
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
JSONObject json = JSON.parseObject(message.getPayload());
String type = json.getString("type");
String content = json.getString("content");
String fromUserId = session.getId();
System.out.println(session.getId() + " 发送消息:" + content);
if ("broadcast".equals(type)) {
// 广播给所有在线用户
for (WebSocketSession s : WebSocketSessionUtil.getAllSessions()) {
if (s.isOpen()) {
s.sendMessage(new TextMessage("[群聊] " + fromUserId + ":" + content));
}
}
} else if ("private".equals(type)) {
String toUserId = json.getString("toUserId");
WebSocketSession toSession = WebSocketSessionUtil.getSession(toUserId);
if (toSession != null && toSession.isOpen()) {
toSession.sendMessage(new TextMessage("[私聊] " + fromUserId + ":" + content));
} else {
session.sendMessage(new TextMessage("用户 " + toUserId + " 不在线"));
}
}
}
private void broadcastUserList() {
List<String> onlineUsers = WebSocketSessionUtil.getAllSessions().stream()
.map(WebSocketSession::getId) // 你可以替换成用户ID(如 session.getAttributes().get("userId"))
.collect(Collectors.toList());
JSONObject json = new JSONObject();
json.put("type", "userList");
json.put("users", onlineUsers);
TextMessage userListMessage = new TextMessage(json.toJSONString());
for (WebSocketSession s : WebSocketSessionUtil.getAllSessions()) {
if (s.isOpen()) {
try {
s.sendMessage(userListMessage);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private void broadcastMessage(String message) {
for (WebSocketSession s : WebSocketSessionUtil.getAllSessions()) {
if (s.isOpen()) {
try {
s.sendMessage(new TextMessage(message));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
前端目录:
前端代码
<template>
<div style="padding: 20px">
<el-input v-model="toUserId" placeholder="接收用户ID(留空表示群发)" style="width: 300px; margin-bottom: 10px" />
<el-input v-model="message" placeholder="输入消息" style="width: 300px" @keyup.enter.native="send" />
<el-button type="primary" @click="send">发送</el-button>
<el-button type="success" @click="connect">连接WebSocket</el-button>
<el-button type="danger" @click="disconnect">断开WebSocket</el-button>
<el-divider>在线用户</el-divider>
<div v-if="userList.length === 0">暂无在线用户</div>
<el-tag
v-for="user in userList"
:key="user"
type="info"
@click="selectUser(user)"
>
{{ user }}
</el-tag>
<el-divider>消息记录</el-divider>
<div v-for="(msg, index) in messages" :key="index">{{ msg }}</div>
</div>
</template>
<script>
export default {
name: 'WebSocketChatRoomSessionId',
data() {
return {
ws: null,
message: '',
messages: [],
toUserId: '',
userList: []
};
},
mounted() {
this.connect();
},
beforeDestroy() {
if (this.ws) this.ws.close();
},
methods: {
connect() {
this.ws = new WebSocket('ws://localhost:8080/websocket/chatRoomSessionId');
this.ws.onopen = () => {
this.messages.push('✅ WebSocket 连接成功');
};
this.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'userList') {
this.userList = data.users;
} else if (data.type === 'message') {
this.messages.push(`[${data.fromUserId || '系统'}]: ${data.content}`);
}
} catch (e) {
this.messages.push(event.data);
}
};
this.ws.onclose = () => {
this.messages.push('❌ WebSocket 连接关闭');
};
this.ws.onerror = (err) => {
this.messages.push('⚠️ WebSocket 连接异常');
console.error(err);
};
},
disconnect() {
if (this.ws) {
this.ws.close(1000, "用户主动断开");
this.ws = null;
}
},
send() {
if (this.ws && this.message) {
const payload = {
type: this.toUserId ? 'private' : 'broadcast',
toUserId: this.toUserId || null,
content: this.message
};
this.ws.send(JSON.stringify(payload));
this.message = '';
} else {
alert("未连接 WebSocket 或消息为空");
}
},
selectUser(userId) {
this.toUserId = userId;
}
}
};
</script>
6、实现消息推送功能
来了,来了,大家做系统应该是最关心这个功能。
【思路】
需求:对全系统【所有的业务操作】进行消息推送,有【群发】、【私发】功能、处理【消息状态(未读/已读)】,websocket持续链接防止因其他故障中断【心跳机制】
【后端篇】
1、确定自己系统的需求,先做数据表
通过代码生成,对后续推送的信息进行保存,通过is_read字段来对消息进行已读未读操作
添加mapper
/**
* 设为已读
* @param id 消息的id
* @return 结果
* */
public int updateWbNoticeMessageReadStatus(Long id);
添加service
/**
* 设为已读
* @param id 消息的id
* @return 结果
* */
public int updateWbNoticeMessageReadStatus(Long id);
添加serviceImpl
/**
* 更新消息的阅读状态
* @param id 消息的id
* @return
*/
@Override
public int updateWbNoticeMessageReadStatus(Long id) {
return wbNoticeMessageMapper.updateWbNoticeMessageReadStatus(id);
}
添加mapper.xml下的方法
<update id="updateWbNoticeMessageReadStatus" parameterType="Long">
update wb_notice_message
set is_read = '1'
where id = #{id}
</update>
2、明确websocket链接
消息的推送,肯定是有推送人和被推送人,根据如何获取这些数据来确定你的websocket链接
// const token // 需要鉴权
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址
this.socket = new WebSocket(wsUrl);
这是我的websocket链接,可以看出我是通过前端拼接的userId和userName来获取到推送人信息的。
ps:实际开发过程中最好是通过token来获取,并解析出用户,进行后续的操作,此处是为了方便理解和通用
3、配置WebSocketConfig
package com.ruoyi.websocket.config;
import com.ruoyi.websocket.handler.ChatRoomSessionIdWebsocketHandler;
import com.ruoyi.websocket.handler.ChatRoomUserIdWebsocketHandler;
import com.ruoyi.websocket.handler.ConnectWebsocketHandler;
import com.ruoyi.websocket.handler.PushMessageWebsocketHandler;
import com.ruoyi.websocket.interceptor.WebSocketInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private PushMessageWebsocketHandler pushMessageWebsocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//连接websocket测试
registry.addHandler(new ConnectWebsocketHandler(), "/websocket")
.setAllowedOrigins("*"); // 允许跨域
//聊天室 -- sessionId版
registry.addHandler(new ChatRoomSessionIdWebsocketHandler(), "/websocket/chatRoomSessionId")
.setAllowedOrigins("*"); // 允许跨域
//聊天室 -- UserId版
registry.addHandler(new ChatRoomUserIdWebsocketHandler(), "/websocket/chatRoomUserId")
.addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid
.setAllowedOrigins("*"); // 允许跨域
//消息推送
registry.addHandler(pushMessageWebsocketHandler, "/websocket/pushMessage")
.addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid
.setAllowedOrigins("*"); // 允许跨域
}
}
4、添加拦截器 WebSocketInterceptor 来获取到webocket链接携带的userId和nickName
package com.ruoyi.websocket.interceptor;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.net.URI;
import java.util.Map;
@Component
public class WebSocketInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(
ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
URI uri = request.getURI();
String query = uri.getQuery(); // userId=xxx&nickName=yyy
if (query == null) return false;
Map<String, String> paramMap = parseQuery(query);
String userId = paramMap.get("userId");
String nickName = paramMap.get("nickName");
if (userId == null || nickName == null) {
return false; // 拒绝握手
}
// 放入 WebSocketSession attributes,后面 WebSocketHandler 可取
attributes.put("userId", userId);
attributes.put("nickName", nickName);
return true; // 允许握手
}
@Override
public void afterHandshake(
ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Exception exception) {
// 握手完成后进行的操作
}
//拆分传递的参数
private Map<String, String> parseQuery(String query) {
Map<String, String> map = new java.util.HashMap<>();
if (query == null || query.isEmpty()) return map;
String[] pairs = query.split("&");
for (String pair : pairs) {
int idx = pair.indexOf('=');
if (idx > 0) {
String key = pair.substring(0, idx);
String value = pair.substring(idx + 1);
map.put(key, value);
}
}
return map;
}
}
5、添加 PushMessageWebsocketHandler 来处理推送信息
package com.ruoyi.websocket.handler;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.SysUser;
import com.ruoyi.system.mapper.SysUserMapper;
import com.ruoyi.websocket.domain.WbNoticeMessage;
import com.ruoyi.websocket.service.IWbNoticeMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* 消息推送 WebSocket Handler
*/
@Component
public class PushMessageWebsocketHandler extends TextWebSocketHandler {
@Autowired
private IWbNoticeMessageService wbNoticeMessageService;
@Autowired
private SysUserMapper userMapper;
// 存储所有连接的会话
private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
//获取前端发送的message
String payload = message.getPayload();
// 解析整个 JSON 对象
JSONObject jsonObject = JSONObject.parseObject(payload);
// 心跳检测
String type = jsonObject.getString("type");
if ("ping".equalsIgnoreCase(type)) {
session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));
return;
}
//获取websocket携带的参数的userId和nickName
// todo 前端可以通过token携带参数,然后使用ruoyi封装的token方法获取到当前用户,这里方便演示和通用性直接使用前端传递的UserId和nickName
String userId = (String) session.getAttributes().get("userId");
String nickName = (String) session.getAttributes().get("nickName");
// 提取 data 对象--从这里添加前端所需要推送的字段
JSONObject data = jsonObject.getJSONObject("data");
String title = data.getString("title");
String content = data.getString("content");
Long receiverId = data.getLong("receiverId");
String receiverName = data.getString("receiverName");
// 1. 如果receiverId为空则是群发,否则是单发,保存消息到数据库
// todo 可以自行根据前端传递的type来判断是群发还是单发,这里为了方便演示直接通过receiverId是否为空来判断
if (receiverId != null) {
WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();
wbNoticeMessage.setTitle(title);
wbNoticeMessage.setContent(content);
wbNoticeMessage.setSenderId(Long.parseLong(userId));
wbNoticeMessage.setSenderName(nickName);
wbNoticeMessage.setReceiverId(receiverId);
wbNoticeMessage.setReceiverName(receiverName);
wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);
} else {
SysUser user = new SysUser();
List<SysUser> userList = userMapper.selectUserList(user);
for (SysUser sysUser : userList) {
WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();
wbNoticeMessage.setTitle(title);
wbNoticeMessage.setContent(content);
wbNoticeMessage.setSenderId(Long.parseLong(userId));
wbNoticeMessage.setSenderName(nickName);
wbNoticeMessage.setReceiverId(sysUser.getUserId());
wbNoticeMessage.setReceiverName(receiverName);
wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);
}
}
// 2. 给所有在线客户端广播消息
for (WebSocketSession s : sessions) {
if (s.isOpen()) {
s.sendMessage(new TextMessage(payload));
}
}
// todo 3.重要的信息还可以通过邮件等其他方式通知用户
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
exception.printStackTrace();
sessions.remove(session);
if (session.isOpen()) {
session.close();
}
}
}
【前端篇】
1、创建消息铃铛样式,封装成组件
InfoBell.vue代码
<template>
<div>
<el-tooltip :content="noticeContent" effect="dark" placement="bottom">
<el-badge :value="noticeCount" class="right-menu-item hover-effect" :class="{ 'badge-custom': noticeCount > 0 }">
<i class="el-icon-message-solid" @click="toNoticePage"></i>
</el-badge>
</el-tooltip>
</div>
</template>
<script>
import { listWbNoticeMessage } from "@/api/websocket/WbNoticeMessage";
export default {
name: "InfoBell",
props: {
refreshNoticeCount: {
type: Boolean,
default: false
}
},
data() {
return {
noticeContent: "", // 通知内容
noticeCount: 0, // 通知数量
socket: null, // WebSocket 实例
// 查询参数
queryParams: {
pageNum: 1,
pageSize: 10,
title: null,
content: null,
type: null,
senderId: null,
senderName: null,
receiverId: this.$store.state.user.id,
receiverName: null,
isRead: null,
readTime: null,
priority: null,
targetUrl: null,
bizType: null,
bizId: null
},
};
},
created() {
this.getList();
},
mounted() {
this.initWebSocket();
},
beforeDestroy() {
this.closeWebSocket();
},
watch: {
refreshNoticeCount(val) {
if (val) {
this.getList();
}
}
},
methods: {
/**---------------------websocket专栏-------------------- */
/** 初始化/连接 WebSocket */
initWebSocket() {
// const token // 需要鉴权
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址
this.socket = new WebSocket(wsUrl);
this.socket.onopen = () => {
console.log("头部导航消息铃铛-WebSocket 连接已建立");
this.startHeartbeat();//启用心跳机制
};
this.socket.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
if (msg.type === "pong") {
console.log("收到心跳 pong");
return;
}
} catch (e) {
// 非 JSON 消息,继续执行
}
this.getList();
};
this.socket.onerror = (error) => {
console.error("头部导航消息铃铛-WebSocket 发生错误:", error);
};
this.socket.onclose = () => {
console.log("头部导航消息铃铛-WebSocket 已关闭");
this.stopHeartbeat();
this.tryReconnect();
};
},
/** 关闭 WebSocket */
closeWebSocket() {
if (this.socket) {
this.socket.close();
this.socket = null;
}
this.stopHeartbeat();
if (this.reconnectTimer) {
clearInterval(this.reconnectTimer);
this.reconnectTimer = null;
}
},
/** 启动心跳 */
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({ type: "ping" }));
console.log("发送心跳 ping");
}
}, 30000); // 每 30 秒
},
/** 停止心跳 */
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
},
/** 尝试重连 */
tryReconnect() {
if (this.reconnectTimer) return;
this.reconnectTimer = setInterval(() => {
console.log("尝试重连 InfoBell-WebSocket...");
this.initWebSocket();
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
clearInterval(this.reconnectTimer);
this.reconnectTimer = null;
}
}, 5000); // 每 5 秒重连一次
},
/** -------------------------- 业务处理专栏---------------------- */
/** 查询通知信息框列表 */
getList() {
this.queryParams.isRead = 0;
listWbNoticeMessage(this.queryParams).then(response => {
this.noticeCount = response.total;
this.noticeContent = `您有${this.noticeCount}条未读的信息`;
})
},
/** 跳转到通知页面 */
toNoticePage() {
this.$router.push("/websocket/pushMessage");
},
},
};
</script>
<style lang="scss" scoped>
::v-deep .el-badge__content {
margin-top: 9px;
margin-right: 1px;
}
.badge-custom {
animation: blink-animation 0.5s infinite alternate;
}
@keyframes blink-animation {
0% {
opacity: 1;
}
100% {
opacity: 0.1;
}
}
</style>
2、在顶部导航引用消息铃铛组件(InfoBell)
引入组件后,页面就完成了
3、创建推送信息查看页面
pushMessage.vue代码
<template>
<div style="padding: 50px;">
<el-row :gutter="20">
<el-col :span="5" >
<el-card>
<h3>消息推送(快捷创建)</h3>
<el-form ref="form" :model="form" label-width="90px">
<el-form-item label="通知标题" prop="title">
<el-input v-model="form.title" placeholder="请输入通知标题" />
</el-form-item>
<el-form-item label="通知内容">
<el-input v-model="form.content" placeholder="请输入通知标题" type="textarea" />
</el-form-item>
<el-form-item label="接收人ID" prop="receiverId">
<el-input v-model="form.receiverId" placeholder="请输入接收人ID" />
</el-form-item>
<el-form-item label="接收人昵称" prop="receiverName">
<el-input v-model="form.receiverName" placeholder="请输入接收人昵称" />
</el-form-item>
</el-form>
<div style="color: red;font-weight: 600;font-size: 14px;">PS:不填接受人id则视为群发</div>
<el-button type="primary" @click="sendMessage" style="margin-top: 10px;">
推送消息
</el-button>
<el-divider></el-divider>
<div style="height: 300px; overflow-y: auto; border: 1px solid #ebeef5; padding: 10px;">
<div v-for="(msg, index) in messages" :key="index" style="margin-bottom: 8px;">
<el-tag type="info" size="small">消息 {{ index + 1 }}</el-tag>
<span style="margin-left: 8px;">{{ msg }}</span>
</div>
</div>
</el-card>
</el-col>
<el-col :span="19">
<el-card>
<el-tabs v-model="activeName" @tab-click="handleClick">
<el-tab-pane label="未读" name="unread">
<el-table v-loading="loading" :data="WbNoticeMessageList">
<el-table-column label="id" align="center" prop="id" />
<el-table-column label="通知标题" align="center" prop="title" />
<el-table-column label="通知内容" align="center" prop="content" />
<el-table-column label="消息类型" align="center" prop="type" />
<el-table-column label="发送人ID" align="center" prop="senderId" />
<el-table-column label="发送人名称" align="center" prop="senderName" />
<el-table-column label="接受者ID" align="center" prop="receiverId" />
<el-table-column label="接受者名称" align="center" prop="receiverName" />
<el-table-column label="是否已读" align="center" prop="isRead" />
<el-table-column label="阅读时间" align="center" prop="readTime" width="100">
<template slot-scope="scope">
<span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span>
</template>
</el-table-column>
<el-table-column label="优先级" align="center" prop="priority" />
<el-table-column label="业务类型" align="center" prop="bizType" />
<el-table-column label="业务ID" align="center" prop="bizId" />
<el-table-column label="操作" align="center" class-name="small-padding fixed-width">
<template slot-scope="scope">
<el-button
size="mini"
type="text"
icon="el-icon-edit"
@click="handleUpdateReadStatus(scope.row)"
>设为已读</el-button>
</template>
</el-table-column>
</el-table>
<pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" />
</el-tab-pane>
<el-tab-pane label="已读" name="read">
<el-table v-loading="loading" :data="WbNoticeMessageList" >
<el-table-column label="id" align="center" prop="id" />
<el-table-column label="通知标题" align="center" prop="title" />
<el-table-column label="通知内容" align="center" prop="content" />
<el-table-column label="消息类型" align="center" prop="type" />
<el-table-column label="发送人ID" align="center" prop="senderId" />
<el-table-column label="发送人名称" align="center" prop="senderName" />
<el-table-column label="接受者ID" align="center" prop="receiverId" />
<el-table-column label="接受者名称" align="center" prop="receiverName" />
<el-table-column label="是否已读" align="center" prop="isRead" />
<el-table-column label="阅读时间" align="center" prop="readTime" width="100">
<template slot-scope="scope">
<span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span>
</template>
</el-table-column>
<el-table-column label="优先级" align="center" prop="priority" />
<el-table-column label="业务类型" align="center" prop="bizType" />
<el-table-column label="业务ID" align="center" prop="bizId" />
</el-table>
<pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" />
</el-tab-pane>
</el-tabs>
</el-card>
</el-col>
</el-row>
<div v-show="false">
<info-bell :refreshNoticeCount="isRefreshNoticeCount" />
</div>
</div>
</template>
<script>
import { listWbNoticeMessage,updateReadStatus} from "@/api/websocket/WbNoticeMessage"
import InfoBell from "@/components/InfoBell";
export default {
name:"pushMesage",
components: { InfoBell },
data() {
return {
ws: null,
message: '',
messages: [],
loading: true,
total: 0,
WbNoticeMessageList: [],
form:{},
// 查询参数
queryParams: {
pageNum: 1,
pageSize: 10,
title: null,
content: null,
type: null,
senderId: null,
senderName: null,
receiverId: this.$store.state.user.id,
receiverName: null,
isRead: null,
readTime: null,
priority: null,
targetUrl: null,
bizType: null,
bizId: null
},
activeName: 'unread',
isRefreshNoticeCount:false,//是否刷新通知数量
};
},
methods: {
connectWebSocket() {
// 连接 WebSocket,地址根据后端实际情况调整
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
this.ws = new WebSocket(`ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`);
this.ws.onopen = () => {
console.log("推送信息-WebSocket 已连接");
this.addMessage("推送信息-WebSocket 已连接");
};
this.ws.onmessage = event => {
console.log("收到消息:", event.data);
this.addMessage(event.data);
};
this.ws.onclose = () => {
this.addMessage("推送信息-WebSocket 已关闭");
};
this.ws.onerror = error => {
this.addMessage("推送信息-WebSocket 发生错误");
};
},
sendMessage() {
if (!this.form.content.trim()) {
this.$message.warning("请输入消息内容");
return;
}
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
// 发送整个表单内容
this.ws.send(JSON.stringify({
data: this.form
}));
this.$message.success("消息发送成功");
// 因为websocket发送请求是异步的,为了方便显示这里使用了延时,实际情况还是要在后端通过返回值来显示getList
setTimeout(() => {
this.getList();
}, 500);
} else {
this.$message.error("WebSocket 未连接");
}
},
addMessage(msg) {
this.messages.push(msg);
this.$nextTick(() => {
// 自动滚动到底部
const container = this.$el.querySelector("div[style*='overflow-y']");
if (container) container.scrollTop = container.scrollHeight;
});
},
/** --------------------------------- 信息模块 --------------------- */
handleClick(){
this.getList();
},
/** 查询通知信息框列表 */
getList() {
this.loading = true
this.queryParams.isRead = this.activeName === 'unread' ? 0 : 1;
console.log(this.queryParams);
listWbNoticeMessage(this.queryParams).then(response => {
this.WbNoticeMessageList = response.rows
this.total = response.total
this.loading = false
})
},
handleUpdateReadStatus(row){
if (row.id != null) {
updateReadStatus(row.id).then(response => {
this.isRefreshNoticeCount = true;
console.log(this.$store);
this.$modal.msgSuccess("该信息已标记为已读~")
this.getList();
})
}
}
},
created() {
this.getList();
},
mounted() {
this.connectWebSocket();
},
beforeDestroy() {
if (this.ws) {
this.ws.close();
}
}
};
</script>
<style scoped>
</style>
以下是快捷创建推送信息的页面
4、详解【心跳机制】
一、详解
WebSocket 的心跳机制,是一种保持连接活跃、防止断线、检测对方是否存活的机制。特别是在使用 WebSocket 建立了长连接之后,如果网络设备(如代理、网关、防火墙)或者服务端/客户端本身在长时间没有数据传输时自动断开连接,就会导致推送失败、消息丢失的问题。
二、为什么要使用心跳机制?
1、防止连接被中间设备断开
很多中间设备(比如 Nginx、CDN、防火墙)会在一段时间内没有数据传输时,主动断开“看起来闲置”的连接。
2、检测对方是否在线
如果客户端意外断线(如:网络断了、电脑睡眠、浏览器崩溃),服务器端并不知道,继续保留 WebSocket 会话资源,浪费内存。
3、实现自动重连
通过心跳,可以判断连接是否断开,如果断了,客户端就能自动发起重连。
三、心跳机制怎么工作?
通常的设计方式如下:
角色 | 行为说明 |
---|---|
客户端 | 每隔一段时间(如 30 秒)发送一个特定的“心跳包”消息,如 { "type": "ping" } |
服务端 | 收到 ping 后立即回复 { "type": "pong" } ,表示“我还活着” |
客户端 | 若在预期时间内未收到 pong ,说明可能断线,可以发起重连 |
四、代码实操
【浏览器】,每隔30秒向【后端】发送ping信号,后端接收到了返回pong信号表示通信正常,不做任何业务处理。
可以理解成这是一个地震的救援过程:
遇难者被埋在了地底下,救援人员在进行挖地救援,遇难者每隔30秒向救援人员叫喊一声:ping!,救援人员听到了遇难者的声音得知遇难者还活着,随之回复一声:pong!。表示别怕,我正在救援。表示通信正常。
【前端发起心跳】
/** 启动心跳 */
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({ type: "ping" }));
console.log("发送心跳 ping");
}
}, 30000); // 每 30 秒
},
/** 停止心跳 */
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
},
/** 尝试重连 */
tryReconnect() {
if (this.reconnectTimer) return;
this.reconnectTimer = setInterval(() => {
console.log("正在尝试重连 InfoBell-WebSocket...");
this.initWebSocket();
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
clearInterval(this.reconnectTimer);
this.reconnectTimer = null;
}
}, 5000); // 每 5 秒重连一次
},
【后端接收心跳】
// 心跳检测
String type = jsonObject.getString("type");
if ("ping".equalsIgnoreCase(type)) {
session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));
return;
}
代码将整理成 ruoyi-vue-websocket上传到git~