Quartz
文章目录
1. Quartz介绍
什么是 任务调度 ?任务调度就是我们系统中创建了 N 个任务,每个任务都有指定的时间进行执行,而这种多任务的执行策略就是任务调度。
任务调度框架“Quartz”是OpenSymphony开源组织在Job scheduling领域又一个开源项目,是完全由java开发的一个开源的任务日程管理系统,“任务进度管理器”就是一个在预先确定(被纳入日程)的时间到达时,负责执行(或者通知)其他软件组件的系统。
quartz 的作用就是让任务调度变得更加丰富,高效,安全,而且是基于 Java 实现的,这样子开发者只需要调用几个接口做下简单的配置,即可实现上述需求。
2. Quartz核心概念
任务 Job
Job 就是你想要实现的任务类,每一个 Job 必须实现 org.quartz.job 接口,且只需实现接口定义的 execute() 方法。
触发器 Trigger
Trigger 为你执行任务的触发器,比如你想每天定时3点发送一份统计邮件,Trigger 将会设置3点执行该任务。
Trigger 主要包含两种 SimplerTrigger 和 CronTrigger 两种。详见 7.9 与 7.10
调度器 Scheduler
Scheduler 为任务的调度器,它会将任务 Job 及触发器 Trigger 整合起来,负责基于 Trigger 设定的时间来执行 Job。
3. Quart的使用
3.1 引入Quartz的jar包
按照官网的 Demo,搭建一个纯 maven 项目,添加依赖:
<!-- 核心包 -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
<!-- 工具包 -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.3.0</version>
</dependency>
3.2 入门案例
新建一个任务,实现了 org.quartz.Job 接口:
// 定义任务类
public class HelloJob implements Job {
@Override
public void execute(JobExecutionContext arg0) throws JobExecutionException {
// 输出当前时间
ystem.out.println(new Date());
}
}
main 方法,创建调度器、jobDetail 实例、trigger 实例、执行:
public class HelloSchedulerDemo {
public static void main(String[] args) throws Exception {
// 1、调度器(Scheduler),从工厂中获取调度的实例(默认:实例化new StdSchedulerFactory();)
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 2、任务实例(JobDetail)定义一个任务调度实例,将该实例与HelloJob绑定,任务类需要实现Job接口
JobDetail jobDetail = JobBuilder.newJob() // 加载任务类,与HelloJob完成绑定,要求HelloJob实现Job接口
.withIdentity("job1", "group1") // 参数1:任务的名称(唯一实例);参数2:任务组的名称
.build();
// 3、触发器(Trigger)定义触发器,马上执行,然后每5秒重复执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1") // 参数1:触发器的名称(唯一实例);参数2:触发器组的名称
.startNow() // 马上启动触发器
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(2).repeatForever()) // 每2秒重复执行一次
.build();
// 4、让调度器关联任务和触发器,保证按照触发器定义的调整执行任务
scheduler.scheduleJob(jobDetail, trigger);
// 5、启动
scheduler.start();
// 关闭
//scheduler.shutdown();
}
}
3.3 Job和JobDetail
Job: 工作任务调度的接口,任务需要实现该接口。该接口中定义execute方法,类似JDK提供的TimeTask类的run方法。在里面编写任务执行的业务逻辑。
Job实例在Quartz中的生命周期:每次调度器执行Job时,它在调用execute方法前会创建一个新的 Job 实例,当调用完成后,关联的Job对象实例会被释放,释放的实例会被垃圾回收机制回收。
JobDetail: JobDetail为Job实例提供了许多设置属性,以及JobDataMap成员变量属性,它用来存储特定Job实例的状态信息,调度器需要借助JobDetail对象来添加Job实例。
JobDetail重要属性:name、group、jobClass、JobDataMap
JobDetail job = JobBuilder.newJob(HelloJob.class)
.withIdentity("job1", "group1") // 定义该实例唯一标识,并指定一个组
.build();
System.out.println("name:" +job.getKey().getName());
System.out.println("group:" +job.getKey().getGroup());
System.out.println("jobClass:" +job.getJobClass().getName());
为什么设计成JobDetail + Job,不直接使用Job?
JobDetail 定义的是任务数据,而真正的执行逻辑是在Job中。
这是因为任务是有可能并发执行,如果Scheduler直接使用Job,就会存在对同一个Job实例并发访问的问题。
而JobDetail & Job 方式,Sheduler每次执行,都会根据JobDetail创建一个新的Job实例,这样就可以 规避并发访问的问题。
3.4 JobExecutionContext
- 当 Scheduler 调用一个 Job,就会将 JobExecutionContext 传递给 Job 的 execute() 方法;
- Job 能通过 JobExecutionContext 对象访问到 Quartz 运行时候的环境以及 Job 本身的明细数据。
任务实现的 execute() 方法,可以通过 context 参数获取。
public interface Job {
void execute(JobExecutionContext context)
throws JobExecutionException;
}
public class HelloJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
Trigger trigger = jobExecutionContext.getTrigger(); //获取Trigger
JobDetail jobDetail = jobExecutionContext.getJobDetail(); //获取JobDetail
Scheduler scheduler = jobExecutionContext.getScheduler(); //获取Scheduler
trigger.getKey().getName(); //获取Trigger名字
trigger.getKey().getGroup(); //获取Trigger组名(默认为 DEFAULT)
jobExecutionContext.getScheduledFireTime(); //触发器触发的预定时间。
jobExecutionContext.getFireTime(); //实际触发时间。例如,计划时间可能是 10:00:00,但如果调度程序太忙,实际触发时间可能是 10:00:03。
jobExecutionContext.getPreviousFireTime(); //上次触发时间
jobExecutionContext.getNextFireTime(); //下次触发时间
System.out.println(new Date());
}
}
3.5 JobDataMap
3.5.1 使用Map获取
- 在进行任务调度时,JobDataMap 存储在 JobExecutionContext 中,非常方便获取。
- JobDataMap 可以用来装载任何可序列化的数据对象,当 Job 实例对象被执行时这些参数对象会传递给它。
- JobDataMap 实现了 JDK 的 Map 接口,并且添加了非常方便的方法用来存取基本数据类型。
在定义 Trigger 或者 JobDetail 时,将 JobDataMap 传入,然后 Job 中便可以获取到 JobDataMap 中的参数
public class HelloScheduler {
public static void main(String[] args) throws SchedulerException {
//1. 调度器(Scheduler)
Scheduler defaultScheduler = StdSchedulerFactory.getDefaultScheduler();
JobDataMap jobDataMap2 = new JobDataMap();
jobDataMap2.put("message", "JobDetailMessage");
//2. 任务实例(JobDetail)
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
.withIdentity("job1", "jobGroup1")
.usingJobData(jobDataMap2)
.build();
// 定义 JobDataMap
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("message", "TriggerMessage");
//3. 触发器(Trigger)
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "triggerGroup1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(2).repeatForever())
.endAt(new Date(new Date().getTime() + 3000L))
.usingJobData(jobDataMap) // 将 JobDataMap 放入 Trigger 中
.build();
defaultScheduler.scheduleJob(jobDetail, trigger);
defaultScheduler.start();
}
}
HelloJob.java:
public class HelloJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println(jobExecutionContext.getTrigger().getJobDataMap().get("message")); //TriggerMessage
System.out.println(jobExecutionContext.getJobDetail().getJobDataMap().get("message")); //JobDetailMessage
System.out.println(jobExecutionContext.getMergedJobDataMap().get("message")); //TriggerMessage
System.out.println(new Date());
}
}
3.5.2 使用 Setter 方法获取
Job实现类中添加setter方法对应JobDataMap的键值,Quartz框架默认的JobFactory实现类在初始化Job实例对象时会自动调用这些setter方法。
HelloScheduler 类和上面一样。
HelloJob.java:
@Data
public class HelloJob implements Job {
private String message;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println(message); //TriggerMessage
System.out.println(new Date());
}
}
注意:如果遇到同名的 key,Trigger 中 JobDataMap 的值会覆盖 JobDetail 中 JobDataMap 同名的 Key
3.6 Job 状态参数
有状态的 job 可以理解为多次 job调用期间可以持有一些状态信息,这些状态信息存储在 JobDataMap 中。
而默认的无状态 job,每次调用时都会创建一个新的 JobDataMap。
示例如下:
(1)修改HelloSchedulerDemo.java。在 JobDetail 中添加 .usingJobData(“count”, 0),表示计数器。
JobDetail job = JobBuilder.newJob(HelloJob.class)
.withIdentity("job1", "group1") // 定义该实例唯一标识,并指定一个组
.usingJobData("message", "打印日志")
.usingJobData("count", 0)
.build();
(2)HelloJob.java
- 这里使用上面介绍的两种写法
@Data
@PersistJobDataAfterExecution
public class HelloJob implements Job {
private Integer count;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println(++count);
jobExecutionContext.getJobDetail().getJobDataMap().put("count", count);
}
}
//多次调用 Job 的时候,将参数保留在 JobDataMap
@PersistJobDataAfterExecution
public class HelloJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
long count = (long) context.getJobDetail().getJobDataMap().get("count");
System.out.println("当前执行,第" + count + "次");
context.getJobDetail().getJobDataMap().put("count", ++count);
}
}
HelloJob类没有添加 @PersistJobDataAfterExecution 注解,每次调用时都会创建一个新的 JobDataMap。不会累加。
HelloJob类添加 @PersistJobDataAfterExecution 注解,多次调用期间可以持有一些状态信息,即可以实现 count 的累加。
3.7 Trigger
Trigger是Quartz的触发器,会去通知Scheduler何时去执行对应Job。
new Trigger().startAt():表示触发器首次被触发的时间;
new Trigger().endAt():表示触发器结束触发的时间;
3.7.1 SimpleTrigger触发器
SimpleTrigger 对于设置和使用是最为简单的一种QuartzTrigger。
它是为那种需要在特定的日期/时间启动,且以一个可能的间隔时间重复执行 n 次的 Job 所设计的。
案例一:表示在一个指定的时间段内,执行一次作业任务;
SimpleTrigger 常用方法:
方法 | 说明 |
---|---|
startNow() | Scheduler 开始执行时,触发器也即执行 |
startAt(new Date()) | 在指定的时间开始执行 |
withIntervalInSeconds(2) | 执行间隔,方法名中对应时间单位 |
repeatForever() | 一直重复执行 |
withRepeatCount(3) | 重复执行指定的次数 |
endAt(new Date()) | 结束时间 |
例子:
//立即开始执行,2秒执行一次,重复3次,3秒后结束执行(当重复次数或者结束时间有一个先达到时,就会停止执行)
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "triggerGroup1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(2).withRepeatCount(3))
.endAt(new Date(new Date().getTime() + 3000L))
.build();
- SimpleTrigger的属性有:开始时间、结束时间、重复次数和重复的时间间隔。
- 重复次数属性的值可以为0、正整数、或常量 SimpleTrigger.REPEAT_INDEFINITELY。
- 重复的时间间隔属性值必须为大于0或者长整形的正整数,以毫秒作为时间单位,当重复的时间间隔为0时,意味着与Trigger同时触发执行。
- 如果有指定结束时间属性值,则结束时间属性优先于重复次数属性,这样的好处在于:当我们需要创建一个每间隔10秒触发一次直到指定的结束时间的Trigger,而无需去计算从开始到结束的所重复的次数,我们只需简单的指定结束时间和使用REPEAT_INDEFINITELY作为重复次数的属性值即可。
3.7.2 CronTrigger触发器
如果你需要像日历那样按日程来触发任务,而不是像 SimpleTrigger 那样每隔特定的间隔时间触发,CronTrigger 通常比 SimpleTrigger 更有用,因为它是基于日历的作业调度器。
使用CronTrigger ,你可以指定诸如“每个周五中午”,或者"每个工作日的9:30"”或者“从每个周一-、周三、周五的上午9 : 00到上午10 : 00之间每隔五分钟”这样日程安排来触发。甚至,像SimpleTrigger一样,CronTrigger也有一个startTIme以指定日程从什么时候开始,也有一个 (可选的) endTIme以指定何时日程不再继续。
Cron表达式被用来配置CronTrigger实例。Cron表达式是一个由7个子表达式组成的字符串。每个子表达式都描述了一个单独的日程细节。这些子表达式用空格分隔,分别表示:
Seconds:代表秒;可出现 , - * / 四个字符,有效范围为0-59的整数
Minutes:代表分钟;可出现 , - * / 四个字符,有效范围为0-59的整数
Hours:代表小时;可出现 , - * / 四个字符,有效范围为0-23的整数
Day:代表天;可出现 , - * / ? L W C 八个字符,有效范围为0-31的整数
Month:代表月;可出现 , - * / 四个字符,有效范围为1-12的整数或JAN-DEc
Week:代表周;可出现 , - * / ? L C # 四个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一, 依次类推
Year:代表年;可出现 , - * / 四个字符,有效范围为1970-2099年
:表示匹配该域的任意值,假如在Minutes域使用*,即表示每分钟都会触发事件。
?:只能用在Day和Week两个域。它表示不指定值,因为Day和Week互斥,当两个子表达式其中之一被指定了值以后,为了避免冲突,需要将另一个子表达式的值设"?"。
例如:想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?,其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。
-:表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次
/:表示起始时间开始触发,然后每隔固定时间触发一次(指定数值的增量),例如在Minutes域使用5/20,则意味着5分钟触发一次,而25,45等分别触发一次.
,:表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分每分钟触发一次。
L:表示最后,它是单词“last”的缩写,只能出现在Week和Day域,在Day子表达式中,“L”表示一个月的最后一天,在Week自表达式中,“L”表示一个星期的最后一天,也就是SAT(星期六);如果在“L”前有具体的内容,它就具有其他的含义了,例如:“6L”表示这个月的倒数第6天。
注意:在使用“L”参数时,不要指定列表或范围,因为这会导致问题
W:表示有效工作日(周一到周五),只能出现在Day域,系统将在离指定日期的最近的有效工作日触发事件。
例如:在Day使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日触发;
如果5日在星期一到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份
LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。
#:用于确定每个月第几个星期几,只能出现在Week域。例如在2#3,表示某月的第二个星期三。
在线生成cron:在线生成cron,或者直接百度 在线cron 也行
案例:
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?")) // 日历
.build();
3.8 SchedulerFactory
Quartz以模块方式架构,因此,要使它运行,几个组件必须很好的咬合在一起。幸运的是,已经有了一些现存的助手可以完成这些工作。
所有的Scheduler实例由SchedulerFactory创建。
Quartz的三个核心概念:调度器、任务、触发器,三者之间的关系是:
大家都知道,一个作业,比较重要的三个要素就是Scheduler,JobDetail,Trigger;
而Trigger对于Job而言就好比一个驱动器,没有触发器来定时驱动作业,作业就无法运行;
对于Job而言,一个Job可以对应多个Trigger,但对于Trigger而言,一个Trigger只能对应一个Job,所以一个Trigger只能被指派给一个Job;
如果你需要一个更负责的触发计划,你可以创建多个Trigger并指派它们给同一个Job。
1. StdSchedulerFactory
Quartz 默认的 SchedulerFactory
- 使用一组参数(java.util.Properties)来创建和初始化Quartz调度器
- 配置参数一般存储在 quartz.properties 文件中
- 调用 getScheduler方法就能创建和初始化调度器对象
创建方法:
//静态方法
Scheduler defaultScheduler = StdSchedulerFactory.getDefaultScheduler();
//实例方法
StdSchedulerFactory stdSchedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = stdSchedulerFactory.getScheduler();
2. DirectSchedulerFactory(了解)
DirectSchedulerFactory 是对 SchedulerFactory 的直接实现,通过它可以直接构建 Scheduler、ThreadPool 等
DirectSchedulerFactory directSchedulerFactory = DirectSchedulerFactory.getInstance();
Scheduler scheduler = directSchedulerFactory.getScheduler();
3.9 Scheduler 常用方法
scheduler.scheduleJob(jobDetail, trigger); //绑定jobDetail与trigger
scheduler.checkExists(JobKey.jobKey(name, group)) //检查JobDetail是否存在
scheduler.checkExists(TriggerKey.triggerKey(name, group)) //检查Trigger是否存在
scheduler.deleteJob(JobKey.jobKey(name, group)) //删除jobDetail
scheduler.triggerJob(JobKey.jobKey(name, group), dataMap) //立即执行一次指定的任务
scheduler.start(); //启动任务调度
scheduler.pauseJob(jobKey); //暂停指定的job
scheduler.standby(); //任务调度挂起,即暂停操作
scheduler.shutdown(); //关闭任务调度,同shutdown(false)
scheduler.shutdown(true); //表示等待所有正在执行的Job执行完毕之后,再关闭Scheduler
scheduler.shutdown(false); // 表示直接关闭Scheduler
4. Quartz.properties
默认路径:quartz-2.3.2 中的 org.quartz.quartz.properties
我们也可以在项目的资源下添加 quartz.properties 文件,去覆盖底层的配置文件。
#===============================================================
#Configure Main Scheduler Properties 调度器属性
#===============================================================
#调度器的实例名
org.quartz.scheduler.instanceName = QuartzScheduler
#调度器的实例ID,大多数情况设置为AUTO即可
org.quartz.scheduler.instanceId = AUTO
#===============================================================
#Configure ThreadPool 线程池属性
#===============================================================
#处理Job的线程个数,至少为1,但最多的话最好不要超过100,在多数机器上设置该值超过100的话显得相当不实用了,特别是在你的Job执行时间较长的情况下
org.quartz.threadPool.threadCount = 5
#线程的优先级,优先级别搞的线程比优先级别低的线程优先得到执行。最小为1,最大为10,默认为5
org.quartz.threadPool.threadPriority = 5
#一个实现了org.quartz.spi.threadPool接口的类,Quartz自带的线程池实现类是org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#===============================================================
#Configure JobStore 作业存储设置
#===============================================================
#Job默认是存储在内存中的,即下面配置
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
#===============================================================
#Configure Plugins 插件配置
#===============================================================
org.quartz.plugin.jobInitializer.class = org.quartz.plugins.xml.JobInitializationPlugin
org.quartz.plugin.jobInitializer.overWriteExistingJobs = true
org.quartz.plugin.jobInitializer.failOnFileNotFound = true
org.quartz.plugin.jobInitializer.validating=false
也可以编写程序代码操作quartz.properties文件的内容:
// 创建工厂实例
StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
// 创建配置工厂的属性的对象
Properties prop = new Properties();
prop.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
prop.put("org.quartz.threadPool.threadCount", "5");
try {
// 加载上面定义的属性
schedulerFactory.initialize(prop);
Scheduler scheduler = schedulerFactory.getScheduler();
scheduler.start();
} catch (SchedulerException e) {
e.printStackTrace();
}
5. Quartz监听器
5.1. 概念
Quartz的监听器用于当任务调度中你所关注事件发生时,能够及时获取这一事件的通知。类似于任务执行过程中的邮件、短信类的提醒。Quartz监听器主要由JobListener、TriggerListener、SchedulerListener三种,顾名思义,分布表示任务、触发器、调度器对应的监听器。三者的使用方法类似,在开始介绍三种监听器之前,需要明确两个概念:全局监听器与非全局监听器,二者的区别在于:
- 全局监听器能够接收到所有的Job/Trigger的事件通知
- 而非全局监听器只能接收到在其上注册的Job或者Trigger的事件,不在其上注册的Job或Trigger则不会进行监听。
5.2 JobListener
任务调度过程中,与任务Job相关的事件包括:Job开始要执行的提示;Job执行完成的提示等。
public interface JobListener {
//用于获取该JobListener的名称。
public String getName();
//Scheduler在JobDetail将要被执行时调用这个方法。
public void jobToBeExecuted(JobExecutionContext context);
//Scheduler在JobDetail即将被执行,但又被TriggerListener否决时会调用该方法。
public void jobExecutionVetoed(JobExecutionContext context);
//Scheduler在JobDetail被执行之后调用这个方法。
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException);
}
示例:
HelloJobListener.java
// 定义任务类
public class HelloJobListener implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 输出当前时间
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = dateFormat.format(date);
// 工作内容
System.out.println("正在进行数据库的备份工作,备份数据库的时间是:" +dateString);
}
}
创建自定义的JobListener
MyJobListener.java
public class MyJobListener implements JobListener {
@Override
public String getName() {
String name = this.getClass().getSimpleName();
System.out.println("监听器的名称是:" +name);
return name;
}
@Override
public void jobToBeExecuted(JobExecutionContext context) {
String name = context.getJobDetail().getKey().getName();
System.out.println("Job的名称是:" +name + "Scheduler在JobDetail将要被执行时调用的方法");
}
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
String name = context.getJobDetail().getKey().getName();
System.out.println("Job的名称是:" +name + "Scheduler在JobDetail即将被执行,但又被TriggerListener否决时会调用该方法");
}
@Override
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
String name = context.getJobDetail().getKey().getName();
System.out.println("Job的名称是:" +name + "Scheduler在JobDetail被执行之后调用这个方法");
}
}
执行调度器
HelloSchedulerDemoJobListener.java
public class HelloSchedulerDemoJobListener {
public static void main(String[] args) throws Exception {
// 1、调度器(Scheduler),从工厂中获取调度的实例(默认:实例化new StdSchedulerFactory();)
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 2、任务实例(JobDetail)定义一个任务调度实例,将该实例与HelloJobSimpleTrigger绑定,任务类需要实现Job接口
JobDetail jobDetail = JobBuilder.newJob(HelloJobListener.class) // 加载任务类,与HelloJob完成绑定,要求HelloJob实现Job接口
.withIdentity("job1", "group1") // 参数1:任务的名称(唯一实例);参数2:任务组的名称
.build();
// 3、触发器(Trigger)定义触发器,马上执行,然后每5秒重复执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1") // 参数1:触发器的名称(唯一实例);参数2:触发器组的名称
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().repeatSecondlyForever(5).withRepeatCount(2)) // 每5秒执行一次,连续执行3次后停止,默认是0
.build();
// 4、让调度器关联任务和触发器,保证按照触发器定义的调整执行任务
scheduler.scheduleJob(jobDetail, trigger);
// 创建并注册一个全局的Job Listener
// scheduler.getListenerManager().addJobListener(new MyJobListener(), EverythingMatcher.allJobs());
// 创建并注册一个局部的Job Listener,表示指定的任务Job
scheduler.getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(JobKey.jobKey("job1", "group1")));
// 5、启动
scheduler.start();
// 关闭
//scheduler.shutdown();
}
}
5.3 TriggerListener
任务调度过程中,与触发器Trigger相关的事件包括:触发器触发、触发器未正确触发、触发器完成等。
public interface TriggerListener {
//用于获取触发器的名称。
public String getName();
//当与监听器关联的Trigger被触发,Job上的Execute()方法将被执行时,Scheduler就调用该方法。
public void triggerFired(Trigger trigger, JobExecutionContext context);
//在Trigger触发后,Job将要执行时由Scheduler调用这个方法。TriggerListener给了一个选择去否决Job的执行。假如这个方法返回true,这个Job将不会为此次Trigger触发而得到执行。
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context);
//Scheduler调用这个方法是在Trigger错过触发时。你应该关注此方法中持续时间长的逻辑:在出现许多错过触发的Trigger时,长逻辑会导致骨牌效应。你应当保持这个方法尽量的小。
public void triggerMisfired(Trigger trigger);
//Trigger被触发并且完成了Job的执行时,Scheduler调用这个方法。
public void triggerComplete(Trigger trigger, JobExecutionContext context, CompletedExecutionInstruction triggerInstructionCode)
}
示例:
下面的例子简单展示了TriggerListener的使用,其中创建并注册TriggerListener与JobListener几乎类似。
HelloJobListener.java
// 定义任务类
public class HelloJobListener implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 输出当前时间
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = dateFormat.format(date);
// 工作内容
System.out.println("正在进行数据库的备份工作,备份数据库的时间是:" +dateString);
}
}
MyTriggerListener.java
public class MyTriggerListener implements TriggerListener {
private String name;
// 构造方法,自定义传递触发器的名称,默认是类的名称
public MyTriggerListener(String name) {
super();
this.name = name;
}
@Override
public String getName() {
return this.name; // 不返还会抛出一个名称为空的异常
}
// @Override
// public String getName() {
// String name = this.getClass().getSimpleName();
// System.out.println("触发器的名称:" +name);
// return name; // 不返还会抛出一个名称为空的异常
// }
@Override
public void triggerFired(Trigger trigger, JobExecutionContext context) {
String name = this.getClass().getSimpleName();
System.out.println(name +"被触发");
}
@Override
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
String name = this.getClass().getSimpleName();
// TriggerListener给了一个选择去否决Job的执行。假如这个方法返回true,这个Job将不会为此次Trigger触发而得到执行。
System.out.println(name +" 没有被触发");
return false; // true:表示不会执行Job的方法
}
@Override
public void triggerMisfired(Trigger trigger) {
String name = this.getClass().getSimpleName();
// Scheduler调用这个方法是在Trigger错过触发时
System.out.println(name +" 错过触发");
}
@Override
public void triggerComplete(Trigger trigger, JobExecutionContext context,
CompletedExecutionInstruction triggerInstructionCode) {
String name = this.getClass().getSimpleName();
// Trigger被触发并且完成了Job的执行时,Scheduler调用这个方法。
System.out.println(name +" 完成之后触发");
}
}
HelloSchedulerDemoTriggerListener.java
public class HelloSchedulerDemoTriggerListener {
public static void main(String[] args) throws Exception {
// 1、调度器(Scheduler),从工厂中获取调度的实例(默认:实例化new StdSchedulerFactory();)
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 2、任务实例(JobDetail)定义一个任务调度实例,将该实例与HelloJobSimpleTrigger绑定,任务类需要实现Job接口
JobDetail jobDetail = JobBuilder.newJob(HelloJobListener.class) // 加载任务类,与HelloJob完成绑定,要求HelloJob实现Job接口
.withIdentity("job1", "group1") // 参数1:任务的名称(唯一实例);参数2:任务组的名称
.build();
// 3、触发器(Trigger)定义触发器,马上执行,然后每5秒重复执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1") // 参数1:触发器的名称(唯一实例);参数2:触发器组的名称
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().repeatSecondlyForever(5).withRepeatCount(2)) // 每5秒执行一次,连续执行3次后停止,默认是0
.build();
// 4、让调度器关联任务和触发器,保证按照触发器定义的调整执行任务
scheduler.scheduleJob(jobDetail, trigger);
// 创建并注册一个全局的Trigger Listener
// scheduler.getListenerManager().addTriggerListener(new MyTriggerListener(), EverythingMatcher.allTriggers());
// 创建并注册一个局部的Trigger Listener
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener(), KeyMatcher.keyEquals(TriggerKey.triggerKey("trigger1", "group1")));
// 5、启动
scheduler.start();
// 关闭
//scheduler.shutdown();
}
}
5.4 SchedulerListener
public interface SchedulerListener {
//用于部署JobDetail时调用
public void jobScheduled(Trigger trigger);
//用于卸载JobDetail时调用。
public void jobUnscheduled(TriggerKey triggerKey);
//当一个Trigger来到了再也不会触发的状态时调用这个方法。除非这个Job已设置成了持久性,否则它就会从Scheduler中移除。
public void triggerFinalized(Trigger trigger);
//Scheduler调用这个方法是发生在一个Trigger或Trigger组被暂停时。假如是Trigger组的话,triggerName参数将为null。
public void triggersPaused(String triggerGroup);
//Scheduler调用这个方法是发生在一个Trigger或Trigger组从暂停中恢复时。假如是Trigger组的话,triggerName参数将为null。
public void triggersResumed(String triggerGroup);
//当一个或一组JobDetail暂停时调用这个方法。
public void jobsPaused(String jobGroup);
//当一个或一组Job从暂停上恢复时调用这个方法。假如是一个Job组,jobName将为null。
public void jobsResumed(String jobGroup);
//在Scheduler的正常运行期间产生一个严重错误时调用这个方法。
public void schedulerError(String msg, SchedulerException cause);
//当Scheduler开启时,调用该方法。
public void schedulerStarted();
//当Scheduler处于StandBy模式时,调用该方法。
public void schedulerInStandbyMode();
//当Scheduler停止时,调用该方法。
public void schedulerShutdown();
//当Scheduler中的数据被清除时,调用该方法。
public void schedulingDataCleared()
}
示例:
下面的代码简单描述了如何使用SchedulerListener方法:
HelloJobListener.java
// 定义任务类
public class HelloJobListener implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 输出当前时间
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = dateFormat.format(date);
// 工作内容
System.out.println("正在进行数据库的备份工作,备份数据库的时间是:" +dateString);
}
}
MySchedulerListener.java
public class MySchedulerListener implements SchedulerListener {
@Override
public void jobScheduled(Trigger trigger) {
String name = trigger.getKey().getName();
// 用于部署JobDetail时调用
System.out.println(name +" 完成部署");
}
@Override
public void jobUnscheduled(TriggerKey triggerKey) {
String name = triggerKey.getName();
// 用于卸载JobDetail时调用
System.out.println(name +" 完成卸载");
}
@Override
public void triggerFinalized(Trigger trigger) {
String name = trigger.getKey().getName();
// 当一个Trigger来到了再也不会触发的状态时调用这个方法。除非这个Job已设置成了持久性,否则它就会从Scheduler中移除。
System.out.println(name +" 触发器被移除");
}
@Override
public void triggerPaused(TriggerKey triggerKey) {
String name = triggerKey.getName();
// Scheduler调用这个方法是发生在一个Trigger或Trigger组被暂停时。假如是Trigger组的话,triggerName参数将为null。
System.out.println(name +" 正在被暂停");
}
@Override
public void triggersPaused(String triggerGroup) {
// Scheduler调用这个方法是发生在一个Trigger或Trigger组被暂停时。假如是Trigger组的话,triggerName参数将为null。
System.out.println("触发器组" +triggerGroup +" 正在被暂停");
}
@Override
public void triggerResumed(TriggerKey triggerKey) {
// Scheduler调用这个方法是发生在一个Trigger或Trigger组从暂停中恢复时。假如是Trigger组的话,triggerName参数将为null。参数将为null。
String name = triggerKey.getName();
System.out.println(name +" 正在从暂停中恢复");
}
@Override
public void triggersResumed(String triggerGroup) {
// Scheduler调用这个方法是发生在一个Trigger或Trigger组从暂停中恢复时。假如是Trigger组的话,triggerName参数将为null。参数将为null。
System.out.println("触发器组" +triggerGroup +" 正在从暂停中恢复");
}
@Override
public void jobAdded(JobDetail jobDetail) {
//
System.out.println(jobDetail.getKey() +" 添加工作任务");
}
@Override
public void jobDeleted(JobKey jobKey) {
//
System.out.println(jobKey +" 删除工作任务");
}
@Override
public void jobPaused(JobKey jobKey) {
//
System.out.println(jobKey +" 工作任务正在被暂停");
}
@Override
public void jobsPaused(String jobGroup) {
//
System.out.println("工作组" +jobGroup +" 正在被暂停");
}
@Override
public void jobResumed(JobKey jobKey) {
//
System.out.println(jobKey +" 正在从暂停中恢复");
}
@Override
public void jobsResumed(String jobGroup) {
//
System.out.println("工作组" +jobGroup +" 正在从暂停中恢复");
}
@Override
public void schedulerError(String msg, SchedulerException cause) {
// 在Scheduler的正常运行期间产生一个严重错误时调用这个方法。
System.out.println("产生严重错误的时候调用" +msg +" " +cause.getUnderlyingException());
}
@Override
public void schedulerInStandbyMode() {
// 当Scheduler处于StandBy模式时,调用该方法。
System.out.println("调度器被挂起模式的时候调用");
}
@Override
public void schedulerStarted() {
// 当Scheduler开启时,调用该方法
System.out.println("调度器开启的时候调用");
}
@Override
public void schedulerStarting() {
//
System.out.println("调度器正在开启的时候调用");
}
@Override
public void schedulerShutdown() {
//
System.out.println("调度器关闭的时候调用");
}
@Override
public void schedulerShuttingdown() {
//
System.out.println("调度器正在关闭的时候调用");
}
@Override
public void schedulingDataCleared() {
// 当Scheduler中的数据被清除时,调用该方法
System.out.println("调度器数据被清除的时候调用");
}
}
HelloSchedulerDemoTriggerListener.java
public class HelloSchedulerDemoTriggerListener {
public static void main(String[] args) throws Exception {
// 1、调度器(Scheduler),从工厂中获取调度的实例(默认:实例化new StdSchedulerFactory();)
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 2、任务实例(JobDetail)定义一个任务调度实例,将该实例与HelloJobSimpleTrigger绑定,任务类需要实现Job接口
JobDetail jobDetail = JobBuilder.newJob(HelloJobListener.class) // 加载任务类,与HelloJob完成绑定,要求HelloJob实现Job接口
.withIdentity("job1", "group1") // 参数1:任务的名称(唯一实例);参数2:任务组的名称
.build();
// 3、触发器(Trigger)定义触发器,马上执行,然后每5秒重复执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1") // 参数1:触发器的名称(唯一实例);参数2:触发器组的名称
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().repeatSecondlyForever(5).withRepeatCount(2)) // 每5秒执行一次,连续执行3次后停止,默认是0
.build();
// 4、让调度器关联任务和触发器,保证按照触发器定义的调整执行任务
scheduler.scheduleJob(jobDetail, trigger);
// 创建调度器的监听
scheduler.getListenerManager().addSchedulerListener(new MySchedulerListener());
// 移除对应的调度器的监听
// scheduler.getListenerManager().removeSchedulerListener(new MySchedulerListener());
// 5、启动
scheduler.start();
// 线程延迟7秒后关闭
Thread.sleep(7000L);
// 关闭
scheduler.shutdown();
}
}
6. 持久化到Mysql中
6.1 下载sql文件
Quartz 原码中有 sql 文件:sql文件 或者 sql文件备用
下载然后导入到数据库中,我这里使用的是 mysql5.7
6.2 引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zyx</groupId>
<artifactId>quartz</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<exclusions>
<exclusion>
<!--排除自带的JDBC连接池-->
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--定时任务需要依赖context模块-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<!--连接数据库-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--下面的包可以替换为 mybatis 或者 mybatisPlus-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
</dependencies>
6.3 配置SchedulerFactory
配置数据源:
spring:
datasource:
username: root
password: root
url: jdbc:mysql://10.211.55.12:3306/test
driver-class-name: com.mysql.cj.jdbc.Driver
配置 SchedulerFactory:
@Configuration
public class ScheduleConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource)
{
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setDataSource(dataSource);
// quartz参数
Properties prop = new Properties();
prop.put("org.quartz.scheduler.instanceName", "ZyxScheduler");
prop.put("org.quartz.scheduler.instanceId", "AUTO"); //如果使用集群,instanceId必须唯一,设置成AUTO
// 线程池配置
prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
prop.put("org.quartz.threadPool.threadCount", "20"); //线程数
prop.put("org.quartz.threadPool.threadPriority", "5"); //优先级
// JobStore配置
prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX"); //配置使用数据库
// 集群配置
prop.put("org.quartz.jobStore.isClustered", "true"); //是否是集群模式
prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");
// sqlserver 启用
// prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
prop.put("org.quartz.jobStore.misfireThreshold", "12000");
prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_"); //数据库表前缀
factory.setQuartzProperties(prop);
factory.setSchedulerName("ZyxScheduler");
// 延时启动
factory.setStartupDelay(1);
factory.setApplicationContextSchedulerContextKey("applicationContextKey");
// 可选,QuartzScheduler
// 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
factory.setOverwriteExistingJobs(true);
// 设置自动启动,默认为true
factory.setAutoStartup(true);
return factory;
}
}
使用自定义的Scheduler
定义一个简单的 Job:
@Data
public class HelloJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println(new Date());
}
}
6.4 使用自定义的 Scheduler:
@SpringBootTest
class QuartzApplicationTests {
//将上面配置好的 factoryBean 注入进来
@Autowired
private SchedulerFactoryBean factoryBean;
@Test
void contextLoads() throws SchedulerException, InterruptedException {
Scheduler scheduler = factoryBean.getScheduler();
scheduler.clear();
//2. 任务实例(JobDetail)
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
.withIdentity("job1", "jobGroup1")
.build();
//3. 触发器(Trigger)
Trigger trigger = TriggerBuilder.newTrigger()
.startNow()
.withIdentity("trigger1", "triggerGroup1")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(2).repeatForever())
.build();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
Thread.sleep(100000);
}
}
6.5 查看数据库
查看数据库,可以发现 Quartz 中相关的数据已经保存到数据库中了
6.6 再次启动 Scheduler
直接让程序运行起来,不创建新的定时任务,会发现刚才保存到数据库中的定时任务会自动执行
@SpringBootTest
class QuartzApplicationTests {
@Autowired
private SchedulerFactoryBean factoryBean;
@Test
void contextLoads() throws SchedulerException, InterruptedException {
Thread.sleep(100000);
}
}
7. Quartz的分布式
关于分布式调度的二个核心问题:
- Quartz 如何向每台服务器分配资源
- Quartz怎么保证在一台挂机的情况下,保证任务不丢失
7.1 Quartz启动流程
若quartz是配置在spring中,当服务器启动时,就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口,因此在初始化bean的时候,会执行afterPropertiesSet方法,该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)创建Scheduler。SchedulerFactory在创建quartzScheduler的过程中,将会读取配置参数,初始化各个组件,关键组件如下:
ThreadPool: 一般是使用SimpleThreadPool,SimpleThreadPool创建了一定数量的WorkerThread实例来使得Job能够在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类,它实质上就是一个线程。在SimpleThreadPool中有三个list:workers-存放池中所有的线程引用,availWorkers-存放所有空闲的线程,busyWorkers-存放所有工作中的线程;
线程池的配置参数如下所示:
- org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
- org.quartz.threadPool.threadCount=3
- org.quartz.threadPool.threadPriority=5
JobStore: 分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务的管理,而JobStoreTX是自己管理事务),若要使用集群要使用JobStoreSupport的方式;
QuartzSchedulerThread: 用来进行任务调度的线程,在初始化的时候paused=true,halted=false,虽然线程开始运行了,但是paused=true,线程会一直等待,直到start方法将paused置为false;
另外,SchedulerFactoryBean还实现了SmartLifeCycle接口,因此初始化完成后,会执行start()方法,该方法将主要会执行以下的几个动作:
- 创建ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理;
- 创建MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理;
- 置QuartzSchedulerThread的paused=false,调度线程才真正开始调度;
7.1 JobStore
Quartz将调度程序的所有的工作数据包括:Jobs,triggers,日历等信息,交由JobStore管理。JobStore的存储方式分为RAM和JDBC两种。群集模式下使用的JDBC。
下面是分析QuartzSchedulerThread源码解决Quartz 如何向每台服务器分配资源,中JobStore的主要作用函数
public interface JobStore {
// 此方法主要是申请将要执行任务:
// 1. 从triggers表中拉取状态为 WAITING 的 Trigger
// 2. Trigger状态从WAITING改为 ACQUIRED,
// 3. 如果2修改成功,则向 FIRED_TRIGGERS 表插入记录
// 此方法有两种方式保证任务的分配
// 1. 乐观锁,即任务从WAITING改为 ACQUIRED失败,则表示被别台机子分配,也可能ABA问题引起任务重复执行
// 2. 悲观锁, 步骤1前,通过Lock表进行上锁
List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow);
// 此方法主要作用在Trigger的触发:
// 1. 将Trigger状态ACQUIRED, 将 FIRED_TRIGGERS状态改为 EXECUTING
// 2. 计算trigger的NEXT_FIRE_TIME
// 3. 如果 NEXT_FIRE_TIME 为空,trigger状态 变成 STATE_COMPLETE, 并不进行4步骤
// 4. 如果不允许并发执行,将 trigger状态 变成 BLOCKED, 否则 置为wait (这里就是导致上面ABA问题的主要原因)
List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers);
// 通过shellJob结果监听器 notifyJobStoreJobComplete进行回调
// 1. 如果任务正常, 则将FIRED_TRIGGERS记录删除
void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, Trigger.CompletedExecutionInstruction triggerInstCode);
}
下面是分析ClusterManager源码解决Quartz 保证任务不丢失
- 项目启动时,会这 SCHEDULER_STATE 表 插入数据
- 之后维护和 SCHEDULER_STATE 的心跳
- 如果发布哪台服务器掉线,则重置任务继续执行 @see JobStoreSupport.clusterRecover
8. 其他说明
8.1 并发执行
通过 @DisallowConcurrentExecution 注解来实现阻止并发。
Quartz定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行, 如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。
@DisallowConcurrentExecution 禁止并发执行多个相同定义的JobDetail, 这个注解是加在Job类上的, 但意思并不是不能同时执行多个Job, 而是不能并发执行同一个Job Definition(由JobDetail定义), 但是可以同时执行多个不同的JobDetail。
举例说明,我们有一个Job类,叫做SayHelloJob, 并在这个Job上加了这个注解, 然后在这个Job上定义了很多个JobDetail, 如sayHelloToJoeJobDetail, sayHelloToMikeJobDetail, 那么当scheduler启动时, 不会并发执行多个sayHelloToJoeJobDetail或者sayHelloToMikeJobDetail, 但可以同时执行sayHelloToJoeJobDetail跟sayHelloToMikeJobDetail
@PersistJobDataAfterExecution 同样, 也是加在Job上。表示当正常执行完Job后, JobDataMap中的数据应该被改动, 以被下一次调用时用。
当使用 @PersistJobDataAfterExecution 注解时, 为了避免并发时, 存储数据造成混乱, 强烈建议把 @DisallowConcurrentExecution 注解也加上。
测试代码,设定的时间间隔为3秒,但job执行时间是5秒,设置 @DisallowConcurrentExecution以 后程序会等任务执行完毕以后再去执行,否则会在3秒时再启用新的线程执行。
9. SpringBoot 整合
9.1 环境准备
从 springboot 2.4.10 开始,添加 quartz 的 maven 依赖:
<!-- Quartz 任务调度 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
application 配置文件:
# 开发环境配置
server:
# 服务器的HTTP端口
port: 80
servlet:
# 应用的访问路径
context-path: /
tomcat:
# tomcat的URI编码
uri-encoding: UTF-8
spring:
datasource:
username: root
password: root
url: jdbc:mysql://127.0.0.1:3306/quartz?useUnicode=true&characterEncoding=utf-8&useSSL=true
driver-class-name: com.mysql.cj.jdbc.Driver
# HikariPool 较佳配置
hikari:
# 客户端(即您)等待来自池的连接的最大毫秒数
connection-timeout: 60000
# 控制将测试连接的活动性的最长时间
validation-timeout: 3000
# 控制允许连接在池中保持空闲状态的最长时间
idle-timeout: 60000
login-timeout: 5
# 控制池中连接的最大生存期
max-lifetime: 60000
# 控制允许池达到的最大大小,包括空闲和使用中的连接
maximum-pool-size: 10
# 控制HikariCP尝试在池中维护的最小空闲连接数
minimum-idle: 10
# 控制默认情况下从池获得的连接是否处于只读模式
read-only: false
Quartz 自带有数据库模式,脚本都是现成的:
下载这个脚本:sql文件 或者 sql文件备用
最后准备一个任务方法:
@Slf4j
@Component("mysqlJob")
public class MysqlJob {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
public void execute(String param) {
logger.info("执行 Mysql Job,当前时间:{},任务参数:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), param);
}
}
9.2 核心代码
ScheduleConfig 配置代码类:
@Configuration
public class ScheduleConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setDataSource(dataSource);
// quartz参数
Properties prop = new Properties();
prop.put("org.quartz.scheduler.instanceName", "shivaScheduler");
prop.put("org.quartz.scheduler.instanceId", "AUTO");
// 线程池配置
prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
prop.put("org.quartz.threadPool.threadCount", "20");
prop.put("org.quartz.threadPool.threadPriority", "5");
// JobStore配置
prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
// 集群配置
prop.put("org.quartz.jobStore.isClustered", "true");
prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");
// sqlserver 启用
// prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
prop.put("org.quartz.jobStore.misfireThreshold", "12000");
prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
factory.setQuartzProperties(prop);
factory.setSchedulerName("shivaScheduler");
// 延时启动
factory.setStartupDelay(1);
factory.setApplicationContextSchedulerContextKey("applicationContextKey");
// 可选,QuartzScheduler
// 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
factory.setOverwriteExistingJobs(true);
// 设置自动启动,默认为true
factory.setAutoStartup(true);
return factory;
}
}
ScheduleUtils 调度工具类,这是本篇中最核心的代码:
public class ScheduleUtils {
/**
* 得到quartz任务类
*
* @param job 执行计划
* @return 具体执行任务类
*/
private static Class<? extends Job> getQuartzJobClass(QuartzJob job) {
boolean isConcurrent = "0".equals(job.getConcurrent());
return isConcurrent ? QuartzJobExecution.class : QuartzDisallowConcurrentExecution.class;
}
/**
* 构建任务触发对象
*/
public static TriggerKey getTriggerKey(Long jobId, String jobGroup) {
return TriggerKey.triggerKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
}
/**
* 构建任务键对象
*/
public static JobKey getJobKey(Long jobId, String jobGroup) {
return JobKey.jobKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
}
/**
* 创建定时任务
*/
public static void createScheduleJob(Scheduler scheduler, QuartzJob job) throws Exception {
Class<? extends Job> jobClass = getQuartzJobClass(job);
// 构建job信息
Long jobId = job.getJobId();
String jobGroup = job.getJobGroup();
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();
// 表达式调度构建器
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup))
.withSchedule(cronScheduleBuilder).build();
// 放入参数,运行时的方法可以获取
jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);
// 判断是否存在
if (scheduler.checkExists(getJobKey(jobId, jobGroup))) {
// 防止创建时存在数据问题 先移除,然后在执行创建操作
scheduler.deleteJob(getJobKey(jobId, jobGroup));
}
scheduler.scheduleJob(jobDetail, trigger);
// 暂停任务
if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())) {
scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup));
}
}
/**
* 设置定时任务策略
*/
public static CronScheduleBuilder handleCronScheduleMisfirePolicy(QuartzJob job, CronScheduleBuilder cb)
throws Exception {
switch (job.getMisfirePolicy()) {
case ScheduleConstants.MISFIRE_DEFAULT:
return cb;
case ScheduleConstants.MISFIRE_IGNORE_MISFIRES:
return cb.withMisfireHandlingInstructionIgnoreMisfires();
case ScheduleConstants.MISFIRE_FIRE_AND_PROCEED:
return cb.withMisfireHandlingInstructionFireAndProceed();
case ScheduleConstants.MISFIRE_DO_NOTHING:
return cb.withMisfireHandlingInstructionDoNothing();
default:
throw new Exception("The task misfire policy '" + job.getMisfirePolicy()
+ "' cannot be used in cron schedule tasks");
}
}
}
这里可以看到,在完成任务与触发器的关联后,如果是暂停状态,会先让调度器停止任务。
AbstractQuartzJob 抽象任务:
public abstract class AbstractQuartzJob implements Job {
private static final Logger log = LoggerFactory.getLogger(AbstractQuartzJob.class);
/**
* 线程本地变量
*/
private static ThreadLocal<Date> threadLocal = new ThreadLocal<>();
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
QuartzJob job = new QuartzJob();
BeanUtils.copyBeanProp(job, context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES));
try {
before(context, job);
if (job != null) {
doExecute(context, job);
}
after(context, job, null);
} catch (Exception e) {
log.error("任务执行异常 - :", e);
after(context, job, e);
}
}
/**
* 执行前
*
* @param context 工作执行上下文对象
* @param job 系统计划任务
*/
protected void before(JobExecutionContext context, QuartzJob job) {
threadLocal.set(new Date());
}
/**
* 执行后
*
* @param context 工作执行上下文对象
* @param sysJob 系统计划任务
*/
protected void after(JobExecutionContext context, QuartzJob sysJob, Exception e) {
}
/**
* 执行方法,由子类重载
*
* @param context 工作执行上下文对象
* @param job 系统计划任务
* @throws Exception 执行过程中的异常
*/
protected abstract void doExecute(JobExecutionContext context, QuartzJob job) throws Exception;
}
这个类将原本 execute 方法执行的任务,下放到了子类重载的 doExecute 方法中
同时准备实现,分了允许并发和不允许并发,差别就是一个注解:
public class QuartzJobExecution extends AbstractQuartzJob {
@Override
protected void doExecute(JobExecutionContext context, QuartzJob job) throws Exception {
JobInvokeUtil.invokeMethod(job);
}
}
@DisallowConcurrentExecution
public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob {
@Override
protected void doExecute(JobExecutionContext context, QuartzJob job) throws Exception {
JobInvokeUtil.invokeMethod(job);
}
}
最后由 JobInvokeUtil 通过反射,进行实际的方法调用:
public class JobInvokeUtil {
/**
* 执行方法
*
* @param job 系统任务
*/
public static void invokeMethod(QuartzJob job) throws Exception {
String invokeTarget = job.getInvokeTarget();
String beanName = getBeanName(invokeTarget);
String methodName = getMethodName(invokeTarget);
List<Object[]> methodParams = getMethodParams(invokeTarget);
if (!isValidClassName(beanName)) {
Object bean = SpringUtils.getBean(beanName);
invokeMethod(bean, methodName, methodParams);
} else {
Object bean = Class.forName(beanName).newInstance();
invokeMethod(bean, methodName, methodParams);
}
}
/**
* 调用任务方法
*
* @param bean 目标对象
* @param methodName 方法名称
* @param methodParams 方法参数
*/
private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams)
throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
InvocationTargetException {
if (StringUtils.isNotNull(methodParams) && methodParams.size() > 0) {
Method method = bean.getClass().getDeclaredMethod(methodName, getMethodParamsType(methodParams));
method.invoke(bean, getMethodParamsValue(methodParams));
} else {
Method method = bean.getClass().getDeclaredMethod(methodName);
method.invoke(bean);
}
}
/**
* 校验是否为为class包名
*
* @param invokeTarget 名称
* @return true是 false否
*/
public static boolean isValidClassName(String invokeTarget) {
return StringUtils.countMatches(invokeTarget, ".") > 1;
}
/**
* 获取bean名称
*
* @param invokeTarget 目标字符串
* @return bean名称
*/
public static String getBeanName(String invokeTarget) {
String beanName = StringUtils.substringBefore(invokeTarget, "(");
return StringUtils.substringBeforeLast(beanName, ".");
}
/**
* 获取bean方法
*
* @param invokeTarget 目标字符串
* @return method方法
*/
public static String getMethodName(String invokeTarget) {
String methodName = StringUtils.substringBefore(invokeTarget, "(");
return StringUtils.substringAfterLast(methodName, ".");
}
/**
* 获取method方法参数相关列表
*
* @param invokeTarget 目标字符串
* @return method方法相关参数列表
*/
public static List<Object[]> getMethodParams(String invokeTarget) {
String methodStr = StringUtils.substringBetween(invokeTarget, "(", ")");
if (StringUtils.isEmpty(methodStr)) {
return null;
}
String[] methodParams = methodStr.split(",");
List<Object[]> classs = new LinkedList<>();
for (int i = 0; i < methodParams.length; i++) {
String str = StringUtils.trimToEmpty(methodParams[i]);
// String字符串类型,包含'
if (StringUtils.contains(str, "'")) {
classs.add(new Object[]{StringUtils.replace(str, "'", ""), String.class});
}
// boolean布尔类型,等于true或者false
else if (StringUtils.equals(str, "true") || StringUtils.equalsIgnoreCase(str, "false")) {
classs.add(new Object[]{Boolean.valueOf(str), Boolean.class});
}
// long长整形,包含L
else if (StringUtils.containsIgnoreCase(str, "L")) {
classs.add(new Object[]{Long.valueOf(StringUtils.replaceIgnoreCase(str, "L", "")), Long.class});
}
// double浮点类型,包含D
else if (StringUtils.containsIgnoreCase(str, "D")) {
classs.add(new Object[]{Double.valueOf(StringUtils.replaceIgnoreCase(str, "D", "")), Double.class});
}
// 其他类型归类为整形
else {
classs.add(new Object[]{Integer.valueOf(str), Integer.class});
}
}
return classs;
}
/**
* 获取参数类型
*
* @param methodParams 参数相关列表
* @return 参数类型列表
*/
public static Class<?>[] getMethodParamsType(List<Object[]> methodParams) {
Class<?>[] classs = new Class<?>[methodParams.size()];
int index = 0;
for (Object[] os : methodParams) {
classs[index] = (Class<?>) os[1];
index++;
}
return classs;
}
/**
* 获取参数值
*
* @param methodParams 参数相关列表
* @return 参数值列表
*/
public static Object[] getMethodParamsValue(List<Object[]> methodParams) {
Object[] classs = new Object[methodParams.size()];
int index = 0;
for (Object[] os : methodParams) {
classs[index] = (Object) os[0];
index++;
}
return classs;
}
}
启动程序后可以看到,调度器已经启动:
2022-08-06 16:26:05.162 INFO 10764 --- [shivaScheduler]] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now, after delay of 1 seconds
2022-08-06 16:26:05.306 INFO 10764 --- [shivaScheduler]] org.quartz.core.QuartzScheduler : Scheduler shivaScheduler_$_DESKTOP-OKMJ1351633508761366 started.
9.3 新增调度任务
添加任务,使用如下 json 进行请求:
{
"concurrent": "1",
"cronExpression": "0/10 * * * * ?",
"invokeTarget": "mysqlJob.execute('got it!!!')",
"jobGroup": "mysqlGroup",
"jobId": 9,
"jobName": "新增 mysqlJob 任务",
"misfirePolicy": "1",
"remark": "",
"status": "0"
}
@Override
@Transactional(rollbackFor = Exception.class)
public int insertJob(QuartzJob job) throws Exception {
// 先将任务设置为暂停状态
job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
int rows = quartzMapper.insert(job);
if (rows > 0) {
ScheduleUtils.createScheduleJob(scheduler, job);
}
return rows;
}
先将任务设置为暂停状态,数据库插入成功后,在调度器创建任务。
再手动启动任务,根据 ID 来启动任务:
实现代码:
@Override
public int changeStatus(Long jobId, String status) throws SchedulerException {
int rows = quartzMapper.changeStatus(jobId, status);
if (rows == 0) {
return rows;
}
//更新成功,需要改调度器内任务的状态
//拿到整个任务
QuartzJob job = quartzMapper.selectJobById(jobId);
//根据状态来启动或者关闭
if (ScheduleConstants.Status.NORMAL.getValue().equals(status)) {
rows = resumeJob(job);
} else if (ScheduleConstants.Status.PAUSE.getValue().equals(status)) {
rows = pauseJob(job);
}
return rows;
}
@Override
public int resumeJob(QuartzJob job) throws SchedulerException {
Long jobId = job.getJobId();
String jobGroup = job.getJobGroup();
job.setStatus(ScheduleConstants.Status.NORMAL.getValue());
int rows = quartzMapper.updateById(job);
if (rows > 0) {
scheduler.resumeJob(ScheduleUtils.getJobKey(jobId, jobGroup));
}
return rows;
}
暂停任务的代码也相同。
调用启动后可以看到控制台打印日志:
2022-08-06 20:36:30.018 INFO 8536 --- [eduler_Worker-3] cn.shiva.quartz.job.MysqlJob : 执行 Mysql Job,当前时间:2022-08-06 20:36:30,任务参数:got it!!!
2022-08-06 20:36:40.016 INFO 8536 --- [eduler_Worker-4] cn.shiva.quartz.job.MysqlJob : 执行 Mysql Job,当前时间:2022-08-06 20:36:40,任务参数:got it!!!
2022-08-06 20:36:50.017 INFO 8536 --- [eduler_Worker-5] cn.shiva.quartz.job.MysqlJob : 执行 Mysql Job,当前时间:2022-08-06 20:36:50,任务参数:got it!!!
如果涉及到任务修改,需要在调度器先删除原有任务,重新创建调度任务。
9.4 启动初始化任务
这部分倒是比较简单,初始化的时候清空原有任务,重新创建就好了:
/**
* 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据)
*/
@PostConstruct
public void init() throws Exception {
scheduler.clear();
List<QuartzJob> jobList = quartzMapper.selectJobAll();
for (QuartzJob job : jobList) {
ScheduleUtils.createScheduleJob(scheduler, job);
}
}
9.5 阻止特定时间运行
通过调度器实现的:
//2014-8-15这一天不执行任何任务
Calendar c = new GregorianCalendar(2014, 7, 15);
cal.setDayExcluded(c, true);
scheduler.addCalendar("exclude", cal, false, false);
//...中间省略
TriggerBuilder.newTrigger().modifiedByCalendar("exclude")....
9.6 Spring + Quartz 实现企业级调度的实现示例
9.6.1 Maven 引入
9.6.2 数据库脚本准备
在Quartz包下docs/dbTables,选择对应的数据库脚本,创建相应的数据库表即可,我用的是mysql5.6,这里有一个需要注意的地方,mysql5.5之前用的表存储引擎是MyISAM,使用的是表级锁,锁发生冲突的概率比较高,并发度低;5.6之后默认的存储引擎为InnoDB,InnoDB采用的锁机制是行级锁,并发度也较高。而quartz集群使用数据库锁的机制来实现同一个任务在同一个时刻只被实例执行,所以为了防止冲突,我们建表的时候要选取InnoDB作为表的存储引擎。
9.6.3 关键代码及配置
1、spring-quartz.xml 配置 在application.xml 文件中引入
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- 注册本地调度任务
<bean id="localQuartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"></bean>-->
<!-- 注册集群调度任务 -->
<bean id="schedulerFactoryBean" lazy-init="false" autowire="no"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy">
<!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
<property name="overwriteExistingJobs" value="true" />
<!--必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动 -->
<property name="startupDelay" value="3" />
<!-- 设置自动启动 -->
<property name="autoStartup" value="true" />
<property name="applicationContextSchedulerContextKey" value="applicationContext" />
<property name="configLocation" value="classpath:quartz.properties" />
</bean>
</beans>
2、quartz.properties 文件配置
#==============================================================
#Configure Main Scheduler Properties
#==============================================================
org.quartz.scheduler.instanceName = KuanrfGSQuartzScheduler
org.quartz.scheduler.instanceId = AUTO
#==============================================================
#Configure JobStore
#==============================================================
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.maxMisfiresToHandleAtATime = 1
org.quartz.jobStore.misfireThreshold = 120000
org.quartz.jobStore.txIsolationLevelSerializable = false
#==============================================================
#Configure DataSource
#==============================================================
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = 你的数据链接
org.quartz.dataSource.myDS.user = 用户名
org.quartz.dataSource.myDS.password = 密码
org.quartz.dataSource.myDS.maxConnections = 30
org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE
#==============================================================
#Configure ThreadPool
#==============================================================
org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount= 10
org.quartz.threadPool.threadPriority= 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true
#==============================================================
#Skip Check Update
#update:true
#not update:false
#==============================================================
org.quartz.scheduler.skipUpdateCheck = true
#============================================================================
# Configure Plugins
#============================================================================
org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownhook.cleanShutdown = true
3、关键代码
JobTaskService
package com.netease.ad.omp.service.sys;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import com.netease.ad.omp.common.utils.SpringUtils;
import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import com.netease.ad.omp.quartz.job.JobUtils;
import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean;
import com.netease.ad.omp.quartz.job.QuartzJobFactory;
import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution;
import org.apache.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
/**
* 计划任务管理
*/
@Service
public class JobTaskService {
public final Logger log = Logger.getLogger(this.getClass());
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private ScheduleJobMapper scheduleJobMapper;
/**
* 从数据库中取 区别于getAllJob
*
* @return
*/
public List<ScheduleJob> getAllTask() {
return scheduleJobMapper.select(null);
}
/**
* 添加到数据库中 区别于addJob
*/
public void addTask(ScheduleJob job) {
job.setCreateTime(new Date());
scheduleJobMapper.insertSelective(job);
}
/**
* 从数据库中查询job
*/
public ScheduleJob getTaskById(Long jobId) {
return scheduleJobMapper.selectByPrimaryKey(jobId);
}
/**
* 更改任务状态
*
* @throws SchedulerException
*/
public void changeStatus(Long jobId, String cmd) throws SchedulerException {
ScheduleJob job = getTaskById(jobId);
if (job == null) {
return;
}
if ("stop".equals(cmd)) {
deleteJob(job);
job.setJobStatus(JobUtils.STATUS_NOT_RUNNING);
} else if ("start".equals(cmd)) {
job.setJobStatus(JobUtils.STATUS_RUNNING);
addJob(job);
}
scheduleJobMapper.updateByPrimaryKeySelective(job);
}
/**
* 更改任务 cron表达式
*
* @throws SchedulerException
*/
public void updateCron(Long jobId, String cron) throws SchedulerException {
ScheduleJob job = getTaskById(jobId);
if (job == null) {
return;
}
job.setCronExpression(cron);
if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
updateJobCron(job);
}
scheduleJobMapper.updateByPrimaryKeySelective(job);
}
/**
* 添加任务
*
* @throws SchedulerException
*/
public void addJob(ScheduleJob job) throws SchedulerException {
if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
return;
}
Scheduler scheduler = schedulerFactoryBean.getScheduler();
log.debug(scheduler + ".......................................................................................add");
TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 不存在,创建一个
if (null == trigger) {
Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();
jobDetail.getJobDataMap().put("scheduleJob", job);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
} else {
// Trigger已存在,那么更新相应的定时设置
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}
}
@PostConstruct
public void init() throws Exception {
// 这里获取任务信息数据
List<ScheduleJob> jobList = scheduleJobMapper.select(null);
for (ScheduleJob job : jobList) {
addJob(job);
}
}
/**
* 获取所有计划中的任务列表
*
* @return
* @throws SchedulerException
*/
public List<ScheduleJob> getAllJob() throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
List<ScheduleJob> jobList = new ArrayList<ScheduleJob>();
for (JobKey jobKey : jobKeys) {
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
ScheduleJob job = new ScheduleJob();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDescription("触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
}
return jobList;
}
/**
* 所有正在运行的job
*
* @return
* @throws SchedulerException
*/
public List<ScheduleJob> getRunningJob() throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
ScheduleJob job = new ScheduleJob();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDescription("触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
return jobList;
}
/**
* 暂停一个job
*
* @param scheduleJob
* @throws SchedulerException
*/
public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.pauseJob(jobKey);
}
/**
* 恢复一个job
*
* @param scheduleJob
* @throws SchedulerException
*/
public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.resumeJob(jobKey);
}
/**
* 删除一个job
*
* @param scheduleJob
* @throws SchedulerException
*/
public void deleteJob(ScheduleJob scheduleJob) throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.deleteJob(jobKey);
}
/**
* 立即执行job
*
* @param scheduleJob
* @throws SchedulerException
*/
public void runAJobNow(ScheduleJob scheduleJob) throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.triggerJob(jobKey);
}
/**
* 更新job时间表达式
*
* @param scheduleJob
* @throws SchedulerException
*/
public void updateJobCron(ScheduleJob scheduleJob) throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
scheduler.rescheduleJob(triggerKey, trigger);
}
public static void main(String[] args) {
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("xxxxx");
}
}
JobUtils
package com.netease.ad.omp.quartz.job;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.netease.ad.omp.common.utils.SpringUtils;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.quartz.JobExecutionContext;
import org.springframework.context.ApplicationContext;
/**
* Created with IntelliJ IDEA
* Class Description:定时任务工具类
*/
public class JobUtils {
public final static Logger log = Logger.getLogger(JobUtils.class);
public static final String STATUS_RUNNING = "1"; //启动状态
public static final String STATUS_NOT_RUNNING = "0"; //未启动状态
public static final String CONCURRENT_IS = "1";
public static final String CONCURRENT_NOT = "0";
private ApplicationContext ctx;
/**
* 通过反射调用scheduleJob中定义的方法
*
* @param scheduleJob
*/
public static void invokMethod(ScheduleJob scheduleJob,JobExecutionContext context) {
Object object = null;
Class clazz = null;
if (StringUtils.isNotBlank(scheduleJob.getSpringId())) {
object = SpringUtils.getBean(scheduleJob.getSpringId());
} else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) {
try {
clazz = Class.forName(scheduleJob.getBeanClass());
object = clazz.newInstance();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (object == null) {
log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,请检查是否配置正确!!!");
return;
}
clazz = object.getClass();
Method method = null;
try {
method = clazz.getMethod(scheduleJob.getMethodName(), new Class[] {JobExecutionContext.class});
} catch (NoSuchMethodException e) {
log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,方法名设置错误!!!");
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (method != null) {
try {
method.invoke(object, new Object[] {context});
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
log.info("任务名称 = [" + scheduleJob.getJobName() + "]----------启动成功");
}
}
QuartzJobFactory
package com.netease.ad.omp.quartz.job;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.log4j.Logger;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
*
* @Description: 计划任务执行处 无状态
* Spring调度任务 (重写 quartz 的 QuartzJobBean 类原因是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误)
*/
public class QuartzJobFactory implements Job {
public final Logger log = Logger.getLogger(this.getClass());
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
JobUtils.invokMethod(scheduleJob,context);
}
}
QuartzJobFactoryDisallowConcurrentExecution
package com.netease.ad.omp.quartz.job;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
*
* @Description: 若一个方法一次执行不完下次轮转时则等待该方法执行完后才执行下一次操作
* Spring调度任务 (重写 quartz 的 QuartzJobBean 类原因是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误)
*/
@DisallowConcurrentExecution
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {
public final Logger log = Logger.getLogger(this.getClass());
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
JobUtils.invokMethod(scheduleJob,context);
}
}
ScheduleJob
package com.netease.ad.omp.entity.sys;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;
/**
* Created with IntelliJ IDEA
* Class Description:计划任务信息
*/
@Table(name = "task_schedule_job")
public class ScheduleJob implements Serializable {
@Id
private Long jobId;
private Date createTime;
private Date updateTime;
/**
* 任务名称
*/
private String jobName;
/**
* 任务分组
*/
private String jobGroup;
/**
* 任务状态 是否启动任务
*/
private String jobStatus;
/**
* cron表达式
*/
private String cronExpression;
/**
* 描述
*/
private String description;
/**
* 任务执行时调用哪个类的方法 包名+类名
*/
private String beanClass;
/**
* 任务是否有状态
*/
private String isConcurrent;
/**
* spring bean
*/
private String springId;
/**
* 任务调用的方法名
*/
private String methodName;
public Long getJobId() {
return jobId;
}
public void setJobId(Long jobId) {
this.jobId = jobId;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getJobGroup() {
return jobGroup;
}
public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}
public String getJobStatus() {
return jobStatus;
}
public void setJobStatus(String jobStatus) {
this.jobStatus = jobStatus;
}
public String getCronExpression() {
return cronExpression;
}
public void setCronExpression(String cronExpression) {
this.cronExpression = cronExpression;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getBeanClass() {
return beanClass;
}
public void setBeanClass(String beanClass) {
this.beanClass = beanClass;
}
public String getIsConcurrent() {
return isConcurrent;
}
public void setIsConcurrent(String isConcurrent) {
this.isConcurrent = isConcurrent;
}
public String getSpringId() {
return springId;
}
public void setSpringId(String springId) {
this.springId = springId;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
}
SpringUtils
package com.netease.ad.omp.common.utils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
public final class SpringUtils implements BeanFactoryPostProcessor {
private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtils.beanFactory = beanFactory;
}
/**
* 获取对象
*
* @param name
* @return Object 一个以所给名字注册的bean的实例
* @throws BeansException
*
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException {
return (T) beanFactory.getBean(name);
}
/**
* 获取类型为requiredType的对象
*
* @param clz
* @return
* @throws BeansException
*
*/
public static <T> T getBean(Class<T> clz) throws BeansException {
@SuppressWarnings("unchecked")
T result = (T) beanFactory.getBean(clz);
return result;
}
/**
* 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
*
* @param name
* @return boolean
*/
public static boolean containsBean(String name) {
return beanFactory.containsBean(name);
}
/**
* 判断以给定名字注册的bean定义是一个singleton还是一个prototype。
* 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
*
* @param name
* @return boolean
* @throws NoSuchBeanDefinitionException
*
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return beanFactory.isSingleton(name);
}
/**
* @param name
* @return Class 注册对象的类型
* @throws NoSuchBeanDefinitionException
*
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getType(name);
}
/**
* 如果给定的bean名字在bean定义中有别名,则返回这些别名
*
* @param name
* @return
* @throws NoSuchBeanDefinitionException
*
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getAliases(name);
}
}