Curator+Zookeeper实现分布式锁的示例

发布于:2024-09-18 ⋅ 阅读:(11) ⋅ 点赞:(0)

1. 文章导读

随着互联网的兴起,应用的单服务器部署已无法满足日益增长的用户需求,此时云服务+分布式技术应用而生,解决了应用弹性伸缩的问题。
问题来了,分布式不同于单机,在单机应用下可以使用Java的并发编程API实现对共享资源的控制,而在分布式应用系统环境则需要借助Curator+Zookeeper来实现。
本文基于Curator+Zookeeper实现各种分布式锁,以实现分布式环境下对有限共享资源的并发访问。

2. 共享非可重入锁

2.1 概念

何为共享锁,所有分布式中的应用进程之间共享的锁,确保每次只能有一个进程获取到锁,其它应用需要等待持有者释放后,才能获取到锁。

2.2 源码解读

  • 公共代码片段
package com.example.curatorsimpledemo.recipes;  
  
import org.apache.curator.RetryPolicy;  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.CuratorFrameworkFactory;  
import org.apache.curator.framework.recipes.locks.InterProcessLock;  
import org.apache.curator.framework.recipes.locks.InterProcessMutex;  
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;  
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;  
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;  
import org.apache.curator.retry.ExponentialBackoffRetry;  
import org.apache.curator.utils.CloseableUtils;  
import org.junit.After;  
import org.junit.Assert;  
import org.junit.Before;  
import org.junit.Test;  
import org.junit.runner.RunWith;  
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  
  
import java.util.Arrays;  
import java.util.concurrent.TimeUnit;  
  
@RunWith(SpringJUnit4ClassRunner.class)  
public class DistributedLockDemo {

		// ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行  
		private final String lockPath = "/distri-lock";  
		// ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181), 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)  
		private String connectString;  
		// Curator 客户端重试策略  
		private RetryPolicy retry;  
		// Curator 客户端对象  
		private CuratorFramework client;  
		// client2 用户模拟其他客户端  
		private CuratorFramework client2;  
		  
		// 初始化资源  
		@Before  
		public void init() throws Exception {  
		    // 设置 ZooKeeper 服务地址为本机的 2181 端口  
		    connectString = "192.168.206.100:2181";  
		    // 重试策略  
		    // 初始休眠时间为 1000ms, 最大重试次数为 3    retry = new ExponentialBackoffRetry(1000, 3);  
		    // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间  
		    client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);  
		    client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);  
		    // 创建会话  
		    client.start();  
		    client2.start();  
		}  
		  
		// 释放资源  
		@After  
		public void close() {  
		    CloseableUtils.closeQuietly(client);  
		}
}
  • 共享非可重入锁的测试用例
@Test  
public void sharedLock() throws Exception {  
    // 创建共享锁 模拟第1个进程  
    InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);  
    // lock2 用于模拟分布式环境中的其它进程  
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);  
    // 直接获取锁对象  
    lock.acquire();  
    // 测试是否可以重入  
    /**  超时获取锁对象的方法 boolean acquire(long var1, TimeUnit var3) 第一个参数为时间, 第二个参数为时间单位。 给获取所对象设置个超时时间,超过指定时间没有获得锁则返回false。  
        因为锁已经被获取, 所以返回 false。  
     **/  
    Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));  
    // 继续测试进程2能否获取锁, 同样lockPath已经被锁lock锁住,进程2没法再用lock2去加锁。  
    Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));  
    // 进程1释放lockPath的锁lock  
    lock.release();  
    // lock2 尝试获取锁成功, 因为锁已经被释放  
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));  
    lock2.release();  
}

因为共享非可重入锁都是锁定Zookeeper的相同path节点,所以同一时间只能有一个进程获取到锁。只有进程1释放锁以后,进程2才能获取到锁。

