Java线程池ThreadPoolExecutor封装工具类

发布于:2025-02-10 ⋅ 阅读:(50) ⋅ 点赞:(0)

Java线程池ThreadPoolExecutor

一、线程池特性

1.动态调整线程数量

根据当前的工作负载,ThreadPoolExecutor 可以动态地增加或减少工作线程的数量。当有新的任务提交且当前线程数小于核心线程数时,会创建新的线程;如果已有足够的线程,则将任务放入队列中。如果队列已满并且线程数未达到最大值,还会创建额外的线程来处理任务。

2.队列管理

ThreadPoolExecutor 支持多种类型的阻塞队列,如 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 等。选择合适的队列类型可以影响线程池的行为和性能。
例如,使用无界队列(如 LinkedBlockingQueue)会导致所有超出核心线程数的任务都被排队,而不会立即创建新的线程;相反,SynchronousQueue 则强制每个任务都由一个可用线程立即处理,否则会被拒绝。

3.拒绝策略

当线程池无法接受新任务时(比如因为线程数已达上限并且队列也满了),会触发拒绝策略。Java 提供了几种内置的拒绝策略实现,也可以自定义拒绝策略:

  • AbortPolicy:抛出 RejectedExecutionException。
  • CallerRunsPolicy:由调用线程(提交任务的线程)执行该任务。
  • DiscardPolicy:静默丢弃任务。
  • DiscardOldestPolicy:丢弃队列中最老的任务,并尝试重新提交当前任务。

4.生命周期管理

ThreadPoolExecutor 提供了 shutdown() 和 shutdownNow() 方法来优雅地关闭线程池。前者会等待所有已提交的任务完成后再关闭,后者则试图立即停止所有正在执行的任务,并返回尚未开始的任务列表。

二、参数介绍和基本使用

1.参数介绍

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
  • corePoolSize:线程池中保持的核心线程数,即使这些线程是空闲的。
  • maximumPoolSize:线程池允许的最大线程数。
  • keepAliveTime:当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的时间。
  • unit:keepAliveTime参数的时间单位。
  • workQueue:用于保存等待执行的任务的阻塞队列。
  • threadFactory:用于创建新线程的工厂。
  • handler:当任务提交到已满的线程池时所使用的拒绝策略。

