SpringBoot + Netty + Vue + WebSocket实现在线聊天

发布于:2025-04-05 ⋅ 阅读:(14) ⋅ 点赞:(0)

最近想学学WebSocket做一个实时通讯的练手项目

主要用到的技术栈是WebSocket Netty Vue Pinia MySQL SpringBoot,实现一个持久化数据,单一群聊,支持多用户的聊天界面

下面是实现的过程

后端

SpringBoot启动的时候会占用一个端口,而Netty也会占用一个端口,这两个端口不能重复,并且因为Netty启动后会阻塞当前线程,因此需要另开一个线程防止阻塞住SpringBoot

1. 编写Netty服务器

个人认为,Netty最关键的就是channel,可以代表一个客户端

我在这使用的是@PostConstruct注解,在Bean初始化后调用里面的方法,新开一个线程运行Netty,因为希望Netty受Spring管理,所以加上了spring的注解,也可以直接在启动类里注入Netty然后手动启动

@Service
public class NettyService {
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workGroup = new NioEventLoopGroup();
    @Autowired
    private WebSocketHandler webSocketHandler;
    @Autowired
    private HeartBeatHandler heartBeatHandler;
    @PostConstruct
    public void initNetty() throws BaseException {
        new Thread(()->{
            try {
                start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
    @PreDestroy
    public void destroy() throws BaseException {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
    @Async
    public void start() throws BaseException {
        try {
            ChannelFuture channelFuture = new ServerBootstrap()
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline()
// http解码编码器
                                    .addLast(new HttpServerCodec())
// 处理完整的 HTTP 消息
                    .addLast(new HttpObjectAggregator(64 * 1024))
// 心跳检测时长
                                    .addLast(new IdleStateHandler(300, 0, 0, TimeUnit.SECONDS))
// 心跳检测处理器
                                    .addLast(heartBeatHandler)
// 支持ws协议(自定义)
                                    .addLast(new WebSocketServerProtocolHandler("/ws",null,true,64*1024,true,true,10000))
// ws请求处理器(自定义)
                                    .addLast(webSocketHandler)
                            ;
                        }
                    }).bind(8081).sync();
            System.out.println("Netty启动成功");
            ChannelFuture future = channelFuture.channel().closeFuture().sync();
        }
        catch (InterruptedException e){
            throw new InterruptedException ();
        }
        finally {
//优雅关闭
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }

    }
}

服务器类只是指明一些基本信息,包含处理器类,支持的协议等等,具体的处理逻辑需要再自定义类来实现

2. 心跳检测处理器

心跳检测是指 服务器无法主动确定客户端的状态(用户可能关闭了网页,但是服务端没办法知道),为了确定客户端是否在线,需要客户端定时发送一条消息,消息内容不重要,重要的是发送消息代表该客户端仍然在线,当客户端长时间没有发送数据时,代表客户端已经下线

package org.example.payroll_management.websocket.netty.handler;

@Component
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelDuplexHandler {

    @Autowired
    private ChannelContext channelContext;
    private static final Logger logger =  LoggerFactory.getLogger(HeartBeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            // 心跳检测超时
            IdleStateEvent e = (IdleStateEvent) evt;
            logger.info("心跳检测超时");
            if (e.state() == IdleState.READER_IDLE){
                Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
                Integer userId = attr.get();
                // 读超时,当前已经下线,主动断开连接
                ChannelContext.removeChannel(userId);
                ctx.close();
            } else if (e.state() == IdleState.WRITER_IDLE){
                ctx.writeAndFlush("心跳检测");
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}

3. webSocket处理器

当客户端发送消息,消息的内容会发送当webSocket处理器中,可以对对应的方法进行处理,我这里偷懒了,就做了一个群组,全部用户只能在同一群中聊天,不过创建多个群组,或单对单聊天也不复杂,只需要将群组的ID进行保存就可以

这里就产生第一个问题了,就是SpringMVC的拦截器不会拦截其他端口的请求,解决方法是将token放置到请求参数中,在userEventTriggered方法中重新进行一次token检验

第二个问题,我是在拦截器中通过ThreadLocal保存用户ID,不走拦截器在其他地方拿不到用户ID,解决方法是,在userEventTriggered方法中重新保存,或者channel中可以保存附件(自身携带的数据),直接将id保存到附件中

第三个问题,消息的持久化,当用户重新打开界面时,肯定希望消息仍然存在,鉴于webSocket的实时性,数据持久化肯定不能在同一个线程中完成,我在这使用BlockingQueue+线程池完成对消息的异步保存,或者也可以用mq实现

不过用的Executors.newSingleThreadExecutor();可能会产生OOM的问题,后面可以自定义一个线程池,当任务满了之后,指定拒绝策略为抛出异常,再通过全局异常捕捉拿到对应的数据保存到数据库中,不过俺这种小项目应该不会产生这种问题

第四个问题,消息内容,这个需要前后端统一一下,确定一下传输格式就OK了,然后从JSON中取出数据处理

最后就是在线用户统计,这个没什么好说的,里面有对应的方法,当退出时,直接把channel踢出去就可以了

package org.example.payroll_management.websocket.netty.handler;

@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private ChannelContext channelContext;
    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private UserService userService;
    private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);

    private static final BlockingQueue<WebSocketMessageDto> blockingQueue = new ArrayBlockingQueue(1024 * 1024);
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
    // 提交线程
    @PostConstruct
    private void init(){
        EXECUTOR_SERVICE.submit(new MessageHandler());
    }
    private class MessageHandler implements Runnable{
        // 异步保存
        @Override
        public void run() {
            while(true){
                WebSocketMessageDto message = null;
                try {
                    message = blockingQueue.take();
                    logger.info("消息持久化");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                Integer success = messageMapper.saveMessage(message);
                if (success < 1){
                    try {
                        throw new BaseException("保存信息失败");
                    } catch (BaseException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }

    }
    // 当读事件发生时(有客户端发送消息)
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        Channel channel = channelHandlerContext.channel();
        // 收到的消息
        String text = textWebSocketFrame.text();
        Attribute<Integer> attr = channelHandlerContext.channel().attr(AttributeKey.valueOf(channelHandlerContext.channel().id().toString()));
        Integer userId = attr.get();
        logger.info("接收到用户ID为 {} 的消息: {}",userId,text);
        // TODO  将text转成JSON,提取里面的数据
        WebSocketMessageDto webSocketMessage = JSONUtil.toBean(text, WebSocketMessageDto.class);
        if (webSocketMessage.getType().equals("心跳检测")){
            logger.info("{}发送心跳检测",userId);
        }

        else if (webSocketMessage.getType().equals("群发")){

            ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);
            WebSocketMessageDto messageDto = JSONUtil.toBean(text, WebSocketMessageDto.class);

            WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
            webSocketMessageDto.setType("群发");
            webSocketMessageDto.setText(messageDto.getText());
            webSocketMessageDto.setReceiver("all");
            webSocketMessageDto.setSender(String.valueOf(userId));
            webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));

            blockingQueue.add(webSocketMessageDto);
            channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonPrettyStr(webSocketMessageDto)));
        }
        else{
            channel.writeAndFlush("请发送正确的格式");
        }
    }

