分布式事务

发布于:2024-11-23 ⋅ 阅读:(13) ⋅ 点赞:(0)

参考

Seata
详解Mysql分布式事务XA

CAP

这个定理的内容是指:在一个分布式系统中、Consistency(一致性)、Availability(可用性)、Partitiontolerance(分区容错性),三者不可得兼。

  • 一致性(C)
    在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
  • 可用性(A)
    在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
  • 分区容错性(P)
    以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择

BASE理论(Basically Available Soft state Eventually consistent):即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

CAP的应用:

  • 分布式锁:redis分布式锁是AP,zk分布式锁CP
  • 服务注册发现:zk是CP(选举时不可用),Eureka是AP(节点平等,挂掉会进入其他节点)

四种模式

在这里插入图片描述
AT模式(常用)需要客户端有undo_log表
XA模式基于主流关系型数据库能力
SAGA需要用前端组建绘制编排图之后生成json在服务中使用

XA

  • sql语法
XA {START|BEGIN} xid [JOIN|RESUME]   开启XA事务
业务sql
XA END xid [SUSPEND [FOR MIGRATE]]   
XA PREPARE xid                       二阶段提交的准备阶段
XA COMMIT xid [ONE PHASE]            二阶段提交的提交阶段
XA ROLLBACK xid                      回滚
XA RECOVER [CONVERT XID]             列出所有处于prepared状态的事务
  • jdbc方式
package study.xa;

import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection;
import com.mysql.jdbc.jdbc2.optional.MysqlXid;

import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/***
 * @Description mysql分布式事务XAConnection模拟
 * @author denny
 * @date 2019/4/3 上午9:15
 */
public class MysqlXaConnectionTest {

    public static void main(String[] args) throws SQLException {
        //true表示打印XA语句,,用于调试
        boolean logXaCommands = true;
        // 获得资源管理器操作接口实例 RM1
        Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "12345");
        XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn1, logXaCommands);
        XAResource rm1 = xaConn1.getXAResource();

        // 获得资源管理器操作接口实例 RM2
        Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test2", "root", "12345");
        XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn2, logXaCommands);
        XAResource rm2 = xaConn2.getXAResource();
        // AP请求TM执行一个分布式事务,TM生成全局事务id
        byte[] gtrid = "g12345".getBytes();
        int formatId = 1;
        try {
            // ==============分别执行RM1和RM2上的事务分支====================
            // TM生成rm1上的事务分支id
            byte[] bqual1 = "b00001".getBytes();
            Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
            // 执行rm1上的事务分支 One of TMNOFLAGS, TMJOIN, or TMRESUME.
            rm1.start(xid1, XAResource.TMNOFLAGS);
            // 业务1:插入user表
            PreparedStatement ps1 = conn1.prepareStatement("INSERT into user VALUES ('99', 'user99')");
            ps1.execute();
            rm1.end(xid1, XAResource.TMSUCCESS);

            // TM生成rm2上的事务分支id
            byte[] bqual2 = "b00002".getBytes();
            Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
            // 执行rm2上的事务分支
            rm2.start(xid2, XAResource.TMNOFLAGS);
            // 业务2:插入user_msg表
            PreparedStatement ps2 = conn2.prepareStatement("INSERT into user_msg VALUES ('88', '99', 'user99的备注')");
            ps2.execute();
            rm2.end(xid2, XAResource.TMSUCCESS);

            // ===================两阶段提交================================
            // phase1:询问所有的RM 准备提交事务分支
            int rm1Prepare = rm1.prepare(xid1);
            int rm2Prepare = rm2.prepare(xid2);
            // phase2:提交所有事务分支
            boolean onePhase = false;
            //TM判断有2个事务分支,所以不能优化为一阶段提交
            if (rm1Prepare == XAResource.XA_OK
                && rm2Prepare == XAResource.XA_OK
                ) {
                //所有事务分支都prepare成功,提交所有事务分支
                rm1.commit(xid1, onePhase);
                rm2.commit(xid2, onePhase);
            } else {
                //如果有事务分支没有成功,则回滚
                rm1.rollback(xid1);
                rm1.rollback(xid2);
            }
        } catch (XAException e) {
            // 如果出现异常,也要进行回滚
            e.printStackTrace();
        }
    }
}
  • atomikos包:封装AtomikosDataSourceBean代理datasource,使用@Transactional即可。