在Spring Boot中配置@Async的线程池的拒绝策略

发布于:2023-12-06 ⋅ 阅读:(68) ⋅ 点赞:(0)

上一篇文章中,我们使用多个线程隔离不同的异步任务,这篇文章,我们将围绕在@Async的线程池的拒绝策略进行完善线程池的使用,在我们例举案例之前,我们先了解一下:


  1. @Async的拒绝策略用来解决什么问题,还有使用他究竟有什么好处?

    使用@Async的拒绝策略可以解决异步任务线程池队列已满时的问题。当线程池队列已满时,默认的拒绝策略是抛出RejectedExecutionException异常,表示无法接受新的任务。而自定义拒绝策略可以提供一种灵活的方式来处理这种情况,从而解决以下问题:

    1. 避免任务丢失:当线程池队列已满时,如果没有合适的拒绝策略,新的任务可能会被丢弃,导致任务丢失。通过自定义拒绝策略,你可以选择将任务丢弃、阻塞等待或者采取其他适当的处理方式,以避免任务丢失。

    2. 控制任务流量:拒绝策略可以帮助你控制任务的流量。当线程池队列已满时,你可以选择拒绝执行新的任务,从而控制任务的提交速率,避免系统资源被过度消耗。这对于保护系统的稳定性和可靠性非常重要。

    3. 提供反馈机制:自定义拒绝策略可以提供一种反馈机制,告知任务提交者任务被拒绝执行的原因。通过捕获拒绝执行的异常或其他方式,你可以根据需要记录日志、发送通知或采取其他适当的操作,以便及时了解任务无法执行的情况。


  1. 在什么情况下,我们才使用@Async的拒绝策略?
    1. 任务队列满载:当异步任务提交的速度超过线程池处理任务的速度时,任务队列可能会被填满。这时,新的任务无法加入队列,就需要使用线程拒绝策略来处理这些被拒绝的任务。

    2. 任务执行资源有限:当系统的资源(如线程数)有限,并且无法扩展时,可能会出现无法处理所有任务的情况。这时,使用线程拒绝策略可以控制任务的提交速率,避免资源被过度消耗。

    3. 任务处理能力不足:当异步任务的处理能力不足以满足需求时,可以使用线程拒绝策略来限制任务的提交,以避免任务堆积和系统负载过高。

    4. 任务优先级管理:有时,你可能希望根据任务的优先级来管理任务的执行。通过自定义线程拒绝策略,你可以根据任务的优先级进行选择性的拒绝执行,以确保高优先级任务能够及时得到处理。

接下来,我们看一个案例:
我们先创建一个Spring Boot应用,创建好我们的线程池配置。

@EnableAsync
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {
        @Bean
        public Executor taskExecutor1() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(2);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("executor-1-");
            //后续在这里填写我们需要的拒绝策略
            return executor;
        }
    }
}

如上,我们创建了一个核心线程数为2,最大线程数为2,缓冲队列长度为2,假设我们有五个异步任务同时开始,那么会造成什么情况呢?

接着看吧,我们使用@Async注解实现一个任务

@Slf4j
@Component
public class AsyncTasks {

    public static Random random = new Random();

    @Async("taskExecutor1")
    public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
        log.info("开始任务:{}", taskNo);
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
        return CompletableFuture.completedFuture("任务完成");
    }
}

我们来编写一个测试用例,来看看会发生什么结果?

@Slf4j
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private AsyncTasks asyncTasks;

    @Test
    public void test2() throws Exception {
        long start = System.currentTimeMillis();
        // 线程池1
        CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
        CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
        CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
        CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
        CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");
        // 一起执行
        CompletableFuture.allOf(task1, task2, task3, task4, task5).join();
        long end = System.currentTimeMillis();
        log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
    }
}
2023-11-28 19:03:57.138  INFO 27916 --- [   executor-1-1] com.miaow.demo.AsyncTasks       : 开始任务:1
2023-11-28 19:03:57.138  INFO 27916 --- [   executor-1-2] com.miawo.demo.AsyncTasks       : 开始任务:2

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@5580d62f[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@17b6d426

	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
	at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
	at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1843)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
	...
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@17b6d426 rejected from java.util.concurrent.ThreadPoolExecutor@5580d62f[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
	... 76 more

我们来对报错日志进行分析:
[java.util.concurrent.ThreadPoolExecutor@5580d62f[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: 这段代码中,我们可以明确的知道,在我们的超过了执行线程 + 缓冲队列长度,也就是 2 + 2 = 4 ,但是我们进来了5个线程,所以我们的第五个线程就被拒绝了。
所以,在默认情况之下,我们的线程池的拒绝策略就是:
当线程池队列满了,那么我们的线程池就会丢弃这个任务,并抛出异常


OK,既然线程池中有默认的线程池拒绝策略,那么我们可以对他配置吗?考虑到实际开发过程中,我们在有些任务场景中,直接拒绝的策略一般都不太适用,有的时候,我们会选择丢掉之前开始执行但是并未完成的任务,也可能会考虑丢掉刚刚开始执行,但是没完成的任务,反正有各种场景,只要你线程没执行完毕,我就可以丢弃你,那么我们具体要怎么实现呢?

线程池的拒绝策略是指当线程池无法接受新的任务时,如何处理这些被拒绝的任务。在Spring框架中,可以通过配置ThreadPoolTaskExecutor来设置线程池的拒绝策略。

ThreadPoolTaskExecutor提供了几种常见的拒绝策略:

  • AbortPolicy(默认):当线程池无法接受新的任务时,直接抛出RejectedExecutionException异常。

  • CallerRunsPolicy:当线程池无法接受新的任务时,将任务返回给调用者执行。也就是说,如果线程池满了,任务会在调用者的线程中执行。

  • DiscardPolicy:当线程池无法接受新的任务时,直接丢弃这个任务,不做任何处理。

  • DiscardOldestPolicy:当线程池无法接受新的任务时,先丢弃最早加入队列的任务,然后尝试再次提交新的任务。

来,我们在代码中进行配置:

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Lamba表达式的线程策略配置:

executor.setRejectedExecutionHandler((r, executor1) -> {
    // 拒绝策略的逻辑
});

总的来说,当异步任务的提交速度超过处理速度、资源有限或任务处理能力不足时,使用@Async的线程拒绝策略可以帮助你控制任务的提交速率,避免任务堆积和系统负载过高。这样可以提高系统的稳定性和可靠性,确保异步任务的顺利执行。

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到