spring事务源码解析

发布于:2024-12-18 ⋅ 阅读:(5) ⋅ 点赞:(0)

1 引入

在企业级应用开发中,事务管理是确保数据一致性和完整性的重要手段。而在 Spring 框架中,事务管理提供了高度抽象和灵活的实现,开发者只需通过简单的注解或配置即可轻松实现复杂的事务逻辑。然而,Spring 事务背后的实现机制却涉及一系列底层原理和设计模式,如 AOP(面向切面编程)、代理模式,以及事务传播特性等。

本文将从源码层面深入剖析 Spring 事务的核心机制,揭示它是如何通过 PlatformTransactionManager、AOP 和代理对象协作,实现对事务的透明管理。通过解析源码,我们不仅可以理解 Spring 事务的运行原理,还能掌握其扩展和优化的方式,从而更高效地应对复杂场景下的事务管理需求。

2 入口

@EnableTransactionManagement

/**
 * Type-level annotation that switches on annotation-driven transaction management
 * by importing {@link TransactionManagementConfigurationSelector}.
 *
 * <p>NOTE(review): no attributes are declared in this excerpt; the declaration looks
 * abridged (e.g. an "order" attribute is read from the annotation metadata elsewhere) —
 * confirm against the full source.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// Pull in the core selector class that chooses which configuration gets registered
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
}

// 进入TransactionManagementConfigurationSelector
/**
 * Chooses the configuration classes to import for the given advice mode.
 *
 * @param adviceMode the advice mode configured on the enabling annotation
 * @return fully-qualified class names to import, or {@code null} for an unrecognised mode
 */
protected String[] selectImports(AdviceMode adviceMode) {
    final String[] importedClassNames;
    switch (adviceMode) {
        case PROXY:
            // Proxy-based AOP (noted as the default mode — confirm against the annotation's
            // mode attribute): registers the auto-proxy registrar together with
            // ProxyTransactionManagementConfiguration.
            importedClassNames = new String[] {
                    AutoProxyRegistrar.class.getName(),
                    ProxyTransactionManagementConfiguration.class.getName()};
            break;
        case ASPECTJ:
            // AspectJ is used instead of dynamic proxies; setting this up is more involved.
            importedClassNames = new String[] {determineTransactionAspectClass()};
            break;
        default:
            importedClassNames = null;
            break;
    }
    return importedClassNames;
}

// 进入ProxyTransactionManagementConfiguration
/**
 * Infrastructure configuration that declares the three beans behind proxy-based
 * transaction management: the advisor, the transaction attribute source and the
 * transaction interceptor.
 */
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    /**
     * Declares the AOP advisor, wiring in the attribute source and using the
     * interceptor as its advice.
     *
     * @param transactionAttributeSource source of transaction attributes for the advisor
     * @param transactionInterceptor the advice applied by the advisor
     * @return the configured advisor
     */
    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
            TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {

        BeanFactoryTransactionAttributeSourceAdvisor txAdvisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        txAdvisor.setTransactionAttributeSource(transactionAttributeSource);
        txAdvisor.setAdvice(transactionInterceptor);
        if (this.enableTx == null) {
            // No annotation metadata available: leave the advisor's order untouched.
            return txAdvisor;
        }
        Integer configuredOrder = this.enableTx.<Integer>getNumber("order");
        txAdvisor.setOrder(configuredOrder);
        return txAdvisor;
    }

    /**
     * Declares the attribute source that reads {@code @Transactional} metadata.
     *
     * <p>Per the original notes, AnnotationTransactionAttributeSource parses
     * {@code @Transactional} into a RuleBasedTransactionAttribute and backs the
     * pointcut — confirm against that class.
     *
     * @return a new annotation-based attribute source
     */
    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    /**
     * Declares the interceptor that holds the actual enhancement (advice) logic.
     *
     * @param transactionAttributeSource source of transaction attributes for the interceptor
     * @return the configured interceptor
     */
    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
        TransactionInterceptor txInterceptor = new TransactionInterceptor();
        txInterceptor.setTransactionAttributeSource(transactionAttributeSource);
        // Only set a transaction manager when one was supplied to this configuration.
        if (this.txManager != null) {
            txInterceptor.setTransactionManager(this.txManager);
        }
        return txInterceptor;
    }

}

