springboot集成zookeeper的增删改查、节点监听、分布式读写锁、分布式计数器
ZooKeeper 是一个开源的分布式协调服务,由 Apache 软件基金会维护。它主要用于解决分布式系统中的一致性问题,提供高可用性和高性能的分布式数据管理服务。ZooKeeper 的设计目标是简化分布式应用的开发,帮助开发者处理分布式环境中的复杂问题,如配置管理、命名服务、分布式同步和组服务等。
ZooKeeper 的核心概念
1、ZNode
ZooKeeper 的数据模型类似于文件系统,数据存储在节点(ZNode)中。
ZNode 可以存储数据,并且可以有子节点,形成一个树状结构。
ZNode 分为持久节点(Persistent)和临时节点(Ephemeral),临时节点在客户端会话结束后会自动删除。
2、Watcher
Watcher 是 ZooKeeper 的一种通知机制,客户端可以在 ZNode 上设置 Watcher,当 ZNode 发生变化时,ZooKeeper 会通知客户端。
Watcher 是一次性的,触发后需要重新设置。
3、Session
客户端与 ZooKeeper 服务器之间的连接称为会话(Session)。
会话有超时时间,如果客户端在超时时间内没有与服务器通信,会话将失效。
4、集群(Ensemble)
ZooKeeper 通常以集群形式部署,称为 Ensemble。
集群中的服务器通过 Zab 协议(ZooKeeper Atomic Broadcast)保持数据一致性。
ZooKeeper 的主要功能
1、配置管理
ZooKeeper 可以用于集中管理分布式系统的配置信息,所有节点共享同一份配置数据。
2、命名服务
ZooKeeper 可以用于实现分布式系统中的命名服务,帮助客户端查找资源。
3、分布式锁
ZooKeeper 提供了分布式锁的实现,用于控制多个节点对共享资源的访问。
4、领导者选举
ZooKeeper 可以用于实现分布式系统中的领导者选举,确保系统中只有一个节点负责协调任务。
5、分布式队列
ZooKeeper 提供了分布式队列的实现,用于在多个节点之间共享任务。
ZooKeeper 的架构
ZooKeeper 的架构通常包括以下几个组件:
1、客户端(Client)
客户端通过 ZooKeeper 提供的 API 与服务器进行交互。
2、服务器(Server)
ZooKeeper 服务器负责处理客户端的请求,维护数据的一致性。
服务器分为 Leader 和 Follower,Leader 负责处理写请求,Follower 负责处理读请求。
3、Zab 协议
Zab 协议是 ZooKeeper 的核心协议,用于保证数据的一致性和顺序性。
使用 ZooKeeper 的优势
1、高可用性:ZooKeeper 以集群形式部署,具有高可用性。
2、一致性:ZooKeeper 通过 Zab 协议保证数据的一致性。
3、简单易用:ZooKeeper 提供了简单的 API,易于集成到分布式系统中。
Curator Recipes 是 Apache Curator 框架中的一组高级 API,旨在简化分布式系统中常见模式的实现。Apache Curator 是一个用于 Apache ZooKeeper 的客户端库,而 ZooKeeper 是一个分布式协调服务,广泛用于分布式系统中的配置管理、领导者选举、分布式锁等场景。
Curator Recipes 提供了一系列现成的解决方案,帮助开发者快速实现复杂的分布式系统模式,包括:
1、分布式锁(Distributed Lock)
提供可重入锁和不可重入锁的实现。
适用于需要跨多个节点同步资源的场景。
2、领导者选举(Leader Election)
实现分布式系统中的领导者选举。
适用于需要单一节点负责协调任务的场景。
3、分布式屏障(Barrier)
实现分布式系统中的屏障同步。
适用于需要多个节点同时开始或结束任务的场景。
4、分布式计数器(Distributed Counter)
提供分布式环境下的计数器功能。
适用于需要跨节点共享计数器的场景。
5、分布式队列(Distributed Queue)
实现分布式环境下的队列功能。
适用于需要跨节点共享任务的场景。
6、分布式缓存(Distributed Cache)
提供分布式环境下的缓存功能。
适用于需要跨节点共享数据的场景。
7、分布式信号量(Distributed Semaphore)
实现分布式环境下的信号量功能。
适用于需要控制资源访问的场景。
8、节点监听(Node Watcher)
提供对 ZooKeeper 节点的监听功能。
适用于需要实时监控节点变化的场景。
使用 Curator Recipes 的优势
1、简化开发:Curator Recipes 提供了高级 API,开发者无需从头实现复杂的分布式模式。
2、可靠性:Curator 封装了 ZooKeeper 的复杂性,提供了更稳定和可靠的实现。
3、灵活性:支持多种分布式模式,适用于不同的应用场景。
zk 常用的功能
添加依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<zookeeper.version>3.4.8</zookeeper.version>
<curator.version>2.11.1</curator.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
</dependencies>
ZK 工具类
/**
* zookeeper客户端
*/
@Data
@Slf4j
public class ZkClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private CuratorFramework client;
public TreeCache cache;
private ZookeeperProperties zookeeperProperties;
public ZkClient(ZookeeperProperties zookeeperProperties){
this.zookeeperProperties = zookeeperProperties;
}
/**
* 初始化zookeeper客户端
*/
public void init() {
try{
RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),
zookeeperProperties.getMaxRetries());
Builder builder = CuratorFrameworkFactory.builder()
.connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy)
.sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs())
.connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs())
.namespace( zookeeperProperties.getNamespace());
if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){
builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));
builder.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
client = builder.build();
client.start();
initLocalCache("/test");
// addConnectionStateListener();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
//连接丢失
logger.info("lost session with zookeeper");
} else if (state == ConnectionState.CONNECTED) {
//连接新建
logger.info("connected with zookeeper");
} else if (state == ConnectionState.RECONNECTED) {
logger.info("reconnected with zookeeper");
}
}
});
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 初始化本地缓存
* @param watchRootPath
* @throws Exception
*/
private void initLocalCache(String watchRootPath) throws Exception {
cache = new TreeCache(client, watchRootPath);
TreeCacheListener listener = (client1, event) ->{
log.info("event:" + event.getType() +
" |path:" + (null != event.getData() ? event.getData().getPath() : null));
if(event.getData()!=null && event.getData().getData()!=null){
log.info("发生变化的节点内容为:" + new String(event.getData().getData()));
}
// client1.getData().
};
cache.getListenable().addListener(listener);
cache.start();
}
public void stop() {
client.close();
}
public CuratorFramework getClient() {
return client;
}
/**
* 创建节点
* @param mode 节点类型
* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
*4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
* @param path 节点名称
* @param nodeData 节点数据
*/
public void createNode(CreateMode mode, String path , String nodeData) {
try {
//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));
} catch (Exception e) {
logger.error("注册出错", e);
}
}
/**
* 创建节点
* @param mode 节点类型
* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
* @param path 节点名称
*/
public void createNode(CreateMode mode,String path ) {
try {
//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
} catch (Exception e) {
logger.error("注册出错", e);
}
}
/**
* 删除节点数据
*
* @param path
*/
public void deleteNode(final String path) {
try {
deleteNode(path,true);
} catch (Exception ex) {
log.error("{}",ex);
}
}
/**
* 删除节点数据
* @param path
* @param deleteChildre 是否删除子节点
*/
public void deleteNode(final String path,Boolean deleteChildre){
try {
if(deleteChildre){
//guaranteed()删除一个节点,强制保证删除,
// 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
}else{
client.delete().guaranteed().forPath(path);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 设置指定节点的数据
* @param path
* @param datas
*/
public void setNodeData(String path, byte[] datas){
try {
client.setData().forPath(path, datas);
}catch (Exception ex) {
log.error("{}",ex);
}
}
/**
* 获取指定节点的数据
* @param path
* @return
*/
public byte[] getNodeData(String path){
Byte[] bytes = null;
try {
if(cache != null){
ChildData data = cache.getCurrentData(path);
if(data != null){
return data.getData();
}
}
client.getData().forPath(path);
return client.getData().forPath(path);
}catch (Exception ex) {
log.error("{}",ex);
}
return null;
}
/**
* 获取数据时先同步
* @param path
* @return
*/
public byte[] synNodeData(String path){
client.sync();
return getNodeData( path);
}
/**
* 判断路径是否存在
*
* @param path
* @return
*/
public boolean isExistNode(final String path) {
client.sync();
try {
return null != client.checkExists().forPath(path);
} catch (Exception ex) {
return false;
}
}
/**
* 获取节点的子节点
* @param path
* @return
*/
public List<String> getChildren(String path) {
List<String> childrenList = new ArrayList<>();
try {
childrenList = client.getChildren().forPath(path);
} catch (Exception e) {
logger.error("获取子节点出错", e);
}
return childrenList;
}
/**
* 随机读取一个path子路径, "/"为根节点对应该namespace
* 先从cache中读取,如果没有,再从zookeeper中查询
* @param path
* @return
* @throws Exception
*/
public String getRandomData(String path) {
try{
Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);
if(cacheMap != null && cacheMap.size() > 0) {
logger.debug("get random value from cache,path="+path);
Collection<ChildData> values = cacheMap.values();
List<ChildData> list = new ArrayList<>(values);
Random rand = new Random();
byte[] b = list.get(rand.nextInt(list.size())).getData();
return new String(b,"utf-8");
}
if(isExistNode(path)) {
logger.debug("path [{}] is not exists,return null",path);
return null;
} else {
logger.debug("read random from zookeeper,path="+path);
List<String> list = client.getChildren().forPath(path);
if(list == null || list.size() == 0) {
logger.debug("path [{}] has no children return null",path);
return null;
}
Random rand = new Random();
String child = list.get(rand.nextInt(list.size()));
path = path + "/" + child;
byte[] b = client.getData().forPath(path);
String value = new String(b,"utf-8");
return value;
}
}catch(Exception e){
log.error("{}",e);
}
return null;
}
/**
* 可重入共享锁 -- Shared Reentrant Lock
* @param lockPath
* @param time
* @param dealWork 获取
* @return
*/
public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
if (!lock.acquire(time, TimeUnit.SECONDS)) {
log.error("get lock fail:{}", " could not acquire the lock");
return null;
}
log.debug("{} get the lock",lockPath);
Object b = dealWork.deal();
return b;
}catch(Exception e){
log.error("{}", e);
}finally{
try {
lock.release();
} catch (Exception e) {
//log.error("{}",e);
}
}
return null;
}
/**
* 获取读写锁
* @param path
* @return
*/
public InterProcessReadWriteLock getReadWriteLock(String path){
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
return readWriteLock;
}
/**
* 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
*/
ExecutorService pool = Executors.newFixedThreadPool(2);
/**
* 监听数据节点的变化情况
* @param watchPath
* @param listener
*/
public void watchPath(String watchPath,TreeCacheListener listener){
// NodeCache nodeCache = new NodeCache(client, watchPath, false);
TreeCache cache = new TreeCache(client, watchPath);
cache.getListenable().addListener(listener,pool);
try {
cache.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
配置文件
zookeeper.enabled: true
#zookeeper.server: 47.106.106.53:9036,47.106.106.53:9037,47.106.106.53:9038
zookeeper.server: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181
zookeeper.namespace: demo
zookeeper.digest: rt:rt #zkCli.sh acl 命令 addauth digest mpush
zookeeper.sessionTimeoutMs: 1000 #会话超时时间,单位为毫秒,默认60000ms,连接断开后,其它客户端还能请到临时节点的时间
zookeeper.connectionTimeoutMs: 6000 #连接创建超时时间,单位为毫秒
zookeeper.maxRetries: 3 #最大重试次数
zookeeper.baseSleepTimeMs: 1000 #初始sleep时间 ,毫秒
controller
@Api(tags="zookeeper基本操作")
@RequestMapping("/zk")
@RestController
@Slf4j
public class ZookeeperController {
@Autowired
private ZkClient zkClient;
@Autowired
private ZkClient zkClientTest;
/**
* 创建节点
* @param type
* @param znode
* @return
*/
@ApiOperation(value = "创建节点",notes = "在命名空间下创建节点")
@ApiImplicitParams({
@ApiImplicitParam(name ="type",value = "节点类型:<br> 0 持久化节点<br> 1 临时节点<br> 2 持久顺序节点<br> 3 临时顺序节点",
allowableValues = "0,1,2,3",defaultValue="3",paramType = "path",required = true,dataType = "Long"),
@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "path",required = true,dataType = "String"),
@ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "body",dataType = "String")
})
@RequestMapping(value = "/create/{type}/{znode}",method=RequestMethod.POST)
private String create(@PathVariable Integer type,@PathVariable String znode,@RequestBody String nodeData){
znode = "/" + znode;
try {
zkClient.createNode(CreateMode.fromFlag(type),znode,nodeData);
} catch (KeeperException e) {
e.printStackTrace();
}
return znode;
}
/**
* 设置节点数据
* @param znode
* @return
*/
@ApiOperation(value = "设置节点数据",notes = "设置节点数据")
@ApiImplicitParams({
@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String"),
@ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "query",required = true,dataType = "String")
})
@RequestMapping(value = "/update",method=RequestMethod.POST)
public String update(@RequestBody String znode,@RequestParam String nodeData){
znode = "/" + znode;
zkClient.setNodeData(znode,nodeData.getBytes());
return "sucess";
}
@ApiOperation(value = "删除节点",notes = "删除节点")
@ApiImplicitParams({
@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")
})
@RequestMapping(value = "/delete",method=RequestMethod.GET)
public String delete(@RequestParam String znode){
znode = "/" + znode;
zkClient.deleteNode(znode);
return "success";
}
@ApiOperation(value = "查找节点的内容",notes = "查找节点的内容")
@ApiImplicitParams({
@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String")
})
@RequestMapping(value = "/find",method=RequestMethod.POST)
public String find(@RequestBody String znode){
znode = "/" + znode;
byte[] b = zkClient.getNodeData(znode);
return new String(b);
}
/**
* 给节点添加读写锁
* @param znode
* @return
*/
@ApiOperation(value = "添加读写锁",notes = "写锁跟读锁互斥,读锁跟读锁共享")
@ApiImplicitParams({
@ApiImplicitParam(name ="lockType",value = "锁类型:<br> 0 写锁<br> 1 读锁",
allowableValues = "0,1",defaultValue="0",paramType = "query",required = true,dataType = "Long"),
@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")
})
@RequestMapping(value = "/writeLock",method=RequestMethod.GET)
public String readLock(@RequestParam Integer lockType,@RequestParam String znode){
znode = "/" + znode;
InterProcessReadWriteLock readWriteLock = zkClient.getReadWriteLock(znode);
InterProcessMutex writeLock = readWriteLock.writeLock();
InterProcessMutex readLock = readWriteLock.readLock();
Runnable writeRunnable = ()->{
try {
System.out.println("------write lock-----------");
writeLock.acquire();
System.out.println("write acquire");
Thread.sleep(10_000);
System.out.println("write release");
writeLock.release();
} catch (Exception e) {
e.printStackTrace();
}
};
Runnable readRunnable = ()->{
try {
System.out.println("-------read lock----------");
readLock.acquire();
System.out.println("read acquire");
Thread.sleep(20_000);
System.out.println("read release");
readLock.release();
} catch (Exception e) {
e.printStackTrace();
}
};
if(lockType == 0 ){
new Thread(writeRunnable).start();
}else if(lockType == 1){
new Thread(readRunnable).start();
}
return "success";
}
/**
* 监听节点
* @param znode
* @return
*/
@ApiOperation(value = "监听节点",notes = "监控整个树上的所有节点")
@ApiImplicitParams(
@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String")
)
@RequestMapping(value="/watchPath",method=RequestMethod.POST)
public String watchPath(@RequestBody String znode){
znode = "/" + znode;
zkClient.watchPath(znode,(client1, event) ->{
log.info("event:" + event.getType() +
" |path:" + (null != event.getData() ? event.getData().getPath() : null));
if(event.getData()!=null && event.getData().getData()!=null){
log.info("发生变化的节点内容为:" + new String(event.getData().getData()));
}
});
return "success";
}
/**
* 测试计算器
* 并发越高耗时越长
* 要自己实现获取锁失败重试
* @return
*/
@ApiOperation(value = "模拟分布式计数器",notes = "模拟分布式计数器")
@RequestMapping(value="/counter",method=RequestMethod.POST)
public String counter(@RequestBody String znode){
SharedCount baseCount = new SharedCount(zkClientTest.getClient(), znode, 0);
try {
baseCount.start();
//生成线程池
ExecutorService executor = Executors.newCachedThreadPool();
Consumer<SharedCount> consumer = (SharedCount count) -> {
try {
List<Callable<Boolean>> callList = new ArrayList<>();
Callable<Boolean> call = () -> {
boolean result = false;
try {
Long time = System.currentTimeMillis();
while(!result){
VersionedValue<Integer> oldVersion = baseCount.getVersionedValue();
int newCnt = oldVersion.getValue() + 1;
result = baseCount.trySetCount(oldVersion, newCnt);
if(System.currentTimeMillis()-time>10_000||result){
break;
}
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)+1);
}
} catch (Exception e) {
}
return result;
};
//5个线程
for (int i = 0; i < 100; i++) {
callList.add(call);
}
List<Future<Boolean>> futures = executor.invokeAll(callList);
} catch (Exception e) {
}
};
//测试分布式int类型的计数器
consumer.accept(baseCount);
System.out.println("final cnt : " + baseCount.getCount());
} catch (Exception e) {
e.printStackTrace();
}
return "success:"+baseCount.getCount();
}
/**
* DistributedAtomicLong计数器可以自己设置重试的次数与间隔
* 并发越高耗时越长
* 要自己实现获取锁失败重试
*/
@ApiOperation(value = "模拟分布式计数器2",notes = "模拟分布式计数器2")
@RequestMapping(value="/counter2",method=RequestMethod.POST)
public String distributedCount(@RequestBody String znode) throws Exception {
DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(
zkClientTest.getClient(), znode, new RetryNTimes(10, 30));
//生成线程池
ExecutorService executor = Executors.newCachedThreadPool();
Consumer<DistributedAtomicLong> consumer = (DistributedAtomicLong count) -> {
try {
List<Callable<Boolean>> callList = new ArrayList<>();
Callable<Boolean> call = () -> {
boolean result = false;
try {
AtomicValue<Long> val = count.increment();
System.out.println("old cnt: "+val.preValue()+" new cnt : "+ val.postValue()+" result:"+val.succeeded());
result = val.succeeded();
} catch (Exception e) {
} finally {
}
return result;
};
//5个线程
for (int i = 0; i < 500; i++) {
callList.add(call);
}
List<Future<Boolean>> futures = executor.invokeAll(callList);
} catch (Exception e) {
}
};
consumer.accept(distributedAtomicLong);
return "success:"+distributedAtomicLong.get().postValue();
}
/**
*
* @return
* @throws KeeperException
*/
@ApiOperation(value = "模拟服务注册和随机获取服务",notes = "模拟服务注册和随机获取服务")
@RequestMapping(value="/serviceRegistry",method=RequestMethod.POST)
public String serviceRegistry() throws KeeperException {
//服务注册
zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service1","http://1270.0.1:8001/");
zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service2","http://1270.0.1:8002/");
zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service3","http://1270.0.1:8003/");
return zkClient.getRandomData("/test");
}
}