Spring Boot整合T-IO实现即时通讯

发布于:2025-04-18 ⋅ 阅读:(21) ⋅ 点赞:(0)

在当今的互联网应用中,即时通讯功能已经变得不可或缺。无论是社交应用、在线客服还是实时数据推送,都需要高效的通信框架来支持。TIO(Try-IO)是一个高性能的Java网络通信框架,支持TCP/UDP/HTTP/WebSocket等多种协议,非常适合用于即时通讯场景。本文将介绍如何在Spring Boot项目中整合TIO,实现一个简单的即时通讯功能。

1. TIO简介

TIO是一个基于Java的高性能网络通信框架,具有以下特点:
支持多种协议:TCP、UDP、HTTP、WebSocket等。
高性能:基于NIO实现,支持高并发。
易用性:提供了丰富的API和灵活的配置

2. 环境准备

在开始之前,请确保你已经安装了以下工具:
JDK 1.8及以上版本(本项目使用jdk 17版本)
Maven或Gradle
IntelliJ IDEA或Eclipse

4.添加依赖

在pom.xml文件中添加TIO的相关依赖

        <!-- t-io WebSocket依赖 -->
        <dependency>
            <groupId>org.t-io</groupId>
            <artifactId>tio-websocket-server</artifactId>
            <version>3.8.6.v20240801-RELEASE</version>
        </dependency>

        <!-- t-io 核心依赖 -->
        <dependency>
            <groupId>org.t-io</groupId>
            <artifactId>tio-core</artifactId>
            <version>3.8.6.v20240801-RELEASE</version>
        </dependency>

5、创建TIO消息处理器

创建一个实现IWsMsgHandler接口的类,用于处理WebSocket消息:

import com.alibaba.fastjson.JSON;
import com.ruoyi.im.config.TioWebSocketAdapterConfig;
import com.ruoyi.im.domain.ImMessage;
import com.ruoyi.im.model.ChatMessage;
import com.ruoyi.im.service.IImMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;

import java.util.Date;
import java.util.UUID;

@Component
public class WebSocketMessageHandler implements IWsMsgHandler {
    
    private static final Logger log = LoggerFactory.getLogger(WebSocketMessageHandler.class);
    
    /**
     * 从ChannelContext中获取参数
     */
    private String getParam(ChannelContext channelContext, String paramName) {
        try {
            // 尝试从channelContext的httpConfig或其他属性中获取参数
            Object requestObj = channelContext.getAttribute("httpRequest");
            if (requestObj != null && requestObj instanceof HttpRequest) {
                HttpRequest httpRequest = (HttpRequest) requestObj;
                return httpRequest.getParam(paramName);
            }
            
            // 从user属性中获取用户ID
            if ("userId".equals(paramName)) {
                return (String) channelContext.getAttribute("userId");
            }
            
            return null;
        } catch (Exception e) {
            log.error("获取参数失败: paramName={}", paramName, e);
            return null;
        }
    }
    
    @Override
    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
        // 从请求中获取用户ID、token和设备类型
        String userId = httpRequest.getParam("userId");
        String token = httpRequest.getParam("token");
        String deviceTypeStr = httpRequest.getParam("deviceType");
        
        // 验证参数
        if (userId == null || token == null || deviceTypeStr == null) {
            log.error("握手参数不完整: userId={}, token={}, deviceType={}", userId, token, deviceTypeStr);
            return httpResponse;
        }
        
        // 验证token(这里应该调用若依的token验证服务)
        // TODO: 实现token验证逻辑
        
        // 解析设备类型
        UserSessionManager.DeviceType deviceType;
        try {
            deviceType = UserSessionManager.DeviceType.valueOf(deviceTypeStr.toUpperCase());
        } catch (IllegalArgumentException e) {
            log.error("无效的设备类型: {}", deviceTypeStr);
            return httpResponse;
        }
        
        // 保存参数到ChannelContext
        channelContext.setAttribute("httpRequest", httpRequest);
        channelContext.setAttribute("userId", userId);
        
        // 添加用户会话
        userSessionManager.addUserSession(userId, token, deviceType, channelContext);
        