    // 建立连接后触发(有客户端建立连接请求)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("建立连接");
        super.channelActive(ctx);
    }

    // 连接断开后触发(有客户端关闭连接请求)
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
        Integer userId = attr.get();
        logger.info("用户ID:{} 断开连接",userId);

        ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);
        channelGroup.remove(ctx.channel());
        ChannelContext.removeChannel(userId);

        WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
        webSocketMessageDto.setType("用户变更");
        List<OnLineUserVo> onlineUser = userService.getOnlineUser();
        webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));
        webSocketMessageDto.setReceiver("all");
        webSocketMessageDto.setSender("0");
        webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));
        channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));
        super.channelInactive(ctx);
    }

// 建立连接后触发(客户端完成连接)
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete){
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String uri = handshakeComplete.requestUri();
            logger.info("uri: {}",uri);
            String token = getToken(uri);
            if (token == null){
                logger.warn("Token校验失败");
                ctx.close();
                throw new BaseException("Token校验失败");
            }
            logger.info("token: {}",token);

            Integer userId = null;
            try{
                Claims claims = JwtUtil.extractClaims(token);
                userId = Integer.valueOf((String) claims.get("userId"));
            }catch (Exception e){
                logger.warn("Token校验失败");
                ctx.close();
                throw new BaseException("Token校验失败");
            }
            // 向channel中的附件中添加用户ID
            channelContext.addContext(userId,ctx.channel());
            ChannelContext.setChannel(userId,ctx.channel());
            ChannelContext.setChannelGroup(null,ctx.channel());

            ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);

            WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
            webSocketMessageDto.setType("用户变更");
            List<OnLineUserVo> onlineUser = userService.getOnlineUser();
            webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));
            webSocketMessageDto.setReceiver("all");
            webSocketMessageDto.setSender("0");
            webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));
            channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));

        }
        super.userEventTriggered(ctx, evt);
    }
    private String getToken(String uri){
        if (uri.isEmpty()){
            return null;
        }
        if(!uri.contains("token")){
            return null;
        }
        String[] split = uri.split("\\?");
        if (split.length!=2){
            return null;
        }
        String[] split1 = split[1].split("=");
        if (split1.length!=2){
            return null;
        }
        return split1[1];
    }
}

