最近想学学WebSocket做一个实时通讯的练手项目
主要用到的技术栈是WebSocket Netty Vue Pinia MySQL SpringBoot,实现一个持久化数据,单一群聊,支持多用户的聊天界面
下面是实现的过程
后端
SpringBoot启动的时候会占用一个端口,而Netty也会占用一个端口,这两个端口不能重复,并且因为Netty启动后会阻塞当前线程,因此需要另开一个线程防止阻塞住SpringBoot
1. 编写Netty服务器
个人认为,Netty最关键的就是channel,可以代表一个客户端
我在这使用的是@PostConstruct注解,在Bean初始化后调用里面的方法,新开一个线程运行Netty,因为希望Netty受Spring管理,所以加上了spring的注解,也可以直接在启动类里注入Netty然后手动启动
@Service
public class NettyService {
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workGroup = new NioEventLoopGroup();
@Autowired
private WebSocketHandler webSocketHandler;
@Autowired
private HeartBeatHandler heartBeatHandler;
@PostConstruct
public void initNetty() throws BaseException {
new Thread(()->{
try {
start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
}
@PreDestroy
public void destroy() throws BaseException {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
@Async
public void start() throws BaseException {
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// http解码编码器
.addLast(new HttpServerCodec())
// 处理完整的 HTTP 消息
.addLast(new HttpObjectAggregator(64 * 1024))
// 心跳检测时长
.addLast(new IdleStateHandler(300, 0, 0, TimeUnit.SECONDS))
// 心跳检测处理器
.addLast(heartBeatHandler)
// 支持ws协议(自定义)
.addLast(new WebSocketServerProtocolHandler("/ws",null,true,64*1024,true,true,10000))
// ws请求处理器(自定义)
.addLast(webSocketHandler)
;
}
}).bind(8081).sync();
System.out.println("Netty启动成功");
ChannelFuture future = channelFuture.channel().closeFuture().sync();
}
catch (InterruptedException e){
throw new InterruptedException ();
}
finally {
//优雅关闭
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
服务器类只是指明一些基本信息,包含处理器类,支持的协议等等,具体的处理逻辑需要再自定义类来实现
2. 心跳检测处理器
心跳检测是指 服务器无法主动确定客户端的状态(用户可能关闭了网页,但是服务端没办法知道),为了确定客户端是否在线,需要客户端定时发送一条消息,消息内容不重要,重要的是发送消息代表该客户端仍然在线,当客户端长时间没有发送数据时,代表客户端已经下线
package org.example.payroll_management.websocket.netty.handler;
@Component
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelDuplexHandler {
@Autowired
private ChannelContext channelContext;
private static final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
// 心跳检测超时
IdleStateEvent e = (IdleStateEvent) evt;
logger.info("心跳检测超时");
if (e.state() == IdleState.READER_IDLE){
Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
Integer userId = attr.get();
// 读超时,当前已经下线,主动断开连接
ChannelContext.removeChannel(userId);
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE){
ctx.writeAndFlush("心跳检测");
}
}
super.userEventTriggered(ctx, evt);
}
}
3. webSocket处理器
当客户端发送消息,消息的内容会发送当webSocket处理器中,可以对对应的方法进行处理,我这里偷懒了,就做了一个群组,全部用户只能在同一群中聊天,不过创建多个群组,或单对单聊天也不复杂,只需要将群组的ID进行保存就可以
这里就产生第一个问题了,就是SpringMVC的拦截器不会拦截其他端口的请求,解决方法是将token放置到请求参数中,在userEventTriggered方法中重新进行一次token检验
第二个问题,我是在拦截器中通过ThreadLocal保存用户ID,不走拦截器在其他地方拿不到用户ID,解决方法是,在userEventTriggered方法中重新保存,或者channel中可以保存附件(自身携带的数据),直接将id保存到附件中
第三个问题,消息的持久化,当用户重新打开界面时,肯定希望消息仍然存在,鉴于webSocket的实时性,数据持久化肯定不能在同一个线程中完成,我在这使用BlockingQueue+线程池完成对消息的异步保存,或者也可以用mq实现
不过用的Executors.newSingleThreadExecutor();可能会产生OOM的问题,后面可以自定义一个线程池,当任务满了之后,指定拒绝策略为抛出异常,再通过全局异常捕捉拿到对应的数据保存到数据库中,不过俺这种小项目应该不会产生这种问题
第四个问题,消息内容,这个需要前后端统一一下,确定一下传输格式就OK了,然后从JSON中取出数据处理
最后就是在线用户统计,这个没什么好说的,里面有对应的方法,当退出时,直接把channel踢出去就可以了
package org.example.payroll_management.websocket.netty.handler;
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Autowired
private ChannelContext channelContext;
@Autowired
private MessageMapper messageMapper;
@Autowired
private UserService userService;
private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
private static final BlockingQueue<WebSocketMessageDto> blockingQueue = new ArrayBlockingQueue(1024 * 1024);
private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
// 提交线程
@PostConstruct
private void init(){
EXECUTOR_SERVICE.submit(new MessageHandler());
}
private class MessageHandler implements Runnable{
// 异步保存
@Override
public void run() {
while(true){
WebSocketMessageDto message = null;
try {
message = blockingQueue.take();
logger.info("消息持久化");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Integer success = messageMapper.saveMessage(message);
if (success < 1){
try {
throw new BaseException("保存信息失败");
} catch (BaseException e) {
throw new RuntimeException(e);
}
}
}
}
}
// 当读事件发生时(有客户端发送消息)
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
Channel channel = channelHandlerContext.channel();
// 收到的消息
String text = textWebSocketFrame.text();
Attribute<Integer> attr = channelHandlerContext.channel().attr(AttributeKey.valueOf(channelHandlerContext.channel().id().toString()));
Integer userId = attr.get();
logger.info("接收到用户ID为 {} 的消息: {}",userId,text);
// TODO 将text转成JSON,提取里面的数据
WebSocketMessageDto webSocketMessage = JSONUtil.toBean(text, WebSocketMessageDto.class);
if (webSocketMessage.getType().equals("心跳检测")){
logger.info("{}发送心跳检测",userId);
}
else if (webSocketMessage.getType().equals("群发")){
ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);
WebSocketMessageDto messageDto = JSONUtil.toBean(text, WebSocketMessageDto.class);
WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
webSocketMessageDto.setType("群发");
webSocketMessageDto.setText(messageDto.getText());
webSocketMessageDto.setReceiver("all");
webSocketMessageDto.setSender(String.valueOf(userId));
webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));
blockingQueue.add(webSocketMessageDto);
channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonPrettyStr(webSocketMessageDto)));
}
else{
channel.writeAndFlush("请发送正确的格式");
}
}
// 建立连接后触发(有客户端建立连接请求)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("建立连接");
super.channelActive(ctx);
}
// 连接断开后触发(有客户端关闭连接请求)
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
Integer userId = attr.get();
logger.info("用户ID:{} 断开连接",userId);
ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);
channelGroup.remove(ctx.channel());
ChannelContext.removeChannel(userId);
WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
webSocketMessageDto.setType("用户变更");
List<OnLineUserVo> onlineUser = userService.getOnlineUser();
webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));
webSocketMessageDto.setReceiver("all");
webSocketMessageDto.setSender("0");
webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));
channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));
super.channelInactive(ctx);
}
// 建立连接后触发(客户端完成连接)
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete){
WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
String uri = handshakeComplete.requestUri();
logger.info("uri: {}",uri);
String token = getToken(uri);
if (token == null){
logger.warn("Token校验失败");
ctx.close();
throw new BaseException("Token校验失败");
}
logger.info("token: {}",token);
Integer userId = null;
try{
Claims claims = JwtUtil.extractClaims(token);
userId = Integer.valueOf((String) claims.get("userId"));
}catch (Exception e){
logger.warn("Token校验失败");
ctx.close();
throw new BaseException("Token校验失败");
}
// 向channel中的附件中添加用户ID
channelContext.addContext(userId,ctx.channel());
ChannelContext.setChannel(userId,ctx.channel());
ChannelContext.setChannelGroup(null,ctx.channel());
ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);
WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
webSocketMessageDto.setType("用户变更");
List<OnLineUserVo> onlineUser = userService.getOnlineUser();
webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));
webSocketMessageDto.setReceiver("all");
webSocketMessageDto.setSender("0");
webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));
channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));
}
super.userEventTriggered(ctx, evt);
}
private String getToken(String uri){
if (uri.isEmpty()){
return null;
}
if(!uri.contains("token")){
return null;
}
String[] split = uri.split("\\?");
if (split.length!=2){
return null;
}
String[] split1 = split[1].split("=");
if (split1.length!=2){
return null;
}
return split1[1];
}
}
4. 工具类
主要用来保存用户信息的
不要问我为什么又有static又有普通方法,问就是懒得改,这里我直接保存的同一个群组,如果需要多群组的话,就需要建立SQL数据了
package org.example.payroll_management.websocket;
@Component
public class ChannelContext {
private static final Map<Integer, Channel> USER_CHANNEL_MAP = new ConcurrentHashMap<>();
private static final Map<Integer, ChannelGroup> USER_CHANNELGROUP_MAP = new ConcurrentHashMap<>();
private static final Integer GROUP_ID = 10086;
private static final Logger logger = LoggerFactory.getLogger(ChannelContext.class);
public void addContext(Integer userId,Channel channel){
String channelId = channel.id().toString();
AttributeKey attributeKey = null;
if (AttributeKey.exists(channelId)){
attributeKey = AttributeKey.valueOf(channelId);
} else{
attributeKey = AttributeKey.newInstance(channelId);
}
channel.attr(attributeKey).set(userId);
}
public static List<Integer> getAllUserId(){
return new ArrayList<>(USER_CHANNEL_MAP.keySet());
}
public static void setChannel(Integer userId,Channel channel){
USER_CHANNEL_MAP.put(userId,channel);
}
public static Channel getChannel(Integer userId){
return USER_CHANNEL_MAP.get(userId);
}
public static void removeChannel(Integer userId){
USER_CHANNEL_MAP.remove(userId);
}
public static void setChannelGroup(Integer groupId,Channel channel){
if(groupId == null){
groupId = GROUP_ID;
}
ChannelGroup channelGroup = USER_CHANNELGROUP_MAP.get(groupId);
if (channelGroup == null){
channelGroup =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
USER_CHANNELGROUP_MAP.put(GROUP_ID, channelGroup);
}
if (channel == null){
return ;
}
channelGroup.add(channel);
logger.info("向group中添加channel,ChannelGroup已有Channel数量:{}",channelGroup.size());
}
public static ChannelGroup getChannelGroup(Integer groupId){
if (groupId == null){
groupId = GROUP_ID;
}
return USER_CHANNELGROUP_MAP.get(groupId);
}
public static void removeChannelGroup(Integer groupId){
if (groupId == null){
groupId = GROUP_ID;
}
USER_CHANNELGROUP_MAP.remove(groupId);
}
}
写到这里,Netty服务就搭建完成了,后面就可以等着前端的请求建立了
前端
前端我使用的vue,因为我希望当用户登录后自动建立ws连接,所以我在登录成功后添加上了ws建立请求,然后我发现,如果用户关闭网页后重新打开,因为跳过了登录界面,ws请求不会自动建立,所以需要一套全局的ws请求
不过我前端不是很好(其实后端也一般),所以很多地方肯定有更优的写法
1. pinia
使用pinia保存ws请求,方便在其他组件中调用
定义WebSocket实例(ws)和一个请求建立判断(wsConnect)
后面就可以通过ws接收服务的消息
import { defineStore } from 'pinia'
export const useWebSocketStore = defineStore('webSocket', {
state() {
return {
ws: null,
wsConnect: false,
}
},
actions: {
wsInit() {
if (this.ws === null) {
const token = localStorage.getItem("token")
if (token === null) return;
this.ws = new WebSocket(`ws://localhost:8081/ws?token=${token}`)
this.ws.onopen = () => {
this.wsConnect = true;
console.log("ws协议建立成功")
// 发送心跳
const intervalId = setInterval(() => {
if (!this.wsConnect) {
clearInterval(intervalId)
}
const webSocketMessageDto = {
type: "心跳检测"
}
this.sendMessage(JSON.stringify(webSocketMessageDto));
}, 1000 * 3 * 60);
}
this.ws.onclose = () => {
this.ws = null;
this.wsConnect = false;
}
}
},
sendMessage(message) {
if (message == null || message == '') {
return;
}
if (!this.wsConnect) {
console.log("ws协议没有建立")
this.wsInit();
}
this.ws.send(message);
},
wsClose() {
if (this.wsConnect) {
this.ws.close();
this.wsConnect = false;
}
}
}
})
然后再app.vue中循环建立连接(建立请求重试)
const wsConnect = function () {
const token = localStorage.getItem("token")
if (token === null) {
return;
}
try {
if (!webSocket.wsConnect) {
console.log("尝试建立ws请求")
webSocket.wsInit();
} else {
return;
}
} catch {
wsConnect();
}
}
2. 聊天组件
界面相信大伙都会画,主要说一下我遇到的问题
第一个 上拉刷新,也就是加载历史记录的功能,我用的element-plus UI,也不知道是不是我的问题,UI里面的无限滚动不是重复发送请求就是无限发送请求,而且好像没有上拉加载的功能。于是我用了IntersectionObserver来解决,在页面底部加上一个div,当观察到这个div时,触发请求
第二个 滚动条到达顶部时,请求数据并放置数据,滚动条会自动滚动到顶部,并且由于观察的元素始终在顶端导致无限请求,这个其实也不是什么大问题,因为聊天的消息是有限的,没有数据之后我设置了停止观察,主要是用户体验不是很好。这是我是添加了display: flex; flex-direction: column-reverse;解决这个问题的(flex很神奇吧)。大致原理好像是垂直翻转了(例如上面我将观察元素放到div第一个子元素位置,添加flex后观察元素会到最后一个子元素位置上),也就是说当滚动条在最底部时,添加数据后,滚动条会自动滚动到最底部,不过这样体验感非常的不错
不要问我为什么数据要加 || 问就是数据懒得统一了
<style lang="scss" scoped>
.chatBox {
border-radius: 20px;
box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
width: 1200px;
height: 600px;
background-color: white;
display: flex;
.chat {
width: 1000px;
height: inherit;
.chatBackground {
height: 500px;
overflow: auto;
display: flex;
flex-direction: column-reverse;
.loading {
text-align: center;
font-size: 12px;
margin-top: 20px;
color: gray;
}
.chatItem {
width: 100%;
padding-bottom: 20px;
.avatar {
margin-left: 20px;
display: flex;
align-items: center;
.username {
margin-left: 10px;
color: rgb(153, 153, 153);
font-size: 13px;
}
}
.chatItemMessage {
margin-left: 60px;
padding: 10px;
font-size: 14px;
width: 200px;
word-break: break-all;
max-width: 400px;
line-height: 25px;
width: fit-content;
border-radius: 10px;
height: auto;
/* background-color: skyblue; */
box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
}
.sendDate {
font-size: 12px;
margin-top: 10px;
margin-left: 60px;
color: rgb(187, 187, 187);
}
}
}
.chatBottom {
height: 100px;
background-color: #F3F3F3;
border-radius: 20px;
display: flex;
box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
.messageInput {
border-radius: 20px;
width: 400px;
height: 40px;
}
}
}
.userList {
width: 200px;
height: inherit;
border-radius: 20px;
box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
.user {
width: inherit;
height: 50px;
line-height: 50px;
text-indent: 2em;
border-radius: 20px;
transition: all 0.5s ease;
}
}
}
.user:hover {
box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
transform: translateX(-5px) translateY(-5px);
}
</style>
<template>
{{hasMessage}}
<div class="chatBox">
<div class="chat">
<div class="chatBackground" ref="chatBackgroundRef">
<div class="chatItem" v-for="i in messageList">
<div class="avatar">
<el-avatar :size="40" :src="imageUrl" />
<div class="username">{{i.username || i.userId}}</div>
</div>
<div class="chatItemMessage">
{{i.text || i.content}}
</div>
<div class="sendDate">
{{i.date || i.sendDate}}
</div>
</div>
<div class="loading" ref="loading">
显示更多内容
</div>
</div>
<div class="chatBottom">
<el-input class="messageInput" v-model="message" placeholder="消息内容"></el-input>
<el-button @click="sendMessage">发送消息</el-button>
</div>
</div>
<!-- 做成无限滚动 -->
<div class="userList">
<div v-for="user in userList">
<div class="user">
{{user.userName}}
</div>
</div>
</div>
</div>
</template>
<script setup>
import { ref, onMounted, nextTick } from 'vue'
import request from '@/utils/request.js'
import { useWebSocketStore } from '@/stores/useWebSocketStore'
import imageUrl from '@/assets/默认头像.jpg'
const webSocketStore = useWebSocketStore();
const chatBackgroundRef = ref(null)
const userList = ref([])
const message = ref('')
const messageList = ref([
])
const loading = ref(null)
const page = ref(1);
const size = 10;
const hasMessage = ref(true);
const observer = new IntersectionObserver((entries, observer) => {
entries.forEach(async entry => {
if (entry.isIntersecting) {
observer.unobserve(entry.target)
await pageQueryMessage();
}
})
})
onMounted(() => {
observer.observe(loading.value)
getOnlineUserList();
if (!webSocketStore.wsConnect) {
webSocketStore.wsInit();
}
const ws = webSocketStore.ws;
ws.onmessage = async (e) => {
// console.log(e);
const webSocketMessage = JSON.parse(e.data);
const messageObj = {
username: webSocketMessage.sender,
text: webSocketMessage.text,
date: webSocketMessage.sendDate,
type: webSocketMessage.type
}
console.log("###")
// console.log(JSON.parse(messageObj.text))
if (messageObj.type === "群发") {
messageList.value.unshift(messageObj)
} else if (messageObj.type === "用户变更") {
userList.value = JSON.parse(messageObj.text)
}
await nextTick();
// 当发送新消息时,自动滚动到页面最底部,可以替换成消息提示的样式
// chatBackgroundRef.value.scrollTop = chatBackgroundRef.value.scrollHeight;
console.log(webSocketMessage)
}
})
const pageQueryMessage = function () {
request({
url: '/api/message/pageQueryMessage',
method: 'post',
data: {
page: page.value,
size: size
}
}).then((res) => {
console.log(res)
if (res.data.data.length === 0) {
hasMessage.value = false;
}
else {
observer.observe(loading.value)
page.value = page.value + 1;
messageList.value.push(...res.data.data)
}
})
}
function getOnlineUserList() {
request({
url: '/api/user/getOnlineUser',
method: 'get'
}).then((res) => {
console.log(res)
userList.value = res.data.data;
})
}
const sendMessage = function () {
if (!webSocketStore.wsConnect) {
webSocketStore.wsInit();
}
const webSocketMessageDto = {
type: "群发",
text: message.value
}
webSocketStore.sendMessage(JSON.stringify(webSocketMessageDto));
}
</script>
这样就实现了一个简易的聊天数据持久化,支持在线聊天的界面,总的来说WebSocket用起来还是十分方便的
后面我看看能不能做下上传图片,上传文件之类的功能