分布式微服务系统架构第103集:高性能落地实践,Kafka,分布式锁,缓存双写架构

发布于:2025-04-13 ⋅ 阅读:(25) ⋅ 点赞:(0)

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

JVM 维度调优说明

优化项

原因

避免频繁 new HashMap()

提前预估容量,减少扩容频率

复用 SimpleDateFormat

避免创建新对象,减少 GC 压力

使用 StringBuilder

替代 += 字符串拼接,提升效率

Bean 拷贝中使用 getNullPropertyNames

防止覆盖有效字段,提高安全性

if-else 简化与空判断提前

提前返回可减少嵌套和冗余分支

优化点说明:

优化项

描述

好处

ConcurrentHashMap.newKeySet()

 替代 HashSet

使用线程安全的集合处理并发场景

防止高并发下的集合并发异常,提高稳定性

final static

 常量

如 KEYSPACEPRE_TABLE_NAMELogger

避免每次访问创建,提高访问效率,便于 JVM 编译器优化

合并 checkTableExist / checkBackupTableExist 为 checkAndCreateTable

避免重复逻辑代码

精简、易维护,提高可读性

优化字段命名:tableNa -> tableNameiData -> data

统一命名规范

提高可读性、代码整洁度

字段 @Autowired 改为 private

遵循封装性和代码规范

减少暴露字段,提高类的封装性

日志组件 Logger 加上 final

常规建议

提高可读性和线程安全性标识

保留注释但更清晰

仅保留核心注释

代码更清爽,降低维护成本

异步写入前不记录日志

如非调试场景可省略日志

避免日志IO影响性能,可由 AOP 全局处理日志

SimpleDateFormat

 每次创建

没有复用静态变量

避免线程不安全问题(局部变量线程安全)


⚙️ JVM 层面性能优化说明:

调整点

JVM 层影响

性能收益

使用局部变量的 SimpleDateFormat

避免静态共享导致线程安全问题,减少锁竞争

线程安全、性能提升

避免频繁创建 String 拼接语句(如 tableName

使用 final 减少 GC 压力

更少的临时对象,GC 频率降低

ConcurrentHashMap

 替代 HashSet

减少 synchronized 同步代码块

并发效率提升,锁竞争减少

精简方法体逻辑

JVM JIT 更容易进行方法内联优化

增强运行期性能

使用 ((Id,day), xxx, xxx, xxx) 的复合主键,有利于 Cassandra 的查询性能(分区+排序)

使用特点总结

特点

说明

📅 表结构适用于 按月分表 场景(如车载数据、日志、轨迹)

🚀 搭配 insertAsync() 等异步写入,适合 高并发写入 的场景

📊 表结构固定,便于数据仓库处理、日志追踪、消息还原等场景

🔧 灵活生成、可扩展性强,可未来支持版本号、标签等字段

精简优化点说明

优化点

原因/好处

✅ 用 String.valueOf() 替代 toString()

防止 NPE(更健壮)

✅ 使用 equals("xxx") 而非反过来

防止 eventId 为 null 时抛异常

✅ 提前 try,再 finally ack.acknowledge()

无论是否异常,都提交 offset,避免消费重复

✅ logger.info("xxx {}", var)

使用参数化日志格式,避免无谓字符串拼接(更高效)

AppListener 用于系统启动后的初始化工作

// 启动 TCP 服务监听线程
new Thread(() -> {
    try {
        Server.bind(serverConfig.getPort());
        logger.info("TCP Server started on port {}", serverConfig.getPort());
    } catch (Exception e) {
        logger.error("Server 启动失败", e);
    }
}, "Server-Thread").start();

调优与说明

优化点

说明

✅ Thread 命名

方便在 JVM 工具如 JVisualVM 中观察线程

✅ isInterrupted() 检查

避免无限死循环,增加线程可控性(便于后期 shutdown hook)

✅ 日志等级优化

同步耗时

 改为 debug,生产环境降低无关日志开销

✅ System.exit(1)

明确非正常退出

  • ✅ 可用线程池(如 Spring @Async + 配置线程池)替代裸线程。

  • ✅ 将线程改为 守护线程setDaemon(true))可选优化。

  • ✅ 后期支持优雅关闭,可以整合 Spring SmartLifecycle

配置线程池(推荐)
@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean("customTaskExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("startup-task-");
        executor.initialize();
        return executor;
    }
}

✅ Thread.currentThread().isInterrupted()

检查当前线程是否已被中断(非清除标志位的查询)。

  • 返回 true:线程被中断。

  • 返回 false:线程未被中断。

