分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交(手把手带你debug源码)

发布于:2022-11-01 ⋅ 阅读:(509) ⋅ 点赞:(0)

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
  14. 分布式事务Seata源码解析九:分支事务如何注册到全局事务
  15. 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程

Seata最核心的全局事务执行流程,上文我们聊了undo log的详细构建过程,本文接着聊全局事务的提交?

二、全局事务提交

分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务一文详细讲了全局事务的执行流程。

1、Seata Client端(TM)

全局事务的执行体现在TransactionTemplate#execute(TransactionalExecutor)方法中:

在这里插入图片描述

当全局事务和所有的分支事务都运行无误时,会调用commitTransaction()方法提交全局事务;

在这里插入图片描述

默认全局事务提交前、提交后的钩子函数没有任何实现,如果想在全局事务提交前、提交后做一些扩展操作,可以通过实现TransactionHook接口,并调用TransactionHookManager#registerHook()方法注册钩子;

public final class TransactionHookManager {

    private TransactionHookManager() {

    }

    private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();

    /**
     * get the current hooks
     *
     * @return TransactionHook list
     * @throws IllegalStateException IllegalStateException
     */
    public static List<TransactionHook> getHooks() throws IllegalStateException {
        List<TransactionHook> hooks = LOCAL_HOOKS.get();

        if (hooks == null || hooks.isEmpty()) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(hooks);
    }

    /**
     * add new hook
     *
     * @param transactionHook transactionHook
     */
    public static void registerHook(TransactionHook transactionHook) {
        if (transactionHook == null) {
            throw new NullPointerException("transactionHook must not be null");
        }
        List<TransactionHook> transactionHooks = LOCAL_HOOKS.get();
        if (transactionHooks == null) {
            LOCAL_HOOKS.set(new ArrayList<>());
        }
        LOCAL_HOOKS.get().add(transactionHook);
    }

    /**
     * clear hooks
     */
    public static void clear() {
        LOCAL_HOOKS.remove();
    }
}

全局事务的提交由GlobalTransaction#commit()方法触发:

在这里插入图片描述

在Seata中,GlobalTransaction接口仅有一个实现:DefaultGlobalTransaction,其重写的commit()方法如下:

@Override
public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();
    // 提交全局事务失败重试次数,默认为5,可通过`client.tm.commitRetryCount=5`配置
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                retry--;
                // 全局事务提交,走进DefaultTransactionManager
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] commit status: {}", xid, status);
    }
}

seata client发起全局事务提交的流程如下:

  1. 首先事务的角色必须是发起者Launcher,不然做执行具体的全局事务提交操作;
  2. 全局事务必须要有自己的唯一标识:xid,否则断言抛出异常:IllegalStateException
  3. 以最多重试五次(可通过client.tm.commitRetryCount=5配置)的方式,调用事务管理器TransactionManager的commit()方法,基于全局事务xid封装一个GlobalCommitRequest请求通过netty发送到seata server(TC)。

1)事务角色必须是Launcher

在这里插入图片描述

事务角色有两种:Launcher(全局事务发起者)、Participant(全局事务参与者)

public enum GlobalTransactionRole {

    /**
     * The Launcher.
     */
    // The one begins the current global transaction.
    Launcher,

    /**
     * The Participant.
     */
    // The one just joins into a existing global transaction.
    Participant
}

2)全局事务必须要有xid

private void assertXIDNotNull() {
    if (xid == null) {
        throw new IllegalStateException();
    }
}

如果全局事务不存在唯一标识xid,断言抛出异常:IllegalStateException

3)发送GlobalCommitRequest到TC

在这里插入图片描述

此处提供异常重试机制,重试次数默认为5,可通过client.tm.commitRetryCount=5配置;

由事务管理器TransactionManager负责处理全局事务的提交:

  • 基于全局事务xid封装一个GlobalCommitRequest请求通过netty发送到seata server(TC)。

在这里插入图片描述

在Seata全局事务的发起者TM层面仅有一个实现DefaultTransactionManager

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    // 将全局事务提交请求发送给seata-server
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}

TM和TC的通信机制

见博文:分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

