配置文件 application.properties
# Application settings
# NOTE: the .properties format has no inline comments. A '#' only starts a comment
# at the beginning of a line; anything after the value would become part of it.
# HTTP port the application listens on
server.port=8080
# Application name
spring.application.name=kafka-demo

# Kafka settings
# Kafka broker address
kafka.bootstrap-servers=localhost:9092
# Consumer group id
kafka.consumer.group-id=demo-group
# Number of concurrent consumers
kafka.consumer.concurrency=5

# Logging settings
# Root log level
logging.level.root=INFO
# Spring Kafka log level
logging.level.org.springframework.kafka=INFO
# Log level for the application's own code
logging.level.com.example=DEBUG

# Async task settings
# Core size of the async task thread pool
spring.task.execution.pool.core-size=10
# Maximum size of the async task thread pool
spring.task.execution.pool.max-size=20
# Capacity of the async task queue
spring.task.execution.pool.queue-capacity=100
启动类 DemoApplication
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Application entry point.
 *
 * <p>{@code @EnableScheduling} is declared here so that {@code @Scheduled} methods
 * in the application are picked up by Spring.
 */
@SpringBootApplication
@EnableScheduling
public class DemoApplication {

    /**
     * Boots the Spring application context using this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Same thing the static SpringApplication.run(Class, String...) shortcut does.
        SpringApplication application = new SpringApplication(DemoApplication.class);
        application.run(args);
    }
}
Kafka 配置
package com.example.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.consumer.concurrency:5}")
private int concurrency;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> backlogKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(true);
return factory;
}
}
Message 消息实体类
package com.example.entity;
import java.io.Serializable;
import java.util.UUID;
/**
 * Mutable message bean holding an id, a content string, a creation timestamp
 * (epoch milliseconds) and an optional trace id.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String content;
    private long timestamp;
    private String traceId;

    /** Creates a message with a random UUID id, the current time and no content. */
    public Message() {
        this(null);
    }

    /**
     * Creates a message with a random UUID id, the current time and the given content.
     *
     * @param content message body; may be {@code null}
     */
    public Message(String content) {
        this.id = UUID.randomUUID().toString();
        this.timestamp = System.currentTimeMillis();
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getTraceId() {
        return traceId;
    }

    public void setTraceId(String traceId) {
        this.traceId = traceId;
    }

    /**
     * Renders id, content and timestamp; the trace id is not part of the output.
     *
     * @return a single-line description of this message
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{id='");
        text.append(id);
        text.append("', content='").append(content);
        text.append("', timestamp=").append(timestamp);
        text.append('}');
        return text.toString();
    }
}
MessageRepository 消息处理
package com.example.repository;
import org.springframework.stereotype.Repository;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory record of message ids that have already been processed.
 *
 * <p>Backed by a concurrent key set, so each individual call is safe to make from
 * several threads; a separate {@code exists} followed by {@code save} is not atomic.
 * Ids are never removed.
 */
@Repository
public class MessageRepository {

    private final ConcurrentHashMap.KeySetView<String, Boolean> processedIds = ConcurrentHashMap.newKeySet();

    /**
     * @param messageId id to look up
     * @return {@code true} if the id was saved earlier
     */
    public boolean exists(String messageId) {
        return processedIds.contains(messageId);
    }

    /**
     * Marks the id as processed; saving an id twice has no further effect.
     *
     * @param messageId id to remember
     */
    public void save(String messageId) {
        processedIds.add(messageId);
    }

    /**
     * @return number of distinct ids saved so far
     */
    public long count() {
        return processedIds.size();
    }
}
消息积压监控服务
package com.example.service;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ExecutionException;
@Service
public class BacklogMonitorService {
private final AdminClient adminClient;
public BacklogMonitorService() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
this.adminClient = AdminClient.create(props);
}
@Scheduled(fixedRate = 300000)
public void monitorBacklog() {
try {
ListTopicsResult topics = adminClient.listTopics();
Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics.names().get()).all().get().values();
for (TopicDescription topic : topicDescriptions) {
if (topic.name().endsWith("-topic")) {
long totalBacklog = calculateTopicBacklog(topic.name());
System.out.println("主题: " + topic.name() + ", 总积压量: " + totalBacklog);
if (totalBacklog > 100000) {
triggerAlert(topic.name(), totalBacklog);
}
}
}
} catch (Exception e) {
System.err.println("监控积压时发生异常: " + e.getMessage());
}
}
private long calculateTopicBacklog(String topic) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> endOffsets = getEndOffsets(topic);
Map<TopicPartition, Long> consumerOffsets = getConsumerGroupOffsets("demo-group", topic);
long totalBacklog = 0;
for (TopicPartition partition : endOffsets.keySet()) {
long endOffset = endOffsets.get(partition);
long consumerOffset = consumerOffsets.getOrDefault(partition, 0L);
totalBacklog += endOffset - consumerOffset;
}
return totalBacklog;
}
private Map<TopicPartition, Long> getEndOffsets(String topic) throws ExecutionException, InterruptedException {
List<TopicPartition> partitions = new ArrayList<>();
DescribeTopicsResult describeTopics = adminClient.describeTopics(Collections.singletonList(topic));
TopicDescription topicDescription = describeTopics.values().get(topic).get();
for (PartitionInfo partition : topicDescription.partitions()) {
partitions.add(new TopicPartition(topic, partition.partition()));
}
return adminClient.endOffsets(partitions).get();
}
private Map<TopicPartition, Long> getConsumerGroupOffsets(String groupId, String topic) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> offsets = new HashMap<>();
ListConsumerGroupOffsetsResult consumerGroupOffsets = adminClient.listConsumerGroupOffsets(groupId);
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumerGroupOffsets.partitionsToOffsetAndMetadata().get().entrySet()) {
if (entry.getKey().topic().equals(topic)) {
offsets.put(entry.getKey(), entry.getValue().offset());
}
}
return offsets;
}
private void triggerAlert(String topic, long backlogSize) {
System.out.println("告警: 主题 " + topic + " 积压量达到 " + backlogSize + " 条消息!");
}
}
Kafka消息消费者服务
package com.example.service;
import com.example.entity.Message;
import com.example.repository.MessageRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Kafka listeners for {@code demo-topic} (one record at a time) and
 * {@code backlog-topic} (batches). Both parse the record value as a JSON
 * {@link Message} and use {@link MessageRepository} to skip ids already processed.
 *
 * <p>Acknowledgment policy: a payload that cannot be parsed can never succeed, so it
 * is logged and skipped. A failure while processing is NOT acknowledged; the exception
 * propagates so the listener container's error handling decides about redelivery
 * instead of the message being silently dropped.
 */
