spring中的@KafkaListener 注解详解

发布于:2025-06-09 ⋅ 阅读:(17) ⋅ 点赞:(0)

@KafkaListener 是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析:

基本用法

@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {
    System.out.println("Received Message: " + message);
}

主要属性

1. 必需属性

  • topics / topicPattern:指定监听的 topic
    • topics:逗号分隔的 topic 列表
    • `topicPattern**:使用正则表达式匹配 topic
@KafkaListener(topics = "topic1,topic2")
// 或
@KafkaListener(topicPattern = "test.*")

2. 消费者配置

  • groupId:指定消费者组 ID
  • containerFactory:指定使用的 KafkaListenerContainerFactory
@KafkaListener(topics = "myTopic", groupId = "myGroup", 
               containerFactory = "myFactory")

3. 消息处理

  • id:为监听器指定唯一 ID
  • concurrency:设置并发消费者数量
@KafkaListener(id = "myListener", topics = "myTopic", concurrency = "3")

4. 高级配置

  • containerGroup:指定容器组(Spring Kafka 2.5+)
  • errorHandler:指定错误处理器
  • idIsGroup:是否使用监听器 ID 作为组 ID(默认 false)

消息处理方法签名

监听器方法可以接受多种形式的参数:

  1. 简单消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message) { ... }
    
  2. 带元数据的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> record) { ... }
    
  3. 批量消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(List<String> messages) { ... }
    
  4. 带确认的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message, Acknowledgment ack) {
        // 处理消息后手动确认
        ack.acknowledge();
    }
    

配置选项

可以通过 @KafkaListenercontainerFactory 属性引用自定义配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

@KafkaListener(topics = "myTopic", containerFactory = "myFactory")
public void listen(String message) { ... }

错误处理

可以通过以下方式处理错误:

  1. 配置错误处理器

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }
    
  2. 使用 @SendTo 发送到死信队列

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    @SendTo("myDltTopic")
    public String listen(String message) {
        // 处理失败时返回错误消息
        return "error";
    }
    

注意事项

  1. 监听器方法应该是 public 的
  2. 避免在监听器方法中执行长时间运行的操作
  3. 考虑消息处理的幂等性
  4. 对于批量处理,确保方法参数是 List 类型
  5. 在 Spring Boot 中,许多配置可以通过 application.properties/yml 设置

完整示例

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

@Service
public class KafkaMessageListener {

    @KafkaListener(topics = "myTopic", groupId = "myGroup", 
                   containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message, Acknowledgment ack) {
        try {
            System.out.println("Received Message: " + message);
            // 业务处理逻辑
            ack.acknowledge();
        } catch (Exception e) {
            // 错误处理
        }
    }
}

@KafkaListener 注解提供了灵活的方式来消费 Kafka 消息,开发者可以根据具体需求进行配置和扩展。

ConcurrentKafkaListenerContainerFactory详解

在Spring Kafka中,ConcurrentKafkaListenerContainerFactory是一个核心配置类,用于创建并发消息监听容器,支持多线程消费Kafka消息,以下是其详细介绍:

1、核心作用

  1. 并发消费支持:通过创建多个KafkaMessageListenerContainer实例(每个对应一个线程),实现多线程并发消费消息。例如设置concurrency=3会创建3个消费者线程,每个线程处理分配到的分区。
  2. 线程安全保障:生成的ConcurrentMessageListenerContainer内部委托给多个单线程的KafkaMessageListenerContainer实例,保证线程安全性(Kafka Consumer本身非线程安全)。

2、关键特性

  1. 并发度配置

    • 通过setConcurrency()方法设置并发消费者数量,可提高消息处理速度和吞吐量。
    • 配置规则为concurrency<=分区数/应用实例数,设置过多会导致线程闲置。
  2. 批量处理支持

    • 通过setBatchListener(true)启用批量消费
    • 配合MAX_POLL_RECORDS_CONFIG参数控制单次poll最大返回记录数
  3. 错误处理机制

    • 可配置自定义错误处理器(如SeekToCurrentErrorHandler
    • 支持重试策略集成
  4. 分区分配控制

    • 可自定义分区分配逻辑
    • 配合group.id实现消费者组协调

3、配置示例

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // 设置并发消费者数量
        factory.setBatchListener(true); // 启用批量消费
        factory.getContainerProperties().setPollTimeout(3000); // 设置轮询超时
        factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 设置错误处理器
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量消费配置
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

4、使用场景

  1. 高吞吐量需求:通过增加并发消费者数量提升处理能力
  2. 批量数据处理:需要批量处理消息的场景
  3. 复杂错误处理:需要自定义错误处理逻辑的场景
  4. 多主题监听:需要同时监听多个主题的场景

5、注意事项

  1. 顺序性问题:并发消费可能导致消息顺序混乱,需业务保证
  2. 重复处理问题:需实现幂等性处理机制
  3. 数据库访问:需注意并发访问控制
  4. 资源限制:并发度设置需考虑系统资源限制

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到