捣鼓小玩意-分批处理工具类

发布于:2024-12-19 ⋅ 阅读:(13) ⋅ 点赞:(0)

分批处理工具类

博主自己的博客点击访问(内容大部分更新在自己的博客,有时间才会整理到CSDN)

  • 有时候会遇到一些大批量数据成千上万的列表,如果单独一个循环处理,可能会很慢,或者是遇到如需要根据id in ()去数据库查询,可能会遇到参数上限,此时就需要分割列表分批去查询,所以闲着没事的时候写了个工具类用于处理这类情况。
  • 这个工具类的使用需要对Function(四大函数式接口)以及Future类的了解。
package com.ruoyi.common.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Function;

/**
 * 批量处理工具类(需要写Function)
 *
 * @author Zyyyyu
 * @date 2024 11-23 00:25:23
 */
public class SplitBatchUtil {


    /**
     * Demo
     *
     * @param args
     */
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
	/** 初始化模拟数据 */
        for (int i = 0; i < 10000; i++) {
            list.add(i + ":" + "test");
        }
	/** 分批处理时需要执行的逻辑 */
        Function<List<String>, List<String>> function = ((l) -> {
            List<String> strings = new ArrayList<>();

            l.stream().forEach(item -> {
                String[] split = item.split(":");
                strings.add(split[0]);
            });
            System.out.println(strings);
            return strings;
        });
	/** 调用分批处理,第二个参数指的是每批多少条数据,第三个参数指的是是否开启多线程 */
        splitBatch(list, 10, true, function);
    }


    /**
     * @param list           需要处理的数据
     * @param batchSize      每批数据量
     * @param isBatchProcess 是否开启多线程处理
     * @param function       处理函数
     */
    public static <T, R> void splitBatch(List<T> list, Integer batchSize, Boolean isBatchProcess, Function<List<T>, R> function) {
	/** 被分割的列表不能为空 */
        Objects.requireNonNull(list, "list cannot be null");
        if (list.isEmpty()) {
            return;
        }
        // 默认每批数据量
        final int DEFAULT_BATCH_SIZE = 50;
        // 批处理每批数据量
        if (null == batchSize || batchSize <= 0) {
            batchSize = DEFAULT_BATCH_SIZE;
        }
        // 待处理数据量
        int listSize = list.size();
        if (listSize <= batchSize) {
            // 直接处理
            function.apply(list);
        } else {
            // 计算需要的批数
            int batchNum = listSize / batchSize;
            if (listSize % batchSize != 0) {
                batchNum += 1;
            }
            // 判断是否开启Future多线程批处理
            if (!isBatchProcess) {
                for (int i = 0; i < batchNum; i++) {
                    System.out.println("第" + (i + 1) + "批数据");
                    int begin = i * batchSize;
                    int end = Math.min((i + 1) * batchSize, listSize);
                    List<T> subList = extractBatch(list, begin, end);
                    // 执行处理函数
                    function.apply(subList);
                }
            } else {
                /**
                 * 创建固定大小的线程池
                 * TODO 如果不需要开启多线程 可以将这个分支的逻辑进行删除
                 */
                ExecutorService executorService = Executors.newFixedThreadPool(5);
                /**
                 *  Future是通过多线程处理后将结果进行返回
                 *  所以List<Future<T>> 的 T 是需要返回的数据类型
                 */
                List<Future<T>> futures = new ArrayList<>();

                for (int i = 0; i < batchNum; i++) {
                    int begin = i * batchSize;
                    int end = Math.min((i + 1) * batchSize, listSize);
                    // 这里是提交任务
                    Future<T> future = executorService.submit(new BatchProcessor(list, begin, end, i + 1, function));
                    futures.add(future);
                }

                // 收集结果
                for (Future<T> future : futures) {
                    try {
                        T subList = future.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                executorService.shutdown(); // 关闭线程池
            }
        }
    }

    /**
     * 提取指定范围的数据
     *
     * @param list
     * @param start
     * @param end
     * @return
     */
    private static <T> List<T> extractBatch(List<T> list, int start, int end) {
        List<T> batch = new ArrayList<>(end - start);
        for (int i = start; i < end; i++) {
            batch.add(list.get(i));
        }
        return batch;
    }

    /**
     * 批处理任务
     */
    private static class BatchProcessor<T, R> implements Callable<List<T>> {
        private final List<T> list;
        private final int start;
        private final int end;
        private final int batchNumber;
        private final Function<List<T>, R> function;

        public BatchProcessor(List<T> list, int start, int end, int batchNumber, Function<List<T>, R> function) {
            this.list = list;
            this.start = start;
            this.end = end;
            this.batchNumber = batchNumber;
            this.function = function;
        }

        /**
         * 执行批处理任务
         * 在这里写要执行的业务逻辑
         *
         * @return
         */
        @Override
        public List<T> call() {
            List<T> subList = extractBatch(list, start, end);
            System.out.println("线程:" + Thread.currentThread().getName() + " 第" + batchNumber + "批数据:");
            function.apply(subList);
            return subList;
        }
    }

}

注意事项

Future一般用于先计算后才再处理结果,这里结果我直接忽略了,还没补充完整,所以有需要的可以自己补充


网站公告

今日签到

点亮在社区的每一天
去签到