3 切点的判断

核心类:TransactionAttributeSourcePointcut

/**
 * Class filter used by the transaction pointcut: rejects transaction infrastructure
 * types and otherwise defers to the configured TransactionAttributeSource.
 */
private class TransactionAttributeSourceClassFilter implements ClassFilter {

    /**
     * @param clazz the class being considered for transactional proxying
     * @return {@code false} for infrastructure types; otherwise {@code true} when no
     * attribute source is configured or the source accepts the class as a candidate
     */
    @Override
    public boolean matches(Class<?> clazz) {
        // Never match transaction infrastructure types.
        if (TransactionalProxy.class.isAssignableFrom(clazz)) {
            return false;
        }
        if (TransactionManager.class.isAssignableFrom(clazz)) {
            return false;
        }
        if (PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
            return false;
        }

        TransactionAttributeSource attributeSource = getTransactionAttributeSource();
        if (attributeSource == null) {
            // Without a source there is nothing to narrow the match with.
            return true;
        }
        // Class-level candidate check.
        return attributeSource.isCandidateClass(clazz);
    }
}

/**
 * Reports whether any registered annotation parser treats the given class as a candidate.
 *
 * @param targetClass the class to check
 * @return {@code true} as soon as one parser accepts the class, {@code false} if none does
 */
public boolean isCandidateClass(Class<?> targetClass) {
    boolean accepted = false;
    for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
        if (annotationParser.isCandidateClass(targetClass)) {
            accepted = true;
            break;
        }
    }
    return accepted;
}

/**
 * Checks whether the given class is a candidate for carrying {@code @Transactional}.
 *
 * <p>NOTE(review): this delegates entirely to AnnotationUtils.isCandidateClass, which
 * looks like a cheap pre-filter rather than a proof that the annotation is present —
 * confirm against the AnnotationUtils documentation.
 *
 * @param targetClass the class to check
 * @return the result of the AnnotationUtils candidate check
 */
public boolean isCandidateClass(Class<?> targetClass) {
    boolean candidate = AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
    return candidate;
}

4 切面的增强代码

核心类:TransactionInterceptor

