SpringBoot整合t-io是websocket实时通信
1. 前言
随着实时网络应用的普及,即时聊天、在线游戏和实时数据推送等,websocket技术越来越受到开发者的青睐。他允许客户端和服务器之间进行全双工、低延时的通信,从而实现更加流畅的用户体验。
本问将详细介绍如何使用SpringBoot整合t-io快速搭建一个功能完善的websocket服务器,我们将从websocket的基本原理开始,逐步讲解配置,实现消息处理器、处理客户端连接和消息广播等关键不走,最后通过实际测试验证服务器的功能。
2. websocket的原理概述
什么是websocket
网上关于websocket的讲解有很多,也很详尽,再这里我不着重介绍了。我们只需要知道,websocket 是一种在单个TCP连接上进行全双工通信的协议,旨在解决传统HTTP协议在实时通信场景下的不足。它允许服务器主动向客户端推送数据,客户端也可以随时向服务器发送消息,实现真正的实时双向通信;
TCP协议是什么
1)计算机体系结构
如上图所示展示了计算机结构的OSI七层模型以及TCP/IP概念模型
应用层:向客户提供一组常用的应用程序,比如电子邮件、文件传输访问、虚拟终端等
应用层协议:两个主机的两个应用程序之间进行相互交流的数据格式
传输层:提供应用程序间的通信。其功能包括:格式化信息流以及提供可靠传输
网络层:标记了互联网上每一台主机地址,负责相邻计算机之间的通信。
链路层:底层物理通路(线路)
2)TCP/IP协议
TCP/IP协议实际上是一个协议族。TCP/IP协议主要由网络层的IP协议 和 传输层的TCP协议组成 。IP 或 ICMP、TCP 或 UDP、TELNET 或 FTP、以及 HTTP 等都属于 TCP/IP 协议,他们与 TCP 或 IP 的关系紧密。因此,也称 TCP/IP 为网际协议群。
TCP负责发现传输的问题,一有问题就发出信号,要求重新传输,直到所有数据安全正确地传输到目的地。而IP是给因特网的每一台联网设备规定一个地址。
打个比方:TCP协议就相当于中国邮政快递,用来做运输IP协议就相当于邮政编码,用来唯一标记目的地。TCP协议是传输控制协议,工作在传输层。提供面向链接的,可靠的传输服务(三次握手,四次挥手)
面向链接:数据传输之前,客户端与服务器之间要建立连接,才可以传输数据
可靠的:数据传输是有序的,要对数据进行校验,数据不会丢失
websocket的工作流程
1)建立连接(握手):
客户端发送一个带有特殊头部的 HTTP 请求,表示希望升级协议到 WebSocket。
服务器接收到请求后,如果支持 WebSocket,则返回一个包含升级协议的响应,双方确认切换到 WebSocket 协议。
2)数据传输:
连接建立后,客户端和服务器可以在不经过额外握手的情况下随时发送数据。
数据以帧(frame)的形式传输,支持文本和二进制数据。
3)关闭连接:
任意一方都可以发送关闭帧来终止连接。
连接关闭后,双方需重新握手才能建立新的连接
websocket的优势
1.实时性强:支持服务器主动推送数据,减少了客户端轮询的延迟和资源消耗。
2.低开销:一次握手后保持连接,减少了 HTTP 轮询带来的额外开销。
3.全双工通信:客户端和服务器可以同时发送数据,通信更加灵活高效。
4.适用广泛:适用于聊天应用、实时通知、在线游戏、股票行情等多种场景
t-io整合websocket实现实时消息通信
1) 引入jar包
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-server</artifactId>
<version>3.7.2.v20210316-RELEASE</version>
</dependency>
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-client</artifactId>
<version>3.7.2.v20210316-RELEASE</version>
</dependency>
2) 自定义websocket注解实现类
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({SocketAutoConfiguration.class, WebScoketProperties.class})
public @interface EnableTioSocketServer {
}
3) websocket 连接、握手,执行方法
@Slf4j
public abstract class AbstractScoketMsgHandler implements IWsMsgHandler {
@Resource
private ApplicationContext applicationContext;
/**
* 握手时走这个方法,业务可以在这里获取cookie,request参数等
*
* <li>对httpResponse参数进行补充并返回,如果返回null表示不想和对方建立连接,框架会断开连接,如果返回非null,框架会把这个对象发送给对方</li>
* <li>注:请不要在这个方法中向对方发送任何消息,因为这个时候握手还没完成,发消息会导致协议交互失败。</li>
* <li>对于大部分业务,该方法只需要一行代码:return httpResponse;</li>
*/
@Override
public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
String token =null;
try {
token = httpRequest.getHeader("authorization");
if (StrUtil.isEmpty(token)) {
token = httpRequest.getParam("authorization");
Map<String, String> map = new HashMap<>();
map.put("authorization", "bearer " + token);
httpRequest.setHeaders(map);
}
if (StrUtil.isEmpty(token)) {
log.info("token没有获取到:{}拒绝连接", token);
return null;
}
if (token.contains(" ")) {
token = token.split(" ")[1];
}
String groupId = httpRequest.getParam("groupId");
if (StringUtils.isNotEmpty(groupId)) {
channelContext.set("groupId", groupId);
String type = httpRequest.getParam("type");
channelContext.set("type", type);
log.info("新建数据绑定,组:{}", groupId);
}
return httpResponse;
} catch (Exception ex) {
log.info("解析token获取用户ID失败:{}拒绝连接", token);
return null;
}
}
/**
* 握手成功后触发该方法
* @author tanyaowu
*/
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
String type = httpRequest.getParam("type");
if (StringUtils.isNotEmpty(type)) {
switch (type) {
case "live":
Aio.sendToUser(channelContext.userid, Aio.getWsResponse("成功连接"));
String groupId = httpRequest.getParam("groupId");
LiveEvent liveEvent = new LiveEvent(this, groupId, channelContext.userid, 0);
applicationContext.publishEvent(liveEvent);
case "visit":
String groupId1 = httpRequest.getParam("groupId");
//绑定到群组,后面会有群发
Aio.bindGroup(channelContext, groupId1);
int count = Tio.getAll(channelContext.tioConfig).getObj().size();
String msg = "{type:'online',name:'"+ channelContext.get("userName") + "',message:'" + channelContext.userid + " 进来了,共【" + count + "】人在线" + "'}";
//将对象数据转为字符串
String jsonString = JSON.toJSONString(msg);
//群发
Tio.sendToGroup(channelContext.tioConfig, groupId1, Aio.getWsResponse(jsonString));
break;
default: break;
}
}
}
/**
* <li>当收到Opcode.BINARY消息时,执行该方法。也就是说如何你的ws是基于BINARY传输的,就会走到这个方法</li>
* @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息
* @author tanyaowu
*/
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
opcodeBytes(wsRequest, bytes, channelContext);
return null;
}
/**
* 当收到Opcode.CLOSE时,执行该方法,业务层在该方法中一般不需要写什么逻辑,空着就好
* @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息
* @author tanyaowu
*/
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
Tio.remove(channelContext, "WebSocket Close");
String type = channelContext.get("type") + "";
if (StringUtils.isNotEmpty(type)) {
switch (type) {
case "live":
String groupId = channelContext.get("groupId") + "";
LiveEvent liveEvent = new LiveEvent(this, groupId, channelContext.userid, 1);
applicationContext.publishEvent(liveEvent);
break;
case "visit":
String content = channelContext.get("content")+"";
LiveEvent liveEvent1 = new LiveEvent(this, content, channelContext.userid, 1);
applicationContext.publishEvent(liveEvent1);
Tio.remove(channelContext, "receive close flag");
break;
default:break;
}
}
return null;
}
/**
* <li>当收到Opcode.TEXT消息时,执行该方法。也就是说如何你的ws是基于TEXT传输的,就会走到这个方法</li>
*
* @return 可以是WsResponse、byte[]、ByteBuffer、String或null,如果是null,框架不会回消息
* @author tanyaowu
*/
@Override
public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
opcodeText(wsRequest, text, channelContext);
//返回值是要发送给客户端的内容,一般都是返回null
return null;
}
public abstract Object opcodeText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception;
public abstract Object opcodeBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception;
}
4) websocket的 AbstractScoketMsgHandler实现类 ScoketMsgHandler
@Slf4j
@Component
public class ScoketMsgHandler extends AbstractScoketMsgHandler{
@Autowired(required = false)
private IChatgptService iChatgptService;
@Override
public Object opcodeText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
log.info("收到来自【{}】:用户的文本消息:{}", channelContext.userid, s);
boolean json = JSONUtil.isJson(s);
if(!json){
iChatgptService.sendAndReceive(channelContext.userid, s);
return null;
}
Map map = JSONUtil.toBean(s, Map.class);
switch (String.valueOf(map.get("type"))){
case "visit":
log.info("回访信息");
String contentJson = String.valueOf(map.get("content"));
ChatHistoryDTO chatHistoryDTO = JSONUtil.toBean(contentJson, ChatHistoryDTO.class);
Boolean b = Aio.sendToUser(chatHistoryDTO.getReceiveUserId(), Aio.getWsResponse(contentJson));
/*ChatHistory chatHistory = new ChatHistory();
chatHistory.setStatus("0");
if(b){
chatHistory.setStatus("1");
}
chatHistory.setFollowId(chatHistoryDTO.getFollowId());
chatHistory.setCreateTime(new Date());
chatHistory.setContent(chatHistoryDTO.getContent());
chatHistory.setSendUserId(chatHistoryDTO.getReceiveUserId());
chatHistory.setReceiveUserId(chatHistoryDTO.getSendUserId());
chatHistory.setStatus("0");
iChatHistoryService.save(chatHistory);*/
break;
default:break;
}
return null;
}
@Override
public Object opcodeBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
log.info("收到来自【{}】:用户的字节消息", channelContext.userid);
return null;
}
}
public interface IChatgptService {
void sendAndReceive(String userId, String prompt) throws Exception;
}
public class ChatgptServiceImpl implements IChatgptService{
@Autowired
private SparkClient sparkClient;
private final static String GREETINGS="问候语";
@Override
public void sendAndReceive(String userId, String prompt) throws Exception {
if (prompt.equalsIgnoreCase(GREETINGS)) {
//添加前置语
WsResponse wsResponse = WsResponse.fromText("您好,我是您的健康小顾问,可以解答疾病、饮食、健康等多个方面的相关问题。比如您可以问我“糖尿病应该注意哪些问题?", "UTF-8");
Aio.sendToUser(userId, wsResponse);
return;
}
List<SparkMessage> messages=new ArrayList<>();
//添加前置语
messages.add(SparkMessage.userContent("请作为资深的健康咨询师回答关于职工健康和疾病预防的问题:".concat(prompt)));
SparkRequest sparkRequest= SparkRequest.builder()
.messages(messages)
.maxTokens(500)
.temperature(0.5).apiVersion(SparkApiVersion.V3_5).build();
sparkClient.chatStream(sparkRequest,new SparkConsoleListener(userId,prompt));
}
}
5)websocket实现config
1.SocketAutoConfiguration 类
@Slf4j
@Import({WebSocketServerInitConfig.class})
public class SocketAutoConfiguration implements Serializable {
@Autowired(required = false)
private WebScoketProperties webScoketProperties;
@Bean
public WebSocketServerBootstrap webSocketServerBootstrap(){
return new WebSocketServerBootstrap(webScoketProperties);
}
}
2.WebScoketProperties 类
@Data
@Component
@ConfigurationProperties(prefix = "tio.config")
public class WebScoketProperties {
/**
* 服务名称
*/
private String name = "web-scoket";
/**
* 服务绑定的 IP 地址,默认不绑定
*/
private String ip = null;
private int port = 6789;
private int heartbeatTimeout = 5000;
private SslProperties ssl;
public boolean useSSL() {
return ssl != null && ssl.keyStore != null && ssl.trustStore != null;
}
/**
* ssl配置
*/
public static class SslProperties {
private String keyStore;
private String trustStore;
private String password;
public String getKeyStore() {
return keyStore;
}
public void setKeyStore(String keyStore) {
this.keyStore = keyStore;
}
public String getTrustStore() {
return trustStore;
}
public void setTrustStore(String trustStore) {
this.trustStore = trustStore;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
}
3.WebSocketServerBootstrap 类 socket自动配置类
@Slf4j
public class WebSocketServerBootstrap {