线程池介绍与应用

发布于:2023-01-18 ⋅ 阅读:(387) ⋅ 点赞:(0)

一、线程池介绍

        如果每一个请求过来都创建一个线程,创建线程和销毁线程的消耗是相当大的,与对象池、连接处类似,线程池就是创建好几个线程放在一个容器里,有任务则直接分配给池子中的线程执行,任务处理完后这个线程不会被销毁,继续等待后续的任务。java中提供了几个线程池的工厂方法:

1、newFixedThreadPool,该方法返回一个固定数量的线程池,当一个任务提交时,若线程池中空闲则立即执行,若没有则会暂缓在一个任务队列中,等待被执行。其选用的阻塞队列是LinkedBlockingQueue,使用的默认容量是Integer.MAX_VALUE,相当于没有上限,可以一直添加任务,用于负载比较大的服务器。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2、newSingleThreadExecutor,创建一个线程的线程池,若空闲则执行,若没有则暂缓在队列。只用唯一的工作线程来执行任务,保证所有任务按照制定顺序(FIFO,LIFO,优先级)执行。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3、newCachedThreadPool,没有核心线程,返回一个根据实际情况调整线程个数的线程池,不限制最大线程数,若有空闲的线程则分配任务执行,若无任务则不创建线程,且每一个空闲线程会在60s后自动回收。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

4、newScheduledThreadPool,创建可指定线程数量的线程池,且带有延迟和周期性执行任务的功能。

        上面几种api都是基于ThreadPoolExecutor来构建的,下面来看一下他的构造方法和参数。

public ThreadPoolExecutor(int corePoolSize, //核心线程数量
                              int maximumPoolSize, //最大线程数
                              long keepAliveTime, //超时时间,超出核心线程数以外的线程空余存活时间
                              TimeUnit unit, //存活时间单位
                              BlockingQueue<Runnable> workQueue, //保存执行任务的队列
                              ThreadFactory threadFactory, //创建新线程使用的工厂
                              RejectedExecutionHandler handler //当任务无法执行时候的处理方式) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

二、任务流程及注意事项

1、线程池的任务处理流程如下,核心线程->队列->最大线程->自定义饱和策略。

 2、如何合理配置线程池大小

        如果是 CPU 密集型,主要是执行计算任务,响应时间很快,CPU 一直在运行,这种任务 CPU 的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行线程数,过多的线程会导致上下文切换反而使得效率降低,那线程池的最大线程数可以配置为 CPU 核心数+1;

        如果是 IO 密集型,主要是进行 IO 操作,执行 IO 操作的时间较长,CPU 处于空闲状态,导致 CPU 的利用率不高,这种情况下可以增加线程池的大小。这种情况下可以结合线程的等待时长来做判断,等待时间越高,那么线程数也相对越多。一般可以配置 CPU 核心数的 2 倍。一个公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/线程 CPU 时间 )* CPU 数目。

3、任务缓存队列及排队策略

        任务缓存队列 workQueue用来存放等待执行的任务,workQueue 的类型为 BlockingQueue,通常可以取下面三种类型:

1)ArrayBlockingQueue,基于数组的先进先出队列,此队列创建时必须指定大小;

2) LinkedBlockingQueue,基于链表的先进先出队列,如果创建时没有指定此队列大小,则默

认为 Integer.MAX_VALUE;
3)SynchronousQueue,这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个
线程来执行新来的任务。
4、线程池的监控
        如果在项目中大规模的使用了线程池,那么必须要有一套监控体系来查看当前线程池的状
态,当出现问题的时候可以快速定位到问题。而线程池提供了相应的扩展方法,我们通过重
写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控,简
单给大家演示一个案例。
1)监控类代码
import java.util.Date;
import java.util.concurrent.*;

public class ThreadPoolMonitor extends ThreadPoolExecutor {

    // 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
    private ConcurrentHashMap<String, Date> startTimes;

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long
            keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
        this.startTimes = new ConcurrentHashMap<>();
    }

    @Override
    public void shutdown() {
        System.out.println("已经执行的任务数: " + this.getCompletedTaskCount() + "," +
                "当前活动线程数:" + this.getActiveCount() + ",当前排队线程数:" + this.getQueue().size());
        System.out.println();
        super.shutdown();
    }

    //任务开始之前记录任务开始时间
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、
        // 已完成任务数量、任务总数、队列里缓存的任务数量、
        // 池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
        System.out.print("任务耗时:" + diff + "\n");
        System.out.print("初始线程数:" + this.getPoolSize() + "\n");
        System.out.print("核心线程数:" + this.getCorePoolSize() + "\n");
        System.out.print("正在执行的任务数量:" + this.getActiveCount() + "\n");
        System.out.print("已经执行的任务数:"+this.getCompletedTaskCount()+"\n ");
        System.out.print("任务总数:" + this.getTaskCount() + "\n");
        System.out.print("最大允许的线程数:" + this.getMaximumPoolSize() + "\n");
        System.out.print("线程空闲时间:"+this.getKeepAliveTime(TimeUnit.MILLISECONDS)+"\n ");
        System.out.println();
        super.afterExecute(r, t);
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }
2)测试类代码
import java.util.concurrent.ExecutorService;