核心方法:invoke

    /**
     * AOP entry point: resolves the target class and hands the invocation over to
     * {@code invokeWithinTransaction}, wrapped in a callback.
     *
     * @param invocation the intercepted method invocation
     * @return whatever {@code invokeWithinTransaction} returns
     * @throws Throwable anything thrown while running within the transaction
     */
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // The target class may be null. It is passed along with the method because the
        // method itself may have been declared on an interface.
        Class<?> targetClass = null;
        if (invocation.getThis() != null) {
            targetClass = AopUtils.getTargetClass(invocation.getThis());
        }

        // Adapter exposing the invocation to TransactionAspectSupport's invokeWithinTransaction.
        CoroutinesInvocationCallback txCallback = new CoroutinesInvocationCallback() {
            @Override
            public Object getTarget() {
                return invocation.getThis();
            }

            @Override
            public Object[] getArguments() {
                return invocation.getArguments();
            }

            @Override
            @Nullable
            public Object proceedWithInvocation() throws Throwable {
                // Runs the remaining interceptors and finally the proxied method itself
                // (i.e. the business method that issues the SQL).
                return invocation.proceed();
            }
        };

        return invokeWithinTransaction(invocation.getMethod(), targetClass, txCallback);
    }
    
    
    /**
     * Runs the given invocation inside a transaction, choosing between three paths:
     * a reactive transaction manager, standard getTransaction/commit/rollback
     * demarcation, or a callback-preferring transaction manager.
     *
     * @param method the method being invoked
     * @param targetClass the target class, may be {@code null}
     * @param invocation callback used to proceed with the actual invocation
     * @return the return value of the invocation (possibly adapted on the reactive/Vavr paths)
     * @throws Throwable any exception propagated from the invocation or from transaction handling
     */
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
       final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    // The TransactionAttribute typically carries the settings declared on @Transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // Look up the attribute for this method/target class (null when no source is configured).
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

    // Resolve the transaction manager to use for this attribute
    // (presumably a TransactionManager bean from the Spring container — see determineTransactionManager).
    final TransactionManager tm = determineTransactionManager(txAttr);

    // Reactive path: only taken for a ReactiveTransactionManager. Rarely used; it differs in
    // execution style (reactive) but reportedly follows the same overall flow as the standard path.
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
       boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
       boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
             COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
       if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
          throw new IllegalStateException("Coroutines invocation not supported: " + method);
       }
       CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);

       // One ReactiveTransactionSupport per method, created lazily and cached.
       ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
          Class<?> reactiveType =
                (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
          ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
          if (adapter == null) {
             throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                   method.getReturnType());
          }
          return new ReactiveTransactionSupport(adapter);
       });

       InvocationCallback callback = invocation;
       if (corInv != null) {
          // For suspending functions, invoke through CoroutinesUtils instead of the plain callback.
          callback = () -> CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());
       }
       Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager) tm);
       if (corInv != null) {
          Publisher<?> pr = (Publisher<?>) result;
          return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) :
                KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation()));
       }
       return result;
    }

    // Treat tm as a PlatformTransactionManager from here on, which is why the transaction
    // manager bean is normally declared with the PlatformTransactionManager type.
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);

    // Identifier of the join point, i.e. (per the original notes) the name of the method being executed.
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // CallbackPreferringPlatformTransactionManager is a PlatformTransactionManager with
    // callback support; it is not commonly used, so this first branch is the usual path.
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
       // Standard transaction demarcation with getTransaction and commit/rollback calls.
       // Create a transaction if necessary; this is where transaction propagation comes into play.
       // A TransactionInfo represents one logical transaction; e.g. two logical transactions
       // may belong to the same physical transaction.
       TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

       Object retVal;
       try {
          // This is an around advice: Invoke the next interceptor in the chain.
          // This will normally result in a target object being invoked.
          // Runs the next interceptor or the proxied object's method (the business method issuing SQL).
          retVal = invocation.proceedWithInvocation();
       }
       catch (Throwable ex) {
          // target invocation exception
          // An exception was thrown: complete the transaction accordingly (presumably roll back,
          // or commit when the exception does not match the rollback rules — see
          // completeTransactionAfterThrowing), then rethrow.
          completeTransactionAfterThrowing(txInfo, ex);
          throw ex;
       }
       finally {
          cleanupTransactionInfo(txInfo);
       }

       if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
          // Set rollback-only in case of Vavr failure matching our rollback rules...
          TransactionStatus status = txInfo.getTransactionStatus();
          if (status != null && txAttr != null) {
             retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
          }
       }

       // Commit the transaction after a normal return.
       commitTransactionAfterReturning(txInfo);
       return retVal;
    }

    else {
       Object result;
       // Carries an application exception out of the callback when it must not trigger a rollback.
       final ThrowableHolder throwableHolder = new ThrowableHolder();

       // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
       try {
          result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
             TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
             try {
                Object retVal = invocation.proceedWithInvocation();
                if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                   // Set rollback-only in case of Vavr failure matching our rollback rules...
                   retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
                return retVal;
             }
             catch (Throwable ex) {
                if (txAttr.rollbackOn(ex)) {
                   // A RuntimeException: will lead to a rollback.
                   if (ex instanceof RuntimeException) {
                      throw (RuntimeException) ex;
                   }
                   else {
                      // Wrap non-runtime throwables so they can cross the callback boundary;
                      // unwrapped again in the ThrowableHolderException catch below.
                      throw new ThrowableHolderException(ex);
                   }
                }
                else {
                   // A normal return value: will lead to a commit.
                   throwableHolder.throwable = ex;
                   return null;
                }
             }
             finally {
                cleanupTransactionInfo(txInfo);
             }
          });
       }
       catch (ThrowableHolderException ex) {
          throw ex.getCause();
       }
       catch (TransactionSystemException ex2) {
          if (throwableHolder.throwable != null) {
             logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
             ex2.initApplicationException(throwableHolder.throwable);
          }
          throw ex2;
       }
       catch (Throwable ex2) {
          if (throwableHolder.throwable != null) {
             logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
          }
          throw ex2;
       }

       // Check result state: It might indicate a Throwable to rethrow.
       if (throwableHolder.throwable != null) {
          throw throwableHolder.throwable;
       }
       return result;
    }
}


