1 线程池在业务中的实践
1.1 业务场景
在当今的互联网业界,为了最大程度利用 CPU 的多核性能,并行运算的能力是不可或缺的。通过线程池管理线程获取并发性是一个非常基础的操作,下面看两个典型的使用线程池获取并发性的场景。
场景一:快速响应用户请求
描述:当用户发起实时请求(如查看商品信息,需聚合价格、优惠、库存、图片等信息)时,服务追求短响应时间;
分析:从用户体验看,响应越快越好。而商品信息聚合这类功能复杂,存在调用级联等情况。此时用线程池,把各调用封装成任务并行执行,能缩短总体响应时间。为了最大程度提升响应速度,不设置队列缓冲并发任务,调高
corePoolSize
(核心线程数)和maxPoolSize
(最大线程数),尽可能创建多的线程快速执行任务。如下图所示,串行执行时获取价格、库存、图片是依次进行,耗时久;并行执行时这些操作同时开展,能更快返回结果;
场景2:快速处理批量任务
描述:像统计报表(计算全国各门店有某属性的商品以辅助营销策略)这类离线大量计算任务,需要快速执行;
分析:这类场景任务量大,虽也希望快,但不需要瞬时完成,更关注单位时间内处理更多任务(吞吐量优先)。所以要用多线程并行计算,但要设置队列缓冲并发任务,调整合适的
corePoolSize
。因为线程数过多会引发线程上下文切换频繁,降低处理速度和吞吐量。如下图所示,串行执行批量任务时,任务依次进行,总耗时是各任务耗时之和;并行执行时,任务先缓存到队列,然后线程并行处理队列里的任务,能更高效完成,提升吞吐量;
1.2 实际问题及方案思考
线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面其运行机制难理解,合理配置需要强依赖开发人员的经验与知识;另一方面,线程池执行情况和任务类型(如IO密集型、CPU密集型)关联性强,不同任务运行表现差异大,业界缺乏成熟普适的经验策略;
关于线程池配置不合理引发的故障,下面举一些例子:
Case1:XX页面展示接口大量调用降级
- 事故描述:XX页面展示接口出现大量调用降级情况,数量级达到几十到上百;
- 事故原因:该服务展示接口内部逻辑采用线程池进行并行计算,由于没有预先估算好调用的流量,使得最大线程数设置偏小。当任务量超过线程池承载能力时,大量抛出
RejectedExecutionException
异常,从而触发接口降级条件。如下图:请求进入展示接口后生成多个任务,线程池因容量不足,只能处理部分任务,其余任务被拒绝;
Case2:XX业务服务不可用S2级故障
- 事故描述:XX业务提供的服务执行时间过长,作为上游服务整体超时,进而导致大量下游服务调用失败;
- 事故原因:该服务处理请求的内部逻辑使用线程池做资源隔离,但队列设置过长,且最大线程数设置失效。当请求数量增加时,大量任务堆积在队列中,任务执行时间被大幅拉长,最终造成下游服务大量调用超时。如下图:请求生成任务后,任务都在队列中缓冲等待执行,而核心线程数设置过小,导致任务执行速率低;
业务中要使用线程池,而线程池使用不当又会导致故障,那么我们怎样才能更好地使用线程池呢?
2 线程池参数动态化
2.1 引出
在日常项目开发中,线程池用于处理并发场景以提升任务处理效率。但传统方式下,线程池参数难以精准预设,只能在服务运行时不断调整参数,且每次调整都需重启服务,这会对服务可用性造成影响,因此需要一种不重启服务就能动态调整参数的方案;
那如何实现在不重启服务的前提下,动态调整线程池参数呢?
旧流程:修改线程池参数 → 重新发布服务 → 查看服务是否运行正常 → 结束。流程繁琐,且重新发布意味着服务会中断或重启;
新流程:仅需修改线程池参数即可完成,无需重启服务,大幅简化了参数调整的成本,提升了服务的灵活性;
2.2 线程池可调整的参数
- 线程池构造参数有8个,但最核心的3个是:
corePoolSize
(核心线程数)、maximumPoolSize
(最大线程数)、workQueue
(任务队列)。这3个参数直接决定了线程池的任务分配和线程分配策略,结合不同业务场景,又衍生出两种典型队列选择逻辑:场景1:并行执行子任务,提高响应速度。这类场景追求即时响应(如前文快速响应用户请求场景),应使用同步队列。同步队列不缓存任务,任务到达后会立即尝试执行,避免任务在队列中等待的时间开销,保障响应速度;
场景2:并行执行大批量任务,提升吞吐量。这类场景追求单位时间处理更多任务(如前文快速处理批量任务场景),应使用有界队列。有界队列会缓存大批量任务,通过队列缓冲来削峰填谷,同时要声明队列容量,防止任务无限制堆积导致内存溢出等问题。
2.3 实现思路
JDK提供的
ThreadPoolExecutor
类,通过多个public
的setter
方法(如setCorePoolSize
、setMaximumPoolSize
、setKeepAliveTime
等),支持在运行时动态修改线程池核心参数:以
setCorePoolSize
为例,修改后线程池会根据新核心线程数、当前工作线程数与原始值的对比,采取不同策略:若新核心线程数 < 当前工作线程数:说明有多余的
worker
线程,会尝试将它们中断,回收多余线程;若新核心线程数 > 原始值,且任务队列中有等待任务:会创建新的
Worker
线程,加快任务执行;
示例代码:
public class DynamicThreadPool { // 初始化ThreadPoolExecutor private ThreadPoolExecutor executor; // 指定核心线程数、最大线程数、存活时间、任务队列等基础参数 public DynamicThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } // 调用ThreadPoolExecutor的setCorePoolSize和setMaximumPoolSize,动态修改核心线程数和最大线程数,无需重启服务 public void adjustThreadPool(int newCorePoolSize, int newMaximumPoolSize) { this.executor.setCorePoolSize(newCorePoolSize); this.executor.setMaximumPoolSize(newMaximumPoolSize); } // 向线程池提交任务(调用execute方法) public void submitTask(Runnable task) { this.executor.execute(task); } // 打印线程池当前状态(核心线程数、最大线程数、活跃线程数),用于观察参数调整效果 public void print(){ System.out.println("核心线程数:" + executor.getCorePoolSize() + " " +"最大线程数:" + executor.getMaximumPoolSize() +" " + "活跃线程数:" + executor.getActiveCount()); } // 关闭线程池 public void shutdown() { this.executor.shutdown(); } // 测试 public static void main(String[] args) throws InterruptedException { // 初始化任务队列和DynamicThreadPool,核心 / 最大线程数均为 10 BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20); DynamicThreadPool dynamicThreadPool = new DynamicThreadPool(10, 10, 10, TimeUnit.SECONDS, workQueue); // 提交 30 个任务(每个任务休眠 10 秒,模拟耗时操作) for (int i = 0; i < 30; i++) { dynamicThreadPool.submitTask(() -> { log.info(Thread.currentThread().getName() + "开始执行任务"); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }); } // 休眠 1 秒后,打印“修改前”的线程池状态 Thread.sleep(1000); System.out.println("======修改前======="); dynamicThreadPool.print(); // 动态调整线程池参数。调用adjustThreadPool,将核心线程数改为 5、最大线程数改为 8 dynamicThreadPool.adjustThreadPool(5, 8); System.out.println("======修改后======="); dynamicThreadPool.print(); // 打印 “修改后” 的线程池状态,再休眠 10 秒后再次打印,观察参数调整对线程池运行的影响 Thread.sleep(10000); dynamicThreadPool.print(); // 关闭线程池 dynamicThreadPool.shutdown(); } }
2.4 实现方案
2.4.1 基于 Nacos 配置中心动态调整线程池参数
借助Nacos的
Listener
机制,在Spring Bean初始化时,开启对Nacos中线程池配置文件(如threadPool.yml
)的监听。当Nacos中的配置发生变更时,通过监听逻辑实时更新线程池的核心参数(无需重启服务);nacosConfigManager.getConfigService().addListener("threadPool.yml", nacosConfigProperties.getGroup(), new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configInfo) { } });
nacosConfigManager.getConfigService()
:获取Nacos的配置服务实例,用于与Nacos服务器交互(获取配置、监听配置等);.addListener(...)
:为指定配置添加监听器,当配置发生变更时触发回调逻辑;- 第一个参数
"threadPool.yml"
:要监听的配置文件名(Nacos中存储的配置标识); - 第二个参数
nacosConfigProperties.getGroup()
:配置所属的分组(Nacos中配置的组织方式,默认分组为DEFAULT_GROUP
); - 第三个参数
new Listener(){...}
:匿名内部类实现的监听器,包含配置变更时的处理逻辑;
- 第一个参数
Listener
接口有两个核心方法,用于定义配置变更的处理规则:getExecutor()
方法:- 返回值:
Executor
线程池实例(可为null
); - 作用:指定执行
receiveConfigInfo
回调方法的线程池;- 若返回
null
:默认使用Nacos客户端内部的线程池执行回调; - 若返回自定义线程池:则在该线程池中执行回调,避免回调逻辑阻塞Nacos内部线程;
- 若返回
- 返回值:
receiveConfigInfo(String configInfo)
方法:- 参数
configInfo
:变更后的配置内容(字符串形式,如threadPool.yml
的最新文本内容); - 作用:配置变更时的核心处理逻辑。在实际业务中,这里会编写解析
configInfo
的代码(如将字符串转为YAML/JSON对象),然后提取新的线程池参数(如corePoolSize
、maxPoolSize
),最后调用线程池的setter
方法完成动态更新;
- 参数
示例代码:
@Configuration @Data public class MyDynamicThreadPool implements InitializingBean { // 通过@Value注解,从配置中注入线程池的初始参数 @Value("${threadPool.corePoolSize}") private int corePoolSize; // 核心线程数 @Value("${threadPool.maxPoolSize}") private int maxPoolSize; // 最大线程数 @Value("${threadPool.queueCapacity}") private int queueCapacity; // 队列容量 @Value("${threadPool.keepAliveSeconds}") private int keepAliveSeconds; // 线程存活时间 private static ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired private NacosConfigManager nacosConfigManager; @Autowired private NacosConfigProperties nacosConfigProperties; // 实现InitializingBean接口,在 Bean 初始化时: @Override public void afterPropertiesSet() throws Exception { // 创建ThreadPoolTaskExecutor实例 threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); // 配置初始线程池参数 threadPoolTaskExecutor.setCorePoolSize(corePoolSize); threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize); threadPoolTaskExecutor.setQueueCapacity(queueCapacity); threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds); // 设置线程名前缀 threadPoolTaskExecutor.setThreadNamePrefix( "SHISAN--"); // 设置拒绝策略 threadPoolTaskExecutor.setRejectedExecutionHandler( new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("队列已满,丢弃任务"); } }); threadPoolTaskExecutor.initialize(); // 监听 Nacos 中threadPool.yml配置的变更 nacosConfigManager.getConfigService().addListener("threadPool.yml", nacosConfigProperties.getGroup(), new Listener() { @Override public Executor getExecutor() { return null; } // 当配置变更时,receiveConfigInfo方法会被触发 @Override public void receiveConfigInfo(String configInfo) { System.out.println("动态修改前-->"); print(); // 解析新的配置内容(通过 YAML 解析工具将配置字符串转为Map) Yaml yaml = new Yaml(); InputStream inputStream = new ByteArrayInputStream(configInfo.getBytes()); Map<String, Object> dataMap = yaml.load(inputStream); JSONObject pool = new JSONObject(dataMap).getJSONObject("threadPool"); // 调用ThreadPoolTaskExecutor的setter方法,动态更新线程池参数 threadPoolTaskExecutor.setCorePoolSize(pool.getInteger("corePoolSize")); threadPoolTaskExecutor.setMaxPoolSize(pool.getInteger("maxPoolSize")); threadPoolTaskExecutor.setQueueCapacity(pool.getInteger("keepAliveSeconds")); threadPoolTaskExecutor.setQueueCapacity(pool.getInteger("queueCapacity")); System.out.println("动态修改后-->"); print(); } }); } // 提交任务到线程池执行 public void execute(Runnable runnable){ threadPoolTaskExecutor.execute(runnable); } // 打印线程池当前状态(核心线程数、最大线程数、阻塞队列已用 / 总容量、活跃线程数),用于观察参数调整效果 public void print(){ System.out.println("核心线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getCorePoolSize() + " " +"最大线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getMaximumPoolSize() +" " + "阻塞队列数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size() + "/" + queueCapacity +" " + "活跃线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getActiveCount()); } }
2.4.2 使用DynamicTp——基于配置中心的轻量级动态可监控线程池
DynamicTp 是基于配置中心实现的轻量级动态线程池管理工具,核心功能涵盖:
动态调参:支持不重启服务,动态调整线程池参数(如核心线程数、最大线程数等);
通知报警:当线程池运行状态异常(如队列积压、线程活跃度过高等)时,通过企微、钉钉、 Lark、邮件等渠道发送告警;
运行监控:对线程池的运行状态(如活跃线程数、队列长度等)进行监控,结合监控系统(Prometheus、InfluxDB + Grafana 等)可视化展示;
三方包线程池管理:能管理第三方中间件(如 Tomcat、Dubbo、RocketMQ 等)的线程池;
官网:首页 | dynamictp;
架构分为三大核心模块,相互协作实现线程池的动态化、可监控管理:
配置中心:支持多种主流配置中心(Nacos、Apollo、Zookeeper、Consul 等),用于存储线程池的配置(如创建/删除线程池配置、修改参数、告警相关配置等),是“动态调参”的配置来源;
SpringBoot 服务:承载业务自定义线程池(如 DtpExecutor、OrderedDtpExecutor 等)和第三方中间件线程池(如 Tomcat、Dubbo 等的线程池),并基于配置中心的配置,实现任务增强(对任务执行的扩展)、运行监控(采集线程池运行指标)、通知告警(异常时触发告警)、动态调参(应用配置中心的参数变更);
监控模块:通过指标采集(如 JsonLog、MicroMeter、Endpoint 等方式),将线程池运行指标对接监控系统(Prometheus、InfluxDB、Grafana 等),实现线程池状态的可视化监控;
核心配置:
spring: dynamic: tp: enabled: true # 是否启用 dynamictp,默认true executors: # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量 - threadPoolName: dtpExecutor1 # 线程池名称,必填 threadPoolAliasName: 测试线程池 # 线程池别名,可选 executorType: common # 线程池类型 common、eager、ordered、scheduled、priority,默认 common corePoolSize: 5 # 核心线程数,默认1 maximumPoolSize: 8 # 最大线程数,默认cpu核数 queueCapacity: 2000 # 队列容量,默认1024 queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类,默认VariableLinkedBlockingQueue rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类,默认AbortPolicy keepAliveTime: 10 # 空闲线程等待超时时间,默认60 threadNamePrefix: Fox # 线程名前缀,默认dtp allowCoreThreadTimeOut: false # 是否允许核心线程池超时,默认false waitForTasksToCompleteOnShutdown: true # 参考spring线程池设计,优雅关闭线程池,默认true awaitTerminationSeconds: 5 # 优雅关闭线程池时,阻塞等待线程池中任务执行时间,默认3,单位(s) preStartAllCoreThreads: false # 是否预热所有核心线程,默认false