Executors框架

发布于:2023-01-11 ⋅ 阅读:(517) ⋅ 点赞:(0)

1 前言

  通常java最简单的线程的例子是这样的:

    public static void main(String[] args) {
        Runnable runnable = () -> System.out.println("Thread is running.");
        Thread thread = new Thread(runnable);
        thread.start();
    }

  在较小的程序中这样实践是没有问题的;但是在大规模的应用中将线程的管理和创建部分与应用部分分开则比较合理。可以提高管理效率和节省线程反复创建和销毁带来的开销。

线程的创建因为涉及到和操作系统的交互所以开销会比较大

  那么封装了线程管理和创建这些功能的对象就是 java.util.concurrent.Executors

2 Executor线程池

  Executor就是java.util.concurrent.Executors通过上述红框中几个静态方法创建的执行器(线程池)

2.1 newFixedThreadPool

newFixedThreadPool

2.2 newSingleThreadExecutor

newSingleThreadExecutor

2.3 newCachedThreadPool

newCachedThreadPool

2.4 手动创建线程池

在alibaba的《java编程手册》中提到:

4.【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors返回的线程池对象的弊端如下:

1) newFixedThreadPool(int)和newSingleThreadPool():

允许的请求队列长度为Intege.MAX_VALUE,可能会对接大量的请求,从而导致OOM。

2) newCachedThreadPool():

允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程从而导致OOM。

在后续对线程池模型的详细讲解中,可以印证该实践的合理性。

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

  对创建函数 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)的参数进行一下详细说明:

  • corePoolSize int

  始终保持在线程池中的线程数量,即使这些线程是闲置的。特别说明:如果allowCoreThreadTimeOut属性被设置成了false,那么这些线程释放的时间以keepAliveTime为准。

public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}
  • maximumPoolSize int

  线程池中允许的最大线程数量

  • keepAliveTime long

  当线程数量超出核心线程池数量,那么超出的部分在被关闭前的最大闲置时间

  • unit TimeUnit

  keepAliveTime参数的单位

  • workQueue BlockingQueue<Runnable>

  任务在被执行前放置在该队列

  • threadFactory ThreadFactory

  线程池创建线程时使用的创建工厂;线程池有个默认的线程工厂

ThreadFactory threadFactory = Executors.defaultThreadFactory();
  • handler RejectedExecutionHandler

  the handler to use when execution is blocked because the thread bounds and queue capacities are reached.

        当线程

我们着重解释下ThreadFactory和RejectedExecutionHandler

2.4.1 ThreadFactory

ThreadFactory threadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(@NotNull Runnable r) {
        return null;
        }
    };

2.4.2 RejectedExecutionHandler

  1. 默认是AbortPolicy;该策略直接丢弃提交任务,并抛出RejectedExecutionException异常
  2. DiscardPolicy:该策略也是将任务抛弃掉(对于提交的任务不管不问,什么也不做),不过并不抛出异常。
  3. DiscardOldestPolicy:该策略是当执行器未关闭时,从任务队列workQueue中取出第一个任务,并抛弃这第一个任务,进而有空间存储刚刚提交的任务。使用该策略要特别小心,因为它会直接抛弃之前的任务。
  4. CallerRunsPolicy:该策略并没有抛弃任何的任务,由于线程池中已经没有了多余的线程来分配该任务,该策略是在当前线程(调用者线程)中直接执行该任务。
    public static class Task implements Runnable {
        protected String taskName;
        public Task(String name) {
            super();
            this.taskName = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(this.taskName + " is running.");
                Thread.sleep(500);
            } catch (Exception ignored) {

            }
        }

    }
    public static void main(String[] args) {
        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1,"线程池"的阻塞队列容量为1。
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));

        // 设置线程池的拒绝策略为AbortPolicy
        pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

            }
        });

        try {
            // 新建10个任务,并将它们添加到线程池中
            for (int i = 0; i < 10; i++) {
                Runnable myTask = new Task("Task-" + i);
                pool.submit(myTask);
            }
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
            // 关闭线程池
            pool.shutdown();
        }
    }

本文含有隐藏内容,请 开通VIP 后查看