SpringBoot整合t-io是websocket实时通信

发布于:2025-08-05 ⋅ 阅读:(25) ⋅ 点赞:(0)

SpringBoot整合t-io是websocket实时通信

1. 前言

随着实时网络应用的普及,即时聊天、在线游戏和实时数据推送等,websocket技术越来越受到开发者的青睐。他允许客户端和服务器之间进行全双工、低延时的通信,从而实现更加流畅的用户体验。

本问将详细介绍如何使用SpringBoot整合t-io快速搭建一个功能完善的websocket服务器,我们将从websocket的基本原理开始,逐步讲解配置,实现消息处理器、处理客户端连接和消息广播等关键不走,最后通过实际测试验证服务器的功能。

2. websocket的原理概述

什么是websocket

网上关于websocket的讲解有很多,也很详尽,再这里我不着重介绍了。我们只需要知道,websocket 是一种在单个TCP连接上进行全双工通信的协议,旨在解决传统HTTP协议在实时通信场景下的不足。它允许服务器主动向客户端推送数据,客户端也可以随时向服务器发送消息,实现真正的实时双向通信;

TCP协议是什么

1)计算机体系结构

在这里插入图片描述
如上图所示展示了计算机结构的OSI七层模型以及TCP/IP概念模型

应用层:向客户提供一组常用的应用程序,比如电子邮件、文件传输访问、虚拟终端等
应用层协议:两个主机的两个应用程序之间进行相互交流的数据格式
传输层:提供应用程序间的通信。其功能包括:格式化信息流以及提供可靠传输
网络层:标记了互联网上每一台主机地址,负责相邻计算机之间的通信。
链路层:底层物理通路(线路)

2)TCP/IP协议

TCP/IP协议实际上是一个协议族。TCP/IP协议主要由网络层的IP协议 和 传输层的TCP协议组成 。IP 或 ICMP、TCP 或 UDP、TELNET 或 FTP、以及 HTTP 等都属于 TCP/IP 协议,他们与 TCP 或 IP 的关系紧密。因此,也称 TCP/IP 为网际协议群。
TCP负责发现传输的问题,一有问题就发出信号,要求重新传输,直到所有数据安全正确地传输到目的地。而IP是给因特网的每一台联网设备规定一个地址。
打个比方:TCP协议就相当于中国邮政快递,用来做运输IP协议就相当于邮政编码,用来唯一标记目的地。TCP协议是传输控制协议,工作在传输层。提供面向链接的,可靠的传输服务(三次握手,四次挥手)
面向链接:数据传输之前,客户端与服务器之间要建立连接,才可以传输数据
可靠的:数据传输是有序的,要对数据进行校验,数据不会丢失

websocket的工作流程

1)建立连接(握手):

客户端发送一个带有特殊头部的 HTTP 请求,表示希望升级协议到 WebSocket。
服务器接收到请求后,如果支持 WebSocket,则返回一个包含升级协议的响应,双方确认切换到 WebSocket 协议。

2)数据传输:

连接建立后,客户端和服务器可以在不经过额外握手的情况下随时发送数据。
数据以帧(frame)的形式传输,支持文本和二进制数据。

3)关闭连接:

任意一方都可以发送关闭帧来终止连接。
连接关闭后,双方需重新握手才能建立新的连接

websocket的优势

1.实时性强:支持服务器主动推送数据,减少了客户端轮询的延迟和资源消耗。
2.低开销:一次握手后保持连接,减少了 HTTP 轮询带来的额外开销。
3.全双工通信:客户端和服务器可以同时发送数据,通信更加灵活高效。
4.适用广泛:适用于聊天应用、实时通知、在线游戏、股票行情等多种场景

t-io整合websocket实现实时消息通信

1) 引入jar包
        <dependency>
            <groupId>org.t-io</groupId>
            <artifactId>tio-websocket-server</artifactId>
            <version>3.7.2.v20210316-RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.t-io</groupId>
            <artifactId>tio-websocket-client</artifactId>
            <version>3.7.2.v20210316-RELEASE</version>
        </dependency>
2) 自定义websocket注解实现类
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({SocketAutoConfiguration.class, WebScoketProperties.class})
public @interface EnableTioSocketServer {
}
3) websocket 连接、握手,执行方法
@Slf4j
public abstract class AbstractScoketMsgHandler implements IWsMsgHandler {

    @Resource
    private ApplicationContext applicationContext;

