一、线程池介绍
如果每一个请求过来都创建一个线程,创建线程和销毁线程的消耗是相当大的,与对象池、连接处类似,线程池就是创建好几个线程放在一个容器里,有任务则直接分配给池子中的线程执行,任务处理完后这个线程不会被销毁,继续等待后续的任务。java中提供了几个线程池的工厂方法:
1、newFixedThreadPool,该方法返回一个固定数量的线程池,当一个任务提交时,若线程池中空闲则立即执行,若没有则会暂缓在一个任务队列中,等待被执行。其选用的阻塞队列是LinkedBlockingQueue,使用的默认容量是Integer.MAX_VALUE,相当于没有上限,可以一直添加任务,用于负载比较大的服务器。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2、newSingleThreadExecutor,创建一个线程的线程池,若空闲则执行,若没有则暂缓在队列。只用唯一的工作线程来执行任务,保证所有任务按照制定顺序(FIFO,LIFO,优先级)执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
3、newCachedThreadPool,没有核心线程,返回一个根据实际情况调整线程个数的线程池,不限制最大线程数,若有空闲的线程则分配任务执行,若无任务则不创建线程,且每一个空闲线程会在60s后自动回收。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4、newScheduledThreadPool,创建可指定线程数量的线程池,且带有延迟和周期性执行任务的功能。
上面几种api都是基于ThreadPoolExecutor来构建的,下面来看一下他的构造方法和参数。
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory, //创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行时候的处理方式) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
二、任务流程及注意事项
1、线程池的任务处理流程如下,核心线程->队列->最大线程->自定义饱和策略。
2、如何合理配置线程池大小
如果是 CPU 密集型,主要是执行计算任务,响应时间很快,CPU 一直在运行,这种任务 CPU 的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行线程数,过多的线程会导致上下文切换反而使得效率降低,那线程池的最大线程数可以配置为 CPU 核心数+1;
如果是 IO 密集型,主要是进行 IO 操作,执行 IO 操作的时间较长,CPU 处于空闲状态,导致 CPU 的利用率不高,这种情况下可以增加线程池的大小。这种情况下可以结合线程的等待时长来做判断,等待时间越高,那么线程数也相对越多。一般可以配置 CPU 核心数的 2 倍。一个公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/线程 CPU 时间 )* CPU 数目。
3、任务缓存队列及排队策略
任务缓存队列 workQueue用来存放等待执行的任务,workQueue 的类型为 BlockingQueue,通常可以取下面三种类型:
1)ArrayBlockingQueue,基于数组的先进先出队列,此队列创建时必须指定大小;
2) LinkedBlockingQueue,基于链表的先进先出队列,如果创建时没有指定此队列大小,则默
import java.util.Date;
import java.util.concurrent.*;
public class ThreadPoolMonitor extends ThreadPoolExecutor {
// 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
private ConcurrentHashMap<String, Date> startTimes;
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue);
this.startTimes = new ConcurrentHashMap<>();
}
@Override
public void shutdown() {
System.out.println("已经执行的任务数: " + this.getCompletedTaskCount() + "," +
"当前活动线程数:" + this.getActiveCount() + ",当前排队线程数:" + this.getQueue().size());
System.out.println();
super.shutdown();
}
//任务开始之前记录任务开始时间
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
// 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、
// 已完成任务数量、任务总数、队列里缓存的任务数量、
// 池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
System.out.print("任务耗时:" + diff + "\n");
System.out.print("初始线程数:" + this.getPoolSize() + "\n");
System.out.print("核心线程数:" + this.getCorePoolSize() + "\n");
System.out.print("正在执行的任务数量:" + this.getActiveCount() + "\n");
System.out.print("已经执行的任务数:"+this.getCompletedTaskCount()+"\n ");
System.out.print("任务总数:" + this.getTaskCount() + "\n");
System.out.print("最大允许的线程数:" + this.getMaximumPoolSize() + "\n");
System.out.print("线程空闲时间:"+this.getKeepAliveTime(TimeUnit.MILLISECONDS)+"\n ");
System.out.println();
super.afterExecute(r, t);
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
import java.util.concurrent.ExecutorService;
public class ThreadTest implements Runnable{
private static ExecutorService es = ThreadPoolMonitor.newCachedThreadPool();
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
es.execute(new ThreadTest());
}
es.shutdown();
}
}
三、Callable/Future使用及原理分析
线程池的执行任务有两种方法,一种是submit,一种是execute,这两种方法是有区别的。
1)execute只能接收一个Runnable的参数,submit可以接收Runnable和Callable两种类型的参数;
2)execute出现异常会抛出,submit方法调用不会抛出异常,除非调用future.get()方法;
3)execute没有返回值,而submit如果传入一个Callable,可以得到一个Future的返回值。
下面演示一个Callable/Future案例,其与Thread类的线程构造最大区别在于,能够很方便的获得线程执行完成以后的结果。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableDemo implements Callable<String> {
@Override
public String call() throws Exception {
return "hello, thread";
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableDemo callableDemo = new CallableDemo();
FutureTask futureTask = new FutureTask(callableDemo);
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
为什么需要回调呢?因为结果值是由另外一个线程计算的,当前线程是不知道什么时候计算完成,所以传递一个回调接口给计算线程,当计算完成时调用这个回调接口,回传结果值。这个在很多地方有用到,比如Dubbo的异步调用,比如消息中间件的异步通信等等。利用FutureTask、Callable、Thread对耗时任务(如查询数据库)的预处理,在需要计算结果之前就启动计算。下面来看一下它们之间的关系图。
RunnableFuture是一个接口,继承了Runnable和Future这两个接口,Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
/**
*当前的Future是否被取消
*/
boolean isCancelled();
/**
*当前的Future是否已结束,包括运行完成、抛出异常以及取消
*/
boolean isDone();
/**
*获取Future的结果值,如果当前Future还没有结束,那么当前线程就等待
*直到Future运行结束,那么会唤醒等待结果值的线程
*/
V get() throws InterruptedException, ExecutionException;
/**
* 获取Future的结果值,与get()相比较多了允许设置超时时间
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
我们可以把Runnable比作是生产者,Future比作是消费者,那么FutureTask是被这两者共享的,生产者运行run方法计算结果,消费者通过get方法获取结果。作为生产者消费者模式,有一个机制,如果生产者数据还没准备的时候,消费者会被阻塞,当生产者数据准备好后会唤醒消费者继续执行。下面看一下FutureTask的实现:
1)state 的含义
private volatile int state;
//新建状态,表示这个FutureTask还没有开始运行
private static final int NEW = 0;
//完成状态,表示FutureTask任务已经计算完毕了,但是还有一些后续操作,例如唤醒等待线程操作,还没有完成
private static final int COMPLETING = 1;
//FutureTask任务完结,正常完成,没有发生异常
private static final int NORMAL = 2;
//FutureTask任务完结,因为发生异常
private static final int EXCEPTIONAL = 3;
//FutureTask任务完结,因为取消任务
private static final int CANCELLED = 4;
//FutureTask任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求
private static final int INTERRUPTING = 5;
//FutureTask任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求
private static final int INTERRUPTED = 6;
2)run 方法
/**
* 调用callable的call方法返回结果result,根据是否发生异常,调用 set(result)或
* setException(ex)方法表示FutureTask任务完结。
* 不过因为FutureTask任务都是在多线程环境中使用,所以要注意并发冲突问题,注意在run
* 方法中,没有使用synchronized代码块或者Lock来解决并发问题,而是使用了CAS乐观锁
* 来实现并发安全,保证只有一个线程能运行FutureTask任务。
*/
public void run() {
//如果状态state不是NEW,或者设置runner值失败
//表示有别的线程在此之前调用run方法,并成功设置了runner值
//保证只有一个线程可以运行try代码块中的代码
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
3)get 方法
/**
* 阻塞获取线程执行结果,这里主要做了两个事情:
* 1. 判断当前的状态,如果状态小于等于 COMPLETING,表示FutureTask任务还没有完结,
* 所以调用awaitDone方法,让当前线程等待;
* 2. report 返回结果值或者抛出异常
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
4)awaitDone方法
/**
* 如果当前的结果还没有被执行完,把当前线程线程插入到等待队列
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;//节点是否已添加
for (;;) {
// 如果当前线程中断标志位是 true,
// 那么从列表中移除节点q,并抛出InterruptedException异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {//当状态大于COMPLETING时,表示FutureTask任务已结束
if (q != null)
q.thread = null;// 将节点q线程设置为null,因为线程没有阻塞等待
return s;
}// 表示还有一些后序操作没有完成,那么当前线程让出执行权
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//表示状态是NEW,那么就需要将当前线程阻塞等待,就是将它插入等待线程链表中
else if (q == null)
q = new WaitNode();
else if (!queued)
// 使用CAS函数将新节点添加到链表中,如果添加失败,那么queued为false,
// 下次循环时,会继续添加,直到成功。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {// timed为true表示需要设置超时
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);// 让当前线程等待nanos时间
}
else
LockSupport.park(this);
}
}
5)report方法
/**
* 根据传入的状态值s,来决定是抛出异常,还是返回结果值。这个两种情况都表示FutureTask完结了
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;;//表示call的返回值
if (s == NORMAL)//表示正常完结状态,所以返回结果值
return (V)x;
// 大于或等于CANCELLED,都表示手动取消FutureTask任务,
// 所以抛出CancellationException异常
if (s >= CANCELLED)
throw new CancellationException();
// 否则就是运行过程中,发生了异常,这里就抛出这个异常
throw new ExecutionException((Throwable)x);
}
下面再来看一个线程池里面的submit方法。
public class PoolCallableDemo implements Callable<String> {
@Override
public String call() throws Exception {
return "hello world";
}
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ExecutorService es= Executors.newFixedThreadPool(1);
CallableDemo callableDemo=new CallableDemo();
Future future=es.submit(callableDemo);
System.out.println(future.get());
}
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
四、线程池的应用
1、配置文件中配置线程池参数
# 线程池配置
# 配置核心线程数
threadpool.coreSize = 20
# 配置最大线程数
threadpool.maxSize = 50
# 配置存活时间
threadpool.keepAliveTime = 120
# 配置队列
threadpool.dequeSize = 100
2、线程池配置类
package com.mszlu.blog.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfig {
@Bean("threadPool")
public ThreadPoolTaskExecutor threadPoolTaskExecutor(ThreadPoolProperties threadPoolProperties) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolProperties.getCoreSize());
executor.setMaxPoolSize(threadPoolProperties.getMaxSize());
executor.setQueueCapacity(threadPoolProperties.getDequeSize());
executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveTime());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("MyExecutor-");
executor.initialize();
return executor;
}
@ConfigurationProperties(prefix = "threadpool")
@Component
@Data
private class ThreadPoolProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
private Integer dequeSize;
}
}
package com.mszlu.blog.service.impl;
import com.mszlu.blog.service.TaskService;
import com.mszlu.blog.utils.Exception.AppBaseException;
import com.mszlu.blog.utils.response.ResponseStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Service
public class TaskServiceImpl implements TaskService {
@Autowired
private ThreadPoolTaskExecutor threadPool;
@Override
public String waitAndGetExportId(String taskId) {
Future<Map<String, String>> future = threadPool.submit(() -> {
long startTIme = 0;
while (true) {
//status为1成功,2为失败,3为等待
Map<String, String> exportInfoByTask = getExportInfoByTask(taskId);
boolean isFinish = exportInfoByTask.get("status") != "3";
if (isFinish || startTIme > 12) {
return exportInfoByTask;
}
Thread.sleep(5000);
startTIme++;
}
});
Map<String, String> callback;
try {
callback = future.get();
} catch (InterruptedException | ExecutionException e) {
throw new AppBaseException("阻塞等待导出任务结果异常" + e.getMessage(), ResponseStatus.FAIL.getStatus());
}
return callback.get("exportInfo");
}
/**
* 任务状态和信息
*
* @param taskId
* @return
*/
private Map<String, String> getExportInfoByTask(String taskId) {
HashMap<String, String> map = new HashMap<>();
String status = "1";
if (taskId != null) {
map.put("status", status);
map.put("exportInfo", "123456")
}
return map;
}
}