2.3 相关API

  • org.apache.curator.framework.CuratorFrameworkFactory
    模拟一个访问Zookeeper的客户端。

  • *org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex

    • A NON re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section.
      一个在跨多个JVM进程间的不支持重入的共享锁, 使用Zookeeper来实现锁。所有的进程使用相同的Zookeeper 锁路径来实现进程间的关键部分。

    • 构造方法 public InterProcessSemaphoreMutex(CuratorFramework client, String path)
      client: CuratorFramework对象。
      path: Zookeeper的ZNODE路径(path)。

    • method

      • void acquire(); //获取锁,直到锁可用。这个方法会引发阻塞,只有在确保能获取到锁的前提下才使用
      • boolean acquire(long time, TimeUnit unit) // 在指定的超时时间内获取锁,获取不到则返回false。
      • void release(); //释放进程对象持有的锁

3. 共享可重入锁

3.1 概念

这里的可重入是指获得锁的进程(对象)在没有释放锁的前提下,仍然可以执行acquire()方法获得锁对象,其它进程(对象)是不能获得锁对象的。
注意的是,可重入锁的释放需要和进入的次数相匹配,如果释放的次数小于进入的次数,其它进程仍然不能获得锁对象。

3.2 源码解读

@Test  
public void sharedReentrantLock() throws Exception {  
    // 创建可重入锁  
    InterProcessLock lock = new InterProcessMutex(client, lockPath);  
    // lock2 用于模拟其他客户端  
    InterProcessLock lock2 = new InterProcessMutex(client2, lockPath);  
    // lock 获取锁  
    lock.acquire();  
    try {  
       // lock 第二次获取锁  
       lock.acquire();  
       try {  
          // lock2 超时获取锁, 因为锁已经被 lock 客户端占用, 所以获取失败, 需要等 lock 释放  
          Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));  
       } finally {  
          lock.release();  
       }  
    } finally {  
       // 重入锁获取与释放需要一一对应, 如果获取 2 次, 释放 1 次, 那么该锁依然是被占用, 如果将下面这行代码注释, 那么会发现下面的 lock2 获取锁失败  
       lock.release();  
    }  
    // 在 lock 释放后, lock2 能够获取锁  
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));  
    lock2.release();  
}

3.3 相关API

  • org.apache.curator.framework.recipes.locks.InterProcessMutex 共享可重入锁
    • A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is “fair” - each user will get the mutex in the order requested (from ZK’s point of view) 当申请锁的应用有多个时,Zookeeper能按照顺序获得

4. 共享可重入读写锁

4.1 概念

  • A re-entrant read/write mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is “fair” - each user will get the mutex in the order requested (from ZK’s point of view).

  • A read write lock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader processes, so long as there are no writers. The write lock is exclusive.
    读写锁维护一对相关联的锁,一个用于只读操作,另一个用于写入。只要没有写入程序,读取锁就可以由多个读取器进程同时持有。写锁是独占的。

  • Reentrancy
    This lock allows both readers and writers to reacquire read or write locks in the style of a re-entrant lock. Non-re-entrant readers are not allowed until all write locks held by the writing thread/process have been released. Additionally, a writer can acquire the read lock, but not vice-versa. If a reader tries to acquire the write lock it will never succeed
    此锁允许读取器和写入器以可重入锁的方式重新获取读锁或写锁。在写线程/进程持有的所有写锁都被释放之前,不允许使用非重入读取器。此外,写入器可以获取读取锁,但反之则不然。如果读取器试图获取写锁,它永远不会成功。

  • Lock downgrading
    Re-entrancy also allows downgrading from the write lock to a read lock, by acquiring the write lock, then the read lock and then releasing the write lock. However, upgrading from a read lock to the write lock is not possible.
    重新进入还允许通过获取写锁、读取锁然后释放写锁,从写锁降级为读锁。但是,无法从读锁升级到写锁。

4.2 源码解读

  • 读写锁测试
/**  
 * 共享可重入读写锁  
 * @throws Exception  
 */  
