本文属于B站图灵课堂springcloud笔记系列。
前面整理过2篇:seata 2阶段提交实现代码-笔记1-CSDN博客 扫描@GlobalTransactional注解
seata 2阶段提交实现代码-笔记2-CSDN博客 TC生成XID,并保存到global_table表。
本篇继续整理 执行业务逻辑,提交本地事务部分。
目前已经通过beginTransaction(txInfo, tx)
获取到了全局事务ID
,并记录到global_table
全局事务表中,接下来会执行 business.execute()
:进入业务代码,触发点在执行业务逻辑第一篇里面,TransactionalTemplate.execute()。
demo :order调用逻辑就是调用库存服务扣减库存、调用账户服务扣减余额、生成订单。
以库存服务为例:
@Override
@Transactional
public void reduceStock(String commodityCode, Integer count)
throws BusinessException {
logger.info("[reduceStock] current XID: {}", RootContext.getXID());
logger.info("扣减库存");
checkStock(commodityCode, count);
Timestamp updateTime = new Timestamp(System.currentTimeMillis());
int updateCount = storageMapper.reduceStock(commodityCode, count, updateTime);
if (updateCount == 0) {
throw new BusinessException("deduct stock failed");
}
}
只是扣减库存,但是执行过程中我们会观察到库存库中表undo_log 会自动新增一条记录。
原因就是seata的数据源代理,用户是无感知的。前面篇1 也提到了GlobalTransactionScanner 是扫描有注解的bean做AOP增强。
数据源代理
DataSourceProxy 构造器会调用io.seata.rm.datasource.DataSourceProxy#init
private void init(DataSource dataSource, String resourceGroupId) {
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
} else if (JdbcConstants.MYSQL.equals(dbType)) {
validMySQLVersion(connection);
checkDerivativeProduct();
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
if (JdbcConstants.SQLSERVER.equals(dbType)) {
LOGGER.info("SQLServer support in AT mode is currently an experimental function, " +
"if you have any problems in use, please feedback to us");
}
initResourceId();
DefaultResourceManager.get().registerResource(this);
TableMetaCacheFactory.registerTableMeta(this);
//Set the default branch type to 'AT' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
DataSourceProxy.init的主要功能:
设置资源组ID,默认是DEFAULT、初始化ResourceId
向资源管理器DefaultResourceManager注册本类
加缓存:TableMetaCacheFactory 表元数据信息。还有隐含生成了代理连接ConnectionProxy
注意这个DataSourceProxy#getConnection()生成ConnectionProxy,不是普通的Connection
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
Connection targetConnection = targetDataSource.getConnection(username, password);
return new ConnectionProxy(this, targetConnection);
}
public class ConnectionProxy extends AbstractConnectionProxy {
ConnectionProxy 继承了AbstractConnectionProxy,这个抽象连接代理,封装了很多通用功能。比如获取连接等。关注下StatementProxy、PreparedStatementProxy
public Statement createStatement() throws SQLException {
Statement targetStatement = getTargetConnection().createStatement();
return new StatementProxy(this, targetStatement);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
if (BranchType.AT == RootContext.getBranchType()) {
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
2.0版本里面StatementProxy、PreparedStatementProxy 继承类 不一样。都封装了几个SQL执行方法。SQL执行:
@Override
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
@Override
public ResultSet executeQuery() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
}
@Override
public int executeUpdate() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
}
在这些方法中都调用了 ExecuteTemplate.execute(),接下来看看这个方法:
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
//不是全局模式,不是AT 走正常的逻辑直接执行SQL
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
//以下为代理逻辑
//获取数据库类型 mysql/oracle
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//sql 解析器,通过它可以获取sql的表名、列名、类型等信息,解析出sql表达式
//PlainExecutor直接使用原生的Statment对象执行SQL
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//没找到sql 解析器,就使用PlainExecutor
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {//针对插入、更新、删除、加锁查询、插入加锁等
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case DELETE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case SELECT_FOR_UPDATE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case INSERT_ON_DUPLICATE_UPDATE:
switch (dbType) {
case JdbcConstants.MYSQL:
executor =
new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.MARIADB:
executor =
new MariadbInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.POLARDBX:
executor = new PolarDBXInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
case UPDATE_JOIN:
switch (dbType) {
case JdbcConstants.MYSQL:
executor = new MySQLUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.MARIADB:
executor = new MariadbUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.POLARDBX:
executor = new PolarDBXUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
}
break;
default: //默认原生
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {//批量
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {//使用上面返回的执行器,执行execute
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
Seata提供了几种执行器也就是我们代码 case 中(INSERT
,UPDATE
,DELETE
,SELECT_FOR_UPDATE
,INSERT_ON_DUPLICATE_UPDATE,
UPDATE_JOIN 少见),这些执行器的父类都是AbstractDMLBaseExecutor。以UpdateExecutor为例
然后我们看 executor.execute(args);
最终执行的方法
@Override
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();//获取xid,此前RootContext已经填充
if (xid != null) {//与数据库连接绑定,注意是ConnectionProxy
statementProxy.getConnectionProxy().bind(xid);
}
//设置是否需要全局锁
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);//执行
}
接下来看io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {//设置为手动提交
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
//调用手动提交方法,得到分支执行的最终结果(并准备undo_log的内容,设置前置镜像 和 后置镜像)
T result = executeAutoCommitFalse(args);
//执行提交
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
再看下关键的executeAutoCommitFalse()
protected T executeAutoCommitFalse(Object[] args) throws Exception {
try {//获取前镜像
TableRecords beforeImage = beforeImage();
//执行目标sql
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//后镜像
TableRecords afterImage = afterImage(beforeImage);
//准备undo_log
prepareUndoLog(beforeImage, afterImage);
return result;
} catch (TableMetaException e) {
LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
e.getTableName(), e.getColumnName());
statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
throw e;
}
}
此时本地事务未提交,再回到executeAutoCommitTrue
中看看提交.ConnectionProxy.commit()
public void commit() throws SQLException {
try {
lockRetryPolicy.execute(() -> {
doCommit();//提交
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();//回滚
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
io.seata.rm.datasource.ConnectionProxy#doCommit()
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();//执行全局事务的提交逻辑
} else if (context.isGlobalLockRequire()) {//如果需要全局事务锁
processLocalCommitWithGlobalLocks();
} else {//非全局事务,直接提交
targetConnection.commit();
}
}
作为分布式事务,看第一个。
private void processGlobalTransactionCommit() throws SQLException {
try {//注册分支事务( RM 向TC发请求,TC注册分支事务)
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//生成 undo_log 回滚日志:插入到undo_log表
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//一阶段提交
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
其中,生成undo_log,底层调用了io.seata.rm.datasource.undo.mysql.MySQLUndoLogManager#insertUndoLog
代码不贴了。对应表就是业务库undo_log,
register RM想TC注册分支。
TC处理逻辑入口在
@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
RpcContext rpcContext) throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setBranchId(
core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
request.getXid(), request.getApplicationData(), request.getLockKey()));
}
底层核心逻辑调用了io.seata.server.coordinator.AbstractCore#branchRegister
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
//根据xid 获取GlobalSession
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
//创建分支事务 branchSession
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
//获取全局锁 底层会存储到lock_table
branchSessionLock(globalSession, branchSession);
try {//加入到globalSession 底层会保存到branch_table 表
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw new BranchTransactionException(FailedToAddBranch, String
.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()), ex);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
}
return branchSession.getBranchId();
});
}
最终会在seata库的 全局加锁会存储到lock_table
表中,branch_table
表中插一条记录。
这里不再展开,后面待整理。
流程很长,只看原码容易乱。网上找了个大佬画的图比较清晰,原文地址:墨滴社区
引用下,这个图可能跟你在看版本有所差异,我看2.0就是这样。
小结:
扫描@GlobalTransactional注解,TM向TC发请求,获取全局事务XID
篇2:seata 2阶段提交实现代码-笔记2-CSDN博客
TC生成全局事务XID,并存储到全局事务表global_table中
本篇:使用数据源代理
准备前置镜像
执行目标sql,执行但未提交
准备后置镜像,组装undo_log
向TC注册分支事务,TC端获取全局事务锁,涉及到seata库lock_table表中,把分支事务信息存储到branch_table表,
RM端提交undo_log信息,在业务库下的 undo_log表中,用于事务回滚。
RM端提交本地事务
真是博大精深啊,才看了个皮毛。