4. 工具类

主要用来保存用户信息的

不要问我为什么又有static又有普通方法,问就是懒得改,这里我直接保存的同一个群组,如果需要多群组的话,就需要建立SQL数据了

package org.example.payroll_management.websocket;

@Component
public class ChannelContext {

    private static final Map<Integer, Channel> USER_CHANNEL_MAP = new ConcurrentHashMap<>();
    private static final Map<Integer, ChannelGroup> USER_CHANNELGROUP_MAP = new ConcurrentHashMap<>();
    private static final Integer GROUP_ID = 10086;

    private static final Logger logger = LoggerFactory.getLogger(ChannelContext.class);

    public void addContext(Integer userId,Channel channel){
        String channelId = channel.id().toString();
        AttributeKey attributeKey = null;
        if (AttributeKey.exists(channelId)){
            attributeKey = AttributeKey.valueOf(channelId);
        } else{
            attributeKey = AttributeKey.newInstance(channelId);
        }
        channel.attr(attributeKey).set(userId);
    }
    public static List<Integer> getAllUserId(){
        return new ArrayList<>(USER_CHANNEL_MAP.keySet());
    }
    public static void setChannel(Integer userId,Channel channel){
        USER_CHANNEL_MAP.put(userId,channel);
    }

    public static Channel getChannel(Integer userId){
        return USER_CHANNEL_MAP.get(userId);
    }
    public static void removeChannel(Integer userId){
        USER_CHANNEL_MAP.remove(userId);
    }

    public static void setChannelGroup(Integer groupId,Channel channel){
        if(groupId == null){
            groupId = GROUP_ID;
        }
        ChannelGroup channelGroup = USER_CHANNELGROUP_MAP.get(groupId);
        if (channelGroup == null){
            channelGroup =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            USER_CHANNELGROUP_MAP.put(GROUP_ID, channelGroup);
        }
        if (channel == null){
            return ;
        }
        channelGroup.add(channel);
        logger.info("向group中添加channel,ChannelGroup已有Channel数量:{}",channelGroup.size());

    }

    public static ChannelGroup getChannelGroup(Integer groupId){
        if (groupId == null){
            groupId = GROUP_ID;
        }
        return USER_CHANNELGROUP_MAP.get(groupId);
    }
    public static void removeChannelGroup(Integer groupId){
        if (groupId == null){
            groupId = GROUP_ID;
        }
         USER_CHANNELGROUP_MAP.remove(groupId);
    }
}

写到这里,Netty服务就搭建完成了,后面就可以等着前端的请求建立了

前端

前端我使用的vue,因为我希望当用户登录后自动建立ws连接,所以我在登录成功后添加上了ws建立请求,然后我发现,如果用户关闭网页后重新打开,因为跳过了登录界面,ws请求不会自动建立,所以需要一套全局的ws请求

不过我前端不是很好(其实后端也一般),所以很多地方肯定有更优的写法

1. pinia

使用pinia保存ws请求,方便在其他组件中调用

定义WebSocket实例(ws)和一个请求建立判断(wsConnect)

后面就可以通过ws接收服务的消息

import { defineStore } from 'pinia'

