Spring Boot 集成 Spring Batch 批处理组件
1.Spring Batch 简介
Spring Batch 是 Spring 生态系统中的企业级批处理框架，专门用于处理大规模数据作业。它提供了批处理应用所需的核心功能（作业编排、读/处理/写的分块处理、执行状态持久化等），解决了传统批处理应用开发中的重复性问题，使开发人员能够专注于业务逻辑而非基础设施。
核心价值与定位
解决的问题：自动化处理周期性、数据密集型的任务（如报表生成、数据迁移、对账结算）
典型场景:
每月财务报表生成
银行日终批量交易处理
电商平台每日用户行为分析
百万级数据迁移(如旧系统到新系统)
2.批处理工具架构和示例
| 阶段 | 接口 |
|---|---|
| 读 | ItemReader |
| 处理 | ItemProcessor |
| 写 | ItemWriter |
项目结构
依赖包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>SpringBatcher</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-boot.version>3.5.3</spring-boot.version>
    </properties>

    <!-- Import the Spring Boot BOM so the versionless starters below get managed versions. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Web layer for the /job/start REST endpoint. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- NOTE(review): no Redis usage is visible in this project's code; remove if not needed. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Spring Batch (JobRepository, StepBuilder, JobLauncher, ...). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Lombok is only needed at compile time (annotation processing);
             "provided" keeps it off the runtime classpath. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.38</version>
            <scope>provided</scope>
        </dependency>
        <!-- Excel parsing used by EasyExcelItemReader. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>4.0.3</version>
        </dependency>
        <!-- H2 database holding the Spring Batch metadata tables. -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope> <!-- usually only needed at runtime -->
        </dependency>
    </dependencies>
</project>
启动类
package org.example;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the batch demo application.
 *
 * <p>NOTE(review): in Spring Boot 3, {@code @EnableBatchProcessing} makes Boot's own batch
 * auto-configuration back off, so properties such as
 * {@code spring.batch.jdbc.initialize-schema} may not be applied. Confirm before relying on them.
 */
@SpringBootApplication
@EnableBatchProcessing
public class BatchApp {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        final SpringApplication application = new SpringApplication(BatchApp.class);
        application.run(args);
    }
}
2.1 批处理任务持久化控制
示例代码基于 H2 存储
package org.example.config;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
 * Batch infrastructure configuration. It exposes a JDBC-backed {@link JobRepository} that stores
 * job and step metadata in the application's {@link DataSource} (an H2 database in this example).
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-20 PM 4:21
 */
@Configuration
public class BatchJobConfig {

    /**
     * Builds the job repository shared by all jobs and steps.
     *
     * @param dataSource data source that holds the BATCH_* metadata tables
     * @return the initialised {@link JobRepository}
     * @throws Exception if the factory bean fails to initialise or to create the repository
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource) throws Exception {
        // Repository-local transaction manager over the same DataSource.
        final var txManager = new DataSourceTransactionManager(dataSource);

        final var factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(txManager);
        factory.setDatabaseType("H2");
        factory.afterPropertiesSet();
        return factory.getObject();
    }
}
2.2 实现一个读取器
以 Excel 文件读取为例
package org.example.job.common;
import com.alibaba.excel.EasyExcel;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.beans.factory.InitializingBean;
import java.io.File;
import java.util.List;
import java.util.Objects;
/**
 * Spring Batch reader that returns the rows of an Excel workbook's default sheet, converted to
 * {@code T} by EasyExcel. The header row is skipped.
 *
 * <p>The whole sheet is held in memory, so this reader is meant for small and medium files.
 * Unlike a plain {@link ItemReader} that loads data once, this reader is an
 * {@link ItemStreamReader}:
 * <ul>
 *   <li>{@link #open(ExecutionContext)} re-reads the file at the start of every step execution,
 *       so each job run sees the current file contents;</li>
 *   <li>{@link #update(ExecutionContext)} saves the number of rows handed out, so a restarted
 *       execution resumes after the last committed chunk instead of skipping or repeating rows;</li>
 *   <li>{@link #close()} releases the cached rows.</li>
 * </ul>
 * When used outside a step (no {@code open} call), the file is loaded lazily on the first
 * {@link #read()}.
 *
 * <p>Not thread-safe: the cursor and cached rows are plain mutable fields.
 *
 * @param <T> row type that EasyExcel maps each data row onto
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:16
 */
public class EasyExcelItemReader<T> implements ItemStreamReader<T>, InitializingBean {

