【SpringBoot】集成kafka之生产者、消费者、幂等性处理和消息积压

发布于:2025-05-14 ⋅ 阅读:(13) ⋅ 点赞:(0)

配置文件 application.properties

# 应用配置
server.port=8080 # 应用端口
spring.application.name=kafka-demo # 应用名称

# Kafka配置
kafka.bootstrap-servers=localhost:9092 # Kafka服务器地址
kafka.consumer.group-id=demo-group # 消费者组ID
kafka.consumer.concurrency=5 # 消费者并发数

# 日志配置
logging.level.root=INFO # 根日志级别
logging.level.org.springframework.kafka=INFO # Spring Kafka日志级别
logging.level.com.example=DEBUG # 应用自定义代码日志级别

# 异步配置
spring.task.execution.pool.core-size=10 # 异步任务线程池核心大小
spring.task.execution.pool.max-size=20 # 异步任务线程池最大大小
spring.task.execution.pool.queue-capacity=100 # 异步任务队列容量     

启动类 Application

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * 应用启动类
 */
@SpringBootApplication // 启用Spring Boot自动配置
@EnableScheduling // 启用定时任务支持,用于消息积压监控
public class DemoApplication {
    public static void main(String[] args) {
        // 启动Spring Boot应用
        SpringApplication.run(DemoApplication.class, args);
    }
}    

Kafka 配置

package com.example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka // 启用Kafka注解支持
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.concurrency:5}")
    private int concurrency;

    // 生产者配置 - 确保消息可靠发送
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本都确认后才认为消息发送成功
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 消息发送失败时的重试次数
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性,防止消息重复发送
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // 允许5个未确认请求
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // 消费者配置 - 确保消息可靠消费
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁用自动提交,使用手动提交
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每次拉取的最大消息数
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // 普通消息消费者容器工厂
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency); // 并发消费者数量,应根据分区数调整
        factory.getContainerProperties().setPollTimeout(3000); // 拉取超时时间
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 手动立即提交
        factory.setMessageConverter(new StringJsonMessageConverter()); // JSON消息转换器
        return factory;
    }

    // 用于处理积压消息的消费者配置 - 增加了并发度和批量处理能力
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> backlogKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10); // 更高的并发数处理积压
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true); // 启用批量消费模式
        return factory;
    }
}    

Message 消息实体类

package com.example.entity;

import java.io.Serializable;
import java.util.UUID;

/**
 * 消息实体类,用于封装消息内容和元数据
 */
public class Message implements Serializable {
    private static final long serialVersionUID = 1L;
    
    private String id; // 消息唯一标识,用于幂等性检查
    private String content; // 消息内容
    private long timestamp; // 消息创建时间戳
    private String traceId; // 跟踪ID,用于分布式追踪

    public Message() {
        this.id = UUID.randomUUID().toString(); // 自动生成唯一ID
        this.timestamp = System.currentTimeMillis();
    }

    public Message(String content) {
        this();
        this.content = content;
    }

    // Getters and Setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getTraceId() { return traceId; }
    public void setTraceId(String traceId) { this.traceId = traceId; }

    @Override
    public String toString() {
        return "Message{id='" + id + "', content='" + content + "', timestamp=" + timestamp + '}';
    }
}    

MessageRepository 消息处理

package com.example.repository;

import org.springframework.stereotype.Repository;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 消息处理记录仓库,用于存储和查询已处理的消息ID
 * 实际生产环境建议使用Redis或数据库实现,确保分布式环境下的幂等性
 */
@Repository
public class MessageRepository {
    // 使用ConcurrentHashMap存储已处理的消息ID,键为消息ID,值为是否处理的标志
    private final ConcurrentHashMap<String, Boolean> processedMessages = new ConcurrentHashMap<>();

    // 检查消息是否已处理
    public boolean exists(String messageId) {
        return processedMessages.containsKey(messageId);
    }

    // 记录消息已处理
    public void save(String messageId) {
        processedMessages.put(messageId, true);
    }

    // 获取已处理消息数量
    public long count() {
        return processedMessages.size();
    }
}    

消息积压监控服务

package com.example.service;

import org.apache.kafka.clients.admin.*;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * 消息积压监控服务,定期检查Kafka主题的积压情况并触发告警
 */
@Service
public class BacklogMonitorService {

    private final AdminClient adminClient;

    public BacklogMonitorService() {
        // 创建Kafka管理客户端,用于获取主题和消费者组信息
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        this.adminClient = AdminClient.create(props);
    }