@Service
public class KafkaConsumerService {

    @Autowired
    private MessageRepository messageRepository;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Handles a single record from {@code demo-topic}.
     *
     * @param messageJson    record value, expected to be a JSON-serialized {@link Message}
     * @param acknowledgment handle used to commit the record's offset
     */
    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    public void listen(String messageJson, Acknowledgment acknowledgment) {
        Message message = parseOrNull(messageJson, "处理消息失败: ");
        if (message == null) {
            // Unparseable payload: retrying cannot help, so move past it.
            acknowledgment.acknowledge();
            return;
        }
        if (messageRepository.exists(message.getId())) {
            System.out.println("重复消息,跳过处理: " + message.getId());
            acknowledgment.acknowledge();
            return;
        }
        try {
            processMessage(message);
            messageRepository.save(message.getId());
        } catch (RuntimeException e) {
            System.err.println("处理消息失败: " + e.getMessage());
            // Do not acknowledge: rethrow so the record is not lost.
            throw e;
        }
        acknowledgment.acknowledge();
    }

    /**
     * Handles a batch of records from {@code backlog-topic}. The batch is acknowledged
     * only after every parseable record in it was processed or recognised as a duplicate.
     * If processing throws, nothing is acknowledged; records already saved in the
     * repository are skipped should the batch be delivered again.
     *
     * @param messages       record values, each expected to be a JSON-serialized {@link Message}
     * @param acknowledgment handle used to commit the batch's offsets
     */
    @KafkaListener(topics = "backlog-topic", groupId = "backlog-group",
            containerFactory = "backlogKafkaListenerContainerFactory")
    public void listenBatch(List<String> messages, Acknowledgment acknowledgment) {
        System.out.println("接收到批量消息: " + messages.size());
        for (String messageJson : messages) {
            Message message = parseOrNull(messageJson, "处理批量消息中的一条失败: ");
            if (message == null) {
                continue;
            }
            if (!messageRepository.exists(message.getId())) {
                processMessage(message);
                messageRepository.save(message.getId());
            }
        }
        acknowledgment.acknowledge();
    }

