【Java线程池前生今世】

发布于:2022-12-23 ⋅ 阅读:(246) ⋅ 点赞:(0)

Java线程池前生今世

池化技术简述

池化技术在开发中应用十分广泛,简单来说,池化技术就是将可重复利用的对象比如连接、线程等,统一管理起来。线程池、数据库、连接池、HTTP、Redis 连接池等等都是对池化技术的很好实现。

通常而言,池化技术所管理的对象,无论是连接还是线程,它们的创建过程都是比较耗时的,也比较消耗资源。所以,我们就需要将他们放入一个池子中统一管理起来,以达到提升性能和资源复用的目的。所以池化技术的核心思想就是空间换时间

不过,没有任何一项技术是黑核的,如果池子中的对象、资源没有得到充分的利用,也会造成多余的内存浪费(池化技术的核心思想就是空间换时间)当然,相对于池化技术的有点来说,这个缺点是可以忽略的。

线程池详解

本篇的标题即是线程池前生今世,所以还是来聊聊线程池吧 ~

如其名,线程池主要就是负责创建和管理线程。

在没有线程池的时候,我们每次使用到线程的时候就需要单独创建,用完之后再做销毁,然而,创建、销毁线程都是比较消耗资源和时间的操作。

Java 创建线程的三种方式

通过继承 Thread类的方式
package com.softsheng.based.thread;

import lombok.extern.slf4j.Slf4j;

/**
 * 通过继承Thread实现线程
 *
 * @author softsheng
 * @since 2022/9/7
 */
@Slf4j
public class ThreadTest extends Thread {

    @Override
    public void run() {
        LOGGER.info("~ 通过继承Thread方式!");
    }

    public static void main(String[] args) {
        ThreadTest thread = new ThreadTest();
        // 启动线程
        thread.start();
    }
}
实现 Runnable接口的方式
package com.softsheng.based.thread;

import lombok.extern.slf4j.Slf4j;

/**
 * 通过实现Runnable接口实现线程
 *
 * @author softsheng
 * @since 2022/9/7
 */
@Slf4j
public class RunnableTest implements Runnable  {

    @Override
    public void run() {
        LOGGER.info("~ 通过实现Runnable方式!");
    }

    public static void main(String[] args) {
        RunnableTest runnable = new RunnableTest();
        Thread thread = new Thread(runnable);
        // 启动线程
        thread.start();
    }
}
实现 Callable接口的方式
package com.softsheng.based.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
 * 通过实现Callable实现线程
 *
 * @author softsheng
 * @since 2022/9/7
 */
public class CallableTest implements Callable {

    @Override
    public Object call() throws Exception {
        System.out.println("实现Callable接口方式");
        // ... 具体业务逻辑

        // 返回值,类型和Callable泛型一致,这里用String举例
        return "返回值";
    }