🧠 不会清除中断标志位,适合在循环中定期检查是否应退出:

while (!Thread.currentThread().isInterrupted()) {
    // do work
}

✅ Thread.currentThread().interrupt()

设置中断标志位为 true,告诉线程:“你该停下来了”。

通常不是自己调用自己,而是从外部线程对目标线程进行中断通知:

Thread t = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        // do something
    }
});
t.start();

// 某些时刻需要中断它
t.interrupt(); // 设置中断标志位为 true

❗和 Thread.interrupted() 的区别

方法

作用

是否清除中断标志

Thread.currentThread().isInterrupted()

查询当前线程是否被中断

❌ 不清除

Thread.interrupted()

查询并清除当前线程的中断状态

✅ 清除

Thread.interrupt()

设置目标线程中断状态

-


⚠️ 中断不会立刻停止线程!

中断是一种“协商式停止”:

  • 线程收到中断信号后,需要自行检查中断标志并优雅退出

  • 否则它会继续运行。


🔄 结合 sleep()wait()join()

如果线程在执行这些方法时被中断,会抛出 InterruptedException,同时中断标志会被清除,需要手动再次调用 interrupt()

try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // 保留中断标志
    log.warn("线程睡眠中被中断");
}

✅ 实用模板

public void runTask() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            // 任务逻辑
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 恢复中断状态
            break; // 或 return,退出循环
        }
    }
    log.info("任务已中断并退出");
}

Kafka + 分布式锁 + 缓存双写架构

🔧 架构概览图

Kafka -> Spring KafkaListener
             |
             v
    分布式锁(Redisson)
             |
             v
      数据库查询 + 缓存双写(防穿透 + 延迟双删等)
             |
             v
         ack/commit offset

✅ Kafka 消费 + Redisson 分布式锁 + 缓存双写

消息处理类(以订单为例)

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderKafkaConsumer {

    private final RedissonClient redissonClient;
    private final OrderService orderService;
    private final CacheService cacheService;
    private final BloomFilter<String> bloomFilter;

    private static final String LOCK_PREFIX = "lock:order:";

    @KafkaListener(topics = "order-events", containerFactory = "kafkaListenerContainerFactory")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String orderId = record.key();
        String message = record.value();

        if (!bloomFilter.mightContain(orderId)) {
            log.warn("BloomFilter reject orderId={}", orderId);
            ack.acknowledge(); // 直接ack,避免重复消费
            return;
        }

        String lockKey = LOCK_PREFIX + orderId;
        RLock lock = redissonClient.getLock(lockKey);

        try {
            if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
                // 业务处理逻辑:缓存双写
                handleOrderMessage(orderId, message);
            } else {
                log.warn("Order {} is already being processed.", orderId);
            }
        } catch (Exception e) {
            log.error("Kafka消费失败: {}", e.getMessage(), e);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
            ack.acknowledge(); // 无论成功失败都提交offset,避免阻塞(失败后消息需通过日志+补偿机制处理)
        }
    }

    private void handleOrderMessage(String orderId, String message) {
        // 1. 查询数据库
        Order order = orderService.findById(orderId);
        if (order == null) {
            log.warn("Order not found: {}", orderId);
            return;
        }

        // 2. 缓存写入(双写)
        cacheService.saveOrderToCache(order);

        // 3. 异步延迟双删逻辑(可选)
        CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS).execute(() -> {
            cacheService.deleteOrderCache(orderId);
        });

        log.info("Order processed and cached: {}", orderId);
    }
}

✅ 关键组件说明

Redisson 分布式锁配置

@Bean
public RedissonClient redissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379");
    return Redisson.create(config);
}

缓存双写(CacheService

public interface CacheService {
    void saveOrderToCache(Order order);
    void deleteOrderCache(String orderId);
}
@Service
public class RedisCacheService implements CacheService {

    private final RedisTemplate<String, Object> redisTemplate;

    public void saveOrderToCache(Order order) {
        String key = "order:" + order.getId();
        redisTemplate.opsForValue().set(key, order, Duration.ofMinutes(10));
    }

    public void deleteOrderCache(String orderId) {
        String key = "order:" + orderId;
        redisTemplate.delete(key);
    }
}

✅ 可选增强

功能

说明

布隆过滤器

防止无效缓存穿透,提前拦截

延迟双删

防止数据库数据修改后,旧缓存再次被设置

死信队列 (DLQ)

消费失败后持久化并异步补偿

Tracing + Metrics

接入链路追踪、埋点监控 Kafka 消费性能

消费重试机制

使用 Spring Retry 或 SeekToCurrentErrorHandler 重试消费