@RestController
@RequestMapping("/rule")
public class RoleController {
@Autowired
private TRoleService tRoleService;
/* @Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private RabbitService rabbitService;*/
@Autowired
ThreadPoolExecutor executor;
@Autowired
private DataSource dataSource;
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
MultiThreadingTransactionManager multiThreadingTransactionManager;
@RequestMapping("test")
public String test(){
List<TRole> tRoles = new ArrayList<>();
for (int i = 0; i < 30; i++) {
TRole tRole = new TRole();
tRole.setName("刘港"+i);
tRoles.add(tRole);
}
List<List<TRole>> partition = ListUtils.partition(tRoles, 10);
List<Runnable> runnableList = new ArrayList<>();
for (List<TRole> roles : partition) {
runnableList.add(() -> {
try {
if (roles != null && !roles.isEmpty()) {
boolean result = tRoleService.saveBatch(roles);
if (!result) {
// 处理保存失败的情况
throw new RuntimeException("批量保存角色信息失败");
}
}
} catch (Exception e) {
// 记录异常日志或进行其他异常处理
}
});
}
boolean isSuccess = multiThreadingTransactionManager.execute(runnableList,null);
System.out.println(isSuccess);
return "ok";
};
package com.atguigu.yygh.cmn.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
@Slf4j
public class MultiThreadingTransactionManager {
/**
* 数据源事务管理器
*/
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private ThreadPoolTaskExecutor executorService;
private long timeout = 120;
public boolean execute(List<Runnable> runnableList, List<String> factorySchema) {
/**
* 用于判断子线程业务是否处理完成
* 处理完成时threadCountDownLatch的值为0
*/
CountDownLatch mainCountDownLatch = new CountDownLatch(1);
/**
* 用于等待子线程全部完成后,子线程统一进行提交和回滚
* 进行提交和回滚时mainCountDownLatch的值为0
*/
CountDownLatch threadCountDownLatch = new CountDownLatch(runnableList.size());;
/**
* 是否提交事务,默认是true,当子线程有异常发生时,设置为false,回滚事务
*/
AtomicBoolean isSubmit = new AtomicBoolean(true);
for(int i =0;i<runnableList.size();i++){
int finalI=i;
executorService.execute(() ->{
log.info("子线程: [" + Thread.currentThread().getName() + "]");
// 判断别的子线程是否已经出现错误,错误别的线程已经出现错误,那么所有的都要回滚,这个子线程就没有必要执行了
if (!isSubmit.get()) {
log.info("整个事务中有子线程执行失败需要回滚, 子线程: [" + Thread.currentThread().getName() + "] 终止执行");
// 计数器减1,代表该子线程执行完毕
threadCountDownLatch.countDown();
return;
}
// SchemaContextHolder.setSchema(factorySchema.get(finalI));
// 开启事务
DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
try {
if (finalI==2){
throw new RuntimeException("1111");
}
// 执行业务逻辑
runnableList.get(finalI).run();
} catch (Exception exception) {
// 发生异常需要进行回滚,设置isSubmit为false
isSubmit.set(false);
log.info("子线程: [" + Thread.currentThread().getName() + "]执行业务发生异常,异常为: " + exception.getMessage());
throw new RuntimeException(exception.getMessage());
} finally {
// 计数器减1,代表该子线程执行完毕
threadCountDownLatch.countDown();
}
try {
// 等待主线程执行
mainCountDownLatch.await();
} catch (Exception exception) {
log.info("子线程: [" + Thread.currentThread().getName() + "]等待提交或回滚异常,异常为: " + exception.getMessage());
throw new RuntimeException(exception.getMessage());
}
try {
// 提交
if (isSubmit.get()) {
dataSourceTransactionManager.commit(transactionStatus);
log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交");
} else {
dataSourceTransactionManager.rollback(transactionStatus);
log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务回滚");
}
} catch (Exception exception) {
log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交或回滚出现异常,异常为:" + exception.getMessage());
throw new RuntimeException(exception.getMessage());
}
});
}
// 等待子线程全部执行完毕
try {
// 若计数器变为零了,则返回 true
boolean isFinish = threadCountDownLatch.await(timeout, TimeUnit.SECONDS);
if (!isFinish) {
// 如果还有为执行完成的就回滚
isSubmit.set(false);
log.info("存在子线程在预期时间内未执行完毕,任务将全部回滚");
}
} catch (Exception exception) {
log.info("主线程发生异常,异常为: " + exception.getMessage());
throw new RuntimeException(exception.getMessage());
} finally {
// 计数器减1,代表该主线程执行完毕
mainCountDownLatch.countDown();
}
// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败
return isSubmit.get();
}
}
spring 使用多线程,保证事务一致性_spring 多线程事务-CSDN博客