    public static void main(String[] args) {
        // 创建Callable实现类对象
        CallableTest callable = new CallableTest();
        // 将callable作为参数创建FutureTask对象
        FutureTask<String> futureTask = new FutureTask<>(callable);
        // 将futureTask作为参数创建线程
        Thread thread = new Thread(futureTask);
        // 启动线程
        thread.start();
        try {
            // 获取线程执行后的返回值
            String result = futureTask.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

线程创建三种方式对比
  1. 从继承和实现的角度来说(单继承多实现),实现扩展性更强。
  2. 集成Thread和实现Runnable没有返回值,且覆写的run()方法。实现Callable方式,覆写的call()方法,有返回值,对应泛型。
  3. 编程复杂度。

Java 创建线程池的四种方式

Executors类(java.util.concurrent包下 Doug Lea 亲作)提供了四种创建线程池方法,这些方法最终都是通过配置ThreadPoolExecutor的不同参数,来达到不同的线程管理效果。

分别为:

newCacheTreadPool: 创建一个可以缓存的线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程,没回收的话就新建线程。

newFixedThread: 创建一个定长的线程池,可控制最大并发数,超出的线程进行队列等待。

newScheduleThreadPool: 可以创建定长的、支持定时任务,周期任务执行。

newSingleExecutor: 创建一个单线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

通过 newCacheTreadPool创建线程池

创建一个线程池,根据需要创建新线程,但在可用时将重用以前构造的线程。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,对execute的调用将重用以前构造的线程。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。六十秒内未使用的线程将被终止并从缓存中删除。因此,保持空闲足够长时间的池不会消耗任何资源。请注意,可以使用ThreadPoolExecutor构造函数创建具有相似属性但细节不同(例如超时参数)的池。

示例代码:

package com.softsheng.based.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 通过 newCacheTreadPool创建线程池
 *
 * @author softsheng
 * @since 2022/9/7
 */
@Slf4j
public class CacheThreadPoolTest {

    public static void main(String[] args) {
        // 不推荐该种方式创建线程池:::newCachedThreadPool提供了两种构建方式
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            try {
                Thread.sleep(index * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info(index + "");
                }
            });

        }
    }
}

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

通过 newFixedThread创建线程池

创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程,并在需要时使用提供的 ThreadFactory 创建新线程。在任何时候,最多nThreads线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,新的线程将取代它。池中的线程将一直存在,直到显式shutdown

示例代码:

package com.softsheng.based.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 通过 newFixedThreadPool 创建线程池
 *
 * @author softsheng
 * @since 2022/9/7
 */
@Slf4j
public class FixedThreadPoolTest {

    public static void main(String[] args) {
        // 不推荐该种方式创建线程池:::newFixedThreadPool提供了两种构建方式可以指定 池中的线程数 
        ExecutorService cachedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            try {
                Thread.sleep(index * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info(index + "");
                }
            });

        }
    }
}

因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。

定长线程池的大小最好根据系统资源进行设置。

通过newScheduleThreadPool创建线程池

创建一个单线程执行程序,可以安排命令在给定延迟后运行,或定期执行。 (但请注意,如果该单线程在关闭前的执行过程中因故障而终止,如果需要执行后续任务,则新线程将取代它。)任务保证按顺序执行,并且不会有超过一个任务处于活动状态在任何给定时间。与其他等效的newScheduledThreadPool(1, threadFactory)不同,返回的执行程序保证不能重新配置以使用其他线程。

代码示例:

延迟3s执行

package com.softsheng.based.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.*;

/**
 * 通过 newScheduledThreadPool 创建线程池
 *
 * @author softsheng
 * @since 2022/9/7
 */
@Slf4j
public class ScheduledThreadPoolTest {

    public static void main(String[] args) {
        // 不推荐该种方式创建线程池  newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("delay 3 seconds ");
            }
            // 延迟3s执行  
        },3, TimeUnit.SECONDS);
    }
}

延迟1秒后每3秒执行一次

package com.softsheng.based.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.*;

/**
 * 通过 newScheduledThreadPool 创建线程池
 *
 * @author softsheng
 * @since 2022/9/7
 */
@Slf4j
public class ScheduledThreadPoolTest {

    public static void main(String[] args) {
        // 不推荐该种方式创建线程池:::newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("delay 3 seconds ");
            }
            // 表示延迟 1s后 每隔 3s执行一次
        },1,3, TimeUnit.SECONDS);
    }
}

ScheduledExecutorServiceTimer更安全,功能更加强大。

通过newSingleExecutor创建线程池

创建一个单线程执行程序,可以安排命令在给定延迟后运行,或定期执行。 (但请注意,如果该单线程在关闭前的执行过程中因故障而终止,如果需要执行后续任务,则新线程将取代它。)任务保证按顺序执行,并且不会有超过一个任务处于活动状态在任何给定时间。与其他等效的newScheduledThreadPool(1)不同,返回的执行程序保证不能重新配置以使用额外的线程。

package com.softsheng.based.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 通过 newSingleThreadExecutor 创建线程池
 *
 * @author softsheng
 * @since 2022/9/7
 */