    /** Execution-context key holding how many rows have already been returned. */
    private static final String READ_COUNT_KEY = EasyExcelItemReader.class.getName() + ".read.count";

    private final Class<T> clazz;
    private final String filePath;
    /** Rows of the current execution; {@code null} until loaded. */
    private List<T> cacheList;
    /** Index of the next row to return. */
    private int index = 0;

    /**
     * @param clazz    row type EasyExcel maps each data row onto
     * @param filePath path of the workbook to read
     */
    public EasyExcelItemReader(Class<T> clazz, String filePath) {
        this.clazz = clazz;
        this.filePath = filePath;
    }

    /**
     * Validates the configuration only. The file is read in {@link #open} (or lazily in
     * {@link #read}), so it does not need to exist when the application starts.
     *
     * @throws NullPointerException if {@code clazz} or {@code filePath} is null
     */
    @Override
    public void afterPropertiesSet() {
        Objects.requireNonNull(clazz, "clazz must not be null");
        Objects.requireNonNull(filePath, "filePath must not be null");
    }

    /**
     * Loads the workbook and positions the cursor: at the saved position on a restart,
     * otherwise at the first data row.
     *
     * @param executionContext step execution context, possibly holding a saved position
     * @throws RuntimeException if the workbook cannot be read
     */
    @Override
    public void open(ExecutionContext executionContext) {
        cacheList = loadRows();
        index = executionContext.getInt(READ_COUNT_KEY, 0);
    }

    /**
     * Saves the current position so that a restart can resume from it.
     *
     * @param executionContext step execution context to write into
     */
    @Override
    public void update(ExecutionContext executionContext) {
        executionContext.putInt(READ_COUNT_KEY, index);
    }

    /** Drops the cached rows and resets the cursor. */
    @Override
    public void close() {
        cacheList = null;
        index = 0;
    }

    /**
     * Returns the next row.
     *
     * @return the next row, or {@code null} once all rows have been returned
     * @throws RuntimeException if the workbook has to be loaded here and cannot be read
     */
    @Override
    public T read() {
        if (cacheList == null) {
            // Used without open(): load on first use.
            cacheList = loadRows();
            index = 0;
        }
        return index < cacheList.size() ? cacheList.get(index++) : null;
    }

    /** Reads every data row of the workbook in one pass (suitable for small/medium files). */
    private List<T> loadRows() {
        try {
            List<T> rows = EasyExcel.read(new File(filePath))
                    .head(clazz)
                    .sheet()
                    .headRowNumber(1) // skip the header row
                    .doReadSync();
            return rows == null ? List.of() : rows;
        } catch (Exception e) {
            throw new RuntimeException("read excel failed ", e);
        }
    }
}
2.3 定义批处理 Job
package org.example.job;
import org.example.entity.User;
import org.example.job.common.EasyExcelItemReader;
import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
/**
* @Author zhx && moon
* @Since 21
* @Date 2025-06-24 PM 2:08
*/
@Component
public class SVCJob {
/**
* Excel 读取
* @return
*/
@Bean("easyExcelItemReader")
public EasyExcelItemReader<User> easyExcelItemReader() {
return new EasyExcelItemReader<>(User.class, "C:\\Users\\Administrator\\Desktop\\Test.xlsx");
}
/**
* 数据处理器 对读取的数据进行加工
* @return
*/
@Bean("getNameProcessors")
public ItemProcessor<User, String> getNameProcessors() {
return item -> {
return item.getName();
};
}
/**
* 配置写入器(保持不变)
* @return
*/
@Bean("nameWriter")
public ItemWriter<String> nameWriter() {
return items -> {
for (String item : items) {
System.out.println("User Name: " + item);
}
};
}
/**
* 配置批处理步骤(使用新版API)
* @param jobRepository
* @param transactionManager
* @param reader
* @param processor
* @param writer
* @return
*/
@Bean("easyExcelStep")
public Step easyExcelStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
@Qualifier("easyExcelItemReader") EasyExcelItemReader<User> reader,
@Qualifier("getNameProcessors") ItemProcessor<User, String> processor,
@Qualifier("nameWriter") ItemWriter<String> writer) {
return new StepBuilder("easyExcelStep", jobRepository)
.<User, String>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipLimit(1)
.skip(IllegalArgumentException.class)
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("start to processor data ...");
}
})
.build();
}
/**
* 配置批处理作业
* @param jobRepository
* @param importStep
* @return
*/
@Bean("easyExcelImportJobs")
public Job customerImportJob(JobRepository jobRepository, @Qualifier("easyExcelStep") Step importStep) {
return new JobBuilder("easyExcelImportJobs", jobRepository)
.incrementer(new RunIdIncrementer())
.start(importStep)
.listener(new JobExecutionListener() {
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("Job Finished!State: " + jobExecution.getStatus());
}
})
.build();
}
}
2.4 数据实体
package org.example.entity;
import com.alibaba.excel.annotation.ExcelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * One data row of the imported Excel sheet. Each field is bound to a column through its
 * {@link ExcelProperty} header text. Lombok generates accessors, equals/hashCode/toString,
 * a no-arg constructor and an all-args constructor.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:20
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    /** Value of the "姓名" (name) column. */
    @ExcelProperty(value = "姓名")
    private String name;

    /** Value of the "编号" (employee number) column. */
    @ExcelProperty(value = "编号")
    private String employeeId;

    /** Value of the "年龄" (age) column. */
    @ExcelProperty(value = "年龄")
    private Integer age;
}
2.5 接口类
package org.example.controller;
import jakarta.annotation.Resource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP endpoint that launches the Excel import job on demand.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-23 PM 4:40
 */
