一、ElasticJob是什么
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
本文主要基于Elastic-Job-Lite2.x据介绍,Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
二、ElasticJob架构及核心理念
分布式调度
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
作业高可用
Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
最大限度利用资源
Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
三、ElasticJob特性
1、分片策略
ElasticJob 提供最灵活的方式,最大限度的提高执行作业的吞吐量。当新增加作业服务器时,ElasticJob会通过注册中心的临时节点的变化感知到新服务器的存在,并在下次任务调度的时候重新分片,新的服务器会承载一部分作业分片。
AverageAllocationJobShardingStrategy(默认)
基于平均分配算法的分片策略,也是默认的分片策略。
如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。如:
如果有3台服务器,分成9片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3台服务器,分成8片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
如果有3台服务器,分成10片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
OdevitySortByNameJobShardingStrategy
根据作业名的哈希值奇偶数决定IP升降序算法的分片策略。
作业名的哈希值为奇数则IP升序。
作业名的哈希值为偶数则IP降序。
用于不同的作业平均分配负载至不同的服务器。
AverageAllocationJobShardingStrategy的缺点是,一旦分片数小于作业服务器数,作业将永远分配至IP地址靠前的服务器,导致IP地址靠后的服务器空闲。而OdevitySortByNameJobShardingStrategy则可以根据作业名称重新分配服务器负载。如:
如果有3台服务器,分成2片,作业名称的哈希值为奇数,则每台服务器分到的分片是:1=[0], 2=[1], 3=[]
如果有3台服务器,分成2片,作业名称的哈希值为偶数,则每台服务器分到的分片是:3=[0], 2=[1], 1=[]
RotateServerByNameJobShardingStrategy
根据作业名的哈希值对服务器列表进行轮转的分片策略。
自定义分片策略
2、高可用
ElasticJob每次执行任务时通过注册中心获取节点信息,如果某台节点发生宕机时,在下次执行周期中会在其他节点上重新执行相应任务,从而实现高可用的架构。
3、失效转移
在有些情况下,我们需要在某个节点宕机时将任务转移到别的节点继续执行,失效转移就提供了这种机制。例如下面案例:
图中表示作业分别于 12:00,13:00 和 14:00 执行。图中显示的当前时间点为 13:00 的作业执行中。 如果作业的其中一个分片服务器在 13:10 的时候宕机,那么剩余的 20 分钟应该处理的业务未得到执行,并且需要在 14:00 时才能再次开始执行下一次作业。也就是说,在不开启失效转移的情况下,位于该分片的作业有 50 分钟空档期。如下如图所示。
在开启失效转移功能之后,ElasticJob 的其他服务器能够在感知到宕机的作业服务器之后,补偿执行该分片作业。如下图所示:
4、错过任务重新执行
在执行过程中,会存在任务执行的时长超过了任务的调度间隔周期。可以设置错误任务重新执行,从而保证预期的作业完成执行。
例如下图中表示作业分别于 12:00,13:00 和 14:00 执行。
若12:00任务执行的时长为70分钟,已经到达下次需要执行的周期,这是就存在任务重叠的情况
设置了错过任务重新执行,则在上一个任务执行结束之后,立即执行下一次任务。
四、ElasticJob开发使用案例
1、作业开发案例
ElasticJob默认支持了三种任务执行的方式:简单作业,数据流作业和脚本作业。
简单作业案例
/* 简单作业需要实现SimpleJob接口,并将任务实现在execute方法中 */
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
数据流作业案例
public class MyElasticJob implements DataflowJob<Foo> {
@Override
public List<Foo> fetchData(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
List<Foo> data = // get data from database by sharding item 0
return data;
case 1:
List<Foo> data = // get data from database by sharding item 1
return data;
case 2:
List<Foo> data = // get data from database by sharding item 2
return data;
// case n: ...
}
}
@Override
public void processData(ShardingContext shardingContext, List<Foo> data) {
// process data
// ...
}
}
Script脚本作业案例
Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
作业的启动
elasticjob 2.x版本官方网站提供了java和spring的两种方式,具体可以参照官方文档的连接开发指南 (apache.org)
然而2.x版本没有提供springboot的启动版本,个人编写了基于springboot的starter大家可以参考DoolinMa/xc-elasticjob: elasticjob-spring-boot-starter (github.com)
2、基于elasticjob平台短信发送功能的实现
功能简介
目前系统会通过批量和实时的方式将待发送的短信数据写入数据库中,要求基于springboot开发一套短信发送功能,需要轮训数据库对相应的短信数据进行发送。
简单实现
实现该逻辑的伪代码如下:
public class SmsElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// 读取待发送短信
readSms();
// 读取待发送短信
sendSms();
break;
}
}
}
缺点:
由于目前设置的调度时间间隔为4分钟,如果1分钟就可以发送完这段时间所产生的待发送短信数量,则将会有3分钟的时间空闲,会浪费部分系统资源。
如果在4分钟时间间隔内产生的短信数据量较大,则会一次性读取到内存中,有内存溢出的风险。
流式发送实现
使用elasticjob的DataflowJob方式实现流式处理,避免内存占用较大和空闲的问题
伪代码如下:
public class SmsElasticJob implements DataflowJob<SmsDO> {
@Override
public List<SmsDO> fetchData(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
//读取1000短信
List<SmsDO> data = readSms(1000);
return data;
}
}
@Override
public void processData(ShardingContext shardingContext, List<SmsDO> data) {
// 处理待发送短信数据data
for(SmsDO sd : data){
//发送短信
sendSms(sd);
}
}
}
分片处理
由于数据量的逐步增加,单个分片对应数据的处理已经满足不了业务的需求。所以使用对短信编号进行hash分片的策略。使用多个分片增加并发,提高了系统的吞吐量。
伪代码如下:
public class SmsElasticJob implements DataflowJob<SmsDO> {
@Override
public List<SmsDO> fetchData(ShardingContext context) {
//从数据源获取 (短信编号%分片数)== context.getShardingItem() 的待发送短信
List<SmsDO> data = readSms(1000,smsNumHash(context.getShardingItem()));
return data;
}
@Override
public void processData(ShardingContext shardingContext, List<SmsDO> data) {
// 处理待发送短信数据data
for(SmsDO sd : data){
//发送短信
sendSms(sd);
}
}
}
进一步优化
随着数据量的增大,单个数据库已经满足不了数据的存储要求,所以对短信进行了分库处理。针对多库的情况可以使用以下两种分片的策略:
根据每个数据库分配一个或多个分片进行数据处理;
针对多个分库使用轮训的方式,对轮训到的每个库使用短信编号进行hash分片处理。
3、运维平台
ElasticJobLite运维平台,可以监控作业运行状态,作业运行情况、作业触发、删除、下线等操作。可以多维度的控制调度的任务。
[参考链接]
官方网站:ElasticJob - Distributed scheduled job solution
github地址: apache/shardingsphere-elasticjob: Distributed scheduled job framework (github.com)