参考
CAP
这个定理的内容是指:在一个分布式系统中、Consistency(一致性)、Availability(可用性)、Partitiontolerance(分区容错性),三者不可得兼。
- 一致性(C)
在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本) - 可用性(A)
在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性) - 分区容错性(P)
以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择
BASE理论(Basically Available Soft state Eventually consistent):即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。
CAP的应用:
- 分布式锁:redis分布式锁是AP,zk分布式锁CP
- 服务注册发现:zk是CP(选举时不可用),Eureka是AP(节点平等,挂掉会进入其他节点)
四种模式
AT模式(常用)需要客户端有undo_log表
XA模式基于主流关系型数据库能力
SAGA需要用前端组建绘制编排图之后生成json在服务中使用
XA
- sql语法
XA {START|BEGIN} xid [JOIN|RESUME] 开启XA事务
业务sql
XA END xid [SUSPEND [FOR MIGRATE]]
XA PREPARE xid 二阶段提交的准备阶段
XA COMMIT xid [ONE PHASE] 二阶段提交的提交阶段
XA ROLLBACK xid 回滚
XA RECOVER [CONVERT XID] 列出所有处于prepared状态的事务
- jdbc方式
package study.xa;
import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection;
import com.mysql.jdbc.jdbc2.optional.MysqlXid;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/***
* @Description mysql分布式事务XAConnection模拟
* @author denny
* @date 2019/4/3 上午9:15
*/
public class MysqlXaConnectionTest {
public static void main(String[] args) throws SQLException {
//true表示打印XA语句,,用于调试
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "12345");
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test2", "root", "12345");
XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 执行rm1上的事务分支 One of TMNOFLAGS, TMJOIN, or TMRESUME.
rm1.start(xid1, XAResource.TMNOFLAGS);
// 业务1:插入user表
PreparedStatement ps1 = conn1.prepareStatement("INSERT into user VALUES ('99', 'user99')");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事务分支id
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
// 业务2:插入user_msg表
PreparedStatement ps2 = conn2.prepareStatement("INSERT into user_msg VALUES ('88', '99', 'user99的备注')");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支
int rm1Prepare = rm1.prepare(xid1);
int rm2Prepare = rm2.prepare(xid2);
// phase2:提交所有事务分支
boolean onePhase = false;
//TM判断有2个事务分支,所以不能优化为一阶段提交
if (rm1Prepare == XAResource.XA_OK
&& rm2Prepare == XAResource.XA_OK
) {
//所有事务分支都prepare成功,提交所有事务分支
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {
//如果有事务分支没有成功,则回滚
rm1.rollback(xid1);
rm1.rollback(xid2);
}
} catch (XAException e) {
// 如果出现异常,也要进行回滚
e.printStackTrace();
}
}
}
- atomikos包:封装AtomikosDataSourceBean代理datasource,使用@Transactional即可。