使用 ZooKeeper 实现分布式环境下获取全局唯一自增 ID 的案例。

发布于:2025-05-09 ⋅ 阅读:(18) ⋅ 点赞:(0)

项目结构

所有配置写在 application.yml 文件中,代码进行了拆分,加入了相关依赖。

1. pom.xml 依赖

<dependencies>
  <!-- ZooKeeper Java client: provides the org.apache.zookeeper classes
       (ZooKeeper, CreateMode, ZooDefs, data.Stat) imported by the Java code below. -->
  <!-- NOTE(review): the Java code also imports org.springframework.* and
       javax.annotation.* classes that are not listed in this snippet; confirm the
       corresponding Spring Boot dependencies are declared elsewhere in the pom. -->
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.1</version>
  </dependency>
</dependencies>

2. application.yml 配置文件

在 src/main/resources/application.yml 文件中配置 ZooKeeper 服务器地址、自增计数器节点路径以及 workerId 节点路径等信息:

# ZooKeeper settings. The "zookeeper" prefix matches the prefix declared on
# ZookeeperProperties; "zookeeper.server" is also injected into ZookeeperConfig.
zookeeper:
  # Connection string handed to the ZooKeeper client constructor.
  server: "localhost:2181"
  # Presumably bound to ZookeeperProperties.counterPath (confirm the binding):
  # the znode whose data holds the current auto-increment counter value.
  counter-path: "/counter"
  # Presumably bound to ZookeeperProperties.workerIdPath (confirm the binding):
  # the znode whose data holds the next worker id to hand out.
  worker-id-path: "/workerId"

3. 创建 ZookeeperConfig 配置类

ZookeeperConfig 配置类用于初始化 ZooKeeper 连接;紧随其后的 ZookeeperProperties 类通过 @ConfigurationProperties 从配置文件读取 zookeeper 前缀下的参数。

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Configuration

public class ZookeeperConfig {

    @Value("${zookeeper.server}")
    private String zkServer;

    @Bean
    public ZooKeeper zooKeeper() throws Exception {
        System.out.println("Zookeeper server: " + zkServer);
        return new ZooKeeper(zkServer, 3000, event ->
                System.out.println("Watcher triggered: " + event.getType()));
    }
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Holder for the settings declared under the {@code zookeeper} configuration prefix.
 *
 * <p>Each property is exposed through a plain getter/setter pair.
 */
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {

    /** ZooKeeper connection string. */
    private String server;

    /** Path of the znode that stores the auto-increment counter. */
    private String counterPath;

    /** Path of the znode that stores the next worker id. */
    private String workerIdPath;

    /** @return the ZooKeeper connection string */
    public String getServer() {
        return this.server;
    }

    /** @param server the ZooKeeper connection string */
    public void setServer(String server) {
        this.server = server;
    }

    /** @return the counter znode path */
    public String getCounterPath() {
        return this.counterPath;
    }

    /** @param counterPath the counter znode path */
    public void setCounterPath(String counterPath) {
        this.counterPath = counterPath;
    }

    /** @return the worker-id znode path */
    public String getWorkerIdPath() {
        return this.workerIdPath;
    }

    /** @param workerIdPath the worker-id znode path */
    public void setWorkerIdPath(String workerIdPath) {
        this.workerIdPath = workerIdPath;
    }
}

4. 创建 IDGeneratorService 服务类

这是 ID 生成的核心服务类:它基于 ZooKeeper 节点提供简易自增 ID,并借助其后的 SnowflakeIdGenerator 类生成雪花算法 ID。

import com.example.client.redis_test.snowflake.SnowflakeIdGenerator;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;


import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Service
public class IDGeneratorService {

    private static final Lock lock = new ReentrantLock();
    private final ZooKeeper zk;
    private final ZookeeperProperties zookeeperProperties;
    private final SnowflakeIdGenerator idGenerator;

    private final long workerId;

    private final long dataCenterId;

    public IDGeneratorService(ZooKeeper zk, ZookeeperProperties zookeeperProperties) throws Exception {
        this.zk = zk;
        this.zookeeperProperties = zookeeperProperties;
        this.workerId = getWorkerId();
        Random random = new Random();
        dataCenterId = random.nextInt(32);
        System.out.println("dataCenterId = " + dataCenterId);
        this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);

    }

