Spring Boot整合Kafka的详细步骤

发布于:2025-04-17 ⋅ 阅读:(32) ⋅ 点赞:(0)

1. 安装Kafka

  1. 下载Kafka:从Kafka官网下载最新版本的Kafka。

  2. 解压并启动

    • 解压Kafka文件后,进入bin目录。

    • 启动ZooKeeper:./zookeeper-server-start.sh ../config/zookeeper.properties

    • 启动Kafka:./kafka-server-start.sh ../config/server.properties

    • 确认启动成功后,Kafka服务即可使用。

2. 创建Spring Boot项目

  1. 在Spring Initializr创建一个新项目,选择需要的依赖(如Spring Web和Spring Kafka)。

  2. 下载并解压项目,导入到IDE中。

3. 添加Kafka依赖

pom.xml中添加以下依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

这个依赖会自动配置Spring Kafka的相关组件。

4. 配置Kafka

application.yml中添加Kafka的配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

这里配置了Kafka服务器地址、消费者组、序列化器等。

5. 创建Kafka生产者

  1. 创建生产者配置类

@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  1. 创建生产者服务类

@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(
            success -> System.out.println("Message sent successfully: " + message),
            failure -> System.err.println("Failed to send message: " + failure.getMessage())
        );
    }
}

通过KafkaTemplate发送消息。

6. 创建Kafka消费者

  1. 创建消费者服务类

@Service
public class KafkaConsumerService {
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

使用@KafkaListener注解监听指定主题并接收消息。

7. 测试应用

  1. 创建一个控制器,用于发送消息:

@RestController
public class KafkaController {
    private final KafkaProducerService kafkaProducerService;

    public KafkaController(KafkaProducerService kafkaProducerService) {
        this.kafkaProducerService = kafkaProducerService;
    }

    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        kafkaProducerService.sendMessage("my-topic", message);
        return "Message sent";
    }
}
  1. 启动Spring Boot应用,通过访问http://localhost:8080/send?message=HelloKafka发送消息。

通过以上步骤,你可以在Spring Boot中成功集成并使用Kafka。

 

 

 

 


网站公告

今日签到

点亮在社区的每一天
去签到