手把手教你将@Schedule任务调度升级为分布式调度@DistributeSchedule

发布于:2022-11-11 ⋅ 阅读:(1137) ⋅ 点赞:(1)

背景介绍

很多小伙伴们都跟我留言说过一个类似的问题,就是针对于任务调度框架而言的选取,很多公司都会采用任务调度框架的鼻祖Quartz,那么我们来梳理以下Java领域的任务调度框架吧。

Java领域的定时任务的框架

单机级别任务调度

  • TimeTask:是一个定时器类,通过该类可以为指定的定时任务进行配置。TimerTask类是一个定时任务类,该类实现了Runnable接口,缺点异常未检查会中止线程。

  • ScheduledExecutorService:相对延迟或者周期作为定时任务调度,缺点没有绝对的日期或者时间

  • spring定时框架:配置简单功能较多,如果系统使用单机的话可以优先考虑spring定时器

给予Java原生的一些任务调度框架后有很多组织机构推陈出新,出现了很多的优秀的任务调度框架,它们除了可以实现任务调度,还可以实现了分布式体系下的运行任务。例如以下:

分布式级别任务调度

  • Quartz:Java实现的定时任务调度框架,但Quartz关注点在于定时任务而非数据,并没有一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能。

  • TBSchedule:阿里早期开源的分布式任务调度系统,代码较为陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重,而且目前已经没有对应的人进行维护了,(处理方式主要分为“抢占式”和“协同分配式”,通过集群的节点分担大批量任务的处理,提高批量任务的处理效率)。

  • Elastic-job:当当开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,实现任务高可用以及分片,并且可以支持云开发,但是开发的模式有点侵入性,但也是一个不错的选择方向

  • Saturn:是唯品会自主研发的分布式的定时任务的调度平台,基于当当的elastic-job 版本开发的,并且可以很好的部署到docker容器上,目前本人还没有做过

  • Xxl-job: 是大众点评员工徐雪里于2015年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。

回到主旨如何建立Spring的分布式框架

这时候你会说,如果我们已经有了很多以上的分布式能力的任务调度框架了,我们为什么还要再进行改造Spring模式的任务调度框架呢,好的,就这事一个问题,如果当我们的任务调度不需要那么大的资源以及不能够依靠以上的开源框架去处理的时候,我们只能采用单机版的场景下,那该怎么办呢?比如说,公司不需要提供额外的部署分布式调度、资源不足不想暴漏任务调度服务的单点问题等场景、甚至很多人习惯了使用任务调度了@Schedule,你让他直接改成以上的开发模式,成本是否也会很高?这该咋办?那么种种不能使用以上开源的分布式任务调度框架的时候,才有了我们今天的课题系列。

废话不多说直接上干货

先说以下我们所需要的原材料有哪些,首先Spring框架是必须的,之后还需要就是可以连接Redis的组件如:jedis、lettuce、Redission等。本次我们采用的是Redission框架,如果有用其他的小伙伴可以直接替换理念就好。

分析以下Spring的@Schedule的工作原理

@EnableScheduling这个注解

以下是源码:

可以看出Schedule调度框架主要启动需要依靠SchedulingConfiguration这个类进行解析扫描所有的组件。我们深入进去看一下。

别的我们都不看,我们就关注一个类:ScheduledAnnotationBeanPostProcessor,相信对Spring了解的小伙伴们应该知道了,这个是一个注解注入的扫描器类,专门处理bean的生命周期的,专门讲@Scheduled的方法对应的类进行动态代理植入到对应的容器内,并且分配对应的任务调度触发器机制,以下就是ScheduledAnnotationBeanPostProcessor的处理类的源码。

再次我们重点关注代码,后置处理的类拦截机制,他会扫描出所有相关的@Schedule对应的类和方法:

然后进行循环调用processScheduled的方法:

我们再来看一下processScheduled方法里面最直接封装到对应的schedule调用机制的方法片段。

cron模式

delay模式

fix模式

以上类型分为了三种模式:Cron模式、fix模式、delay模式等,相信了解任务调度的小伙伴们并不陌生吧!

最重要的改造点来了

我们如何通过以上分析得出的结论讲我们的@Schedule改造为分布式模式的呢,关键点就在这里,下图标黄的位置createRunnable方法,这也是Spring提供给我们唯一可以改造的地方。

createRunnable

进入createRunnable方法之后,我们需要看一下最后创建出的对象是什么?就是这个ScheduledMethodRunnable,进入这里的ScheduledMethodRunnable的类后可以看到如下

改造位置

这里就是真正要执行的代码,我们只需要再这里加入分布式锁,进行拦截,保证多个任务调度,同时只会有一个方法进行执行是否就可以保证了呢。但是如果你说需要进行分片处理,也是可以通过数据条件进行分隔,但是那又是另外一回事了。所以是不是很好就可以解决了?那我们来实现一下啊,以下就是小编完成的分布式改造后的成品。

我的分布式任务调度

定义属于我的分布式调度注解

我才用了加入前缀Distribute用来区分和原来的任务调度。

启动分布式任务调度机制

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DistributeSchedulingConfiguration.class)
@Documented
public @interface EnableDistributeScheduling {}
复制代码

标识任务调度机制(1)

源码直接拷贝过来改一下改一下名称就好,调整成为对应的注解名称

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributeScheduleds {
    DistributeScheduled[] value();
}
复制代码

标识任务调度机制(2)

源码直接拷贝过来改一下改一下名称就好,调整成为对应的注解名称

对应的EnableDistributeScheduling的配置类服务

和源码保持一致,只是讲原来的ScheduledAnnotationBeanPostProcessor改为DistributeScheduledAnnotationBeanPostProcessor的对象即可。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class DistributeSchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public DistributeScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new DistributeScheduledAnnotationBeanPostProcessor();
    }
}
复制代码

定义DistributeSchedulingConfiguration中的DistributeScheduledAnnotationBeanPostProcessor处理器

只需要找原来的样子调整成为return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);即可。

 /**
     * Create a {@link Runnable} for the given bean instance,
     * calling the specified scheduled method.
     * <p>The default implementation creates a {@link ScheduledMethodRunnable}.
     * @param target the target bean instance
     * @param method the scheduled method to call
     * @since 5.1
     * @see ScheduledMethodRunnable#ScheduledMethodRunnable(Object, Method)
     */
    protected Runnable createRunnable(Object target, Method method) {
        Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @DistributeScheduled");
        Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
        RedisDistributionLock redisDistributionLock = applicationContext.getBean(RedisDistributionLock.class);
        return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);
    }
复制代码

定义我的DistributeScheduledMethodRunnable的runnable对象

添加分布式锁组件

添加了我自己封装的分布式所有的组件依赖之后即可。

改造run方法

添加对执行方法之前的加锁操作,我们采用的key为ClassName+methodName,如果可以还可以加入参数的调整。

亲测有效!不信可以试试看!

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到