Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道、日志聚合系统和事件溯源架构。Spring Boot 提供了对 Kafka 的良好集成支持,使得开发者可以非常便捷地在项目中使用 Kafka。
本文将手把手教你如何在 Spring Boot 项目中集成 Kafka,包括生产者(Producer)和消费者(Consumer)的实现,并提供完整的代码示例。
开发环境准备
- Java 17+
- Maven 或 Gradle
- Spring Boot 3.x
- Apache Kafka 3.0+(本地或远程)
- IDE(如 IntelliJ IDEA、VS Code)
创建 Spring Boot 项目
你可以通过 Spring Initializr 创建一个新的 Spring Boot 项目,选择以下依赖:
- Spring Web
- Spring for Apache Kafka
或者手动添加 pom.xml
中的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Spring Boot 会自动管理版本兼容性,无需手动指定版本号。
配置 Kafka 连接信息
在 application.yml
或 application.properties
文件中配置 Kafka 相关参数:
application.yml 示例:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
编写 Kafka 生产者(Producer)
创建一个服务类用于发送消息到 Kafka 主题。
KafkaProducer.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Sent message: " + message);
}
}
编写 Kafka 消费者(Consumer)
使用 @KafkaListener
注解监听特定主题的消息。
KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record) {
System.out.printf("Received message: topic - %s, partition - %d, offset - %d, key - %s, value - %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
添加 REST 接口用于测试发送消息
为了方便测试,我们可以创建一个简单的 REST 控制器来触发消息发送。
KafkaController.java
import org.springframework.web.bind.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@PostMapping("/send")
public String sendMessage(@RequestParam String msg) {
kafkaProducer.sendMessage("test-topic", msg);
return "Message sent: " + msg;
}
}
启动 Kafka 环境(可选)
如果你还没有运行 Kafka,可以按照以下步骤快速启动:
启动 Zookeeper(Kafka 依赖)
bin/zookeeper-server-start.sh config/zookeeper.properties
启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
创建测试 Topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
测试接口
启动 Spring Boot 应用后,访问如下接口发送消息:
POST http://localhost:8080/kafka/send?msg=HelloKafka
观察控制台输出,确认是否收到类似以下内容:
Received message: topic - test-topic, partition - 0, offset - 5, key - null, value - HelloKafka
扩展功能建议
- 使用 JSON 格式传输对象(自定义序列化/反序列化)
- 多消费者组配置与负载均衡
- 异常处理与重试机制(
@DltHandler
,SeekToCurrentErrorHandler
) - Kafka Streams 实现实时流处理逻辑
- 配置 SSL、SASL 安全认证
- 结合 Spring Cloud Stream 构建云原生事件驱动架构
参考文档
小结
通过本篇文章,你已经掌握了在 Spring Boot 中集成 Kafka 的完整流程,包括配置、生产者、消费者以及 REST 接口调用测试。Kafka 在现代微服务架构中扮演着重要角色,掌握其集成方式将极大提升你的开发能力。
注意:确保 Kafka 服务已启动且网络可达,否则连接会失败。生产环境中请使用更完善的配置和异常处理机制。