Spring Batch: Simple Dynamic Batch Updates Across Multiple Tables


The project has to process a set of tables. None of them is particularly large, and every row carries longitude/latitude coordinates, but none of the tables stores watershed information; the watershed each record belongs to has to be derived from its coordinates. It is a fairly simple job, and the implementation below was worked out with hints from DeepSeek. AI is genuinely useful for this kind of task.


Implementation

1. Controller

/**
 * Controller for launching the batch watershed-update job.
 */
@Tag(name = "批量流域处理任务")
@ApiSupport(order = 2)
@RequestMapping("/job")
@RestController
public class SysBatchJobController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobOperator jobOperator;

    @Autowired
    @Qualifier("updateWaterCodeJob")
    private Job updateWaterCodeJob;

    // Launch the multi-threaded, paged update job.
    // The fresh "time" parameter makes every call a new JobInstance.
    @GetMapping("/asyncJob")
    public Long asyncJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution run = jobLauncher.run(updateWaterCodeJob, jobParameters);
        return run.getId();
    }

}
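Note that with the default launcher, jobLauncher.run(...) blocks the HTTP request until the whole job finishes. If the endpoint should really behave asynchronously, a minimal sketch of an async launcher (Spring Batch 4 style; the bean and class names here are illustrative) could look like this:

import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class AsyncJobLauncherConfig {

    // Hands the launched job off to a background thread so the controller
    // can return the execution id immediately.
    @Bean("asyncJobLauncher")
    public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor("job-launcher-"));
        launcher.afterPropertiesSet();
        return launcher;
    }
}

The controller would then inject it with @Qualifier("asyncJobLauncher"), and clients can poll the job status later using the returned execution id.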

2. Metadata entity for the tables to be updated

/**
 * Metadata describing a business table that needs batch updating.
 */
@Builder
@AllArgsConstructor
@Data
@TableName(value = "ads_t_sys_batch_update_table")
public class SysBatchUpdateTable extends BaseEntity implements Serializable {

    private static final long serialVersionUID = -7367871287146067724L;

    @TableId(type = IdType.ASSIGN_UUID)
    private String pkId;

    /**
     * Name of the table to update
     **/
    @TableField(value = "table_name")
    private String tableName;

    /**
     * ID of the data source (database) the table lives in
     **/
    @TableField(value = "data_source_id")
    private String dataSourceId;

    /**
     * Primary-key column of the table
     **/
    @TableField(value = "key_id")
    private String keyId;

    /**
     * Watershed-code column of the table
     **/
    @TableField(value = "water_code_column")
    private String waterCodeColumn;

    /**
     * Longitude column of the table
     **/
    @TableField(value = "lon_column")
    private String lonColumn;

    /**
     * Latitude column of the table
     **/
    @TableField(value = "lat_column")
    private String latColumn;


    public SysBatchUpdateTable() {

    }

}
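For reference, a rough DDL sketch of the metadata table, pieced together from the entity above and the partitioner query shown later. Column types and lengths, and the deleted / data_status / page_sql columns, are assumptions to adjust to the real schema:

CREATE TABLE ads_t_sys_batch_update_table (
    pk_id             VARCHAR(64)  PRIMARY KEY,   -- UUID assigned by MyBatis-Plus
    table_name        VARCHAR(128) NOT NULL,      -- table to update
    data_source_id    VARCHAR(64),                -- database the table lives in
    key_id            VARCHAR(64)  NOT NULL,      -- primary-key column of that table
    water_code_column VARCHAR(64)  NOT NULL,      -- watershed-code column to fill
    lon_column        VARCHAR(64)  NOT NULL,      -- longitude column
    lat_column        VARCHAR(64)  NOT NULL,      -- latitude column
    page_sql          VARCHAR(1024),              -- optional hand-written paging SQL (unused below)
    deleted           TINYINT      DEFAULT 0,     -- soft-delete flag used by the partitioner
    data_status       CHAR(1)      DEFAULT '0'    -- only rows with '0' are processed
);

-- Example row (values are purely illustrative):
INSERT INTO ads_t_sys_batch_update_table
    (pk_id, table_name, key_id, water_code_column, lon_column, lat_column)
VALUES
    ('1', 'ads_t_station_info', 'pk_id', 'water_code', 'lon', 'lat');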

3. Mapper. Passing the table and column names through as parameters is somewhat cumbersome; an alternative is to assemble them into the SQL up front. A usage sketch of the parameter map follows the mapper XML below.

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdatacd.panorama.system.domain.po.SysBatchUpdateTable;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;
import java.util.Map;

@Mapper
public interface UpdateTableMapper extends BaseMapper<SysBatchUpdateTable> {

    /**
     * Paged query against the target table. The map must contain the keys used in the
     * XML below (tableName, keyId, waterCodeColumn, lonColumn, latColumn); when invoked
     * through MyBatisPagingItemReader, _pagesize and _skiprows are added automatically.
     * @param params query parameters
     * @return one page of rows
     */
    List<Map<String, Object>> selectUpdateTableByPage(Map<String, Object> params);

    /**
     * Update a single row's watershed code.
     * @param tableName       table to update
     * @param waterCodeColumn watershed-code column
     * @param waterCode       watershed code value
     * @param keyId           primary-key column
     * @param pkId            primary-key value of the row
     */
    void updateWaterCode(@Param("tableName") String tableName,
                         @Param("waterCodeColumn") String waterCodeColumn,
                         @Param("waterCode") String waterCode,
                         @Param("keyId") String keyId,
                         @Param("pkId") String pkId);
}


<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bigdatacd.panorama.system.mapper.UpdateTableMapper">

    <!-- Dynamic paged query: ${} injects the table/column names as identifiers,
         while #{} echoes the same metadata back as literal values so every row
         carries the table name and column names the update step needs -->
    <select id="selectUpdateTableByPage" resultType="java.util.HashMap">
        <!-- If the metadata table provides a ready-made paging SQL (page_sql), it could be used here instead -->
        SELECT
        ${keyId} as pkId,
        #{keyId} as keyId,
        ${waterCodeColumn} as waterCode,
        #{waterCodeColumn} as waterCodeColumn,
        ${lonColumn} as lon,
        ${latColumn} as lat,
        #{tableName} as tableName
        FROM ${tableName}  a  where ${waterCodeColumn} is null
        ORDER BY ${keyId} <!-- keep a stable order for paging -->
        LIMIT #{_pagesize} OFFSET #{_skiprows}
    </select>

    <!-- Dynamic single-row update; the primary-key column comes from the metadata table -->
    <update id="updateWaterCode">
        UPDATE ${tableName}
        SET ${waterCodeColumn} = #{waterCode}
        WHERE ${keyId} = #{pkId}
    </update>
</mapper>
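To make the #{} / ${} split concrete, here is a hypothetical direct call with the same parameter map the reader builds later (the table and column names are purely illustrative; in the real job MyBatisPagingItemReader adds _pagesize and _skiprows itself):

Map<String, Object> params = new HashMap<>();
params.put("tableName", "ads_t_station_info");   // assumed example table
params.put("keyId", "pk_id");                    // its primary-key column
params.put("waterCodeColumn", "water_code");     // column to be filled
params.put("lonColumn", "lon");
params.put("latColumn", "lat");
params.put("_pagesize", 2000);                   // supplied automatically by the paging reader
params.put("_skiprows", 0);
List<Map<String, Object>> page = updateTableMapper.selectUpdateTableByPage(params);
// Each returned row carries pkId, lon, lat plus the echoed metadata
// (keyId, waterCodeColumn, tableName) that the writer uses to rebuild its UPDATE.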

4. Configuration

spring:
  batch:
    job:
      enabled: false             # do not launch jobs automatically on startup
    jdbc:
      initialize-schema: always  # create the Spring Batch metadata tables if needed
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql

Add the batch-rewrite parameter rewriteBatchedStatements=true to the datasource URL so the MySQL driver turns JDBC batches into multi-row statements:
 url: jdbc:mysql://localhost:3306/xxxx?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=GMT%2b8&useSSL=false&rewriteBatchedStatements=true

5. Main batch configuration (partitioned by table)

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.HashMap;
import java.util.Map;

// Main batch configuration: the work is partitioned by table
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Top-level job: runs the table-partitioned master step.
     *
     * @return
     */
    @Bean("updateWaterCodeJob")
    public Job updateWaterCodeJob(
            @Qualifier("masterStep") Step masterStep
    ) {
        return jobBuilderFactory.get("updateWaterCodeJob")
                .start(masterStep)
                .build();
    }

    @Bean("masterStep")
    public Step masterStep(
            @Qualifier("updateBatchTableData") Step updateBatchTableData,
            @Qualifier("multiTablePartitioner") MultiTablePartitioner multiTablePartitioner
    ) {
        return stepBuilderFactory.get("masterStep")
                .partitioner(updateBatchTableData.getName(), multiTablePartitioner) // partitioner splits the work by table name: one partition per table
                .step(updateBatchTableData)
                .gridSize(10) // only a hint to the custom partitioner (one partition per table); actual concurrency is bounded by the task executor
                .taskExecutor(taskExecutor())
                .listener(new BatchJobListener())
                .build();
    }

    // Worker thread pool (ideally sized to the number of tables / CPU cores)
    @Bean("batchTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("table-processor-");
        return executor;
    }

    /**
     * Worker step: pages through one table, resolves watershed codes, and writes updates in chunks.
     * @return
     */
    @Bean("updateBatchTableData")
    public Step updateBatchTableData(
            @Qualifier("dynamicTableReader") MyBatisPagingItemReader<Map<String, Object>> myBatisPagingItemReader,
            @Qualifier("batchUpdateWriter") BatchUpdateWriter batchUpdateWriter,
            @Qualifier("tableProcessor") TableProcessor tableProcessor

    ) {
        return stepBuilderFactory.get("updateBatchTableData")
                .<Map<String, Object>, Map<String, Object>>chunk(100)
                .reader(myBatisPagingItemReader)
                .processor(tableProcessor)
                .writer(batchUpdateWriter)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .build();
    }


    /**
     * Paged reader for the table assigned to the current partition.
     * @return
     */
    @Bean
    @StepScope
    public MyBatisPagingItemReader<Map<String, Object>> dynamicTableReader(
            @Value("#{stepExecutionContext['keyId']}") String keyId, //需要更新的表ID字段
            @Value("#{stepExecutionContext['waterCodeColumn']}") String waterCodeColumn,// 需要更新的流域字段
            @Value("#{stepExecutionContext['lonColumn']}") String lonColumn,// 经度纬度字段
            @Value("#{stepExecutionContext['latColumn']}") String latColumn,// 经度纬度字段
            @Value("#{stepExecutionContext['tableName']}") String tableName // 需要更新的表名
            ) {

        MyBatisPagingItemReader<Map<String, Object>> reader = new MyBatisPagingItemReader<>();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setQueryId("com.bigdatacd.panorama.system.mapper.UpdateTableMapper.selectUpdateTableByPage");
        Map<String,Object> param = new HashMap<>();
        param.put("keyId",keyId);
        param.put("waterCodeColumn",waterCodeColumn);
        param.put("lonColumn",lonColumn);
        param.put("latColumn",latColumn);
        param.put("tableName",tableName);
        reader.setParameterValues(param);
        reader.setPageSize(2000);
        return reader;
    }

}


import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

// Writer that batch-updates the watershed code back into the source table
@Component("batchUpdateWriter")
@StepScope
public class BatchUpdateWriter implements ItemWriter<Map<String, Object>> {

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void write(List<? extends Map<String, Object>> items) {
        if (items.isEmpty()) {
            return;
        }
        // Build the dynamic UPDATE; all items in a chunk come from the same
        // partition (i.e. the same table), so the first item's metadata is enough.
        StringBuilder sb = new StringBuilder();
        sb.append("UPDATE ");
        sb.append((String) items.get(0).get("tableName"));
        sb.append(" SET ");
        sb.append((String) items.get(0).get("waterCodeColumn"));
        sb.append(" = :waterCode");
        sb.append(" WHERE ");
        sb.append((String) items.get(0).get("keyId"));
        sb.append(" = :pkId");

        jdbcTemplate.batchUpdate(sb.toString(), items.stream()
                .map(item -> new MapSqlParameterSource()
                        .addValue("waterCode", item.get("waterCode"))
                        .addValue("tableName", item.get("tableName"))
                        .addValue("waterCodeColumn", item.get("waterCodeColumn"))
                        .addValue("keyId", item.get("keyId"))
                        .addValue("pkId", item.get("pkId"))
                )
                .toArray(SqlParameterSource[]::new));
    }
}

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Partitioner: creates one partition per table registered in the metadata table
@Component
@Slf4j
public class MultiTablePartitioner implements Partitioner {

    private final DataSource dataSource;

    public MultiTablePartitioner(DataSource dataSource) {
        this.dataSource = dataSource;
    }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        String sql = "SELECT key_id as keyId,water_code_column as waterCodeColumn,lon_column as lonColumn,lat_column as latColumn,page_sql as pageSql,table_name as tableName FROM ads_t_sys_batch_update_table where deleted = 0 and data_status = '0'";
        List<Map<String,Object>> tables = jdbcTemplate.queryForList(sql);
        log.info("查询" + sql);
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < tables.size(); i++) {
            ExecutionContext ctx = new ExecutionContext();
            // Put the per-table metadata into the step execution context; the reader and writer build their dynamic SQL from it
            ctx.putString("keyId", String.valueOf(tables.get(i).get("keyId")));
            ctx.putString("waterCodeColumn", String.valueOf(tables.get(i).get("waterCodeColumn")));
            ctx.putString("lonColumn", String.valueOf(tables.get(i).get("lonColumn")));
            ctx.putString("latColumn", String.valueOf(tables.get(i).get("latColumn")));
            //ctx.putString("pageSql", String.valueOf(tables.get(i).get("pageSql")));
            ctx.putString("tableName", String.valueOf(tables.get(i).get("tableName")));
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}


import com.bigdatacd.panorama.common.utils.GeoJsonUtil;
import lombok.Builder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import java.util.Map;

// Processor: resolves the watershed that each record's coordinates fall in
@Component("tableProcessor")
@Builder
public class TableProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        // Look up the watershed code for the record's longitude/latitude
        item.put("waterCode", GeoJsonUtil.getWaterCode(Double.parseDouble(item.get("lon").toString()), Double.parseDouble(item.get("lat").toString())));
        return item;
    }
}
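The processor assumes lon and lat are always present; a row with missing coordinates currently throws and is silently dropped by the AlwaysSkipItemSkipPolicy. A small sketch of filtering such rows explicitly instead (returning null from an ItemProcessor tells Spring Batch to filter the item rather than treat it as an error):

    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        Object lon = item.get("lon");
        Object lat = item.get("lat");
        // Drop rows without usable coordinates instead of relying on the skip policy.
        if (lon == null || lat == null) {
            return null;
        }
        item.put("waterCode", GeoJsonUtil.getWaterCode(
                Double.parseDouble(lon.toString()),
                Double.parseDouble(lat.toString())));
        return item;
    }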


import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job listener that logs start time, end time and total elapsed time.
 */
public class BatchJobListener implements JobExecutionListener {

    private long beginTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        beginTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + "  beforeJob...... " + beginTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + "  afterJob...... " + endTime);
        System.out.println(jobExecution.getJobInstance().getJobName() + " total elapsed time: [" + (endTime - beginTime) + "] ms");
    }

}


6. Utility class: resolving the watershed from longitude/latitude

import lombok.extern.slf4j.Slf4j;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.geojson.feature.FeatureJSON;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.*;
import org.opengis.feature.Feature;
import org.opengis.feature.Property;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;


/**
 * @Description: GeoJSON utility class
 * @author: Mr.xulong
 * @date: 2023-01-09 14:39
 */
@Slf4j
public class GeoJsonUtil {

    /*public static void main(String[] args) {
        try {
            FeatureCollection featureCollection = getFeatureCollection("sichuanliuyu.json");
            double x = 106.955085;
            double y = 32.09546061139062;
            System.out.println(JSON.toJSONString(properties(x,y,featureCollection)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }*/
    private static String geoJsonFilePath = "sichuanliuyu.json";

    private GeoJsonUtil() {
    }
    /**
     * Load the watershed (region) feature collection from the bundled GeoJSON file.
     *
     * @return
     */
    public static FeatureCollection getFeatureCollection() {
        // Read the GeoJSON file from the classpath
        InputStream resourceAsStream = GeoJsonUtil.class.getResourceAsStream("/json/" + geoJsonFilePath);
        FeatureJSON featureJSON = new FeatureJSON();
        try {
            return featureJSON.readFeatureCollection(resourceAsStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Check whether any feature in the collection contains the given point.
     * @param longitude
     * @param latitude
     * @param featureCollection
     * @return
     */
    public static boolean contains(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                if (isContains(longitude, latitude, next)) {
                    return true;
                }
            }
        } finally {
            features.close();
        }
        return false;
    }

    /**
     * Check whether any feature in the collection contains the point; if so, return the required properties.
     * @param longitude
     * @param latitude
     * @param featureCollection
     * @return
     */
    public static Map<String, Object> properties(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // If the point lies inside the polygon, return the required properties
                if (contains) {
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put("waterCode", next.getProperty("FID").getValue());
                    properties.put("waterName", next.getProperty("name").getValue());
                    return properties;
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    /**
     * Check whether any feature contains the point; if so, return the required properties.
     * @param longitude
     * @param latitude
     * @return
     */
    public static Map<String, Object> properties(double longitude, double latitude) {
        FeatureCollection featureCollection = getFeatureCollection();
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // If the point lies inside the polygon, return the required properties
                if (contains) {
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put("waterCode", next.getProperty("FID").getValue());
                    properties.put("waterName", next.getProperty("name").getValue());
                    return properties;
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    /**
     * Return the watershed code (the feature's FID) of the polygon containing the point, or null if none matches.
     * @param longitude
     * @param latitude
     * @return
     */
    public static String getWaterCode(double longitude, double latitude) {
        FeatureCollection featureCollection = getFeatureCollection();
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // If the point lies inside the polygon, return its watershed code
                if (contains) {
                    return String.valueOf(next.getProperty("FID").getValue());
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    private static boolean isContains(double longitude, double latitude, Feature feature) {
        // Get the boundary geometry of the feature
        Property geometry = feature.getProperty("geometry");
        Object value = geometry.getValue();
        // Build a JTS point from the given coordinates
        GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
        Point point = geometryFactory.createPoint(new Coordinate(longitude, latitude));
        boolean contains = false;
        // Handle both single polygons and multi-polygons
        if (value instanceof MultiPolygon) {
            MultiPolygon multiPolygon = (MultiPolygon) value;
            contains = multiPolygon.contains(point);
        } else if (value instanceof Polygon) {
            Polygon polygon = (Polygon) value;
            contains = polygon.contains(point);
        }
        return contains;
    }
}
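A caveat: getWaterCode re-reads and re-parses the GeoJSON file for every record, so a long batch run spends most of its time on file I/O. A minimal sketch of caching the parsed collection inside GeoJsonUtil (illustrative names; it assumes the file does not change at runtime, and if concurrent iteration from the partition threads ever proves problematic, a ThreadLocal cache is the safer variant):

    // Hypothetical cached accessor: parse the GeoJSON once and reuse it afterwards.
    private static volatile FeatureCollection cachedFeatureCollection;

    public static FeatureCollection getCachedFeatureCollection() {
        FeatureCollection collection = cachedFeatureCollection;
        if (collection == null) {
            synchronized (GeoJsonUtil.class) {
                if (cachedFeatureCollection == null) {
                    cachedFeatureCollection = getFeatureCollection();
                }
                collection = cachedFeatureCollection;
            }
        }
        return collection;
    }

getWaterCode and the two properties(...) overloads would then call getCachedFeatureCollection() instead of getFeatureCollection().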

7. GIS dependencies (GeoTools)

<geotools.version>27-SNAPSHOT</geotools.version>
<!-- Map / GIS dependencies (GeoTools) -->
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-shapefile</artifactId>
            <version>${geotools.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.geotools</groupId>
                    <artifactId>gt-main</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-geojson</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-swing</artifactId>
            <version>${geotools.version}</version>
        </dependency>


<repositories>
    <repository>
        <id>osgeo</id>
        <name>OSGeo Release Repository</name>
        <url>https://repo.osgeo.org/repository/release/</url>
        <snapshots><enabled>false</enabled></snapshots>
        <releases><enabled>true</enabled></releases>
    </repository>
    <repository>
        <id>osgeo-snapshot</id>
        <name>OSGeo Snapshot Repository</name>
        <url>https://repo.osgeo.org/repository/snapshot/</url>
        <snapshots><enabled>true</enabled></snapshots>
        <releases><enabled>false</enabled></releases>
    </repository>
</repositories>

Reference Git project: springbatch. A Spring Boot / Spring Batch example that imports 300,000 records from a file into the t_cust_temp table using multiple threads, then loads t_cust_temp into t_cust_info using partitioning. It runs as downloaded.