@Test  
public void sharedReentrantReadWriteLock() throws Exception {  
    // 创建读写锁对象, Curator 以公平锁的方式进行实现  
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);  
    // lock2 用于模拟其他客户端  
    InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);  
    // 使用 lock 模拟读操作  
    // 使用 lock2 模拟写操作  
    // 获取读锁(使用 InterProcessMutex 实现, 所以是可以重入的)  
    InterProcessLock readLock = lock.readLock();  
    // 获取写锁(使用 InterProcessMutex 实现, 所以是可以重入的)  
    InterProcessLock writeLock = lock2.writeLock();  
    ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest(readLock, writeLock);  
    readWriteLockTest.test();  
    readWriteLockTest.waitThread();  
}
package com.example.curatorsimpledemo.recipes;  
  
import org.apache.curator.framework.recipes.locks.InterProcessLock;  
import java.util.HashSet;  
import java.util.Set;  
  
public class ReadWriteLockTest {  
  
    private InterProcessLock readLock;  
    private InterProcessLock writeLock;  
    // 测试数据变更字段  
    private Integer testData = 0;  
    private Set<Thread> threadSet = new HashSet<>();  
  
    public ReadWriteLockTest(InterProcessLock readLock, InterProcessLock writeLock){  
        this.readLock = readLock;  
        this.writeLock = writeLock;  
    }  
  
    // 写入数据  
    private void write() throws Exception {  
        writeLock.acquire();  
        try {  
            Thread.sleep(10);  
            testData++;  
            System.out.println("写入数据 \t" + testData);  
        } finally {  
            writeLock.release();  
        }  
    }  
  
    // 读取数据  
    private void read() throws Exception {  
        readLock.acquire();  
        try {  
            Thread.sleep(10);  
            System.out.println("读取数据 \t" + testData);  
        } finally {  
            readLock.release();  
        }  
    }  
  
    // 等待线程结束, 防止 test 方法调用完成后, 当前线程直接退出, 导致控制台无法输出信息  
    public void waitThread() throws InterruptedException {  
        for (Thread thread : threadSet) {  
            thread.join();  
        }  
    }  
  
    // 创建线程方法  
    private void createThread(int type) {  
        Thread thread = new Thread(new Runnable() {  
            @Override  
            public void run() {  
                try {  
                    if (type == 1) {  
                        write();  
                    } else {  
                        read();  
                    }  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        threadSet.add(thread);  
        thread.start();  
    }  
  
    public void test() {  
        for (int i = 0; i < 5; i++) {  
            createThread(1);  
        }  
        for (int i = 0; i < 5; i++) {  
            createThread(2);  
        }  
    }  
  
}
  • 在写线程/进程持有的所有写锁都被释放之前,不允许使用非重入读取器。
@Test  
public void sharedReentrantReadWriteLock2() throws Exception {  
    // 创建读写锁对象, Curator 以公平锁的方式进行实现  
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);  
    // lock2 用于模拟其他客户端  
    InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);  
    // 使用 lock 模拟读操作  
    // 使用 lock2 模拟写操作  
    // 获取读锁(使用 InterProcessMutex 实现, 所以是可以重入的)  
    InterProcessLock readLock = lock.readLock();  
    // 获取写锁(使用 InterProcessMutex 实现, 所以是可以重入的)  
    InterProcessLock writeLock = lock2.writeLock();  
    // 场景1,写进程client2在获得writeLock后,没有释放,其它进程没法获得写锁  
    writeLock.acquire();//client2获取写锁  
    Assert.assertFalse(lock.writeLock().acquire(2, TimeUnit.SECONDS)); //client尝试获取写锁,失败。  
    writeLock.release();  
    Assert.assertTrue(lock.writeLock().acquire(2, TimeUnit.SECONDS)); //写锁释放后,client尝试获取写锁,成功。  
    lock.writeLock().release();  
}
  • 写入器可以获取读取锁,但反之则不然。如果读取器试图获取写锁,它永远不会成功。
/**  
 * 共享可重入读写锁:  
 * 写入器可以获取读取锁,但反之则不然。如果读取器试图获取写锁,它永远不会成功。  
 * @throws Exception  
 */  
@Test  
public void sharedReentrantReadWriteLock3() throws Exception {  
    // 创建读写锁对象, Curator 以公平锁的方式进行实现  
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);  
    // lock2 用于模拟其他客户端  
    InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);  
    // 使用 lock 模拟读操作  
    // 使用 lock2 模拟写操作  
    // 获取读锁(使用 InterProcessMutex 实现, 所以是可以重入的)  
    InterProcessLock readLock = lock.readLock();  
    // 获取写锁(使用 InterProcessMutex 实现, 所以是可以重入的)  
    InterProcessLock writeLock = lock2.writeLock();  
    // 场景2,写入器获取读锁 --成功; 读取器获取写锁-- 失败。  
    readLock.acquire(2, TimeUnit.SECONDS);//client获取读锁  
    writeLock.acquire(2, TimeUnit.SECONDS);//client2获取写锁  
    //client2尝试获取读锁,成功  
    Assert.assertTrue(lock2.readLock().acquire(2, TimeUnit.SECONDS));  
    //client尝试获取写锁,失败  
    Assert.assertFalse(lock.writeLock().acquire(2, TimeUnit.SECONDS));  
}

