目录
我跨过山 涉过水 见过万物复苏 周而复始 如今 山是你 水是你 万物是你 心间一点清明 还是你 ..
什么是分布式
分布式就是将业务系统按照一定的维度拆分成多个模块,每个模块都是相对独立的,要想完成一个操作,需要各个模块相互合作才可以完成。
CAP理论
CAP理论是指在分布式系统中Consistency(一致性)、Availability(可用性)、Partition Tolerance(分区容错性)三者不能同时成立。在1998年由加州大学的计算机科学家Eric Brewer提出。
Consistency : Every read receives the most recent write or an error
Availability : Every request receives a (non-error) response – without the guarantee that it contains the most recent write
Partition tolerance : The system continues to operate despite an arbitrary number of messages being dropped (or delayed) by the network between nodes
翻译为:
一致性:对于客户端的每次读操作,要么读到的是绝对一致的最新数据,要么读取失败,强调的是数据的准确性。
可用性:任何客户端的请求都能得到响应数据,不会出现响应错误。对于访问者而言一定会返回数据,不会返回错误但不保证数据最新,强调的是不出错。
分区容错性:分布式系统通过网络进行通信,由于网络问题导致任意数据丢失或响应延迟,系统仍会继续提供服务不会挂掉。换句话说,分区容错性是站在分布式系统的角度,对访问本系统的客户端的一种承诺:我会一直运行,不管我的内部出现何种数据同步问题,强调的是不挂掉。
权衡CA
理论上一个分布式系统不可能同时满足C、A、P这三个特性。
note:其实这里有个关于CAP理论理解的误区。不要以为在所有时候都只能选择两个特性。在不存在网络失败的情况下(分布式系统正常运行时),C和A能够同时保证。只有当网络发生分区或失败时,才会在C和A之间做出选择。
对于分布式系统而言,P是前提必须保证,因为只要有网络交互就一定会有延迟和数据丢失,必须保证系统不能挂掉。所以只剩下C、A可以选择。要么保证数据一致性(保证数据绝对正确),要么保证可用性(保证系统不出错)。
C、A、P三者之间的冲突
假设有两台服务器,一台放着应用A和数据库V,一台放着应用B和数据库V,他们之间的网络可以互通,也就相当于分布式系统的两个部分。在满足一致性的时候,两台服务器(假设为N1,N2)的数据是一样的,DB0=DB0。在满足可用性的时候,用户不管是请求N1或者N2,都会得到立即响应。在满足分区容错性的情况下,N1和N2有任何一方宕机,或者网络不通的时候,都不会影响N1和N2彼此之间的正常运作。
用户通过N1中的A应用请求数据更新到服务器DB0,这时N1中的服务器DB0变为DB1,通过分布式系统的数据同步更新操作,N2服务器中的数据库V0也更新为了DB1(图2),这时用户通过B向数据库发起请求得到的数据就是即时更新后的数据DB1。
假设N1和N2之间通信的时候网络突然出现故障,有用户向N1发送数据更新请求,那N1中的数据DB0将被更新为DB1,由于网络是断开的,N2中的数据库仍旧是DB0,如果这个时候有用户向N2发送数据读取请求,由于数据还没有进行同步,应用程序没办法立即给用户返回最新的数据DB1,有两种选择,第一牺牲数据一致性,响应旧的数据DB0给用户,第二牺牲可用性,阻塞等待,直到网络连接恢复,数据更新操作完成之后,再给用户响应最新的数据DB1。
BASE理论
BASE理论是对CAP中的一致性和可用性进行一个权衡的结果,是Basically Available(基本可用)、Soft state(软状态)、Eventually Consistent(最终一致性)三个词语的简写。来源于大规模互联网的分布式实践总结,基于CAP逐渐演化而来的。核心思想是即使无法做到强一致性(Strong consistency),但每个系统可以根据自身的业务特点来达到最终一致性(Eventual consistency)。
基本可用(Basically Available)当分布式系统出现不可预知的故障时,允许损失部分的可用性。并不代表系统是不可用的状态,下面两个常见的例子:
响应延时:一个常规查询操作正常需要100ms,但是由于网络故障,或部分区域服务故障,导致此类查询的响应时间增加,变成1s甚至更多。
功能上的损失:在大规模的电商网站,由于购物高峰的流量激增,可能将部分用户流量引导到降级页面。
软状态(Soft state)也可以称之为弱状态,允许系统的数据存在一些中间的状态,并且该状态不会影响系统整体的可用性。简单说允许系统服务的副本间在数据同步时存在延时。
最终一致性(Eventually consistency)强调系统所有的副本数据,在一定时间的同步后,达到一致的效果。核心是在最终的时间点所有副本的状态保持一致,允许中间不同步的延迟存在。最终一致性是一种特殊的弱一致性。
分布式系统特征
(1)分布性 分布式系统由多台计算机组成,它们在地域上是分散的,可以散布在一个单位、一个城市、一个国家,甚至全球范围内。整个系统的功能是分散在各个节点上实现的,因而分布式系统具有数据处理的分布性。
(2)自治性 分布式系统中的各个节点都包含自己的处理机和内存,各自具有独立的处理数据的功能。通常彼此在地位上是平等的无主次之分,既能自治地进行工作,又能利用共享的通信线路来传送信息协调任务处理。
(3)并行性 一个大的任务可以划分为若干个子任务,分别在不同的主机上执行。
(4)全局性 分布式系统中必须存在一个单一的、全局的进程通信机制,使得任何一个进程都能与其他进程通信,并且不区分本地通信与远程通信。同时还应当有全局的保护机制。系统中所有机器上有统一的系统调用集合,它们必须适应分布式的环境。在所有CPU上运行同样的内核,使协调工作更加容易。
分布式系统分类
按地理环境衡量耦合度,分布式系统可以分为机体内系统、建筑物内系统、建筑物间系统和不同地理范围的区域系统等,它们的耦合度依次由高到低按应用领域的性质决定耦合度,可以分成三类:
第一种是面向计算任务的分布并行计算机系统和分布式多用户计算机系统,它们要求尽可能高的耦合度,以便发展成为能分担大型计算机和分时计算机系统所完成的工作。
第二种是面向管理信息的分布式数据处理系统。耦合度可以适当降低。
第三种是面向过程控制的分布式计算机控制系统。耦合度要求适中,当然对于某些实时应用,其耦合度的要求可能很高。
常见的分布式系统
分布式存储系统
分布式存储系统分为两大类中心化控制架构(HDFS)和Ceph(完全无中心架构)。
中心化控制架构:以单独元数据服务器为中间控制,具体数据存储服务器为分布式存储的架构存储。元数据:描述数据的数据(包含具体数据的路径,以及相关信息)。
用户第一步先访问元数据的服务器节点,这个元数据服务器为中间控制,每一次访问真正数据前,都需要先访问元数据服务节点,元服务器节点存储的是元数据,元数据是描述数据的数据,元数据存储了真实数据的描述信息,包含具体数据的路径以及相关信息;
完全无中心架构:客户端通过设备映射关系计算出具体数据的位置,直接访问。客户端通过Mon通信服务,计算得到客户端需要写到的具体文件路径。
客户端有一个Mon服务,根据Mon服务经过一系列的运算可以得到一个想要访问的数据的具体地址。Mon在客户端和数据端都存在,两者之间会有一个通信来保持相关节点信息的更新,客户端通过Mon服务和对应的信息可以计算得到想要访问的具体数据的具体文件路径,以便于直接访问。
分布式计算系统
常用的分布式计算系统主要有三类:Hadoop Map Reduce、Spark(批处理+实时处理)、Flink(实时处理)。
Hadoop Map Reduce
一种大数据编程模型,将数据处理运用Map和Reduce的概念进行分而治之将大任务划分成小任务的处理。
应用场景:批处理(一次性处理数据)
Hadoop Map Reduce的特点:分布式计算、中间结果都存放在硬盘、无法查询上一步的结果、效率一般。
Spark
基于内存优化的分布式大数据计算框架,分而治之,将大任务划分成小任务,引入RDD概念。
应用场景:批处理(效率最好) + 流处理(微小批处理)
在Spark中,每一个执行计划的中间结果都是写在分布式内存中,当需要读取数据的时候,可以直接从分布式内存中直接进行读取,这种基于内存优化的分布式大数据框架在单纯计算的时候高于Hadoop Map Reduce,所以Spark在批处理中效率是最好的,而且Spark也可以做流处理,流处理就是实时的处理。
Flink
分布式大数据处理框架,对流数据可以进行计算,实时处理。
分布式消息队列系统
Kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑计算。
Topic:定义特定的消息,可以让生产者和消费者都从该Topic中进行数据读写;
Partition:一个Topic可以分成好几个分区;
Broker:服务器存储数据,可以做备份。
分布式机器学习系统
Spark ML和分布式Tensorflow。
Spark ML
以Spark为计算引擎的分布式机器学习框架。提供一个分布式的模型训练环境,提供一个训练数据集分布式处理的环境。常见机器学习框架比较:Sklearn、Tensorflow、Pytorch。
分布式TensorFlow
分布式TensorFlow是在分布式集群上进行训练
分布式框架:SpringCloud微服务全家桶式的框架,内部涵盖了开发中所需的各个组件。SpringCloud是http协议,通过心跳通知服务的消费者。Dubbo是Alibaba开源的高性能轻量级分布式服务框架,提供面向接口的远程方法调用,负载均衡和智能容错以及服务的自动注册和发现机制。
分布式数据库
TiDB、Google Spanner、OceanBase。
分布式系统优点
(1)资源共享 若干不同的节点通过通信网络彼此互联,一个节点上的用户可以使用其他节点上的资源,如分布式系统允许设备共享,使众多用户共享昂贵的外部设备。
(2)加快计算速度 如果一个特定的计算任务可以划分为若干个并行运行的子任务,则可把这些子任务分散到不同的节点上,使它们同时在这些节点上运行,从而加快计算速度。
(3)可靠性高 分布式系统具有高可靠性,如果其中某个节点失效了,则其余的节点可以继续操作,整个系统不会因为一个或少数几个节点的故障而全体崩溃。所以啊分布式系统有很好的容错性能。
(4)通信方便、快捷 分布式系统中各个节点通过一个通信网络互联在一起。通信网络由通信线路、调制解调器和通信处理器等组成,不同节点的用户可以方便地交换信息。
分布式系统缺点
Dubbo可以把单个系统弄成分布式系统,分布式系统带来好处的同时也带来了相应的问题,几个常见的问题分布式事务、接口幂等性、分布式锁、分布式Session等。
网络通信:网络本身的不可靠性,因此会涉及到一些网络通信问题包括:传输问题,高负载,信息丢失等。
网络分区(脑裂):当网络发生异常导致分布式系统中部分节点之间的网络延时不断增大,最终导致组成分布式架构的所有节点,只有部分节点能够正常通信。
三态:在分布式架构三种状态成功、失败、超时。
安全性:开放系统的特性让分布式计算系统存在着数据的安全性和共享的风险等问题。
分布式事务:ACID(原子性、一致性、隔离性、持久性)。
中心化和去中心化:冷备或者热备。
开发成本增加,服务间调用的网络传输损耗,数据传输的安全性问题,故障排除定位困难。
分布式和集群
集群:复制模式,每台机器做一样的事。分布式架构里面,很多的架构思想采用的是:当集群发生故障的时候,集群中的人群会自动选举出一个新的领导。
分布式:两台机器分工合作,每台机器做的不一样。
分布式锁
锁是对同一时间只能有一个线程访问某个资源的限制,synchronized关键字虽然可以处理多线程问题,保证同一时间只有一个线程访问到某个资源,做不到粗细度控制,只适用于单机系统,对分布式集群系统就满足不了。Java虽然提供了Lock接口,可以做到粗细度控制,却依然只适用于单击系统。现在越来越多的项目,为了追求性能和并发,采用了soa架构,微服务架构,于是就会出现多个模块单独的服务。如何保证多个节点的同步执行,就用到了分布式锁。
Redis分布式锁
官方叫做RedLock
算法,是Redis官方支持的分布式锁算法。这个分布式锁有3个重要的点:互斥(只能有一个客户端获取锁)、不能死锁、容错(只要大部分Redis节点创建了这把锁就可以)。Redis普通的分布式锁实现方式,
SET key value [EX seconds] [PX milliseconds] NX //创建一个 key,这样就算加锁
NX
:表示只有key
不存在的时候才会设置成功,如果此时redis中存在这个 key
,那么设置失败,返回 nil
。
EX seconds
:设置key
的过期时间,精确到秒级。意思是 seconds
秒后锁自动释放,别人创建的时候如果发现已经有了就不能加锁了。
PX milliseconds
:同样是设置key
的过期时间,精确到毫秒级,比如执行以下命令:
SET resource_name my_random_value PX 30000 NX
释放锁就是删除key ,但是一般可以用lua
脚本删除,判断value是否一样再删除
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
如果某个客户端获取到了锁,阻塞了很长时间才执行完,超时了30s,此时可能已经自动释放锁了,可能别的客户端已经获取到了这个锁,这个时候直接删除key的话会有问题,所以把当前线程的ID当作set
操作中的value
,每次释放锁时进行判断,看是否是自己占有的锁,判断和删除不是原子性操作,可以使用守护线程来解决。
RedLock算法
假设一个Redis cluster有5个Redis master实例,然后执行如下步骤获取一把锁:
- 获取当前时间戳,单位是毫秒。
- 轮流尝试在每个master节点上创建锁,超时时间较短,一般就几十毫秒(客户端为了获取锁使用的超时时间比自动释放锁的总时间要小。如果自动释放时间是10秒,那么超时时间可能在
5~50
毫秒范围内)。 - 尝试在大多数节点上建立一个锁,比如5个节点就要求是 3 个节点
n / 2 + 1
。 - 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了。
- 要是锁建立失败了,那么就依次之前建立过的锁删除。
- 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁。
Redis 官方给出了以上两种基于Redis实现分布式锁的方法,详细说明可查看:Distributed locks with Redis – Redis 。
zk分布式锁
zk分布式锁就是某个节点尝试创建临时znode,此时创建成功了就获取了这个锁,这个时候别的客户端来创建锁会失败,只能注册个监听器监听这个锁。释放锁就是删除这个znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以再次重新加锁。
/**
* ZooKeeperSession
*/
public class ZooKeeperSession {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zookeeper;
private CountDownLatch latch;
public ZooKeeperSession() {
try {
this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher());
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取分布式锁
*
* @param productId
*/
public Boolean acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception e) {
while (true) {
try {
// 相当于是给node注册一个监听器,去看看这个监听器是否存在
Stat stat = zk.exists(path, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception ee) {
continue;
}
}
}
return true;
}
/**
* 释放掉一个分布式锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for product[id=" + productId + "]......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 建立 zk session 的 watcher
*/
private class ZooKeeperWatcher implements Watcher {
public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event.getState());
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
if (this.latch != null) {
this.latch.countDown();
}
}
}
/**
* 封装单例的静态内部类
*/
private static class Singleton {
private static ZooKeeperSession instance;
static {
instance = new ZooKeeperSession();
}
public static ZooKeeperSession getInstance() {
return instance;
}
}
/**
* 获取单例
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}
/**
* 初始化单例的便捷方法
*/
public static void init() {
getInstance();
}
}
也可以采用另一种方式,创建临时顺序节点,如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁,后面的每个人都会去监听排在自己前面的那个人创建的node上,一旦某个人释放了锁,排在自己后面的人就会被ZooKeeper给通知,一旦被通知了之后就获取锁执行代码。
public class ZooKeeperDistributedLock implements Watcher {
private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;
public ZooKeeperDistributedLock(String productId) {
this.productId = productId;
try {
String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
} catch (IOException e) {
throw new LockException(e);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
return;
}
if (this.latch != null) {
this.latch.countDown();
}
}
public void acquireDistributedLock() {
try {
if (this.tryLock()) {
return;
} else {
waitForLock(waitNode, sessionTimeout);
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
public boolean tryLock() {
try {
// 传入进去的locksRoot + “/” + productId
// 假设productId代表了一个商品id,比如说1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 看看刚创建的节点是不是最小的节点
// locks:10000000000,10000000001,10000000002
List<String> locks = zk.getChildren(locksRoot, false);
Collections.sort(locks);
if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
int previousLockIndex = -1;
for(int i = 0; i < locks.size(); i++) {
if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
previousLockIndex = i - 1;
break;
}
}
this.waitNode = locks.get(previousLockIndex);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}
private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}
public void unlock() {
try {
// 删除/locks/10000000000节点
// 删除/locks/10000000001节点
System.out.println("unlock " + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
}
但是使用zk临时节点会存在另一个问题由于zk依靠session定期的心跳来维持客户端,如果客户端进入长时间的GC,可能会导致zk认为客户端宕机而释放锁,让其他的客户端获取锁,但是客户端在GC恢复后,会认为自己还持有锁,从而可能出现多个客户端同时获取到锁的情形。针对这种情况,可以通过JVM调优,尽量避免长时间GC的情况发生。
Redis分布式锁和zk分布式锁
redis分布式锁需要不断去尝试获取锁,比较消耗性能。zk分布式锁获取不到锁注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。如果是Redis获取锁的那个客户端出现问题挂了,只能等待超时时间之后才能释放锁。zk的话因为创建的是临时znode,只要客户端挂了,znode就没了此时就自动释放锁。
分布式事务
什么是分布式事务 如何解决分布式事务问题 分布式事务的实现主要有以下6种方案:XA 方案、TCC 方案、SAGA 方案、本地消息表、可靠消息最终一致性方案、最大努力通知方案。
特别严格的场景用TCC来保证强一致性;然后其他的一些场景基于阿里的RocketMQ来实现分布式事务。业务系统对于资金没那么敏感的话就用可靠消息最终一致性方案。
两阶段提交方案(XA方案)
准备阶段,协调者询问参与者是否执行成功,参与者发回事务执行结果。
提交阶段,如果每个参与者都执行成功,协调者发送通知让参与者提交事务,否则协调者通知参与者回滚。
存在的问题
所有事务参与者等待其他参与者响应都是阻塞状态
协调者发生故障影响很大,第二阶段故障会造成所有参与者一直等待
网络异常导致协调者只发送了部分通知,导致数据不一致
TCC方案(补偿事务)
TCC的全称是: Try
、 Confirm
、 Cancel
。Try阶段:这个阶段是对各个服务的资源做检测以及对资源进行锁定或者预留。Confirm 阶段:这个阶段是在各个服务中执行实际的操作。Cancel 阶段:如果任何一个服务的业务方法执行出错,那么这里就需要进行补偿,执行已经执行成功的业务逻辑的回滚操作。这种方案严重依赖于写代码来回滚和补偿,业务逻辑代码维护比较困难,一般来说跟钱相关的,跟钱打交道的,支付、交易相关的场景会用TCC,严格保证分布式事务要么全部成功,要么全部回滚,严格保证资金的正确性。而且最好是你的各个业务执行的时间都比较短。
Saga 方案
金融核心等业务可能会选择TCC方案,以追求强一致性和更高的并发量,对于更多的业务系统往往会选择补偿事务,补偿事务处理在30多年前就提出了Saga理论,随着微服务的发展,近些年才逐步受到大家的关注。目前业界比较公认的是采用Saga作为长事务的解决方案。
基本原理
业务流程中每个参与者都提交本地事务,若某一个参与者失败,则补偿前面已经成功的参与者。下图左侧是正常的事务流程,当执行到T3时发生了错误,则开始执行右边的事务补偿流程,反向执行T3、T2、T1的补偿服务C3、C2、C1,将T3、T2、T1已经修改的数据补偿掉。
使用场景
对于一致性要求高、短流程、并发高的场景,如:金融核心系统,会优先考虑TCC方案。而在另外一些场景下,我们并不需要这么强的一致性,只需要保证最终一致性即可。比如很多金融核心的业务(渠道层、产品层、系统集成层),这些系统的特点是最终一致即可、流程多、流程长、还可能要调用其它公司的服务。这种情况如果选择TCC方案开发的话,一来成本高,二来无法要求其它公司的服务也遵循TCC模式。同时流程长,事务边界太长,加锁时间长,也会影响并发性能。所以Saga模式的适用场景是:
- 业务流程长、业务流程多;
- 参与者包含其它公司或遗留系统服务,无法提供TCC模式要求的三个接口。
优势
- 一阶段提交本地事务,无锁,高性能;
- 参与者可异步执行,高吞吐;
- 补偿服务易于实现,因为一个更新操作的反向操作是比较容易理解的。
缺点
- 不保证事务的隔离性。
本地消息表
本地消息表是国外的ebay提出的这么一套思想。
利用本地事务保证对两个表(本地消息表和业务数据表)操作满足事务性,并使用了消息队列,分布式事务操作的一方完成业务数据操作后向本地消息表发送一个消息,本地事务能保证此消息一定会被写入本地消息表,将本地消息表中的消息转发到kafka等消息队列,转发成功则将消息从本地消息表中删除,否则继续重新转发,分布式事务操作另一方从消息队列中读取消息,执行操作。保证最终一致性实现方案。这个方案最大的问题在于严重依赖于数据库的消息表来管理事务,不适合高并发场景无法扩容,一般很少用。
可靠消息最终一致性方案
直接基于MQ来实现事务,比如阿里的RocketMQ就支持消息事务。大概的意思就是:
- A系统先发送一个prepared消息到mq,如果这个prepared消息发送失败那么就直接取消操作别执行了;
- 如果这个消息发送成功,那么接着执行本地事务,如果成功就告诉mq发送确认消息,如果失败就告诉mq回滚消息;
- 如果发送了确认消息,那么此时B系统会接收到确认消息,然后执行本地的事务;
- mq会自动定时轮询所有prepared消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认的消息,是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。
- 这个方案里,要是系统 B 的事务失败了咋办?重试咯,自动不断重试直到成功,如果实在是不行,要么就是针对重要的资金类业务进行回滚,比如 B 系统本地回滚后,想办法通知系统 A 也回滚;或者是发送报警由人工来手工回滚和补偿。
- 这个还是比较合适的,目前国内互联网公司大都是这么玩儿的,要不你就用 RocketMQ 支持的,要不你就自己基于类似 ActiveMQ?RabbitMQ?自己封装一套类似的逻辑出来,总之思路就是这样子的。
最大努力通知方案
这个方案的大致意思就是:系统A本地事务执行完之后,发送消息到MQ;这里会有个专门消费MQ的最大努力通知服务,这个服务会消费MQ然后写入数据库中记录下来,或者是放入个内存队列也可以,接着调用系统B的接口;要是系统B执行失败了,那么最大努力通知服务就定时尝试重新调用系统B,反复N次,最后还是不行就放弃。
分布式会话
Session是面向连接的状态信息,是对Http无状态协议的补充。浏览器有个Cookie,在一段时间内这个Cookie都存在,然后每次发请求过来都带上一个特殊的jsessionid cookie
,就根据这个东西,在服务端可以维护一个对应的Session域,里面可以放数据。一般的话只要没关掉浏览器,Cookie还在,那么对应的那个Session就在,如果Cookie没了,Session也就没了。常见于什么购物车、登录状态保存。单体系统的时候可以这么用Session,但是分布式系统那么多的服务,Session状态在哪儿维护呢,常见常用的方法:
JWT Token
完全不用Session,使用JWT Token储存用户身份,然后再从数据库或者cache中获取其他的信息。这样无论请求分配到哪个服务器都无所谓。
Tomcat + Redis
使用Session的代码,基于Tomcat原生的Session支持即可,然后就是用一个叫做Tomcat RedisSessionManager
的东西,所有部署的Tomcat都将Session数据存储到Redis即可。在Tomcat的配置文件中配置:
<Valve className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve" />
<Manager className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
host="{redis.host}"
port="{redis.port}"
database="{redis.dbnum}"
maxInactiveInterval="60"/>
然后指定Redis的host和port
<Valve className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve" />
<Manager className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
sentinelMaster="mymaster"
sentinels="<sentinel1-ip>:26379,<sentinel2-ip>:26379,<sentinel3-ip>:26379"
maxInactiveInterval="60"/>
还可以用上面这种方式基于Redis哨兵支持的Redis高可用集群来保存Session数据。
Spring Session + Redis
上面所说的第二种方式会与Tomcat容器重耦合,如果要将Web容器迁移成Jetty,难道还要重新把 Jetty都配置一遍?Tomcat + Redis 的方式好用,但是会严重依赖于 Web 容器,不好将代码移植到其他Web容器上去,尤其是你要是换了技术栈咋整?比如换成了 Spring Cloud 或者是 Spring Boot 之类的呢?所以比较好的还是基于Java一站式解决方案,也就是 Spring。Spring 基本上承包了大部分需要使用的框架,Spirng Cloud做微服务,Spring Boot做脚手架,用Spring Session是一个很好的选择。
在 pom.xml 中配置:
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
在 Spring 配置文件中配置:
<bean id="redisHttpSessionConfiguration"
class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration">
<property name="maxInactiveIntervalInSeconds" value="600"/>
</bean>
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="100" />
<property name="maxIdle" value="10" />
</bean>
<bean id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
<property name="hostName" value="${redis_hostname}"/>
<property name="port" value="${redis_port}"/>
<property name="password" value="${redis_pwd}" />
<property name="timeout" value="3000"/>
<property name="usePool" value="true"/>
<property name="poolConfig" ref="jedisPoolConfig"/>
</bean>
在 web.xml 中配置:
<filter>
<filter-name>springSessionRepositoryFilter</filter-name>
<filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
</filter>
<filter-mapping>
<filter-name>springSessionRepositoryFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
示例代码:
@RestController
@RequestMapping("/test")
public class TestController {
@RequestMapping("/putIntoSession")
public String putIntoSession(HttpServletRequest request, String username) {
request.getSession().setAttribute("name", "leo");
return "ok";
}
@RequestMapping("/getFromSession")
public String getFromSession(HttpServletRequest request, Model model){
String name = request.getSession().getAttribute("name");
return name;
}
}
给Spring Session配置基于Redis来存储Session数据,然后配置了一个Spring Session的过滤器,这样的话,Session相关操作都会交给Spring Session来管了。接着在代码中用原生的Session操作,就是直接基于Spring Session从Redis中获取数据了。
redis缓存使用
一般使用注解@EnableCaching、@Cacheable、@CachePut和@CacheEvict等就可以实现。
@EnableCaching,如果是SpringBoot的话,可以直接在启动类上使用,在官方文档中给出的使用方法如下:
@Configuration
@EnableCaching
public class AppConfig {
@Bean
public MyService myService() {
// configure and return a class having @Cacheable methods
return new MyService();
}
@Bean
public CacheManager cacheManager() {
// configure and return an implementation of Spring's CacheManager SPI
SimpleCacheManager cacheManager = new SimpleCacheManager();
cacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("default")));
return cacheManager;
}
}
<beans>
<cache:annotation-driven/>
<bean id="myService" class="com.foo.MyService"/>
<bean id="cacheManager"
class="org.springframework.cache.support.SimpleCacheManager">
<property name="caches">
<set>
<bean
class="org.springframework.cache.concurrent.ConcurrentMapCacheFactoryBean">
<property name="name" value="default"/>
</bean>
</set>
</property>
</bean>
</beans>
@Cacheable,配置要缓存的方法,该方法必须要有返回值,返回值要序列化。有属性cacheNames、key、condition、unless。
@CachePut,对已有的缓存进行更新,配置的方法也必须要有返回值,且和已缓存值的类型要一致,cacheNames和key也要一致。
@CacheEvict,清除已有的缓存,cacheNames和key指定要清除的缓存。
zookeeper的使用场景
分布式协调、分布式锁、元数据/配置信息管理、HA高可用性
分布式协调
A系统发送个请求到mq,然后B系统消息消费之后处理了,那A系统如何知道B系统的处理结果?A系统发送请求之后在zookeeper上对某个节点的值注册个监听器,一旦B系统处理完了就修改zookeeper那个节点的值,A系统立马就可以收到通知。
分布式锁
对某一个数据连续发出两个修改操作,两台机器同时收到了请求,但是只能一台机器先执行完另外一个机器再执行。此时可以使用zookeeper分布式锁,一个机器接收到了请求之后先获取 zookeeper上的一把分布式锁,创建一个znode后执行操作,另外一个机器也尝试去创建那个znode,结果发现自己创建不了,因为被别人创建了,那只能等着,等第一个机器执行完了自己再执行。
元数据/配置信息管理
zookeeper可以用作很多系统的配置信息的管理,比如kafka、storm等等很多分布式系统都会选用 zookeeper来做一些元数据、配置信息的管理,包括dubbo注册中心也支持zookeeper
HA高可用性
比如hadoop、hdfs、yarn等很多大数据系统,都选择基于zookeeper来开发HA高可用机制,就是一个重要进程一般会做主备两个,主进程挂了立马通过zookeeper感知到切换到备用进程。
如何设计一个分布式系统
系统拆分
将业务系统按照一定的维度拆分成多个模块,把通用性较强的业务独立到共通的服务中,通过调用服务减少耦合。
系统容错
架构层面要考虑服务重试、服务降级以及熔断和限流。业务功能层面服务接口的幂等性、异步处理、事务补偿机制等。
高可用
服务集群,采取负载均衡策略、主从切换、故障转移等措施。还有网络通信问题,由于网络延迟、网络故障导致网络通信问题。
数据一致性
数据一致性问题,在分布式系统中,由于数据架构需要提供多节点部署,不同节点之间通信就会存在数据差异,让业务不能正常运转,而如何让不同模块之间的数据保证完整性,一致性也是必须要解决的问题,这个问题一般采用分布式事务处理(如消息发布/订阅模式),牺牲一部分性能去保证数据一致性。