SpringBoot教程(二十一) | SpringBoot实现定时任务

发布于:2024-08-15 ⋅ 阅读:(76) ⋅ 点赞:(0)

参考文章:
【1】IDEA SpringBoot实现定时任务(保姆级教程,超详细!!!)
【2】SpringBoot整合分布式任务调度Elastic-Job
【3】ElasticJob3.0整合SpringBoot,ElasticJob-Lite【ElasticJob入门篇】

单点定时任务

方式一:使用@Scheduled+@EnableScheduling注解

在Spring Boot项目中使用@Scheduled注解实现定时任务时,你通常不需要额外导入特定的依赖,
因为@Scheduled是Spring框架的核心功能之一,并且Spring Boot会自动配置与调度相关的组件。

@EnableScheduling:Spring框架提供的一个注解,用于启用基于注解的定时任务调度功能。当在Spring的配置类(如使用@Configuration注解的类)上使用@EnableScheduling注解时,Spring会自动配置一个任务调度器(TaskScheduler),负责管理所有带有@Scheduled注解的方法。

@Scheduled:Spring框架中用于定时任务调度的注解。它允许开发者将一个方法标记为定时任务,并配置任务的执行时间间隔或Cron表达式,从而实现在指定时间或按照指定周期自动执行该方法的功能。 除了配置Cron表达式外,还可以通过fixedRate和fixedDelay两种方式设置定时任务,这两种方式可以自行了解。

@Slf4j
@Component
@EnableScheduling
public class DemoTask {
 
    // 每5秒执行一次
    @Scheduled(cron = "0/5 * * * * ? ")
    public void testSchedule1() {
        log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
    }

    // 每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ? ")
    public void testSchedule2() {
        log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
    }
 
}

项目启动后
在这里插入图片描述
是成功且正常执行的
当我把阻塞sleep加进去以后

@Slf4j
@Component
@EnableScheduling
public class DemoTask {
 
    // 每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ? ")
    public void testSchedule1() {
         Thread.sleep(10000);//休眠10秒
        log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
    }

    // 每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ? ")
    public void testSchedule2() {
        log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
    }
 
}

在这里插入图片描述

会发现怎么定时任务2变成间隔20秒执行一次了
是因为会执行这个定时任务使用的线程号ID都是同一个,任务1堵塞了10秒导致影响了后面任务2的执行(说明都用了同一个线程去执行定时任务的,简直巨坑!!!)。

巨坑(@Scheduled任务都用了同一个线程去执行,导致定时任务存在堵塞)

解决办法一:添加自定义的ThreadPoolTaskScheduler配置(为调度配置多个线程)

默认情况下,Spring会尝试在Spring应用上下文中查找一个名为taskScheduler的bean,这个bean必须是TaskScheduler接口的实现。
一旦找到了这个bean,Spring就会使用它来调度所有@Scheduled注解标记的方法。
所以Bean的name不能随便乱写

import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;  
  
@Configuration  
public class SchedulerConfig {  
     
    //这个bean名字不要乱改,否则会不生效
    @Bean(name = "taskScheduler")  
    public ThreadPoolTaskScheduler taskScheduler() {  
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();  
        // 设置线程池的核心大小  
        scheduler.setPoolSize(10);  
        // 其他设置,如线程名称前缀等(可选)  
        scheduler.setThreadNamePrefix("wocao-task-");  
        // 初始化调度器  
        scheduler.initialize();  
        return scheduler;  
    }  
}

效果如下:

定时任务1不会堵塞定时任务2了,且 定时任务 都是10秒钟执行一次,不会存在堵塞延迟

在这里插入图片描述

解决办法二(建议用这个):使用异步包裹定时任务

(1)首先配置自定义线程池

@Configuration
public class DemoTheadPoolConfig {