4.3 相关API

5. 多重共享锁

5.1 概念

利用多个锁进行加锁操作。

5.2 源码解析

**  
 * 多重共享锁: 多重锁的意思是锁上加锁。  
 * @throws Exception  
 */  
@Test  
public void multiLock() throws Exception {  
    // 可重入锁  
    InterProcessLock interProcessLock1 = new InterProcessMutex(client, lockPath);  
    // 不可重入锁  
    InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client2, lockPath);  
    // 创建多重锁对象  
    InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));  
    // 获取参数集合中的所有锁  
    lock.acquire();  
  
    // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入  
    Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));  
    // interProcessLock1 是可重入锁, 所以可以继续获取锁  
    Assert.assertTrue(interProcessLock1.acquire(2, TimeUnit.SECONDS));  
    // interProcessLock2 是不可重入锁, 所以获取锁失败  
    Assert.assertFalse(interProcessLock2.acquire(2, TimeUnit.SECONDS));  
  
    // 释放参数集合中的所有锁  
    lock.release();  
  
    // interProcessLock2 中的锁已经释放, 所以可以获取  
    Assert.assertTrue(interProcessLock2.acquire(2, TimeUnit.SECONDS));  
  
}

5.3 相关API

6. 利用信号量控制并发数量

6.1 概念

信号量semaphore>0能继续后续的操作,=0时不能继续操作,用来模拟控制多少个用户进入。
针对同一个信号量,释放则信号量+1,进入则信号量-1。

6.2 源码解析

package com.example.curatorsimpledemo.recipes;  
  
  
import org.apache.curator.RetryPolicy;  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.CuratorFrameworkFactory;  
import org.apache.curator.framework.recipes.locks.Lease;  
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;  
import org.apache.curator.retry.ExponentialBackoffRetry;  
import org.apache.curator.utils.CloseableUtils;  
import org.junit.After;  
import org.junit.Assert;  
import org.junit.Before;  
import org.junit.Test;  
import org.junit.runner.RunWith;  
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  
  
import java.util.Collection;  
import java.util.concurrent.TimeUnit;  
  
  
@RunWith(SpringJUnit4ClassRunner.class)  
public class SemaphoreDemo {  
  
    // ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行  
    private final String lockPath = "/distri-lock";  
    // ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181), 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)  
    private String connectString;  
    // Curator 客户端重试策略  
    private RetryPolicy retry;  
    // Curator 客户端对象  
    private CuratorFramework client;  
    // client2 用户模拟其他客户端  
    private CuratorFramework client2;  
  