    /**
     * 握手时走这个方法,业务可以在这里获取cookie,request参数等
     *
     * <li>对httpResponse参数进行补充并返回,如果返回null表示不想和对方建立连接,框架会断开连接,如果返回非null,框架会把这个对象发送给对方</li>
     * <li>注:请不要在这个方法中向对方发送任何消息,因为这个时候握手还没完成,发消息会导致协议交互失败。</li>
     * <li>对于大部分业务,该方法只需要一行代码:return httpResponse;</li>
     */
    @Override
    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        String token =null;
        try {
            token = httpRequest.getHeader("authorization");
            if (StrUtil.isEmpty(token)) {
                token = httpRequest.getParam("authorization");
                Map<String, String> map = new HashMap<>();
                map.put("authorization", "bearer " + token);
                httpRequest.setHeaders(map);
            }
            if (StrUtil.isEmpty(token)) {
                log.info("token没有获取到:{}拒绝连接", token);
                return null;
            }
            if (token.contains(" ")) {
                token = token.split(" ")[1];
            }
            String groupId = httpRequest.getParam("groupId");
            if (StringUtils.isNotEmpty(groupId)) {
                channelContext.set("groupId", groupId);
                String type = httpRequest.getParam("type");
                channelContext.set("type", type);
                log.info("新建数据绑定,组:{}", groupId);
            }
            return httpResponse;
        } catch (Exception ex) {
            log.info("解析token获取用户ID失败:{}拒绝连接", token);
            return null;
        }
    }

    /**
     * 握手成功后触发该方法
     * @author tanyaowu
     */
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {

        String type = httpRequest.getParam("type");
        if (StringUtils.isNotEmpty(type)) {
            switch (type) {
                case "live":
                    Aio.sendToUser(channelContext.userid, Aio.getWsResponse("成功连接"));
                    String groupId = httpRequest.getParam("groupId");
                    LiveEvent liveEvent = new LiveEvent(this, groupId, channelContext.userid, 0);
                    applicationContext.publishEvent(liveEvent);
                case "visit":
                    String groupId1 = httpRequest.getParam("groupId");
                    //绑定到群组,后面会有群发
                    Aio.bindGroup(channelContext, groupId1);
                    int count = Tio.getAll(channelContext.tioConfig).getObj().size();
                    String msg = "{type:'online',name:'"+ channelContext.get("userName") + "',message:'" + channelContext.userid + " 进来了,共【" + count + "】人在线" + "'}";
                    //将对象数据转为字符串
                    String jsonString = JSON.toJSONString(msg);
                    //群发
                    Tio.sendToGroup(channelContext.tioConfig, groupId1, Aio.getWsResponse(jsonString));
                    break;
                default: break;
            }
        }
    }
    /**
     * <li>当收到Opcode.BINARY消息时,执行该方法。也就是说如何你的ws是基于BINARY传输的,就会走到这个方法</li>
     * @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息
     * @author tanyaowu
     */
    @Override
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        opcodeBytes(wsRequest, bytes, channelContext);
        return null;
    }

    /**
     * 当收到Opcode.CLOSE时,执行该方法,业务层在该方法中一般不需要写什么逻辑,空着就好
     * @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息
     * @author tanyaowu
     */
    @Override
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        Tio.remove(channelContext, "WebSocket Close");
        String type = channelContext.get("type") + "";
        if (StringUtils.isNotEmpty(type)) {
            switch (type) {
                case "live":
                    String groupId = channelContext.get("groupId") + "";
                    LiveEvent liveEvent = new LiveEvent(this, groupId, channelContext.userid, 1);
                    applicationContext.publishEvent(liveEvent);
                    break;
                case "visit":
                    String content = channelContext.get("content")+"";
                    LiveEvent liveEvent1 = new LiveEvent(this, content, channelContext.userid, 1);
                    applicationContext.publishEvent(liveEvent1);
                    Tio.remove(channelContext, "receive close flag");
                    break;
                default:break;
            }
        }
        return null;
    }

    /**
     * <li>当收到Opcode.TEXT消息时,执行该方法。也就是说如何你的ws是基于TEXT传输的,就会走到这个方法</li>
     *
     * @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息
     * @author tanyaowu
     */
    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        opcodeText(wsRequest, text, channelContext);
        //返回值是要发送给客户端的内容,一般都是返回null
        return null;
    }

    public abstract Object opcodeText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception;

    public abstract Object opcodeBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception;
}

4) websocket的 AbstractScoketMsgHandler实现类 ScoketMsgHandler
@Slf4j
@Component
public class ScoketMsgHandler extends AbstractScoketMsgHandler{