2.基本使用

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 定义线程池参数
        int corePoolSize = 2; // 核心线程数
        int maximumPoolSize = 4; // 最大线程数
        long keepAliveTime = 5000; // 空闲线程存活时间
        TimeUnit unit = TimeUnit.MILLISECONDS; // 时间单位
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10); // 工作队列
        
        // 创建 ThreadPoolExecutor
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize, 
            maximumPoolSize, 
            keepAliveTime, 
            unit, 
            workQueue
        );

        // 提交任务给线程池执行
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                System.out.println("Executing Task " + taskNumber + " by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000); // 模拟任务耗时
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {//等待60秒
                executor.shutdownNow();//不管线程池中的任务是否完成,都直接中断掉
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

二、线程池状态

1.线程池状态类型

ThreadPoolExecutor 内部维护了一个状态机来跟踪线程池的不同状态。包含:

  • 运行(RUNNING)
  • 关闭(SHUTDOWN)
  • 停止(STOP)
  • 整理(TIDYING)
  • 终结(TERMINATED)

2.线程池状态实现

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始化 ctl 的值,表示线程池处于运行状态并且当前没有活动的工作线程。
    private static final int COUNT_BITS = Integer.SIZE - 3;//表示用于存储线程计数(workerCount)的位数。由于 Java 的 int 类型有 32 位(Integer.SIZE),这里减去 3 位留给状态标志,因此留下 29 位用于线程计数。如果线程数量不满足,则可以将类型改为long类型以及AtomicInteger改了AtomicLong
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//00011111111111111111111111111111 这是一个掩码,用于从 ctl 中提取出线程计数值。它由 COUNT_BITS 位全为 1 组成,即低 29 位为 1。

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;//当线程池处于运行状态时,高 3 位设置为 -1 的二进制补码形式(即 111...000)。
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//线程池正在关闭,不再接受新任务,但会继续处理队列中的任务,高 3 位为 000。
    private static final int STOP       =  1 << COUNT_BITS;//线程池已停止,不再接受新任务,也不再处理队列中的任务,高 3 位为 001。
    private static final int TIDYING    =  2 << COUNT_BITS;//所有任务都已完成,线程池即将进入终结状态,高 3 位为 010。
    private static final int TERMINATED =  3 << COUNT_BITS;//线程池已经完全终止,所有资源都被释放,高 3 位为 011。

    // Packing and unpacking ctl

    /**
     * COUNT_MASK=00011111111111111111111111111111 取反之后 11100000000000000000000000000000 既为RUNNING初始状态
     * c & ~COUNT_MASK 与运算得到线程池当前的状态;可以这样理解:当前ctl的值取二进制高三位进行比对,得到的结果就是当前ctl代表的线程池状态
     *
     */
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }

    /**
     * COUNT_MASK=00011111111111111111111111111111
     * c & COUNT_MASK 去除高三位状态,相当于重置高三位的二进制为0,后29位二进制就是当前线程数量
     */
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }

    /**
     * 或运算运算,例如 00011111111111111111111111111111 | 11100000000000000000000000000000 = 11111111111111111111111111111111
     * 这样理解,高三位表示状态|低29位表示线程数量,这个时候只要用一个int类型存储,那么将高三位的二进制和低29位的二进制合并一起为一个数,就是ctl的值
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }

三、封装线程池工具类

package com.zzc.common.utils;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ThreadPoolUtils {

    private static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR;

    private static final String COMMON_THREAD_POOL_KEY;

    private static final String COMMON_SCHEDULE_EXECUTOR_POOL_KEY;

    private static final int DEFAULT_CORE_POOL_SIZE = 1;

    private static final int DEFAULT_KEEP_ALIVE_TIME = 30;

    private static final int DEFAULT_PROCESSORS = 2;

    private static int ALB_PROCESSORS = DEFAULT_PROCESSORS;

    static {
        THREAD_POOL_EXECUTOR = new ConcurrentHashMap<>();
        COMMON_THREAD_POOL_KEY = "COMMON";
        COMMON_SCHEDULE_EXECUTOR_POOL_KEY = "SCHEDULE-COMMON";
        int processors = Runtime.getRuntime().availableProcessors();
        ALB_PROCESSORS = processors;
        log.info("availableProcessors:{}, DEFAULT_PROCESS:{}", ALB_PROCESSORS, DEFAULT_PROCESSORS);
        ALB_PROCESSORS = Math.max(ALB_PROCESSORS, DEFAULT_PROCESSORS);
    }

//    private static Executor executor = Executors.newFixedThreadPool()

    /**
     *
     * @param threadPoolKey
     * @param corePoolSize
     * @param maxPoolSize
     * @param keepAliveTime
     * @param timeUnit
     * @param discardContinueWait 如果被拒绝,则等待时间,单位ms
     * @return
     */
    public static ThreadPoolExecutor newThreadPoolExecutorDirectAndAsy(String threadPoolKey, int corePoolSize,
                                                                       int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
                                                                       int discardContinueWait) {
        return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit,  new SynchronousQueue(true), new DiscardSynchronousQueueWaitPolicy(discardContinueWait));
    }

    public static ThreadPoolExecutor newThreadPoolExecutorNewThreadToRun(String threadPoolKey, int corePoolSize,
                                                                       int maxPoolSize, int keepAliveTime, TimeUnit timeUnit) {
        return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit,  new SynchronousQueue(true), new NewThreadToRun());
    }

    public static ThreadPoolExecutor newThreadPoolExecutor(String threadPoolKey, int corePoolSize,
                                                           int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
                                                           BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        if (StrUtils.isBlank(threadPoolKey)) {
            throw new RuntimeException("threadPoolKey is null");
        }
        if (THREAD_POOL_EXECUTOR.containsKey(threadPoolKey)) {
            return THREAD_POOL_EXECUTOR.get(threadPoolKey);
        }
        log.info("before new threadPool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
        corePoolSize = corePoolSize <= 0 ? DEFAULT_CORE_POOL_SIZE : corePoolSize;
        maxPoolSize = maxPoolSize <= 0 ? corePoolSize : maxPoolSize;
        keepAliveTime = keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime;
        timeUnit = timeUnit == null ? TimeUnit.SECONDS : timeUnit;
        ThreadPoolExecutor executor = new PThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, newThreadFactory(threadPoolKey), handler);
        log.info("after new thread pool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);

        THREAD_POOL_EXECUTOR.put(threadPoolKey, executor);
        return executor;
    }

    public static ScheduledExecutorService getCommonScheduleExecutorsPool() {
        ScheduledThreadPoolExecutor executorService = (ScheduledThreadPoolExecutor) THREAD_POOL_EXECUTOR.get(COMMON_SCHEDULE_EXECUTOR_POOL_KEY);
        if (executorService == null) {
            executorService = new ScheduledThreadPoolExecutor(2, newThreadFactory(COMMON_SCHEDULE_EXECUTOR_POOL_KEY));
            THREAD_POOL_EXECUTOR.put(COMMON_SCHEDULE_EXECUTOR_POOL_KEY, executorService);
        }
        return executorService;
    }

    public static ThreadFactory newThreadFactory(String threadPrefix) {
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(threadPrefix);
                thread.setDaemon(true);
                log.info("new thread:{}", threadPrefix);
                return thread;
            }
        };
        return threadFactory;
    }

    /**
     * 拒绝策略
     * 直接给主线程自己执行
     */
    static class CallerRunsPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String threadKey = "";
            if (e instanceof PThreadPoolExecutor) {
                threadKey = ((PThreadPoolExecutor) e).getThreadPoolKey();
            }
            if (r instanceof Thread) {
                log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
            } else {
                log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
            }
            super.rejectedExecution(r, e);
        }
    }

    /**
     * 拒绝策略 -- activateMQ的做法
     * 重新尝试加入队列,等待超时,影响线程执行效率
     */
    static class DiscardSynchronousQueueWaitPolicy implements RejectedExecutionHandler {

        private long discardContinueWait = 1;

        public DiscardSynchronousQueueWaitPolicy(long discardContinueWait) {
            if (discardContinueWait <= 0) {
                discardContinueWait = 1;
            }
            this.discardContinueWait = discardContinueWait;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            String threadKey = "";
            if (executor instanceof PThreadPoolExecutor) {
                threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
            }
            if (r instanceof Thread) {
                log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
            } else {
                log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
            }

            if (!executor.isShutdown()) {
                try {
                    executor.getQueue().poll(discardContinueWait, TimeUnit.MICROSECONDS);
                } catch (InterruptedException e) {
                    log.error("rejectedExecution", e);
                }
                executor.execute(r);
            }
        }
    }

    /**
     * 拒绝策略 -- netty的做法
     * 创建新的线程,直接执行,直到系统创建不了新的线程为止
     */
    static class NewThreadToRun implements RejectedExecutionHandler {

        public NewThreadToRun() {
            super();
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            String threadKey = "";
            if (executor instanceof PThreadPoolExecutor) {
                threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
            }
            if (r instanceof Thread) {
                log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
            } else {
                log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
            }
            try {
                final Thread thread = new Thread(r, "new thread: " + threadKey);
                thread.start();
            } catch (Exception e) {
                throw new RejectedExecutionException("Failed to start new thread. threadKey:" + threadKey, e);
            }
        }

    }

    /**
     * 线程池,添加添加一些日志打印
     */
    static class PThreadPoolExecutor extends ThreadPoolExecutor {

        private String threadPoolKey;

        public PThreadPoolExecutor(String threadPoolKey, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            this.threadPoolKey = threadPoolKey;
        }

        @Override
        public void execute(Runnable command) {
            try {
                super.execute(command);
            } catch (Exception e) {
                log.error("execute error.", e);
            }
            log.debug("execute runnable, hashCode:{}, threadPoolKey:{}, poolSize:{}, largestPoolSize:{}, activeCount:{}, taskCount:{}, completedTaskCount:{}, queueSize:{}",
                    command.hashCode(), threadPoolKey, this.getPoolSize(), this.getLargestPoolSize(), this.getActiveCount(), this.getTaskCount(), this.getCompletedTaskCount(), this.getQueue().size());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            if (t != null) {
                log.error("execute Runnable error, hashCode:{}", r.hashCode(), t);
            }
            super.afterExecute(r, t);
        }

        public String getThreadPoolKey() {
            return threadPoolKey;
        }
    }

}


网站公告

今日签到

点亮在社区的每一天
去签到