全局事务发起者TM,通过netty和TC进行网络通信;其中包括对seata-server集群的负载均衡,在获取到相应seata-server实例对应的channel之后,会进步处理请求的发送和相应结果的接收。

在这里插入图片描述

在写Channel之前,channelWritableCheck()方法会检查channel是否可写。

TM / RM 和TC的RPC通信均是异步进行的:

  • TM / RM 发送请求时,将封装了CompletableFuture的MessageFuture放到futures(ConcurrentHashMap<Integer, MessageFuture>)中;
  • TC处理完请求之后,会通过netty框架发送响应到TM / RM 的AbstractNettyRemoting中,其再将futures中的MessageFuture完成,发送请求的代码段中messageFuture.get()会获取到返回值,停止阻塞。

TM发送请求之后,下面接着看TC如何接收请求,如何处理请求?

2、Seata Server端(TC)

1)TM接收请求

【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;

又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

Seata Serer接收到请求的执行链路为:

在这里插入图片描述

2)TM处理请求GlobalCommitRequest

TM发送提交事务请求时的RPCMessage的body为GlobalCommitRequest

public class GlobalCommitRequest extends AbstractGlobalEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_GLOBAL_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
}

又由于在Seata接收请求的执行链的一部分 DefaultCoordinator#onRequest()方法中,将DefaultCoordinator自身绑定到了AbstractTransactionRequestToTChandler属性中:

在这里插入图片描述

所以实际执行的handle()方法是DefaultCoordinator的handle()方法,而DefaultCoordinator并没有重写其父类AbstractTCInboundHandler的handle()方法,所以进入到AbstractTCInboundHandler的handle()方法:

在这里插入图片描述

AbstractExceptionHandler#exceptionHandleTemplate()方法只是会运行方法的入参Callback,即会进入到–> doGlobalCommit() 方法

在这里插入图片描述

这里直接调用DefaultCore的commit()方法;

3)DefaultCore执行提交全局事务的业务逻辑

DefaultCore中封装了AT、TCC、Saga、XA分布式事务模式的具体实现类;

