3.7 Spring Boot整合Kafka:消息顺序性与消费幂等性保障

发布于:2025-03-22 ⋅ 阅读:(12) ⋅ 点赞:(0)

在Spring Boot中整合Kafka并保障消息顺序性与消费幂等性,可以通过以下步骤实现:

一、消息顺序性保障

1. 生产者配置
  • 相同Key写入同一分区:Kafka保证同一分区内消息的顺序性,生产者发送消息时指定相同Key,确保相关消息进入同一分区。
     

    java

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String key, String message) {
        kafkaTemplate.send("my-topic", key, message);
    }
  • 配置重试与飞行请求:防止网络重试导致消息乱序。
     

    properties

    spring.kafka.producer.properties.retries=3
    spring.kafka.producer.properties.max.in.flight.requests.per.connection=1
2. 消费者配置
  • 单线程按分区消费:确保每个分区由单独线程处理,避免并发消费同一分区。
     

    java

    @KafkaListener(topics = "my-topic", concurrency = "3") // 与分区数一致
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        processOrderly(record.value());
        ack.acknowledge(); // 手动提交偏移量
    }
  • 配置手动提交偏移量:处理完成后提交,避免消息丢失或重复。
     

    properties

    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=manual

二、消费幂等性保障

1. 生产者端幂等性

启用Kafka生产者幂等性,防止网络重试导致消息重复:


properties

spring.kafka.producer.enable-idempotence=true
2. 消费者端幂等性处理
  • 唯一标识检查:利用业务唯一标识(如订单ID)进行重复判断。
     

    java

    @KafkaListener(topics = "order-topic")
    public void processOrder(ConsumerRecord<String, Order> record, Acknowledgment ack) {
        String orderId = record.value().getId();
        if (orderService.isOrderProcessed(orderId)) {
            ack.acknowledge();
            return;
        }
        orderService.saveOrder(record.value());
        ack.acknowledge();
    }
  • 数据库唯一约束:通过数据库唯一索引或插入前检查实现。
     

    sql

    CREATE TABLE orders (
        id VARCHAR(50) PRIMARY KEY,
        -- 其他字段
    );

三、完整配置示例

1. 依赖引入

xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. 生产者配置(application.yml)

yaml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      enable-idempotence: true
      properties:
        max.in.flight.requests.per.connection: 1
        retries: 3
3. 消费者配置(application.yml)

yaml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: my-group
      enable-auto-commit: false
      auto-offset-reset: earliest
    listener:
      ack-mode: manual
      concurrency: 3

四、异常处理与优化

  • 消费者重试策略:使用RetryTemplate处理瞬时故障。
     

    java

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        return retryTemplate;
    }
  • 死信队列(DLQ)​:处理多次重试失败的消息。
     

    java

    @KafkaListener(topics = "my-topic")
    @RetryableTopic(attempts = "3", dltTopicSuffix = "-dlt")
    public void handleMessage(ConsumerRecord<String, String> record) {
        // 业务处理
    }

五、测试验证

  1. 顺序性测试:发送连续消息(相同Key),观察消费顺序是否一致。
  2. 幂等性测试:重复发送相同消息,检查是否仅处理一次。

通过以上步骤,Spring Boot应用能够确保Kafka消息的顺序性和消费的幂等性,适用于订单处理、状态更新等场景。