public class ThreadTest implements Runnable{
        private static ExecutorService es = ThreadPoolMonitor.newCachedThreadPool();
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) throws Exception {
            for (int i = 0; i < 10; i++) {
                es.execute(new ThreadTest());
            }
            es.shutdown();
        }
}

三、Callable/Future使用及原理分析

        线程池的执行任务有两种方法,一种是submit,一种是execute,这两种方法是有区别的。

1)execute只能接收一个Runnable的参数,submit可以接收Runnable和Callable两种类型的参数;

2)execute出现异常会抛出,submit方法调用不会抛出异常,除非调用future.get()方法;

3)execute没有返回值,而submit如果传入一个Callable,可以得到一个Future的返回值。

        下面演示一个Callable/Future案例,其与Thread类的线程构造最大区别在于,能够很方便的获得线程执行完成以后的结果。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableDemo implements Callable<String> {

    @Override
    public String call() throws Exception {

        return "hello, thread";

    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CallableDemo callableDemo = new CallableDemo();
        FutureTask futureTask = new FutureTask(callableDemo);
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}

        为什么需要回调呢?因为结果值是由另外一个线程计算的,当前线程是不知道什么时候计算完成,所以传递一个回调接口给计算线程,当计算完成时调用这个回调接口,回传结果值。这个在很多地方有用到,比如Dubbo的异步调用,比如消息中间件的异步通信等等。利用FutureTask、Callable、Thread对耗时任务(如查询数据库)的预处理,在需要计算结果之前就启动计算。下面来看一下它们之间的关系图。

         RunnableFuture是一个接口,继承了Runnable和Future这两个接口,Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    /**
     *当前的Future是否被取消
     */
    boolean isCancelled();

    /**
     *当前的Future是否已结束,包括运行完成、抛出异常以及取消
     */
    boolean isDone();

    /**
     *获取Future的结果值,如果当前Future还没有结束,那么当前线程就等待
	 *直到Future运行结束,那么会唤醒等待结果值的线程
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 获取Future的结果值,与get()相比较多了允许设置超时时间
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

        我们可以把Runnable比作是生产者,Future比作是消费者,那么FutureTask是被这两者共享的,生产者运行run方法计算结果,消费者通过get方法获取结果。作为生产者消费者模式,有一个机制,如果生产者数据还没准备的时候,消费者会被阻塞,当生产者数据准备好后会唤醒消费者继续执行。下面看一下FutureTask的实现:

1)state 的含义

private volatile int state;
	//新建状态,表示这个FutureTask还没有开始运行
    private static final int NEW          = 0;
	//完成状态,表示FutureTask任务已经计算完毕了,但是还有一些后续操作,例如唤醒等待线程操作,还没有完成
    private static final int COMPLETING   = 1;
	//FutureTask任务完结,正常完成,没有发生异常
    private static final int NORMAL       = 2;
	//FutureTask任务完结,因为发生异常
    private static final int EXCEPTIONAL  = 3;
	//FutureTask任务完结,因为取消任务
    private static final int CANCELLED    = 4;
	//FutureTask任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求
    private static final int INTERRUPTING = 5;
	//FutureTask任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求
    private static final int INTERRUPTED  = 6;

2)run 方法

/**
     * 调用callable的call方法返回结果result,根据是否发生异常,调用 set(result)或 
	 * setException(ex)方法表示FutureTask任务完结。
	 * 不过因为FutureTask任务都是在多线程环境中使用,所以要注意并发冲突问题,注意在run
	 * 方法中,没有使用synchronized代码块或者Lock来解决并发问题,而是使用了CAS乐观锁
	 * 来实现并发安全,保证只有一个线程能运行FutureTask任务。
     */
    public void run() {
		//如果状态state不是NEW,或者设置runner值失败
		//表示有别的线程在此之前调用run方法,并成功设置了runner值
		//保证只有一个线程可以运行try代码块中的代码
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

3)get 方法

/**
     * 阻塞获取线程执行结果,这里主要做了两个事情:
	 * 1. 判断当前的状态,如果状态小于等于 COMPLETING,表示FutureTask任务还没有完结,
	 * 所以调用awaitDone方法,让当前线程等待;
	 * 2. report 返回结果值或者抛出异常
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

4)awaitDone方法

/**
     * 如果当前的结果还没有被执行完,把当前线程线程插入到等待队列
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;//节点是否已添加
        for (;;) {
			// 如果当前线程中断标志位是 true,
			// 那么从列表中移除节点q,并抛出InterruptedException异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {//当状态大于COMPLETING时,表示FutureTask任务已结束
                if (q != null)
                    q.thread = null;// 将节点q线程设置为null,因为线程没有阻塞等待
                return s;
            }// 表示还有一些后序操作没有完成,那么当前线程让出执行权
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
			//表示状态是NEW,那么就需要将当前线程阻塞等待,就是将它插入等待线程链表中
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
				// 使用CAS函数将新节点添加到链表中,如果添加失败,那么queued为false,
				// 下次循环时,会继续添加,直到成功。
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {// timed为true表示需要设置超时
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);// 让当前线程等待nanos时间
            }
            else
                LockSupport.park(this);
        }
    }

5)report方法

/**
     * 根据传入的状态值s,来决定是抛出异常,还是返回结果值。这个两种情况都表示FutureTask完结了
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;;//表示call的返回值
        if (s == NORMAL)//表示正常完结状态,所以返回结果值
            return (V)x;
		// 大于或等于CANCELLED,都表示手动取消FutureTask任务,
		// 所以抛出CancellationException异常
        if (s >= CANCELLED)
            throw new CancellationException();
		// 否则就是运行过程中,发生了异常,这里就抛出这个异常
        throw new ExecutionException((Throwable)x);
    }

        下面再来看一个线程池里面的submit方法。

public class PoolCallableDemo implements Callable<String> {


    @Override
    public String call() throws Exception {
        return "hello world";
    }
    public static void main(String[] args) throws ExecutionException,
            InterruptedException {
        ExecutorService es= Executors.newFixedThreadPool(1);
        CallableDemo callableDemo=new CallableDemo();
        Future future=es.submit(callableDemo);
        System.out.println(future.get());
    }
}
        调AbstractExecutorService抽象类中的submit方法,相对于execute方法来说,只多做了一步操作,就是封装了一个 RunnableFuture。
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

四、线程池的应用

        这里举一个实际应用的例子,某需求场景下需要阻塞等待任务的完成,1、用户发起任务,任务状态有成功、失败和等待;2、任务执行时间会比较长,成功后系统才可以执行下一步动作;3、任务成功、失败或者超时(一分钟)都视为失败,下面在springboot项目中实现这个场景。

1、配置文件中配置线程池参数

# 线程池配置
# 配置核心线程数
threadpool.coreSize = 20
# 配置最大线程数
threadpool.maxSize = 50
# 配置存活时间
threadpool.keepAliveTime = 120
# 配置队列
threadpool.dequeSize = 100

2、线程池配置类

package com.mszlu.blog.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ThreadPoolConfig {

    @Bean("threadPool")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor(ThreadPoolProperties threadPoolProperties) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadPoolProperties.getCoreSize());
        executor.setMaxPoolSize(threadPoolProperties.getMaxSize());
        executor.setQueueCapacity(threadPoolProperties.getDequeSize());
        executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveTime());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }

    @ConfigurationProperties(prefix = "threadpool")
    @Component
    @Data
    private class ThreadPoolProperties {
        private Integer coreSize;
        private Integer maxSize;
        private Integer keepAliveTime;
        private Integer dequeSize;
    }


}
3、根据任务id阻塞获取状态并返回
package com.mszlu.blog.service.impl;

import com.mszlu.blog.service.TaskService;
import com.mszlu.blog.utils.Exception.AppBaseException;
import com.mszlu.blog.utils.response.ResponseStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Service
public class TaskServiceImpl implements TaskService {

    @Autowired
    private ThreadPoolTaskExecutor threadPool;

    @Override
    public String waitAndGetExportId(String taskId) {
        Future<Map<String, String>> future = threadPool.submit(() -> {
            long startTIme = 0;
            while (true) {
                //status为1成功,2为失败,3为等待
                Map<String, String> exportInfoByTask = getExportInfoByTask(taskId);
                boolean isFinish = exportInfoByTask.get("status") != "3";
                if (isFinish || startTIme > 12) {
                    return exportInfoByTask;
                }
                Thread.sleep(5000);
                startTIme++;
            }
        });

        Map<String, String> callback;
        try {
            callback = future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new AppBaseException("阻塞等待导出任务结果异常" + e.getMessage(), ResponseStatus.FAIL.getStatus());
        }
        return callback.get("exportInfo");

    }

    /**
     * 任务状态和信息
     *
     * @param taskId
     * @return
     */
    private Map<String, String> getExportInfoByTask(String taskId) {
        HashMap<String, String> map = new HashMap<>();
        String status = "1";
        if (taskId != null) {
            map.put("status", status);
            map.put("exportInfo", "123456")
        }
        return map;
    }
}

本文含有隐藏内容,请 开通VIP 后查看