DefaultCore#commit()方法为提交全局事务的业务逻辑,方法的入参仅有一个xid(全局事务的唯一标识);

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // 根据xid从存储介质(store.mode)中查询出全局事务信息、以及全局事务关联的全部分支事务信息
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus

    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // 仅当全局事务状态为Begin的时候,才能提交
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // Highlight: Firstly, close the session, then no more branch can be registered.
            globalSession.closeAndClean();
            // 所有分支事务的模式为AT 或 事务状态为TCC模式的一阶段,可以异步提交全局事务
            if (globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                // 性能指标监控,有需要再关注
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else {
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    // AT模式下shouldCommit为false;
    if (shouldCommit) {
        boolean success = doGlobalCommit(globalSession, false);
        //If successful and all remaining branches can be committed asynchronously, do async commit.
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        // 如果全局事务的状态为AsyncCommitting,则返回GlobalStatus.Committed;否则返回全局事务此时的真实状态。
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}

提交全局事务逻辑如下:

  1. 根据xid从存储介质(比如:store.mode=db)中查询出全局事务信息、以及全局事务关联的全部分支事务信息;如果找不到相应的全局事务,则直接返回GlobalStatus.Finished,表示全局事务已经被完成 或 全局事务不存在。
  2. 根据SeataServer数据存储模式(File、DB、Redis)选择不同的SessionManager,进而执行相应SessionManager的lockAndExecute()方法,比如博主使用的数据存储模式是DB、所以对应的SessionManagerDataBaseSessionManager;但无论是哪种SessionManager本质上都是要运行lockAndExecute()方法入参中的GlobalSession.LockCallable,区别在于,File的方式会先给全局事务会话上把锁(ReentrantLock)。
    PS:这里的是用烂了设计模式搭配方式:工厂 + 策略 + 模板方法。
  3. 仅当全局事务状态为Begin的时候,才能提交;
  4. 先将全局事务的状态失活;然后如果事务模式是AT模式,则释放占用当前全局事务占用的全局锁,即根据全局事务xid从lock_table中删除数据。
  5. 接着,判断当前全局事务是否可以异步提交,仅当所有分支事务的模式都为AT 或 事务状态为一阶段提交失败,可以异步提交全局事务。
    所谓异步提交,实际是将全局事务的状态更新为异步提交中(AsyncCommitting);
    在Seata Server启动的时候会初始化DefaultCoordinator,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100参数配置)在每次执行定时任务时,需要获取分布式全局锁(lockKey = AsyncCommitting,lockValue = ip:port(ip和port为tc的)),防止某一个全局事务的异步提交操作执行多次!
  6. 如果当前全局事务不可以异步提交,则同步提交;然而全局事务异步提交请求的处理方式和同步提交是一样的:
    • SAGA模式走自己特定的逻辑(聊saga的时候再细讲),其余的–AT、TCC、XA均走以下逻辑:
    • 走APP内存层面获取到全局事务所有的分支事务:
      • 针对每个分支事务构建分支事务提交请求BranchCommitRequest发送到RM,RM清空分支事务的undo_log数据;
      • 分支事务提交成功后,如果返回的状态是二阶段提交完成PhaseTwo_Committed,将分支事务数据从branch_table中移除;
    • 分支事务全部提交之后,将全局事务从global_table中移除。

这里的信息量比较大,有很多内容在DefaultCore#commit()方法中并没有体现出来。下面博主就展开给大家聊一下。

1、从存储介质中查询全局事务、关联的所有分支事务

GlobalSession globalSession = SessionHolder.findGlobalSession(xid);

在前面的文章我们聊过,SessionHolder是各种会话管理器SessionManager的持有者,我们通过其实现对GlobalSession、BranchSession的操作。

在这里插入图片描述
SessionHolder.findGlobalSession()方法会从存储介质(可以是MySQL)中查询出全局事务和全局事务关联的所有分支事务信息;

就MySQL而言,这里会做两次SQL查询操作;

在这里插入图片描述

SQL分别为:

1> 查询全局事务SQL:

select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified  from global_table where xid = ?

2> 查询全局事务关联的所有分支事务SQL:

select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified  from branch_table where xid = ? order by gmt_create asc

获取到全局事务 和 全局事务关联的所有分支事务之后,将全局事务信息转换为GlobalSession,然后再将所有的分支事务转换为BranchSession、并关联到GlobalSession中。

在这里插入图片描述

如果找不到相应的全局事务,则直接返回GlobalStatus.Finished,表示全局事务已经被完成 或 全局事务不存在。

2、关闭全局事务、清理全局锁信息

根据SeataServer数据存储模式(File、DB、Redis)选择不同的SessionManager,进而执行相应SessionManager的lockAndExecute()方法,比如博主使用的数据存储模式是DB、所以对应的SessionManagerDataBaseSessionManager;但无论是哪种SessionManager本质上都是要运行lockAndExecute()方法入参中的GlobalSession.LockCallable,区别在于,File的方式会先给全局事务会话上把锁(ReentrantLock),以保证写文件的有序性。

当全局事务状态为Begin的时候,才能提交;

提交之前会先将全局事务的状态失活;然后如果事务模式是AT模式,则释放占用当前全局事务占用的全局锁,即根据全局事务xid从lock_table中删除数据。

在这里插入图片描述

1> 全局事务会话失活流程:

在这里插入图片描述

此处的SessionLifecycleListener是在上层DefaultCore#commit()方法中添加的:

在这里插入图片描述

2> 清理全局锁流程:

首先判断全局事务会话中是否包含事务模式的AT的分支事务:

在这里插入图片描述

如果含有,则将全局事务占用的全局锁全局释放,其实就是根据全局事务xid从lock_table中删除数据。

在这里插入图片描述

本质上就是执行一条DELETE类型的SQL语句:

delete from lock_table where xid = ? 

3、判断全局事务是否可以异步提交

判断当前全局事务是否可以异步提交,仅当所有分支事务的模式都为AT 或 事务状态为一阶段提交失败,可以异步提交全局事务。

public boolean canBeCommittedAsync() {
    List<BranchSession> branchSessions = getBranchSessions();
    for (BranchSession branchSession : branchSessions) {
        // 只有所有的分支事务的模式都为AT模式 或 事务状态为一阶段提交失败,才能异步提交全局事务
        if (!branchSession.canBeCommittedAsync()) {
            return false;
        }
    }
    return true;
}

// BranchSession类中
public boolean canBeCommittedAsync() {
    return branchType == BranchType.AT || status == BranchStatus.PhaseOne_Failed;
}

就本文示例而言,所有的分支事务模式都是AT模式,所以会触发异步提交全局事务。

4、全局事务异步提交

所谓异步提交,实际是将全局事务的状态更新为异步提交中(AsyncCommitting);

另外:在Seata Server启动的时候会初始化DefaultCoordinator,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100参数配置)