    @Autowired(required = false)
    private IChatgptService iChatgptService;

    @Override
    public Object opcodeText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
        log.info("收到来自【{}】:用户的文本消息:{}", channelContext.userid, s);
        boolean json = JSONUtil.isJson(s);
        if(!json){
            iChatgptService.sendAndReceive(channelContext.userid, s);
            return null;
        }
        Map map = JSONUtil.toBean(s, Map.class);
        switch (String.valueOf(map.get("type"))){
            case "visit":
                log.info("回访信息");
                String contentJson = String.valueOf(map.get("content"));
                ChatHistoryDTO chatHistoryDTO = JSONUtil.toBean(contentJson, ChatHistoryDTO.class);
                Boolean b = Aio.sendToUser(chatHistoryDTO.getReceiveUserId(), Aio.getWsResponse(contentJson));
                /*ChatHistory chatHistory = new ChatHistory();
                chatHistory.setStatus("0");
                if(b){
                    chatHistory.setStatus("1");
                }
                chatHistory.setFollowId(chatHistoryDTO.getFollowId());
                chatHistory.setCreateTime(new Date());
                chatHistory.setContent(chatHistoryDTO.getContent());
                chatHistory.setSendUserId(chatHistoryDTO.getReceiveUserId());
                chatHistory.setReceiveUserId(chatHistoryDTO.getSendUserId());
                chatHistory.setStatus("0");
                iChatHistoryService.save(chatHistory);*/
                break;
            default:break;
        }
        return null;
    }

    @Override
    public Object opcodeBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        log.info("收到来自【{}】:用户的字节消息", channelContext.userid);
        return null;
    }
}

public interface IChatgptService {
    void sendAndReceive(String userId, String prompt) throws Exception;
}

public class ChatgptServiceImpl implements IChatgptService{

    @Autowired
    private SparkClient sparkClient;

    private final static String GREETINGS="问候语";

    @Override
    public void sendAndReceive(String userId, String prompt) throws Exception {
        if (prompt.equalsIgnoreCase(GREETINGS)) {
         //添加前置语
            WsResponse wsResponse = WsResponse.fromText("您好,我是您的健康小顾问,可以解答疾病、饮食、健康等多个方面的相关问题。比如您可以问我“糖尿病应该注意哪些问题?", "UTF-8");
            Aio.sendToUser(userId, wsResponse);
            return;
        }
        List<SparkMessage> messages=new ArrayList<>();
        //添加前置语
        messages.add(SparkMessage.userContent("请作为资深的健康咨询师回答关于职工健康和疾病预防的问题:".concat(prompt)));
        SparkRequest sparkRequest= SparkRequest.builder()
                .messages(messages)
                .maxTokens(500)
                .temperature(0.5).apiVersion(SparkApiVersion.V3_5).build();
        sparkClient.chatStream(sparkRequest,new SparkConsoleListener(userId,prompt));
    }
}
5)websocket实现config

1.SocketAutoConfiguration 类

@Slf4j
@Import({WebSocketServerInitConfig.class})
public class SocketAutoConfiguration implements Serializable {

    @Autowired(required = false)
    private WebScoketProperties webScoketProperties;

    @Bean
    public WebSocketServerBootstrap webSocketServerBootstrap(){
        return new WebSocketServerBootstrap(webScoketProperties);
    }
}

2.WebScoketProperties 类

@Data
@Component
@ConfigurationProperties(prefix = "tio.config")
public class WebScoketProperties {

    /**
     * 服务名称
     */
    private String name = "web-scoket";
    /**
     * 服务绑定的 IP 地址,默认不绑定
     */
    private String ip = null;
    private int port = 6789;
    private int heartbeatTimeout = 5000;
    private SslProperties ssl;
    public boolean useSSL() {
        return ssl != null && ssl.keyStore != null && ssl.trustStore != null;
    }

    /**
     * ssl配置
     */
    public static class SslProperties {
        private String keyStore;
        private String trustStore;
        private String password;

        public String getKeyStore() {
            return keyStore;
        }

        public void setKeyStore(String keyStore) {
            this.keyStore = keyStore;
        }

        public String getTrustStore() {
            return trustStore;
        }

        public void setTrustStore(String trustStore) {
            this.trustStore = trustStore;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }
    }
}

3.WebSocketServerBootstrap 类 socket自动配置类

@Slf4j
public class WebSocketServerBootstrap {

   

网站公告

今日签到

点亮在社区的每一天
去签到