实现消息推送功能
来了,来了,大家做系统应该是最关心这个功能。
【思路】
需求:对全系统【所有的业务操作】进行消息推送,有【群发】、【私发】功能、处理【消息状态(未读/已读)】,websocket持续链接防止因其他故障中断【心跳机制】
【后端篇】
1、确定自己系统的需求,先做数据表
通过代码生成,对后续推送的信息进行保存,通过is_read字段来对消息进行已读未读操作
添加mapper
/**
* 设为已读
* @param id 消息的id
* @return 结果
* */
public int updateWbNoticeMessageReadStatus(Long id);
添加service
/**
* 设为已读
* @param id 消息的id
* @return 结果
* */
public int updateWbNoticeMessageReadStatus(Long id);
添加serviceImpl
/**
* 更新消息的阅读状态
* @param id 消息的id
* @return
*/
@Override
public int updateWbNoticeMessageReadStatus(Long id) {
return wbNoticeMessageMapper.updateWbNoticeMessageReadStatus(id);
}
添加mapper.xml下的方法
<update id="updateWbNoticeMessageReadStatus" parameterType="Long">
update wb_notice_message
set is_read = '1'
where id = #{id}
</update>
2、明确websocket链接
消息的推送,肯定是有推送人和被推送人,根据如何获取这些数据来确定你的websocket链接
// const token // 需要鉴权
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址
this.socket = new WebSocket(wsUrl);
这是我的websocket链接,可以看出我是通过前端拼接的userId和userName来获取到推送人信息的。
ps:实际开发过程中最好是通过token来获取,并解析出用户,进行后续的操作,此处是为了方便理解和通用
3、配置WebSocketConfig
package com.ruoyi.websocket.config;
import com.ruoyi.websocket.handler.ChatRoomSessionIdWebsocketHandler;
import com.ruoyi.websocket.handler.ChatRoomUserIdWebsocketHandler;
import com.ruoyi.websocket.handler.ConnectWebsocketHandler;
import com.ruoyi.websocket.handler.PushMessageWebsocketHandler;
import com.ruoyi.websocket.interceptor.WebSocketInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private PushMessageWebsocketHandler pushMessageWebsocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//连接websocket测试
registry.addHandler(new ConnectWebsocketHandler(), "/websocket")
.setAllowedOrigins("*"); // 允许跨域
//聊天室 -- sessionId版
registry.addHandler(new ChatRoomSessionIdWebsocketHandler(), "/websocket/chatRoomSessionId")
.setAllowedOrigins("*"); // 允许跨域
//聊天室 -- UserId版
registry.addHandler(new ChatRoomUserIdWebsocketHandler(), "/websocket/chatRoomUserId")
.addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid
.setAllowedOrigins("*"); // 允许跨域
//消息推送
registry.addHandler(pushMessageWebsocketHandler, "/websocket/pushMessage")
.addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid
.setAllowedOrigins("*"); // 允许跨域
}
}
4、添加拦截器 WebSocketInterceptor 来获取到webocket链接携带的userId和nickName
package com.ruoyi.websocket.interceptor;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.net.URI;
import java.util.Map;
@Component
public class WebSocketInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(
ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
URI uri = request.getURI();
String query = uri.getQuery(); // userId=xxx&nickName=yyy
if (query == null) return false;
Map<String, String> paramMap = parseQuery(query);
String userId = paramMap.get("userId");
String nickName = paramMap.get("nickName");
if (userId == null || nickName == null) {
return false; // 拒绝握手
}
// 放入 WebSocketSession attributes,后面 WebSocketHandler 可取
attributes.put("userId", userId);
attributes.put("nickName", nickName);
return true; // 允许握手
}
@Override
public void afterHandshake(
ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Exception exception) {
// 握手完成后进行的操作
}
//拆分传递的参数
private Map<String, String> parseQuery(String query) {
Map<String, String> map = new java.util.HashMap<>();
if (query == null || query.isEmpty()) return map;
String[] pairs = query.split("&");
for (String pair : pairs) {
int idx = pair.indexOf('=');
if (idx > 0) {
String key = pair.substring(0, idx);
String value = pair.substring(idx + 1);
map.put(key, value);
}
}
return map;
}
}
5、添加 PushMessageWebsocketHandler 来处理推送信息
package com.ruoyi.websocket.handler;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.SysUser;
import com.ruoyi.system.mapper.SysUserMapper;
import com.ruoyi.websocket.domain.WbNoticeMessage;
import com.ruoyi.websocket.service.IWbNoticeMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* 消息推送 WebSocket Handler
*/
@Component
public class PushMessageWebsocketHandler extends TextWebSocketHandler {
@Autowired
private IWbNoticeMessageService wbNoticeMessageService;
@Autowired
private SysUserMapper userMapper;
// 存储所有连接的会话
private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
//获取前端发送的message
String payload = message.getPayload();
// 解析整个 JSON 对象
JSONObject jsonObject = JSONObject.parseObject(payload);
// 心跳检测
String type = jsonObject.getString("type");
if ("ping".equalsIgnoreCase(type)) {
session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));
return;
}
//获取websocket携带的参数的userId和nickName
// todo 前端可以通过token携带参数,然后使用ruoyi封装的token方法获取到当前用户,这里方便演示和通用性直接使用前端传递的UserId和nickName
String userId = (String) session.getAttributes().get("userId");
String nickName = (String) session.getAttributes().get("nickName");
// 提取 data 对象--从这里添加前端所需要推送的字段
JSONObject data = jsonObject.getJSONObject("data");
String title = data.getString("title");
String content = data.getString("content");
Long receiverId = data.getLong("receiverId");
String receiverName = data.getString("receiverName");
// 1. 如果receiverId为空则是群发,否则是单发,保存消息到数据库
// todo 可以自行根据前端传递的type来判断是群发还是单发,这里为了方便演示直接通过receiverId是否为空来判断
if (receiverId != null) {
WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();
wbNoticeMessage.setTitle(title);
wbNoticeMessage.setContent(content);
wbNoticeMessage.setSenderId(Long.parseLong(userId));
wbNoticeMessage.setSenderName(nickName);
wbNoticeMessage.setReceiverId(receiverId);
wbNoticeMessage.setReceiverName(receiverName);
wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);
} else {
SysUser user = new SysUser();
List<SysUser> userList = userMapper.selectUserList(user);
for (SysUser sysUser : userList) {
WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();
wbNoticeMessage.setTitle(title);
wbNoticeMessage.setContent(content);
wbNoticeMessage.setSenderId(Long.parseLong(userId));
wbNoticeMessage.setSenderName(nickName);
wbNoticeMessage.setReceiverId(sysUser.getUserId());
wbNoticeMessage.setReceiverName(receiverName);
wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);
}
}
// 2. 给所有在线客户端广播消息
for (WebSocketSession s : sessions) {
if (s.isOpen()) {
s.sendMessage(new TextMessage(payload));
}
}
// todo 3.重要的信息还可以通过邮件等其他方式通知用户
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
exception.printStackTrace();
sessions.remove(session);
if (session.isOpen()) {
session.close();
}
}
}
【前端篇】
1、创建消息铃铛样式,封装成组件
InfoBell.vue代码
<template>
<div>
<el-tooltip :content="noticeContent" effect="dark" placement="bottom">
<el-badge :value="noticeCount" class="right-menu-item hover-effect" :class="{ 'badge-custom': noticeCount > 0 }">
<i class="el-icon-message-solid" @click="toNoticePage"></i>
</el-badge>
</el-tooltip>
</div>
</template>
<script>
import { listWbNoticeMessage } from "@/api/websocket/WbNoticeMessage";
export default {
name: "InfoBell",
props: {
refreshNoticeCount: {
type: Boolean,
default: false
}
},
data() {
return {
noticeContent: "", // 通知内容
noticeCount: 0, // 通知数量
socket: null, // WebSocket 实例
// 查询参数
queryParams: {
pageNum: 1,
pageSize: 10,
title: null,
content: null,
type: null,
senderId: null,
senderName: null,
receiverId: this.$store.state.user.id,
receiverName: null,
isRead: null,
readTime: null,
priority: null,
targetUrl: null,
bizType: null,
bizId: null
},
};
},
created() {
this.getList();
},
mounted() {
this.initWebSocket();
},
beforeDestroy() {
this.closeWebSocket();
},
watch: {
refreshNoticeCount(val) {
if (val) {
this.getList();
}
}
},
methods: {
/**---------------------websocket专栏-------------------- */
/** 初始化/连接 WebSocket */
initWebSocket() {
// const token // 需要鉴权
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址
this.socket = new WebSocket(wsUrl);
this.socket.onopen = () => {
console.log("头部导航消息铃铛-WebSocket 连接已建立");
this.startHeartbeat();//启用心跳机制
};
this.socket.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
if (msg.type === "pong") {
console.log("收到心跳 pong");
return;
}
} catch (e) {
// 非 JSON 消息,继续执行
}
this.getList();
};
this.socket.onerror = (error) => {
console.error("头部导航消息铃铛-WebSocket 发生错误:", error);
};
this.socket.onclose = () => {
console.log("头部导航消息铃铛-WebSocket 已关闭");
this.stopHeartbeat();
this.tryReconnect();
};
},
/** 关闭 WebSocket */
closeWebSocket() {
if (this.socket) {
this.socket.close();
this.socket = null;
}
this.stopHeartbeat();
if (this.reconnectTimer) {
clearInterval(this.reconnectTimer);
this.reconnectTimer = null;
}
},
/** 启动心跳 */
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({ type: "ping" }));
console.log("发送心跳 ping");
}
}, 30000); // 每 30 秒
},
/** 停止心跳 */
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
},
/** 尝试重连 */
tryReconnect() {
if (this.reconnectTimer) return;
this.reconnectTimer = setInterval(() => {
console.log("尝试重连 InfoBell-WebSocket...");
this.initWebSocket();
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
clearInterval(this.reconnectTimer);
this.reconnectTimer = null;
}
}, 5000); // 每 5 秒重连一次
},
/** -------------------------- 业务处理专栏---------------------- */
/** 查询通知信息框列表 */
getList() {
this.queryParams.isRead = 0;
listWbNoticeMessage(this.queryParams).then(response => {
this.noticeCount = response.total;
this.noticeContent = `您有${this.noticeCount}条未读的信息`;
})
},
/** 跳转到通知页面 */
toNoticePage() {
this.$router.push("/websocket/pushMessage");
},
},
};
</script>
<style lang="scss" scoped>
::v-deep .el-badge__content {
margin-top: 9px;
margin-right: 1px;
}
.badge-custom {
animation: blink-animation 0.5s infinite alternate;
}
@keyframes blink-animation {
0% {
opacity: 1;
}
100% {
opacity: 0.1;
}
}
</style>
2、在顶部导航引用消息铃铛组件(InfoBell)
引入组件后,页面就完成了
3、创建推送信息查看页面
pushMessage.vue代码
<template>
<div style="padding: 50px;">
<el-row :gutter="20">
<el-col :span="5" >
<el-card>
<h3>消息推送(快捷创建)</h3>
<el-form ref="form" :model="form" label-width="90px">
<el-form-item label="通知标题" prop="title">
<el-input v-model="form.title" placeholder="请输入通知标题" />
</el-form-item>
<el-form-item label="通知内容">
<el-input v-model="form.content" placeholder="请输入通知标题" type="textarea" />
</el-form-item>
<el-form-item label="接收人ID" prop="receiverId">
<el-input v-model="form.receiverId" placeholder="请输入接收人ID" />
</el-form-item>
<el-form-item label="接收人昵称" prop="receiverName">
<el-input v-model="form.receiverName" placeholder="请输入接收人昵称" />
</el-form-item>
</el-form>
<div style="color: red;font-weight: 600;font-size: 14px;">PS:不填接受人id则视为群发</div>
<el-button type="primary" @click="sendMessage" style="margin-top: 10px;">
推送消息
</el-button>
<el-divider></el-divider>
<div style="height: 300px; overflow-y: auto; border: 1px solid #ebeef5; padding: 10px;">
<div v-for="(msg, index) in messages" :key="index" style="margin-bottom: 8px;">
<el-tag type="info" size="small">消息 {{ index + 1 }}</el-tag>
<span style="margin-left: 8px;">{{ msg }}</span>
</div>
</div>
</el-card>
</el-col>
<el-col :span="19">
<el-card>
<el-tabs v-model="activeName" @tab-click="handleClick">
<el-tab-pane label="未读" name="unread">
<el-table v-loading="loading" :data="WbNoticeMessageList">
<el-table-column label="id" align="center" prop="id" />
<el-table-column label="通知标题" align="center" prop="title" />
<el-table-column label="通知内容" align="center" prop="content" />
<el-table-column label="消息类型" align="center" prop="type" />
<el-table-column label="发送人ID" align="center" prop="senderId" />
<el-table-column label="发送人名称" align="center" prop="senderName" />
<el-table-column label="接受者ID" align="center" prop="receiverId" />
<el-table-column label="接受者名称" align="center" prop="receiverName" />
<el-table-column label="是否已读" align="center" prop="isRead" />
<el-table-column label="阅读时间" align="center" prop="readTime" width="100">
<template slot-scope="scope">
<span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span>
</template>
</el-table-column>
<el-table-column label="优先级" align="center" prop="priority" />
<el-table-column label="业务类型" align="center" prop="bizType" />
<el-table-column label="业务ID" align="center" prop="bizId" />
<el-table-column label="操作" align="center" class-name="small-padding fixed-width">
<template slot-scope="scope">
<el-button
size="mini"
type="text"
icon="el-icon-edit"
@click="handleUpdateReadStatus(scope.row)"
>设为已读</el-button>
</template>
</el-table-column>
</el-table>
<pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" />
</el-tab-pane>
<el-tab-pane label="已读" name="read">
<el-table v-loading="loading" :data="WbNoticeMessageList" >
<el-table-column label="id" align="center" prop="id" />
<el-table-column label="通知标题" align="center" prop="title" />
<el-table-column label="通知内容" align="center" prop="content" />
<el-table-column label="消息类型" align="center" prop="type" />
<el-table-column label="发送人ID" align="center" prop="senderId" />
<el-table-column label="发送人名称" align="center" prop="senderName" />
<el-table-column label="接受者ID" align="center" prop="receiverId" />
<el-table-column label="接受者名称" align="center" prop="receiverName" />
<el-table-column label="是否已读" align="center" prop="isRead" />
<el-table-column label="阅读时间" align="center" prop="readTime" width="100">
<template slot-scope="scope">
<span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span>
</template>
</el-table-column>
<el-table-column label="优先级" align="center" prop="priority" />
<el-table-column label="业务类型" align="center" prop="bizType" />
<el-table-column label="业务ID" align="center" prop="bizId" />
</el-table>
<pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" />
</el-tab-pane>
</el-tabs>
</el-card>
</el-col>
</el-row>
<div v-show="false">
<info-bell :refreshNoticeCount="isRefreshNoticeCount" />
</div>
</div>
</template>
<script>
import { listWbNoticeMessage,updateReadStatus} from "@/api/websocket/WbNoticeMessage"
import InfoBell from "@/components/InfoBell";
export default {
name:"pushMesage",
components: { InfoBell },
data() {
return {
ws: null,
message: '',
messages: [],
loading: true,
total: 0,
WbNoticeMessageList: [],
form:{},
// 查询参数
queryParams: {
pageNum: 1,
pageSize: 10,
title: null,
content: null,
type: null,
senderId: null,
senderName: null,
receiverId: this.$store.state.user.id,
receiverName: null,
isRead: null,
readTime: null,
priority: null,
targetUrl: null,
bizType: null,
bizId: null
},
activeName: 'unread',
isRefreshNoticeCount:false,//是否刷新通知数量
};
},
methods: {
connectWebSocket() {
// 连接 WebSocket,地址根据后端实际情况调整
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
this.ws = new WebSocket(`ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`);
this.ws.onopen = () => {
console.log("推送信息-WebSocket 已连接");
this.addMessage("推送信息-WebSocket 已连接");
};
this.ws.onmessage = event => {
console.log("收到消息:", event.data);
this.addMessage(event.data);
};
this.ws.onclose = () => {
this.addMessage("推送信息-WebSocket 已关闭");
};
this.ws.onerror = error => {
this.addMessage("推送信息-WebSocket 发生错误");
};
},
sendMessage() {
if (!this.form.content.trim()) {
this.$message.warning("请输入消息内容");
return;
}
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
// 发送整个表单内容
this.ws.send(JSON.stringify({
data: this.form
}));
this.$message.success("消息发送成功");
// 因为websocket发送请求是异步的,为了方便显示这里使用了延时,实际情况还是要在后端通过返回值来显示getList
setTimeout(() => {
this.getList();
}, 500);
} else {
this.$message.error("WebSocket 未连接");
}
},
addMessage(msg) {
this.messages.push(msg);
this.$nextTick(() => {
// 自动滚动到底部
const container = this.$el.querySelector("div[style*='overflow-y']");
if (container) container.scrollTop = container.scrollHeight;
});
},
/** --------------------------------- 信息模块 --------------------- */
handleClick(){
this.getList();
},
/** 查询通知信息框列表 */
getList() {
this.loading = true
this.queryParams.isRead = this.activeName === 'unread' ? 0 : 1;
console.log(this.queryParams);
listWbNoticeMessage(this.queryParams).then(response => {
this.WbNoticeMessageList = response.rows
this.total = response.total
this.loading = false
})
},
handleUpdateReadStatus(row){
if (row.id != null) {
updateReadStatus(row.id).then(response => {
this.isRefreshNoticeCount = true;
console.log(this.$store);
this.$modal.msgSuccess("该信息已标记为已读~")
this.getList();
})
}
}
},
created() {
this.getList();
},
mounted() {
this.connectWebSocket();
},
beforeDestroy() {
if (this.ws) {
this.ws.close();
}
}
};
</script>
<style scoped>
</style>
以下是快捷创建推送信息的页面
4、详解【心跳机制】
一、详解
WebSocket 的心跳机制,是一种保持连接活跃、防止断线、检测对方是否存活的机制。特别是在使用 WebSocket 建立了长连接之后,如果网络设备(如代理、网关、防火墙)或者服务端/客户端本身在长时间没有数据传输时自动断开连接,就会导致推送失败、消息丢失的问题。
二、为什么要使用心跳机制?
1、防止连接被中间设备断开
很多中间设备(比如 Nginx、CDN、防火墙)会在一段时间内没有数据传输时,主动断开“看起来闲置”的连接。
2、检测对方是否在线
如果客户端意外断线(如:网络断了、电脑睡眠、浏览器崩溃),服务器端并不知道,继续保留 WebSocket 会话资源,浪费内存。
3、实现自动重连
通过心跳,可以判断连接是否断开,如果断了,客户端就能自动发起重连。
三、心跳机制怎么工作?
通常的设计方式如下:
角色 | 行为说明 |
---|---|
客户端 | 每隔一段时间(如 30 秒)发送一个特定的“心跳包”消息,如 { "type": "ping" } |
服务端 | 收到 ping 后立即回复 { "type": "pong" } ,表示“我还活着” |
客户端 | 若在预期时间内未收到 pong ,说明可能断线,可以发起重连 |
四、代码实操
【浏览器】,每隔30秒向【后端】发送ping信号,后端接收到了返回pong信号表示通信正常,不做任何业务处理。
可以理解成这是一个地震的救援过程:
遇难者被埋在了地底下,救援人员在进行挖地救援,遇难者每隔30秒向救援人员叫喊一声:ping!,救援人员听到了遇难者的声音得知遇难者还活着,随之回复一声:pong!。表示别怕,我正在救援。表示通信正常。
【前端发起心跳】
/** 启动心跳 */
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({ type: "ping" }));
console.log("发送心跳 ping");
}
}, 30000); // 每 30 秒
},
/** 停止心跳 */
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
},
/** 尝试重连 */
tryReconnect() {
if (this.reconnectTimer) return;
this.reconnectTimer = setInterval(() => {
console.log("正在尝试重连 InfoBell-WebSocket...");
this.initWebSocket();
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
clearInterval(this.reconnectTimer);
this.reconnectTimer = null;
}
}, 5000); // 每 5 秒重连一次
},
【后端接收心跳】
// 心跳检测
String type = jsonObject.getString("type");
if ("ping".equalsIgnoreCase(type)) {
session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));
return;
}
代码将整理成 ruoyi-vue-websocket上传到git~