1> 更新全局事务状态为AsyncCommitting

在这里插入图片描述

最终体现其实就是一条更新全局事务的SQL:

update global_table   set status = 8,       gmt_modified = now() where xid = ?

状态为8,表示全局事务此时处理异步提交中。

这里只是将全局事务状态更为为异步提交中,但没有真正的提交事务。

2> DefaultCoordinator中定时任务处理异步提交的全局事务

在Seata Server启动的时候会初始化DefaultCoordinator,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100参数配置)

在这里插入图片描述

在每次执行定时任务时,需要先获取分布式全局锁(lockKey = AsyncCommitting,lockValue = ip:port(ip和port为tc的)),防止某一个全局事务的异步提交操作执行多次!

在这里插入图片描述

此处分布式锁的释放是一个惰性策略,并不会主动删除数据,而是在每次获取分布式锁的时候进行必要的更新,每次获取分布式锁的逻辑为:

  1. 先从DB中以当前读的方式查询出异步提交全局事务的分布式锁;SQL为:

    SELECT lock_key,lock_value,expire FROM distributed_lock_table WHERE lock_key = `AsyncCommitting` FOR UPDATE;
    
  2. 如果不存在分布式锁,则插入一个新的分布式锁;SQL为:

    INSERT INTO distributed_lock_table(lock_key,lock_value,expire) VALUES (`AsyncCommitting`, ?, ?)
    
  3. 如果存在分布式锁,并且还没有过期,则表明当前获取锁失败,不能执行异步提交全局事务;

  4. 如果其他人获取到的锁已经过期,此时需要更新分布式锁的信息;SQL为:

    UPDATE distributed_lock_table set lock_value=?, expire=? WHERE lock_key=`AsyncCommitting`
    

handleAsyncCommitting()方法处理全局事务异步提交请求:

protected void handleAsyncCommitting() {

    // 默认获取100条(DB存储模式下可通过`store.db.queryLimit=100`参数配置) 状态为AsyncCommitting的全局事务,走一次DB SQL查询操作
    SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
    Collection<GlobalSession> asyncCommittingSessions =
            SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
    if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
        return;
    }
    SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
        try {
            asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            // 全局事务异步提交
            core.doGlobalCommit(asyncCommittingSession, true);
        } catch (TransactionException ex) {
            LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
        }
    });
}

异步提交全局事务只做了两件事:

  • 首先默认获取100条(DB存储模式下可通过store.db.queryLimit=100参数配置) 状态为AsyncCommitting的全局事务(会把关联的所有分支事务一起查出来),会走DB执行两次SQL查询操作:

    select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified  from global_table where status in(`AsyncCommitting`) order by gmt_modified limit 100
    
    select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified  from branch_table where xid in (?) order by gmt_create asc
    
  • 通过封装了AT、TCC、Saga、XA分布式事务模式具体实现的DefaultCore处理全局事务提交;这里的逻辑和全局事务同步提交是一致的,所以我们放在下面来看。

5、全局事务同步提交

如果当前全局事务不可以异步提交,则同步提交;然而全局事务异步提交请求的处理方式和同步提交是一样的,都体现在DefaultCore#doGlobalCommit()方法:

