高级版 Web Worker 封装(含 WorkerPool 调度池 + 超时控制)

发布于:2025-06-24 ⋅ 阅读:(19) ⋅ 点赞:(0)

🧠 高级版 Web Worker 封装(含 WorkerPool 调度池 + 超时控制)

📌 掌握后可实现:多线程任务并发处理、任务分配优化、最大并发数限制、任务队列调度。


一、核心思路:Worker Pool 调度池

Web Worker 虽然可以无限创建,但过多线程反而拖慢浏览器性能,因此我们需要限制同时运行的 Worker 数量,并为其分配任务队列:

  • 🧵 最多同时运行 N 个 Worker
  • ⏱ 任务支持超时(防止卡死)
  • 🚦 未分配的任务进入等待队列
  • 🔁 任务完成后自动调度下一个任务

二、核心封装代码

✅ 1. WorkerPool.ts

type TaskPayload = any;
type TaskResult = any;
type TaskCallback = (res: TaskResult) => void;

interface Task {
  payload: TaskPayload;
  resolve: (data: TaskResult) => void;
  reject: (err: any) => void;
  timeoutId?: any;
}

export class WorkerPool {
  private poolSize: number;
  private workers: Worker[] = [];
  private busyWorkers: Set<number> = new Set();
  private taskQueue: Task[] = [];
  private workerPath: string;

  constructor(workerPath: string, poolSize = 4) {
    this.workerPath = workerPath;
    this.poolSize = poolSize;
    for (let i = 0; i < poolSize; i++) {
      const worker = new Worker(new URL(workerPath, import.meta.url), { type: 'module' });
      worker.onmessage = this.handleMessage(i);
      worker.onerror = this.handleError(i);
      this.workers.push(worker);
    }
  }

  private handleMessage = (index: number) => (event: MessageEvent) => {
    const task = this.workers[index]['__currentTask__'];
    if (task?.timeoutId) clearTimeout(task.timeoutId);

    task.resolve(event.data);
    delete this.workers[index]['__currentTask__'];
    this.busyWorkers.delete(index);
    this.runNextTask();
  };

  private handleError = (index: number) => (err: ErrorEvent) => {
    const task = this.workers[index]['__currentTask__'];
    if (task?.timeoutId) clearTimeout(task.timeoutId);

    task.reject(err);
    delete this.workers[index]['__currentTask__'];
    this.busyWorkers.delete(index);
    this.runNextTask();
  };

  private runNextTask() {
    if (this.taskQueue.length === 0) return;

    const idleIndex = this.workers.findIndex((_, i) => !this.busyWorkers.has(i));
    if (idleIndex === -1) return;

    const task = this.taskQueue.shift()!;
    const worker = this.workers[idleIndex];
    worker['__currentTask__'] = task;
    this.busyWorkers.add(idleIndex);

    task.timeoutId = setTimeout(() => {
      task.reject(new Error('Worker task timeout'));
      this.busyWorkers.delete(idleIndex);
      this.runNextTask();
    }, 10000); // 10秒超时

    worker.postMessage(task.payload);
  }

  public run(payload: TaskPayload): Promise<TaskResult> {
    return new Promise((resolve, reject) => {
      const task: Task = { payload, resolve, reject };
      this.taskQueue.push(task);
      this.runNextTask();
    });
  }

  public destroy() {
    this.workers.forEach((w) => w.terminate());
    this.workers = [];
    this.taskQueue = [];
    this.busyWorkers.clear();
  }
}

✅ 2. 示例 worker:sortWorker.js

// workers/sortWorker.js
self.onmessage = ({ data }) => {
  // 模拟 CPU 密集排序
  const result = data.sort((a, b) => a - b);
  self.postMessage(result);
};

三、Vue 示例:并发排序多个数据块

<script setup lang="ts">
import { WorkerPool } from '@/utils/WorkerPool';
import { ref, onBeforeUnmount } from 'vue';

const pool = new WorkerPool('@/workers/sortWorker.js', 4); // 限制最大并发为 4

const results = ref<number[][]>([]);
const blocks = Array.from({ length: 10 }, () =>
  Array.from({ length: 10000 }, () => Math.floor(Math.random() * 1000000))
);

const sortAll = async () => {
  results.value = [];
  const sortedBlocks = await Promise.all(blocks.map((block) => pool.run(block)));
  results.value = sortedBlocks;
};

onBeforeUnmount(() => pool.destroy());
</script>

<template>
  <el-button @click="sortAll">并发排序</el-button>
  <div v-for="(res, i) in results" :key="i">
    Block {{ i + 1 }} - Top 5: {{ res.slice(0, 5).join(', ') }}
  </div>
</template>

四、优势总结

能力项 说明
多线程并发 限制同时运行 Worker 数量,避免性能浪费
自动调度 任务完成自动分配新任务,无需手动管理
支持超时 防止某任务永久挂起
生命周期管理 支持销毁、清理任务,配合 Vue 生命周期使用更安全
更适合大型项目 封装通用接口、路径可配置,适用于图像处理/数据处理等场景

五、典型应用场景推荐

✅ 适用于:

  • 表格大数据处理(筛选/排序/聚合)
  • 图像像素处理(滤镜/转码)
  • 地图坐标点聚合(如高德/百度点图层)
  • AI 小模型推理任务(WebAssembly + Worker)
  • 并发压缩、解压缩、大文件切片等前处理

🧩 拓展建议

  1. ✅ 支持多类型 Worker(注册任务类型)
  2. ✅ 支持任务 ID / cancel 功能
  3. ✅ 状态订阅支持(如 loading 状态分发)

💬 如果你希望我继续生成「支持多类型任务的多 Worker 调度中心」进阶封装,请回复 “生成多任务 Worker 调度中心”


📦 本教程已完整覆盖:

  • Web Worker 基础通信
  • Worker 通用封装
  • Worker Pool 调度池
  • Vue 实战 + 自动调度 + 超时处理

🚀 学会之后,你的前端就具备真正“多线程”能力!


网站公告

今日签到

点亮在社区的每一天
去签到