一、ZooKeeper 客户端常用命令
1. 启动与退出
bin/zkCli.sh -server 127.0.0.1:2181 # 连接客户端 quit # 退出客户端
2. 节点操作
# 查看子节点 ls / ls -s / ls /app # 查看节点详细信息 ls2 /app stat /app
# 创建节点 create /node1 "hello" # 持久节点 create -e /node2 "temp" # 临时节点 create -s /node3 "seq" # 顺序节点 create -e -s /node4 "tempSeq" # 临时顺序节点 创建node1下的子节点,不能node1和node1一起创建,必须创建了node1才能执行下面的否则报错 create /node1/node11 "hello1" #子节点
# 获取/修改数据 get /node1 set /node1 "world"
# 删除节点 delete /node1 # 删除无子节点的 deleteall /节点path #删除带有子节点的节点 rmr /node1 # 递归删除
3. Watch 监听
get /node1 true # 监听数据变化 ls / true # 监听子节点变化
⚠️ 监听是一次性的,触发后失效。
4. ACL 权限控制
getAcl /node1 setAcl /node1 world:anyone:rw
权限:
r
=读,w
=写,c
=创建,d
=删除,a
=管理。
模式:world
(所有人)、auth
、digest
(用户名密码)、ip
。
5. 辅助命令
help # 查看帮助 history # 查看历史命令 redo <id> # 重做历史命令
二、ZooKeeper Java API 操作
1. 原生 API(org.apache.zookeeper.ZooKeeper)
需要的依赖
<!-- 原生 ZooKeeper 依赖 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.8.3</version> </dependency>
代码实现
import org.apache.zookeeper.*; public class ZkDemo { public static void main(String[] args) throws Exception { // 1. 连接 ZooKeeper ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { System.out.println("收到事件:" + event); }); // 2. 创建节点 zk.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 3. 获取节点数据 byte[] data = zk.getData("/node1", false, null); System.out.println("节点数据:" + new String(data)); // 4. 修改节点数据 zk.setData("/node1", "world".getBytes(), -1); // 5. 获取子节点 System.out.println("子节点:" + zk.getChildren("/", false)); // 6. 删除节点 zk.delete("/node1", -1); // 7. 关闭连接 zk.close(); } }
2. Curator 客户端(推荐,简化 API)
需要的依赖
<!--curator--> <!-- Curator(如果你要用 Curator 封装的API) --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
⚠️
curator-framework
和curator-recipes
内部会依赖zookeeper
,所以一般不用单独引入zookeeper
依赖,除非你要控制 zk 版本。2.1 使用Curator实现增删改查的api
Curator 方法 ZooKeeper CLI 类似命令 create().forPath("/node")
create /node
getData().forPath("/node")
get /node
getChildren().forPath("/")
ls /
getData().storingStatIn(stat)
ls -s /node
setData().forPath("/node", data)
set /node data
delete().forPath("/node")
delete /node
delete().deletingChildrenIfNeeded()
rmr /node
代码实现:
public class CuratorTest { private CuratorFramework client; // Curator 客户端对象 /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略:初始等待3秒,重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式:通过builder方式构建客户端 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") // 设置ZK地址 .sessionTimeoutMs(60 * 1000) // 会话超时时间 .connectionTimeoutMs(15 * 1000) // 连接超时时间 .retryPolicy(retryPolicy) // 设置重试策略 // namespace("czq") // 设置命名空间,将 "czq" 作为客户端操作的根路径 // 这样之后创建节点时无需每次都写 /czq 前缀 // 例如:client.create().forPath("/node11", "hello1".getBytes()) // 实际会在 ZooKeeper 中创建 /czq/node11 节点 .namespace("czq") .build(); //开启连接 client.start(); } //==============================create============================================================================= /** * 创建节点:create 持久 临时 顺序 数据 * 1. 基本创建 :create().forPath("") * 2. 创建节点 带有数据:create().forPath("",data) * 3. 设置节点的类型:create().withMode().forPath("",data) * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data) */ @Test public void testCreate() throws Exception { //2. 创建节点 带有数据 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app2", "hehe".getBytes()); System.out.println(path); } @Test public void testCreate2() throws Exception { //1. 基本创建 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app1"); System.out.println(path); } @Test public void testCreate3() throws Exception { //3. 设置节点的类型 //默认类型:持久化 String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3"); System.out.println(path); } @Test public void testCreate4() throws Exception { //4. 创建多级节点 /app1/p1 //creatingParentsIfNeeded():如果父节点不存在,则创建父节点 String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1"); System.out.println(path); } //===========================get================================================================================ /** * 查询节点: * 1. 查询数据:get: getData().forPath() * 2. 查询子节点: ls /: getChildren().forPath() * 3. 查询节点状态信息:ls -s /:getData().storingStatIn(状态对象).forPath() */ @Test public void testGet1() throws Exception { //1. 查询数据:get byte[] data = client.getData().forPath("/app1"); System.out.println(new String(data)); } @Test public void testGet2() throws Exception { // 2. 查询子节点: ls / List<String> path = client.getChildren().forPath("/"); System.out.println(path); } @Test public void testGet3() throws Exception { Stat status = new Stat(); // 状态对象,用来存储节点元信息 System.out.println(status); //3. 查询节点状态信息:ls -s / //.storingStatIn(status)代表存储状态信息到status对象中 client.getData().storingStatIn(status).forPath("/app1"); System.out.println(status); } //===========================set================================================================================ /** * 修改数据 * 1. 基本修改数据:setData().forPath() * 2. 根据版本修改: setData().withVersion().forPath() * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。 * * @throws Exception */ @Test public void testSet() throws Exception { // 基本修改节点数据 client.setData().forPath("/app1", "itcast".getBytes()); } @Test public void testSetForVersion() throws Exception { Stat status = new Stat(); //3. 查询节点状态信息:ls -s //.storingStatIn(status)代表存储状态信息到status对象中 client.getData().storingStatIn(status).forPath("/app1"); int version = status.getVersion();//查询出来的节点版本 System.out.println(version); // 根据版本修改节点数据,保证并发安全 client.setData().withVersion(version).forPath("/app1", "hehe".getBytes()); } //===========================delete================================================================================ /** * 删除节点: delete deleteall * 1. 删除单个节点:delete().forPath("/app1"); * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1"); * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2"); * 4. 回调:inBackground * @throws Exception */ @Test public void testDelete() throws Exception { // 1. 删除单个节点 client.delete().forPath("/app1"); } @Test public void testDelete2() throws Exception { //2. 删除带有子节点的节点 client.delete().deletingChildrenIfNeeded().forPath("/app4"); } @Test public void testDelete3() throws Exception { //3. 必须成功的删除(自动重试保证删除成功) client.delete().guaranteed().forPath("/app2"); } @Test public void testDelete4() throws Exception { //4. 回调异步删除 client.delete().guaranteed().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("我被删除了~"); System.out.println(event); } }).forPath("/app1"); } @After public void close() { // 关闭客户端连接 if (client != null) { client.close(); } } }
2.2 使用Curator实现Watch事件监听的api
(1)Curator 2.x/4.x 常见的写法
使用者三个类(
NodeCache
、PathChildrenCache
、TreeCache
)
NodeCache
(监听一个节点自己)
PathChildrenCache
(监听某个节点的直接子节点)
TreeCache
(监听某个节点和所有子节点)从 Curator 5.x 开始,这三个类(
NodeCache
、PathChildrenCache
、TreeCache
)已经被 统一弃用,官方推荐用CuratorCache
来代替。
监听器 监听范围 典型应用场景 NodeCache
单个节点的数据变化 监听配置节点变化 PathChildrenCache
子节点的增删改,不监听本节点 监听服务节点上下线 TreeCache
节点及其所有子节点 全量配置或服务树监控 代码实现
public class CuratorWatcherTest { private CuratorFramework client; // Curator 客户端对象 /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略:初始等待3秒,重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式:通过builder方式构建客户端 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") // 设置ZK地址 .sessionTimeoutMs(60 * 1000) // 会话超时时间 .connectionTimeoutMs(15 * 1000) // 连接超时时间 .retryPolicy(retryPolicy) // 设置重试策略 // namespace("czq") // 设置命名空间,将 "czq" 作为客户端操作的根路径 // 这样之后创建节点时无需每次都写 /czq 前缀 // 例如:client.create().forPath("/node11", "hello1".getBytes()) // 实际会在 ZooKeeper 中创建 /czq/node11 节点 .namespace("czq") .build(); //开启连接 client.start(); } @After public void close() { if (client != null) { client.close(); // 关闭客户端连接 } } /** * 演示 NodeCache:给指定一个节点注册监听器 * NodeCache 只能监听某个具体节点的数据变化(新增/修改/删除) */ @Test public void testNodeCache() throws Exception { // 1. 创建 NodeCache 对象,监听 /app1 节点 final NodeCache nodeCache = new NodeCache(client,"/app1"); // 2. 注册监听器 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了~"); // 获取修改后的节点数据(如果节点被删除,这里可能会是 null) byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); // 3. 开启监听 // 参数 true 表示在启动监听时,立即加载一次缓存数据 nodeCache.start(true); // 阻塞住主线程,保证监听器一直生效 while (true){ } } /** * 演示 PathChildrenCache:监听某个节点的所有子节点 * 只能监听子节点的变化(新增/修改/删除),不能监听当前节点本身 */ @Test public void testPathChildrenCache() throws Exception { // 1. 创建 PathChildrenCache 监听对象 // 参数 true 表示对子节点数据进行缓存 PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true); // 2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println(event); // 监听子节点数据变更 PathChildrenCacheEvent.Type type = event.getType(); if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("数据变了!!!"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); // 3. 开启监听 pathChildrenCache.start(); // 阻塞主线程,保证监听器一直生效 while (true){ } } /** * 演示 TreeCache:监听某个节点自己和它的所有子节点 * 相当于 NodeCache + PathChildrenCache 的结合体 */ @Test public void testTreeCache() throws Exception { // 1. 创建 TreeCache 监听对象 TreeCache treeCache = new TreeCache(client,"/app2"); // 2. 注册监听器 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("节点变化了"); System.out.println(event); } }); // 3. 开启监听 treeCache.start(); // 阻塞主线程,保证监听器一直生效 while (true){ } } }
(2)Curator 5.x以上常见的写法
从 Curator 5.x 开始,这三个类(
NodeCache
、PathChildrenCache
、TreeCache
)已经被 统一弃用,官方推荐用CuratorCache
来代替。(新版本)
方法名 对应旧API 监听范围 应用场景 testCuratorCacheNode
NodeCache 单节点数据变化 单个配置项 testCuratorCacheChildren
PathChildrenCache 子节点(不含父节点) 服务注册/发现 testCuratorCacheTree
TreeCache 节点 + 全部子节点 配置中心、全量监控 代码实现
public class CuratorWatcherTest { // Curator 客户端对象,用于操作 ZooKeeper private CuratorFramework client; /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略:初始等待3秒,重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式:通过builder方式构建客户端 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") // 设置ZK地址 .sessionTimeoutMs(60 * 1000) // 会话超时时间 .connectionTimeoutMs(15 * 1000) // 连接超时时间 .retryPolicy(retryPolicy) // 设置重试策略 // namespace("czq") // 设置命名空间,将 "czq" 作为客户端操作的根路径 // 这样之后创建节点时无需每次都写 /czq 前缀 // 例如:client.create().forPath("/node11", "hello1".getBytes()) // 实际会在 ZooKeeper 中创建 /czq/node11 节点 .namespace("czq") .build(); //开启连接 client.start(); } @After public void close() { // 测试完成后关闭客户端,释放资源 if (client != null) { client.close(); } } // ==============================替代 NodeCache============================================================================= /** * 1. 监听单个节点(替代 NodeCache) * 使用 CuratorCache + CuratorCacheListener 来代替旧的 NodeCache */ @Test public void testCuratorCacheNode() throws Exception { // 创建 CuratorCache,监听 /app1 节点 CuratorCache cache = CuratorCache.build(client, "/app1"); // 定义监听器,使用 forNodeCache 模式,只监听该节点数据变化 CuratorCacheListener listener = CuratorCacheListener.builder() .forNodeCache(new Runnable() { @Override public void run() { System.out.println("节点变化了~"); try { // 获取节点最新数据并打印 byte[] data = client.getData().forPath("/app1"); System.out.println("最新数据:" + new String(data)); } catch (Exception e) { e.printStackTrace(); } } }) .build(); // 将监听器绑定到缓存 cache.listenable().addListener(listener); // 开启缓存(监听) cache.start(); // 阻塞主线程,保证监听器一直运行 Thread.sleep(Long.MAX_VALUE); } // ==============================替代 PathChildrenCache============================================================================= /** * 2. 监听子节点变化(替代 PathChildrenCache) * 使用 CuratorCache + PathChildrenCacheListener 监听某节点的所有子节点 */ @Test public void testCuratorCacheChildren() throws Exception { // 创建 CuratorCache,监听 /app2 节点的子节点 CuratorCache cache = CuratorCache.build(client, "/app2"); // 定义监听器,forPathChildrenCache 表示只监听子节点的变化 CuratorCacheListener listener = CuratorCacheListener.builder() .forPathChildrenCache("/app2", client, new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println("类型:" + event.getType()); if (event.getData() != null) { // 打印节点路径和数据 System.out.println("节点:" + event.getData().getPath()); System.out.println("数据:" + new String(event.getData().getData())); } } }) .build(); // 将监听器绑定到缓存 cache.listenable().addListener(listener); // 开启缓存 cache.start(); // 阻塞主线程,保证监听器一直运行 Thread.sleep(Long.MAX_VALUE); } // ==============================替代 TreeCache============================================================================= /** * 3. 监听节点及其所有子节点(替代 TreeCache) * 使用 CuratorCache + TreeCacheListener 监听整个节点树 */ @Test public void testCuratorCacheTree() throws Exception { // 创建 CuratorCache,监听 /app2 节点及其子节点 CuratorCache cache = CuratorCache.build(client, "/app2"); // 定义监听器,forTreeCache 表示节点本身和子节点都监听 CuratorCacheListener listener = CuratorCacheListener.builder() .forTreeCache(client, new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("树节点变化了~"); System.out.println("类型:" + event.getType()); if (event.getData() != null) { // 打印节点路径和数据 System.out.println("节点:" + event.getData().getPath()); System.out.println("数据:" + new String(event.getData().getData())); } } }) .build(); // 将监听器绑定到缓存 cache.listenable().addListener(listener); // 开启缓存 cache.start(); // 阻塞主线程,保证监听器一直运行 Thread.sleep(Long.MAX_VALUE); } }
2.3 实现分布式锁
这里不做过多讲解详细去看我的另一篇博客: