zk基础—4.zk实现分布式功能二

发布于:2025-04-04 ⋅ 阅读:(15) ⋅ 点赞:(0)

大纲

1.zk实现数据发布订阅

2.zk实现负载均衡

3.zk实现分布式命名服务

4.zk实现分布式协调(Master-Worker协同)

5.zk实现分布式通信

6.zk实现Master选举

7.zk实现分布式锁

8.zk实现分布式队列和分布式屏障

4.zk实现分布式协调(Master-Worker协同)

(1)Master-Worker架构

(2)Master-Worker架构示例—HBase

(3)Master-Worker架构示例—Kafka

(4)Master-Worker架构示例—HDFS

(5)如何使用zk实现Master-Worker

(1)Master-Worker架构

Master-Work是一个广泛使用的分布式架构,系统中有一个Master负责监控Worker的状态,并为Worker分配任务。

说明一:在任何时刻,系统中最多只能有一个Master。不可以出现两个Master,多个Master共存会导致脑裂。

说明二:系统中除了Active状态的Master还有一个Bakcup的Master。如果Active失败,Backup可以很快进入Active状态。

说明三:Master实时监控Worker的状态,能够及时收到Worker成员变化的通知。Master在收到Worker成员变化通知时,会重新进行任务分配。

图片

(2)Master-Worker架构示例—HBase

HBase采用的就是Master-Worker的架构。HMBase是系统中的Master,HRegionServer是系统中的Worker。HMBase会监控HBase Cluster中Worker的成员变化,HMBase会把region分配给各个HRegionServer。系统中有一个HMaster处于Active状态,其他HMaster处于备用状态。

图片

(3)Master-Worker架构示例—Kafka

一个Kafka集群由多个Broker组成,这些Borker是系统中的Worker,Kafka会从这些Worker选举出一个Controller。这个Controlle是系统中的Master,负责把Topic Partition分配给各个Broker。

图片

(4)Master-Worker架构示例—HDFS

HDFS采用的也是一个Master-Worker的架构。NameNode是系统中的Master,DataNode是系统中的Worker。NameNode用来保存整个分布式文件系统的MetaData,并把数据块分配给Cluster中的DataNode进行保存。

图片

(5)如何使用zk实现Master-Worker

步骤1:使用一个临时节点"/master"表示Master。Master在行使Master的职能之前,首先要创建这个znode。如果创建成功,进入Active状态,开始行使Master职能。如果创建失败,则进入Backup状态,并使用Watcher机制监听"/master"。假设系统中有一个Active Master和一个Backup Master。如果Active Master故障了,那么它创建的"/master"就会被zk自动删除。这时Backup Master会收到通知,再次创建"/master"成为新Active Master。

步骤2:使用一个持久节点"/workers"下的临时子节点来表示Worker,Worker通过在"/workers"节点下创建临时节点来加入集群。

步骤3:处于Active状态的Master会通过Watcher机制,监控"/workers"下的子节点列表来实时获取Worker成员的变化。

5.zk实现分布式通信

(1)心跳检测

(2)工作进度汇报

(3)系统调度

在大部分的分布式系统中,机器间的通信有三种类型:心跳检测、工作进度汇报、系统调度。

(1)心跳检测

机器间的心跳检测是指:在分布式环境中,不同机器间需要检测彼此是否还在正常运行。其中,心跳检测有如下三种方法。

方法一:通常会通过机器间是否可以相互PING通来判断对方是否正常运行。

方法二:在机器间建立长连接,通过TCP连接固有的心跳检测机制来实现上层机器的心跳检测。

方法三:基于zk的临时子节点来实现心跳检测,让不同的机器都在zk的一个指定节点下创建临时子节点,不同机器间可以根据这个临时子节点来判断对应的客户端是否存活。基于zk的临时节点来实现的心跳检测,可以大大减少系统的耦合。因为检测系统和被检测系统之间不需要直接关联,只需要通过zk临时节点间接关联。

(2)工作进度汇报

在一个任务分发系统中,任务被分发到不同的机器上执行后,需要实时地将自己的任务执行进度汇报给分发系统。

通过zk的临时子节点来实现工作进度汇报:可以在zk上选择一个节点,每个任务机器都在该节点下创建临时子节点。然后通过判断临时子节点是否存在来确定任务机器是否存活,各个任务机器会实时地将自己的任务执行进度写到其对应的临时节点上,以便中心系统能够实时获取到任务的执行进度。

(3)系统调度

