13.Websocket

发布于:2025-06-10 ⋅ 阅读:(23) ⋅ 点赞:(0)

java基础完结.最后补充一下 WebSocket

原生使用TCP实现Websocket

Websocket全双工通讯的协议  借助于Http协议进行连接,当客户端连接到服务端的时候会向服务端发送一个类似下面的HTTP报文: 

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

 这是一个HTTP的get请求报文,注意该报文中有一个upgrade首部,它的作用是告诉服务端需要将通信协议切换到websocket。
如果服务端支持websocket协议,那么它就会将自己的通信协议切换到websocket,同时发给客户端类似于以下的一个响应报文头:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

  返回的状态码为101,表示同意客户端协议转换请求,并将它转换为websocket协议。以上过程都是利用HTTP通信完成的,称之为websocket协议握手。

实现步骤

阶段一、客户端通过 HTTP 协议发送包含特殊头部的请求,触发协议升级:

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
  • Upgrade: websocket明确请求升级协议。
  • Sec-WebSocket-Key:客户端生成的随机字符串,用于安全验证。
  • Sec-WebSocket-Version:指定协议版本(RFC 6455 规定为 13)。

阶段二、服务器端进行响应确认,返回 101 Switching Protocols 响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
  • Sec-WebSocket-Accept:服务器将客户端的 Sec-WebSocket-Key 与固定字符串拼接后,计算 SHA-1 哈希并进行 Base64 编码,生成验证令牌。

阶段三、此时 TCP 连接从 HTTP 升级为 WebSocket 协议,后续数据可通过二进制帧进行传输。

阶段四、数据传输,WebSocket是一种全双工通信协议,客户端与服务端可同时发送/接收数据,无需等待对方请求,数据帧是以二进制格式进行传输的。

如下图所示:

  • FIN (1 bit):标记是否为消息的最后一个分片。
  • Opcode (4 bits):定义数据类型(如文本 0x1、二进制 0x2、关闭连接 0x8、Ping 0x9、Pong 0xA)。
  • Mask (1 bit):客户端发送的数据需掩码处理(防止缓存污染攻击),服务端发送的数据无需掩码。
  • Payload Length (7 or 7+16 or 7+64 bits):帧内容的长度,支持最大 2^64-1 字节。
  • Masking-key(32 bits),掩码密钥,由上面的标志位 MASK 决定的,如果使用掩码就是 4 个字节的随机数,否则就不存在。
  • payload data 字段:这里存放的就是真正要传输的数据

阶段五、连接关闭,客户端或服务器端都可以发起关闭。

示例代码

前端代码:

<!DOCTYPE html>
<html>
<body>
  <input type="text" id="messageInput" placeholder="输入消息">
  <button onclick="sendMessage()">发送</button>
  <div id="messages"></div>
  <script>
    // 创建 WebSocket 连接
    const socket = new WebSocket('ws://localhost:8080');
    // 连接打开时触发
    socket.addEventListener('open', () => {
      logMessage('连接已建立');
    });
    // 接收消息时触发
    socket.addEventListener('message', (event) => {
      logMessage('收到消息: ' + event.data);
    });
    // 连接关闭时触发
    socket.addEventListener('close', () => {
      logMessage('连接已关闭');
    });
    // 错误处理
    socket.addEventListener('error', (error) => {
      logMessage('连接错误: ' + error.message);
    });
    // 发送消息
    function sendMessage() {
      const message = document.getElementById('messageInput').value;
      socket.send(message);
      logMessage('发送消息: ' + message);
    }
    // 日志输出
    function logMessage(message) {
      const messagesDiv = document.getElementById('messages');
      const p = document.createElement('p');
      p.textContent = message;
      messagesDiv.appendChild(p);
    }
    
  </script>
</body>
</html>

