前言
在Java并发编程中,线程的创建和销毁是一项昂贵的操作。频繁地创建和销毁线程不仅会消耗大量的系统资源,还会影响应用程序的性能。线程池(ThreadPool)作为Java并发包中的核心组件,通过复用线程资源、管理线程生命周期、控制并发度等机制,为高并发应用提供了高效、稳定的解决方案。
本文将从源码角度深入分析ThreadPoolExecutor的设计原理、核心机制以及最佳实践,帮助读者全面掌握线程池的精髓。
一、为什么需要线程池
1.1 传统多线程方案的问题
在线程池出现之前,我们通常采用以下方式处理并发任务:
直接创建线程
// 传统方式:为每个任务创建新线程
public class TraditionalThreadExample {
public void handleRequest(Runnable task) {
Thread thread = new Thread(task);
thread.start(); // 为每个任务创建新线程
}
public static void main(String[] args) {
TraditionalThreadExample example = new TraditionalThreadExample();
// 模拟1000个并发请求
for (int i = 0; i < 1000; i++) {
example.handleRequest(() -> {
System.out.println("执行任务: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
}
这种方案存在严重问题:
- 资源消耗巨大:每个线程大约消耗1MB的内存(栈空间)
- 创建销毁开销:线程创建和销毁需要系统调用,开销很大
- 无法控制并发度:可能创建过多线程导致系统崩溃
- 线程管理复杂:需要手动管理线程的生命周期
1.2 性能对比测试
让我们通过一个具体的测试来对比线程池和直接创建线程的性能差异:
public class ThreadPerformanceComparison {
private static final int TASK_COUNT = 1000;
private static final int THREAD_POOL_SIZE = 10;
public static void main(String[] args) throws InterruptedException {
// 测试直接创建线程
testDirectThreadCreation();
// 测试线程池
testThreadPool();
}
private static void testDirectThreadCreation() throws InterruptedException {
System.out.println("=== 直接创建线程测试 ===");
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(TASK_COUNT);
for (int i = 0; i < TASK_COUNT; i++) {
Thread thread = new Thread(() -> {
try {
// 模拟业务处理
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
thread.start();
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.println("直接创建线程耗时: " + (endTime - startTime) + "ms");
}
private static void testThreadPool() throws InterruptedException {
System.out.println("=== 线程池测试 ===");
long startTime = System.currentTimeMillis();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
THREAD_POOL_SIZE, // 核心线程数
THREAD_POOL_SIZE, // 最大线程数
0L, // 空闲时间
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>() // 工作队列
);
CountDownLatch latch = new CountDownLatch(TASK_COUNT);
for (int i = 0; i < TASK_COUNT; i++) {
executor.submit(() -> {
try {
// 模拟业务处理
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
long endTime = System.currentTimeMillis();
System.out.println("线程池耗时: " + (endTime - startTime) + "ms");
}
}
典型测试结果:
- 直接创建线程:2500ms+,创建1000个线程
- 线程池:800ms-,复用10个线程
1.3 线程池的核心优势
通过上面的对比,我们可以看出线程池具有以下核心优势:
1. 降低资源消耗
// 线程池复用机制示意
public class ThreadReuseExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);
// 提交多个任务,观察线程复用
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + " 在线程 " +
Thread.currentThread().getName() + " 中执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
2. 提高响应速度
- 任务到达时,可以立即执行,无需等待线程创建
- 避免了线程创建的系统调用开销
3. 提高线程的可管理性
// 线程池管理示例
public class ThreadPoolManagement {
private final ThreadPoolExecutor executor;
public ThreadPoolManagement() {
this.executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 有界队列
new ThreadFactory() { // 自定义线程工厂
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "MyPool-" + counter.incrementAndGet());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
public void monitorThreadPool() {
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== 线程池状态监控 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("队列任务数: " + executor.getQueue().size());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println();
}, 0, 5, TimeUnit.SECONDS);
}
}
4. 提供更强大的功能
- 定时执行、周期执行
- 任务优先级控制
- 拒绝策略配置
- 线程池状态监控
二、线程池核心概念
2.1 ThreadPoolExecutor核心参数
ThreadPoolExecutor是Java线程池的核心实现,它的构造函数包含7个重要参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1. corePoolSize(核心线程数)
// 核心线程数示例
public class CorePoolSizeExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3, // 核心线程数为3
10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>()//Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.
);
// 提交5个任务,观察线程创建情况
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + " 在 " +
Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("当前活跃线程数: " + executor.getActiveCount());
}
executor.shutdown();
}
}
设置了核心线程数为3,队列的长度是无限大的,所以不会触发扩容,只会由这三个核心线程轮着执行
2. maximumPoolSize(最大线程数)
// 最大线程数示例
public class MaxPoolSizeExample {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3) // 容量为3的有界队列
);
// 提交10个任务,观察线程和队列的变化
for (int i = 0; i < 10; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("任务" + taskId + " 开始执行,线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务" + taskId + " 执行完成");
});
Thread.sleep(100); // 稍微间隔,便于观察
System.out.printf("活跃线程: %d, 队列大小: %d, 池大小: %d%n",
executor.getActiveCount(),
executor.getQueue().size(),
executor.getPoolSize());
} catch (RejectedExecutionException e) {
System.out.println("任务" + taskId + " 被拒绝执行");
}
}
executor.shutdown();
}
}
当活跃线程为2,队列大小为3时,说明此时已经达到了线程池目前的极限,开始新增线程,直至达到最大线程数
那么就引申出来了一个问题,最大线程数应该怎么设置呢?
就要分场景来设置了,分为CPU密集型和I/O密集型。
- cpu密集型 最大线程数=cpu核心数+1 因为cpu密集型可以充分利用线程,开太多线程反而会增加上下文开销
- I/O密集型 最大线程数=cpu核心数*2 因为io操作会阻塞线程,不会利用到cpu,所以有很多的io操作时就需要大量的线程来处理,提高等待io时的cpu利用率
3. keepAliveTime与TimeUnit(空闲时间)
// 线程空闲回收示例
public class KeepAliveTimeExample {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
3L, // 3秒空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10)
);
// 提交大量任务,触发最大线程数
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + " 在 " +
Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 监控线程池大小变化
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.printf("当前时间: %s, 池大小: %d, 活跃线程: %d%n",
LocalTime.now(),
executor.getPoolSize(),
executor.getActiveCount());
}, 0, 1, TimeUnit.SECONDS);
// 等待观察线程回收
Thread.sleep(15000);
monitor.shutdown();
executor.shutdown();
}
}
如果线程空闲时间超过空闲存活时间,并且当前线程数大于核心线程数的则会销毁线程,直到线程数等于核心线程数(设置 allowCoreThreadTimeOut 为 true 可以回收核心线程,默认为 false)。
2.2 工作队列(BlockingQueue)详解
工作队列是线程池的核心组件,用于存储等待执行的任务。不同类型的队列有不同的特性:
1. ArrayBlockingQueue(有界队列)
// 有界队列示例
public class ArrayBlockingQueueExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3) // 容量为3的有界队列
);
System.out.println("=== 有界队列特性演示 ===");
System.out.println("核心线程数: 2, 最大线程数: 4, 队列容量: 3");
for (int i = 0; i < 10; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("任务" + taskId + " 执行中...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.printf("提交任务%d后 - 活跃线程: %d, 队列大小: %d%n",
taskId,
executor.getActiveCount(),
executor.getQueue().size());
} catch (RejectedExecutionException e) {
System.out.println("任务" + taskId + " 被拒绝:队列已满且达到最大线程数");
}
}
executor.shutdown();
}
}
2. LinkedBlockingQueue(无界队列)
// 无界队列示例
public class LinkedBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>() // 无界队列
);
System.out.println("=== 无界队列特性演示 ===");
System.out.println("核心线程数: 2, 最大线程数: 4, 队列: 无界");
// 提交大量任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + " 执行中...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
if (i % 5 == 0) {
System.out.printf("提交%d个任务后 - 活跃线程: %d, 队列大小: %d, 池大小: %d%n",
i + 1,
executor.getActiveCount(),
executor.getQueue().size(),
executor.getPoolSize());
}
}
// 无界队列特点:最大线程数永远不会超过核心线程数
Thread.sleep(5000);
System.out.printf("最终状态 - 活跃线程: %d, 队列大小: %d, 池大小: %d%n",
executor.getActiveCount(),
executor.getQueue().size(),
executor.getPoolSize());
executor.shutdown();
}
}
3. 队列选择对比分析
队列类型 | 容量 | 特点 | 适用场景 | 注意事项 |
---|---|---|---|---|
ArrayBlockingQueue | 有界 | FIFO,公平/非公平可选 | 资源有限,需要控制内存使用 | 可能触发拒绝策略 |
LinkedBlockingQueue | 可选有界/无界 | FIFO,基于链表 | 任务量不确定,希望缓冲所有任务 | 无界时可能内存溢出 |
SynchronousQueue | 0 | 直接交付,无缓冲 | 任务执行速度快,希望快速响应 | 容易触发拒绝策略 |
PriorityBlockingQueue | 无界 | 支持优先级排序 | 有任务优先级需求 | 无界,需要注意内存 |
总结:
- 1.默认情况下线程不会预创建,任务提交之后才会创建线程(不过设置prestartAllCoreThreads 可以预创建核心线程)
- 2.当核心线程满了之后不会新建线程,而是把任务堆积到工作队列中。
- 3.如果工作队列放不下了,然后才会新增线程,直至达到最大线程数。
- 4.如果工作队列满了,然后也已经达到最大线程数了,这时候来任务会执行拒绝策略。
- 5.如果线程空闲时间超过空闲存活时间,并且当前线程数大于核心线程数的则会销毁线程,直到线程数等于核心线程数(设置 allowCoreThreadTimeOut 为 true 可以回收核心线程,默认为 false)。
三、ThreadPoolExecutor源码深度解析
3.1 核心数据结构
ThreadPoolExecutor的实现基于几个关键的数据结构和状态管理:
public class ThreadPoolExecutor extends AbstractExecutorService {
// 核心状态变量,使用AtomicInteger同时存储运行状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 位运算常量
private static final int COUNT_BITS = Integer.SIZE - 3; // 29位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大线程数量
// 线程池状态常量(高3位)
private static final int RUNNING = -1 << COUNT_BITS; // 接受新任务,处理队列任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务,但处理队列任务
private static final int STOP = 1 << COUNT_BITS; // 不接受新任务,不处理队列任务,中断运行任务
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务已终止,workerCount为0
private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法已完成
// 工作队列
private final BlockingQueue<Runnable> workQueue;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 主锁
private final ReentrantLock mainLock = new ReentrantLock();
}
3.2 execute方法源码分析
execute方法是线程池任务提交的核心入口:
// execute方法的完整实现分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 步骤1:如果当前线程数小于核心线程数,尝试创建新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 步骤2:如果线程池运行中且成功入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查:如果线程池不再运行,移除任务并拒绝
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果没有工作线程,添加一个
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 步骤3:如果入队失败,尝试创建新线程
else if (!addWorker(command, false))
reject(command); // 创建失败,拒绝任务
}
四、线程池最佳实践
4.1 配置原则 最大线程数应该怎么设置呢?
public class ThreadPoolBestPractices {
/**
* CPU密集型任务的线程池配置
* 推荐配置:核心线程数 = CPU核心数 + 1
*/
public static ThreadPoolExecutor createCpuIntensivePool() {
int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
return new ThreadPoolExecutor(
corePoolSize,
corePoolSize,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "CPU-Pool-" + counter.incrementAndGet());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* I/O密集型任务的线程池配置
* 推荐配置:核心线程数 = CPU核心数 * 2
*/
public static ThreadPoolExecutor createIoIntensivePool() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
return new ThreadPoolExecutor(
corePoolSize,
corePoolSize * 2,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "IO-Pool-" + counter.incrementAndGet());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
4.2 拒绝策略选择
public class RejectionPolicyGuide {
/**
* AbortPolicy - 默认策略,抛出异常
* 适用场景:关键业务,不能丢失任务
*/
public static void abortPolicyDemo() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.AbortPolicy()
);
try {
executor.submit(() -> sleep(3000));
executor.submit(() -> sleep(1000));
executor.submit(() -> sleep(1000)); // 会被拒绝
} catch (RejectedExecutionException e) {
System.out.println("任务被拒绝: " + e.getMessage());
} finally {
executor.shutdown();
}
}
/**
* CallerRunsPolicy - 调用者运行策略
* 适用场景:Web应用,提供自然的负反馈机制
*/
public static void callerRunsPolicyDemo() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy()
);
System.out.println("主线程: " + Thread.currentThread().getName());
executor.submit(() -> {
System.out.println("任务1在 " + Thread.currentThread().getName() + " 执行");
sleep(2000);
});
executor.submit(() -> {
System.out.println("任务2在 " + Thread.currentThread().getName() + " 执行");
sleep(1000);
});
executor.submit(() -> {
System.out.println("任务3在 " + Thread.currentThread().getName() + " 执行");
sleep(1000);
});
executor.shutdown();
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- AbortPolicy:当任务队列满且没有线程空闲,此时添加任务会直接抛出 RejectedExecutionException 错误,这也是默认的拒绝策略。适用于必须通知调用者任务未能被执行的场景。
- CallerRunsPolicy:当任务队列满且没有线程空闲,此时添加任务由调用者线程执行,此时线程停止调用等待调用线程执行结束之后再开始提交新的线程。适用于希望通过减缓任务提交速度来稳定系统的场景。
- DiscardOldestPolicy:当任务队列满且没有线程空闲,会删除最早的任务,然后重新提交当前任务。适用于希望丢弃最旧的任务以保证新的重要任务能够被处理的场景。
- DiscardPolicy:直接丢弃当前提交的任务,不会执行任何操作,也不会抛出异常。适用于对部分任务丢弃没有影响的场景,或系统负载较高时不需要处理所有任务。
4.3 监控和诊断
public class ThreadPoolMonitor {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService monitor;
public ThreadPoolMonitor(ThreadPoolExecutor executor) {
this.executor = executor;
this.monitor = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r, "ThreadPool-Monitor");
t.setDaemon(true);
return t;
});
}
public void startMonitoring(long period, TimeUnit unit) {
monitor.scheduleAtFixedRate(this::printStats, 0, period, unit);
}
private void printStats() {
System.out.println("=== 线程池监控数据 ===");
System.out.println("时间: " + LocalDateTime.now());
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("队列大小: " + executor.getQueue().size());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
// 计算利用率
double utilization = (double) executor.getActiveCount() / executor.getMaximumPoolSize() * 100;
System.out.printf("线程池利用率: %.2f%%%n", utilization);
// 健康检查
if (utilization > 90) {
System.err.println("⚠️ 警告:线程池利用率过高");
}
System.out.println("===========================\n");
}
public void stopMonitoring() {
monitor.shutdown();
}
}
总结 在实际开发中怎么设计一个线程池
1. 配置原则
2.拒绝策略
3.监控和诊断
五、常见问题和解决方案
5.1 为什么不推荐使用Executors
阿里巴巴Java开发手册明确规定不允许使用Executors创建线程池,原因如下:
public class ExecutorsProblems {
// 问题1:FixedThreadPool和SingleThreadExecutor的OOM风险
public static void fixedThreadPoolOOMRisk() {
// Executors.newFixedThreadPool源码
// return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>());
// 问题:LinkedBlockingQueue无界,可能导致OOM
ExecutorService executor = Executors.newFixedThreadPool(2);
// 快速提交大量长时间任务
for (int i = 0; i < 100000; i++) {
executor.submit(() -> {
try {
Thread.sleep(10000); // 长时间任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 队列中会积压大量任务,可能导致内存溢出
}
// 问题2:CachedThreadPool的线程数无限制风险
public static void cachedThreadPoolRisk() {
// Executors.newCachedThreadPool源码
// return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
// new SynchronousQueue<Runnable>());
// 问题:最大线程数为Integer.MAX_VALUE,可能创建大量线程
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
try {
Thread.sleep(60000); // 长时间任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 可能创建10000个线程,导致系统资源耗尽
}
// 推荐做法:手动创建ThreadPoolExecutor
public static ThreadPoolExecutor createRecommendedThreadPool() {
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // 核心线程数
Runtime.getRuntime().availableProcessors() * 2, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadFactory() { // 自定义线程工厂
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "CustomPool-" + counter.incrementAndGet());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
}
5.2 线程池死锁问题
public class ThreadPoolDeadlockSolution {
/**
* 演示线程池死锁场景
*/
public static void demonstrateDeadlock() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);
// 提交外层任务
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("外层任务" + taskId + "开始执行");
// 在任务内部提交新任务并等待结果 - 可能导致死锁
Future<String> future = executor.submit(() -> {
System.out.println("内层任务" + taskId + "执行");
return "内层任务" + taskId + "结果";
});
try {
String result = future.get(); // 等待内层任务完成
System.out.println("外层任务" + taskId + "获得结果: " + result);
} catch (Exception e) {
System.err.println("外层任务" + taskId + "异常: " + e.getMessage());
}
});
}
executor.shutdown();
}
/**
* 死锁解决方案:使用不同的线程池
*/
public static void solutionForDeadlock() {
ThreadPoolExecutor outerPool = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()
);
ThreadPoolExecutor innerPool = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()
);
for (int i = 0; i < 3; i++) {
final int taskId = i;
outerPool.submit(() -> {
System.out.println("外层任务" + taskId + "开始执行");
Future<String> future = innerPool.submit(() -> {
System.out.println("内层任务" + taskId + "执行");
return "内层任务" + taskId + "结果";
});
try {
String result = future.get(5, TimeUnit.SECONDS);
System.out.println("外层任务" + taskId + "获得结果: " + result);
} catch (Exception e) {
System.err.println("外层任务" + taskId + "异常: " + e.getMessage());
}
});
}
outerPool.shutdown();
innerPool.shutdown();
}
}
六、总结
6.1 线程池设计的核心思想
ThreadPoolExecutor的设计体现了多项重要的编程思想:
- 生产者-消费者模式:任务提交者作为生产者,工作线程作为消费者
- 对象池模式:线程作为昂贵资源被池化管理
- 状态机模式:线程池具有明确的状态转换机制
- 策略模式:拒绝策略和队列选择支持不同策略
6.2 核心优势
- 资源管理:可控的并发度和内存使用
- 性能提升:减少线程创建开销,提高CPU利用率
- 系统稳定性:资源隔离和过载保护
6.3 最佳实践总结
- 避免使用Executors:手动创建ThreadPoolExecutor
- 合理配置参数:根据任务特性设置核心参数
- 选择合适的队列:根据场景选择有界或无界队列
- 配置拒绝策略:根据业务需求选择合适的拒绝策略
- 建立监控体系:监控关键指标,及时发现问题
- 业务隔离:不同业务使用独立的线程池
6.4 学习建议
- 深入理解原理:阅读源码,理解实现细节
- 实践应用:在项目中合理使用线程池
- 持续关注:关注Java新版本的并发特性
结语
线程池作为Java并发编程的核心组件,其设计思想和实现技巧值得每个Java开发者深入学习。通过本文的详细分析,我们从源码层面理解了ThreadPoolExecutor的工作原理,掌握了线程池的配置、监控和调优技巧。
在实际开发中,正确使用线程池不仅能提升应用性能,更能提高系统的稳定性和可维护性。记住:
- 避免使用Executors工厂方法,手动创建ThreadPoolExecutor
- 根据任务特性合理配置线程池参数
- 建立完善的监控体系,及时发现和解决问题
- 不同业务使用独立的线程池,实现资源隔离