zk基础—5.Curator的使用与剖析二

发布于:2025-04-06 ⋅ 阅读:(23) ⋅ 点赞:(0)

大纲

1.基于Curator进行基本的zk数据操作

2.基于Curator实现集群元数据管理

3.基于Curator实现HA主备自动切换

4.基于Curator实现Leader选举

5.基于Curator实现分布式Barrier

6.基于Curator实现分布式计数器

7.基于Curator实现zk的节点和子节点监听机制

8.基于Curator创建客户端实例的源码分析

9.Curator在启动时是如何跟zk建立连接的

10.基于Curator进行增删改查节点的源码分析

11.基于Curator的节点监听回调机制的实现源码

12.基于Curator的Leader选举机制的实现源码

11.Curator节点监听回调机制的实现源码

(1)PathCache子节点监听机制的实现源码

(2)NodeCache节点监听机制的实现源码

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

(4)PathCache实现自动重复注册监听器的效果

(5)NodeCache实现节点变化事件监听的效果

(1)PathCache子节点监听机制的实现源码

PathChildrenCache会调用原生zk客户端对象的getChildren()方法,并往该方法传入一个监听器childrenWatcher。当子节点发生事件,就会通知childrenWatcher这个原生的Watcher,然后该Watcher便会调用注册到PathChildrenCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");

        //PathCache,监听/cluster下的子节点变化
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                ...
            }
        });
        pathChildrenCache.start();
    }
}

public class PathChildrenCache implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final String path;
    private final boolean cacheData;
    private final boolean dataIsCompressed;
    private final CloseableExecutorService executorService;
    private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
    ...
    //初始化
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {
        this.client = client.newWatcherRemoveCuratorFramework();
        this.path = PathUtils.validatePath(path);
        this.cacheData = cacheData;
        this.dataIsCompressed = dataIsCompressed;
        this.executorService = executorService;
        ensureContainers = new EnsureContainers(client, path);
    }
    
    //获取用来存放Listener的容器listeners
    public ListenerContainer<PathChildrenCacheListener> getListenable() {
        return listeners;
    }
    
    //启动对子节点的监听
    public void start() throws Exception {
        start(StartMode.NORMAL);
    }
    
    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            //处理连接状态的变化
            handleStateChange(newState);
        }
    };
    
    public void start(StartMode mode) throws Exception {
        ...
        //对建立的zk连接添加Listener
        client.getConnectionStateListenable().addListener(connectionStateListener);
        ...
        //把PathChildrenCache自己传入RefreshOperation中
        //下面的代码其实就是调用PathChildrenCache的refresh()方法
        offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
        ...
    }
    
    //提交一个任务到线程池进行处理
    void offerOperation(final Operation operation) {
        if (operationsQuantizer.add(operation)) {
            submitToExecutor(
                new Runnable() {
                    @Override
                    public void run() {
                        ...
                        operationsQuantizer.remove(operation);
                        //其实就是调用PathChildrenCache的refresh()方法
                        operation.invoke();
                        ...
                    }
                }
            );
        }
    }
    
    private synchronized void submitToExecutor(final Runnable command) {
        if (state.get() == State.STARTED) {
            //提交一个任务到线程池进行处理
            executorService.submit(command);
        }
    }
    ...
}

class RefreshOperation implements Operation {
    private final PathChildrenCache cache;
    private final PathChildrenCache.RefreshMode mode;
    
    RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {
        this.cache = cache;
        this.mode = mode;
    }
    
    @Override
    public void invoke() throws Exception {
        //调用PathChildrenCache的refresh方法,也就是发起对子节点的监听
        cache.refresh(mode);
    }
    ...
}

public class PathChildrenCache implements Closeable {
    ...
    private volatile Watcher childrenWatcher = new Watcher() {
        //重复注册监听器
        //当子节点发生变化事件时,该方法就会被触发调用
        @Override
        public void process(WatchedEvent event) {
            //下面的代码其实依然是调用PathChildrenCache的refresh()方法
            offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
        }
    };
    
    void refresh(final RefreshMode mode) throws Exception {
        ensurePath();
        //创建一个回调,在下面执行client.getChildren()成功时会触发执行该回调
        final BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (reRemoveWatchersOnBackgroundClosed()) {
                    return;
                }
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    //处理子节点数据
                    processChildren(event.getChildren(), mode);
                } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                    if (mode == RefreshMode.NO_NODE_EXCEPTION) {
                        log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);
                        ensureContainers.reset();
                    } else {
                        log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);
                        ensureContainers.reset();
                        offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));
                    }
                }
            }
        };
        //下面的代码最后会调用到原生zk客户端的getChildren方法发起对子节点的监听
        //并且添加一个叫childrenWatcher的监听,一个叫callback的后台异步回调
        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
    }
    ...
}