    @Bean(name = "taskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置核心线程数
        executor.setCorePoolSize(10);
        //设置最大线程数
        executor.setMaxPoolSize(20);
        //缓冲队列200:用来缓冲执行任务的队列
        executor.setQueueCapacity(200);
        //线程活路时间 60 秒
        executor.setKeepAliveSeconds(60);
        //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("demo-thread-");
        //设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

(2) 执行异步操作

方法一:使用工具类CompletableFuture.runAsync

CompletableFuture.runAsync 是 Java 8 引入的一个非常有用的工具,用于异步执行任务
runAsync 方法用于启动一个异步任务,这个任务不返回结果(即返回类型为 void)。

基本用法

runAsync 方法有两种形式:

  1. 无参数版本:使用系统默认的 ForkJoinPool 来异步执行任务。

    CompletableFuture.runAsync(() -> {
        // 这里是异步执行的任务
        System.out.println("异步任务执行中:" + Thread.currentThread().getName());
    });
    

    在这个例子中,runAsync 接收一个 Runnable 函数式接口的实现(即一个不接受参数且不返回结果的 run 方法),并在一个默认的线程池中异步执行这个任务。

  2. 带 Executor 参数版本:允许你指定一个自定义的 Executor 来执行异步任务。

    Executor executor = Executors.newFixedThreadPool(5); // 创建一个固定大小的线程池
    CompletableFuture.runAsync(() -> {
        // 这里是异步执行的任务
        System.out.println("异步任务执行中,使用自定义线程池:" + Thread.currentThread().getName());
    }, executor);
    

    在这个例子中,runAsync 除了接收一个 Runnable 任务外,还接收一个 Executor 参数,允许你控制异步任务的执行线程。

@Slf4j
@Component
@EnableScheduling
public class DemoTask {

    @Autowired
    @Qualifier("taskExecutor") // 确保使用我们自定义的线程池
    private TaskExecutor taskExecutor; // 注入 TaskExecutor
 
    // 每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ? ")
    public void testSchedule1() {
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(10000); // 休眠10秒,模拟业务场景执行时间
                log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, taskExecutor);
    }

    // 每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ? ")
    public void testSchedule2() {
        CompletableFuture.runAsync(() -> {
            try {
                log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, taskExecutor);
    }
 
}

效果如下

定时任务1不会堵塞定时任务2了,且 定时任务 都是10秒钟执行一次,不会存在堵塞延迟

在这里插入图片描述

方法二:使用@Async+@EnableAsync

@EnableAsync:开启异步任务
@Async:给希望异步执行的方法标注
一般使用@Async都会指定自定义的线程池
在此处的例子应该写成这样@Async(value = “TaskExecutor”)

@Slf4j
@Component
@EnableAsync
@EnableScheduling
public class DemoTask {
 
    // 每10秒执行一次
    @Async("taskExecutor")//指定自定义的线程池
    @Scheduled(cron = "0/10 * * * * ? ")
    public void testSchedule1() {
        try {
            Thread.sleep(10000); // 休眠10秒,模拟业务场景执行时间
            log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

   // 每10秒执行一次
    @Async("taskExecutor")//指定自定义的线程池
    @Scheduled(cron = "0/10 * * * * ? ")
    public void testSchedule2() {
        try {
            log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

效果如下

定时任务1不会堵塞定时任务2了,且 定时任务 都是10秒钟执行一次,不会存在堵塞延迟

在这里插入图片描述

分布式定时任务

方式一:使用elastic-job

elastic-job是当当网基于quartz 二次开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,实现任务高可用以及分片。
Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
使用Elastic-Job可以快速实现分布式任务调度。

zk在elastic-job的作用

Elastic-Job依赖ZooKeeper完成对执行任务信息的存储(如任务名称、任务参与实例、任务执行策略等);
Elastic-Job依赖ZooKeeper实现选举机制,在任务执行实例数量变化时(如在快速上手中的启动新实例或停止实例),会触发选举机制来决定让哪个实例去执行该任务。

我这边主要讲SpringBoot,所以肯定会采用场景启动器starter的

什么是场景启动器?
也就是spring-boot-starter- 开头的依赖,
以下是一些常见的Spring Boot场景启动器示例:
(1)spring-boot-starter-data-jpa:包含Spring Data JPA和Hibernate的依赖,用于简化数据库访问和JPA(Java Persistence API)的使用。
(2)spring-boot-starter-data-mongodb:包含Spring Data MongoDB的依赖,用于简化MongoDB数据库的访问。
(3)spring-boot-starter-security:包含Spring Security的依赖,用于添加安全功能,如用户认证和授权。
(4)… … … …

使用Spring Boot的场景启动器确实可以让您避免手动添加所需的依赖,因为它已经为您预先定义并包含了这些依赖。这样,您就可以更专注于业务逻辑的实现,而不是花费大量时间在依赖管理和配置上了。

1、引入相关依赖

我本地的SpringBoot为2.6.4版本,zk版本为3.7版本
此处的版本elasticjob就使用了3.0.1 ,我之前尝试换了3.0.2 发现项目启动报错了(具体原因暂未发现)

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.1</version>
</dependency>

2、application.yml中配置注册中心和作业调度

server:
  port: 9999

#elasticjob配置
elasticjob:
  # 注册中心配置
  reg-center:
    # 连接 ZooKeeper 服务器的列表, 包括 IP 地址和端口号,多个地址用逗号分隔
    server-lists: 127.0.0.1:2188
    # ZooKeeper 的命名空间
    namespace: elastic-job-spring
    # 等待重试的间隔时间的初始毫秒数
    base-sleep-time-milliseconds: 1000
    # 等待重试的间隔时间的最大毫秒数
    maxSleepTimeMilliseconds: 3000
    # 最大重试次数
    maxRetries: 3
    # 会话超时毫秒数
    sessionTimeoutMilliseconds: 60000
    # 连接超时毫秒数
    connectionTimeoutMilliseconds: 15000
  # 作业配置, 更多的配置参考官网
  jobs:
    springJob: # job名
      elasticJobClass: com.harvey.demo.job.SpringBootJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 2
      shardingItemParameters: 0=Beijing,1=Shanghai
      overwrite: true #修改有效必须加
    springTestJob:
      elasticJobClass: com.harvey.demo.job.SpringRunnerJob
      cron: 0/10 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      overwrite: true #修改有效必须加

作业调度参数讲解

属性名 含义 类型 是否必填
elasticJobClass 定时任务的全路径名 String 必填
cron 定时任务执行的cron表达式 String 必填
shardingTotalCount 作业分片总数,一般情况下设置为1足够了 int 必填
shardingItemParameters 分片序列号和参数用等号分隔,多个键值对用逗号分隔。
分片序列号从0开始,不可大于或等于作业分片总数。 如:0=a,1=b,2=c
String 非必填
巨坑(配置修改无效)

修改了jobs里面的配置(比如cron、shardingTotalCount之类的)没有作用,跑的还是之前的旧配置

最开始的解决方案
windows环境下我的的解决方法是上zookeeper使用命令删除那个命名空间才行

1.先双击zkServer.cmd启动 zookeeper 服务器
2.再双击zkCli.cmd启动 zookeeper客户端
3.然后在客户端里面使用 查命令: ls /
4.接着 执行 删除命令: deleteall /elastic-job-spring

想了半天,发现不可能这么大的问题网上没有人发现吗?不可能,于是去官网看了一下,发现有一个参数overwrite(每次启动作业都以本地配置为准。)
于是我把它配置上去,最终解决这个巨坑

3、job实例

第一个job:

@Component
public class SpringBootJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(this.getClass() + ":分片总数是: [{" + shardingContext.getShardingTotalCount() + "}]; 当前分片项是: [{" + shardingContext.getShardingItem() + "}]");
        //分片参数,(0=text,1=image,2=video,参数就是text、image...)
        String shardingParameter = shardingContext.getShardingParameter();
        //任务参数, 配置是name=test就是name=test
        String jobParameter = shardingContext.getJobParameter();
        System.out.println("current sharding item " + shardingContext.getShardingItem() + ", shardingParameter = " + shardingParameter + ", jobParameter:" + jobParameter);
    }
}

第二个job:

@Component
public class SpringRunnerJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(this.getClass() + ":分片总数是: [{" + shardingContext.getShardingTotalCount() + "}]; 当前分片项是: [{" + shardingContext.getShardingItem() + "}]");
        //分片参数,(0=text,1=image,2=video,参数就是text、image...)
        String shardingParameter = shardingContext.getShardingParameter();
        //任务参数
        String jobParameter = shardingContext.getJobParameter();
        System.out.println("current sharding item " + shardingContext.getShardingItem() + ", shardingParameter = " + shardingParameter + ", jobParameter:" + jobParameter);
    }
}

启动项目,可以看到相关的日志。

ElasticJob-UI监控平台

适用于【场景启动器starter】方式开发的elastic-job。

下载ElasticJob-UI

https://shardingsphere.apache.org/elasticjob/current/cn/downloads/

在这里插入图片描述
解压后在bin目录双击启动
(本人是在windows环境)
在这里插入图片描述

访问控制台

启动后游览器访问(默认端口是8088):http://127.0.0.1:8088/#/login 用户名/密码 root/root
登录成功后,链接上注册中心,链接成功后便可以进行任务的管理。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述