为什么需要“DAG+Quartz”的分布式任务调度?
在我们日常开发过程中,任务调度始终是支撑核心业务流程的“隐形引擎”。无论是电商大促时的“库存扣减→订单生成→支付通知→物流同步”,还是数据处理平台的“日志采集→清洗→聚合→分析”,或是金融系统的“交易对账→风控校验→报表生成”,都需要任务按特定顺序、依赖关系高效执行。
传统定时任务工具(如Quartz)虽能解决“何时执行”的问题,但面对复杂业务场景时,逐渐暴露出三大痛点:
- 依赖关系难管理:任务间若存在“前置任务未完成则后续任务不能执行”的强依赖(如支付成功后才能发短信),传统工具无法直观描述这种逻辑,只能通过“延迟触发”或“人工干预”勉强实现,效率低且易出错;
- 并行执行难实现:对于无依赖关系的任务(如大促时同时执行“库存扣减”和“订单生成”),传统工具需手动拆分任务或依赖外部协调,难以动态适配业务变化;
- 失败处理难闭环:任务失败后,传统工具仅支持简单的“重试”或“标记失败”,无法根据失败类型(如偶发网络问题 vs 永久性错误)动态调整策略(如指数退避重试 vs 熔断跳过),导致资源浪费或业务中断。
而 DAG(有向无环图) 恰好能通过“节点表示任务、边表示依赖”的模型,清晰描述任务的执行逻辑。结合Quartz的分布式调度能力,可构建一个“能描述依赖、支持并行、灵活容错”的任务调度系统。但现实中,这一目标的实现面临多重挑战:
- DAG与Quartz的集成复杂度高:Quartz的任务触发逻辑需与DAG的拓扑排序结果深度绑定,如何动态生成任务执行顺序并触发?
- 分布式环境下的一致性难题:多实例部署时,如何避免任务重复触发?如何保证任务状态(如“执行中”“成功”“失败”)在分布式系统中的准确同步?
- 失败处理的工程化落地:如何设计可配置的重试策略(如指数退避)?如何通过熔断机制避免“失败任务反复重试”导致的资源耗尽?
本文将围绕这些核心问题,从“环境搭建→核心功能开发→测试验证”全程拆解,手把手实现一个支持“依赖管理、并行执行、动态容错”的分布式任务调度系统,解决传统工具无法应对的复杂场景需求。
一、环境准备
1.1 必须的软件和环境
1.1.1 JDK 1.8
- 下载地址:https://adoptium.net/temurin/releases/?version=8(选择Windows/Linux/macOS版本)。
- 验证安装:命令行输入
java -version
,输出类似java version "1.8.0_301"
表示成功。
1.1.2 Maven 3.8+(管理Java依赖)
- 下载地址:https://maven.apache.org/download.cgi(选择二进制包)。
- 安装步骤(以Linux为例):
# 解压到 /opt/maven sudo tar -zxvf apache-maven-3.8.6-bin.tar.gz -C /opt/maven # 配置环境变量(编辑 ~/.bashrc 或 /etc/profile) echo 'export MAVEN_HOME=/opt/maven/apache-maven-3.8.6' >> ~/.bashrc echo 'export PATH=$MAVEN_HOME/bin:$PATH' >> ~/.bashrc source ~/.bashrc # 验证安装 mvn -v # 输出 Maven 版本信息(如 Apache Maven 3.8.6)
1.1.3 MySQL 8.0(存储Quartz的元数据)
- 下载地址:https://dev.mysql.com/downloads/mysql/(选择MySQL Community Server 8.0.x)。
- 安装步骤(以Linux为例):
# 下载并解压 wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.36-linux-glibc2.17-x86_64.tar.xz tar -xvf mysql-8.0.36-linux-glibc2.17-x86_64.tar.xz -C /opt # 创建软链接(方便后续操作) sudo ln -s /opt/mysql-8.0.36-linux-glibc2.17-x86_64 /opt/mysql # 启动MySQL(首次启动需要初始化) sudo /opt/mysql/bin/mysqld --initialize-insecure --user=mysql --basedir=/opt/mysql --datadir=/opt/mysql/data # 配置环境变量(编辑 ~/.bashrc) echo 'export PATH=/opt/mysql/bin:$PATH' >> ~/.bashrc source ~/.bashrc # 登录MySQL(初始无密码) mysql -u root
1.1.4 Redis 7.0+(存储任务状态和分布式锁)
- 下载地址:https://download.redis.io/releases/redis-7.0.12.tar.gz。
- 安装步骤(以Linux为例):
# 下载并解压 wget https://download.redis.io/releases/redis-7.0.12.tar.gz tar -xzf redis-7.0.12.tar.gz -C /opt # 编译安装 cd /opt/redis-7.0.12 make && make install # 启动Redis(前台模式,方便测试) redis-server
1.2 创建Spring Boot项目
1.2.1 访问Spring Initializr
打开浏览器,访问 https://start.spring.io/,按以下步骤创建项目:
步骤 | 操作 |
---|---|
Project | 选择 Maven Project (Java项目管理工具) |
Language | 选择 Java |
Spring Boot | 选择 3.2.0 (最新稳定版) |
Group | 自定义(如 com.example ) |
Artifact | 自定义(如 dag-scheduler-demo ) |
Dependencies | 添加以下依赖(点击“Add Dependencies”搜索): |
- Spring Web(提供HTTP接口) | |
- Spring Data Redis(操作Redis) | |
- Quartz Scheduler(分布式任务调度核心) | |
- Lombok(简化Java代码) | |
- Fastjson(解析JSON格式的DAG定义) |
1.2.2 导入项目到IDE
- 使用IntelliJ IDEA或Eclipse导入Maven项目(选择
pom.xml
文件)。
1.3 配置数据库和Quartz集群(关键!)
1.3.1 创建MySQL数据库
登录MySQL,创建存储Quartz元数据的数据库:
-- 登录MySQL(初始无密码)
mysql -u root
-- 创建数据库(名称可自定义,这里用 quartz_db)
CREATE DATABASE quartz_db CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
-- 退出MySQL
exit;
1.3.2 配置Quartz集群(必须!)
在 src/main/resources/application.yml
中添加以下配置:
# 应用基础配置
server:
port: 8080 # 服务端口(默认8080)
# Spring Boot 核心配置
spring:
application:
name: dag-scheduler-demo # 应用名称(自定义)
# Quartz 集群配置(必须!)
quartz:
job-store-type: jdbc # 使用JDBC存储任务元数据(必须)
properties:
org.quartz.jobStore:
class: org.quartz.impl.jdbcjobstore.JobStoreTX # 事务性存储(必须)
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate # 通用驱动(必须)
tablePrefix: QRTZ_ # 表前缀(默认,无需修改)
isClustered: true # 启用集群(必须!)
misfireThreshold: 60000 # 任务错过触发的最大时间(1分钟)
org.quartz.threadPool:
class: org.quartz.simpl.SimpleThreadPool # 线程池(必须)
threadCount: 10 # 线程数(根据任务量调整,默认10)
threadPriority: 5 # 线程优先级(默认5)
# MySQL 数据库连接配置
datasource:
url: jdbc:mysql://localhost:3306/quartz_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: root # 替换为你的MySQL用户名(初始无密码)
password: 123456 # 替换为你的MySQL密码(建议修改)
driver-class-name: com.mysql.cj.jdbc.Driver # MySQL驱动类名
# Redis 配置(存储任务状态和分布式锁)
redis:
host: localhost # 替换为你的Redis地址(本地默认localhost)
port: 6379 # 替换为你的Redis端口(本地默认6379)
database: 0 # 使用Redis的第0号数据库
1.3.3 初始化Quartz表(自动完成)
首次启动Spring Boot应用时,Quartz会自动在 quartz_db
数据库中创建所需的表(如 QRTZ_TRIGGERS
、QRTZ_CRON_TRIGGERS
等)。启动后,登录MySQL验证:
mysql -u root -p # 输入密码(初始无密码)
USE quartz_db;
SHOW TABLES; # 应看到以 QRTZ_ 开头的表(如 QRTZ_JOB_DETAILS、QRTZ_TRIGGERS)
二、核心功能开发
2.1 定义DAG的数据结构(JSON格式)
DAG(有向无环图)用JSON格式描述任务关系,包含节点(任务)和边(依赖关系)。例如,一个“订单支付后通知”的DAG定义如下:
{
"dagId": "order_pay_notify", // DAG的唯一ID(如订单支付后通知)
"name": "订单支付后通知", // DAG的名称(自定义)
"nodes": [ // 任务列表(节点)
{"id": "check_pay", "name": "支付校验", "jobClass": "com.example.job.CheckPayJob"},
{"id": "send_sms", "name": "发送短信", "jobClass": "com.example.job.SendSmsJob"},
{"id": "send_email", "name": "发送邮件", "jobClass": "com.example.job.SendEmailJob"},
{"id": "record_log", "name": "记录日志", "jobClass": "com.example.job.RecordLogJob"}
],
"edges": [ // 依赖关系(边)
{"from": "check_pay", "to": "send_sms"}, // 发送短信依赖支付校验
{"from": "check_pay", "to": "send_email"}, // 发送邮件依赖支付校验
{"from": "send_sms", "to": "record_log"}, // 记录日志依赖发送短信
{"from": "send_email", "to": "record_log"} // 记录日志依赖发送邮件
]
}
关键说明:
dagId
:DAG的唯一标识(如order_pay_notify
),用于区分不同的任务链路;nodes
:任务列表,每个任务有id
(唯一ID)、name
(显示名称)、jobClass
(任务类的全限定名);edges
:依赖关系,from
是前置任务ID,to
是后续任务ID(表示“前置任务完成后才能执行后续任务”)。
2.2 解析DAG:将JSON转为Java对象
需要将JSON格式的DAG定义转换为Java对象,方便程序处理。
2.2.1 定义DAG的节点(TaskNode)和边(TaskEdge)
在 src/main/java/com/example/dag/entity
目录下创建以下类:
// TaskNode.java(任务节点)
package com.example.dag.entity;
import lombok.Data;
@Data
public class TaskNode {
private String id; // 任务ID(唯一,如 check_pay)
private String name; // 任务名称(如 支付校验)
private String jobClass;// 任务类的全限定名(如 com.example.job.CheckPayJob)
}
// TaskEdge.java(任务边,表示依赖关系)
package com.example.dag.entity;
import lombok.Data;
@Data
public class TaskEdge {
private String from; // 前置任务ID(如 check_pay)
private String to; // 后续任务ID(如 send_sms)
}
2.2.2 定义DAG整体结构(DagDefinition)
// DagDefinition.java(完整的DAG定义)
package com.example.dag.entity;
import lombok.Data;
import java.util.List;
@Data
public class DagDefinition {
private String dagId; // DAG的唯一ID(如 order_pay_notify)
private String name; // DAG名称(如 订单支付后通知)
private List<TaskNode> nodes; // 任务节点列表
private List<TaskEdge> edges; // 依赖边列表
}
2.2.3 解析JSON的DAG解析器(DagParser)
使用Fastjson将JSON字符串转换为 DagDefinition
对象:
// DagParser.java(DAG解析器)
package com.example.dag.parser;
import com.alibaba.fastjson.JSON;
import com.example.dag.entity.DagDefinition;
import org.springframework.stereotype.Component;
@Component
public class DagParser {
// Fastjson的JSON解析器(自动注入)
private final JSON jsonParser = JSON.parseObject("");
/**
* 将JSON字符串解析为DagDefinition对象
* @param dagJson JSON格式的DAG定义(如上面的示例)
* @return DagDefinition对象
*/
public DagDefinition parse(String dagJson) {
return jsonParser.parseObject(dagJson, DagDefinition.class);
}
}
2.3 拓扑排序:生成任务的执行顺序
DAG的任务必须按依赖关系执行(比如“支付校验”完成后才能执行“发送短信”)。我们需要用 拓扑排序 算法(Kahn算法)生成执行顺序。
2.3.1 拓扑排序的原理
拓扑排序的核心是:
- 统计每个任务的入度(有多少前置任务);
- 将入度为0的任务(没有前置任务)加入执行队列;
- 依次执行队列中的任务,并将其后续任务的入度减1;
- 重复步骤2-3,直到所有任务都被执行。
如果有任务未被执行,说明DAG中存在循环依赖(比如A依赖B,B依赖A),此时拓扑排序会失败。
2.3.2 实现拓扑排序(TopologySorter)
在 src/main/java/com/example/dag/service
目录下创建 TopologySorter.java
:
// TopologySorter.java(拓扑排序工具类)
package com.example.dag.service;
import com.example.dag.entity.TaskEdge;
import com.example.dag.entity.TaskNode;
import com.example.dag.entity.DagDefinition;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
@Service
public class TopologySorter {
/**
* 对DAG进行拓扑排序,生成执行顺序
* @param dagDefinition DAG定义
* @return 执行顺序(任务ID列表)
*/
public List<String> sort(DagDefinition dagDefinition) {
// 1. 统计每个任务的入度(初始为0)
Map<String, Integer> inDegreeMap = new HashMap<>();
for (TaskNode node : dagDefinition.getNodes()) {
inDegreeMap.put(node.getId(), 0); // 初始入度为0
}
// 2. 遍历所有边,更新后续任务的入度(前置任务+1)
for (TaskEdge edge : dagDefinition.getEdges()) {
String toNodeId = edge.getTo();
inDegreeMap.put(toNodeId, inDegreeMap.getOrDefault(toNodeId, 0) + 1);
}
// 3. 初始化队列(入度为0的任务)
Queue<String> queue = new LinkedList<>();
for (Map.Entry<String, Integer> entry : inDegreeMap.entrySet()) {
if (entry.getValue() == 0) {
queue.offer(entry.getKey()); // 入度为0的任务加入队列
}
}
// 4. 生成执行顺序
List<String> executionOrder = new ArrayList<>();
while (!queue.isEmpty()) {
String nodeId = queue.poll(); // 取出一个入度为0的任务
executionOrder.add(nodeId);
// 遍历当前节点的所有后续任务,减少它们的入度
for (TaskEdge edge : dagDefinition.getEdges()) {
if (edge.getFrom().equals(nodeId)) { // 当前节点是前置任务
String nextNodeId = edge.getTo();
int newInDegree = inDegreeMap.get(nextNodeId) - 1; // 后续任务入度-1
inDegreeMap.put(nextNodeId, newInDegree);
// 如果后续任务的入度变为0,加入队列
if (newInDegree == 0) {
queue.offer(nextNodeId);
}
}
}
}
// 5. 检查是否有循环依赖(总任务数与执行顺序长度不一致说明有环)
if (executionOrder.size() != dagDefinition.getNodes().size()) {
throw new RuntimeException("DAG存在循环依赖,无法排序!");
}
return executionOrder;
}
}
2.4 集成Quartz:触发任务执行
Quartz是Java生态中最经典的任务调度框架,支持定时触发和分布式执行。我们需要让Quartz根据拓扑排序的结果,按顺序触发任务。
2.4.1 自定义Quartz任务类(BaseJob)
所有任务需要继承Quartz的 QuartzJobBean
类,并实现 executeInternal
方法(任务的具体逻辑在这里编写)。
在 src/main/java/com/example/dag/job
目录下创建 BaseJob.java
:
// BaseJob.java(所有任务的基类)
package com.example.dag.job;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
@Slf4j
public abstract class BaseJob extends QuartzJobBean {
/**
* 任务的核心执行逻辑(由子类实现)
* @param dagId DAG的唯一ID
* @param nodeId 当前任务的ID
* @param context Quartz的执行上下文(可传递参数)
*/
protected abstract void executeTask(String dagId, String nodeId, JobDataMap context);
/**
* Quartz触发任务时调用的方法
*/
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
// 从Quartz的上下文中获取参数(如dagId、nodeId)
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String dagId = dataMap.getString("dagId");
String nodeId = dataMap.getString("nodeId");
try {
log.info("开始执行任务:dagId={}, nodeId={}", dagId, nodeId);
// 执行当前任务的具体逻辑(由子类实现)
executeTask(dagId, nodeId, dataMap);
log.info("任务执行成功:dagId={}, nodeId={}", dagId, nodeId);
// 任务执行成功,更新状态为SUCCESS(后面会讲状态管理)
StateManager.markTaskSuccess(dagId, nodeId);
} catch (Exception e) {
log.error("任务执行失败:dagId={}, nodeId={}", dagId, nodeId, e);
// 任务执行失败,更新状态为FAIL(后面会讲失败处理)
StateManager.markTaskFail(dagId, nodeId, e.getMessage());
}
}
}
2.4.2 示例任务类(发送短信)
编写一个具体的任务类(例如“发送短信”任务),继承 BaseJob
并实现 executeTask
方法:
// SendSmsJob.java(发送短信的任务)
package com.example.dag.job;
import com.example.dag.service.StateManager;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class SendSmsJob extends BaseJob {
@Autowired
private StateManager stateManager; // 状态管理器(用于记录任务状态)
@Override
protected void executeTask(String dagId, String nodeId, JobDataMap context) {
// 从上下文中获取参数(比如订单ID)
String orderId = context.getString("orderId");
log.info("发送短信给订单:{},DAG ID:{},任务ID:{}", orderId, dagId, nodeId);
// 实际调用短信网关(这里模拟)
// SmsClient.send(orderId, "您的订单已支付成功!");
}
}
2.4.3 触发DAG执行(DagScheduler)
创建一个调度器类,负责解析DAG、生成执行顺序,并调用Quartz触发每个任务:
// DagScheduler.java(DAG调度器)
package com.example.dag.scheduler;
import com.alibaba.fastjson.JSON;
import com.example.dag.entity.DagDefinition;
import com.example.dag.entity.TaskNode;
import com.example.dag.parser.DagParser;
import com.example.dag.service.TopologySorter;
import com.example.dag.state.StateManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
public class DagScheduler {
private final SchedulerFactoryBean schedulerFactoryBean; // Quartz调度器工厂
private final TopologySorter topologySorter; // 拓扑排序工具类
private final DagParser dagParser; // DAG解析器
private final StateManager stateManager; // 状态管理器
/**
* 触发一个DAG的执行
* @param dagJson JSON格式的DAG定义(如上面的示例)
*/
public void triggerDag(String dagJson) {
try {
// 1. 解析DAG定义(将JSON转为Java对象)
DagDefinition dagDefinition = dagParser.parse(dagJson);
log.info("解析DAG成功:dagId={}", dagDefinition.getDagId());
// 2. 生成拓扑排序的执行顺序(任务ID列表)
List<String> executionOrder = topologySorter.sort(dagDefinition);
log.info("生成执行顺序:{}", executionOrder);
// 3. 遍历执行顺序,逐个触发任务
for (String nodeId : executionOrder) {
// 找到当前节点的定义(从dagDefinition中获取)
TaskNode currentNode = dagDefinition.getNodes().stream()
.filter(node -> node.getId().equals(nodeId))
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到任务节点:" + nodeId));
// 4. 构建Quartz的JobDetail(任务的详细信息)
JobDetail jobDetail = JobBuilder.newJob(BaseJob.class)
.withIdentity(nodeId, "dag_group") // 任务唯一标识(节点ID + 分组)
.usingJobData("dagId", dagDefinition.getDagId()) // 传递DAG ID
.usingJobData("nodeId", nodeId) // 传递当前任务ID
.usingJobData("orderId", "20250721123456") // 示例参数(订单ID)
.build();
// 5. 构建Quartz的Trigger(触发器,立即触发)
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(nodeId + "_trigger", "dag_group") // 触发器唯一标识
.startNow() // 立即触发
.build();
// 6. 调度执行(Quartz会自动管理分布式触发)
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
log.info("触发任务成功:dagId={}, nodeId={}", dagDefinition.getDagId(), nodeId);
}
} catch (Exception e) {
log.error("触发DAG失败:dagJson={}", dagJson, e);
throw new RuntimeException("触发DAG失败", e);
}
}
}
2.5 状态管理:记录任务的执行状态
为了知道任务是否成功或失败,需要记录每个任务的执行状态。这里用Redis存储状态(方便分布式访问)。
2.5.1 定义任务状态枚举(TaskState)
在 src/main/java/com/example/dag/enum
目录下创建 TaskState.java
:
// TaskState.java(任务状态枚举)
package com.example.dag.enums;
public enum TaskState {
PENDING, // 待执行(等待依赖或触发)
RUNNING, // 执行中
SUCCESS, // 成功
FAIL, // 失败
RETRYING // 重试中
}
2.5.2 状态管理器(StateManager)
使用Redis存储每个任务的状态,并提供更新状态的方法:
// StateManager.java(状态管理器)
package com.example.dag.state;
import com.example.dag.enums.TaskState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
public class StateManager {
private final StringRedisTemplate redisTemplate; // Redis的字符串模板(自动注入)
// 存储任务状态的键格式:dag:{dagId}:status:{nodeId}
private static final String STATUS_KEY_PREFIX = "dag:%s:status:%s";
// 存储错误信息的键格式:dag:{dagId}:error:{nodeId}
private static final String ERROR_KEY_PREFIX = "dag:%s:error:%s";
/**
* 标记任务为成功
* @param dagId DAG的唯一ID
* @param nodeId 任务ID
*/
public void markTaskSuccess(String dagId, String nodeId) {
updateTaskState(dagId, nodeId, TaskState.SUCCESS);
}
/**
* 标记任务为失败
* @param dagId DAG的唯一ID
* @param nodeId 任务ID
* @param errorMsg 失败原因
*/
public void markTaskFail(String dagId, String nodeId, String errorMsg) {
updateTaskState(dagId, nodeId, TaskState.FAIL);
// 存储错误信息到Redis(可选,保留7天)
redisTemplate.opsForValue().set(
String.format(ERROR_KEY_PREFIX, dagId, nodeId),
errorMsg,
7, TimeUnit.DAYS
);
}
/**
* 标记任务为运行中
* @param dagId DAG的唯一ID
* @param nodeId 任务ID
*/
public void markTaskRunning(String dagId, String nodeId) {
updateTaskState(dagId, nodeId, TaskState.RUNNING);
}
/**
* 更新任务状态
*/
private void updateTaskState(String dagId, String nodeId, TaskState state) {
String key = String.format(STATUS_KEY_PREFIX, dagId, nodeId);
redisTemplate.opsForValue().set(key, state.name());
log.info("更新任务状态:dagId={}, nodeId={}, state={}", dagId, nodeId, state);
}
/**
* 获取任务状态
* @param dagId DAG的唯一ID
* @param nodeId 任务ID
* @return 任务状态(如SUCCESS、FAIL)
*/
public TaskState getTaskState(String dagId, String nodeId) {
String key = String.format(STATUS_KEY_PREFIX, dagId, nodeId);
String stateStr = redisTemplate.opsForValue().get(key);
if (stateStr == null) {
return TaskState.PENDING; // 默认未开始
}
return TaskState.valueOf(stateStr);
}
/**
* 获取任务错误信息
* @param dagId DAG的唯一ID
* @param nodeId 任务ID
* @return 错误信息(无错误返回null)
*/
public String getTaskError(String dagId, String nodeId) {
String key = String.format(ERROR_KEY_PREFIX, dagId, nodeId);
return redisTemplate.opsForValue().get(key);
}
}
2.6 失败处理:重试与熔断
任务执行失败时,需要自动重试或触发熔断机制,避免无限失败。
2.6.1 失败重试策略(RetryPolicy)
定义一个简单的指数退避重试策略(失败后等待时间逐渐增加):
// RetryPolicy.java(失败重试策略)
package com.example.dag.retry;
import org.springframework.stereotype.Component;
@Component
public class RetryPolicy {
private int maxRetryCount = 3; // 最大重试次数(3次)
private long initialInterval = 1000; // 初始重试间隔(1秒)
private double backoffFactor = 2; // 退避因子(每次间隔翻倍)
/**
* 计算下一次重试的间隔时间
* @param retryCount 当前已重试次数
* @return 间隔时间(毫秒)
*/
public long getNextInterval(int retryCount) {
if (retryCount >= maxRetryCount) {
return -1; // 超过最大次数,不再重试
}
return (long) (initialInterval * Math.pow(backoffFactor, retryCount));
}
}
2.6.2 熔断机制(CircuitBreaker)
当某个任务频繁失败(比如失败率超过50%),触发熔断,暂时跳过该任务:
// CircuitBreaker.java(熔断机制)
package com.example.dag.circuitbreaker;
import com.example.dag.enums.TaskState;
import com.example.dag.state.StateManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
public class CircuitBreaker {
private final StringRedisTemplate redisTemplate;
private final StateManager stateManager;
// 存储熔断状态的键格式:dag:{dagId}:circuit:{nodeId}
private static final String CIRCUIT_KEY_PREFIX = "dag:%s:circuit:%s";
// 熔断窗口大小(5分钟)
private static final long WINDOW_SIZE = 5 * 60 * 1000;
// 失败率阈值(50%)
private static final double FAIL_RATE_THRESHOLD = 0.5;
/**
* 检查是否允许执行任务(熔断未触发时允许)
* @param dagId DAG的唯一ID
* @param nodeId 任务ID
* @return 允许执行返回true,否则返回false
*/
public boolean allowExecute(String dagId, String nodeId) {
String key = String.format(CIRCUIT_KEY_PREFIX, dagId, nodeId);
Long failCount = redisTemplate.opsForValue().get(key);
if (failCount == null || failCount < 3) { // 失败次数小于3次,允许执行
return true;
}
// 计算失败率(假设窗口内有10次执行)
double failRate = (double) failCount / 10;
return failRate < FAIL_RATE_THRESHOLD; // 失败率低于50%允许执行
}
/**
* 记录任务失败(用于熔断统计)
* @param dagId DAG的唯一ID
* @param nodeId 任务ID
*/
public void recordFailure(String dagId, String nodeId) {
String key = String.format(CIRCUIT_KEY_PREFIX, dagId, nodeId);
redisTemplate.opsForValue().increment(key); // 失败次数+1
}
}
三、测试验证
3.1 启动系统
- 确保MySQL和Redis已启动;
- 在Spring Boot项目的根目录执行
mvn spring-boot:run
,启动应用; - 控制台输出
Started DagSchedulerDemoApplication in X seconds
表示启动成功。
3.2 触发DAG执行
通过HTTP接口触发DAG执行(需要先添加一个Controller):
// DagController.java(提供HTTP接口触发DAG)
package com.example.dag.controller;
import com.example.dag.scheduler.DagScheduler;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/dag")
@RequiredArgsConstructor
public class DagController {
private final DagScheduler dagScheduler;
/**
* 触发DAG执行的接口
*/
@PostMapping("/trigger")
public String triggerDag(@RequestBody String dagJson) {
try {
dagScheduler.triggerDag(dagJson);
return "DAG触发成功!";
} catch (Exception e) {
e.printStackTrace();
return "DAG触发失败:" + e.getMessage();
}
}
}
3.3 发送POST请求触发DAG
使用Postman或curl发送以下请求(替换 dagJson
为你的DAG定义):
curl -X POST http://localhost:8080/dag/trigger \
-H "Content-Type: application/json" \
-d '{
"dagId": "order_pay_notify",
"name": "订单支付后通知",
"nodes": [
{"id": "check_pay", "name": "支付校验", "jobClass": "com.example.job.CheckPayJob"},
{"id": "send_sms", "name": "发送短信", "jobClass": "com.example.job.SendSmsJob"},
{"id": "send_email", "name": "发送邮件", "jobClass": "com.example.job.SendEmailJob"},
{"id": "record_log", "name": "记录日志", "jobClass": "com.example.job.RecordLogJob"}
],
"edges": [
{"from": "check_pay", "to": "send_sms"},
{"from": "check_pay", "to": "send_email"},
{"from": "send_sms", "to": "record_log"},
{"from": "send_email", "to": "record_log"}
]
}'
3.4 查看任务执行结果
3.4.1 查看控制台输出
任务执行时,控制台会打印每个任务的触发信息(如“触发任务:check_pay”)。
3.4.2 查看Redis状态
使用Redis客户端(如 redis-cli
)查看任务状态:
redis-cli
> KEYS "dag:order_pay_notify:status:*" # 查看所有任务状态键
1) "dag:order_pay_notify:status:check_pay"
2) "dag:order_pay_notify:status:send_sms"
3) "dag:order_pay_notify:status:send_email"
4) "dag:order_pay_notify:status:record_log"
> GET "dag:order_pay_notify:status:check_pay" # 查看支付校验任务的状态
"SUCCESS"
> GET "dag:order_pay_notify:status:send_sms" # 查看发送短信任务的状态
"SUCCESS"
> GET "dag:order_pay_notify:status:send_email" # 查看发送邮件任务的状态
"SUCCESS"
> GET "dag:order_pay_notify:status:record_log" # 查看记录日志任务的状态
"SUCCESS"
四、常见问题与解决
4.1 Quartz集群启动失败
现象:启动应用时报错 org.quartz.JobPersistenceException: Couldn't store trigger...
。
原因:Quartz的元数据表未正确创建。
解决:手动执行Quartz的建表脚本(https://github.com/quartz-scheduler/quartz/tree/main/quartz-core/src/main/resources/org/quartz/impl/jdbcjobstore),或检查MySQL用户是否有建表权限。
4.2 任务未按顺序执行
现象:“发送短信”任务在“支付校验”完成前就执行了。
原因:拓扑排序逻辑错误,或Quartz触发任务时未等待依赖。
解决:检查 TopologySorter
的排序逻辑,确保入度计算正确;或在触发任务时,检查前置任务是否已完成(通过 StateManager
查询状态)。
4.3 任务失败后未重试
现象:任务执行失败后,状态直接变为 FAIL
,没有重试。
原因:未在任务中调用重试逻辑,或 RetryPolicy
未正确配置。
解决:在 BaseJob
的 executeInternal
方法中,捕获异常后调用重试逻辑(如使用 RetryPolicy
计算间隔,重新触发任务)。
总结
到此为止,已经完成了一个支持“任务依赖、并行执行、失败重试”的分布式任务调度系统的搭建。核心流程是:
- 用JSON定义DAG的任务关系;
- 用拓扑排序生成执行顺序;
- 用Quartz触发任务执行;
- 用Redis记录任务状态;
- 用重试和熔断机制提升健壮性。
当然,这只是一个基础实现,实际生产环境中还可以扩展:
- 增加任务参数传递(上游任务的输出作为下游任务的输入);
- 提供可视化界面(如前端拖拽编辑DAG);
- 集成监控告警(任务失败时发送邮件/钉钉通知);
- 支持动态修改DAG(运行时调整任务链路)。
最后,希望通过本文,你能大致了解DAG+Quartz整个流程,后续可以在此基础上进一步优化探索!