从零搭建微服务项目Pro(第7-1章——分布式雪花算法)

发布于:2025-04-03 ⋅ 阅读:(30) ⋅ 点赞:(0)

前言:

        本章介绍了雪花算法的背景知识,讲解了雪花算法的实现原理,给出其基础实现代码,以及对基础版算法的思考,并根据思考给出一种结合redis自动分配workerID以及缓存队列优化的算法实现。

代码链接如下:

wlf728050719/SpringCloudPro7-1https://github.com/wlf728050719/SpringCloudPro7-1

分表策略:   

     在微服务项目中,在高并发情况下往往需要进行分库分表减小数据库压力,其中分表策略有以下两种:

分表策略

 水平分表

垂直分表

优点

性能提升:通过将数据分散到多个表中,可以减少单个表的行数,从而提高查询性能,尤其是在处理大量数据时。

负载均衡:可以将不同的表分布到不同的数据库服务器上,实现负载均衡,提升系统的可扩展性。

简化数据管理:可以根据数据的访问模式和生命周期对数据进行分区,便于管理和归档。

优化查询性能:将常用的列放在同一张表中,可以减少查询时的数据量,提高查询效率。

提高数据安全性:可以将敏感数据(如用户密码、个人信息)与其他数据分开存储,增强数据安全性。

简化表结构:通过将表拆分为多个小表,可以使每个表的结构更加清晰,便于维护。

缺点

复杂性增加:在查询时可能需要跨多个表进行JOIN操作,增加了查询的复杂性。

数据分布不均:如果分片规则设计不当,可能导致某些表的数据量过大,而其他表则较小,影响性能。

增加JOIN操作:在查询时,可能需要对多个表进行JOIN,增加了查询的复杂性和性能开销。

数据一致性问题:在多个表之间维护数据一致性可能会变得更加复杂,尤其是在更新操作时。

适用

适合于数据量大、行数多的场景,尤其是当数据的访问模式比较均匀时。它能够有效地提高查询性能和系统的可扩展性。

适合于列数多、某些列访问频率高的场景,尤其是当需要优化特定查询时。它能够减少查询时的数据量,提高性能。

分片策略:

其中,水平分表策略被广泛应用于商城、外卖、银行这类频繁交易导致数据大量产生的系统。对于一项新产生的数据,其应当被放到哪个数据库\表有以下方法:

        哈希分片:通过对某个字段(如用户ID)进行哈希运算,确定数据存储的表。例如,可以使用 table_index = hash(user_id) % number_of_tables 来决定数据应该存储在哪个表中。

        范围分片:根据某个字段的值的范围将数据分到不同的表中。例如,用户ID在1到1000的存储在表A,1001到2000的存储在表B,依此类推。

        轮询分片:依次将数据写入不同的表中,适合数据量比较均衡的情况。

轮询分片由于各服务器不能同步或同步数据代价较大(分布式中间件如Redis、RabbitMQ或远程过程调用RPC),显然使用哈希或范围分片这种单纯判断某个字段范围或取模操作的开销更小。同时由于各种表结构不同,为了确保分片的操作对于每种表都能统一处理,自然而然会选取主键字段进行分片

主键生成策略:

同时又会引申出另一个问题,如何生成一个数据的主键。常用的策略有以下几种:

自增主键
自增主键在简单场景中表现良好,因其易于实现且具有良好的可读性。然而,在高并发环境中,自增主键可能会导致性能瓶颈。由于自增主键是顺序生成的,多个并发插入操作可能会争用同一资源,导致锁竞争和延迟。

UUID(通用唯一标识符)
UUID提供了全球唯一性,适合于分布式系统中数据的唯一标识。由于UUID是随机生成的,能够有效避免主键冲突,适合高并发场景。然而,UUID的缺点在于其长度较长,且在B+树中插入时可能导致较多的随机写入,增加了树的高度,降低了查询性能。因此,虽然UUID在分布式环境中具有优势,但在性能上可能不如自增主键。

组合主键
组合主键通常由多个字段组成,适用于需要唯一性约束的复杂数据模型。虽然组合主键在某些情况下能够提供灵活性,但在高并发环境中,其复杂性可能导致性能下降。组合主键的插入和查询操作相对复杂,可能会增加数据库的开销,尤其是在B+树中进行节点分裂时。

