文章目录
第九章、Java中的线程池
Java中的线程池时运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。
合理地使用线程池能带来三个好处:
- **降低资源消耗。**通过重复利用已创建的线程降低线程创建和销毁造成的损耗。
- **提高响应速度。**当任务到达时,任务可以不需要等到线程创建就能立即执行。
- **提高线程的可管理性。**线程是稀缺资源,使用线程池进行统一分配、调优和监控。
1.线程池的实现原理
从图中可以理解提交一个线程到线程池时线程的工作原理:
关于线程池的具体实现原理,在了解一些线程池的参数之后,在作讲解。
2.线程池的参数
我们可以通过ThredPoolExecutor 来创建一个线程池
new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, milliseconds, runnableTaskQueue, handler);
创建线程池需要这几个参数:
corePoolSize(线程池的基本大小):提交任务到线程池,会优先创建线程(即便存在空闲的线程),等到需要执行的任务数大于 corePoolSize 时就不再创建。(这句话在后面【线程数最大数量】会有修正)
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO(先进先出)
- LinkedBlockingQueue:基于链表结构的阻塞队列,FIFO,吞吐量通常高于ArrayBlockingQueue
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue
- PriorityBlockingQueue:具有优先级的无限阻塞队列
maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且线程池小于maximumPoolSize ,线程池就会再创建新线程。如果使用了无界的任务队列这个参数就没什么效果
ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
RejectedExcutionHandler(饱和策略):当线程池和队列都满了,采取一种策略来处理提交的新任务。这个策略默认下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中提供了以下四种策略:
AbortPolicy:直接抛出异常(默认)
CallerRunsPolicy:只用调用者所在线程来运行任务
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy:不处理,丢弃过
可以通过实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务
keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。可以通过调大时间提高线程的利用率
TimeUnit(线程活动保持时间的单位):天、小时、分钟、毫秒、微妙、千分之一毫秒、纳秒。
3.向线程池提交任务
可以使用两个方法向线程池提交任务:
3.1 execute()
用于提交不需要返回值的任务,所以无法判断线程是否执行成功。
execute提交的是一个Runnable类的实例。
- 如果线程池中的线程数小于 corePoolSize ,则创建新线程来执行任务(需要获取全局锁)
- 如果线程池中的线程数等于或大于 corePoolSize ,则将任务加入 BlockingQueue
- 如果无法将任务加入 BlockingQueue(队列已满),创建新的线程来处理任务(需要获取全局锁)
- 如果创建新线程将使当前运行的线程超出 maximumPoolSize ,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法
3.2 submit()
用于提交需要返回值的任务。线程池会返回一个future类型的对象
future对象:
这个future对象可以判断任务是否执行成功,并且可以通过get()方法获取返回值,get(long timeout, TimeUnit unit) 方法则会阻塞一段时间后返回,这时任务可能没有执行完,会报TimeoutException
3.3 实战
1.不使用get()
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main_start");
Runnable runnable = new Runable01();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.submit(runnable);
System.out.println("main_end");
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10/5;
System.out.println("运行结果:" + i);
}
}
}
2.使用get()
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main_start");
Runnable runnable = new Runable01();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
Future<?> future = threadPoolExecutor.submit(runnable);
Object o = future.get();
System.out.println("main_end");
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10/5;
System.out.println("运行结果:" + i);
}
}
}
3.get(1L,TimeUnit.NANOSECONDS)
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
System.out.println("main_start");
Runnable runnable = new Runable01();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
Future<?> future = threadPoolExecutor.submit(runnable);
Object o = future.get(1L,TimeUnit.NANOSECONDS);
System.out.println("main_end");
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10/5;
System.out.println("运行结果:" + i);
}
}
}
4.关闭线程池
原理:
- 遍历线程池中的工作线程,逐个调用线程的interrupt 方法中断线程,所以无法响应中断的任务可能永远无法终止
关闭线程池的方法以及一些不同之处:
- shutdown():将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程
- shutdownNow():首先将线程池中的状态设置成STOP,然后尝试停止所以正在执行或暂停任务的线程,并返回等待执行任务的线程的列表
只要调用了这两个方法中的任意一个,isShutDown就会返回true。
当所有的服务关闭后,才表示线程关闭成功,这时调用isTerminaed方法会返回true。
通常调用shutdown 方法,如果线程不一定要执行完,可以调用 shutdownNow 方法。
5.合理分配线程池
要想合理分配线程池,必须分析任务特性:
- 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务
- 任务的优先级:高、中和低
- 任务的执行时间:长、中和短
- 任务的依赖性:是否依赖其他系统资源,如数据库连接
性质不同的任务可以使用不同规模的线程池来分开处理。
如果一直有优先级高的任务提交到队列里,则优先级低的任务可能永远不能执行
建议使用有界队列,这样如果线程池中的任务耗时较大、任务量较多时仅会影响后台任务,同时提醒系统出现问题了,如果使用的无界队列,可能等到挤爆内存,整个系统不可用了,才会发现错误。
6.线程池的监控
一些线程池监控的属性:
- taskCount:线程池需要执行的任务数量
- completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于 taskCount
- largestPoolSize:线程池里曾经创建过的最大线程数量。可以直到线程池是否满过
- getPoolSize:线程池的线程数量。线程池不销毁的话,线程不会减少
- getActiveCount:获取活动的线程数
可以通过重写线程池的beforeExecute、afterExecute和terminated方法自定义线程池来对线程进行监控。这几个方法在线程池里默认是空方法。