    // 初始化资源  
    @Before  
    public void init() throws Exception {  
        // 设置 ZooKeeper 服务地址为本机的 2181 端口  
        connectString = "192.168.206.100:2181";  
        // 重试策略  
        // 初始休眠时间为 1000ms, 最大重试次数为 3        retry = new ExponentialBackoffRetry(1000, 3);  
        // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间  
        client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);  
        client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);  
        // 创建会话  
        client.start();  
        client2.start();  
    }  
  
    // 释放资源  
    @After  
    public void close() {  
        CloseableUtils.closeQuietly(client);  
    }  
  
    /**  
     * 何为信号量,信号量semaphore>0能继续后续的操作,=0时不能继续操作,用来模拟控制多少个用户进入。  
     * 针对同一个信号量,释放则信号量+1,进入则信号量-1。  
     */  
    @Test  
    public void semaphore() throws Exception {  
        // 创建一个信号量, Curator 以公平锁的方式进行实现  
        /**  
         * public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases)         * 第3个参数 maxLeases,指定信号量最大许可数量。  
         */  
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 6);  
        // semaphore2 用于模拟其他客户端  
        InterProcessSemaphoreV2 semaphore2 = new InterProcessSemaphoreV2(client2, lockPath, 6);  
  
        // 获取一个许可  
        Lease lease = semaphore.acquire();  
        Assert.assertNotNull(lease);  
        // semaphore.getParticipantNodes() 会返回当前参与信号量的节点列表, 俩个客户端所获取的信息相同  
        Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());  
  
        // 超时获取一个许可  
        Lease lease2 = semaphore2.acquire(2, TimeUnit.SECONDS);  
        Assert.assertNotNull(lease2);  
        Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());  
  
        // 获取多个许可, 参数为许可数量  
        Collection<Lease> leases = semaphore.acquire(2, 2, TimeUnit.SECONDS);  
        Assert.assertTrue(leases.size() == 2);  
        Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());  
  
        // 超时获取多个许可, 第一个参数为许可数量  
        Collection<Lease> leases2 = semaphore2.acquire(2, 2, TimeUnit.SECONDS);  
        if(leases2 != null){  
            Assert.assertTrue(leases2.size() == 2);  
            Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());  
        }  
  
        // 目前 semaphore 已经获取 3 个许可, semaphore2 也获取 3 个许可, 加起来为 6 个, 所以他们无法再进行许可获取  
        Assert.assertNull(semaphore.acquire(2, TimeUnit.SECONDS));  
        Assert.assertNull(semaphore2.acquire(2, TimeUnit.SECONDS));  
  
        // 释放一个许可  
        semaphore.returnLease(lease);  
        semaphore2.returnLease(lease2);  
  
        /**  
         * 释放了两个许可,相当于信号量semaphore-2,可以再重新申请2个许可。  
         */  
        Assert.assertNotNull(semaphore.acquire());  
        Assert.assertNotNull(semaphore2.acquire());  
  
        // 释放多个许可  
        semaphore.returnAll(leases);  
        semaphore2.returnAll(leases2);  
    }  
  
}

6.3 相关API

  • InterProcessSemaphoreV2
    • There are two modes for determining the max leases for the semaphore. In the first mode the max leases is a convention maintained by the users of a given path. In the second mode a {@link SharedCountReader} is used as the method for semaphores of a given path to determine the max leases. 两种方式决定信号量的最大值

      • public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases)
      • public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count)
    • If a {@link SharedCountReader} is not used, no internal checks are done to prevent Process A acting as if there are 10 leases and Process B acting as if there are 20. Therefore, make sure that all instances in all processes use the same numberOfLeases value. 使用SharedCountReader实现所有进程共享相同数量的信号量最大值,而不需要依赖人工检查。

    • The various acquire methods return {@link Lease} objects that represent acquired leases. Clients must take care to close lease objects (ideally in a finally block) else the lease will be lost. However, if the client session drops (crash, etc.), any leases held by the client are automatically closed and made available to other clients.
      各种获取方法返回代表已获得租赁的租赁对象。 客户必须注意关闭租赁对象(最好是在finally块中关闭),否则租赁将丢失。 但是,如果客户端会话掉线(崩溃等),客户端持有的任何租约都会自动关闭并提供给其他客户端。


网站公告

今日签到

点亮在社区的每一天
去签到