本章代码已分享至Gitee: https://gitee.com/lengcz/curator01
Curator API 常用操作 Watch事件监听
zookeeper 允许用户在指定节点上注册一些Watcher ,并且在一些特定事件触发的时候,zookeeper 服务端会将事件通知到感兴趣的客户端上,该机制是zookeeper 实现分布式协调服务的重要特性。
zookeeper 中引入了Wather 机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态发生变化时,会通知所有订阅者。
zookeeper 原生支持通过注册Wather 来进行事件监听,但是其使用起来特别不方便,需要开发人员自己反复注册Wather,比较繁琐。
Curator 引入了Cache 来实现对zookeeper 服务端事件的监听。
zookeeper 提供了三种Watcher
-
- NodeCache : 只是监听某一个特定的节点
-
- PathChildrenCache: 监听一个ZNode的子节点。
-
- TreeCache :可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache 的结合
NodeCache
NodeCache 用于监听单个节点的变化,包括节点的创建、更新和删除事件。适用于需要关注特定节点数据变化的场景。
/**
* NodeCache 给指定一个节点注册监听器
* @throws Exception
*/
@Test
public void testNodeCache() throws Exception{
//1 创建NodeCache 对象
NodeCache nodeCache = new NodeCache(client,"/app1",false);
//2 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
logger.info("节点变化了");
byte[] data = nodeCache.getCurrentData().getData(); // 获取数据
logger.info(new String(data));
}
});
//3 开启监听,如果设置 true,则开启监听,加载缓冲数据
nodeCache.start(true);
// 防止虚拟机退出
while(true){
}
}
NodeCache 会自动处理连接中断和会话过期,并在重新连接后恢复监听。可以通过 nodeCache.getCurrentData() 获取当前节点数据。
PathChildrenCache
PathChildrenCache 监听指定路径下子节点的变化,包括子节点的添加、移除和更新事件。适用于需要监控目录结构变化的场景。
tips: 监听只会对子节点有效,对本节点无效。
/**
* PathChildrenCache 监听某个子节点的所有子节点(不含本节点)
* @throws Exception
*/
@Test
public void testPathChildrenCache() throws Exception{
//1 创建PathChildrenCache对象
PathChildrenCache nodeCache = new PathChildrenCache(client,"/app2",true);
//2 注册监听
nodeCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
logger.info("子节点变化");
logger.info(JSONObject.toJSONString(pathChildrenCacheEvent));
if(pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ //子节点变化,打印数据
byte[] data = pathChildrenCacheEvent.getData().getData();
logger.info(new String(data));
}
}
});
//3 开启监听,如果设置 true,则开启监听,加载缓冲数据
nodeCache.start();
// 防止虚拟机退出
while(true){
}
}
TreeCache
TreeCache 结合了 NodeCache 和 PathChildrenCache 的功能,可以监听指定节点及其所有子节点的变化。适用于需要完整树形结构监控的场景。
/**
* TreeCache 监听某个子节点自己和所有子节点, 相当于NodeCache和PathChildrenCache的组合
* @throws Exception
*/
@Test
public void testTreeCache() throws Exception{
//1 创建TreeCache对象
TreeCache nodeCache = new TreeCache(client,"/app2");
//2 注册监听
nodeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
logger.info("节点变化");
logger.info(JSONObject.toJSONString(treeCacheEvent));
if(treeCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ //子节点变化,打印数据
byte[] data = treeCacheEvent.getData().getData();
logger.info(new String(data));
}
}
});
//3 开启监听,如果设置 true,则开启监听,加载缓冲数据
nodeCache.start();
// 防止虚拟机退出
while(true){
}
}
TreeCache 提供的事件类型更丰富,包括 NODE_ADDED、NODE_UPDATED、NODE_REMOVED 等。可以获取完整的节点树结构变化信息。
发布/订阅
由于Watcher是发布订阅模式,所以多个监听器都会收到同一条消息。
/**
* NodeCache 给指定一个节点注册监听器
* @throws Exception
*/
@Test
public void testNodeCache() throws Exception{
{
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
String connectString = "localhost:2181";
CuratorFramework clientTemp = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(60_000)
.connectionTimeoutMs(15_000)
.retryPolicy(retryPolicy).namespace("demo01").build();
clientTemp.start();
//1 创建NodeCache 对象
NodeCache nodeCache = new NodeCache(clientTemp,"/app1",false);
//2 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData(); // 获取数据
logger.info("节点1收到消息:"+new String(data));
}
});
//3 开启监听,如果设置 true,则开启监听,加载缓冲数据
try {
nodeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
}
{
//1 创建NodeCache 对象
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
String connectString = "localhost:2181";
CuratorFramework clientTemp = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(60_000)
.connectionTimeoutMs(15_000)
.retryPolicy(retryPolicy).namespace("demo01").build();
clientTemp.start();
NodeCache nodeCache = new NodeCache(clientTemp,"/app1",false);
//2 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData(); // 获取数据
logger.info("节点2收到消息:"+new String(data));
}
});
//3 开启监听,如果设置 true,则开启监听,加载缓冲数据
try {
nodeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
}
while (true){
}
}
启动示例后,在zk客户端set 数据
示例输出