    // 获取下一个自增ID
    public long getNextId() throws Exception {
        lock.lock();
        try {
            String counterPath = zookeeperProperties.getCounterPath();
            // 检查路径是否存在,如果不存在则创建
            Stat stat = zk.exists(counterPath, false);
            if (stat == null) {
                // 如果路径不存在,则创建该路径,并设置初始值为 1
                zk.create(counterPath, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

            // 获取当前值
            byte[] data = zk.getData(counterPath, false, null);
            long currentId = Long.parseLong(new String(data));

            // 计算下一个 ID
            long nextId = currentId + 1;

            // 更新路径数据
            zk.setData(counterPath, String.valueOf(nextId).getBytes(), -1);

            return nextId;
        } finally {
            lock.unlock();
        }
    }


    // 从 Zookeeper 获取并分配唯一的 workerId
    private long getWorkerId() throws Exception {
        String workerIdPath = zookeeperProperties.getWorkerIdPath();
        Stat stat = zk.exists(workerIdPath, false);
        if (stat == null) {
            zk.create(workerIdPath, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        byte[] data = zk.getData(workerIdPath, false, null);
        long currentWorkerId = Long.parseLong(new String(data));

        if (currentWorkerId <= 31) { // 最大 Worker ID 为 31
            zk.setData(workerIdPath, String.valueOf(currentWorkerId + 1).getBytes(), -1);
            return currentWorkerId;
        } else {
            throw new RuntimeException("Exceeded max worker ID.");
        }
    }

    // 生成 Snowflake ID
    public long generateSnowflakeId() {
        return idGenerator.nextId();
    }
}
/**
 * Snowflake-style generator of 64-bit IDs.
 *
 * <p>Bit layout, from most to least significant: milliseconds elapsed since
 * {@code 1609459200000} (2021-01-01T00:00:00Z), a 5-bit data-center id, a 5-bit worker id
 * and a 12-bit per-millisecond sequence.
 *
 * <p>{@link #nextId()} is {@code synchronized}, so one instance may be shared by threads.
 */
public class SnowflakeIdGenerator {
    /** Custom epoch subtracted from the wall-clock time. */
    private static final long EPOCH_MILLIS = 1609459200000L;

    private static final long SEQUENCE_BITS = 12L;
    private static final long WORKER_ID_BITS = 5L;
    private static final long DATA_CENTER_ID_BITS = 5L;

    private static final long MAX_WORKER_ID = (1L << WORKER_ID_BITS) - 1;
    private static final long MAX_DATA_CENTER_ID = (1L << DATA_CENTER_ID_BITS) - 1;
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
    private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
    private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;

    private final long workerId;
    private final long dataCenterId;

    /** Counter within the current millisecond. */
    private long sequence = 0L;
    /** Millisecond of the most recently issued ID; -1 before the first call. */
    private long lastTimestamp = -1L;

    /**
     * @param workerId     worker id, 0..31
     * @param dataCenterId data-center id, 0..31
     * @throws IllegalArgumentException if either id is outside its range
     */
    public SnowflakeIdGenerator(long workerId, long dataCenterId) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId invalid");
        }
        if (dataCenterId < 0 || dataCenterId > MAX_DATA_CENTER_ID) {
            throw new IllegalArgumentException("dataCenterId invalid");
        }
        this.workerId = workerId;
        this.dataCenterId = dataCenterId;
    }

    /**
     * Produces the next ID.
     *
     * @return an ID greater than every ID previously returned by this instance
     * @throws RuntimeException if the system clock reads earlier than the last issued ID
     */
    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            throw new RuntimeException("Clock moved backwards.");
        }

        if (now != lastTimestamp) {
            // New millisecond: restart the sequence.
            sequence = 0L;
        } else {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // Sequence wrapped within this millisecond: wait for the next one.
                now = awaitLaterMillis(lastTimestamp);
            }
        }
        lastTimestamp = now;

        long timePart = (now - EPOCH_MILLIS) << TIMESTAMP_SHIFT;
        long dataCenterPart = dataCenterId << DATA_CENTER_ID_SHIFT;
        long workerPart = workerId << WORKER_ID_SHIFT;
        return timePart | dataCenterPart | workerPart | sequence;
    }

    /** Spins until the clock reads strictly later than {@code previousMillis}. */
    private long awaitLaterMillis(long previousMillis) {
        long candidate;
        do {
            candidate = System.currentTimeMillis();
        } while (candidate <= previousMillis);
        return candidate;
    }
}

