分批处理工具类
博主自己的博客点击访问(内容大部分更新在自己的博客,有时间才会整理到CSDN)
- 有时候会遇到一些大批量数据成千上万的列表,如果单独一个循环处理,可能会很慢,或者是遇到如需要根据id in ()去数据库查询,可能会遇到参数上限,此时就需要分割列表分批去查询,所以闲着没事的时候写了个工具类用于处理这类情况。
- 这个工具类的使用需要对Function(四大函数式接口)以及Future类的了解。
package com.ruoyi.common.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Function;
/**
* 批量处理工具类(需要写Function)
*
* @author Zyyyyu
* @date 2024 11-23 00:25:23
*/
public class SplitBatchUtil {
/**
* Demo
*
* @param args
*/
public static void main(String[] args) {
List<String> list = new ArrayList<>();
/** 初始化模拟数据 */
for (int i = 0; i < 10000; i++) {
list.add(i + ":" + "test");
}
/** 分批处理时需要执行的逻辑 */
Function<List<String>, List<String>> function = ((l) -> {
List<String> strings = new ArrayList<>();
l.stream().forEach(item -> {
String[] split = item.split(":");
strings.add(split[0]);
});
System.out.println(strings);
return strings;
});
/** 调用分批处理,第二个参数指的是每批多少条数据,第三个参数指的是是否开启多线程 */
splitBatch(list, 10, true, function);
}
/**
* @param list 需要处理的数据
* @param batchSize 每批数据量
* @param isBatchProcess 是否开启多线程处理
* @param function 处理函数
*/
public static <T, R> void splitBatch(List<T> list, Integer batchSize, Boolean isBatchProcess, Function<List<T>, R> function) {
/** 被分割的列表不能为空 */
Objects.requireNonNull(list, "list cannot be null");
if (list.isEmpty()) {
return;
}
// 默认每批数据量
final int DEFAULT_BATCH_SIZE = 50;
// 批处理每批数据量
if (null == batchSize || batchSize <= 0) {
batchSize = DEFAULT_BATCH_SIZE;
}
// 待处理数据量
int listSize = list.size();
if (listSize <= batchSize) {
// 直接处理
function.apply(list);
} else {
// 计算需要的批数
int batchNum = listSize / batchSize;
if (listSize % batchSize != 0) {
batchNum += 1;
}
// 判断是否开启Future多线程批处理
if (!isBatchProcess) {
for (int i = 0; i < batchNum; i++) {
System.out.println("第" + (i + 1) + "批数据");
int begin = i * batchSize;
int end = Math.min((i + 1) * batchSize, listSize);
List<T> subList = extractBatch(list, begin, end);
// 执行处理函数
function.apply(subList);
}
} else {
/**
* 创建固定大小的线程池
* TODO 如果不需要开启多线程 可以将这个分支的逻辑进行删除
*/
ExecutorService executorService = Executors.newFixedThreadPool(5);
/**
* Future是通过多线程处理后将结果进行返回
* 所以List<Future<T>> 的 T 是需要返回的数据类型
*/
List<Future<T>> futures = new ArrayList<>();
for (int i = 0; i < batchNum; i++) {
int begin = i * batchSize;
int end = Math.min((i + 1) * batchSize, listSize);
// 这里是提交任务
Future<T> future = executorService.submit(new BatchProcessor(list, begin, end, i + 1, function));
futures.add(future);
}
// 收集结果
for (Future<T> future : futures) {
try {
T subList = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executorService.shutdown(); // 关闭线程池
}
}
}
/**
* 提取指定范围的数据
*
* @param list
* @param start
* @param end
* @return
*/
private static <T> List<T> extractBatch(List<T> list, int start, int end) {
List<T> batch = new ArrayList<>(end - start);
for (int i = start; i < end; i++) {
batch.add(list.get(i));
}
return batch;
}
/**
* 批处理任务
*/
private static class BatchProcessor<T, R> implements Callable<List<T>> {
private final List<T> list;
private final int start;
private final int end;
private final int batchNumber;
private final Function<List<T>, R> function;
public BatchProcessor(List<T> list, int start, int end, int batchNumber, Function<List<T>, R> function) {
this.list = list;
this.start = start;
this.end = end;
this.batchNumber = batchNumber;
this.function = function;
}
/**
* 执行批处理任务
* 在这里写要执行的业务逻辑
*
* @return
*/
@Override
public List<T> call() {
List<T> subList = extractBatch(list, start, end);
System.out.println("线程:" + Thread.currentThread().getName() + " 第" + batchNumber + "批数据:");
function.apply(subList);
return subList;
}
}
}
注意事项
Future一般用于先计算后才再处理结果,这里结果我直接忽略了,还没补充完整,所以有需要的可以自己补充