一个分布式系统由控制台和一些客户端系统组成,控制台的职责是将一些指令信息发送给所有客户端。

使用zk实现系统调度时:先让控制台的一些操作指令对应到zk的某些节点数据,然后让客户端系统注册对这些节点数据的监听。当控制台进行一些操作时,便会触发修改这些节点的数据,而zk会将这些节点数据的变更以事件通知的形式发送给监听的客户端。

这样就能省去大量底层网络通信和协议设计上的重复工作了,也大大降低了系统间的耦合,方便实现异构系统的灵活通信。

6.zk实现Master选举

(1)通过创建临时节点实现

(2)通过临时顺序子节点来实现

Master选举的需求是:在集群的所有机器中选举出一台机器作为Master。

(1)通过创建临时节点实现

集群的所有机器都往zk上创建一个临时节点如"/master"。在这个过程中只会有一个机器能成功创建该节点,则该机器便成为Master。同时其他没有成功创建节点的机器会在"/master"节点上注册Watcher监听,一旦当前Master机器挂了,那么其他机器就会重新往zk上创建临时节点。

(2)通过临时顺序子节点来实现

使用临时顺序子节点来表示集群中的机器发起的选举请求,然后让创建最小后缀数字节点的机器成为Master。

7.zk实现分布式锁

(1)死锁的解决方案

(2)zk如何实现排他锁

(3)zk如何实现共享锁(读写锁)

(4)羊群效应

(5)改进后的排他锁

(6)改进后的共享锁

(7)zk原生实现分布式锁的示例

可以利用zk的临时节点来解决死锁问题,可以利用zk的Watcher监听机制实现锁释放后重新竞争锁,可以利用zk数据节点的版本来实现乐观锁。

(1)死锁的解决方案

在单机环境下,多线程之间会产生死锁问题。同样,在分布式系统环境下,也会产生分布式死锁的问题。常用的解决死锁问题的方法有超时方法和死锁检测。

一.超时方法

在解决死锁问题时,超时方法可能是最简单的处理方式了。超时方式是在创建分布式线程时,对每个线程都设置一个超时时间。当该线程的超时时间到期后,无论该线程是否执行完毕,都要关闭该线程并释放该线程所占用的系统资源,之后其他线程就可以访问该线程释放的资源,这样就不会造成死锁问题。

但这种设置超时时间的方法最大的缺点是很难设置一个合适的超时时间。如果时间设置过短,可能造成线程未执行完相关的处理逻辑,就因为超时时间到期就被迫关闭,最终导致程序执行出错。

二.死锁检测

死锁检测是处理死锁问题的另一种方法,它解决了超时方法的缺陷。死锁检测方法会主动检测发现线程死锁,在控制死锁问题上更加灵活准确。

可以把死锁检测理解为一个运行在各服务器系统上的线程或方法,该方法专门用来发现应用服务上的线程是否发生死锁。如果发生死锁,那么就会触发相应的预设处理方案。

(2)zk如何实现排他锁

一.获取锁

获取排他锁时,所有的客户端都会试图通过调用create()方法,在"/exclusive_lock"节点下创建临时子节点"/exclusive_lock/lock"。zk会保证所有的客户端中只有一个客户端能创建临时节点成功,从而获得锁。没有创建临时节点成功的客户端也就没能获得锁,需要到"/exclusive_lock"节点上,注册一个子节点变更的Watcher监听,以便可以实时监听lock节点的变更情况。

二.释放锁

如果获取锁的客户端宕机,那么zk上的这个临时节点(lock节点)就会被移除。如果获取锁的客户端执行完,也会主动删除自己创建的临时节点(lock节点)。

(3)zk如何实现共享锁(读写锁)

一.获取锁

获取共享锁时,所有客户端会到"/shared_lock"下创建一个临时顺序节点。如果是读请求,那么就创建"/shared_lock/read001"的临时顺序节点。如果是写请求,那么就创建"/shared_lock/write002"的临时顺序节点。

二.判断读写顺序

步骤一:客户端在创建完节点后,会获取"/shared_lock"节点下的所有子节点,并对"/shared_lock"节点注册子节点变更的Watcher监听。

步骤二:然后确定自己的节点序号在所有子节点中的顺序(包括读节点和写节点)。

步骤三:对于读请求:如果没有比自己序号小的写请求子节点,或所有比自己小的子节点都是读请求,那么表明可以成功获取共享锁。如果有比自己序号小的子节点是写请求,那么就需要进入等待。对于写请求:如果自己不是序号最小的子节点,那么就需要进入等待。

