文章目录
一、前言
更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
- 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
- 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
- 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
- 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
- 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
- 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
- 分布式事务Seata源码解析九:分支事务如何注册到全局事务
- 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程
Seata最核心的全局事务执行流程,上文我们聊了undo log的详细构建过程,本文接着聊全局事务的提交?
二、全局事务提交
在 分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务一文详细讲了全局事务的执行流程。
1、Seata Client端(TM)
全局事务的执行体现在TransactionTemplate#execute(TransactionalExecutor)
方法中:
当全局事务和所有的分支事务都运行无误时,会调用commitTransaction()
方法提交全局事务;
默认全局事务提交前、提交后的钩子函数没有任何实现,如果想在全局事务提交前、提交后做一些扩展操作,可以通过实现TransactionHook
接口,并调用TransactionHookManager#registerHook()
方法注册钩子;
public final class TransactionHookManager {
private TransactionHookManager() {
}
private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();
/**
* get the current hooks
*
* @return TransactionHook list
* @throws IllegalStateException IllegalStateException
*/
public static List<TransactionHook> getHooks() throws IllegalStateException {
List<TransactionHook> hooks = LOCAL_HOOKS.get();
if (hooks == null || hooks.isEmpty()) {
return Collections.emptyList();
}
return Collections.unmodifiableList(hooks);
}
/**
* add new hook
*
* @param transactionHook transactionHook
*/
public static void registerHook(TransactionHook transactionHook) {
if (transactionHook == null) {
throw new NullPointerException("transactionHook must not be null");
}
List<TransactionHook> transactionHooks = LOCAL_HOOKS.get();
if (transactionHooks == null) {
LOCAL_HOOKS.set(new ArrayList<>());
}
LOCAL_HOOKS.get().add(transactionHook);
}
/**
* clear hooks
*/
public static void clear() {
LOCAL_HOOKS.remove();
}
}
全局事务的提交由GlobalTransaction#commit()方法触发:
在Seata中,GlobalTransaction
接口仅有一个实现:DefaultGlobalTransaction
,其重写的commit()方法如下:
@Override
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
// 提交全局事务失败重试次数,默认为5,可通过`client.tm.commitRetryCount=5`配置
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
retry--;
// 全局事务提交,走进DefaultTransactionManager
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
seata client发起全局事务提交的流程如下:
- 首先事务的角色必须是发起者
Launcher
,不然做执行具体的全局事务提交操作;- 全局事务必须要有自己的唯一标识:xid,否则断言抛出异常:
IllegalStateException
;- 以最多重试五次(可通过
client.tm.commitRetryCount=5
配置)的方式,调用事务管理器TransactionManager
的commit()方法,基于全局事务xid封装一个GlobalCommitRequest
请求通过netty发送到seata server(TC)。
1)事务角色必须是Launcher
事务角色有两种:Launcher(全局事务发起者)、Participant(全局事务参与者)
public enum GlobalTransactionRole {
/**
* The Launcher.
*/
// The one begins the current global transaction.
Launcher,
/**
* The Participant.
*/
// The one just joins into a existing global transaction.
Participant
}
2)全局事务必须要有xid
private void assertXIDNotNull() {
if (xid == null) {
throw new IllegalStateException();
}
}
如果全局事务不存在唯一标识xid,断言抛出异常:IllegalStateException
。
3)发送GlobalCommitRequest到TC
此处提供异常重试机制,重试次数默认为5,可通过client.tm.commitRetryCount=5
配置;
由事务管理器TransactionManager
负责处理全局事务的提交:
- 基于全局事务xid封装一个
GlobalCommitRequest
请求通过netty发送到seata server(TC)。
在Seata全局事务的发起者TM层面仅有一个实现DefaultTransactionManager
:
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
// 将全局事务提交请求发送给seata-server
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
TM和TC的通信机制
见博文:分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信。
全局事务发起者TM,通过netty和TC进行网络通信;其中包括对seata-server集群的负载均衡,在获取到相应seata-server实例对应的channel之后,会进步处理请求的发送和相应结果的接收。
在写Channel之前,channelWritableCheck()
方法会检查channel是否可写。
TM / RM 和TC的RPC通信均是异步进行的:
- TM / RM 发送请求时,将封装了
CompletableFuture
的MessageFuture放到futures
(ConcurrentHashMap<Integer, MessageFuture>)中;- TC处理完请求之后,会通过netty框架发送响应到TM / RM 的
AbstractNettyRemoting
中,其再将futures
中的MessageFuture完成,发送请求的代码段中messageFuture.get()
会获取到返回值,停止阻塞。
TM发送请求之后,下面接着看TC如何接收请求,如何处理请求?
2、Seata Server端(TC)
1)TM接收请求
在【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;
又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。
ServerHandler类上有个@ChannelHandler.Sharable
注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。
processMessage(ctx, (RpcMessage) msg)
方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。
/**
* Rpc message processing.
*
* @param ctx Channel handler context.
* @param rpcMessage rpc message.
* @throws Exception throws exception process message error.
* @since 1.3.0
*/
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 如果消息对应的处理器设置了线程池,则放到线程池中执行
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 线程池拒绝策略之一,抛出异常:RejectedExecutionException
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to " + jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
// 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
Seata Serer接收到请求的执行链路为:
2)TM处理请求GlobalCommitRequest
TM发送提交事务请求时的RPCMessage的body为GlobalCommitRequest
:
public class GlobalCommitRequest extends AbstractGlobalEndRequest {
@Override
public short getTypeCode() {
return MessageType.TYPE_GLOBAL_COMMIT;
}
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this, rpcContext);
}
}
又由于在Seata接收请求的执行链的一部分 DefaultCoordinator#onRequest()
方法中,将DefaultCoordinator
自身绑定到了AbstractTransactionRequestToTC
的handler
属性中:
所以实际执行的handle()方法是DefaultCoordinator的handle()方法,而DefaultCoordinator并没有重写其父类AbstractTCInboundHandler
的handle()方法,所以进入到AbstractTCInboundHandler
的handle()方法:
AbstractExceptionHandler#exceptionHandleTemplate()
方法只是会运行方法的入参Callback
,即会进入到–> doGlobalCommit() 方法
:
这里直接调用DefaultCore的commit()方法;
3)DefaultCore执行提交全局事务的业务逻辑
DefaultCore中封装了AT、TCC、Saga、XA分布式事务模式的具体实现类;
DefaultCore#commit()方法为提交全局事务的业务逻辑,方法的入参仅有一个xid(全局事务的唯一标识);
@Override
public GlobalStatus commit(String xid) throws TransactionException {
// 根据xid从存储介质(store.mode)中查询出全局事务信息、以及全局事务关联的全部分支事务信息
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// 仅当全局事务状态为Begin的时候,才能提交
if (globalSession.getStatus() == GlobalStatus.Begin) {
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
// 所有分支事务的模式为AT 或 事务状态为TCC模式的一阶段,可以异步提交全局事务
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
// 性能指标监控,有需要再关注
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
return false;
} else {
globalSession.changeGlobalStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
// AT模式下shouldCommit为false;
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
// 如果全局事务的状态为AsyncCommitting,则返回GlobalStatus.Committed;否则返回全局事务此时的真实状态。
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
提交全局事务逻辑如下:
- 根据xid从存储介质(比如:
store.mode=db
)中查询出全局事务信息、以及全局事务关联的全部分支事务信息;如果找不到相应的全局事务,则直接返回GlobalStatus.Finished
,表示全局事务已经被完成 或 全局事务不存在。- 根据SeataServer数据存储模式(File、DB、Redis)选择不同的
SessionManager
,进而执行相应SessionManager
的lockAndExecute()方法,比如博主使用的数据存储模式是DB、所以对应的SessionManager
为DataBaseSessionManager
;但无论是哪种SessionManager
本质上都是要运行lockAndExecute()方法入参中的GlobalSession.LockCallable
,区别在于,File的方式会先给全局事务会话上把锁(ReentrantLock
)。
PS:这里的是用烂了设计模式搭配方式:工厂 + 策略 + 模板方法。- 仅当全局事务状态为Begin的时候,才能提交;
- 先将全局事务的状态失活;然后如果事务模式是AT模式,则释放占用当前全局事务占用的全局锁,即根据全局事务xid从lock_table中删除数据。
- 接着,判断当前全局事务是否可以异步提交,仅当所有分支事务的模式都为AT 或 事务状态为一阶段提交失败,可以异步提交全局事务。
所谓异步提交,实际是将全局事务的状态更新为异步提交中(AsyncCommitting
);
在Seata Server启动的时候会初始化DefaultCoordinator
,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100
参数配置);在每次执行定时任务时,需要获取分布式全局锁(lockKey = AsyncCommitting,lockValue = ip:port(ip和port为tc的)),防止某一个全局事务的异步提交操作执行多次!- 如果当前全局事务不可以异步提交,则同步提交;然而全局事务异步提交请求的处理方式和同步提交是一样的:
- SAGA模式走自己特定的逻辑(聊saga的时候再细讲),其余的–AT、TCC、XA均走以下逻辑:
- 走APP内存层面获取到全局事务所有的分支事务:
- 针对每个分支事务构建分支事务提交请求
BranchCommitRequest
发送到RM,RM清空分支事务的undo_log数据;- 分支事务提交成功后,如果返回的状态是二阶段提交完成
PhaseTwo_Committed
,将分支事务数据从branch_table中移除;- 分支事务全部提交之后,将全局事务从global_table中移除。
这里的信息量比较大,有很多内容在DefaultCore#commit()方法
中并没有体现出来。下面博主就展开给大家聊一下。
1、从存储介质中查询全局事务、关联的所有分支事务
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
在前面的文章我们聊过,SessionHolder是各种会话管理器SessionManager的持有者,我们通过其实现对GlobalSession、BranchSession的操作。
SessionHolder.findGlobalSession()方法会从存储介质(可以是MySQL)中查询出全局事务和全局事务关联的所有分支事务信息;
就MySQL而言,这里会做两次SQL查询操作;
SQL分别为:
1> 查询全局事务SQL:
select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified from global_table where xid = ?
2> 查询全局事务关联的所有分支事务SQL:
select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified from branch_table where xid = ? order by gmt_create asc
获取到全局事务 和 全局事务关联的所有分支事务之后,将全局事务信息转换为GlobalSession,然后再将所有的分支事务转换为BranchSession、并关联到GlobalSession中。
如果找不到相应的全局事务,则直接返回GlobalStatus.Finished
,表示全局事务已经被完成 或 全局事务不存在。
2、关闭全局事务、清理全局锁信息
根据SeataServer数据存储模式(File、DB、Redis)选择不同的SessionManager
,进而执行相应SessionManager
的lockAndExecute()方法,比如博主使用的数据存储模式是DB、所以对应的SessionManager
为DataBaseSessionManager
;但无论是哪种SessionManager
本质上都是要运行lockAndExecute()方法入参中的GlobalSession.LockCallable
,区别在于,File的方式会先给全局事务会话上把锁(ReentrantLock
),以保证写文件的有序性。
当全局事务状态为Begin的时候,才能提交;
提交之前会先将全局事务的状态失活;然后如果事务模式是AT模式,则释放占用当前全局事务占用的全局锁,即根据全局事务xid从lock_table中删除数据。
1> 全局事务会话失活流程:
此处的SessionLifecycleListener
是在上层DefaultCore#commit()方法中添加的:
2> 清理全局锁流程:
首先判断全局事务会话中是否包含事务模式的AT的分支事务:
如果含有,则将全局事务占用的全局锁全局释放,其实就是根据全局事务xid从lock_table中删除数据。
本质上就是执行一条DELETE类型的SQL语句:
delete from lock_table where xid = ?
3、判断全局事务是否可以异步提交
判断当前全局事务是否可以异步提交,仅当所有分支事务的模式都为AT 或 事务状态为一阶段提交失败,可以异步提交全局事务。
public boolean canBeCommittedAsync() {
List<BranchSession> branchSessions = getBranchSessions();
for (BranchSession branchSession : branchSessions) {
// 只有所有的分支事务的模式都为AT模式 或 事务状态为一阶段提交失败,才能异步提交全局事务
if (!branchSession.canBeCommittedAsync()) {
return false;
}
}
return true;
}
// BranchSession类中
public boolean canBeCommittedAsync() {
return branchType == BranchType.AT || status == BranchStatus.PhaseOne_Failed;
}
就本文示例而言,所有的分支事务模式都是AT模式,所以会触发异步提交全局事务。
4、全局事务异步提交
所谓异步提交,实际是将全局事务的状态更新为异步提交中(AsyncCommitting
);
另外:在Seata Server启动的时候会初始化DefaultCoordinator
,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100
参数配置);
1> 更新全局事务状态为AsyncCommitting
最终体现其实就是一条更新全局事务的SQL:
update global_table set status = 8, gmt_modified = now() where xid = ?
状态为8,表示全局事务此时处理异步提交中。
这里只是将全局事务状态更为为异步提交中,但没有真正的提交事务。
2> DefaultCoordinator中定时任务处理异步提交的全局事务
在Seata Server启动的时候会初始化DefaultCoordinator
,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100
参数配置);
在每次执行定时任务时,需要先获取分布式全局锁(lockKey = AsyncCommitting,lockValue = ip:port(ip和port为tc的)),防止某一个全局事务的异步提交操作执行多次!
此处分布式锁的释放是一个惰性策略,并不会主动删除数据,而是在每次获取分布式锁的时候进行必要的更新,每次获取分布式锁的逻辑为:
先从DB中以当前读的方式查询出异步提交全局事务的分布式锁;SQL为:
SELECT lock_key,lock_value,expire FROM distributed_lock_table WHERE lock_key = `AsyncCommitting` FOR UPDATE;
如果不存在分布式锁,则插入一个新的分布式锁;SQL为:
INSERT INTO distributed_lock_table(lock_key,lock_value,expire) VALUES (`AsyncCommitting`, ?, ?)
如果存在分布式锁,并且还没有过期,则表明当前获取锁失败,不能执行异步提交全局事务;
如果其他人获取到的锁已经过期,此时需要更新分布式锁的信息;SQL为:
UPDATE distributed_lock_table set lock_value=?, expire=? WHERE lock_key=`AsyncCommitting`
handleAsyncCommitting()
方法处理全局事务异步提交请求:
protected void handleAsyncCommitting() {
// 默认获取100条(DB存储模式下可通过`store.db.queryLimit=100`参数配置) 状态为AsyncCommitting的全局事务,走一次DB SQL查询操作
SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
Collection<GlobalSession> asyncCommittingSessions =
SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
return;
}
SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
try {
asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 全局事务异步提交
core.doGlobalCommit(asyncCommittingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
}
});
}
异步提交全局事务只做了两件事:
首先默认获取100条(DB存储模式下可通过
store.db.queryLimit=100
参数配置) 状态为AsyncCommitting的全局事务(会把关联的所有分支事务一起查出来),会走DB执行两次SQL查询操作:select xid, transaction_id, status, application_id, transaction_service_group, transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified from global_table where status in(`AsyncCommitting`) order by gmt_modified limit 100 select xid, transaction_id, branch_id, resource_group_id, resource_id, branch_type, status, client_id, application_data, gmt_create, gmt_modified from branch_table where xid in (?) order by gmt_create asc
通过封装了AT、TCC、Saga、XA分布式事务模式具体实现的DefaultCore处理全局事务提交;这里的逻辑和全局事务同步提交是一致的,所以我们放在下面来看。
5、全局事务同步提交
如果当前全局事务不可以异步提交,则同步提交;然而全局事务异步提交请求的处理方式和同步提交是一样的,都体现在DefaultCore#doGlobalCommit()方法:
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
// 获取到全局事务所有的分支事务
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
// if not retrying, skip the canBeCommittedAsync branches
if (!retrying && branchSession.canBeCommittedAsync()) {
return CONTINUE;
}
BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
// 分支事务的提交
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
if (isXaerNotaTimeout(globalSession,branchStatus)) {
LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
branchStatus = BranchStatus.PhaseTwo_Committed;
}
switch (branchStatus) {
case PhaseTwo_Committed:
// 事务的两阶段完成(即:分支事务提交完成),将分支事务数据从branch_table中移除。
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
//not at branch
SessionHelper.endCommitFailed(globalSession, retrying);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
branchSession.getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER.error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
new String[] {branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
return CONTINUE;
});
// Return if the result is not null
// todo 你以为分支事务都提交完,result会返回点内容??,实际上TMD 的 CONTINUE常量值是null,所以会继续往下走。
if (result != null) {
return result;
}
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
if (!retrying) {
//contains not AT branch
globalSession.setStatus(GlobalStatus.Committed);
}
}
// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
// executed to improve concurrency performance, and the global transaction ends..
if (success && globalSession.getBranchSessions().isEmpty()) {
// 分支事务全部提交之后,将全局事务从global_table中移除。
SessionHelper.endCommitted(globalSession, retrying);
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
流程可以概括为:
- SAGA模式走自己特定的逻辑(聊saga的时候再细讲),其余的–AT、TCC、XA均走以下逻辑:
- 走APP内存层面获取到全局事务所有的分支事务:
- 针对每个分支事务构建分支事务提交请求
BranchCommitRequest
交给RM,RM清空分支事务的undo_log数据;- 分支事务提交成功后,如果返回的状态是二阶段提交完成
PhaseTwo_Committed
,将分支事务数据从branch_table中移除;- 分支事务全部提交之后,将全局事务从global_table中移除。
1> 走APP内存获取到全局事务所有的分支事务
无论是异步提交全局事务、还是同步提交全局事务,它们调用到DefaultCore#doGlobalCommit()时,传入的全局事务会话都是包含分支事务信息的。
2> 每个分支事务发送BranchCommitRequest到RM,清空undo_log日志
遍历全局事务的每个分支事务,构建分支事务提交请求BranchCommitRequest
交给TC自己其他业务线程处理;其他线程负责清空分支事务的undo_log数据;
(1)发送分支事务提交请求BranchCommitRequest到RM:
(2)RM处理分支事务提交请求BranchCommitRequest:
>> 1 接收到请求的部分链路如下(具体TC和RM/TM的交互可以参考:分布式事务Seata源码解析九:分支事务如何注册到全局事务):
>> 2 RM处理分支事务提交:
分支事务的提交委托给异步工作者AsyncWorker
;
private void addToCommitQueue(Phase2Context context) {
// 将二阶段上下文添加到一个BlockingQueue中;queue的长度默认为1000,可通过(`client.rm.asyncCommitBufferLimit=10000`配置)
if (commitQueue.offer(context)) {
return;
}
CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
.thenRun(() -> addToCommitQueue(context));
}
AsyncWorker
将通过分支事务转换的两阶段上下文Phase2Context
添加到一个阻塞队列BlockingQueue
中,queue的长度默认为1000;如果队列满了,无法添加Phase2Context
,则紧急的执行分支事务提交doBranchCommit()
,分支事务提交之后再将Phase2Context
添加到阻塞队列BlockingQueue
中。
>> 3 RM正式分支事务提交:
异步工作者AsyncWroker
实例化的时候会延时10ms启动一个每秒执行一次的定时任务去从从commitQueue中获取两阶段上下文 对分支事务进行提交。
提交分支事务的方式统一为doBranchCommitSafely()
:
最终其实就是1000个Phase2Context一组,分组删除相应分支事务的undo_log:
有个注意点:如果没有获取到数据库资源,则将分支事务提交请求再次添加会commitQueue中;
批量删除undo_log的SQL如下:
DELETE FROM undo_log WHERE branch_id IN(?,?,?) AND xid IN(?,?,?)
3> 分支事务提交成功后,删除分支事务数据
分支事务提交成功后,如果返回的状态是二阶段提交完成PhaseTwo_Committed
,将分支事务数据从branch_table中移除(此时,RM还没有将undo_log删除,具体删除时间点由AsyncWorker中每秒执行一次的定时任务决定。);
删除分支事务数据是同步执行的,而从DB中删除分支事务数据的操作由SessionLifecycleListener
触发,执行方法onRemoveBranch()
;
在每次执行DefaultCore#doGlobalCommit()
方法提交全局事务时,其上游调用方法都会通过以下方式,添加一个SessionLifecycleListener
,就store.mode=db而言,SessionLifecycleListener
为DataBaseSessionManager
。
committingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
删除分支事务的SQL如下:
delete from branch_table where xid = ? and branch_id = ?
4> 分支事务全部提交之后,删除全局事务数据
分支事务全部提交之后,将全局事务从global_table中移除;
当且仅当所有的分支事务全部提交完毕之后,才会将全局事务数据删除;
删除全局事务数据的SQL如下:
delete from global_table where xid = ?
三、总结和后续
seata client发起全局事务提交的流程如下:
- 首先事务的角色必须是发起者
Launcher
,不然做执行具体的全局事务提交操作;- 全局事务必须要有自己的唯一标识:xid,否则断言抛出异常:
IllegalStateException
;- 以最多重试五次(可通过
client.tm.commitRetryCount=5
配置)的方式,调用事务管理器TransactionManager
的commit()方法,基于全局事务xid封装一个GlobalCommitRequest
请求通过netty发送到seata server(TC)。
TC接收到TM提交全局事务的请求之后,会交给DefaultCore来处理提交全局事务逻辑:
提交全局事务逻辑如下:
- 根据xid从存储介质(比如:
store.mode=db
)中查询出全局事务信息、以及全局事务关联的全部分支事务信息;如果找不到相应的全局事务,则直接返回GlobalStatus.Finished
,表示全局事务已经被完成 或 全局事务不存在。- 根据SeataServer数据存储模式(File、DB、Redis)选择不同的
SessionManager
,进而执行相应SessionManager
的lockAndExecute()方法,比如博主使用的数据存储模式是DB、所以对应的SessionManager
为DataBaseSessionManager
;但无论是哪种SessionManager
本质上都是要运行lockAndExecute()方法入参中的GlobalSession.LockCallable
,区别在于,File的方式会先给全局事务会话上把锁(ReentrantLock
)。
PS:这里的是用烂了设计模式搭配方式:工厂 + 策略 + 模板方法。- 仅当全局事务状态为Begin的时候,才能提交;
- 先将全局事务的状态失活;然后如果事务模式是AT模式,则释放占用当前全局事务占用的全局锁,即根据全局事务xid从lock_table中删除数据。
- 接着,判断当前全局事务是否可以异步提交,仅当所有分支事务的模式都为AT 或 事务状态为一阶段提交失败,可以异步提交全局事务。
所谓异步提交,实际是将全局事务的状态更新为异步提交中(AsyncCommitting
);
在Seata Server启动的时候会初始化DefaultCoordinator
,进而启动一个定时任务:默认定时任务每次(每秒运行一次)只会处理100个异步提交请求(DB存储模式下可通过store.db.queryLimit=100
参数配置);在每次执行定时任务时,需要获取分布式全局锁(lockKey = AsyncCommitting,lockValue = ip:port(ip和port为tc的)),防止某一个全局事务的异步提交操作执行多次!- 如果当前全局事务不可以异步提交,则同步提交;然而全局事务异步提交请求的处理方式和同步提交是一样的:
- SAGA模式走自己特定的逻辑(聊saga的时候再细讲),其余的–AT、TCC、XA均走以下逻辑:
- 走APP内存层面获取到全局事务所有的分支事务:
- 针对每个分支事务构建分支事务提交请求
BranchCommitRequest
发送到RM,RM清空分支事务的undo_log数据;- 分支事务提交成功后,如果返回的状态是二阶段提交完成
PhaseTwo_Committed
,将分支事务数据从branch_table中移除;- 分支事务全部提交之后,将全局事务从global_table中移除。
下一篇文章,我们继续聊如果全局事务执行出现异常,全局事务是如何回滚的。