多台服务器分布式定时调度的几种方案

发布于:2024-10-18 ⋅ 阅读:(14) ⋅ 点赞:(0)

背景:现在有多个后端服务器,并且在代码中定义了一个定时任务,希望这个定时任务在一个时间只在一个服务器上执行,涉及到分布式调度,调研了一下总结出几种方案:

1.mysql的内置GET_LOCK

GET_LOCK方法的介绍
GET_LOCK是MySQL提供的一种加锁机制,用于控制并发访问数据库中的资源。GET_LOCK允许一个客户端获取一个带有给定名称的锁,并且只有该客户端能够释放该锁。这个锁是应用程序级别的,在不同的mysql会话之间使用,是名字锁,不是锁具体某个表名或字段,具体是锁什么完全交给应用程序。它是一种独占锁,意味着哪个会话持有这个锁,其他会话尝试拿这个锁的时候都会失败。这种方式是针对锁内的所有操作加锁,并不针对特定表或特定行,如果不释放锁,直到关闭连接会话结束,锁才会释放

以下是GET_LOCK方法的语法:

select GET_LOCK(str, timeout);
str: 锁的名称,通常是一个字符串。
timeout: 获取锁的超时时间(秒),如果在指定的时间内无法获取到锁,GET_LOCK将返回0。

释放锁:SELECT RELEASE_LOCK(@lock_key);
在这里插入图片描述
如果使用java的jpa,直接定义一个方法,用原生的sql即可,并且不用指定表

   @Query(value = "SELECT GET_LOCK(:lockName, 10)", nativeQuery = true)
    int addLock(@Param("lockName") String lock);

    @Query(value = "SELECT RELEASE_LOCK(:lockName)", nativeQuery = true)
    int releaseLock(@Param("lockName") String lock);

2.version乐观锁

需要在数据库表中增加一个version字段,主要用于实现并发更新,读取数据时,将version字段值一通取出,每更新一次version加一,提交更新时判断当前version与第一次取出来的version是否相等,相等则更新,否则认为是过期数据。如下图所示:
在这里插入图片描述

使用:

先查出出记录

select (status,status,version) from t_goods where id=#{id}

再修改记录

update t_goods

set status=2,version=version+1

where id=#{id} and version=#{version};

如果使用jpa,可以直接使用version注解来标识版本号字段:

  @Version
    private int version;  // 版本号字段

此时当尝试更新实体时,Spring Data JPA会自动检查版本号是否一致。

3.mysql的悲观锁

用for update实现,sql语句层面是这样:

update table set column=‘value’ for update

这种情况where条件如果涉及到数据库对应的索引字段,会是行级锁,否则会是表锁

实现悲观锁的时候有两种方式:
自行写原生SQL,然后写上for update语句。(方法:findCatalogsForUpdate)
使用@Lock注解,并且设置值为LockModeType.PESSIMISTIC_WRITE即可代表行级锁。

使用Lock注解例子:

@Lock(value = LockModeType.PESSIMISTIC_WRITE) //代表行级锁
    @Query("select a from Catalog a where a.id = :id")

    Optional<Catalog> findCatalogWithPessimisticLock(@Param("id") Long id);


当你使用 @Lock(LockModeType.PESSIMISTIC_WRITE) 注解来加锁查询结果时,查询会获取一个写锁(排他锁)。这意味着在当前事务提交之前,其他事务将无法读取或修改被锁定的行。

缺点:使用Lock加锁,由于mysql执行速度很快,在一台节点上执行完之后会很快释放锁,另外的节点进入时仍然会获取到锁

4.redis分布式锁

boolean acquired = redisLock.tryLock(LOCK_KEY, lockValue, LOCK_TIMEOUT, TimeUnit.SECONDS); 

if (acquired) {}

这里在使用时为了防止多台节点先后得到锁,可以设置redis自动过期机制:

 //如果键不存在则新增,存在则不改变已经有的值。(备注:失效时间要大于多台服务器之间的时间差,如果多台服务器时间差大于超时时间,定时任务可能会执行多次)

 Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 20, TimeUnit.SECONDS);

if (flag != null && flag) {

 log.info("{} 锁定成功,开始处理业务", key)

            }

5.分布式任务调度框架(quartz\XXL-JOB)

Quartz 是一个开源的任务调度框架,支持以下功能:

简单和复杂的调度:支持简单的时间间隔调度和复杂的 Cron 表达式调度。

持久化:支持将调度信息持久化到数据库中,确保任务在应用重启后依然有效。

集群支持:支持在分布式环境中运行,确保任务在多个节点中只执行一次。

灵活的任务定义:支持定义各种类型的任务,包括无状态和有状态任务。

缺点:使用来说相对需要加的东西较多,需要添加新依赖和新的类:

Quartz 的基本概念

Job:任务接口,定义具体的任务逻辑。

JobDetail:任务详情,包含任务的定义和相关数据。

Trigger:触发器,定义任务的触发时间和频率。

Scheduler:调度器,管理和调度任务。

Quartz 的使用

  1. 添加依赖

在 Maven 项目中添加 Quartz 依赖:

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>
  1. 定义任务

实现 Job 接口,定义具体的任务逻辑:

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MyJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        System.out.println("Executing MyJob at " + System.currentTimeMillis());
    }
}

XXL-JOB:

同样需要添加新的依赖,并且要引入xxljob的配置:

<dependency>
  <groupId>com.xuxueli</groupId>
  <artifactId>xxl-job-core</artifactId>
  <version>2.3.0</version>
</dependency>

配置:

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}