步骤四:如果客户端在等待过程中接收到Watcher通知,则重复步骤一。

三.释放锁

如果获取锁的客户端宕机,那么zk上的对应的临时顺序节点就会被移除。如果获取锁的客户端执行完,也会主动删除自己创建的临时顺序节点。

(4)羊群效应

一.排他锁的羊群效应

如果有大量的客户端在等待锁的释放,那么就会出现大量的Watcher通知。然后这些客户端又会发起创建请求,但最后只有一个客户端能创建成功。这个Watcher事件通知其实对绝大部分客户端都不起作用,极端情况可能会出现zk短时间向其余客户端发送大量的事件通知,这就是羊群效应。出现羊群效应的根源在于:没有找准客户端真正的关注点。

二.共享锁的羊群效应

如果有大量的客户端在等待锁的释放,那么不仅会出现大量的Watcher通知,还会出现大量的获取"/shared_lock"的子节点列表的请求,但最后大部分客户端都会判断出自己并非是序号最小的节点。所以客户端会接收过多和自己无关的通知和发起过多查询节点列表的请求,这就是羊群效应。出现羊群效应的根源在于:没有找准客户端真正的关注点。

(5)改进后的排他锁

使用临时顺序节点来表示获取锁的请求,让创建出后缀数字最小的节点的客户端成功拿到锁。

步骤一:首先客户端调用create()方法在"/exclusive_lock"下创建一个临时顺序节点。

步骤二:然后客户端调用getChildren()方法返回"/exclusive_lock"下的所有子节点,接着对这些子节点进行排序。

步骤三:排序后,看看是否有后缀比自己小的节点。如果没有,则当前客户端便成功获取到排他锁。如果有,则调用exist()方法对排在自己前面的那个节点注册Watcher监听。

步骤四:当客户端收到Watcher通知前面的节点不存在,则重复步骤二。

(6)改进后的共享锁

步骤一:客户端调用create()方法在"/shared_lock"节点下创建临时顺序节点。如果是读请求,那么就创建"/shared_lock/read001"的临时顺序节点。如果是写请求,那么就创建"/shared_lock/write002"的临时顺序节点。

步骤二:然后调用getChildren()方法返回"/shared_lock"下的所有子节点,接着对这些子节点进行排序。

步骤三:对于读请求:如果排序后发现有比自己序号小的写请求子节点,则需要等待,且需要向比自己序号小的最后一个写请求子节点注册Watcher监听。对于写请求:如果排序后发现自己不是序号最小的子节点,则需要等待,并且需要向比自己序号小的最后一个请求子节点注册Watcher监听。注意:这里注册Watcher监听也是调用exist()方法。此外,不满足上述条件则表示成功获取共享锁。

步骤四:如果客户端在等待过程中接收到Watcher通知,则重复步骤二。

(7)zk原生实现分布式锁的示例

一.分布式锁的实现步骤

步骤一:每个线程都通过"临时顺序节点 + zk.create()方法 + 添加回调"去创建节点。

步骤二:线程执行完创建临时顺序节点后,先通过CountDownLatch.await()方法进行阻塞。然后在创建成功的回调中,通过zk.getChildren()方法获取根目录并继续回调。

步骤三:某线程在获取根目录成功后的回调中,会对目录排序。排序后如果发现其创建的节点排第一,那么就执行countDown()方法不再阻塞,表示获取锁成功。排序后如果发现其创建的节点不是第一,则通过zk.exists()方法监听前一节点。

步骤四:获取到锁的线程会通过zk.delete()方法来删除其对应的节点实现释放锁。在等候获取锁的线程掉线时其对应的节点也会被删除。而一旦节点被删除,那些监听根目录的线程就会重新zk.getChildren()方法,获取成功后其回调又会进行排序以及通过zk.exists()方法监听前一节点。

二WatchCallBack对分布式锁的具体实现

