Seata two-phase commit: notes, part 3

Published: 2024-12-19

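This post is part of my notes on the Bilibili Tuling Classroom Spring Cloud course.

Two earlier posts cover the previous steps: "Seata two-phase commit implementation code, notes 1" (CSDN blog), on scanning the @GlobalTransactional annotation;

"Seata two-phase commit implementation code, notes 2" (CSDN blog), on the TC generating the XID and saving it to global_table.

This post continues with executing the business logic and committing the local transaction.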

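So far, beginTransaction(txInfo, tx) has obtained the global transaction ID (XID) and recorded it in the global transaction table global_table. Next, business.execute() runs and enters the business code. The trigger point is TransactionalTemplate.execute(), covered in part 1.

Demo: the order service calls the stock service to deduct stock, calls the account service to deduct the balance, and then creates the order.

Taking the stock service as an example: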

@Override
	@Transactional
	public void reduceStock(String commodityCode, Integer count)
			throws BusinessException {
		logger.info("[reduceStock] current XID: {}", RootContext.getXID());
		logger.info("deducting stock");

		checkStock(commodityCode, count);

		Timestamp updateTime = new Timestamp(System.currentTimeMillis());
		int updateCount = storageMapper.reduceStock(commodityCode, count, updateTime);
		if (updateCount == 0) {
			throw new BusinessException("deduct stock failed");
		}
	}

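The method only deducts stock, yet while it runs we can observe that a new row is automatically added to the undo_log table in the stock database.

The reason is Seata's data source proxy, which is transparent to the user. Part 1 also mentioned that GlobalTransactionScanner scans annotated beans and applies AOP enhancement to them.

Data source proxy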

  

The DataSourceProxy constructor calls io.seata.rm.datasource.DataSourceProxy#init:

private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            } else if (JdbcConstants.MYSQL.equals(dbType)) {
                validMySQLVersion(connection);
                checkDerivativeProduct();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        if (JdbcConstants.SQLSERVER.equals(dbType)) {
            LOGGER.info("SQLServer support in AT mode is currently an experimental function, " +
                    "if you have any problems in use, please feedback to us");
        }
        initResourceId();
        DefaultResourceManager.get().registerResource(this);
        TableMetaCacheFactory.registerTableMeta(this);
        //Set the default branch type to 'AT' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

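What DataSourceProxy.init mainly does:

Sets the resource group ID (DEFAULT by default) and initializes the resourceId

Registers itself with the resource manager DefaultResourceManager

Registers with the table-metadata cache via TableMetaCacheFactory. It also implicitly hands out proxied connections (ConnectionProxy)

Note that DataSourceProxy#getConnection() returns a ConnectionProxy, not a plain Connection: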

    @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }
public class ConnectionProxy extends AbstractConnectionProxy {

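ConnectionProxy extends AbstractConnectionProxy, an abstract connection proxy that encapsulates many common operations, such as obtaining the target connection. The parts to focus on are StatementProxy and PreparedStatementProxy: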

  public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        return new StatementProxy(this, targetStatement);
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
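
The wrapping chain above (the data source proxy hands out a connection proxy, which hands out a statement proxy) can be illustrated with a minimal sketch. The interfaces Db, Conn and Stmt and the three proxy classes below are hypothetical stand-ins for DataSource, Connection and Statement, not Seata or JDBC types. The point is only that the caller keeps using the same interfaces while every SQL execution passes through a hook.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of a three-level proxy chain (all types here are hypothetical).
class ProxyChainDemo {
    // Runs one update through the proxied chain and returns the SQL seen by the hook.
    static List<String> run() {
        List<String> intercepted = new ArrayList<>();
        Db raw = () -> () -> sql -> 1; // raw chain: every update reports 1 affected row
        Db proxied = new DbProxy(raw, intercepted);
        // The caller only sees the Db / Conn / Stmt interfaces.
        proxied.getConnection().createStatement()
               .executeUpdate("update storage_tbl set count = count - 1 where id = 1");
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

interface Stmt { int executeUpdate(String sql); }
interface Conn { Stmt createStatement(); }
interface Db { Conn getConnection(); }

class DbProxy implements Db {
    private final Db target;
    private final List<String> intercepted;
    DbProxy(Db target, List<String> intercepted) { this.target = target; this.intercepted = intercepted; }
    @Override public Conn getConnection() {
        // Wrap the real connection so that its statements are wrapped too.
        return new ConnProxy(target.getConnection(), intercepted);
    }
}

class ConnProxy implements Conn {
    private final Conn target;
    private final List<String> intercepted;
    ConnProxy(Conn target, List<String> intercepted) { this.target = target; this.intercepted = intercepted; }
    @Override public Stmt createStatement() {
        return new StmtProxy(target.createStatement(), intercepted);
    }
}

class StmtProxy implements Stmt {
    private final Stmt target;
    private final List<String> intercepted;
    StmtProxy(Stmt target, List<String> intercepted) { this.target = target; this.intercepted = intercepted; }
    @Override public int executeUpdate(String sql) {
        intercepted.add(sql);             // interception hook before delegating
        return target.executeUpdate(sql); // delegate to the real statement
    }
}
```

In this sketch the hook only records the SQL; in the flow described in this post, the equivalent position is where the images and the undo log are prepared.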

In version 2.0, StatementProxy and PreparedStatementProxy have different parent classes. Both wrap several SQL execution methods. SQL execution in PreparedStatementProxy:

    @Override
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }

    @Override
    public ResultSet executeQuery() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
    }

    @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
    }

All of these methods call ExecuteTemplate.execute(). Let's look at that method next:

  public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        // neither requiring a global lock nor in AT mode: run the SQL directly as usual
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
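        // proxy logic from here on
        // database type, e.g. mysql / oracle
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            // SQL recognizers: parse the SQL and expose its table name, column names, type, etc.
            // (PlainExecutor, used further down, runs the SQL on the native Statement directly)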
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            // no SQL recognizer found: use PlainExecutor
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) { // insert, update, delete, select for update, insert on duplicate key update, etc.
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case DELETE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case SELECT_FOR_UPDATE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                                executor =
                                        new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.MARIADB:
                                executor =
                                        new MariadbInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.POLARDBX:
                                executor = new PolarDBXInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    case UPDATE_JOIN:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                                executor = new MySQLUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.MARIADB:
                                executor = new MariadbUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.POLARDBX:
                                executor = new PolarDBXUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
                        }
                        break;
                    default: // fall back to the plain (native) executor
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else { // multiple statements
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try { // run execute on the executor chosen above
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
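
The dispatch idea in ExecuteTemplate.execute() (classify the SQL, then pick an executor per type, falling back to a plain executor) can be sketched as follows. This is a simplified illustration under stated assumptions: the keyword-based classify method is a hypothetical stand-in for the real SQL recognizers, and the executors are represented only by their names.

```java
import java.util.Locale;

// Minimal sketch: choose an executor name from the SQL type.
class ExecutorDispatchDemo {
    enum SqlKind { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

    // Naive keyword-based classification; an assumption for illustration only.
    static SqlKind classify(String sql) {
        String s = sql.trim().toLowerCase(Locale.ROOT);
        if (s.startsWith("insert")) return SqlKind.INSERT;
        if (s.startsWith("update")) return SqlKind.UPDATE;
        if (s.startsWith("delete")) return SqlKind.DELETE;
        if (s.startsWith("select") && s.endsWith("for update")) return SqlKind.SELECT_FOR_UPDATE;
        return SqlKind.OTHER;
    }

    // Mirrors the shape of the switch above: one executor per SQL type, plain otherwise.
    static String chooseExecutor(String sql) {
        switch (classify(sql)) {
            case INSERT: return "InsertExecutor";
            case UPDATE: return "UpdateExecutor";
            case DELETE: return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default: return "PlainExecutor";
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseExecutor("update storage_tbl set count = count - 1 where id = 1"));
        System.out.println(chooseExecutor("select * from storage_tbl"));
    }
}
```

A plain select carries no before or after image, which is why it falls through to the plain executor in this sketch.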

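Seata provides several executors, one per case in the code above (INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE; INSERT_ON_DUPLICATE_UPDATE and UPDATE_JOIN are less common). The DML executors (insert, update, delete) extend AbstractDMLBaseExecutor, which in turn extends BaseTransactionalExecutor; SelectForUpdateExecutor extends BaseTransactionalExecutor directly. Take UpdateExecutor as an example.

Now look at the method that executor.execute(args) ultimately runs, BaseTransactionalExecutor#execute: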

    @Override
    public T execute(Object... args) throws Throwable {
        String xid = RootContext.getXID(); // get the XID; RootContext was populated earlier
        if (xid != null) { // bind the XID to the connection (note: this is a ConnectionProxy)
            statementProxy.getConnectionProxy().bind(xid);
        }
        // record whether a global lock is required
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args); // execute
    }

Next, look at io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute:

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

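A JDBC connection is in auto-commit mode by default, so we follow executeAutoCommitTrue() here. (When the statement runs inside a Spring @Transactional method, as reduceStock above does, Spring has already turned auto-commit off, so execution goes through executeAutoCommitFalse() and ConnectionProxy.commit() is called when Spring commits the transaction.)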

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try { // switch to manual commit
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                // run the manual-commit path to get the branch result (it also prepares the undo_log content from the before and after images)
                T result = executeAutoCommitFalse(args);
                // commit
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
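
The retry wrapper used in executeAutoCommitTrue() can be pictured with a minimal sketch: run the unit of work, and if it fails with a lock conflict, roll back and try again up to a limit. The names retryOnLockConflict and LockConflictException below are hypothetical and declared locally; this is not Seata's LockRetryPolicy, and a real implementation would normally also pause between attempts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of "retry the whole unit of work on lock conflict".
class LockRetryDemo {
    // Hypothetical exception type standing in for a global lock conflict.
    static class LockConflictException extends Exception {
        LockConflictException(String message) { super(message); }
    }

    // Calls work; on a lock conflict, runs rollback and retries up to maxRetries times.
    static <T> T retryOnLockConflict(Callable<T> work, Runnable rollback, int maxRetries) throws Exception {
        int failures = 0;
        while (true) {
            try {
                return work.call();
            } catch (LockConflictException e) {
                rollback.run(); // undo the local changes before trying again
                if (++failures > maxRetries) {
                    throw e;    // give up after too many conflicts
                }
            }
        }
    }

    // Simulates a unit of work that hits a lock conflict twice and then succeeds.
    static String demo() throws Exception {
        AtomicInteger calls = new AtomicInteger();
        AtomicInteger rollbacks = new AtomicInteger();
        String result = retryOnLockConflict(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new LockConflictException("row locked by another global transaction");
            }
            return "committed";
        }, rollbacks::incrementAndGet, 5);
        return result + ":" + calls.get() + ":" + rollbacks.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The unit of work here corresponds to "execute the SQL, then commit", so a conflict detected at commit time causes the SQL itself to be re-executed on the next attempt.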

Now look at the key method, executeAutoCommitFalse():

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        try { // capture the before image
            TableRecords beforeImage = beforeImage();
            // execute the target SQL
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            // capture the after image
            TableRecords afterImage = afterImage(beforeImage);
            // prepare the undo_log
            prepareUndoLog(beforeImage, afterImage);
            return result;
        } catch (TableMetaException e) {
            LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
                e.getTableName(), e.getColumnName());
            statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
            throw e;
        }
    }
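
The before-image / after-image idea can be shown with a small in-memory sketch. A Map stands in for one table column keyed by primary key; UndoRecord, updateWithImages and rollback are hypothetical names, not Seata classes. The rollback here first checks that the current value still equals the after image, which is an assumption about how a safe rollback should behave rather than a copy of Seata's code.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: record row state around an update so that it can be undone later.
class UndoLogDemo {
    // One undo entry: the row's value before and after the business SQL.
    static final class UndoRecord {
        final int id;
        final int before;
        final int after;
        UndoRecord(int id, int before, int after) {
            this.id = id;
            this.before = before;
            this.after = after;
        }
    }

    // Deducts delta from row id and returns the undo entry built from both images.
    static UndoRecord updateWithImages(Map<Integer, Integer> table, int id, int delta) {
        int before = table.get(id);    // before image
        table.put(id, before - delta); // "execute the target SQL"
        int after = table.get(id);     // after image
        return new UndoRecord(id, before, after);
    }

    // Restores the before image, but only if nobody changed the row in the meantime.
    static boolean rollback(Map<Integer, Integer> table, UndoRecord undo) {
        if (table.get(undo.id) != undo.after) {
            return false;              // row was modified since: do not overwrite it
        }
        table.put(undo.id, undo.before);
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> stock = new HashMap<>();
        stock.put(1, 100);
        UndoRecord undo = updateWithImages(stock, 1, 10);
        System.out.println("after update: " + stock.get(1));
        System.out.println("rolled back: " + rollback(stock, undo) + ", value: " + stock.get(1));
    }
}
```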

At this point the local transaction has not been committed yet. Go back to executeAutoCommitTrue and look at the commit, ConnectionProxy.commit():

    public void commit() throws SQLException {
        try {
            lockRetryPolicy.execute(() -> {
                doCommit(); // commit
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback(); // roll back
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

io.seata.rm.datasource.ConnectionProxy#doCommit()


    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit(); // commit logic inside a global transaction
        } else if (context.isGlobalLockRequire()) { // a global lock is required
            processLocalCommitWithGlobalLocks();
        } else { // not in a global transaction: commit directly
            targetConnection.commit();
        }
    }

Since this is a distributed transaction, follow the first branch:

    private void processGlobalTransactionCommit() throws SQLException {
        try { // register the branch transaction (the RM sends a request to the TC, which registers the branch)
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // write the undo log: insert it into the undo_log table
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // phase-one commit of the local transaction
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
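
The ordering in processGlobalTransactionCommit() matters: the branch is registered first, then the undo log is flushed and the local transaction committed, and a failure in the second part is reported before being rethrown. A minimal sketch of that ordering follows; the Step interface and the trace list are hypothetical helpers used only to make the order observable.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the phase-one commit ordering.
class PhaseOneCommitDemo {
    interface Step { void run() throws Exception; }

    // Runs the three steps in order and records what happened in trace.
    static void commit(List<String> trace, Step register, Step flushUndoLog, Step localCommit)
            throws Exception {
        register.run();                  // 1. register the branch with the coordinator
        trace.add("register");
        try {
            flushUndoLog.run();          // 2. write the undo log in the same local transaction
            trace.add("flushUndoLog");
            localCommit.run();           // 3. commit business data and undo log together
            trace.add("localCommit");
        } catch (Exception e) {
            trace.add("report(false)");  // tell the coordinator that phase one failed
            throw e;
        }
    }

    static List<String> runSuccess() throws Exception {
        List<String> trace = new ArrayList<>();
        commit(trace, () -> { }, () -> { }, () -> { });
        return trace;
    }

    static List<String> runFailing() {
        List<String> trace = new ArrayList<>();
        try {
            commit(trace, () -> { }, () -> { }, () -> { throw new Exception("local commit failed"); });
        } catch (Exception expected) {
            trace.add("rethrown");
        }
        return trace;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runSuccess());
        System.out.println(runFailing());
    }
}
```

Because the undo log and the business change are committed by the same local commit, either both become visible or neither does.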

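Writing the undo log ends up calling io.seata.rm.datasource.undo.mysql.MySQLUndoLogManager#insertUndoLog.

The code is not shown here. The corresponding table is undo_log in the business database.

register(): the RM registers the branch with the TC.

On the TC side, the entry point of the handling logic is (in DefaultCoordinator):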

    @Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        response.setBranchId(
                core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                        request.getXid(), request.getApplicationData(), request.getLockKey()));
    }

The core logic underneath is io.seata.server.coordinator.AbstractCore#branchRegister:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
        // look up the GlobalSession by XID
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        return SessionHolder.lockAndExecute(globalSession, () -> {
            globalSessionStatusCheck(globalSession);
            // create the branch session
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                    applicationData, lockKeys, clientId);
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
            // acquire the global lock; underneath it is stored in lock_table
            branchSessionLock(globalSession, branchSession);
            try { // add to the globalSession; underneath it is saved to branch_table
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                branchSessionUnlock(branchSession);
                throw new BranchTransactionException(FailedToAddBranch, String
                        .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                                branchSession.getBranchId()), ex);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                        globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
            }
            return branchSession.getBranchId();
        });
    }

In the end, in the seata database, the global lock is stored in the lock_table table and one row is inserted into the branch_table table.
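
The TC-side behaviour described above (take the global lock on the affected rows, then record the branch) can be sketched with two in-memory collections standing in for lock_table and branch_table. Everything in this sketch is hypothetical: the class name, the row-key strings, and the convention of returning -1 on a conflict (the code above signals failures with exceptions instead).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of branch registration with a global row lock.
class BranchRegisterDemo {
    private final Map<String, String> lockTable = new HashMap<>(); // row key -> owning XID
    private final List<String> branchTable = new ArrayList<>();    // entries of the form "xid:branchId"
    private long nextBranchId = 1;

    // Returns the new branch id, or -1 if another global transaction holds one of the rows.
    long branchRegister(String xid, List<String> rowKeys) {
        for (String key : rowKeys) {
            String owner = lockTable.get(key);
            if (owner != null && !owner.equals(xid)) {
                return -1L;               // lock conflict: nothing is recorded
            }
        }
        for (String key : rowKeys) {
            lockTable.put(key, xid);      // take (or re-take) the global lock
        }
        long branchId = nextBranchId++;
        branchTable.add(xid + ":" + branchId);
        return branchId;
    }

    int branchCount() {
        return branchTable.size();
    }

    public static void main(String[] args) {
        BranchRegisterDemo tc = new BranchRegisterDemo();
        System.out.println(tc.branchRegister("xid-1", Arrays.asList("storage_tbl:1")));
        System.out.println(tc.branchRegister("xid-2", Arrays.asList("storage_tbl:1")));
    }
}
```

In this sketch all rows are checked before any lock is taken, so a conflicting registration leaves both collections unchanged.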

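I won't go further into this here; it is left for a later write-up.

The flow is long, and reading only the source code makes it easy to get lost. I found a diagram online that lays it out clearly; original source: 墨滴社区 (Mdnice community).

I am quoting it here. The diagram may differ from the version you are reading; it matches what I see in 2.0.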

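Summary:

Part 1: "Seata two-phase commit implementation code, notes 1" (CSDN blog)

Scan the @GlobalTransactional annotation; the TM sends a request to the TC to obtain the global transaction XID.

Part 2: "Seata two-phase commit implementation code, notes 2" (CSDN blog)
The TC generates the global transaction XID and stores it in the global transaction table global_table.
This part: the data source proxy is used.
Prepare the before image.
Execute the target SQL (executed but not yet committed).
Prepare the after image and assemble the undo_log.
Register the branch transaction with the TC. The TC acquires the global lock (lock_table in the seata database) and stores the branch information in branch_table.
The RM writes the undo_log record to the undo_log table in the business database, for use in rollback.
The RM commits the local transaction.

There is a lot of depth here, and I have only scratched the surface.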