引言
在前两篇文章中,我们分别介绍了Java网络编程的基础模型和NIO技术。本文将探讨两个更加高级的主题:WebSocket协议和Reactor模式。这两种技术分别解决了实时双向通信和高并发处理的问题,是构建现代网络应用的重要工具。
WebSocket:实时双向通信的解决方案
WebSocket是一种在单个TCP连接上进行全双工通信的协议,它允许服务器主动向客户端推送数据,使得Web应用能够实现真正的实时交互。
WebSocket的核心特性
- 持久连接:建立连接后保持长时间开放,避免频繁的连接建立和断开
- 全双工通信:客户端和服务器可以同时发送和接收数据
- 轻量级协议:相比HTTP,WebSocket的帧头部更小,减少了数据传输量
- 基于标准:被所有主流浏览器支持,可以无缝集成到Web应用中
基于Java实现的WebSocket服务器
以下是一个基于NIO的WebSocket服务器实现:
public class WebSocketServer {
private static final int PORT = 8080;
private static Selector selector;
private static Map<SocketChannel, WebSocketConnection> connections = new ConcurrentHashMap<>();
public static void main(String[] args) throws IOException {
// 创建ServerSocketChannel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(PORT));
serverChannel.configureBlocking(false);
// 创建Selector
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("WebSocket服务器启动,监听端口:" + PORT);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
handleAccept(key);
} else if (key.isReadable()) {
handleRead(key);
}
keyIterator.remove();
}
}
}
private static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
// 创建WebSocket连接对象
connections.put(clientChannel, new WebSocketConnection(clientChannel));
System.out.println("新客户端连接:" + clientChannel.getRemoteAddress());
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
WebSocketConnection connection = connections.get(clientChannel);
try {
if (connection.isHandshakeComplete()) {
// 处理WebSocket数据帧
processWebSocketFrame(connection);
} else {
// 处理WebSocket握手
processHandshake(connection);
}
} catch (IOException e) {
System.out.println("连接异常:" + e.getMessage());
connections.remove(clientChannel);
key.cancel();
clientChannel.close();
}
}
private static void processHandshake(WebSocketConnection connection) throws IOException {
// 读取HTTP握手请求
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = connection.getChannel().read(buffer);
if (bytesRead < 0) {
throw new IOException("客户端关闭连接");
}
buffer.flip();
byte[] data = new byte[bytesRead];
buffer.get(data);
String request = new String(data);
// 解析WebSocket握手请求
if (request.contains("Upgrade: websocket")) {
// 提取Sec-WebSocket-Key
String secWebSocketKey = extractWebSocketKey(request);
// 生成Sec-WebSocket-Accept
String secWebSocketAccept = generateWebSocketAccept(secWebSocketKey);
// 构建握手响应
String response = "HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Accept: " + secWebSocketAccept + "\r\n\r\n";
// 发送握手响应
connection.getChannel().write(ByteBuffer.wrap(response.getBytes()));
connection.setHandshakeComplete(true);
System.out.println("WebSocket握手完成:" + connection.getChannel().getRemoteAddress());
}
}
private static void processWebSocketFrame(WebSocketConnection connection) throws IOException {
// 读取WebSocket数据帧
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = connection.getChannel().read(buffer);
if (bytesRead < 0) {
throw new IOException("客户端关闭连接");
}
buffer.flip();
// 解析WebSocket数据帧
WebSocketFrame frame = WebSocketFrame.parse(buffer);
if (frame.isTextFrame()) {
String message = frame.getPayloadText();
System.out.println("收到消息:" + message);
// 构建响应帧并发送
WebSocketFrame responseFrame = WebSocketFrame.createTextFrame("服务器回复:" + message);
connection.getChannel().write(responseFrame.toByteBuffer());
} else if (frame.isCloseFrame()) {
// 处理关闭帧
connection.getChannel().close();
connections.remove(connection.getChannel());
System.out.println("WebSocket连接关闭");
}
}
// 辅助方法:提取WebSocket密钥
private static String extractWebSocketKey(String request) {
String[] lines = request.split("\r\n");
for (String line : lines) {
if (line.startsWith("Sec-WebSocket-Key:")) {
return line.substring(line.indexOf(':') + 1).trim();
}
}
return "";
}
// 辅助方法:生成WebSocket接受密钥
private static String generateWebSocketAccept(String key) throws NoSuchAlgorithmException {
String concat = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
byte[] hash = sha1.digest(concat.getBytes());
return Base64.getEncoder().encodeToString(hash);
}
}
WebSocket客户端实现(HTML/JavaScript)
<!DOCTYPE html>
<html>
<head>
<title>WebSocket客户端</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
#messageArea { height: 300px; border: 1px solid #ccc; overflow-y: scroll; padding: 10px; margin-bottom: 10px; }
#messageInput { width: 80%; padding: 5px; }
button { padding: 5px 10px; }
</style>
</head>
<body>
<h1>WebSocket客户端</h1>
<div id="messageArea"></div>
<input type="text" id="messageInput" placeholder="输入消息...">
<button onclick="sendMessage()">发送</button>
<button onclick="connect()">连接</button>
<button onclick="disconnect()">断开</button>
<script>
let socket;
const messageArea = document.getElementById('messageArea');
const messageInput = document.getElementById('messageInput');
function connect() {
if (socket && socket.readyState === WebSocket.OPEN) {
appendMessage("已经连接到服务器");
return;
}
socket = new WebSocket("ws://localhost:8080");
socket.onopen = function(event) {
appendMessage("已连接到服务器");
};
socket.onmessage = function(event) {
appendMessage("收到消息: " + event.data);
};
socket.onclose = function(event) {
appendMessage("连接已关闭");
};
socket.onerror = function(error) {
appendMessage("连接错误: " + error);
};
}
function disconnect() {
if (socket) {
socket.close();
socket = null;
}
}
function sendMessage() {
if (!socket || socket.readyState !== WebSocket.OPEN) {
appendMessage("未连接到服务器");
return;
}
const message = messageInput.value;
if (message) {
socket.send(message);
appendMessage("发送消息: " + message);
messageInput.value = "";
}
}
function appendMessage(message) {
const messageElement = document.createElement('div');
messageElement.textContent = message;
messageArea.appendChild(messageElement);
messageArea.scrollTop = messageArea.scrollHeight;
}
// 页面加载时自动连接
window.onload = connect;
</script>
</body>
</html>
Reactor模式:高性能网络服务器的架构
Reactor模式是一种事件驱动的设计模式,它将请求的接收和处理分离,通过事件分发器(Dispatcher)将I/O事件分发给对应的处理器(Handler)。这种模式特别适合构建高性能的网络服务器。
Reactor模式的核心组件
- Reactor:事件分发器,负责监听和分发I/O事件
- Acceptor:接受新连接的处理器
- Handler:处理特定I/O事件的组件
- EventLoop:事件循环,持续监听事件并调用相应的处理器
主从Reactor模式实现
以下是一个基于主从Reactor模式的高性能服务器实现:
public class ReactorServer {
private static final int PORT = 8080;
// CPU核心数
private static final int PROCESSOR_COUNT = Runtime.getRuntime().availableProcessors();
public static void main(String[] args) throws IOException {
// 创建主Reactor线程,负责接受新连接
Reactor mainReactor = new Reactor(PORT);
// 启动主Reactor线程
new Thread(mainReactor, "MainReactor").start();
}
// Reactor类,负责监听和分发事件
static class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverChannel;
// 子Reactor数组
final SubReactor[] subReactors = new SubReactor[PROCESSOR_COUNT];
// 轮询计数器
AtomicInteger next = new AtomicInteger(0);
// 业务线程池
ExecutorService businessPool = Executors.newFixedThreadPool(100);
Reactor(int port) throws IOException {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(port));
serverChannel.configureBlocking(false);
// 注册Accept事件
SelectionKey sk = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 将Acceptor附加到SelectionKey
sk.attach(new Acceptor());
// 创建并启动子Reactor
for (int i = 0; i < subReactors.length; i++) {
subReactors[i] = new SubReactor();
new Thread(subReactors[i], "SubReactor-" + i).start();
}
System.out.println("主从Reactor服务器启动,监听端口:" + port);
System.out.println("主Reactor线程:1,子Reactor线程:" + PROCESSOR_COUNT + ",业务线程池大小:100");
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
dispatch(key);
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
void dispatch(SelectionKey key) {
Runnable r = (Runnable) key.attachment();
if (r != null) {
r.run();
}
}
// Acceptor类,负责接受新连接
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = serverChannel.accept();
if (channel != null) {
System.out.println("新客户端连接:" + channel.getRemoteAddress());
// 轮询选择一个子Reactor
int index = next.getAndIncrement() % subReactors.length;
// 将新连接注册到选中的子Reactor
subReactors[index].register(channel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 子Reactor类,负责处理I/O事件
class SubReactor implements Runnable {
final Selector selector;
final Queue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<>();
SubReactor() throws IOException {
this.selector = Selector.open();
}
// 注册新连接
void register(SocketChannel channel) {
channelQueue.offer(channel);
selector.wakeup();
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
// 处理新注册的连接
processNewChannels();
// 等待事件
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isReadable()) {
// 读事件
read(key);
} else if (key.isWritable()) {
// 写事件
write(key);
}
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 处理新注册的连接
private void processNewChannels() throws IOException {
SocketChannel channel;
while ((channel = channelQueue.poll()) != null) {
channel.configureBlocking(false);
// 注册读事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
// 创建Handler并附加到key
key.attach(new Handler(key, businessPool));
}
}
// 处理读事件
private void read(SelectionKey key) {
Handler handler = (Handler) key.attachment();
if (handler != null) {
handler.read();
}
}
// 处理写事件
private void write(SelectionKey key) {
Handler handler = (Handler) key.attachment();
if (handler != null) {
handler.write();
}
}
}
}
// Handler类,负责处理具体的I/O操作和业务逻辑
static class Handler {
final SelectionKey key;
final SocketChannel channel;
final ExecutorService businessPool;
final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
final Queue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<>();
Handler(SelectionKey key, ExecutorService businessPool) {
this.key = key;
this.channel = (SocketChannel) key.channel();
this.businessPool = businessPool;
}
// 处理读事件
void read() {
try {
readBuffer.clear();
int numRead = channel.read(readBuffer);
if (numRead < 0) {
// 连接关闭
closeChannel();
return;
}
if (numRead > 0) {
// 提交到业务线程池处理
businessPool.execute(new Processor(this, readBuffer));
}
} catch (IOException e) {
closeChannel();
}
}
// 处理写事件
void write() {
ByteBuffer buffer;
try {
while ((buffer = writeQueue.peek()) != null) {
channel.write(buffer);
if (buffer.hasRemaining()) {
// 没写完,等待下次写事件
return;
}
writeQueue.poll(); // 写完了,移除
}
// 所有数据都写完了,取消写事件关注
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
closeChannel();
}
}
// 发送响应
void send(ByteBuffer buffer) {
writeQueue.offer(buffer);
// 注册写事件
key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
key.selector().wakeup();
}
// 关闭连接
void closeChannel() {
try {
channel.close();
key.cancel();
System.out.println("客户端断开连接");
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 业务处理器
static class Processor implements Runnable {
final Handler handler;
final ByteBuffer buffer;
Processor(Handler handler, ByteBuffer buffer) {
this.handler = handler;
this.buffer = ByteBuffer.allocate(buffer.position());
// 复制数据
buffer.flip();
this.buffer.put(buffer);
this.buffer.flip();
}
@Override
public void run() {
// 解析请求
String request = new String(buffer.array(), 0, buffer.limit());
System.out.println("处理请求:" + request.trim());
// 构建响应
String response = "服务器响应:" + request;
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
// 发送响应
handler.send(responseBuffer);
}
}
}
Reactor模式的优势
- 高并发处理能力:通过事件驱动和多线程协作,能够处理大量并发连接
- 资源利用率高:根据CPU核心数量创建适量的Reactor线程,充分利用多核资源
- 可扩展性强:可以根据负载情况动态调整线程池大小
- 职责分离:将接受连接、I/O处理和业务逻辑分离,代码结构清晰
Reactor模式的应用场景
Reactor模式被广泛应用于高性能网络框架和服务器中,如:
- Netty:Java最流行的NIO框架,采用主从Reactor模式
- Redis 6.0+:引入了多线程I/O模型,基于Reactor模式
- Nginx:高性能Web服务器,采用类似Reactor的事件驱动模型
- Node.js:JavaScript运行时,使用事件循环处理I/O,类似Reactor模式
总结
WebSocket和Reactor模式代表了现代网络编程的两个重要方向:实时通信和高并发处理。WebSocket通过全双工通信机制,使得服务器可以主动向客户端推送数据,为实时应用提供了基础。而Reactor模式通过事件驱动和多线程协作,实现了高效的I/O处理,为构建高性能网络服务器提供了成熟的架构模式。
掌握这两种技术,结合前两篇文章中介绍的基础知识和NIO技术,将使您能够构建出适应各种场景的网络应用,从简单的客户端-服务器通信到复杂的高并发实时系统。