还在重启应用改 Topic?Spring Boot 动态 Kafka 消费的“终极形态”

发布于:2025-09-13 ⋅ 阅读:(21) ⋅ 点赞:(0)

图片

场景描述:
你的一个微服务正在稳定地消费 Kafka 的 order_topic。现在,上游系统为了做业务隔离,新增加了一个 order_topic_vip,并开始向其中投递 VIP 用户的订单。你需要在不重启、不发布新版本的情况下,让你现有的消费者同时开始消费 order_topic_vip 的消息。

这是一个典型的动态运维需求。静态的 @KafkaListener(topics = "order_topic") 注解无法满足这个要求。本文将提供一套完整的解决方案,教你如何利用配置中心(以 Nacos 为例)和 Spring Kafka 的底层 API,实现消费者 Topic 列表的“热更新”。

1. 核心原理:销毁并重建 (Destroy and Rebuild)

Spring Kafka 的消费者容器 (MessageListenerContainer) 在创建时,其核心配置(如监听的 Topic)就已经确定。在运行时直接修改一个正在运行的容器的 Topic 列表,是一种不被推荐且存在风险的操作。

最稳健、最可靠的方案是:

  1. 1. 停止注销监听旧 Topic 的消费者容器。

  2. 2. 根据原始的消费者配置和新传入的 Topic 列表,以编程方式创建一个全新的消费者容器。

  3. 3. 启动这个新的容器。

整个过程对外界来说是“无感”的,最终效果就是消费者监听的 Topic 列表发生了变化。

2. 方案架构

要实现上述流程,我们需要三个关键组件:

  1. 1. 元数据采集器 (BeanPostProcessor): 在应用启动时,扫描并缓存所有 @KafkaListener 的“配置蓝图”(包括 idgroupId, 原始 topics 等)。

  2. 2. 配置中心 (Nacos): 作为动态 Topic 配置的“真理之源”。

  3. 3. 动态刷新服务: 监听 Nacos 的配置变更,并调用 Spring Kafka 的 KafkaListenerEndpointRegistry API 来完成“销毁并重建”的操作。

3. 完整代码实现

这是一个可以直接集成的、完整的解决方案代码。

步骤 3.1: 定义元数据存储

EndpointMetadata.java

package com.example.kafka.dynamic.core;

import java.io.Serializable;
import java.lang.reflect.Method;

// 用于存储 @KafkaListener 的“蓝图”
public class EndpointMetadata implements Serializable {
    private String id;
    private String groupId;
    private String[] topics;
    private Object bean;
    private Method method;
    // ... 可按需添加 concurrency, autoStartup 等其他属性

    // Getters and Setters...
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getGroupId() { return groupId; }
    public void setGroupId(String groupId) { this.groupId = groupId; }
    public String[] getTopics() { return topics; }
    public void setTopics(String[] topics) { this.topics = topics; }
    public Object getBean() { return bean; }
    public void setBean(Object bean) { this.bean = bean; }
    public Method getMethod() { return method; }
    public void setMethod(Method method) { this.method = method; }
}

KafkaListenerMetadataRegistry.java (元数据采集与注册)

package com.example.kafka.dynamic.processor;

import com.example.kafka.dynamic.core.EndpointMetadata;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class KafkaListenerMetadataRegistry implements BeanPostProcessor {

    private final Map<String, EndpointMetadata> metadataStore = new ConcurrentHashMap<>();

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        for (Method method : targetClass.getMethods()) {
            KafkaListener kafkaListener = AnnotationUtils.findAnnotation(method, KafkaListener.class);
            if (kafkaListener != null && kafkaListener.id() != null && !kafkaListener.id().isEmpty()) {
                EndpointMetadata metadata = new EndpointMetadata();
                metadata.setId(kafkaListener.id());
                metadata.setTopics(kafkaListener.topics());
                metadata.setGroupId(kafkaListener.groupId());
                metadata.setBean(bean);
                metadata.setMethod(method);
                metadataStore.put(kafkaListener.id(), metadata);
            }
        }
        return bean;
    }
    
    public EndpointMetadata getMetadata(String listenerId) {
        return metadataStore.get(listenerId);
    }
}
步骤 3.2: 核心实现:动态刷新服务

DynamicKafkaConsumerService.java

package com.example.kafka.dynamic.service;

import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.example.kafka.dynamic.core.EndpointMetadata;
import com.example.kafka.dynamic.processor.KafkaListenerMetadataRegistry;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;

