Spring框架集成Kafka的方式

发布于:2025-09-01 ⋅ 阅读:(24) ⋅ 点赞:(0)

Spring框架集成Kafka的方式

springboot集成kafka的方式

添加maven依赖

<!-- NOTE(review): the MyBatis starter below does not look related to Kafka; presumably carried over from the author's project. Confirm whether it is needed for this example. -->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>

<!-- Spring for Apache Kafka. No version is given here, so it is presumably managed by the Spring Boot parent/BOM. Confirm against the project's pom. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置application.yml

# Kafka settings for the producer and the consumer side of the application.
spring:
  kafka:
    producer:
      # Broker address list (placeholder; replace with the real host:port).
      bootstrap-servers: ip:port
      # Custom key read by the Java code through @Value("${spring.kafka.producer.topics}") as the output topic.
      topics: topics
      # Number of send retries.
      retries: 0
      # Producer batch size and total buffer memory (33554432 = 32 * 1024 * 1024).
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Custom key referenced by @KafkaListener(topics = "${spring.kafka.consumer.topics}").
      # NOTE(review): the placeholder value is the same as the producer topic; a listener that
      # forwards to the producer topic would re-consume its own output if both really match.
      topics: topics
      bootstrap-servers: ip:port
      group-id: group_id
      # Where to start reading when the group has no committed offset.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Extra Kafka client properties.
      # NOTE(review): the security settings appear only under "consumer"; the producer block above
      # has none. Confirm whether the producer also needs SASL_SSL for the same cluster.
      properties:
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        # Placeholder credentials; do not commit real secrets.
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
        ssl.truststore.location: client.truststore.jks
        ssl.truststore.password: trus_password
        # Left empty on purpose, presumably to disable server hostname verification. Confirm this is intended outside of test environments.
        ssl.endpoint.identification.algorithm:

创建kafka生产者和消费者

在Spring Boot应用中,正确配置 application.properties 或 application.yml 后,Spring Boot的Kafka自动配置(KafkaAutoConfiguration)会自动创建和装配 KafkaTemplate、ConsumerFactory、KafkaListenerContainerFactory 等相关的Bean。

  • KafkaTemplate:用于发送消息到Kafka

  • ConsumerFactory:创建Kafka消费者的工厂

  • KafkaListenerContainerFactory:为@KafkaListener方法创建消息监听容器。

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Listens on the configured input topic(s) and forwards each (processed) message
 * to the configured output topic.
 */
@Service
@Slf4j
public class KafkaMessageService {

    /** Output topic name, read from {@code spring.kafka.producer.topics}. */
    @Value("${spring.kafka.producer.topics}")
    private String outputTopic;

    /** Template used to publish to Kafka; injected through the constructor so it can be final. */
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates the service.
     *
     * @param kafkaTemplate template used to send the processed messages
     */
    public KafkaMessageService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Handles one message received from the input topic(s).
     *
     * @param message the received message payload
     */
    @KafkaListener(topics = "${spring.kafka.consumer.topics}")
    public void listen(String message) {
        log.info("Received message: message = {}", message);

        // TODO: apply the real business processing; until then the payload is forwarded unchanged
        String processedMessage = message;

        // Publish the result to the output topic
        kafkaTemplate.send(outputTopic, processedMessage);
        log.info("Sent Processed Message: {}", processedMessage);
    }
}

手动配置kafka生产者和消费者

如果需要更复杂的配置,也可以自定义kafka的配置类。

kafka消费者配置类:

@Configuration
@Slf4j
public class KafkaConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        props.put("ssl.truststore.location", truststoreLocation);
        props.put("ssl.truststore.password", truststorePassword);
        props.put("ssl.endpoint.identification.algorithm", endpointIdentificationAlgorithm);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // 设置并发消费者数量,模拟多个独立的消费者并发处理消息
        factory.setConcurrency(3);
        // 设置手动提交
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

kafka生产者配置类:

/**
 * Manual Kafka producer configuration: builds the {@link ProducerFactory} from the
 * {@code spring.kafka.producer.*} properties and exposes a {@link KafkaTemplate}.
 */
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    /** Serializer class name for record keys (named after what it actually holds). */
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    /** Serializer class name for record values. */
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    /**
     * Builds the producer factory from the broker address and serializer settings.
     *
     * @return the producer factory backing the template
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>(4);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Creates the template used to send messages.
     *
     * @param producerFactory the factory that creates the underlying producers
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

监听消息并处理:

/**
 * Listener that receives messages from the configured consumer topics, processes
 * them and then manually acknowledges the offset.
 */
@Component
@Slf4j
public class KafkaMessageProcess {

    /** Output topic name, read from {@code spring.kafka.producer.topics}; not used yet in this example. */
    @Value("${spring.kafka.producer.topics}")
    private String outTopic;

