【面试题】利用Promise实现Websocket阻塞式await wsRequest() 请求

发布于:2025-03-28 ⋅ 阅读:(23) ⋅ 点赞:(0)

逻辑实现过程

1. 目标与基础设计

  • 目标:实现一个类似 HTTP 请求的阻塞式调用接口(如 await wsRequest(...)),让开发者无需手动处理 WebSocket 的事件回调,而是通过 Promise 和 async/await 获得同步体验。

  • 基础设计:

    • 创建一个 WebSocketClient 类,封装 WebSocket 的连接和消息处理逻辑。

    • 使用 Promise 将 WebSocket 的异步消息机制转化为请求-响应模型。

    • 为每个请求生成唯一的 requestId,用于匹配客户端请求和服务器响应。

2. WebSocket 连接管理

  • 初始化连接:

    • 在构造函数中实例化 WebSocket 对象,并绑定基本事件监听(onopen、onmessage、onerror、onclose)。

    • 维护一个 isConnected 状态,跟踪连接是否可用。

  • 事件处理:

    • onopen:标记连接成功,触发队列处理。

    • onmessage:解析收到的消息,提取 requestId 和 data,通过 requestMap 找到对应的 Promise 并完成它。

    • onerror 和 onclose:标记连接失败,清理未完成请求并尝试重连。

3. wsRequest 方法的核心逻辑

  • 请求发送:

    • 生成唯一 requestId,与请求数据组成一个对象(如 { requestId, data })。

    • 检查连接状态(isConnected 和 ws.readyState),确保请求只在连接可用时发送。

    • 将请求数据序列化为 JSON 并通过 ws.send 发送。

  • Promise 封装:

    • 创建一个 Promise,将其 resolve 函数存储到 requestMap 中,以 requestId 为键。

    • 当收到服务器响应时,通过 requestId 找到对应的 resolve 并执行。

  • 超时处理:

    • 为每个请求设置超时计时器(默认 5 秒),超时后移除 requestMap 中的记录并触发重试或失败。

4. 消息队列与重连机制

  • 消息队列:

    • 当连接不可用时,将请求(包括数据和 Promise 的 resolve/reject)存入 messageQueue。

    • 连接恢复后,逐步处理队列中的请求。

  • 重连机制:

    • 如果连接断开,通过 reconnect 方法延迟(默认 1 秒)尝试重新连接。

    • 重连成功后触发 flushQueue,处理积压的请求。

5. 队列处理的异步优化

  • 初始实现:

    • 使用 while 循环同步处理队列,可能导致主线程阻塞。

  • 优化为异步:

    • 使用 setTimeout 递归调用 flushQueue,每次处理一个请求后将控制权交还给事件循环。

    • 在请求完成后(.finally),异步触发下一次处理。

6. 错误与清理

  • 错误处理:

    • 连接错误或关闭时,通过 rejectAllPending 拒绝所有未完成请求。

    • 超时或重试超限时,返回明确的错误信息。

  • 资源清理:

    • 在超时或请求完成时清理 requestMap,避免内存泄漏。

    • 提供 close 方法,手动关闭 WebSocket 连接。

7. 最终接口

  • 用户通过 await wsClient.wsRequest({data: xxx}) 调用,获得类似 HTTP 请求的阻塞式体验。

  • 支持配置选项(如超时时间、重试次数、重试间隔),增强灵活性。


考虑的问题与解决方案

1. WebSocket 的异步特性

  • 问题:WebSocket 是事件驱动的,没有内置的请求-响应匹配机制。

  • 解决方案:

    • 使用 requestId 标识每个请求,服务器返回时带上相同的 requestId。

    • 用 Promise 封装请求,监听 onmessage 时根据 requestId 完成对应的 Promise。

2. 连接状态的不确定性

  • 问题:WebSocket 可能未连接、断开或出错,导致请求失败。

  • 解决方案:

    • 检查 readyState 和 isConnected,未连接时将请求加入队列并尝试重连。

    • 提供重试机制(默认 3 次),确保网络不稳定时仍有机会成功。

3. 请求超时的风险

  • 问题:服务器可能延迟响应,导致 Promise 无限等待。

  • 解决方案:

    • 为每个请求设置超时(默认 5 秒),超时后触发重试或失败。

    • 在 requestMap 中动态更新 resolve,确保超时后仍能正确清理。

4. 主线程阻塞

  • 问题:使用 while 处理队列可能阻塞主线程,影响页面响应性。

  • 解决方案:

    • 改为 setTimeout 异步递归处理队列,每次只处理一个请求,避免同步循环。

5. 服务器配合

  • 问题:WebSocket 本身不保证响应格式,依赖服务器正确实现。

  • 解决方案:

    • 假设服务器返回 JSON 格式的消息,包含 requestId 和 data。

    • 文档中明确说明服务器端需配合返回匹配的 requestId。

