背景:现在有多个后端服务器,并且在代码中定义了一个定时任务,希望这个定时任务在一个时间只在一个服务器上执行,涉及到分布式调度,调研了一下总结出几种方案:
1.mysql的内置GET_LOCK
GET_LOCK方法的介绍
GET_LOCK是MySQL提供的一种加锁机制,用于控制并发访问数据库中的资源。GET_LOCK允许一个客户端获取一个带有给定名称的锁,并且只有该客户端能够释放该锁。这个锁是应用程序级别的,在不同的mysql会话之间使用,是名字锁,不是锁具体某个表名或字段,具体是锁什么完全交给应用程序。它是一种独占锁,意味着哪个会话持有这个锁,其他会话尝试拿这个锁的时候都会失败。这种方式是针对锁内的所有操作加锁,并不针对特定表或特定行,如果不释放锁,直到关闭连接会话结束,锁才会释放
以下是GET_LOCK方法的语法:
select GET_LOCK(str, timeout);
str: 锁的名称,通常是一个字符串。
timeout: 获取锁的超时时间(秒),如果在指定的时间内无法获取到锁,GET_LOCK将返回0。
释放锁:SELECT RELEASE_LOCK(@lock_key);
如果使用java的jpa,直接定义一个方法,用原生的sql即可,并且不用指定表
@Query(value = "SELECT GET_LOCK(:lockName, 10)", nativeQuery = true)
int addLock(@Param("lockName") String lock);
@Query(value = "SELECT RELEASE_LOCK(:lockName)", nativeQuery = true)
int releaseLock(@Param("lockName") String lock);
2.version乐观锁
需要在数据库表中增加一个version字段,主要用于实现并发更新,读取数据时,将version字段值一通取出,每更新一次version加一,提交更新时判断当前version与第一次取出来的version是否相等,相等则更新,否则认为是过期数据。如下图所示:
使用:
先查出出记录
select (status,status,version) from t_goods where id=#{id}
再修改记录
update t_goods
set status=2,version=version+1
where id=#{id} and version=#{version};
如果使用jpa,可以直接使用version注解来标识版本号字段:
@Version
private int version; // 版本号字段
此时当尝试更新实体时,Spring Data JPA会自动检查版本号是否一致。
3.mysql的悲观锁
用for update实现,sql语句层面是这样:
update table set column=‘value’ for update
这种情况where条件如果涉及到数据库对应的索引字段,会是行级锁,否则会是表锁
实现悲观锁的时候有两种方式:
自行写原生SQL,然后写上for update语句。(方法:findCatalogsForUpdate)
使用@Lock注解,并且设置值为LockModeType.PESSIMISTIC_WRITE即可代表行级锁。
使用Lock注解例子:
@Lock(value = LockModeType.PESSIMISTIC_WRITE) //代表行级锁
@Query("select a from Catalog a where a.id = :id")
Optional<Catalog> findCatalogWithPessimisticLock(@Param("id") Long id);
当你使用 @Lock(LockModeType.PESSIMISTIC_WRITE) 注解来加锁查询结果时,查询会获取一个写锁(排他锁)。这意味着在当前事务提交之前,其他事务将无法读取或修改被锁定的行。
缺点:使用Lock加锁,由于mysql执行速度很快,在一台节点上执行完之后会很快释放锁,另外的节点进入时仍然会获取到锁
4.redis分布式锁
boolean acquired = redisLock.tryLock(LOCK_KEY, lockValue, LOCK_TIMEOUT, TimeUnit.SECONDS);
if (acquired) {}
这里在使用时为了防止多台节点先后得到锁,可以设置redis自动过期机制:
//如果键不存在则新增,存在则不改变已经有的值。(备注:失效时间要大于多台服务器之间的时间差,如果多台服务器时间差大于超时时间,定时任务可能会执行多次)
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 20, TimeUnit.SECONDS);
if (flag != null && flag) {
log.info("{} 锁定成功,开始处理业务", key)
}
5.分布式任务调度框架(quartz\XXL-JOB)
Quartz 是一个开源的任务调度框架,支持以下功能:
简单和复杂的调度:支持简单的时间间隔调度和复杂的 Cron 表达式调度。
持久化:支持将调度信息持久化到数据库中,确保任务在应用重启后依然有效。
集群支持:支持在分布式环境中运行,确保任务在多个节点中只执行一次。
灵活的任务定义:支持定义各种类型的任务,包括无状态和有状态任务。
缺点:使用来说相对需要加的东西较多,需要添加新依赖和新的类:
Quartz 的基本概念
Job:任务接口,定义具体的任务逻辑。
JobDetail:任务详情,包含任务的定义和相关数据。
Trigger:触发器,定义任务的触发时间和频率。
Scheduler:调度器,管理和调度任务。
Quartz 的使用
- 添加依赖
在 Maven 项目中添加 Quartz 依赖:
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
- 定义任务
实现 Job 接口,定义具体的任务逻辑:
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("Executing MyJob at " + System.currentTimeMillis());
}
}
XXL-JOB:
同样需要添加新的依赖,并且要引入xxljob的配置:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
配置:
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}