@Slf4j
public class SingleThreadExecutorTest {
    public static void main(String[] args) {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        LOGGER.info(index + "");
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

结果依次输出,相当于顺序执行各个任务。

优雅的使用线程池

虽然以上是JDK提供给我们方便使用的线程池,但是阿里巴巴开发规范不推荐我们直接使用Executors创建线程池。

java.util.concurrent源码:

CachedThreadPool允许的创建线程数量为 Integer.MAX_VALUE可能会创建大量的线程,最终导致OOM

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

FixedThreadPool允许的请求队列长度为 Integer.MAX_VALUE可能会堆积大量的请求,最终导致OOM

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    
	/// 继续深入 LinkedBlockingQueue 的源码发现,创建了一个容量为 Integer.MAX_VALUE的LinkedBlockingQueue 

    /**
     * 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue  
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

推荐通过new ThreadPoolExecutor()的写法创建线程池,这样写线程数量更灵活,开发中多数用这个类创建线程。

springThreadPoolTaskExecutor为例:

示例代码中的参数配置只是个人配置的参数,具体生产使用时需要通过压测不断的动态调整线程池参数,观察 CPU 利用率、系统负载、GC、内存、RT、吞吐量 等各种综合指标数据,来找到一个相对比较合理的值。

所以各位面试官不要再问设置多少线程合适了,这个问题没有标准答案,需要结合业务场景,设置一系列数据指标,排除可能的干扰因素,注意链路依赖(比如连接池限制、三方接口限流),然后通过不断动态调整线程数,测试找到一个相对合适的值。


    private ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程池数量,方法: 返回可用处理器的Java虚拟机的数量。
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        // 最大线程数量
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 5);
        // 线程池的队列容量
        executor.setQueueCapacity(Runtime.getRuntime().availableProcessors() * 2);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
ThreadPoolExecutor核心参数解析:
    /**
     * 使用给定的初始参数创建一个新的ThreadPoolExecutor
     * 参数解析.
     *
     * @param corePoolSize 保留在池中的线​​程数,即使它们是空闲的,除非设置allowCoreThreadTimeOut
     * @param maximumPoolSize 池中允许的最大线程数
     * @param keepAliveTime 当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间。
     * @param unit  keepAliveTime参数的时间单位
     * @param workQueue  用于在执行任务之前保存任务的队列。此队列将仅保存由execute方法提交的Runnable任务。
     * @param threadFactory 执行器创建新线程时使用的工厂
     * @param handler 由于达到线程边界和队列容量而阻塞执行时使用的处理程序
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
}

其中有三个最终要的参数:

  • corePoolSize :核心线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize:当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue:当新任务来的时候会先判断当前运行的线程数据量是否达到核心线程数,如果达到的话,新任务就会存放在队列中。

这里只是简单说明了ThreadPoolExecutor的七个参数的作用。

更多可参考Java线程池七个参数详解

线程池工作原理

线程池工作原理图解

假如我们需要提交任务给线程池管理整个流程如下:

  1. 提交新任务。
  2. 判断线程线程数是否少于核心线程数corePoolSize,是的话就会创建线程处理任务,否则就会将任务丢到队列中等待执行。
  3. 当队列中的任务满了之后,继续创建线程,直到线程数达到maximumPoolSize
  4. 当核心线程数达到maxnumPoolSize还有任务提交,就执行拒绝策略。

通过上述流程说明可以get到,JDK自带的ThreadPoolExecutor会优先把处理不过来的任务放入队列中去,而不是创建更多的线程去处理任务。只有当队列中等待执行的任务满了,线程池才会去创建队列,知道线程数量达到maximumPoolSize。如果任务执行时间过长的话,还会造成队列中任务堆积情况。

并且当线程数大于核心线程数时,如果线程等待keepAliveTime没有处理任务的话,该线程会被回收,直到线程数缩小到核心线程数才不会继续对线程进行回收。

可以看出JDK自带的线程池ThreadPoolExecutor比较适合CPU密集型的任务,不太适合执行I/O密集型任务。

为什么会这样说呢?

因为执行 CPU 密集型的任务时 CPU 比较繁忙,只需要创建和 CPU 核数相当的线程就好了,多了反而会造成线程上下文切换。

如何理解CPU密集型任务和I/O密集型任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务,比如:你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。