java代码:

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WebSocketServer {
    private static final int PORT = 8080;
    private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
    private final ExecutorService threadPool = Executors.newCachedThreadPool();
    private ServerSocket serverSocket;
    private boolean running = false;

    public static void main(String[] args) {
        WebSocketServer server = new WebSocketServer();
        server.start();

        
    }

    public void start() {
        try {
            serverSocket = new ServerSocket(PORT);
            running = true;
            System.out.println("WebSocket服务器已启动,监听端口: " + PORT);

            while (running) {
                Socket clientSocket = serverSocket.accept();
                threadPool.execute(() -> handleClient(clientSocket));
            }
        } catch (IOException e) {
            if (running) {
                System.err.println("服务器启动失败: " + e.getMessage());
            }
        }
    }

    public void stop() {
        running = false;
        try {
            if (serverSocket != null && !serverSocket.isClosed()) {
                serverSocket.close();
            }
            threadPool.shutdown();
            System.out.println("服务器已停止");
        } catch (IOException e) {
            System.err.println("关闭服务器时出错: " + e.getMessage());
        }
    }

    private void handleClient(Socket clientSocket) {
        try (InputStream in = clientSocket.getInputStream();
             OutputStream out = clientSocket.getOutputStream()) {

            // 读取HTTP握手请求
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            StringBuilder request = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null && !line.isEmpty()) {
                request.append(line).append("\r\n");
            }
            System.out.println("收到握手请求:\n" + request);

            // 提取WebSocket密钥
            String key = extractWebSocketKey(request.toString());
            if (key == null) {
                System.out.println("不是WebSocket握手请求");
                return;
            }

            // 生成响应密钥
            String responseKey = generateResponseKey(key);

            // 发送HTTP升级响应
            String response = "HTTP/1.1 101 Switching Protocols\r\n" +
                    "Upgrade: websocket\r\n" +
                    "Connection: Upgrade\r\n" +
                    "Sec-WebSocket-Accept: " + responseKey + "\r\n\r\n";
            out.write(response.getBytes(StandardCharsets.UTF_8));
            out.flush();
            System.out.println("发送握手响应");

            // 开始WebSocket通信
            communicateWebSocket(clientSocket, in, out);
        } catch (IOException e) {
            System.err.println("处理客户端时出错: " + e.getMessage());
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                System.err.println("关闭客户端连接时出错: " + e.getMessage());
            }
        }
    }

    private String extractWebSocketKey(String request) {
        String[] lines = request.split("\r\n");
        for (String line : lines) {
            if (line.startsWith("Sec-WebSocket-Key:")) {
                return line.substring("Sec-WebSocket-Key:".length()).trim();
            }
        }
        return null;
    }

    private String generateResponseKey(String key) {
        try {
            String concatenated = key + WEBSOCKET_KEY_MAGIC;
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] hash = sha1.digest(concatenated.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("SHA-1算法不可用", e);
        }
    }

    private void communicateWebSocket(Socket clientSocket, InputStream in, OutputStream out) throws IOException {
        while (clientSocket.isConnected()) {
            // 读取WebSocket帧
            byte[] header = new byte[2];
            if (in.read(header) != 2) {
                break;
            }

            boolean fin = (header[0] & 0x80) != 0;
            int opcode = header[0] & 0x0F;
            boolean masked = (header[1] & 0x80) != 0;
            long payloadLength = header[1] & 0x7F;

            if (payloadLength == 126) {
                byte[] extended = new byte[2];
                if (in.read(extended) != 2) {
                    break;
                }
                payloadLength = ((extended[0] & 0xFF) << 8) | (extended[1] & 0xFF);
            } else if (payloadLength == 127) {
                byte[] extended = new byte[8];
                if (in.read(extended) != 8) {
                    break;
                }
                payloadLength = 0;
                for (int i = 0; i < 8; i++) {
                    payloadLength = (payloadLength << 8) | (extended[i] & 0xFF);
                }
            }

            byte[] maskingKey = new byte[4];
            if (masked && in.read(maskingKey) != 4) {
                break;
            }

            byte[] payload = new byte[(int) payloadLength];
            if (in.read(payload) != payloadLength) {
                break;
            }

            // 解掩码
            if (masked) {
                for (int i = 0; i < payloadLength; i++) {
                    payload[i] = (byte) (payload[i] ^ maskingKey[i % 4]);
                }
            }

            // 处理控制帧
            if (opcode == 8) { // 关闭帧
                System.out.println("收到关闭帧");
                sendCloseFrame(out);
                break;
            } else if (opcode == 9) { // ping帧
                System.out.println("收到ping帧");
                sendPong(out, payload);
            } else if (opcode == 1) { // 文本帧
                String message = new String(payload, StandardCharsets.UTF_8);
                System.out.println("收到消息: " + message);
                sendMessage(out, "服务器收到: " + message);
            }
        }
    }

    private void sendMessage(OutputStream out, String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        int payloadLength = payload.length;

        // 构建WebSocket帧
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // 第一个字节: FIN=1, opcode=1(文本帧)
        baos.write(0x81);

        // 第二个字节: 负载长度
        if (payloadLength <= 125) {
            baos.write(payloadLength);
        } else if (payloadLength <= 65535) {
            baos.write(126);
            baos.write((payloadLength >> 8) & 0xFF);
            baos.write(payloadLength & 0xFF);
        } else {
            baos.write(127);
            for (int i = 7; i >= 0; i--) {
                baos.write((int) ((payloadLength >> (i * 8)) & 0xFF));
            }
        }

        // 写入负载数据
        baos.write(payload);

        // 发送帧
        out.write(baos.toByteArray());
        out.flush();
    }

    private void sendPong(OutputStream out, byte[] payload) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // 第一个字节: FIN=1, opcode=10(乒乓帧)
        baos.write(0x8A);

        // 第二个字节: 负载长度
        if (payload.length <= 125) {
            baos.write(payload.length);
        } else {
            throw new IOException("Ping负载太长,不支持");
        }

        // 写入负载数据
        baos.write(payload);

        // 发送帧
        out.write(baos.toByteArray());
        out.flush();
    }

    private void sendCloseFrame(OutputStream out) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // 第一个字节: FIN=1, opcode=8(关闭帧)
        baos.write(0x88);
        // 第二个字节: 负载长度为 0
        baos.write(0x00);
        out.write(baos.toByteArray());
        out.flush();
    }
}    