//子节点发生变化事件时,最后都会触发执行EventOperation的invoke()方法
class EventOperation implements Operation {
    private final PathChildrenCache cache;
    private final PathChildrenCacheEvent event;
    
    EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {
        this.cache = cache;
        this.event = event;
    }
    
    @Override
    public void invoke() {
        //调用PathChildrenCache的Listener
        cache.callListeners(event);
    }
    ...
}

(2)NodeCache节点监听机制的实现源码

NodeCache会调用原生zk客户端对象的exists()方法,并往该方法传入一个监听器watcher。当子节点发生事件,就会通知watcher这个原生的Watcher,然后该Watcher便会调用注册到NodeCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");

        //NodeCache
        final NodeCache nodeCache = new NodeCache(client, "/cluster");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/cluster");
                if (stat == null) {
                } else {
                    nodeCache.getCurrentData();
                }
            }
        });
        nodeCache.start();
    }
}

public class NodeCache implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final String path;
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    ...
    private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
                if (isConnected.compareAndSet(false, true)) {
                    reset();
                }
            } else {
                isConnected.set(false);
            }
        }
    };
    
    //初始化一个Watcher,作为监听器添加到下面reset()方法执行的client.checkExists()方法中
    private Watcher watcher = new Watcher() {
        //重复注册监听器
        @Override
        public void process(WatchedEvent event) {
            reset();
        }
    };
    
    //初始化一个回调,在下面reset()方法执行client.checkExists()成功时会触发执行该回调
    private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            processBackgroundResult(event);
        }
    };
    
    //初始化NodeCache
    public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {
        this.client = client.newWatcherRemoveCuratorFramework();
        this.path = PathUtils.validatePath(path);
        this.dataIsCompressed = dataIsCompressed;
    }
    
    //获取存放Listener的容器ListenerContainer
    public ListenerContainer<NodeCacheListener> getListenable() {
        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
        return listeners;
    }
    
    //启动对节点的监听
    public void start() throws Exception {
        start(false);
    }
    
    public void start(boolean buildInitial) throws Exception {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //对建立的zk连接添加Listener
        client.getConnectionStateListenable().addListener(connectionStateListener);
        if (buildInitial) {
            //调用原生的zk客户端的exists()方法,对节点进行监听
            client.checkExists().creatingParentContainersIfNeeded().forPath(path);
            internalRebuild();
        }
        reset();
    }
    
    private void reset() throws Exception {
        if ((state.get() == State.STARTED) && isConnected.get()) {
            //下面的代码最后会调用原生的zk客户端的exists()方法,对节点进行监听
            //并且添加一个叫watcher的监听,一个叫backgroundCallback的后台异步回调
            client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
        }
    }
    
    private void processBackgroundResult(CuratorEvent event) throws Exception {
        switch (event.getType()) {
            case GET_DATA: {
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    ChildData childData = new ChildData(path, event.getStat(), event.getData());
                    setNewData(childData);
                }
                break;
            }
            case EXISTS: {
                if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                    setNewData(null);
                } else if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    if (dataIsCompressed) {
                        client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    } else {
                        client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    }
                }
                break;
            }
        }
    }
    ...
}

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

getChildren()方法注册的Watcher只有一次性,其注册的回调是一个异步回调。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "10".getBytes());
        System.out.println("创建节点'/test");

        client.getChildren().usingWatcher(new CuratorWatcher() {
            public void process(WatchedEvent event) throws Exception {
                //只要通知过一次zk节点的变化,这里就不会再被通知了
                //也就是第一次的通知才有效,这里被执行过一次后,就不会再被执行
                System.out.println("收到一个zk的通知: " + event);
            }
        }).inBackground(new BackgroundCallback() {
            //后台回调通知,表示会让zk.getChildren()在后台异步执行
            //后台异步执行client.getChildren()方法完毕,便会回调这个方法进行通知
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("收到一个后台回调通知: " + event);
            }
        }).forPath("/test");
    }
}

(4)PathCache实现自动重复注册监听器的效果

每当节点发生变化时,就会触发childEvent()方法的调用。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            //只要子节点发生变化,无论变化多少次,每次变化都会触发这里childEvent的调用
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("监听的子节点发生变化,收到了事件通知:" + pathChildrenCacheEvent);
            }
        });
        pathChildrenCache.start();
        System.out.println("完成子节点的监听和启动");
    }
}

(5)NodeCache实现节点变化事件监听的效果