@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        // 获取到全局事务所有的分支事务
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            // if not retrying, skip the canBeCommittedAsync branches
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            try {
                // 分支事务的提交
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                if (isXaerNotaTimeout(globalSession,branchStatus)) {
                    LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                    branchStatus = BranchStatus.PhaseTwo_Committed;
                }
                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        // 事务的两阶段完成(即:分支事务提交完成),将分支事务数据从branch_table中移除。
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        //not at branch
                        SessionHelper.endCommitFailed(globalSession, retrying);
                        LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                        return false;

                    default:
                        if (!retrying) {
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                branchSession.getBranchId(), branchStatus);
                            return CONTINUE;
                        } else {
                            LOGGER.error(
                                "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {branchSession.toString()});
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        // Return if the result is not null
        // todo 你以为分支事务都提交完,result会返回点内容??,实际上TMD 的 CONTINUE常量值是null,所以会继续往下走。
        if (result != null) {
            return result;
        }
        //If has branch and not all remaining branches can be committed asynchronously,
        //do print log and return false
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
        if (!retrying) {
            //contains not AT branch
            globalSession.setStatus(GlobalStatus.Committed);
        }
    }
    // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
    // executed to improve concurrency performance, and the global transaction ends..
    if (success && globalSession.getBranchSessions().isEmpty()) {
        // 分支事务全部提交之后,将全局事务从global_table中移除。
        SessionHelper.endCommitted(globalSession, retrying);
        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}

流程可以概括为:

  • SAGA模式走自己特定的逻辑(聊saga的时候再细讲),其余的–AT、TCC、XA均走以下逻辑:
  • 走APP内存层面获取到全局事务所有的分支事务:
  • 针对每个分支事务构建分支事务提交请求BranchCommitRequest交给RM,RM清空分支事务的undo_log数据;
  • 分支事务提交成功后,如果返回的状态是二阶段提交完成PhaseTwo_Committed,将分支事务数据从branch_table中移除;
  • 分支事务全部提交之后,将全局事务从global_table中移除。
1> 走APP内存获取到全局事务所有的分支事务

无论是异步提交全局事务、还是同步提交全局事务,它们调用到DefaultCore#doGlobalCommit()时,传入的全局事务会话都是包含分支事务信息的。

2> 每个分支事务发送BranchCommitRequest到RM,清空undo_log日志

遍历全局事务的每个分支事务,构建分支事务提交请求BranchCommitRequest交给TC自己其他业务线程处理;其他线程负责清空分支事务的undo_log数据;

(1)发送分支事务提交请求BranchCommitRequest到RM:
在这里插入图片描述
在这里插入图片描述

(2)RM处理分支事务提交请求BranchCommitRequest:

>> 1 接收到请求的部分链路如下(具体TC和RM/TM的交互可以参考:分布式事务Seata源码解析九:分支事务如何注册到全局事务):

在这里插入图片描述

>> 2 RM处理分支事务提交:

在这里插入图片描述

分支事务的提交委托给异步工作者AsyncWorker

private void addToCommitQueue(Phase2Context context) {
    // 将二阶段上下文添加到一个BlockingQueue中;queue的长度默认为1000,可通过(`client.rm.asyncCommitBufferLimit=10000`配置)
    if (commitQueue.offer(context)) {
        return;
    }
    CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
            .thenRun(() -> addToCommitQueue(context));
}

AsyncWorker将通过分支事务转换的两阶段上下文Phase2Context添加到一个阻塞队列BlockingQueue中,queue的长度默认为1000;如果队列满了,无法添加Phase2Context,则紧急的执行分支事务提交doBranchCommit(),分支事务提交之后再将Phase2Context添加到阻塞队列BlockingQueue中。

>> 3 RM正式分支事务提交:
在这里插入图片描述
异步工作者AsyncWroker实例化的时候会延时10ms启动一个每秒执行一次的定时任务去从从commitQueue中获取两阶段上下文 对分支事务进行提交。

提交分支事务的方式统一为doBranchCommitSafely()

在这里插入图片描述

最终其实就是1000个Phase2Context一组,分组删除相应分支事务的undo_log:

  • 有个注意点:如果没有获取到数据库资源,则将分支事务提交请求再次添加会commitQueue中;

  • 批量删除undo_log的SQL如下:

    DELETE FROM undo_log WHERE  branch_id IN(?,?,?) AND xid IN(?,?,?)
    

在这里插入图片描述

3> 分支事务提交成功后,删除分支事务数据

在这里插入图片描述

分支事务提交成功后,如果返回的状态是二阶段提交完成PhaseTwo_Committed,将分支事务数据从branch_table中移除(此时,RM还没有将undo_log删除,具体删除时间点由AsyncWorker中每秒执行一次的定时任务决定。);

在这里插入图片描述

删除分支事务数据是同步执行的,而从DB中删除分支事务数据的操作由SessionLifecycleListener触发,执行方法onRemoveBranch()

在每次执行DefaultCore#doGlobalCommit()方法提交全局事务时,其上游调用方法都会通过以下方式,添加一个SessionLifecycleListener,就store.mode=db而言,SessionLifecycleListenerDataBaseSessionManager

committingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

删除分支事务的SQL如下:

delete from branch_table where xid = ?   and branch_id = ?
4> 分支事务全部提交之后,删除全局事务数据

分支事务全部提交之后,将全局事务从global_table中移除;

在这里插入图片描述

当且仅当所有的分支事务全部提交完毕之后,才会将全局事务数据删除;

在这里插入图片描述

删除全局事务数据的SQL如下:

delete from global_table where xid = ?

三、总结和后续

seata client发起全局事务提交的流程如下:

  1. 首先事务的角色必须是发起者Launcher,不然做执行具体的全局事务提交操作;
  2. 全局事务必须要有自己的唯一标识:xid,否则断言抛出异常:IllegalStateException
  3. 以最多重试五次(可通过client.tm.commitRetryCount=5配置)的方式,调用事务管理器TransactionManager的commit()方法,基于全局事务xid封装一个GlobalCommitRequest请求通过netty发送到seata server(TC)。

TC接收到TM提交全局事务的请求之后,会交给DefaultCore来处理提交全局事务逻辑:

提交全局事务逻辑如下:

  1. 根据xid从存储介质(比如:store.mode=db)中查询出全局事务信息、以及全局事务关联的全部分支事务信息;如果找不到相应的全局事务,则直接返回GlobalStatus.Finished,表示全局事务已经被完成 或 全局事务不存在。
  2. 根据SeataServer数据存储模式(File、DB、Redis)选择不同的SessionManager,进而执行相应SessionManager的lockAndExecute()方法,比如博主使用的数据存储模式是DB、所以对应的SessionManagerDataBaseSessionManager;但无论是哪种SessionManager本质上都是要运行lockAndExecute()方法入参中的GlobalSession.LockCallable,区别在于,File的方式会先给全局事务会话上把锁(ReentrantLock)。
    PS:这里的是用烂了设计模式搭配方式:工厂 + 策略 + 模板方法。
  3. 仅当全局事务状态为Begin的时候,才能提交;
  4. 先将全局事务的状态失活;然后如果事务模式是AT模式,则释放占用当前全局事务占用的全局锁,即根据全局事务xid从lock_table中删除数据。
  5. 接着,判断当前全局事务是否可以异步提交,仅当所有分支事务的模式都为AT 或 事务状态为一阶段提交失败,可以异步提交全局事务。
    所谓异步提交,实际是将全局事务的状态更新为异步提交中(AsyncCommitting);
    在Seata Server启动的时候会初始化DefaultCoordinator,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100参数配置)在每次执行定时任务时,需要获取分布式全局锁(lockKey = AsyncCommitting,lockValue = ip:port(ip和port为tc的)),防止某一个全局事务的异步提交操作执行多次!
  6. 如果当前全局事务不可以异步提交,则同步提交;然而全局事务异步提交请求的处理方式和同步提交是一样的:
    • SAGA模式走自己特定的逻辑(聊saga的时候再细讲),其余的–AT、TCC、XA均走以下逻辑:
    • 走APP内存层面获取到全局事务所有的分支事务:
      • 针对每个分支事务构建分支事务提交请求BranchCommitRequest发送到RM,RM清空分支事务的undo_log数据;
      • 分支事务提交成功后,如果返回的状态是二阶段提交完成PhaseTwo_Committed,将分支事务数据从branch_table中移除;
    • 分支事务全部提交之后,将全局事务从global_table中移除。

下一篇文章,我们继续聊如果全局事务执行出现异常,全局事务是如何回滚的。


网站公告

今日签到

点亮在社区的每一天
去签到