Tomcat 实现 WebSocket

Tomcat 实现 WebSocket 主要基于 Java WebSocket API(JSR 356),并采用了高效的非阻塞 I/O 模型来处理大量并发连接,而非传统的线程池模型。

1. Tomcat 的 WebSocket 实现原理

1.1 基于 NIO 的非阻塞 I/O 模型

Tomcat 自 7.0 版本后引入了 NIO 连接器(org.apache.coyote.http11.Http11NioProtocol),并在 8.0 + 版本中默认使用 NIO2(Http11Nio2Protocol)。这些连接器使用单线程管理多个连接,通过事件驱动的方式处理 I/O 操作,避免了传统线程池的局限性。

  • Selector 模式:Tomcat 使用java.nio.channels.Selector监听多个SocketChannel的读写事件,一个线程可以管理数千个连接。
  • 异步处理:WebSocket 连接建立后,数据读写通过异步方式进行,不会阻塞线程。
1.2 WebSocket 握手与协议升级

当客户端发起 WebSocket 握手请求(HTTP 1.1 + Upgrade: websocket头)时,Tomcat 会:

  1. 解析 HTTP 请求,验证Sec-WebSocket-Key等头信息。
  2. 生成Sec-WebSocket-Accept响应头,完成协议升级。
  3. 将连接从 HTTP 处理器移交到 WebSocket 处理器(如WsFrameServer)。
1.3 生命周期管理

Tomcat 为每个 WebSocket 连接创建一个Session对象,并通过Endpoint接口管理生命周期:

  • onOpen():连接建立时触发。
  • onMessage():收到消息时触发。
  • onClose():连接关闭时触发。
  • onError():发生错误时触发。

Tomcat 的 WebSocket 实现通过非阻塞 I/O + 少量线程(如 1 个 Acceptor 线程 + N 个 Selector 线程)即可管理大量连接,显著提升吞吐量。

2. Tomcat 的线程模型

Tomcat 的 WebSocket 处理涉及三类线程:

  1. Acceptor 线程:接收新连接,将其注册到 Selector。
  2. Selector 线程(I/O 线程):监听所有连接的 I/O 事件,触发回调。
  3. Worker 线程(可选):处理耗时操作(如业务逻辑),避免阻塞 Selector 线程。
示例配置(server.xml)
<Connector port="8080" protocol="org.apache.coyote.http11.Http11Nio2Protocol"
           maxThreads="200"           <!-- Worker线程数 -->
           acceptCount="100"          <!-- 最大等待连接数 -->
           selectorThreadCount="2"    <!-- Selector线程数 -->
           maxConnections="8192"      <!-- 最大并发连接数 -->
/>

3. 性能优化建议

  1. 调整 Selector 线程数:根据 CPU 核心数设置,通常为2~4
  2. 增大maxConnections:默认值较低(如 8192),可根据内存调整。
  3. 异步处理业务逻辑:避免在 Selector 线程中执行耗时操作。
  4. 使用 WebSocket 注解:通过@ServerEndpoint简化开发,Tomcat 会自动优化。
示例:异步处理消息
@ServerEndpoint("/ws")
public class MyWebSocket {
    @OnMessage
    public void onMessage(Session session, String message) {
        // 提交到线程池处理业务逻辑,避免阻塞Selector线程
        ExecutorService executor = Executors.newFixedThreadPool(10);
        executor.submit(() -> {
            // 处理消息(如数据库操作、调用外部服务)
            session.getAsyncRemote().sendText("处理完成");
        });
    }
}

4. 与 Netty 的对比

  • Tomcat:基于 Servlet 容器,适合与现有 Web 应用集成,开箱即用。
  • Netty:纯 NIO 框架,性能更高(约 10%~20%),但需要手动配置。

对于大多数应用,Tomcat 的 WebSocket 实现已足够高效.Tomcat 通过非阻塞 I/O + 事件驱动模型实现 WebSocket,避免了传统线程池的瓶颈,可轻松支持数万并发连接;对于高性能场景(如游戏服务器),可考虑 Netty。