每当节点发生变化时,就会触发nodeChanged()方法的调用。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        final NodeCache nodeCache = new NodeCache(client, "/test/child/id");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            //只要节点发生变化,无论变化多少次,每次变化都会触发这里nodeChanged的调用
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/test/child/id");
                if (stat != null) {
                    byte[] dataBytes = client.getData().forPath("/test/child/id");
                    System.out.println("节点数据发生了变化:" + new String(dataBytes));
                } else {
                    System.out.println("节点被删除");
                }
            }
        });
        nodeCache.start();
    }
}

12.基于Curator的Leader选举机制的实现源码

(1)第一种Leader选举机制LeaderLatch的源码

(2)第二种Leader选举机制LeaderSelector的源码

利用Curator的CRUD+ 监听回调机制,就能满足大部分系统使用zk的场景了。需要注意的是:如果使用原生的zk去注册监听器来监听节点或者子节点,当节点或子节点发生了对应的事件,会通知客户端一次,但是下一次再有对应的事件就不会通知了。使用zk原生的API时,客户端需要每次收到事件通知后,重新注册监听器。然而Curator的PathCache + NodeCache,会自动重新注册监听器。

(1)第一种Leader选举机制LeaderLatch的源码

Curator客户端会通过创建临时顺序节点的方式来竞争成为Leader的,LeaderLatch这种Leader选举的实现方式与分布式锁的实现几乎一样。

每个Curator客户端创建完临时顺序节点后,就会对/leader/latch目录调用getChildren()方法来获取里面所有的子节点,调用getChildren()方法的结果会通过backgroundCallback回调进行通知,接着客户端便对获取到的子节点进行排序来判断自己是否是第一个子节点。

如果客户端发现自己是第一个子节点,那么就是Leader。如果客户端发现自己不是第一个子节点,就对上一个节点添加一个监听器。在添加监听器时,会使用getData()方法获取自己的上一个节点,getData()方法执行成功后会调用backgrondCallback回调。

当上一个节点对应的客户端释放了Leader角色,上一个节点就会消失,此时就会通知第二个节点对应的客户端,执行getData()方法添加的监听器。

所以如果getData()方法的监听器被触发了,即发现上一个节点不存在了,客户端会调用getChildren()方法重新获取子节点列表,判断是否是Leader。

注意:使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                switch (newState) {
                    case LOST:
                        //当Leader与zk断开时,需要暂停当前Leader的工作
                }
            }
        });
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
        leaderLatch.start();
        leaderLatch.await();//阻塞等待直到当前客户端成为Leader
        Boolean hasLeaderShip = leaderLatch.hasLeadership();
        System.out.println("是否成为Leader: " + hasLeaderShip);
    }
}

public class LeaderLatch implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final ConnectionStateListener listener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            handleStateChange(newState);
        }
    };
    ...
    //Add this instance to the leadership election and attempt to acquire leadership.
    public void start() throws Exception {
        ...
        //对建立的zk连接添加Listener
        client.getConnectionStateListenable().addListener(listener);
        reset();
        ...
    }
    
    @VisibleForTesting
    void reset() throws Exception {
        setLeadership(false);
        setNode(null);
        //callback作为成功创建临时顺序节点后的回调
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                ...
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    setNode(event.getName());
                    if (state.get() == State.CLOSED) {
                        setNode(null);
                    } else {
                        //成功创建临时顺序节点,需要通过getChildren()再去获取子节点列表
                        getChildren();
                    }
                } else {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        //创建临时顺序节点
        client.create().creatingParentContainersIfNeeded().withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback)
            .forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
    
    //获取子节点列表
    private void getChildren() throws Exception {
        //callback作为成功获取子节点列表后的回调
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }
    
    //检查自己是否是第一个节点
    private void checkLeadership(List<String> children) throws Exception {
        if (debugCheckLeaderShipLatch != null) {
            debugCheckLeaderShipLatch.await();
        }
        final String localOurPath = ourPath.get();
        //对获取到的节点进行排序
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if (ourIndex < 0) {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        } else if (ourIndex == 0) {
            //如果自己是第一个节点,则标记自己为Leader
            setLeadership(true);
        } else {
            //如果自己不是第一个节点,则对前一个节点添加监听
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null)) {
                        //重新获取子节点列表
                        getChildren();
                    }
                }
            };
            BackgroundCallback callback = new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                        reset();
                    }
                }
            };
            //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            //使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
    ...
    //阻塞等待直到成为Leader
    public void await() throws InterruptedException, EOFException {
        synchronized(this) {
            while ((state.get() == State.STARTED) && !hasLeadership.get()) {
                wait();//Objetc对象的wait()方法,阻塞等待
            }
        }
        if (state.get() != State.STARTED) {
            throw new EOFException();
        }
    }
    
    //设置当前客户端成为Leader,并进行notifyAll()通知之前阻塞的线程
    private synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) { // Lost leadership, was true, now false
            listeners.forEach(new Function<LeaderLatchListener, Void>() {
                @Override
                public Void apply(LeaderLatchListener listener) {
                    listener.notLeader();
                    return null;
                }
            });
        } else if (!oldValue && newValue) { // Gained leadership, was false, now true
            listeners.forEach(new Function<LeaderLatchListener, Void>() {
                @Override
                public Void apply(LeaderLatchListener input) {
                    input.isLeader();
                    return null;
                }
            });
        }
        notifyAll();//唤醒之前执行了wait()方法的线程
    }
}

