消息推送常见方案
轮询
浏览器以指定的时间间隔
向服务器发出HTTP请求,服务器实时返回数据给浏览器。
长轮询
浏览器发出异步请求,服务器接到请求后,会阻塞请求
,直到有数据或超时才会返回。
SSE(服务器发送事件)
在服务器和浏览器之间打开一个单向通道(是服务器给浏览器的),服务器不再响应一次性数据包,而是test/event-stream类型的数据流信息
,服务器有数据变更时将数据流式传输到客户端。
WebSocket
基于TCP连接上进行全双工通信的协议。
全双工:允许数据在两个方向上
同时传输
半双工:允许数据在两个方向上传输,但是同一时间内只允许一个方向上传输
- 浏览器先发送一个请求,在请求头里会有一个(UPgrade: websocket),表示将HTTP协议升级成WebSocket协议
- 服务器给一个响应101,表示把HTTP协议切换为WebSocket协议
- 在这以后,浏览器和服务器之间就是双向传输数据
WebSocket API
客户端
创建websocket对象
let ws = new WebSocket(URL);
格式:
ws://ip地址/访问路径
websocket对象相关事件
事件 | 事件处理程序 | 描述 |
---|---|---|
open | ws.onopen | 连接建立时触发 |
message | ws.onmessage | 客户端接收到服务器发送的数据时触发 |
close | ws.onclose | 连接关闭时触发 |
websocket对象提供的方法
send():通过websocket对象调用该方法发送数据给服务器
服务器
WebSocket由一系列的Endpoint组成,表示WebSocket链接的一段,对于服务器,可以视为处理具体WebSocket消息的接口。
Endpoint定义
定义EndPoint:
- 编程式:继承
java.websocket.Endpoint
并实现其方法 - 注解式:定义一个POJO类,并添加
@ServerEndpoint
注解
Endpoint生命周期
Endpoint实例在WebSocket握手时创建,并在客户端与服务器连接的过程中有效,在连接关闭时结束。在Endpoint接口中定义了与生命周期相关的方法,会在生命周期的各个阶段调用对应的方法。
方法(编程式开发) | 描述 | 注解(注解式开发) |
---|---|---|
onOpen() | 开启一个新的会话时调用,该方法时客户端与服务器握手成功后调用的方法 | @OnOpen |
onClode() | 当会话关闭时调用 | @OnClode |
OnError() | 当连接过程异常时调用 | @OnError |
服务器与客户端之间的通信
服务器接收客户端的数据
- 编程式:通过添加MessageHandler消息处理器来接收消息
- 注解式:在定义Endpoint时,通过@OnMessage注解指定接收消息的方法
@ServerEndpoint("/chat")
@Component
public class ChatEndpoint {
@OnOpen
public void onOpen(Session session, EndpointConfig config) { // session:webSocket会话
// 连接建立时调用
}
@OnMessage
public void onMessage(String message) { // message:接收到的浏览器数据
// 接收到客户端发送的数据时调用
}
@OnClose
public void onClose(Session session) { // session:webSocket会话
// 连接关闭时被调用
}
}
服务器推送数据给客户端
发送消息由RemoteEndpoint完成,实例由Session维护
- 通过session.getBasicRemote获取
同步
消息发送的实例,然后调用sendXxx()方法发送消息 - 通过session.getAsyncRemote获取
异步
消息发送实例,然后调用sendXxx()方法发送消息
流程分析
代码实现
@ServerEndpoint(value = "/chat", configurator = GetHttpSessionConfig.class)
@Component
public class ChatEndpoint {
/*
static:如果没有static,那么onlineUsers是争对每个客户端连接都有一个新的map;
而我们想要所有的Endpoint都使用同一个map集合
*/
private static final Map<String, Session> onlineUsers = new ConcurrentHashMap<String, Session>();
private HttpSession httpSession;
/**
* 建立WebSocket连接后触发
* @param session
* @param config
*/
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
// 1. 保存Session
httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
String username = (String) httpSession.getAttribute("user");
onlineUsers.put(username, session);
// 2. 广播消息,将当前用户登录的信息发给所有在线的用户
/*
message:{"system": true, "fromName": null, "message": ["李四", "王五"]}
*/
broadcastAllUsers(MessageUtils.getMessage(true, null, getOnlineUsers()));
}
/*获得所有在线的用户名称*/
private Set<String> getOnlineUsers() {
return onlineUsers.keySet();
}
/*广播消息*/
private void broadcastAllUsers(String message) {
onlineUsers.forEach((username, session) -> {
try {
// 同步发送消息
session.getBasicRemote().sendText(message);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
/**
* 张三 -> 李四
* message:{"toName": "张三", "message": "你好"}
* 接收到浏览器过来的数据时触发
* @param message
*/
@OnMessage
public void onMessage(String message) throws IOException {
// 1. 将消息推送给指定的用户
Message msg = JSON.parseObject(message, Message.class);
// 2. 获取接收方用户名、发送的消息
String toName = msg.getToName();
String mess = msg.getMessage();
// 3. 发送消息
Session receiverSession = onlineUsers.get(toName); // 接收方session
/*
消息格式:{"system": "false", "fromName": "张三", "message": "你好"}
*/
String fromName = (String) httpSession.getAttribute("user"); // 发送方name
String sendMsg = MessageUtils.getMessage(false, fromName, mess);
receiverSession.getBasicRemote().sendText(sendMsg); // 发消息
}
/**
* 断开连接时触发
* @param session
*/
@OnClose
public void onClose(Session session) {
// 1. 从onlineUsers中剔除当前用户的session对象
String username = (String) httpSession.getAttribute("user");
onlineUsers.remove(username);
// 2. 通知其他所有的用户:当前用户下线了
broadcastAllUsers(MessageUtils.getMessage(false, null, getOnlineUsers()));
}
}