雪花算法
雪花算法是一种生成唯一ID的算法,能够在分布式系统中高效地生成有序的ID。它结合了时间戳、机器ID和序列号,确保生成的ID在全局范围内唯一且有序。由于雪花算法生成的ID是基于时间的,能够有效地减少节点分裂的概率,保持B+树的平衡性,从而提高查询性能。这使得雪花算法在高并发和分布式场景中成为一种理想的选择。

通过上面的思考引出本章的主角:雪花算法。

以及本专栏会持续更新微服务项目,每一章的项目都会基于前一章项目进行功能的完善,欢迎小伙伴们关注!同时如果只是对单章感兴趣也不用从头看,只需下载前一章项目即可,每一章都会有前置项目准备部分,跟着操作就能实现上一章的最终效果,当然如果是一直跟着做可以直接跳过这一部分。专栏目录链接如下,其中Base篇为基础微服务搭建,Pro篇为复杂模块实现。

从零搭建微服务项目(全)-CSDN博客https://blog.csdn.net/wlf2030/article/details/145799620?spm=1001.2014.3001.5501


雪花算法原理:

ID结构

雪花算法生成的ID是一个64位的长整型数字,由以下几部分组成:

| 1位未使用 | 41位时间戳 | 10位工作机器ID | 12位序列号 |

符号位:始终为0,保证生成的ID为正数。

时间戳:毫秒级的时间戳,可以使用约69年(1L << 41) / (1000L * 60 * 60 * 24 * 365) ≈ 69年。

工作机器ID:可以配置机器ID,支持最多1024个节点,通常分为5位数据中心ID和5位机器ID。

序列号:每毫秒生成的序列号,支持每节点每毫秒生成4096个ID。

具体流程:

1. 初始化阶段

  • 配置工作机器ID,通常分为数据中心ID和机器ID两部分

  • 初始化序列号为0

  • 记录一个初始时间戳(通常为算法开始使用的时间,称为epoch)

  • 设置最后时间戳记录值为-1

2. ID生成主流程

步骤1:获取当前时间戳

  • 获取当前的系统时间,转换为毫秒级时间戳

  • 计算与初始时间戳(epoch)的差值,得到相对时间戳

步骤2:检查时钟回拨

  • 将当前时间戳与上次生成ID的时间戳比较:

    • 如果当前时间戳 < 最后时间戳:说明发生了时钟回拨

      • 可以抛出异常或采取其他处理措施(如等待时钟追上)

步骤3:处理同一毫秒的请求

  • 如果当前时间戳 == 最后时间戳:

    • 序列号自增1

    • 检查序列号是否超过最大值(通常是4095):

      • 如果超过:等待直到下一毫秒,然后重置序列号为0

      • 如果未超过:继续使用当前序列号

  • 如果当前时间戳 > 最后时间戳:

    • 重置序列号为0(新的毫秒开始)

步骤4:更新最后时间戳

  • 将最后时间戳记录值更新为当前时间戳

步骤5:组合ID各部分

  • 将各组成部分按位组合:

    1. 将相对时间戳左移22位(留出后面的22位)

    2. 将数据中心ID左移17位

    3. 将机器ID左移12位

    4. 保留序列号的12位

  • 通过位或运算(|)将这些部分组合起来

步骤6:返回生成的ID

  • 将组合后的64位长整型数字作为生成的ID返回

3. 异常处理流程

时钟回拨处理

  • 轻微回拨(毫秒级):可以短暂等待后重试

  • 严重回拨:需要记录警告或报警,可能需要人工干预

序列号耗尽处理

  • 当同一毫秒内序列号用完时:

    • 线程休眠到下一毫秒

    • 重置序列号从0开始


基础实现代码

以下为基础实现代码

package cn.bit.pro7_1.snow_flake;

public class BaseSnowFlakeIdGenerator {

    /**
     * 机器ID  2进制5位
     */
    private long workerId;

    /**
     * 机房ID 2进制5位
     */
    private long datacenterId;

    /**
     * 代表一毫秒内生成的多个id的最新序号  12位,范围从0到4095
     */
    private long sequence;

    /**
     * 设置一个时间初始值(这个用自己业务系统上线的时间)    2^41 - 1   差不多可以用69年
     */
    private long twepoch = 1585644268888L;

