深入解析ZooKeeper核心机制与应用(下)

发布于:2025-09-10 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

Curator

引入Curator的Maven依赖

配置文件

项目示例演示

ZK的读写锁设计

读锁概述

如何实现读写锁?

Watch机制

概述

ZK集群

集群搭建

Zookeeper集群的leader选举

Leader选举流程

 选举触发

第一轮选举投票

第二轮选举投票

确定 leader后

zk集群同步


接上篇深入解析ZooKeeper核心机制与应用(上)主要介绍了原理部分,本篇更注重应用部分

Curator

官网介绍 https://curator.apache.org/

        Curator是Netflix公司开源的一套Zookeeper客户端框架,Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能,比如:Leader选举、分布式锁等等,极大的减轻了开发者在使用Zookeeper时的底层细节开发工作。

引入Curator的Maven依赖

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        

配置文件

@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class ZkProperties {

    // 重试次数
    private int retryCount;

    // 重试的间隔时间(单位:毫秒)
    private int sleepBetweenRetries;

    // zk连接地址(多个zk的时候,用逗号分割)
    private String connect;

    // 会话超时时间(单位:毫秒)
    private int sessionTimout;

    // 连接超时时间(单位:毫秒)
    private int connectionTimeout;
}
@Configuration
public class CuratorConfig {

    @Resource
    private ZkProperties zkProperties;

    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new RetryNTimes(zkProperties.getRetryCount(), zkProperties.getSleepBetweenRetries());
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
                zkProperties.getConnect(),
                zkProperties.getSessionTimout(),
                zkProperties.getConnectionTimeout(), retryPolicy);
        return curatorFramework;
    }
}

项目示例演示

@Slf4j
@SpringBootTest
class ZookeeperDemoApplicationTests {

    private final static String NODE_NAME = "/curator-node";
    private final static String EPHEMERAL_NODE_NAME = "/curator-ephemeral-node-";
    private final static String PARENT_NODE_NAME = "/animal/dog/whiteDog";
    private final static String WATCH_NODE_NAME = "/curator-watch-node";
    private final static byte[] VALUE_BYTES = "muse".getBytes();
    private final static byte[] NEW_VALUE_BYTES = "muse-new".getBytes();
    private final static Gson GSON = new Gson();

    @Resource
    private CuratorFramework curatorFramework;

    /**
     * 创建永久节点/curator-node,并存储值“muse”
     */
    @Test
    void createNode() throws Throwable {
        String path = curatorFramework.create().forPath(NODE_NAME, VALUE_BYTES);
        log.info("createNode success! path={}", path);
    }

    /**
     * 创建临时序号节点 /curator-ephemeral-node-[序号]
     */
    @Test
    @SneakyThrows
    void createEphemeralSeqNode() {
        String path = curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
                forPath(EPHEMERAL_NODE_NAME, VALUE_BYTES);
        log.info("createEphemeralSeqNode success! path={}", path);
        Thread.sleep(20000); // 线程睡眠20秒钟,这个时间可以查询到临时节点,方法执行完毕,临时节点就不存在了
    }

    /**
     * 如果父节点不存在,则连带着创建父类节点 /animal/dog/whiteDog
     */
    @Test
    @SneakyThrows
    void createWithParent() {
        String path = curatorFramework.create().creatingParentsIfNeeded().forPath(PARENT_NODE_NAME, VALUE_BYTES);
        log.info("createWithParent success! path={}", path);
    }

    /**
     * 获取节点/curator-node上存储的值
     */
    @Test
    @SneakyThrows
    void getData() {
        byte[] valueByte = curatorFramework.getData().forPath(NODE_NAME);
        log.info("getData success! valueByte={}", new String(valueByte));
    }

    /**
     * 修改节点/curator-node的值为“muse-new”
     */
    @Test
    @SneakyThrows
    void setData() {
        curatorFramework.setData().forPath(NODE_NAME, NEW_VALUE_BYTES);
    }

    /**
     * 删除节点/curator-node及其包含的子节点
     */
    @Test
    @SneakyThrows
    void deleteNode() {
        curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(NODE_NAME);
    }

   }

ZK的读写锁设计

读锁概述

读锁(Read Lock)并发的时候,多个线程都可以去执行读操作,彼此不会阻塞。

加读锁成功的前提是:没有对其待访问的资源加写锁。

写锁(Write Lock)并发时如果多个线程都要去获得写锁,那么只有一条线程可以获得写锁,彼此会发生阻塞。