/**
 * Obtains a transaction status for the given attribute (when both an attribute and a
 * transaction manager are present) and wraps everything into a TransactionInfo.
 *
 * @param tm the transaction manager, may be {@code null}
 * @param txAttr the transaction attribute, may be {@code null}
 * @param joinpointIdentification identifier of the join point, used as the fallback transaction name
 * @return the TransactionInfo produced by {@code prepareTransactionInfo}
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // When the attribute carries no name, decorate it so that the join point
    // identification serves as the transaction name.
    TransactionAttribute effectiveAttr = txAttr;
    if (effectiveAttr != null && effectiveAttr.getName() == null) {
        effectiveAttr = new DelegatingTransactionAttribute(effectiveAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    // Per the original notes: every logical transaction gets its own TransactionStatus, and
    // the status records whether the underlying physical transaction is a new one.
    TransactionStatus status = null;
    if (effectiveAttr != null && tm != null) {
        status = tm.getTransaction(effectiveAttr);
    }
    else if (effectiveAttr != null && logger.isDebugEnabled()) {
        // An attribute exists but there is no transaction manager to act on it.
        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                "] because no transaction manager has been configured");
    }

    // The resulting TransactionInfo may describe a newly created transaction or an
    // existing one that was joined.
    return prepareTransactionInfo(tm, effectiveAttr, joinpointIdentification, status);
}


/**
 * Returns a transaction status for the given definition, either by delegating to the
 * existing-transaction handling, starting a new transaction, or preparing an "empty"
 * status, depending on the current state and the propagation behavior.
 *
 * @param definition the transaction definition, or {@code null} to use the defaults
 * @return the resulting transaction status
 * @throws TransactionException on an invalid timeout, or when propagation is
 * 'mandatory' and no existing transaction is found
 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Fall back to the default definition when none was supplied.
    TransactionDefinition def = definition;
    if (def == null) {
        def = TransactionDefinition.withDefaults();
    }

    // Transaction object from the concrete manager (for a DataSource-based manager this is
    // reportedly a fresh DataSourceTransactionObject — confirm in the subclass).
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // A transaction already exists: the propagation behavior decides how to proceed.
    if (isExistingTransaction(transaction)) {
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // From here on there is no existing transaction; validate the settings for a new one.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }

    // With no transaction on the current thread, REQUIRED, REQUIRES_NEW and NESTED are
    // equivalent: each of them starts a new transaction.
    boolean startsNewTransaction =
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED;

    if (!startsNewTransaction) {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }

    // There is no transaction to suspend, but transaction synchronizations may still need
    // suspending. The holder keeps whatever was suspended for the current thread.
    SuspendedResourcesHolder suspendedResources = suspend(null);
    if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    }
    try {
        // Starts the transaction and returns its status, passing along the suspended resources.
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error ex) {
        // Starting failed: restore what was suspended before propagating the failure.
        resume(null, suspendedResources);
        throw ex;
    }
}

5 总结

  1. 首先EnableTransactionManagement会自动装配一个advisor对方法进行增强

  2. advisor的切点是被@Transactional注解标注的方法

  3. advisor的增强逻辑(advice)环绕目标方法执行:首先判断当前线程是否已存在数据库连接,初次执行时一般不存在,于是创建一个新的数据库连接,并将连接存储到ThreadLocal;然后执行对应的方法;如果方法正常执行完成,就从ThreadLocal获取数据库连接并提交事务;如果出现异常,就回滚事务

  4. 如果方法之间涉及传播级别 以常见的传播级别为例REQUIRED和REQUIRES_NEW

    1. REQUIRES_NEW:如果在执行a方法的过程中调用了另一个方法b,并且b方法的传播级别为REQUIRES_NEW,那么在执行b方法之前,会将ThreadLocal中的数据库连接等信息取出来,放到一个挂起对象中,然后创建一个新的数据库连接存储到ThreadLocal;b方法执行完成后,清除ThreadLocal中的信息,再将挂起对象中的信息恢复到ThreadLocal,继续执行a方法

    2. REQUIRED:如果在执行a方法的过程中调用了另一个方法b,并且b方法的传播级别为REQUIRED,那么执行b方法时会和a方法共用同一个数据库连接,两个方法处于同一个事务中