1. 安装Kafka
下载Kafka:从Kafka官网下载最新版本的Kafka。
解压并启动:
解压Kafka文件后,进入
bin
目录。启动ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
。启动Kafka:
./kafka-server-start.sh ../config/server.properties
。确认启动成功后,Kafka服务即可使用。
2. 创建Spring Boot项目
在Spring Initializr创建一个新项目,选择需要的依赖(如Spring Web和Spring Kafka)。
下载并解压项目,导入到IDE中。
3. 添加Kafka依赖
在pom.xml
中添加以下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
这个依赖会自动配置Spring Kafka的相关组件。
4. 配置Kafka
在application.yml
中添加Kafka的配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
这里配置了Kafka服务器地址、消费者组、序列化器等。
5. 创建Kafka生产者
创建生产者配置类:
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
创建生产者服务类:
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(
success -> System.out.println("Message sent successfully: " + message),
failure -> System.err.println("Failed to send message: " + failure.getMessage())
);
}
}
通过KafkaTemplate
发送消息。
6. 创建Kafka消费者
创建消费者服务类:
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
使用@KafkaListener
注解监听指定主题并接收消息。
7. 测试应用
创建一个控制器,用于发送消息:
@RestController
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
public KafkaController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
kafkaProducerService.sendMessage("my-topic", message);
return "Message sent";
}
}
启动Spring Boot应用,通过访问
http://localhost:8080/send?message=HelloKafka
发送消息。
通过以上步骤,你可以在Spring Boot中成功集成并使用Kafka。