@Service
public class DynamicKafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(DynamicKafkaConsumerService.class);

    @Autowired
    private KafkaListenerEndpointRegistry listenerRegistry;

    @Autowired
    private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;

    @Autowired
    private KafkaListenerMetadataRegistry metadataRegistry;
    
    @Autowired
    private ConfigService configService; // Nacos Config Service
    
    private final ObjectMapper objectMapper = new ObjectMapper();

    private final String DATA_ID = "dynamic-kafka-topics.json";
    private final String GROUP = "DEFAULT_GROUP";

    @PostConstruct
    public void init() throws Exception {
        // 1. 应用启动时,先拉取一次配置
        String initialConfig = configService.getConfig(DATA_ID, GROUP, 5000);
        if (StringUtils.hasText(initialConfig)) {
            refreshListeners(initialConfig);
        }

        // 2. 注册 Nacos 监听器
        configService.addListener(DATA_ID, GROUP, new Listener() {
            @Override
            public Executor getExecutor() { return null; }

            @Override
            public void receiveConfigInfo(String configInfo) {
                log.info("接收到 Kafka Topic 配置变更:\n{}", configInfo);
                refreshListeners(configInfo);
            }
        });
    }

    public synchronized void refreshListeners(String configInfo) {
        try {
            Map<String, String> configMap = objectMapper.readValue(configInfo, new TypeReference<>() {});
            
            configMap.forEach((listenerId, topics) -> {
                log.info("准备刷新 Listener ID '{}' 的 Topics 为 '{}'", listenerId, topics);
                
                MessageListenerContainer container = listenerRegistry.getListenerContainer(listenerId);
                String[] newTopics = topics.split(",");
                
                // 如果容器存在,且 Topic 列表发生了变化
                if (container != null) {
                    if (!Arrays.equals(container.getContainerProperties().getTopics(), newTopics)) {
                        recreateAndRegisterContainer(listenerId, newTopics);
                    }
                } else {
                    // 如果容器不存在 (可能被手动停止或首次创建),也进行创建
                    recreateAndRegisterContainer(listenerId, newTopics);
                }
            });

        } catch (Exception e) {
            log.error("动态刷新 Kafka 消费者配置失败", e);
        }
    }

    private void recreateAndRegisterContainer(String listenerId, String[] topics) {
        log.info("开始重建并注册 Listener ID '{}'", listenerId);
        
        // 1. 停止并销毁旧容器
        MessageListenerContainer container = listenerRegistry.getListenerContainer(listenerId);
        if (container != null) {
            container.stop();
            // 在 Spring Kafka 2.8+ 中,注销是内部操作,我们只需创建并注册新的即可。
        }

        // 2. 从我们的“蓝图”中获取元数据
        EndpointMetadata metadata = metadataRegistry.getMetadata(listenerId);
        if (metadata == null) {
            log.error("找不到 Listener ID '{}' 的元数据,无法重建。", listenerId);
            return;
        }

        // 3. 创建一个全新的 Endpoint
        MethodKafkaListenerEndpoint<String, String> newEndpoint = new MethodKafkaListenerEndpoint<>();
        newEndpoint.setId(metadata.getId());
        newEndpoint.setGroupId(metadata.getGroupId());
        newEndpoint.setTopics(topics); // <-- 核心:使用新 Topic
        newEndpoint.setBean(metadata.getBean());
        newEndpoint.setMethod(metadata.getMethod());
        newEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        
        // 4. 注册新的 Endpoint
        listenerRegistry.registerListenerContainer(newEndpoint, kafkaListenerContainerFactory, true);
        log.info("成功重建并启动 Listener ID '{}',现在监听 Topics: {}", listenerId, Arrays.toString(topics));
    }
}

4. 实践演练

步骤 4.1: 业务代码

在你的 Spring Boot 应用中,正常定义你的消费者,但务必提供唯一的 id

@Service
public class OrderEventListener {
    @KafkaListener(id = "order-listener", topics = "order_topic", groupId = "my-group")
    public void handleOrderEvent(String message) {
        System.out.println("收到订单消息: " + message);
    }
}
步骤 4.2: application.yml 配置

确保你的应用连接到了 Nacos。

spring:
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848
# ... kafka server acls
步骤 4.3: Nacos 配置

在 Nacos 中,创建一个 Data ID 为 dynamic-kafka-topics.jsonGroup 为 DEFAULT_GROUP 的配置,内容为 JSON 格式:

{
  "order-listener": "order_topic"
}

Key (order-listener) 必须与 @KafkaListener 的 id 完全一致。

步骤 4.4: 启动与验证
  1. 1. 启动应用。此时,order-listener 消费者会正常启动,并开始消费 order_topic 的消息。

  2. 2. 动态变更! 去 Nacos 控制台,将配置修改为:
    {
      "order-listener": "order_topic,order_topic_vip"
    }
  3. 3. 点击“发布”。

  4. 4. 观察应用日志。 你会看到类似下面的日志:
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 接收到 Kafka Topic 配置变更: ...
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 准备刷新 Listener ID 'order-listener' 的 Topics 为 'order_topic,order_topic_vip'
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 开始重建并注册 Listener ID 'order-listener'
    ... (旧容器停止的日志) ...
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 成功重建并启动 Listener ID 'order-listener',现在监听 Topics: [order_topic, order_topic_vip]
  5. 5. 验证结果。 现在,你的 order-listener 已经开始同时消费 order_topic 和 order_topic_vip 两个 Topic 的消息了,整个过程应用没有重启

总结

通过巧妙地结合 BeanPostProcessorKafkaListenerEndpointRegistry 和动态配置中心,我们实现了一个功能极其强大的动态 Kafka 消费管理方案。


网站公告

今日签到

点亮在社区的每一天
去签到