5. 创建 IDGeneratorController 控制器

用于暴露两个简单的 API 接口(/next-id 与 /snowflake-id),返回生成的 ID;其中带 @PostConstruct 注解的 test() 方法会在应用启动时执行一次雪花 ID 多线程测试并打印报告。


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * REST endpoints that expose the two ID generators of {@link IDGeneratorService}.
 *
 * <p>Also runs a multi-threaded Snowflake benchmark once after construction
 * (see {@link #test()}).
 */
@RestController
public class IDGeneratorController {

    // Injected through the constructor only; a second, field-level injection is unnecessary.
    private final IDGeneratorService idGeneratorService;

    /**
     * @param idGeneratorService service that produces the IDs
     */
    public IDGeneratorController(IDGeneratorService idGeneratorService) {
        this.idGeneratorService = idGeneratorService;
    }

    /**
     * @return the next value of the ZooKeeper-backed counter
     * @throws Exception if the service fails to read or update the counter
     */
    @GetMapping("/next-id")
    public long getNextId() throws Exception {
        return idGeneratorService.getNextId();
    }

    /**
     * @return a newly generated Snowflake ID
     */
    @GetMapping("/snowflake-id")
    public long getSnowflakeId() {
        return idGeneratorService.generateSnowflakeId();
    }

    /**
     * Startup benchmark: 20 threads each generate 1,000,000 Snowflake IDs, all IDs are
     * collected in a set to detect duplicates, and a report is printed.
     *
     * <p>NOTE(review): this runs inside {@code @PostConstruct}, so it delays application
     * startup and keeps 20 million boxed IDs in memory while it runs; consider moving it
     * to a test or a separate runner.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *         for the workers
     */
    @PostConstruct
    public void test() throws InterruptedException {
        System.out.println("==== Zookeeper 雪花ID多线程测试开始 ====");
        int threadCount = 20;         // number of concurrent worker threads
        int idsPerThread = 1000000;   // IDs generated by each worker

        Set<Long> allIds = ConcurrentHashMap.newKeySet(); // collects IDs for the duplicate check
        CountDownLatch latch = new CountDownLatch(threadCount);
        long start = System.currentTimeMillis();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        long elapsedMs;
        try {
            for (int i = 0; i < threadCount; i++) {
                executor.execute(() -> {
                    try {
                        for (int j = 0; j < idsPerThread; j++) {
                            long id = idGeneratorService.generateSnowflakeId();
                            allIds.add(id);
                        }
                    } finally {
                        // Count down even when generation throws; otherwise await() below
                        // would block application startup forever.
                        latch.countDown();
                    }
                });
            }

            latch.await();
            elapsedMs = System.currentTimeMillis() - start;
        } finally {
            // Release the pool threads on every exit path, including interruption.
            executor.shutdown();
        }

        int totalGenerated = threadCount * idsPerThread;
        int uniqueCount = allIds.size();
        // Avoid dividing by zero if the run finishes within the clock's resolution.
        double elapsedSeconds = Math.max(1L, elapsedMs) / 1000.0;

        System.out.println("==== 雪花ID多线程测试报告 ====");
        System.out.println("线程数: " + threadCount);
        System.out.println("每线程生成ID数: " + idsPerThread);
        System.out.println("总生成ID数: " + totalGenerated);
        System.out.println("唯一ID数: " + uniqueCount);
        System.out.println("重复ID数: " + (totalGenerated - uniqueCount));
        System.out.println("执行耗时(ms): " + elapsedMs);
        System.out.printf("吞吐量: %.2f 万ID/秒%n", totalGenerated / elapsedSeconds / 10000);
        System.out.println("============================");
    }
}

6. 启动与测试

启动 Spring Boot 应用,访问以下接口来测试 ID 生成:

  • GET /next-id:获取自增 ID。
  • GET /snowflake-id:获取雪花算法生成的 ID。

总结

这个 Spring Boot 项目利用 Zookeeper 来确保 ID 的全局唯一性,并通过配置文件来控制相关的参数。主要拆分为以下几个部分:

  • ZookeeperConfig:Zookeeper 连接配置。
  • IDGeneratorService:包含 ID 生成逻辑,包括自增 ID 和雪花算法。
  • IDGeneratorController:暴露接口供外部调用。

网站公告

今日签到

点亮在社区的每一天
去签到