        return httpResponse;
    }
    
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
        String userId = httpRequest.getParam("userId");
        log.info("握手成功,userId: {}, channelId: {}", userId, channelContext.getId());
    }
    
    @Override
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
        return null;
    }
    
    @Override
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
        // 从channelContext获取用户ID
        String userId = getParam(channelContext, "userId");
        // 移除用户会话
        if (userId != null) {
            userSessionManager.removeUserSession(userId);
        }
        log.info("连接关闭,userId: {}, channelId: {}", userId, channelContext.getId());
        return null;
    }
    
    @Override
    public Object onText(WsRequest wsRequest, String msg, ChannelContext channelContext) {
        // 从channelContext获取用户ID
        String userId = getParam(channelContext, "userId");
        log.info("收到消息: {}, userId: {}, channelId: {}", msg, userId, channelContext.getId());
        
        try {
            // 解析消息
            ChatMessage chatMessage = JSON.parseObject(msg, ChatMessage.class);
            
            // 设置消息ID和发送时间
            if (chatMessage.getMessageId() == null) {
                chatMessage.setMessageId(UUID.randomUUID().toString());
            }
            if (chatMessage.getSendTime() == null) {
                chatMessage.setSendTime(new Date());
            }
            
            // 设置发送者ID
            chatMessage.setFromUserId(userId);
            
            // 更新用户最后活动时间
            userSessionManager.updateUserLastActiveTime(userId);
            
            // 创建消息实体并持久化
            ImMessage imMessage = new ImMessage();
            imMessage.setMessageId(chatMessage.getMessageId());
            imMessage.setFromUserId(chatMessage.getFromUserId());
            imMessage.setToUserId(chatMessage.getToUserId());
            imMessage.setGroupId(chatMessage.getGroupId());
            imMessage.setContent(chatMessage.getContent());
            imMessage.setMessageType(chatMessage.getMessageType());
            imMessage.setStatus(0); // 0表示未读
            imMessage.setSendTime(chatMessage.getSendTime());
            imMessage.setCreateTime(chatMessage.getSendTime());
            imMessage.setUpdateTime(chatMessage.getSendTime());
            imMessage.setIsSent(1); // 1表示已发送
            imMessage.setRetryCount(0); // 初始重试次数为0
            
            // 保存消息到数据库
            imMessageService.insertImMessage(imMessage);
            
            // 处理消息
            if (chatMessage.getToUserId() != null) {
                // 私聊消息
                userSessionManager.sendMessageToUser(chatMessage.getToUserId(), JSON.toJSONString(chatMessage));
            } else if (chatMessage.getGroupId() != null) {
                // 群聊消息(需要实现群组管理)
                // TODO: 实现群聊消息处理
            } else {
                // 广播消息
                userSessionManager.broadcastMessage(JSON.toJSONString(chatMessage));
            }
            
            // 发送响应消息
            WsResponse wsResponse = WsResponse.fromText("SendMessageSuccess:" + msg, "UTF-8");
            TioWebSocketAdapterConfig.send(channelContext, wsResponse);
            
        } catch (Exception e) {
            log.error("处理消息失败: {}", msg, e);
            WsResponse wsResponse = WsResponse.fromText("处理消息失败: " + e.getMessage(), "UTF-8");
            TioWebSocketAdapterConfig.send(channelContext, wsResponse);
        }
        
        return null;
    }
} 
import com.ruoyi.im.config.TioWebSocketAdapterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.websocket.common.WsResponse;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class UserSessionManager {
    
    private static final Logger log = LoggerFactory.getLogger(UserSessionManager.class);
    
    // 用户ID -> 用户会话信息
    private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
    
    // 设备类型枚举
    public enum DeviceType {
        WEB,    // Web端
        MINI,   // 小程序
        APP     // APP
    }
    
    // 用户会话信息类
    public static class UserSession {
        private String userId;
        private String token;
        private DeviceType deviceType;
        private ChannelContext channelContext;
        private long lastActiveTime;
        
        public UserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
            this.userId = userId;
            this.token = token;
            this.deviceType = deviceType;
            this.channelContext = channelContext;
            this.lastActiveTime = System.currentTimeMillis();
        }
        
        // Getters and setters
        public String getUserId() { return userId; }
        public void setUserId(String userId) { this.userId = userId; }
        public String getToken() { return token; }
        public void setToken(String token) { this.token = token; }
        public DeviceType getDeviceType() { return deviceType; }
        public void setDeviceType(DeviceType deviceType) { this.deviceType = deviceType; }
        public ChannelContext getChannelContext() { return channelContext; }
        public void setChannelContext(ChannelContext channelContext) { this.channelContext = channelContext; }
        public long getLastActiveTime() { return lastActiveTime; }
        public void setLastActiveTime(long lastActiveTime) { this.lastActiveTime = lastActiveTime; }
    }
    
    // 添加用户会话
    public void addUserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
        UserSession session = new UserSession(userId, token, deviceType, channelContext);
        userSessions.put(userId, session);
        
        // 绑定用户ID到ChannelContext,使用适配器
        TioWebSocketAdapterConfig.bindUser(channelContext, userId);
        
        log.info("用户会话添加成功: userId={}, deviceType={}", userId, deviceType);
    }
    
    // 移除用户会话
    public void removeUserSession(String userId) {
        UserSession session = userSessions.remove(userId);
        if (session != null) {
            // 解绑用户ID,使用适配器
            TioWebSocketAdapterConfig.unbindUser(session.getChannelContext(), userId);
            log.info("用户会话移除成功: userId={}", userId);
        }
    }
    
    // 获取用户会话
    public UserSession getUserSession(String userId) {
        return userSessions.get(userId);
    }
    
    // 更新用户最后活动时间
    public void updateUserLastActiveTime(String userId) {
        UserSession session = userSessions.get(userId);
        if (session != null) {
            session.setLastActiveTime(System.currentTimeMillis());
        }
    }
    
    // 发送消息给指定用户
    public void sendMessageToUser(String userId, String message) {
        UserSession session = userSessions.get(userId);
        if (session != null && session.getChannelContext() != null) {
            WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
            // 使用适配器发送消息
            TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
        } else {
            log.warn("用户不在线,无法发送消息: userId={}", userId);
        }
    }
    
    // 广播消息给所有用户
    public void broadcastMessage(String message) {
        if (userSessions.isEmpty()) {
            return;
        }
        
        WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
        for (UserSession session : userSessions.values()) {
            if (session.getChannelContext() != null) {
                // 使用适配器发送消息
                TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
            }
        }
    }
    
    // 获取在线用户数量
    public int getOnlineUserCount() {
        return userSessions.size();
    }
    
    // 获取所有在线用户ID
    public Set<String> getOnlineUserIds() {
        return userSessions.keySet();
    }
} 