public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    ZooKeeper zk ;
    String threadName;
    CountDownLatch countDownLatch = new CountDownLatch(1);
    String pathName;
    
    public String getPathName() {
        return pathName;
    }
    
    public void setPathName(String pathName) {
        this.pathName = pathName;
    }
    
    public String getThreadName() {
        return threadName;
    }
    
    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }
    
    public ZooKeeper getZk() {
        return zk;
    }
    
    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }
    
    public void tryLock() {
        try {
            System.out.println(threadName + " create....");
            //创建一个临时的有序的节点
            zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    //当前线程释放锁, 删除节点
    public void unLock() {
        try {
            zk.delete(pathName, -1);
            System.out.println(threadName + " over work....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    
    //上面zk.create()方法的回调
    //创建临时顺序节点后的回调, 10个线程都能同时创建节点
    //创建完后获取根目录下的子节点, 也就是这10个线程创建的节点列表, 这个不用watch了, 但获取成功后要执行回调
    //这个回调就是每个线程用来执行节点排序, 看谁是第一就认为谁获得了锁
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (name != null ) {
            System.out.println(threadName  + "  create node : " +  name );
            setPathName(name);
            //一定能看到自己前边的, 所以这里的watch要是false
            zk.getChildren("/", false, this ,"sdf");
        }
    }
    
    //核心方法: 各个线程获取根目录下的节点时, 上面zk.getChildren("/", false, this ,"sdf")的回调
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        //一定能看到自己前边的节点
        System.out.println(threadName + "look locks...");
        for (String child : children) {
            System.out.println(child);
        }
        //根目录下的节点排序
        Collections.sort(children);
        //获取当前线程创建的节点在根目录中排第几
        int i = children.indexOf(pathName.substring(1));
        //是不是第一个, 如果是则说明抢锁成功; 如果不是, 则watch当前线程创建节点的前一个节点是否被删除(删除);
        if (i == 0) {
            System.out.println(threadName + " i am first...");
            try {
                //这里的作用就是不让第一个线程获得锁释放锁跑得太快, 导致后面的线程还没建立完监听第一个节点就被删了
                zk.setData("/", threadName.getBytes(), -1);
                countDownLatch.countDown();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            //9个没有获取到锁的线程都去调用zk.exists, 去监控各自自己前面的节点, 而没有去监听父节点
            //如果各自前面的节点发生删除事件的时候才回调自己, 并关注被删除的事件(所以会执行process回调)
            zk.exists("/" + children.get(i-1), this, this, "sdf");
        }
    }
    
    //上面zk.exists()的监听
    //监听的节点发生变化的Watcher事件监听
    @Override
    public void process(WatchedEvent event) {
        //如果第一个获得锁的线程释放锁了, 那么其实只有第二个线程会收到回调事件
        //如果不是第一个哥们某一个挂了, 也能造成他后边的收到这个通知, 从而让他后边那个去watch挂掉这个哥们前边的, 保持顺序
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zk.getChildren("/", false, this ,"sdf");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }
    
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //TODO
    }
}

三.分布式锁的测试类

public class TestLock {
    ZooKeeper zk;
    
    @Before
    public void conn () {
        zk  = ZKUtils.getZK();
    }
    
    @After
    public void close () {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    @Test
    public void lock() {
        //10个线程都去抢锁
        for (int i = 0; i < 10; i++) {
            new Thread() {
                @Override
                public void run() {
                    WatchCallBack watchCallBack = new WatchCallBack();
                    watchCallBack.setZk(zk);
                    String threadName = Thread.currentThread().getName();
                    watchCallBack.setThreadName(threadName);
                    //每一个线程去抢锁
                    watchCallBack.tryLock();
                    //抢到锁之后才能干活
                    System.out.println(threadName + " working...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //干完活释放锁
                    watchCallBack.unLock();
                }
            }.start();
        }
        while(true) {
        }
    }
}

8.zk实现分布式队列和分布式屏障

(1)分布式队列的实现

(2)分布式屏障的实现

(1)分布式队列的实现

步骤一:所有客户端都到"/queue"节点下创建一个临时顺序节点。

步骤二:通过调用getChildren()方法来获取"/queue"节点下的所有子节点。

步骤三:客户端确定自己的节点序号在所有子节点中的顺序。

步骤四:如果自己不是序号最小的子节点,那么就需要进入等待,同时调用exists()方法向比自己序号小的最后一个节点注册Watcher监听。

步骤五:如果客户端收到Watcher事件通知,重复步骤二。

(2)分布式屏障的实现

"/barrier"节点是一个已存在的默认节点,"/barrier"节点的值是数字n,表示Barrier值,比如10。

步骤一:首先,所有客户端都需要到"/barrier"节点下创建一个临时节点。

步骤二:然后,客户端通过getData()方法获取"/barrier"节点的数据内容,比如10。

步骤三:接着,客户端通过getChildren()方法获取"/barrier"节点下的所有子节点,同时注册对子节点列表变更的Watcher监听。

步骤四:如果客户端发现"/barrier"节点的子节点个数不足10个,那么就需要进入等待。

步骤五:如果客户端接收到了Watcher事件通知,那么就重复步骤三。