//把编码信息分组进行查询,不然请求url过长,会出现错误
//使用google的guava包
Lists.partition(allCodes,15).forEach(codes->{//15个股票编码为一组
//TODO:使用RestTemplate发送请求,获取个股的JS格式信息
//设置url
String url=stockInfoConfig.getMarketUrl()+String.join(",",codes);
/* //设置请求头
HttpHeaders headers = new HttpHeaders();
//必须填写,否则数据采集不到
headers.add("Referer","https://finance.sina.com.cn/stock/");
headers.add("User-Agent","Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36");
//组装请求对象
HttpEntity<Object> httpEntity = new HttpEntity<>(headers);*/
//使用restTemplate发送请求数据
ResponseEntity<String> responseEntity = restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class);
//获取响应码
int statusCode= responseEntity.getStatusCodeValue();
if(statusCode!=200){
log.error("时间:{},采集数据出错,响应码状态为:{}", DateTime.now().toString(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")),statusCode);
return;
}
//获取js响应数据
String resData = responseEntity.getBody();//TODO:会查询出15行个股JS数据
log.info("时间:{},采集数据成功,数据为:{}",DateTime.now().toString(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")),resData);
//TODO:使用工具类把JS数据封装成个股entity对象
List<StockRtInfo> list = myParserStockInfoUtil.parseStockMarketInfo(resData, ParseStockType.ASHARE);
//TODO:批量把15个 个股entity对象入库
int count=stockRtInfoMapper.insertBatch(list);
if(count>0){
log.info("插入成功");
}else {
log.error("插入失败");
}
});
分组对个股股票进行查询插入时,是串行执行的,是一组一组依次执行,如果个股的数量太多,数据IO时间成本较高,则会导致运行超时,所以我们可引入多线程并发插入数据来提高操作效率,但是也会有随之而来的问题:
- 当前项目是单体架构,股票数据采集线程和主业务线程共享(线程资源竞争问题),如果股票数据采集线程长时间占用CPU,会造成主业务线程无法正常提供有效服务(线程挤压问题),这时我们可以通过线程池与主业务进行隔离;
- 线程频繁的创建和销毁会带来非常大的性能开销,我们尽量提高线程的复用性
目录
第二步:在 value object(vo)包下创建一个信息类,来映射这个yml文件的属性
2.并发任务数大于核心线程数 ,且小于等于(核心线程数+任务队列长度)
所以我们还要引入线程池
第一步:在yml文件中配置线程池的基本参数
# 定时任务线程池基础参数
task:
pool:
corePoolSize: 5 # 核心线程数
maxPoolSize: 20 # 设置最大线程数
keepAliveSeconds: 300 # 设置线程活跃时间,单位秒
queueCapacity: 100 # 设置队列容量
第二步:在 value object(vo)包下创建一个信息类,来映射这个yml文件的属性
@ConfigurationProperties(prefix = "task.pool")
@Data
public class TaskThreadPoolInfo {
/**
* 核心线程数(获取硬件):线程池创建时候初始化的线程数
*/
private Integer corePoolSize;
private Integer maxPoolSize;
private Integer keepAliveSeconds;
private Integer queueCapacity;
}
第三步:书写线程池的配置类,并开启信息类
@Configuration
@EnableConfigurationProperties(TaskThreadPoolInfo.class)
//开启后才让TaskThreadPoolInfo类成为bean,并自动映射yml文件的值
public class TaskExecutePoolConfig {
@Autowired
private TaskThreadPoolInfo taskThreadPoolInfo;
@Bean(name="threadPoolTaskExecutor",destroyMethod = "shutdown")//使用父类的销毁方法
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//设置核心线程池数量
threadPoolTaskExecutor.setCorePoolSize(taskThreadPoolInfo.getCorePoolSize());
//设置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(taskThreadPoolInfo.getMaxPoolSize());
//设置空闲线程的最大存活时间
threadPoolTaskExecutor.setKeepAliveSeconds(taskThreadPoolInfo.getKeepAliveSeconds());
//设置任务队列长度
threadPoolTaskExecutor.setQueueCapacity(taskThreadPoolInfo.getQueueCapacity());
//将参数初始化
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
第四步:在分组的操作中使用线程池
Lists.partition(allCodes,15).forEach(codes->{ //TODO:15个个股股票编码为一组
threadPoolTaskExecutor.execute(()->{//TODO:使用线程池,每一组都开一个子线程开执行
String url=stockInfoConfig.getMarketUrl()+String.join(",",codes);
//发送请求
ResponseEntity<String> responseEntity = restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class);
//获取响应码
int statusCodeValue = responseEntity.getStatusCodeValue();
if(statusCodeValue!=200){
log.error("当前时间为:{},采集数据失败",DateTime.now().toString(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")));
}
//获取JS数据
String resData = responseEntity.getBody();
//解析js数据封装成个股类对象集合(15个)
List<StockRtInfo> list = myParserStockInfoUtil.parseStockMarketInfo(resData, ParseStockType.ASHARE);
//批量插入
int count = stockRtInfoMapper.insertBatch(list);
if(count>0){
log.info("插入成功");
}else{
log.error("插入失败");
}
});
});
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
Lists.partition(list,20).forEach(l->{
threadPoolTaskExecutor.execute(()->{//每个分片的数据开启一个线程异步执行任务
//批量插入数据
int count=stockBlockRtInfoMapper.insertBatch(l);
if(count>0){
log.info("插入成功,数量为:{}",count);
}else {
log.error("当前时间:{},插入失败",DateTime.now().toString(DateTimeFormat.forPattern("yyyy-HH-mm HH:mm:ss")));
}
});
});
log.info("需要的时间为:{}ms",System.currentTimeMillis()-start);
线程池的工作流程
说明:
- 当一个任务通过submit或者execute方法提交到线程池的时候,如果当前池中线程数(包括闲置线程)小于coolPoolSize,则创建一个新的线程执行该任务;
- 如果当前线程池中线程数已经达到coolPoolSize,则将任务放入等待队列;
- 如果任务队列已满,则任务无法入队列,此时如果当前线程池中线程数小于maxPoolSize,则创建一个临时线程(非核心线程)执行该任务;
- 如果当前池中线程数已经等于maxPoolSize,此时无法执行该任务,对于新的任务会根据拒绝执行策略处理;
注意:
当池中线程数大于coolPoolSize,超过keepAliveTime时间的闲置线程会被回收掉。回收的是非核心线程,核心线程一般是不会回收的。如果设置allowCoreThreadTimeOut(true),则核心线程在闲置keepAliveTime时间后也会被回收。
线程池的拒绝策略
怎么触发线程池的拒绝策略
- 当线程池调用 shutdown() 等方法关闭线程池后,如果再向线程池内提交任务,就会遭到拒绝;
- 当线程达到最大线程数,且无空闲线程,同时任务队列已经满;
拒绝策略的类型
AbortPolicy(抛出异常中断程序执行)
这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略(默认)。 说白了不仅不处理当前任务,并且还抛出异常,中断当前任务的执行;
DiscardPolicy(任务丢弃不抛出异常)
当有新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。
DiscardOldestPolicy(丢弃存活时长最长的任务)
丢弃任务队列中的头结点,通常是存活时间最长的任务,它也存在一定的数据丢失风险。
CallerRunsPolicy(多出来的任务让主线程去执行)
第四种拒绝策略是 ,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。 任务线程满了后,改策略可将执行的人为交换给主线程执行,这个过程相当于一个正反馈,此时如果主线程能处理,则处理,如果也不能处理,也就以为这当前服务不能接收新的任务了; 主线程处理任务期间,可以为线程池腾出时间,如果此时有新的空闲线程,那么继续协助主线程处理任务;
自定义拒绝策略
通过实现RejectedExecutionHandler接口来自定义任务拒绝策略;
并发情况
1.并发任务数小于等于核心线程数情况
- 线程池对象初始化时采用延迟加载的方式构建核心线程对象,因为线程对象构建有资源开销;
- 当线程池内的线程数未达到核心线程数时,此时哪怕线程是空闲的,不会复用线程,而是最大努力构建新的线程,直到达到核心线程数为止;
2.并发任务数大于核心线程数 ,且小于等于(核心线程数+任务队列长度)
结论:当核心线程数使用完后,多余的任务优先压入任务队列;
- 当核心线程数被占用,且有新的任务时,优先将新的任务压入队列;
- 当核心线程数已满,且阻塞队列如果未填满,会持续填入;
- 当核心线程存在空闲时,会主动去阻塞队列下领取新的任务处理;
3.阻塞队列已满,且线程池中的线程数为未达到最大线程数:
结论:
- 当核心线程数已被使用,且任务队列已满,优先开启临时线程处理任务;
- 线程执行流程:1.先扩容5个核心线程处理任务 2.将多余的10个任务压入队列 3.剩下的构建临时线程处理任务(最大线程数=临时线程数+核心线程数)
- 临时线程如果空闲,且达到keepAliveSenconds指定的最大存活时间,则会被淘汰,直到达到核心线程数为止;
4.并发任务数量超过(最大线程数+任务队列长度)的情况;
默认采用AbortPolicy策略,直接中断程序执行
结论:
- 什么时候触发任务的拒绝策略?
- 1.任务队列已满,且没有空闲的线程(线程已经达到最大线程数),则会触发拒绝策略;
- 2.线程池对象sutdown关闭拒绝服务时,也会触发拒绝策略;
- 3.拒绝策略:
- jdk给我们提供了4种;
- AbortPolicy 抛出异常,终止程序允许
- DiscardPolicy 丢弃新的任务,不抛出异常
- DiscardOldestPolicy 丢弃旧的任务,不抛出异常
- CallsRunerPolicy 委托主线程执行任务 不抛出异常
- jdk给我们提供了4种;
如何为线程池设置合适的核心线程数量
目前根据一些开源框架,设置多少个核心线程数量通常是根据应用的类型**:I/O 密集型、CPU 密集型。**
I/O密集型
- I/O密集型的场景在开发中比较常见,比如像 MySQL数据库读写、文件的读写、网络通信等任务,这类任务不会 特别消耗CPU资源,但是IO操作比较耗时,会占用比较多时间;
- IO密集型核心线程数通常设置为 2n+1,其中 n 为 CPU 核数;
- 说白了,对于i/o密集型的场景,不太占用cpu资源,所以并发的任务数大于cpu的核数,这样的话能更加充分的利用CPU资源;
CPU密集型
- CPU密集型的场景,比如像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,这些场景大部分都是纯 CPU计算;
- CPU密集型核心线程数通常设置为n+1,这样也可避免多线程环境下CPU资源挣钱带来上下文频繁切换的开销;