    /** Template for publishing results; not used yet in this example. */
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Handles one record from any of the comma-separated topics in
     * {@code spring.kafka.consumer.topics}.
     *
     * @param message        the record payload
     * @param topic          the topic the record was received from
     * @param acknowledgment handle used to commit the offset manually
     */
    @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")
    public void listen(@Payload String message,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        Acknowledgment acknowledgment) {
        log.info("Received message: topic = {}, message = {}", topic, message);

        // Process first: if processing throws, the offset is not acknowledged and the
        // failure is left to the container's error handling instead of being silently skipped.
        process(message);

        // Manually acknowledge: commits the offset of this record so that it (and everything
        // before it) is recorded as successfully consumed.
        acknowledgment.acknowledge();
    }

    /**
     * Business processing hook for a received message.
     *
     * @param message the record payload
     */
    private void process(String message) {
        // todo process msg
    }

}

KafkaListener 源码分析

@KafkaListener 的注册

  1. 扫描注解:在bean初始化阶段,KafkaListenerAnnotationBeanPostProcessor 由于实现了BeanPostProcessor,会扫描所有 Bean,查找 @KafkaListener 注解
KafkaListenerAnnotationBeanPostProcessor

// Note: parts of the original method are omitted in this excerpt.
// Declared by the BeanPostProcessor interface, one of Spring's core extension points: it allows custom processing after a bean has been initialized.
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    
    // Collect the @KafkaListener annotations declared on the class itself
    // (targetClass is not defined in this abridged excerpt)
    Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);
    
    // Collect the methods annotated with @KafkaListener; the lambda returns null for methods without the annotation
    Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (methodx) -> {
        Set<KafkaListener> listenerMethods = this.findListenerAnnotations(methodx);
        return !listenerMethods.isEmpty() ? listenerMethods : null;
    });
    
    // Walk the annotated methods and process the listener annotations found on them
    // NOTE(review): as written only the first entry is taken from the iterator; presumably the enclosing loop is among the omitted code
    Iterator var13 = annotatedMethods.entrySet().iterator();
    
    Map.Entry<Method, Set<KafkaListener>> entry = (Map.Entry)var13.next();
    Method method = (Method)entry.getKey();
    Iterator var11 = ((Set)entry.getValue()).iterator();

    while(var11.hasNext()) {
        KafkaListener listener = (KafkaListener)var11.next();
        // Hand the discovered listener over to the subsequent parsing and registration logic
        this.processKafkaListener(listener, method, bean, beanName);
    }

    return bean;
}
  2. 解析注解:提取 topics、groupId、containerFactory 等信息。
KafkaListenerAnnotationBeanPostProcessor

// Fills the endpoint from the annotation, resolves the container factory named by the annotation, and registers the endpoint with the registrar.
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
    // Parse the annotation and store its metadata together with the bean and other static configuration in the endpoint
    this.processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
    
    String containerFactory = this.resolve(kafkaListener.containerFactory());
    KafkaListenerContainerFactory<?> listenerContainerFactory = this.resolveContainerFactory(kafkaListener, containerFactory, beanName);
    // Register the endpoint (which now wraps the listener, method, etc. found in the previous step)
    this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}
  3. 注册监听端点:调用 KafkaListenerEndpointRegistrar.registerEndpoint() 注册监听器。
KafkaListenerEndpointRegistrar
    
// Registers an endpoint together with the (optional) container factory that should create its container.
public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized(this.endpointDescriptors) {
        // Whether to start immediately:
        // true:  create and start the corresponding MessageListenerContainer (the Kafka consumer container) right away
        // false: only store the endpoint information in endpointDescriptors; the containers are created and started together later
        if (this.startImmediately) {
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor), true);
        } else {
            this.endpointDescriptors.add(descriptor);
        }

    }
}

// Registers a listener container for every stored endpoint descriptor in one pass,
// then switches to immediate mode so that endpoints registered afterwards are handled right away.
protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true;  // trigger immediate startup
    }
}

// Creates the listener container for an endpoint, stores it, and optionally starts it.
// NOTE(review): the callers above invoke this on this.endpointRegistry, so it presumably belongs to the registry class rather than the registrar (confirm).
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {

    synchronized (this.listenerContainers) {
        // Create the MessageListenerContainer; the creation step is analysed in the next section
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        // Store the created container in the listenerContainers map (id is not defined in this excerpt)
		this.listenerContainers.put(id, container);
        
        if (startImmediately) {
            // Start the container
            startIfNecessary(container);
        }
    }
}

KafkaListenerContainerFactory 创建监听容器

KafkaMessageListenerContainer 是 Spring Kafka 的核心组件之一,负责 管理和执行 Kafka 消费者的消息监听逻辑,封装了原生 KafkaConsumer,提供了线程管理、消息拉取、监听器调用、错误处理等功能。