    /**
     * 5位的机器id
     */
    private long workerIdBits = 5L;

    /**
     * 5位的机房id
     */
    private long datacenterIdBits = 5L;

    /**
     * 每毫秒内产生的id数 2 的 12次方
     */
    private long sequenceBits = 12L;

    /**
     * 这个是二进制运算,就是5 bit最多只能有31个数字,也就是说机器id最多只能是32以内
     */
    private long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /**
     * 这个是一个意思,就是5 bit最多只能有31个数字,机房id最多只能是32以内
     */
    private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /**
     * 机器ID向左移12位
     */
    private long workerIdShift = sequenceBits;

    /**
     * 机房ID向左移17位
     */
    private long datacenterIdShift = sequenceBits + workerIdBits;

    /**
     *时间戳向左移22位
     */
    private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /**
     * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
     */
    private long sequenceMask = -1L ^ (-1L << sequenceBits);

    /**
     * 上次生成ID的时间截,记录产生时间毫秒数,判断是否是同1毫秒
     */
    private long lastTimestamp = -1L;

    /**
     * 构造函数
     * @param workerId
     * @param datacenterId
     * @param sequence
     */
    public BaseSnowFlakeIdGenerator(long workerId, long datacenterId, long sequence){

        // 检查机房id和机器id是否超过最大值,不能小于0
        if (workerId>maxWorkerId||workerId<0){
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0",maxWorkerId));
        }

        if (datacenterId>maxDatacenterId||datacenterId<0){
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0",maxDatacenterId));
        }

        this.workerId=workerId;
        this.datacenterId=datacenterId;
        this.sequence=sequence;
    }

    /**
     * 这个是核心方法,通过调用nextId()方法,让当前这台机器上的snowflake算法程序生成一个全局唯一的id
     * @return
     */
    public synchronized long nextId(){
        // 获取当前的时间戳,单位是毫秒
        long timestamp = timeGen();

        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp){
            System.err.printf("clock is moving backwards. Rejecting requests until %d.",lastTimestamp);
            throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds",lastTimestamp-timestamp));
        }

        // 假设在同一个毫秒内,又发送了一个请求生成一个id  此时就得把seqence序号给递增1,最多就是4096
        if (lastTimestamp == timestamp) {
            // 这个意思是说一个毫秒内最多只能有4096个数字,无论你传递多少进来,
            //这个位运算保证始终就是在4096这个范围内,避免你自己传递个sequence超过了4096这个范围
            sequence = (sequence + 1) & sequenceMask;
            //当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生ID
            if (sequence == 0) {
                // 阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }else{ // 时间戳改变,毫秒内序列重置
            sequence=0;
        }

        //上次生成ID的时间截
        lastTimestamp = timestamp;

        // 这儿就是最核心的二进制位运算操作,生成一个64bit的id
        // 先将当前时间戳左移,放到41 bit那儿;将机房id左移放到5 bit那儿;将机器id左移放到5 bit那儿;将序号放最后12 bit
        // 最后拼接起来成一个64 bit的二进制数字,转换成10进制就是个long型
        return ( (timestamp-twepoch) << timestampLeftShift )|
                ( datacenterId<<datacenterIdShift )|
                ( workerId<<workerIdShift )|sequence;
    }

    /**
     * 当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生ID
     * @param lastTimestamp 上次生成ID的时间截
     * @return 当前时间戳
     */
    private long tilNextMillis(long lastTimestamp) {
        long timestamp=timeGen();
        while(timestamp<=lastTimestamp){
            timestamp=timeGen();
        }
        return timestamp;
    }

    /**
     * 获取当前时间戳
     * @return 当前时间(毫秒)
     */
    private long timeGen() {
        return System.currentTimeMillis();
    }
}

思考

对于基础实现进行以下思考:

1.是否不同服务同时生成的id保证唯一性?同一服务不同实例同时生成的id保证唯一性?

需要根据系统对机器id的如何界定,无论是集群部署或是单机运行,由于不同服务所在IOC容器不同,并非使用同一生成器(即使单机运行)只要配置工作机器码相同同时生成的id仍有可能相同,如果不同服务同时生成的id保证唯一性,需要在10位机器工作码中拿取若干位用于区分服务,但由于微服务架构中一张表应当只由一个服务维护,其他服务远程调用该服务接口进行操作,所以不会特意对服务进行区分,生成相同id只要不在同一张表中即可。对于一个服务的多个运行实例则需要拿出若干字段进行实例的区分,实际上机器工作码的实质就是对同一服务的不同实例进行区分