@RequestMapping("/job")
@RestController
public class JobManage {

    /** Launcher used to start job executions. */
    @Autowired
    private JobLauncher jobLauncher;

    /** The job registered under the bean name {@code easyExcelImportJobs}. */
    @Resource(name = "easyExcelImportJobs")
    Job job;

    /**
     * Starts a new execution of the job.
     *
     * <p>The current {@link System#nanoTime()} value is passed as the {@code uniqueId} parameter,
     * so each request produces a distinct set of job parameters. Any failure while building the
     * parameters or launching the job is rethrown wrapped in a {@link RuntimeException}.
     */
    @GetMapping("/start")
    public void start() {
        try {
            final JobParametersBuilder builder = new JobParametersBuilder();
            builder.addLong("uniqueId", System.nanoTime());
            final JobParameters parameters = builder.toJobParameters();
            jobLauncher.run(job, parameters);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }
}
2.6 H2 配置
spring:
  datasource:
    # File-based H2 database; the commented alternatives are a TCP server and an in-memory DB.
    url: jdbc:h2:file:Z:/IdeaProjects/SpringBatcher/SpringBatcher/springbatchdb #jdbc:h2:tcp://localhost/mem:springbatchdb;DB_CLOSE_DELAY=-1 #jdbc:h2:mem:springbatchdb
    driver-class-name: org.h2.Driver
    username: sa
    password: sa
  h2:
    console:
      enabled: true
      path: /h2/db-console
      settings:
        # WARNING: allows remote (non-localhost) access to the H2 web console.
        # Keep this for local development only.
        web-allow-others: true
  batch:
    jdbc:
      # NOTE(review): applied only while Spring Boot's batch auto-configuration is active;
      # @EnableBatchProcessing on BatchApp disables it, hence the manual H2 script.
      initialize-schema: always
2.7 H2 数据库脚本
-- Autogenerated: do not edit this file
-- Spring Batch metadata tables for H2: job instances, job executions and their parameters,
-- step executions, and the serialized execution contexts of jobs and steps.
-- NOTE(review): appears to match Spring Batch 5's bundled schema-h2.sql. The CREATE statements
-- have no IF NOT EXISTS, so running the script against an already-initialised database fails.

-- Job instances; the pair (JOB_NAME, JOB_KEY) is unique.
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
-- Executions (runs) of a job instance: timestamps, status and exit code/message.
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP(9) NOT NULL,
START_TIME TIMESTAMP(9) DEFAULT NULL ,
END_TIME TIMESTAMP(9) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP(9),
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
-- Parameters of each job execution (name, type, value, and whether it is identifying).
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
-- Per-step execution records with commit/read/filter/write/skip/rollback counters.
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP(9) NOT NULL,
START_TIME TIMESTAMP(9) DEFAULT NULL ,
END_TIME TIMESTAMP(9) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP(9),
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
-- Serialized execution context of each step execution (presumably the state used on restart).
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT LONGVARCHAR ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
-- Serialized execution context of each job execution.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT LONGVARCHAR ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
-- ID sequences. NOTE(review): presumably used by the JobRepository to generate ids, even though
-- the ID columns above are also IDENTITY columns; confirm.
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
3.测试
启动服务
测试 H2 连接
测试数据
触发 JOB
JOB 执行记录