暂时回到我们开头的配置,这里我们配置的容器创建工厂是 ConcurrentKafkaListenerContainerFactory,concurrency=3 表示启动三个线程并发处理消息,这个时候,则会由 ConcurrentKafkaListenerContainerFactory 创建 ConcurrentMessageListenerContainer。

// Listener container factory bean: wires in the consumer factory, sets the concurrency and the ack mode.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Number of concurrent consumers
    factory.setConcurrency(3);
    // Use manual acknowledgment
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

在ConcurrentMessageListenerContainer中有一个集合,到时候会根据concurrency创建对应数量的KafkaMessageListenerContainer 子容器。

// List of child KafkaMessageListenerContainer instances
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

容器创建代码

AbstractKafkaListenerContainerFactory

// Template for container creation: create the instance, configure the endpoint, set up and initialize the container, then customize it.
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);
    JavaUtils.INSTANCE
            .acceptIfNotNull(endpoint.getId(), instance::setBeanName);
    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
    }

    endpoint.setupListenerContainer(instance, this.messageConverter);
    // Initialize the container configuration: the endpoint holds static configuration (topic information, the @KafkaListener method, the bean, etc.) which is copied into the container here
    initializeContainer(instance, endpoint);
    customizeContainer(instance);
    return instance;
}

// Hook implemented by subclasses to create the concrete container instance for an endpoint.
protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint);


ConcurrentKafkaListenerContainerFactory
// Subclass implementation invoked by the abstract class (template method pattern): the abstract class defines the overall flow and the subclass supplies this step.
// The container properties are built from explicit topic partitions if present, otherwise from topic names, otherwise from the topic pattern.
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
    TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
    if (topicPartitions != null && topicPartitions.length > 0) {
        ContainerProperties properties = new ContainerProperties(topicPartitions);
        return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
    }
    else {
        Collection<String> topics = endpoint.getTopics();
        if (!topics.isEmpty()) { // NOSONAR
            ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
        else {
            ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONAR
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
    }
}

启动容器,消费消息

前面我们提到容器创建好后有一个启动的过程,也就是这一行代码startIfNecessary(container);,会真正启动容器,进一步触发消费者线程(ListenerConsumer)的初始化并开始消息消费流程。

KafkaListenerEndpointRegistry

// Starts the container when the context has been refreshed and alwaysStartAfterRefresh is set, or when the container itself is marked auto-startup.
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

// The call reaches AbstractMessageListenerContainer.start():
// checks the group id, then, under the lifecycle monitor, delegates to doStart() if the container is not already running.
public final void start() {
    checkGroupId();
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            doStart();
        }
    }
}

// The call reaches ConcurrentMessageListenerContainer.doStart(), which performs the actual start-up.
// (containerProperties and topicPartitions are not defined in this excerpt.)
protected void doStart() {
    if (!isRunning()) {
        // Create as many child containers as configured by concurrency
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            configureChildContainer(i, container);
            if (isPaused()) {
                container.pause();
            }
            // Start the child container
            container.start();
            // Keep it in the list of child containers
            this.containers.add(container);
        }
    }
}

// The call reaches KafkaMessageListenerContainer.doStart(), which starts a child container.
// (listener, listenerType and consumerExecutor are not defined in this excerpt.)
protected void doStart() {
    // Create the consumer runnable
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    // Latch presumably used to wait until the consumer thread has really started; the await itself is not shown in this excerpt
    this.startLatch = new CountDownLatch(1);
    // Submit to the executor so that the consumer thread starts asynchronously
    this.listenerConsumerFuture = consumerExecutor
            .submitListenable(this.listenerConsumer);
}

消费消息的逻辑在ListenerConsumer中,该类实现了Runnable接口的run()方法,在run()方法中实现了拉取消息,并通过反射调用我们自定义的业务方法,进行消息处理等自定义逻辑。

ListenerConsumer
    
// Consumer loop: keeps polling and invoking the listener while the container is running.
public void run() {
    while (isRunning()) {
        try {
            // Pull messages from Kafka and invoke the business method via reflection
            pollAndInvoke();
        }
        catch (Exception e) {
            // Exceptions from one iteration are passed to the consumer exception handler; the loop condition is then re-checked
            handleConsumerException(e);
        }
        finally {
            clearThreadState();
        }
    }
}

// One poll cycle: fetch records, then dispatch them to the listener.
protected void pollAndInvoke() {
    // Pull messages
    ConsumerRecords<K, V> records = doPoll();
    // Invoke our own listener method via reflection to handle the messages
    invokeIfHaveRecords(records);
}

网站公告

今日签到

点亮在社区的每一天
去签到