6、创建TIO配置类和 TIO WebSocket适配器配置

import com.ruoyi.im.websocket.WebSocketMessageHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.websocket.server.WsServerConfig;
import org.tio.websocket.server.WsServerStarter;

import java.io.IOException;

@Configuration
public class TioWebSocketConfig {
    
    @Autowired
    private WebSocketMessageHandler webSocketMessageHandler;
    
    @Bean
    public WsServerStarter wsServerStarter() throws IOException {
        // 配置t-io websocket服务器,指定端口
        WsServerConfig wsServerConfig = new WsServerConfig(9321);
        
        // 创建WebSocket服务器
        WsServerStarter wsServerStarter = new WsServerStarter(wsServerConfig, webSocketMessageHandler);
        
        // 这里不再获取和配置ServerTioConfig
        
        return wsServerStarter;
    }
} 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;

/**
 * t-io WebSocket适配器配置
 * 帮助处理不同版本的t-io兼容性问题
 */
@Configuration
public class TioWebSocketAdapterConfig {
    
    private static final Logger log = LoggerFactory.getLogger(TioWebSocketAdapterConfig.class);

    /**
     * 发送消息
     *
     * @param channelContext 通道上下文
     * @param packet 数据包
     */
    public static void send(ChannelContext channelContext, Packet packet) {
        if (channelContext == null || packet == null) {
            log.warn("发送消息失败:通道上下文或数据包为空");
            return;
        }

        try {
            // 直接调用 Tio 的 send 方法
            Tio.send(channelContext, packet);
        } catch (Exception e) {
            log.error("发送消息失败", e);
        }
    }


    /**
     * 绑定用户
     *
     * @param channelContext 通道上下文
     * @param userId 用户ID
     */
    public static void bindUser(ChannelContext channelContext, String userId) {
        if (channelContext == null || userId == null) {
            log.warn("绑定用户失败:通道上下文或用户ID为空");
            return;
        }

        try {
            // 直接调用 Tio 的 bindUser 方法
            Tio.bindUser(channelContext, userId);
        } catch (Exception e) {
            log.error("绑定用户失败", e);
        }
    }

    /**
     * 解绑用户
     *
     * @param channelContext 通道上下文
     * @param userId 用户ID
     */
    public static void unbindUser(ChannelContext channelContext, String userId) {
        if (channelContext == null) {
            log.warn("解绑用户失败:通道上下文为空");
            return;
        }

        try {
            // 直接调用 Tio 的 unbindUser 方法
            Tio.unbindUser(channelContext);
        } catch (Exception e) {
            log.error("解绑用户失败", e);
        }
    }
} 

7、配置TIO启动类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.websocket.server.WsServerStarter;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.lang.reflect.Method;

@Component
public class WebSocketServer {
    
    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
    
    @PostConstruct
    public void start() {
        try {
            wsServerStarter.start();
            log.info("WebSocket服务器启动成功,监听端口:9321");
        } catch (IOException e) {
            log.error("WebSocket服务器启动失败", e);
        }
    }
    

启动springboot主类,即可启动tio服务

  public static void main(String[] args) {
        SpringApplication.run(RuoYiApplication.class, args);
    }

8、测试即时通讯功能

可以使用websocket工具进行测试

##链接地址:
ws://127.0.0.1:9321?userId=1&token=1&deviceType=MINI

##链接参数说明:
userId:当前用户id
token:token值
deviceType:设备类型  WEB Web端、MINI 小程序 、APP

##发送消息示例:
{
    "fromUserId": "1",发送者用户ID
    "toUserId": "2",//接收者用户ID
    "content": "测试消息内容",//消息内容
    "messageType": "1"//消息类型1=文本,2=图片,3=语音,4=视频
}

##发送后响应示例:
SendMessageSuccess:{ "fromUserId": "1", "toUserId": "2", "content": "测试消息内容", "messageType": "1" }

网站公告

今日签到

点亮在社区的每一天
去签到