export const useWebSocketStore = defineStore('webSocket', {

    state() {
        return {
            ws: null,
            wsConnect: false,
        }
    },
    actions: {
        wsInit() {
            if (this.ws === null) {
                const token = localStorage.getItem("token")
                if (token === null)  return;
                this.ws = new WebSocket(`ws://localhost:8081/ws?token=${token}`)
                  
                this.ws.onopen = () => {
                    this.wsConnect = true;
                    console.log("ws协议建立成功")
                    // 发送心跳
                    const intervalId = setInterval(() => {
                        if (!this.wsConnect) {
                            clearInterval(intervalId)
                        }
                        const webSocketMessageDto = {
                            type: "心跳检测"
                        }
                        this.sendMessage(JSON.stringify(webSocketMessageDto));
                    }, 1000 * 3 * 60);
                }
                this.ws.onclose = () => {
                    this.ws = null;
                    this.wsConnect = false;
                }
            }
        },
        sendMessage(message) {
            if (message == null || message == '') {
                return;
            }
            if (!this.wsConnect) {
                console.log("ws协议没有建立")
                this.wsInit();
            }
            this.ws.send(message);
        },
        wsClose() {
            if (this.wsConnect) {
                this.ws.close();
                this.wsConnect = false;
            }
        }
    }
})

然后再app.vue中循环建立连接(建立请求重试)

 const wsConnect = function () {
        const token = localStorage.getItem("token")
        if (token === null) {
            return;
        }
        try {
            if (!webSocket.wsConnect) {
                console.log("尝试建立ws请求")
                webSocket.wsInit();
            } else {
                return;
            }
        } catch {
            wsConnect();
        }
    }

2. 聊天组件

界面相信大伙都会画,主要说一下我遇到的问题

第一个 上拉刷新,也就是加载历史记录的功能,我用的element-plus UI,也不知道是不是我的问题,UI里面的无限滚动不是重复发送请求就是无限发送请求,而且好像没有上拉加载的功能。于是我用了IntersectionObserver来解决,在页面底部加上一个div,当观察到这个div时,触发请求

第二个 滚动条到达顶部时,请求数据并放置数据,滚动条会自动滚动到顶部,并且由于观察的元素始终在顶端导致无限请求,这个其实也不是什么大问题,因为聊天的消息是有限的,没有数据之后我设置了停止观察,主要是用户体验不是很好。这是我是添加了display: flex; flex-direction: column-reverse;解决这个问题的(flex很神奇吧)。大致原理好像是垂直翻转了(例如上面我将观察元素放到div第一个子元素位置,添加flex后观察元素会到最后一个子元素位置上),也就是说当滚动条在最底部时,添加数据后,滚动条会自动滚动到最底部,不过这样体验感非常的不错

不要问我为什么数据要加 || 问就是数据懒得统一了

<style lang="scss" scoped>
    .chatBox {
        border-radius: 20px;
        box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
        width: 1200px;
        height: 600px;
        background-color: white;
        display: flex;

        .chat {
            width: 1000px;
            height: inherit;

            .chatBackground {
                height: 500px;
                overflow: auto;
                display: flex;
                flex-direction: column-reverse;

                .loading {
                    text-align: center;
                    font-size: 12px;
                    margin-top: 20px;
                    color: gray;

                }

                .chatItem {
                    width: 100%;
                    padding-bottom: 20px;

                    .avatar {
                        margin-left: 20px;
                        display: flex;
                        align-items: center;

                        .username {
                            margin-left: 10px;
                            color: rgb(153, 153, 153);
                            font-size: 13px;
                        }
                    }

                    .chatItemMessage {
                        margin-left: 60px;
                        padding: 10px;
                        font-size: 14px;
                        width: 200px;
                        word-break: break-all;
                        max-width: 400px;
                        line-height: 25px;
                        width: fit-content;
                        border-radius: 10px;
                        height: auto;
                        /* background-color: skyblue; */
                        box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
                    }

                    .sendDate {
                        font-size: 12px;
                        margin-top: 10px;
                        margin-left: 60px;
                        color: rgb(187, 187, 187);
                    }
                }
            }

            .chatBottom {
                height: 100px;
                background-color: #F3F3F3;
                border-radius: 20px;
                display: flex;
                box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;

                .messageInput {
                    border-radius: 20px;
                    width: 400px;
                    height: 40px;
                }

            }

        }

        .userList {
            width: 200px;
            height: inherit;
            border-radius: 20px;
            box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;

            .user {
                width: inherit;
                height: 50px;
                line-height: 50px;
                text-indent: 2em;
                border-radius: 20px;
                transition: all 0.5s ease;
            }
        }
    }

    .user:hover {
        box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
        transform: translateX(-5px) translateY(-5px);
    }
