线程池

发布于:2024-09-18 ⋅ 阅读:(64) ⋅ 点赞:(0)
 //把编码信息分组进行查询,不然请求url过长,会出现错误
        //使用google的guava包
        Lists.partition(allCodes,15).forEach(codes->{//15个股票编码为一组
            //TODO:使用RestTemplate发送请求,获取个股的JS格式信息
            //设置url
            String url=stockInfoConfig.getMarketUrl()+String.join(",",codes);
           /* //设置请求头
            HttpHeaders headers = new HttpHeaders();
            //必须填写,否则数据采集不到
            headers.add("Referer","https://finance.sina.com.cn/stock/");
            headers.add("User-Agent","Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36");
            //组装请求对象
            HttpEntity<Object> httpEntity = new HttpEntity<>(headers);*/

            //使用restTemplate发送请求数据
            ResponseEntity<String> responseEntity = restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class);
            //获取响应码
            int statusCode= responseEntity.getStatusCodeValue();
            if(statusCode!=200){
                log.error("时间:{},采集数据出错,响应码状态为:{}", DateTime.now().toString(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")),statusCode);
                return;
            }
            //获取js响应数据
            String resData = responseEntity.getBody();//TODO:会查询出15行个股JS数据
            log.info("时间:{},采集数据成功,数据为:{}",DateTime.now().toString(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")),resData);
            //TODO:使用工具类把JS数据封装成个股entity对象
            List<StockRtInfo> list = myParserStockInfoUtil.parseStockMarketInfo(resData, ParseStockType.ASHARE);

            //TODO:批量把15个 个股entity对象入库
            int count=stockRtInfoMapper.insertBatch(list);
            if(count>0){
                log.info("插入成功");
            }else {
                log.error("插入失败");
            }
        });

 分组对个股股票进行查询插入时,是串行执行的,是一组一组依次执行,如果个股的数量太多,数据IO时间成本较高,则会导致运行超时,所以我们可引入多线程并发插入数据来提高操作效率,但是也会有随之而来的问题:

  • 当前项目是单体架构,股票数据采集线程和主业务线程共享(线程资源竞争问题),如果股票数据采集线程长时间占用CPU,会造成主业务线程无法正常提供有效服务(线程挤压问题),这时我们可以通过线程池与主业务进行隔离;
  • 线程频繁的创建和销毁会带来非常大的性能开销,我们尽量提高线程的复用性

目录

所以我们还要引入线程池

第一步:在yml文件中配置线程池的基本参数

第二步:在 value object(vo)包下创建一个信息类,来映射这个yml文件的属性 

第三步:书写线程池的配置类,并开启信息类

 第四步:在分组的操作中使用线程池

 线程池的工作流程

线程池的拒绝策略

怎么触发线程池的拒绝策略

 拒绝策略的类型

 自定义拒绝策略

 并发情况

1.并发任务数小于等于核心线程数情况

2.并发任务数大于核心线程数 ,且小于等于(核心线程数+任务队列长度) 

3.阻塞队列已满,且线程池中的线程数为未达到最大线程数: 

4.并发任务数量超过(最大线程数+任务队列长度)的情况; 

如何为线程池设置合适的核心线程数量


所以我们还要引入线程池

第一步:在yml文件中配置线程池的基本参数

  # 定时任务线程池基础参数
task:
  pool:
    corePoolSize: 5 # 核心线程数
    maxPoolSize: 20 # 设置最大线程数
    keepAliveSeconds: 300 # 设置线程活跃时间,单位秒
    queueCapacity: 100 # 设置队列容量

第二步:在 value object(vo)包下创建一个信息类,来映射这个yml文件的属性 


@ConfigurationProperties(prefix = "task.pool")
@Data
public class TaskThreadPoolInfo {
    /**
     *  核心线程数(获取硬件):线程池创建时候初始化的线程数
     */
    private Integer corePoolSize;
    private Integer maxPoolSize;
    private Integer keepAliveSeconds;
    private Integer queueCapacity;
}

第三步:书写线程池的配置类,并开启信息类

@Configuration
@EnableConfigurationProperties(TaskThreadPoolInfo.class)
//开启后才让TaskThreadPoolInfo类成为bean,并自动映射yml文件的值
public class TaskExecutePoolConfig {
    @Autowired
    private TaskThreadPoolInfo taskThreadPoolInfo;
    @Bean(name="threadPoolTaskExecutor",destroyMethod = "shutdown")//使用父类的销毁方法
    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        //设置核心线程池数量
        threadPoolTaskExecutor.setCorePoolSize(taskThreadPoolInfo.getCorePoolSize());
        //设置最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(taskThreadPoolInfo.getMaxPoolSize());
        //设置空闲线程的最大存活时间
        threadPoolTaskExecutor.setKeepAliveSeconds(taskThreadPoolInfo.getKeepAliveSeconds());
        //设置任务队列长度
        threadPoolTaskExecutor.setQueueCapacity(taskThreadPoolInfo.getQueueCapacity());
        //将参数初始化
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

 第四步:在分组的操作中使用线程池

        Lists.partition(allCodes,15).forEach(codes->{ //TODO:15个个股股票编码为一组
            threadPoolTaskExecutor.execute(()->{//TODO:使用线程池,每一组都开一个子线程开执行
                String url=stockInfoConfig.getMarketUrl()+String.join(",",codes);
                //发送请求
                ResponseEntity<String> responseEntity = restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class);
                //获取响应码
                int statusCodeValue = responseEntity.getStatusCodeValue();
                if(statusCodeValue!=200){
                    log.error("当前时间为:{},采集数据失败",DateTime.now().toString(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")));
                }
                //获取JS数据
                String resData = responseEntity.getBody();
                //解析js数据封装成个股类对象集合(15个)
                List<StockRtInfo> list = myParserStockInfoUtil.parseStockMarketInfo(resData, ParseStockType.ASHARE);
                //批量插入
                int count = stockRtInfoMapper.insertBatch(list);
                if(count>0){
                    log.info("插入成功");
                }else{
                    log.error("插入失败");
                }
            });
        });
   @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

        Lists.partition(list,20).forEach(l->{
            threadPoolTaskExecutor.execute(()->{//每个分片的数据开启一个线程异步执行任务
                //批量插入数据
                int count=stockBlockRtInfoMapper.insertBatch(l);
                if(count>0){
                    log.info("插入成功,数量为:{}",count);
                }else {
                    log.error("当前时间:{},插入失败",DateTime.now().toString(DateTimeFormat.forPattern("yyyy-HH-mm HH:mm:ss")));
                }
            });
        });
        log.info("需要的时间为:{}ms",System.currentTimeMillis()-start);

 线程池的工作流程

说明:

  • 当一个任务通过submit或者execute方法提交到线程池的时候,如果当前池中线程数(包括闲置线程)小于coolPoolSize,则创建一个新的线程执行该任务;
  • 如果当前线程池中线程数已经达到coolPoolSize,则将任务放入等待队列;
  • 如果任务队列已满,则任务无法入队列,此时如果当前线程池中线程数小于maxPoolSize,则创建一个临时线程(非核心线程)执行该任务;
  • 如果当前池中线程数已经等于maxPoolSize,此时无法执行该任务,对于新的任务会根据拒绝执行策略处理;

注意:

当池中线程数大于coolPoolSize,超过keepAliveTime时间的闲置线程会被回收掉。回收的是非核心线程,核心线程一般是不会回收的。如果设置allowCoreThreadTimeOut(true),则核心线程在闲置keepAliveTime时间后也会被回收。

线程池的拒绝策略

怎么触发线程池的拒绝策略

  • 当线程池调用 shutdown() 等方法关闭线程池后,如果再向线程池内提交任务,就会遭到拒绝;
  • 当线程达到最大线程数,且无空闲线程,同时任务队列已经满;

 拒绝策略的类型

  • AbortPolicy(抛出异常中断程序执行)

    这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略(默认)。 说白了不仅不处理当前任务,并且还抛出异常,中断当前任务的执行;

  • DiscardPolicy(任务丢弃不抛出异常)

    当有新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。

  • DiscardOldestPolicy(丢弃存活时长最长的任务)

    丢弃任务队列中的头结点,通常是存活时间最长的任务,它也存在一定的数据丢失风险。

  • CallerRunsPolicy(多出来的任务让主线程去执行)

    第四种拒绝策略是 ,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。 任务线程满了后,改策略可将执行的人为交换给主线程执行,这个过程相当于一个正反馈,此时如果主线程能处理,则处理,如果也不能处理,也就以为这当前服务不能接收新的任务了; 主线程处理任务期间,可以为线程池腾出时间,如果此时有新的空闲线程,那么继续协助主线程处理任务;

 自定义拒绝策略

​ 通过实现RejectedExecutionHandler接口来自定义任务拒绝策略;

 并发情况

1.并发任务数小于等于核心线程数情况

  • 线程池对象初始化时采用延迟加载的方式构建核心线程对象,因为线程对象构建有资源开销;
  • 当线程池内的线程数未达到核心线程数时,此时哪怕线程是空闲的,不会复用线程,而是最大努力构建新的线程,直到达到核心线程数为止;

2.并发任务数大于核心线程数 ,且小于等于(核心线程数+任务队列长度) 

结论:当核心线程数使用完后,多余的任务优先压入任务队列;

  • 当核心线程数被占用,且有新的任务时,优先将新的任务压入队列;
  • 当核心线程数已满,且阻塞队列如果未填满,会持续填入;
  • 当核心线程存在空闲时,会主动去阻塞队列下领取新的任务处理;

3.阻塞队列已满,且线程池中的线程数为未达到最大线程数: 

结论:

  • 当核心线程数已被使用,且任务队列已满,优先开启临时线程处理任务;
  • 线程执行流程:1.先扩容5个核心线程处理任务 2.将多余的10个任务压入队列 3.剩下的构建临时线程处理任务(最大线程数=临时线程数+核心线程数)
  • 临时线程如果空闲,且达到keepAliveSenconds指定的最大存活时间,则会被淘汰,直到达到核心线程数为止;

4.并发任务数量超过(最大线程数+任务队列长度)的情况; 

 默认采用AbortPolicy策略,直接中断程序执行

结论:

  • 什么时候触发任务的拒绝策略?
    • 1.任务队列已满,且没有空闲的线程(线程已经达到最大线程数),则会触发拒绝策略;
    • 2.线程池对象sutdown关闭拒绝服务时,也会触发拒绝策略;
    • 3.拒绝策略:
      • jdk给我们提供了4种;
        • AbortPolicy 抛出异常,终止程序允许
        • DiscardPolicy 丢弃新的任务,不抛出异常
        • DiscardOldestPolicy 丢弃旧的任务,不抛出异常
        • CallsRunerPolicy 委托主线程执行任务 不抛出异常

如何为线程池设置合适的核心线程数量

目前根据一些开源框架,设置多少个核心线程数量通常是根据应用的类型**:I/O 密集型、CPU 密集型。**

  • I/O密集型

    • I/O密集型的场景在开发中比较常见,比如像 MySQL数据库读写、文件的读写、网络通信等任务,这类任务不会 特别消耗CPU资源,但是IO操作比较耗时,会占用比较多时间;
    • IO密集型核心线程数通常设置为 2n+1,其中 n 为 CPU 核数;
    • 说白了,对于i/o密集型的场景,不太占用cpu资源,所以并发的任务数大于cpu的核数,这样的话能更加充分的利用CPU资源;
  • CPU密集型

    • CPU密集型的场景,比如像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,这些场景大部分都是纯 CPU计算;
    • CPU密集型核心线程数通常设置为n+1,这样也可避免多线程环境下CPU资源挣钱带来上下文频繁切换的开销;