    // 定时任务:每5分钟检查一次消息积压情况
    @Scheduled(fixedRate = 300000)
    public void monitorBacklog() {
        try {
            // 获取所有主题
            ListTopicsResult topics = adminClient.listTopics();
            Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics.names().get()).all().get().values();
            
            // 遍历每个主题,计算积压量
            for (TopicDescription topic : topicDescriptions) {
                if (topic.name().endsWith("-topic")) { // 只监控业务主题
                    long totalBacklog = calculateTopicBacklog(topic.name());
                    System.out.println("主题: " + topic.name() + ", 总积压量: " + totalBacklog);
                    
                    // 积压超过阈值触发告警
                    if (totalBacklog > 100000) {
                        triggerAlert(topic.name(), totalBacklog);
                    }
                }
            }
        } catch (Exception e) {
            // 监控过程中发生异常,记录错误信息
            System.err.println("监控积压时发生异常: " + e.getMessage());
        }
    }

    // 计算指定主题的积压量
    private long calculateTopicBacklog(String topic) throws ExecutionException, InterruptedException {
        // 获取主题各分区的最新偏移量
        Map<TopicPartition, Long> endOffsets = getEndOffsets(topic);
        // 获取消费者组当前消费的偏移量
        Map<TopicPartition, Long> consumerOffsets = getConsumerGroupOffsets("demo-group", topic);
        
        // 计算总积压量:各分区最新偏移量减去消费者当前偏移量
        long totalBacklog = 0;
        for (TopicPartition partition : endOffsets.keySet()) {
            long endOffset = endOffsets.get(partition);
            long consumerOffset = consumerOffsets.getOrDefault(partition, 0L);
            totalBacklog += endOffset - consumerOffset;
        }
        
        return totalBacklog;
    }

    // 获取主题各分区的最新偏移量
    private Map<TopicPartition, Long> getEndOffsets(String topic) throws ExecutionException, InterruptedException {
        List<TopicPartition> partitions = new ArrayList<>();
        DescribeTopicsResult describeTopics = adminClient.describeTopics(Collections.singletonList(topic));
        TopicDescription topicDescription = describeTopics.values().get(topic).get();
        
        // 构建主题分区列表
        for (PartitionInfo partition : topicDescription.partitions()) {
            partitions.add(new TopicPartition(topic, partition.partition()));
        }
        
        // 获取各分区的最新偏移量
        return adminClient.endOffsets(partitions).get();
    }

    // 获取消费者组在指定主题各分区的当前偏移量
    private Map<TopicPartition, Long> getConsumerGroupOffsets(String groupId, String topic) throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        ListConsumerGroupOffsetsResult consumerGroupOffsets = adminClient.listConsumerGroupOffsets(groupId);
        
        // 提取指定主题的偏移量信息
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumerGroupOffsets.partitionsToOffsetAndMetadata().get().entrySet()) {
            if (entry.getKey().topic().equals(topic)) {
                offsets.put(entry.getKey(), entry.getValue().offset());
            }
        }
        
        return offsets;
    }

    // 触发积压告警,实际项目中可集成邮件、短信或监控系统
    private void triggerAlert(String topic, long backlogSize) {
        System.out.println("告警: 主题 " + topic + " 积压量达到 " + backlogSize + " 条消息!");
        // 可扩展实现:
        // 1. 发送邮件通知
        // 2. 调用短信API
        // 3. 集成监控系统如Prometheus和Grafana
    }
}    

Kafka消息消费者服务

package com.example.service;

import com.example.entity.Message;
import com.example.repository.MessageRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Kafka消息消费者服务,负责从Kafka主题消费消息并进行业务处理
 */
@Service
public class KafkaConsumerService {

    @Autowired
    private MessageRepository messageRepository;

    private final ObjectMapper objectMapper = new ObjectMapper(); // JSON序列化/反序列化工具

    // 消费普通消息,单条处理模式
    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    public void listen(String messageJson, Acknowledgment acknowledgment) {
        try {
            // 1. 从JSON字符串解析消息对象
            Message message = objectMapper.readValue(messageJson, Message.class);
            
            // 2. 幂等性检查:检查消息是否已经处理过
            if (messageRepository.exists(message.getId())) {
                System.out.println("重复消息,跳过处理: " + message.getId());
                acknowledgment.acknowledge(); // 提交偏移量
                return;
            }
            
            // 3. 处理消息业务逻辑
            processMessage(message);
            
            // 4. 记录消息已处理
            messageRepository.save(message.getId());
            
            // 5. 手动提交偏移量,表示消息已成功处理
            acknowledgment.acknowledge();
            
        } catch (Exception e) {
            // 处理异常情况
            System.err.println("处理消息失败: " + e.getMessage());
            // 可以实现重试逻辑或发送到死信队列
            acknowledgment.acknowledge(); // 这里选择提交,避免重复消费导致处理失败
        }
    }

    // 消费积压消息,批量处理模式
    @KafkaListener(topics = "backlog-topic", groupId = "backlog-group", 
                  containerFactory = "backlogKafkaListenerContainerFactory")
    public void listenBatch(List<String> messages, Acknowledgment acknowledgment) {
        System.out.println("接收到批量消息: " + messages.size());
        
        for (String messageJson : messages) {
            try {
                // 解析消息
                Message message = objectMapper.readValue(messageJson, Message.class);
                
                // 幂等性检查并处理
                if (!messageRepository.exists(message.getId())) {
                    processMessage(message);
                    messageRepository.save(message.getId());
                }
            } catch (Exception e) {
                // 处理单条消息异常,不影响其他消息处理
                System.err.println("处理批量消息中的一条失败: " + e.getMessage());
            }
        }
        
        // 批量处理完成后,统一提交偏移量
        acknowledgment.acknowledge();
    }

