在 Spring 中使用 Apache Kafka 的配置主要涉及 Spring Boot Starter for Kafka
,而开启 KRaft 模式(Kafka 的元数据管理新模式,替代 ZooKeeper)需要特定的 Kafka Broker 配置。以下是详细步骤:
一、Spring 中集成 Kafka 的配置
1. 添加依赖
<!-- Maven 依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 如果是 Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
2. 基础配置(application.yml/properties)
spring:
kafka:
# Kafka Broker 地址
bootstrap-servers: localhost:9092
# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all # 高可靠性配置
# 消费者配置
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 发送消息(生产者)
@RestController
public class KafkaProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/send")
public void sendMessage(@RequestParam String topic, @RequestParam String message) {
kafkaTemplate.send(topic, message);
}
}
4. 接收消息(消费者)
@Component
public class KafkaConsumer {
@KafkaListener(topics = "my-topic")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
二、开启 Kafka 的 KRaft 模式
KRaft 模式(Kafka Raft Metadata Mode)是 Kafka 从 2.8 版本开始引入的元数据管理模式,用于替代 ZooKeeper。以下是配置步骤:
1. 下载 Kafka
确保使用 Kafka 3.0+ 版本(推荐 3.4+),并解压到本地。
2. 修改 Kafka 配置
编辑 config/kraft/server.properties
,配置示例:
# 启用 KRaft 模式
process.roles=broker,controller
node.id=1
# 元数据存储路径
log.dirs=/tmp/kraft-combined-logs
# 监听地址
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://localhost:9092
# Controller 配置
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9093
3. 生成集群 ID 并初始化
# 生成集群 UUID
bin/kafka-storage.sh generate-cluster-id --format
# 输出示例:CLUSTER_ID="ABC123XYZ"
# 格式化存储目录
bin/kafka-storage.sh format \
--cluster-id ABC123XYZ \
--config config/kraft/server.properties \
--ignore-formatted
# 启动 Kafka
bin/kafka-server-start.sh config/kraft/server.properties
4. 验证 KRaft 模式
- 检查日志:若启动成功且无 ZooKeeper 依赖,则 KRaft 已生效。
- 创建 Topic 测试:
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
三、Spring 集成 KRaft 模式的 Kafka
KRaft 模式对客户端透明,Spring 应用无需特殊配置,只需确保 bootstrap-servers
指向正确的地址即可。
注意事项:
版本兼容性:
- Kafka Broker 需 3.0+。
- Spring Kafka 建议使用 3.0+(对应 Spring Boot 3.0+)。
配置覆盖(可选):
spring:
kafka:
properties:
# 调整客户端参数(如需要)
socket.connection.setup.timeout.ms: 30000
四、示例项目结构
src/
├── main/
│ ├── java/
│ │ └── com/example/demo/
│ │ ├── KafkaProducerController.java
│ │ └── KafkaConsumer.java
│ └── resources/
│ └── application.yml
总结
- Spring Kafka 配置:通过
spring-boot-starter-kafka
快速集成。 - KRaft 模式:需在 Kafka Broker 端配置
process.roles
并初始化元数据存储,客户端无需修改。
如果需要本地测试 KRaft 模式,可以使用 Docker 镜像(如 bitnami/kafka:3.4
)或参考 Kafka KRaft 官方文档。
关于 Kafka与Zookeeper与关系及使用依赖,以及KRaft模式的解释,可以点击这里我的另外一篇博文了解