2.同一服务同一实例不同线程生成的id是否保证唯一性?

如果将该类注册为Bean,SpringBean默认为单例对象,由于添加了synchronized关键字,核心函数同时只能被一个线程访问,其余线程阻塞等待。


改进代码

在基于上述思考下,对工作机器码进行以下划分,其中5位workerID,用于区分同一集群下不同的服务实例,这里的集群特指使用同一redis数据库的服务,datacenterID则用于区分不同集群,或者说不同的redis数据库。

在上述规定下,引入服务自动注册worderID的机制,服务启动时会根据服务名在redis中寻找没有使用过的workerID,寻找后占用该键,值为当前ip和线程号,方便运维定位占用的服务实例,通过心跳线程定期续期键,当服务关闭或意外终止后,由于没有续期键超时自动释放。但datacenterID仍需要手动在配置文件中指定,后续可结合nacos配置中心或数据库实现自动配置。

引入缓存机制,当缓存队列中存在预先生成的数据时优先从缓存中读取数据。不过这样就无法从主键id中解析出真实的id生成时间戳信息,但时间信息一般数据库都有专门的字段存储,同时还能缓解一定的隐私泄密问题。

优化生成器代码:

package cn.bit.pro7_1.snow_flake;

import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class DistributedSnowflakeIdGenerator {
    // ==================== 常量定义 ====================
    private static final long DATACENTER_ID_BITS = 5L;
    private static final long WORKER_ID_BITS = 5L;
    private static final long SEQUENCE_BITS = 12L;

    private static final long MAX_DATACENTER_ID = ~(-1L << DATACENTER_ID_BITS);
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
    private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);

    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
    private static final long DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
    private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS;

    private static final long DEFAULT_EPOCH = 1577836800000L; // 2020-01-01 00:00:00
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 30; // 心跳间隔(秒)
    private static final int DEFAULT_LOCK_TIMEOUT = 60; // 锁超时时间(秒)
    private static final int DEFAULT_HEARTBEAT_EXPIRE = 90; // 心跳过期时间(秒)
    private static final int MAX_TOLERATE_BACKWARD_MS = 5; // 最大容忍时钟回拨(毫秒)
    private static final int DEFAULT_BUFFER_SIZE = 20000; // 缓冲区大小
    private static final int DEFAULT_BUFFER_THRESHOLD = 10000; // 缓冲区阈值

    // ==================== 实例字段 ====================
    private final long epoch; // 起始时间戳
    private final long datacenterId; // 数据中心ID
    private final long workerId; // 工作节点ID
    private final AtomicLong lastTimestamp = new AtomicLong(-1L); // 上次生成ID的时间戳
    private final AtomicLong sequence = new AtomicLong(0L); // 序列号
    private final AtomicLong generatedIds = new AtomicLong(0); // 生成的ID总数统计
    private final AtomicLong clockBackwardsEvents = new AtomicLong(0); // 时钟回拨事件计数

    private final RedisTemplate<String, Object> redisTemplate; // Redis操作模板
    private final String workerKeyPrefix; // Redis中worker键的前缀
    private final ScheduledExecutorService heartbeatExecutor; // 心跳执行器
    private final String nodeIdentifier; // 节点唯一标识符
    private final int heartbeatInterval; // 心跳间隔时间(秒)
    private final int lockTimeout; // 锁超时时间(秒)
    private final int heartbeatExpire; // 心跳过期时间(秒)

    private final BlockingQueue<Long> idBuffer; // ID缓冲区
    private final ExecutorService bufferExecutor; // 缓冲区填充执行器
    private final int bufferSize; // 缓冲区大小
    private final int bufferThreshold; // 缓冲区填充阈值

    // 并发控制锁
    private final Lock idGenerationLock = new ReentrantLock();

    public DistributedSnowflakeIdGenerator(String serviceName, RedisTemplate<String, Object> redisTemplate,
                                           long datacenterId) throws Exception {
        this(serviceName, redisTemplate, datacenterId, DEFAULT_EPOCH,
                DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_LOCK_TIMEOUT, DEFAULT_HEARTBEAT_EXPIRE,
                DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_THRESHOLD);
    }

    public DistributedSnowflakeIdGenerator(String serviceName, RedisTemplate<String, Object> redisTemplate,
                                           long datacenterId, long epoch, int heartbeatInterval,
                                           int lockTimeout, int heartbeatExpire, int bufferSize,
                                           int bufferThreshold) throws Exception {
        // 参数校验
        validateParameters(datacenterId, serviceName);

        this.epoch = epoch;
        this.datacenterId = datacenterId;
        this.redisTemplate = redisTemplate;
        this.heartbeatInterval = heartbeatInterval;
        this.lockTimeout = lockTimeout;
        this.heartbeatExpire = heartbeatExpire;
        this.bufferSize = bufferSize;
        this.bufferThreshold = bufferThreshold;
        this.idBuffer = new LinkedBlockingQueue<>(bufferSize);
        this.bufferExecutor = Executors.newSingleThreadExecutor();

        // 生成节点唯一标识符(IP+进程ID)
        this.nodeIdentifier = generateNodeIdentifier();
        log.info("生成节点唯一标识符: {}", nodeIdentifier);

        // 构造Redis中worker键的前缀
        this.workerKeyPrefix = String.format("snowflake:%s:dc-%d:worker-", serviceName, datacenterId);
        log.info("初始化Redis键前缀: {}", workerKeyPrefix);

        // 注册并获取worker ID
        this.workerId = registerAndGetWorkerId();
        log.info("成功分配worker ID: {}, 数据中心ID: {}", workerId, datacenterId);

        // 启动心跳维护和缓冲区填充
        this.heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
        startHeartbeat();
        startBufferRefill();

        log.info("雪花ID生成器初始化完成, 时间戳起点: {}, 缓冲区大小: {}, 填充阈值: {}",
                epoch, bufferSize, bufferThreshold);
    }

    private void validateParameters(long datacenterId, String serviceName) {
        if (datacenterId < 0 || datacenterId > MAX_DATACENTER_ID) {
            throw new IllegalArgumentException(
                    String.format("数据中心ID不能大于 %d 或小于 0", MAX_DATACENTER_ID));
        }
        if (serviceName == null || serviceName.isEmpty()) {
            throw new IllegalArgumentException("服务名称不能为空");
        }
    }

    private String generateNodeIdentifier() throws Exception {
        String hostAddress = InetAddress.getLocalHost().getHostAddress();
        String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        return hostAddress + "-" + processId;
    }

    private long registerAndGetWorkerId() {
        String lockKey = getWorkerLockKey();
        log.info("尝试获取worker ID分配锁, 锁键: {}", lockKey);

        Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, nodeIdentifier,
                lockTimeout, TimeUnit.SECONDS);

        if (lockAcquired != null && lockAcquired) {
            log.info("成功获取worker ID分配锁");
            try {
                long allocatedWorkerId = allocateWorkerId();
                log.info("分配worker ID成功: {}", allocatedWorkerId);
                return allocatedWorkerId;
            } finally {
                redisTemplate.delete(lockKey);
                log.info("释放worker ID分配锁");
            }
        }
        log.error("获取worker ID分配锁失败");
        throw new IllegalStateException("无法获取worker ID分配锁");
    }

    private String getWorkerLockKey() {
        return workerKeyPrefix + "lock";
    }

    private String getWorkerKey(long workerId) {
        return workerKeyPrefix + workerId;
    }

    private long allocateWorkerId() {
        log.info("开始分配worker ID...");
        Set<Long> usedWorkerIds = new HashSet<>();

        // 扫描已使用的worker ID
        for (long id = 0; id <= MAX_WORKER_ID; id++) {
            if (Boolean.TRUE.equals(redisTemplate.hasKey(getWorkerKey(id)))) {
                usedWorkerIds.add(id);
            }
        }
        log.info("已使用的worker ID: {}", usedWorkerIds);

        // 查找可用的worker ID
        for (long id = 0; id <= MAX_WORKER_ID; id++) {
            if (!usedWorkerIds.contains(id)) {
                redisTemplate.opsForValue().set(getWorkerKey(id), nodeIdentifier,
                        heartbeatExpire, TimeUnit.SECONDS);
                log.info("成功注册worker ID: {}, 节点标识: {}, 过期时间: {}秒",
                        id, nodeIdentifier, heartbeatExpire);
                return id;
            }
        }

        log.error("数据中心 {} 中没有可用的worker ID (最大: {})", datacenterId, MAX_WORKER_ID);
        throw new IllegalStateException(
                String.format("数据中心 %d 中没有可用的worker ID (最大: %d)", datacenterId, MAX_WORKER_ID));
    }

    private void startHeartbeat() {
        String workerKey = getWorkerKey(workerId);
        log.info("启动心跳维护, worker键: {}, 间隔: {}秒", workerKey, heartbeatInterval);

        heartbeatExecutor.scheduleAtFixedRate(() -> {
            try {
                redisTemplate.expire(workerKey, heartbeatExpire, TimeUnit.SECONDS);
                log.debug("更新worker {} 的心跳", workerId);
            } catch (Exception e) {
                log.error("更新心跳失败: {}", e.getMessage());
            }
        }, 0, heartbeatInterval, TimeUnit.SECONDS);
    }

    private void startBufferRefill() {
        log.info("启动ID缓冲区填充线程, 缓冲区大小: {}, 填充阈值: {}", bufferSize, bufferThreshold);

        bufferExecutor.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (idBuffer.size() <= bufferThreshold) {
                        refillBuffer();
                    }
                    TimeUnit.MILLISECONDS.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("填充ID缓冲区错误: {}", e.getMessage());
                }
            }
        });
    }

    private void refillBuffer() {
        int refillCount = bufferSize - idBuffer.size();
        log.info("开始填充ID缓冲区, 需要填充数量: {}", refillCount);

        for (int i = 0; i < refillCount; i++) {
            try {
                long id = generateIdInternal();
                idBuffer.put(id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("为缓冲区生成ID失败: {}", e.getMessage());
                break;
            }
        }

        log.info("ID缓冲区填充完成, 当前缓冲区大小: {}", idBuffer.size());
    }

    public long nextId() {
        try {
            Long bufferedId = idBuffer.poll(100, TimeUnit.MILLISECONDS);
            if (bufferedId != null) {
                return bufferedId;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return generateIdInternal();
    }

    private long generateIdInternal() {
        idGenerationLock.lock();
        try {
            long currentTimestamp = timeGen();
            long lastTimestampValue = lastTimestamp.get();

            // 处理时钟回拨
            if (currentTimestamp < lastTimestampValue) {
                currentTimestamp = handleClockBackwards(currentTimestamp, lastTimestampValue);
            }

            // 处理同一毫秒内请求
            if (currentTimestamp == lastTimestampValue) {
                long sequenceValue = sequence.incrementAndGet();
                if (sequenceValue > MAX_SEQUENCE) {
                    currentTimestamp = waitNextMillis(lastTimestampValue);
                    sequence.set(0);
                    sequenceValue = 0;
                }
                sequence.set(sequenceValue);
            } else {
                // 新毫秒,重置序列号
                sequence.set(0);
            }

            lastTimestamp.set(currentTimestamp);
            generatedIds.incrementAndGet();

            return ((currentTimestamp - epoch) << TIMESTAMP_SHIFT)
                    | (datacenterId << DATACENTER_ID_SHIFT)
                    | (workerId << WORKER_ID_SHIFT)
                    | sequence.get();
        } finally {
            idGenerationLock.unlock();
        }
    }

    private long handleClockBackwards(long currentTimestamp, long lastTimestamp) {
        long offset = lastTimestamp - currentTimestamp;
        clockBackwardsEvents.incrementAndGet();

        log.warn("检测到时钟回拨, 偏移量: {} 毫秒", offset);

        if (offset <= MAX_TOLERATE_BACKWARD_MS) {
            try {
                log.info("时钟回拨在容忍范围内, 等待 {} 毫秒", offset);
                TimeUnit.MILLISECONDS.sleep(offset);
                currentTimestamp = timeGen();
                if (currentTimestamp < lastTimestamp) {
                    throw new IllegalStateException("等待后时钟仍然回拨");
                }
                return currentTimestamp;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("处理时钟回拨时线程被中断");
            }
        }
        throw new IllegalStateException(
                String.format("时钟回拨过大: %d 毫秒, 拒绝生成ID", offset));
    }

    private long waitNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            Thread.yield();
            timestamp = timeGen();
        }
        return timestamp;
    }

    private long timeGen() {
        return System.currentTimeMillis();
    }

    @PreDestroy
    public void preDestroy() {
        cleanup();
    }

    private void cleanup() {
        log.info("开始关闭雪花ID生成器...");
        try {
            // 关闭缓冲区执行器
            bufferExecutor.shutdown();
            if (!bufferExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                bufferExecutor.shutdownNow();
            }

            // 关闭心跳执行器
            heartbeatExecutor.shutdown();
            if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                heartbeatExecutor.shutdownNow();
            }
            log.info("雪花ID生成器关闭完成. 共生成ID数量: {}", generatedIds.get());
        } catch (Exception e) {
            log.error("清理过程中发生错误: {}", e.getMessage());
        }
    }

    // ==================== 监控指标方法 ====================
    public long getGeneratedIdsCount() {
        return generatedIds.get();
    }

    public long getClockBackwardsEventsCount() {
        return clockBackwardsEvents.get();
    }

    public int getBufferSize() {
        return idBuffer.size();
    }
}