    // 消息业务处理逻辑
    private void processMessage(Message message) {
        // 模拟业务处理逻辑
        System.out.println("处理消息: " + message);
        
        // 模拟耗时操作,实际项目中替换为真实业务逻辑
        try {
            Thread.sleep(10); // 每条消息处理10ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}    

Kafka消息生产者服务

package com.example.service;

import com.example.entity.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Kafka消息生产者服务,负责发送消息到Kafka主题
 */
@Service
public class KafkaProducerService {

    private static final String TOPIC = "demo-topic"; // 普通消息主题
    private static final String BACKLOG_TOPIC = "backlog-topic"; // 用于测试积压的主题

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // 发送普通消息,带异步回调确认
    public void sendMessage(Message message) {
        // 发送消息,使用消息ID作为键,确保相同ID的消息发送到同一分区
        ListenableFuture<SendResult<String, String>> future = 
            kafkaTemplate.send(TOPIC, message.getId(), message.getContent());
        
        // 添加异步回调,处理发送结果
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // 消息发送成功,记录发送信息
                System.out.println("消息发送成功: " + message.getId() + 
                                   " 分区: " + result.getRecordMetadata().partition() +
                                   " 偏移量: " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // 消息发送失败,记录错误信息
                System.err.println("消息发送失败: " + message.getId() + " 错误: " + ex.getMessage());
                // 可实现重试逻辑或记录失败消息到日志
            }
        });
    }

    // 发送大量消息用于测试积压情况
    public void sendBatchMessages(int count) {
        for (int i = 0; i < count; i++) {
            Message message = new Message("Batch message-" + i);
            kafkaTemplate.send(BACKLOG_TOPIC, message.getId(), message.getContent());
            
            // 每发送1000条消息打印一次进度
            if (i % 1000 == 0) {
                System.out.println("已发送 " + i + " 条消息");
            }
        }
    }
}    

API控制器提供测试接口

package com.example.controller;

import com.example.entity.Message;
import com.example.service.KafkaProducerService;
import com.example.repository.MessageRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * REST API控制器,提供测试接口
 */
@RestController
@RequestMapping("/api")
public class DemoController {

    @Autowired
    private KafkaProducerService producerService;

    @Autowired
    private MessageRepository messageRepository;

    // 发送单条消息的API接口
    @PostMapping("/send")
    public String sendMessage(@RequestBody String content) {
        // 创建消息对象
        Message message = new Message(content);
        // 调用生产者服务发送消息
        producerService.sendMessage(message);
        return "消息已发送: " + message.getId();
    }

    // 发送批量消息测试积压的API接口
    @PostMapping("/send/batch/{count}")
    public String sendBatchMessages(@PathVariable int count) {
        // 调用生产者服务发送大量消息,用于测试积压情况
        producerService.sendBatchMessages(count);
        return "开始发送 " + count + " 条消息";
    }

    // 查询已处理消息数量的API接口
    @GetMapping("/processed/count")
    public String getProcessedCount() {
        // 查询已处理的消息数量,用于监控处理进度
        return "已处理消息数量: " + messageRepository.count();
    }
}    

关键特性说明

  • 幂等性保障
    • 为每条消息生成全局唯一 ID(UUID)
    • 消费者通过MessageRepository检查消息是否已处理
    • 先检查再处理,处理成功后记录,确保不重复处理

  • 高性能设计
    • 生产者配置:
      • 启用幂等性(ENABLE_IDEMPOTENCE_CONFIG=true)
      • 批量发送
      • 异步发送带回调
    • 消费者配置:
      • 手动提交偏移量
      • 批量消费模式(处理积压)
      • 高并发消费者实例

  • 异常处理机制
    • 生产者端:
      • 异步发送失败时记录错误并支持重试
    • 消费者端:
      • 单条消息处理失败不影响其他消息
      • 提供失败消息处理策略(可扩展)

  • 消息积压解决方案
    • 独立的积压处理消费者组
    • 更高的并发度(10 个消费者实例)
    • 批量消费模式(每次处理 500 条)
    • 定时监控积压量并触发告警

  • 可观测性
    • 积压监控定时任务
    • 处理进度统计 API
    • 详细的日志记录

生产环境建议

  • 替换内存存储
    • 将MessageRepository替换为基于 Redis 或数据库的实现,确保幂等性检查的分布式一致性
  • 扩展积压处理能力
    • 增加分区数提高并行度
    • 部署多个消费者实例
    • 使用独立的积压处理 Topic 和消费者组
  • 完善监控告警
    • 集成 Prometheus 和 Grafana 可视化积压指标
    • 配置多渠道告警(邮件、短信、即时通讯)
  • 增强异常处理
    • 实现死信队列存储失败消息
    • 增加消息重试策略配置