</style>

<template>
    {{hasMessage}}
    <div class="chatBox">
        <div class="chat">
            <div class="chatBackground" ref="chatBackgroundRef">

                <div class="chatItem" v-for="i in messageList">
                    <div class="avatar">
                        <el-avatar :size="40" :src="imageUrl" />
                        <div class="username">{{i.username || i.userId}}</div>
                    </div>
                    <div class="chatItemMessage">
                        {{i.text || i.content}}
                    </div>
                    <div class="sendDate">
                        {{i.date || i.sendDate}}
                    </div>

                </div>
                <div class="loading" ref="loading">
                    显示更多内容
                </div>
            </div>
            <div class="chatBottom">
                <el-input class="messageInput" v-model="message" placeholder="消息内容"></el-input>
                <el-button @click="sendMessage">发送消息</el-button>
            </div>
        </div>
        <!-- 做成无限滚动 -->
        <div class="userList">
            <div v-for="user in userList">
                <div class="user">
                    {{user.userName}}
                </div>
            </div>
        </div>
    </div>
</template>

<script setup>
    import { ref, onMounted, nextTick } from 'vue'
    import request from '@/utils/request.js'
    import { useWebSocketStore } from '@/stores/useWebSocketStore'
    import imageUrl from '@/assets/默认头像.jpg'


    const webSocketStore = useWebSocketStore();
    const chatBackgroundRef = ref(null)

    const userList = ref([])
    const message = ref('')
    const messageList = ref([
    ])
    const loading = ref(null)
    const page = ref(1);
    const size = 10;
    const hasMessage = ref(true);

    const observer = new IntersectionObserver((entries, observer) => {
        entries.forEach(async entry => {
            if (entry.isIntersecting) {
                observer.unobserve(entry.target)
                await pageQueryMessage();

            }
        })
    })

    onMounted(() => {
        observer.observe(loading.value)
        getOnlineUserList();
        if (!webSocketStore.wsConnect) {
            webSocketStore.wsInit();
        }
        const ws = webSocketStore.ws;
        ws.onmessage = async (e) => {
            // console.log(e);
            const webSocketMessage = JSON.parse(e.data);
            const messageObj = {
                username: webSocketMessage.sender,
                text: webSocketMessage.text,
                date: webSocketMessage.sendDate,
                type: webSocketMessage.type
            }
            console.log("###")
            // console.log(JSON.parse(messageObj.text))
            if (messageObj.type === "群发") {
                messageList.value.unshift(messageObj)
            } else if (messageObj.type === "用户变更") {
                userList.value = JSON.parse(messageObj.text)
            }
            await nextTick();
            // 当发送新消息时,自动滚动到页面最底部,可以替换成消息提示的样式
            // chatBackgroundRef.value.scrollTop = chatBackgroundRef.value.scrollHeight;
            console.log(webSocketMessage)
        }
    })
    const pageQueryMessage = function () {
        request({
            url: '/api/message/pageQueryMessage',
            method: 'post',
            data: {
                page: page.value,
                size: size
            }
        }).then((res) => {
            console.log(res)
            if (res.data.data.length === 0) {
                hasMessage.value = false;
            }
            else {
                observer.observe(loading.value)
                page.value = page.value + 1;
                messageList.value.push(...res.data.data)
            }
        })
    }
    function getOnlineUserList() {
        request({
            url: '/api/user/getOnlineUser',
            method: 'get'
        }).then((res) => {
            console.log(res)
            userList.value = res.data.data;
        })
    }

    const sendMessage = function () {
        if (!webSocketStore.wsConnect) {
            webSocketStore.wsInit();
        }
        const webSocketMessageDto = {
            type: "群发",
            text: message.value
        }

        webSocketStore.sendMessage(JSON.stringify(webSocketMessageDto));
    }

</script>

这样就实现了一个简易的聊天数据持久化,支持在线聊天的界面,总的来说WebSocket用起来还是十分方便的

后面我看看能不能做下上传图片,上传文件之类的功能