Redis配置:

package cn.bit.pro7_1.snow_flake;

import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@EnableCaching
@Configuration
@AllArgsConstructor
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }

}

生成器config:

(可定义在common模块,通过.factories将生成器bean导出给其他服务使用)

package cn.bit.pro7_1.snow_flake;


import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
@ConditionalOnClass(RedisTemplate.class)
@EnableConfigurationProperties(SnowflakeProperties.class)
public class SnowflakeAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public DistributedSnowflakeIdGenerator distributedSnowflakeIdGenerator(
            SnowflakeProperties properties,
            RedisTemplate<String, Object> redisTemplate) throws Exception {
        return new DistributedSnowflakeIdGenerator(
            properties.getApplicationName(),
                redisTemplate,
                properties.getDatacenterId()
        );
    }

}

配置文件定义类:

package cn.bit.pro7_1.snow_flake;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "snowflake")
@Data
public class SnowflakeProperties {
    private long datacenterId;
    private String applicationName;
}

表现层测试:

package cn.bit.pro7_1;

import cn.bit.pro7_1.snow_flake.BaseSnowFlakeIdGenerator;
import cn.bit.pro7_1.snow_flake.DistributedSnowflakeIdGenerator;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

@Slf4j
@RestController
@RequestMapping("/test")
@CrossOrigin(origins = "*")
@RequiredArgsConstructor
public class TestController {