    /**
     * Parses a record value into a {@link Message}.
     *
     * @param messageJson raw record value
     * @param logPrefix   text printed before the error detail when parsing fails
     * @return the parsed message, or {@code null} if the payload is malformed or has no id
     */
    private Message parseOrNull(String messageJson, String logPrefix) {
        try {
            Message message = objectMapper.readValue(messageJson, Message.class);
            if (message == null || message.getId() == null) {
                System.err.println(logPrefix + "missing message id");
                return null;
            }
            return message;
        } catch (Exception e) {
            // Boundary catch: any failure here means the payload itself is unusable.
            System.err.println(logPrefix + e.getMessage());
            return null;
        }
    }

    /** Simulates the business handling of one message (prints it, then sleeps 10 ms). */
    private void processMessage(Message message) {
        System.out.println("处理消息: " + message);
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Kafka消息生产者服务
package com.example.service;
import com.example.entity.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Service
public class KafkaProducerService {
private static final String TOPIC = "demo-topic";
private static final String BACKLOG_TOPIC = "backlog-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(Message message) {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(TOPIC, message.getId(), message.getContent());
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("消息发送成功: " + message.getId() +
" 分区: " + result.getRecordMetadata().partition() +
" 偏移量: " + result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
System.err.println("消息发送失败: " + message.getId() + " 错误: " + ex.getMessage());
}
});
}
public void sendBatchMessages(int count) {
for (int i = 0; i < count; i++) {
Message message = new Message("Batch message-" + i);
kafkaTemplate.send(BACKLOG_TOPIC, message.getId(), message.getContent());
if (i % 1000 == 0) {
System.out.println("已发送 " + i + " 条消息");
}
}
}
}
API控制器提供测试接口
package com.example.controller;
import com.example.entity.Message;
import com.example.service.KafkaProducerService;
import com.example.repository.MessageRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
 * REST endpoints under {@code /api} for exercising the demo: send one message,
 * send a batch, and read how many message ids have been recorded as processed.
 */
@RestController
@RequestMapping("/api")
public class DemoController {

    @Autowired
    private KafkaProducerService producerService;

    @Autowired
    private MessageRepository messageRepository;

    /**
     * Wraps the request body in a new {@link Message} and hands it to the producer.
     *
     * @param content raw request body used as the message content
     * @return confirmation text containing the generated message id
     */
    @PostMapping("/send")
    public String sendMessage(@RequestBody String content) {
        final Message outgoing = new Message(content);
        producerService.sendMessage(outgoing);
        final String assignedId = outgoing.getId();
        return "消息已发送: " + assignedId;
    }

    /**
     * Asks the producer to send {@code count} generated messages.
     *
     * @param count number of messages, taken from the URL path
     * @return confirmation text echoing the requested count
     */
    @PostMapping("/send/batch/{count}")
    public String sendBatchMessages(@PathVariable int count) {
        producerService.sendBatchMessages(count);
        StringBuilder reply = new StringBuilder("开始发送 ");
        reply.append(count).append(" 条消息");
        return reply.toString();
    }

    /**
     * @return text reporting the number of ids held by the message repository
     */
    @GetMapping("/processed/count")
    public String getProcessedCount() {
        final long processed = messageRepository.count();
        return "已处理消息数量: " + processed;
    }
}
关键特性说明
- 幂等性保障
- 为每条消息生成全局唯一 ID(UUID)
- 消费者通过MessageRepository检查消息是否已处理
- 先检查再处理,处理成功后记录,确保不重复处理
- 高性能设计
- 生产者配置:
- 启用幂等性(ENABLE_IDEMPOTENCE_CONFIG=true)
- 批量发送
- 异步发送带回调
- 消费者配置:
- 手动提交偏移量
- 批量消费模式(处理积压)
- 高并发消费者实例
- 异常处理机制
- 生产者端:
- 异步发送回调中记录发送失败的消息 ID 与错误信息
- acks=all 并配置 3 次重试
- 消费者端:
- 单条消息处理失败不影响其他消息
- 提供失败消息处理策略(可扩展)
- 消息积压解决方案
- 独立的积压处理消费者组
- 更高的并发度(10 个消费者实例)
- 批量消费模式(每次最多拉取 500 条,对应 max.poll.records=500)
- 定时监控积压量并触发告警
- 可观测性
- 积压监控定时任务
- 处理进度统计 API
- 详细的日志记录
生产环境建议
- 替换内存存储
- 将MessageRepository替换为基于 Redis 或数据库的实现,确保幂等性检查的分布式一致性
- 扩展积压处理能力
- 增加分区数提高并行度
- 部署多个消费者实例
- 使用独立的积压处理 Topic 和消费者组
- 完善监控告警
- 集成 Prometheus 和 Grafana 可视化积压指标
- 配置多渠道告警(邮件、短信、即时通讯)
- 增强异常处理