(2)第二种Leader选举机制LeaderSelector的源码

通过判断是否成功获取到分布式锁,来判断是否竞争成为Leader。正因为是通过持有分布式锁来成为Leader,所以LeaderSelector.takeLeadership()方法不能退出,否则就会释放锁。而一旦释放了锁,其他客户端就会竞争锁成功而成为新的Leader。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        LeaderSelector leaderSelector = new LeaderSelector(
            client,
            "/leader/election",
            new LeaderSelectorListener() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("你已经成为了Leader......");
                    //在这里干Leader所有的事情,此时方法不能退出
                    Thread.sleep(Integer.MAX_VALUE);
                }
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("连接状态的变化,已经不是Leader......");
                    if (connectionState.equals(ConnectionState.LOST)) {
                        throw new CancelLeadershipException();
                    }
                }
            }
        );
        leaderSelector.start();//尝试和其他节点在节点"/leader/election"上进行竞争成为Leader
        Thread.sleep(Integer.MAX_VALUE);
    }
}

public class LeaderSelector implements Closeable {
    private final CuratorFramework client;
    private final LeaderSelectorListener listener;
    private final CloseableExecutorService executorService;
    private final InterProcessMutex mutex;
    ...
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {
        Preconditions.checkNotNull(client, "client cannot be null");
        PathUtils.validatePath(leaderPath);
        Preconditions.checkNotNull(listener, "listener cannot be null");

        this.client = client;
        this.listener = new WrappedListener(this, listener);
        hasLeadership = false;
        this.executorService = executorService;
        //初始化一个分布式锁
        mutex = new InterProcessMutex(client, leaderPath) {
            @Override
            protected byte[] getLockNodeBytes() {
                return (id.length() > 0) ? getIdBytes(id) : null;
            }
        };
    }
    
    public void start() {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        Preconditions.checkState(!executorService.isShutdown(), "Already started");
        Preconditions.checkState(!hasLeadership, "Already has leadership");
        client.getConnectionStateListenable().addListener(listener);
        requeue();
    }
    
    public boolean requeue() {
        Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
        return internalRequeue();
    }
    
    private synchronized boolean internalRequeue() {
        if (!isQueued && (state.get() == State.STARTED)) {
            isQueued = true;
            //将选举的工作作为一个任务交给线程池执行
            Future<Void> task = executorService.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    ...
                    doWorkLoop();
                    ...
                    return null;
                }
            });
            ourTask.set(task);
            return true;
        }
        return false;
    }
    
    private void doWorkLoop() throws Exception {
        ...
        doWork();
        ...
    }
    
    @VisibleForTesting
    void doWork() throws Exception {
        hasLeadership = false;
        try {
            //尝试获取一把分布式锁,获取失败会进行阻塞
            mutex.acquire();
            //执行到这一行代码,说明获取分布式锁成功
            hasLeadership = true;
            try {
                if (debugLeadershipLatch != null) {
                    debugLeadershipLatch.countDown();
                }
                if (debugLeadershipWaitLatch != null) {
                    debugLeadershipWaitLatch.await();
                }
                //回调用户重写的takeLeadership()方法
                listener.takeLeadership(client);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (Throwable e) {
                ThreadUtils.checkInterrupted(e);
            } finally {
                clearIsQueued();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } finally {
            if (hasLeadership) {
                hasLeadership = false;
                boolean wasInterrupted = Thread.interrupted();  // clear any interrupted tatus so that mutex.release() works immediately
                try {
                    //释放锁
                    mutex.release();
                } catch (Exception e) {
                    if (failedMutexReleaseCount != null) {
                        failedMutexReleaseCount.incrementAndGet();
                    }
                    ThreadUtils.checkInterrupted(e);
                    log.error("The leader threw an exception", e);
                } finally {
                    if (wasInterrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    ...
}

网站公告

今日签到

点亮在社区的每一天
去签到