    private final BaseSnowFlakeIdGenerator idGenerator1=new BaseSnowFlakeIdGenerator(0,0,0);

    @NonNull
    private DistributedSnowflakeIdGenerator idGenerator2;
    @GetMapping("/gen1/{threadCount}")
    public Map<String, Object> generateIds1(@PathVariable int threadCount) {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        // 使用线程安全的集合存储结果
        ConcurrentHashMap<Long, Boolean> idMap = new ConcurrentHashMap<>(threadCount);
        AtomicInteger duplicateCount = new AtomicInteger(0);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();

        // 提交1万个任务
        for (int i = 0; i < threadCount; i++) {
            executor.execute(() -> {
                try {
                    long id = idGenerator1.nextId();

                    // 检查是否重复
                    if (idMap.putIfAbsent(id, true) != null) {
                        duplicateCount.incrementAndGet();
                        log.warn("Duplicate ID detected: {}", id);
                    } else {
                        successCount.incrementAndGet();
                    }
                } catch (Exception e) {
                    errorCount.incrementAndGet();
                    log.error("Error generating ID: {}", e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            // 等待所有线程完成,最多等待30秒
            if (!latch.await(30, TimeUnit.SECONDS)) {
                log.warn("Timeout waiting for all threads to complete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Test interrupted: {}", e.getMessage());
        }

        long endTime = System.currentTimeMillis();
        long duration = endTime - startTime;

        executor.shutdownNow(); // 立即关闭线程池

        // 统计结果
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("totalThreads", threadCount);
        result.put("successCount", successCount.get());
        result.put("duplicateCount", duplicateCount.get());
        result.put("errorCount", errorCount.get());
        result.put("timeTakenMs", duration);
        result.put("throughput", (threadCount * 1000.0) / duration); // IDs per second
        result.put("uniqueIds", idMap.size());

        // 添加一些示例ID(前5个)
        result.put("sampleIds", idMap.keySet().stream().limit(5).collect(Collectors.toList()));

        // 结果分析
        if (duplicateCount.get() > 0) {
            result.put("result", "FAILED - Duplicate IDs detected!");
        } else if (errorCount.get() > 0) {
            result.put("result", "PARTIAL SUCCESS - Some errors occurred but no duplicates");
        } else {
            result.put("result", "SUCCESS - All IDs are unique");
        }

        return result;
    }

    @GetMapping("/gen2/{threadCount}")
    public Map<String, Object> generateIds2(@PathVariable int threadCount) {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        // 使用线程安全的集合存储结果
        ConcurrentHashMap<Long, Boolean> idMap = new ConcurrentHashMap<>(threadCount);
        AtomicInteger duplicateCount = new AtomicInteger(0);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();

        // 提交1万个任务
        for (int i = 0; i < threadCount; i++) {
            executor.execute(() -> {
                try {
                    long id = idGenerator2.nextId();

                    // 检查是否重复
                    if (idMap.putIfAbsent(id, true) != null) {
                        duplicateCount.incrementAndGet();
                        log.warn("Duplicate ID detected: {}", id);
                    } else {
                        successCount.incrementAndGet();
                    }
                } catch (Exception e) {
                    errorCount.incrementAndGet();
                    log.error("Error generating ID: {}", e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            // 等待所有线程完成,最多等待30秒
            if (!latch.await(30, TimeUnit.SECONDS)) {
                log.warn("Timeout waiting for all threads to complete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Test interrupted: {}", e.getMessage());
        }

        long endTime = System.currentTimeMillis();
        long duration = endTime - startTime;

        executor.shutdownNow(); // 立即关闭线程池

        // 统计结果
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("totalThreads", threadCount);
        result.put("successCount", successCount.get());
        result.put("duplicateCount", duplicateCount.get());
        result.put("errorCount", errorCount.get());
        result.put("timeTakenMs", duration);
        result.put("throughput", (threadCount * 1000.0) / duration); // IDs per second
        result.put("uniqueIds", idMap.size());

        // 添加一些示例ID(前5个)
        result.put("sampleIds", idMap.keySet().stream().limit(5).collect(Collectors.toList()));

        // 结果分析
        if (duplicateCount.get() > 0) {
            result.put("result", "FAILED - Duplicate IDs detected!");
        } else if (errorCount.get() > 0) {
            result.put("result", "PARTIAL SUCCESS - Some errors occurred but no duplicates");
        } else {
            result.put("result", "SUCCESS - All IDs are unique");
        }

        return result;
    }
}

配置文件:

snowflake:
  application-name: service1
  datacenter-id: 1
spring:
  data:
    redis:
      host: localhost
      port: 6379
      database: 1

pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.bit</groupId>
    <artifactId>Pro7_1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Pro7_1</name>
    <description>Pro7_1</description>

    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

测试:

1.workerId自动配置

按序启动同一服务不同实例:

能够看到id正常分配,关闭0和2号服务。

再马上启动2号服务,由于键未过期,此时2号服务分配为4

等待1分半后,之前释放的键过期,启动0号服务

其从低向高检索重新被分配到0号id。

2.多线程id唯一测试

最终测试结果如下:

改进算法对于50000次生成用时24671ms,平均每次生成用时0.49342ms

基础算法对于50000次生成用时24677ms,平均每次生成用时0.49354ms

使用并发控制锁以及缓存(LinkedBlockingQueue 能确保不同线程同时访问队列时不冲突)在保证不发生并发冲突的情况下,以及更多的数据记录和日志输出的情况下仍能和基础算法取得同等速度。

最后:

实习结束了,虽然在实习期间都是做些没啥挑战的业务需求,但好在公司代码的整体框架写的还是不错的,除了让人无语的数据库脏数据问题,整体下来还是不错的,开始结合公司代码研究一些高并发和分布式的东西,欢迎大家关注!


网站公告

今日签到

点亮在社区的每一天
去签到