6. 资源管理与内存泄漏

  • 问题:未完成的请求可能堆积,导致内存泄漏。

  • 解决方案:

    • 在超时、连接关闭或错误时清理 requestMap。

    • 队列处理失败时移除对应项,避免无限积压。

7. 用户体验与灵活性

  • 问题:开发者需要简单易用的接口,同时支持自定义配置。

  • 解决方案:

    • 提供阻塞式接口(await wsRequest),隐藏复杂的事件逻辑。

    • 通过 options 支持配置超时、重试次数等参数。

class WebSocketClient {
  constructor(url, options = {}) {
    this.url = url;
    this.ws = null;
    this.requestMap = new Map(); // 存储 requestId 和 Promise resolver
    this.messageQueue = []; // 未连接时的消息队列
    this.isConnected = false;
    this.options = {
      maxRetries: options.maxRetries || 3, // 默认最大重试次数
      retryDelay: options.retryDelay || 1000, // 默认重试间隔 1 秒
      timeout: options.timeout || 5000, // 默认超时 5 秒
    };

    this.connect(); // 初始化连接
  }

  // 建立 WebSocket 连接
  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('WebSocket 连接已建立');
      this.isConnected = true;
      this.flushQueue(); // 连接成功后发送队列中的消息
    };

    this.ws.onmessage = (event) => {
      const response = JSON.parse(event.data);
      const { requestId, data } = response;

      const resolver = this.requestMap.get(requestId);
      if (resolver) {
        resolver(data);
        this.requestMap.delete(requestId);
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket 错误:', error);
      this.isConnected = false;
    };

    this.ws.onclose = () => {
      console.log('WebSocket 连接已关闭');
      this.isConnected = false;
      this.rejectAllPending('WebSocket 连接关闭');
    };
  }

  // 发送请求并返回 Promise
  wsRequest(data, retryCount = 0) {
    return new Promise((resolve, reject) => {
      // 生成唯一 requestId
      const requestId = Date.now() + Math.random().toString(36).slice(2);
      const request = { requestId, data };

      // 检查连接状态
      if (!this.isConnected || this.ws.readyState !== WebSocket.OPEN) {
        if (retryCount < this.options.maxRetries) {
          console.log(`WebSocket 未连接,第 ${retryCount + 1} 次重试...`);
          this.messageQueue.push({ request, resolve, reject, retryCount: retryCount + 1 });
          this.reconnect();
          return;
        }
        return reject(new Error('WebSocket 未连接且重试次数已达上限'));
      }

      // 存储 resolver
      this.requestMap.set(requestId, resolve);

      // 发送请求
      this.ws.send(JSON.stringify(request));

      // 设置超时
      const timeoutId = setTimeout(() => {
        if (this.requestMap.has(requestId)) {
          if (retryCount < this.options.maxRetries) {
            console.log(`请求超时,第 ${retryCount + 1} 次重试...`);
            this.requestMap.delete(requestId);
            this.wsRequest(data, retryCount + 1).then(resolve).catch(reject);
          } else {
            reject(new Error('请求超时且重试次数已达上限'));
            this.requestMap.delete(requestId);
          }
        }
      }, this.options.timeout);

      // 清理超时时绑定
      this.requestMap.set(requestId, (value) => {
        clearTimeout(timeoutId);
        resolve(value);
      });
    });
  }

  // 重连逻辑
  reconnect() {
    if (!this.isConnected) {
      setTimeout(() => {
        console.log('尝试重连...');
        this.connect();
      }, this.options.retryDelay);
    }
  }

  // 使用 setTimeout 异步处理消息队列
  flushQueue() {
    if (!this.isConnected || this.messageQueue.length === 0) return;

    const { request, resolve, reject, retryCount } = this.messageQueue.shift();
    this.wsRequest(request.data, retryCount)
      .then(resolve)
      .catch(reject)
      .finally(() => {
        // 递归调用,确保队列逐步处理
        setTimeout(() => this.flushQueue(), 0);
      });
  }

  // 拒绝所有未完成的请求
  rejectAllPending(errorMessage) {
    this.requestMap.forEach((resolver, requestId) => {
      resolver(Promise.reject(new Error(errorMessage)));
      this.requestMap.delete(requestId);
    });
  }

  // 关闭连接
  close() {
    if (this.ws) {
      this.ws.close();
    }
  }
}

// 使用示例
async function testWebSocket() {
  const wsClient = new WebSocketClient('ws://example.com/socket', {
    maxRetries: 3,
    retryDelay: 1000,
    timeout: 5000,
  });

  try {
    // 发送阻塞式请求
    const response = await wsClient.wsRequest({ action: 'getData', value: 'xxx' });
    console.log('收到响应:', response);

    // 模拟未连接时发送请求
    wsClient.ws.close(); // 手动关闭连接以测试重试和队列
    const response2 = await wsClient.wsRequest({ action: 'update', value: 'yyy' });
    console.log('收到响应:', response2);
  } catch (error) {
    console.error('请求失败:', error);
  } finally {
    wsClient.close();
  }
}

testWebSocket();