基于Kafka实现动态监听topic功能

发布于:2025-07-24 ⋅ 阅读:(19) ⋅ 点赞:(0)

生命无罪,健康万岁,我是laity。

我曾七次鄙视自己的灵魂:

第一次,当它本可进取时,却故作谦卑;

第二次,当它在空虚时,用爱欲来填充;

第三次,在困难和容易之间,它选择了容易;

第四次,它犯了错,却借由别人也会犯错来宽慰自己;

第五次,它自由软弱,却把它认为是生命的坚韧;

第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;

第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。

基于Kafka实现动态监听topic功能

业务场景:导条根据各家接口进行数据分发,分发的数据中包含动态的 Kafka topic;各家通过监听该 topic 获取数据,从而完成后续业务。

实现逻辑

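pom(需要引入 spring-kafka 依赖,下文代码中的 org.springframework.kafka.* 均来自该依赖)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>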

yaml 方案1 接收的是String

  # Kafka settings, variant 1: consumer keys and values are deserialized as String.
  # NOTE(review): presumably nested under a top-level `spring:` key that is not shown here — confirm.
  # NOTE(review): KafkaConfig declares its own ConsumerFactory with hard-coded properties, so the
  # consumer settings below may not apply to listeners created through that factory — confirm.
  kafka:
    bootstrap-servers: youKafkaIp:9092 # Kafka broker address; several may be listed, comma-separated
    listener:
      type: batch
    consumer:
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      group-id: consumer-sb
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

yaml 方案2 接收的是Byte

  # Kafka settings, variant 2: consumer keys and values are deserialized as raw byte arrays.
  # The producer side still serializes keys and values as String.
  # NOTE(review): presumably nested under a top-level `spring:` key that is not shown here — confirm.
  # NOTE(review): KafkaConfig declares its own ConsumerFactory with StringDeserializer hard-coded,
  # so the byte-array deserializers below may not apply to listeners created through it — confirm.
  kafka:
    bootstrap-servers: youKafkaIp:9092 # Kafka broker address; several may be listed, comma-separated
    listener:
      type: batch
    consumer:
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      auto-offset-reset: earliest
      group-id: consumer-sb
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

收消息CODE

KafkaConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that switches on Kafka listener support and publishes the
 * String/String consumer factory and listener container factory beans.
 *
 * <p>Two earlier attempts at fixing
 * "Could not create message listener - MessageHandlerMethodFactory not set" are
 * disabled and no longer part of this class:
 * <ol>
 *   <li>declaring a {@code KafkaListenerAnnotationBeanPostProcessor} bean configured
 *       with a {@code DefaultMessageHandlerMethodFactory} (marked by the author as
 *       not working, TODO:WWS);</li>
 *   <li>implementing {@code KafkaListenerConfigurer} and calling
 *       {@code registrar.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory())}
 *       in {@code configureKafkaListeners}.</li>
 * </ol>
 *
 * @author laity
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /**
     * Builds the listener container factory on top of {@link #consumerFactory()},
     * with a concurrency of 5.
     *
     * @return the container factory used for String/String listeners
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(5);
        return containerFactory;
    }

    /**
     * Creates the consumer factory from a fixed set of properties: broker address,
     * consumer group id, and String deserializers for both key and value.
     *
     * @return a consumer factory producing String/String consumers
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        final String stringDeserializer = StringDeserializer.class.getName();

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "youKafkaIp:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-laity");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

KafkaListenerController.java

package cn.iocoder.yudao.server.controller.admin.szbl;

import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;
import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.security.PermitAll;

/**
 * REST controller exposing the endpoint that receives distributed scene data and
 * starts a Kafka listener for the topic named in the request.
 *
 * @author laity
 */
@RestController
@RequestMapping("/kafka")
public class KafkaListenerController {

    private final MyComponent component;

    public KafkaListenerController(MyComponent component) {
        this.component = component;
    }

    /**
     * Receives the data distributed by the upstream side (called "导条" in the
     * original notes) and starts listening on the topic carried in the payload.
     *
     * <p>NOTE(review): {@code @PermitAll} presumably exempts this endpoint from
     * authentication — confirm that is intended, because every call asks
     * {@link MyComponent#startListening(String)} to register another listener.
     *
     * @param vo request body; its {@code gzTopicName} is the topic to listen on
     * @return a successful result wrapping {@code true}
     */
    @PostMapping("/reception")
    @PermitAll
    public CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) {
        // ... business logic (elided in the original)
        // Start listening on the topic supplied by the caller.
        component.startListening(vo.getGzTopicName());
        return CommonResult.success(true);
    }
}

DynamicKafkaListenerService.java

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Objects;

/**
 * Registers and removes Kafka listener containers at runtime through the
 * {@link KafkaListenerEndpointRegistry}.
 *
 * @author laity
 */
@Service
public class DynamicKafkaListenerService {

    private final KafkaListenerEndpointRegistry registry;

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    @Autowired
    public DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry, ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.registry = registry;
        this.factory = factory;
    }

    /**
     * Registers a new listener container for {@code topic} and starts it immediately.
     *
     * <p>The endpoint id is {@code method.getName() + "_" + LocalDateTime.now()}.
     * NOTE(review): the id is not returned to the caller, so a later
     * {@link #removeListener(String)} has to obtain it from the registry; calling this
     * method twice for the same topic registers two containers.
     *
     * @param topic   topic to subscribe to; must not be null or blank
     * @param groupId Kafka consumer group id for this listener
     * @param bean    object that owns {@code method}; an AOP proxy is unwrapped to its target
     * @param method  handler method invoked for received records
     * @throws IllegalArgumentException if {@code topic} is null or blank
     * @throws NullPointerException     if {@code bean} (after unwrapping) or {@code method} is null
     * @throws IllegalStateException    if the AOP proxy target cannot be obtained
     */
    public void addListener(String topic, String groupId, Object bean, Method method) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be null or blank");
        }
        Objects.requireNonNull(method, "method must not be null");
        // A real check instead of `assert`, which is skipped unless the JVM runs with -ea.
        Object target = Objects.requireNonNull(unwrapProxy(bean), "bean must not be null");

        // Built by hand rather than by the container, so it has to be initialised explicitly;
        // otherwise its argument-resolver list stays empty.
        DefaultMessageHandlerMethodFactory handlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        handlerMethodFactory.afterPropertiesSet();

        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(target);
        endpoint.setMethod(method);
        endpoint.setTopics(topic);
        // setGroupId overrides the consumer group.id; setGroup would only name a container group.
        endpoint.setGroupId(groupId);
        endpoint.setId(method.getName() + "_" + LocalDateTime.now());
        endpoint.setMessageHandlerMethodFactory(handlerMethodFactory);
        // Use the injected container factory and start the container right away.
        registry.registerListenerContainer(endpoint, factory, true);
    }

    /**
     * Stops the listener container registered under the given id and removes it from the registry.
     *
     * @param beanName id of the listener container to remove
     * @throws NullPointerException if no container is registered under {@code beanName}
     */
    public void removeListener(String beanName) {
        Objects.requireNonNull(registry.getListenerContainer(beanName),
                "no listener container registered with id '" + beanName + "'").stop();
        registry.unregisterListenerContainer(beanName);
    }

    /** Returns the target behind an AOP proxy, or {@code bean} itself when it is not a proxy. */
    private static Object unwrapProxy(Object bean) {
        if (!AopUtils.isAopProxy(bean)) {
            return bean;
        }
        try {
            return ((Advised) bean).getTargetSource().getTarget();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to unwrap AOP proxy " + bean.getClass().getName(), e);
        }
    }
}

BlueKafkaConsumer.java

import org.springframework.stereotype.Component;

/**
 * Component holding the handler method that prints records received from Kafka.
 *
 * @author laity
 */
@Component
public class BlueKafkaConsumer {

    /**
     * Prints a banner line followed by the string form of the received record to
     * standard output.
     *
     * <p>An annotation-based registration was tried and is disabled:
     * {@code @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")}.
     *
     * @param record the received object; its {@code toString()} is printed
     */
    public void listen(Object record) {
        final String banner =
                "======================= 接收动态KafkaTopics Received message ========================";
        System.out.println(banner);

        final String text = record.toString();
        System.out.println(text);
    }
}

MyComponent.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;

/**
 * Glue component that wires {@link BlueKafkaConsumer#listen(Object)} to topics at
 * runtime by delegating to {@link DynamicKafkaListenerService}.
 *
 * @author laity
 */
@Component
public class MyComponent {

    private static final String HANDLER_METHOD_NAME = "listen";
    private static final String CONSUMER_GROUP = "consumer-laity";

    private final DynamicKafkaListenerService kafkaListenerService;
    private final BlueKafkaConsumer blueKafkaConsumer;

    @Autowired
    public MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) {
        this.kafkaListenerService = kafkaListenerService;
        this.blueKafkaConsumer = blueKafkaConsumer;
    }

    /**
     * Registers {@code BlueKafkaConsumer.listen(Object)} as a listener for the given topic.
     *
     * @param topic topic to start listening on
     * @throws RuntimeException wrapping {@link NoSuchMethodException} if the handler
     *                          method cannot be found
     */
    public void startListening(String topic) {
        final Method handler;
        try {
            handler = BlueKafkaConsumer.class.getMethod(HANDLER_METHOD_NAME, Object.class);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
        kafkaListenerService.addListener(topic, CONSUMER_GROUP, blueKafkaConsumer, handler);
    }

    /**
     * Removes the listener registered under the given id.
     *
     * @param beanName id of the listener to remove
     */
    public void stopListening(String beanName) {
        kafkaListenerService.removeListener(beanName);
    }

    /**
     * Startup hook, intentionally empty. It runs once when the service starts, whereas
     * the author wants listeners that can change at runtime.
     */
    @PostConstruct
    public void init() {
    }

}

世界上最可贵的两个词,一个叫认真,一个叫坚持,认真的人改变自己,坚持的人改变命运,有些事情不是看到了希望才去坚持,而是坚持了才有希望。我是Laity,正在前进的Laity。