加写锁成功的前提是:没有对其待访问的资源加任和锁(无论是写锁or读锁)。

如何实现读写锁?

首先,在/lock路径下创建临时序号节点/lock/WRITE- 或 /lock/READ-,该节点就代表将要获取的Write/Read锁节点。其次:获取/lock下的子节点,并按照临时节点的顺序号排序。最后:检查此Read/Write锁之前是否有Write锁,若有,则先注册对该Write锁前一个锁的监听,然后阻塞该Read/Write锁的获取。若监听到该Read/Write锁前一个Write锁已释放,则打开阻塞,继续执行。

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
/**
     * 读锁(需要引入curator-recipes依赖)
     */
    @Test
    @SneakyThrows
    void getReadLock() {
        InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/read-lock");
        InterProcessMutex readLock = rwlock.readLock(); /** 获得读锁实例对象 */
        for (int i = 0; i< 10 ; i++) {
            new Thread(()-> {
                String threadName = Thread.currentThread().getName();
                try {
                    readLock.acquire(); // 获取读锁
                    log.info("线程={}:等待获取Read锁成功!开始执行业务代码...", threadName);
                    Thread.sleep(1000);
                } catch (Throwable e) {
                    e.printStackTrace();
                } finally {
                    try {
                        readLock.release(); // 释放锁
                    } catch (Throwable e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        Thread.sleep(2000);
    }

    /**
     * 写锁(需要引入curator-recipes依赖)
     */
    @Test
    @SneakyThrows
    void getWriteLock() {
        InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/write-lock");
        InterProcessMutex writeLock = rwlock.writeLock(); /** 获得写锁实例对象 */
        for (int i = 0; i< 5 ; i++) {
            new Thread(()-> {
                String threadName = Thread.currentThread().getName();
                try {
                    writeLock.acquire(); // 获取写锁
                    log.info("线程={}:等待获取Write锁成功!开始执行业务代码...", threadName);
                    Thread.sleep(1000);
                } catch (Throwable e) {
                    e.printStackTrace();
                } finally {
                    try {
                        writeLock.release(); // 释放锁
                    } catch (Throwable e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        Thread.sleep(6000);
    }

    /**
     * 监听/curator-watch-node节点
     */
    @Test
    @SneakyThrows
    void watch() {
        // 如果不存在节点,则创建
        if (null == curatorFramework.checkExists().forPath(WATCH_NODE_NAME)) {
            curatorFramework.create().forPath(WATCH_NODE_NAME, VALUE_BYTES);
        }
        CuratorCache curatorCache = CuratorCache.builder(curatorFramework, WATCH_NODE_NAME).build();
        CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(
                () -> log.info("-----forNodeCache-----{} node is changed!", WATCH_NODE_NAME)).forAll(
                (type, oldData, data) -> log.info("-----forAll-----{} node is changed!type={} oldDate={} date={}",
                        WATCH_NODE_NAME, GSON.toJson(type), GSON.toJson(oldData), GSON.toJson(data))).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();

        System.in.read();
    }

Watch机制

概述

        我们可以把Watch理解成是注册在指定Znode上的触发器。

当被Watch的这个Znode发生了变化(即:create、delete、setData方法)时,将会触发Znode上注册的对应监听事件,请求Watch的客户端会接收到异步回调通知。

        客户端使用了NIO通信模式监听服务的调用。

        监听节点内容的变化,我们可以使用get -w [节点]

         监听节点目录的变化,我们可以使用ls -w [节点]

        监听所有级别子目录变化,我么可以使用ls -w -R [节点]

        基于Curator使用WatchZookeeperDemoApplicationTests类的watch()方法

ZK集群

集群搭建

步骤一:创建四个数据存储目录

步骤二:分别在zk1~zk4目录中创建myid文件

步骤三:创建四个zoo.cfg配置文件,分别为zoo1.conf,zoo2.conf,zoo3.conf,zoo4.conf

步骤四:启动四台ZooKeeper

步骤五:查看四台ZooKeeper的角色

步骤六:连接集群./zkCli.sh -server 127.0.0.1:2181127.0.0.1:2182 127.0.0.1:2183 127.0.0.1:2184

Zookeeper集群的leader选举

        ZooKeeper作为非常重要的分布式协调组件,需要进行集群部署,集群中会以一主多从的形式进行部署。为了保证数据的一致性,使用了ZAB(ZooKeeper Atomic Broadcase)协议,这个协议解决了ZooKeeper的崩溃恢复和主从数据同步的问题。

ZAB协议定义了如下四种节点状态

Leader选举流程

注释:基于 FastLeaderElection 算法(Zookeeper 默认选举算法)

 选举触发

  • 初始启动:当 Zookeeper 集群中各服务器首次启动时,所有服务器都不知道谁是 leader,此时会触发 leader 选举流程。
  • leader 故障:在集群运行过程中,如果 follower 节点在一定时间内没有收到 leader 的心跳(通过心跳机制检测,默认 tickTime 时间间隔内未收到心跳视为 leader 故障),则认为 leader 出现故障,follower 节点会发起新一轮的 leader 选举。
第一轮选举投票
  • 初始化投票
    • 每个服务器启动后,都会初始化自己的投票信息,将自己推举为 leader,投票信息包含服务器的 myid(唯一标识,在配置文件中指定)、zxid(事务 ID,反映服务器上数据的最新状态,越大表示数据越新)以及 epoch(选举轮次,初始值为 0,每次选举都会递增)。例如,服务器 S1 的 myid 为 1,zxid 为 100,它会向集群中的其他服务器发送投票 (1, 100, 0)。
  • 接收与比较投票
    • 服务器在接收到其他服务器的投票后,会按照一定规则进行比较。首先比较 epoch,epoch 大的投票优先;若 epoch 相同,则比较 zxid,zxid 大的优先;若 zxid 也相同,再比较 myid,myid 大的优先。
    • 假设服务器 S1 接收到服务器 S2 的投票 (2, 105, 0),由于 S2 的 zxid 大于 S1,S1 会更新自己的投票,将票改投给 S2,即把自己的投票信息更新为 (2, 105, 0)。
  • 统计投票
    • 每台服务器都会统计自己收到的投票信息,判断是否有超过半数的服务器投给了某个节点。例如,一个由 5 台服务器组成的集群,当一台服务器收到至少 3 张((5 / 2) + 1 = 3)指向同一节点的有效投票时,就初步认为该节点可能成为 leader。
  • 第一轮选举结果
    • 在第一轮选举中,可能出现两种情况。一种是有某个节点获得超过半数的投票,初步当选为 leader;另一种是没有节点获得超过半数的投票,此时所有服务器会进入下一轮选举。

第二轮选举投票
  • 递增 epoch
    • 如果第一轮选举没有选出 leader,所有服务器会将自己的 epoch 值加 1,进入下一轮选举。例如,第一轮选举的 epoch 为 0,第二轮选举时,所有服务器的 epoch 变为 1。
  • 重新投票
    • 服务器基于新的 epoch 值重新封装自己的投票信息,并向集群中其他服务器发送。此时,服务器会根据当前的状态和接收到的其他服务器信息,重新确定自己的投票对象。例如,服务器 S1 在第二轮选举中,会综合第一轮选举后获取的新信息,可能会改变自己的投票选择,重新向其他服务器发送新的投票信息。
  • 重复比较与统计
    • 后续服务器接收投票、比较投票以及统计投票的过程与第一轮类似。各服务器按照 epoch、zxid、myid 的顺序依次比较接收到的投票信息,更新自己的投票,并统计投票结果。
    • 在每一轮选举中,只要有节点获得超过半数的投票,就会当选为 leader。若仍然没有节点获得超过半数的投票,则继续递增 epoch,进行下一轮选举,直到选出 leader 为止。

确定 leader后
  • 当某个节点在某一轮选举中获得超过半数的投票时,该节点就正式当选为 leader。例如,服务器 S3 在第二轮选举中获得了 3 台服务器的投票,它就成为了 leader。

       Leader选举出来之后,会周期性不断的向Follower发送心跳 (ping命令没有内容的socket)。

         当Leader崩溃后,Follower发现socket通道已经关闭,那么Follower就会从Following状态进入到Looking状态,然后重新开始进行Leader的选举,在Leader选举的这个过程中,zk集群不能堆外提供服务。

zk集群同步

  • leader 当选后,会向其他服务器发送通知,告知自己成为 leader。其他服务器收到通知后,更新自己的状态,确认 leader,并开始与 leader 进行数据同步。通过 ZAB 协议的原子广播机制,leader 将最新的数据状态同步给 follower,使整个集群的数据状态达成一致,从而保证集群的一致性和可用性。

如果客户端连接了Leader节点,则直接将数据写入到主节点;如果客户端连接到了Follower节点,那么Follower节点会将数据转发给Leader节点,Leader节点再将数据写入到本节点中。


网站公告

今日签到

点亮在社区的每一天
去签到