Integrating a Kafka Cluster with Spring Boot 3

Published: 2025-07-15

1. Add the dependency in build.gradle

implementation 'org.springframework.kafka:spring-kafka:3.2.0'
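Note: if the project applies Spring Boot's dependency-management plugin, the explicit version can usually be omitted, since spring-kafka is managed by the Boot BOM; pinning 3.2.0 here only matters when overriding the managed version.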

2. Add the kafka-log.yml file

Create kafka-log.yml under resources/config to configure the topic and consumer group:

# Kafka consumer group / topic configuration
kafka:
  consumer:
    group:
      log-data: log-data-group
    topic:
      log-data: log-data-topic
    auto-startup: false

Load the custom yml file so its keys can be resolved by ${...} placeholders:

import org.springframework.beans.factory.config.PropertySourcesPlaceholderConfigurer;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class YmlConfiguration {

    // static: PropertySourcesPlaceholderConfigurer is a BeanFactoryPostProcessor
    // and should be registered before regular beans are instantiated
    @Bean
    public static PropertySourcesPlaceholderConfigurer properties() {
        PropertySourcesPlaceholderConfigurer configurer = new PropertySourcesPlaceholderConfigurer();
        YamlPropertiesFactoryBean yaml = new YamlPropertiesFactoryBean();
        yaml.setResources(new ClassPathResource("config/kafka-log.yml"));
        configurer.setProperties(yaml.getObject());
        return configurer;
    }
}
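With the configurer in place, keys from kafka-log.yml resolve like any other property. A minimal sketch of injecting them (this holder class is illustrative, not part of the original post):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaLogProperties {

    // Resolved from config/kafka-log.yml via the placeholder configurer above
    @Value("${kafka.consumer.topic.log-data}")
    private String logDataTopic;

    @Value("${kafka.consumer.group.log-data}")
    private String logDataGroup;

    public String getLogDataTopic() { return logDataTopic; }

    public String getLogDataGroup() { return logDataGroup; }
}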

3. Configure application.yml

spring:
  kafka:
    bootstrap-servers: 192.168.0.81:9092,192.168.0.82:9092,192.168.0.83:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 254288
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Note: spring.kafka.producer has no auto-topic-creation property; topic
      # auto-creation is controlled broker-side via auto.create.topics.enable.
      # session.timeout.ms is a consumer-side setting, so it is omitted here.
      properties:
        linger.ms: 1
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: log-data-group
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";
# Override per environment as needed, e.g. keep the listener stopped by default in dev
kafka:
  consumer:
    auto-startup: false
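Since kafka.consumer.auto-startup varies per environment, a common approach (an assumed project layout, not shown in the original) is to override the flag in a profile-specific file:

# application-prod.yml -- start the listener automatically in production
kafka:
  consumer:
    auto-startup: true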

4. Producer implementation

import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {

    // Key/value types match the String serializers configured in application.yml
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String data) {
        // spring-kafka 3.x returns a CompletableFuture rather than a ListenableFuture
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.whenComplete((sendResult, ex) -> {
            if (ex != null) {
                log.error("Kafka send message error = {}, topic = {}, data = {}", ex.getMessage(), topic, data);
            } else {
                log.info("Message sent successfully: {}", sendResult);
            }
        });
    }
}
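A minimal caller sketch (this controller and its mapping are illustrative, not part of the original post):

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LogController {

    private final KafkaProducer kafkaProducer;

    public LogController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // Push a raw JSON log record onto the topic configured in kafka-log.yml
    @PostMapping("/logs")
    public void publish(@RequestBody String json) {
        kafkaProducer.sendMessage("log-data-topic", json);
    }
}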

5. Consumer implementation

import java.util.Collection;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerGroupManager {

    private final KafkaAdmin kafkaAdmin;
    private final ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

    public KafkaConsumerGroupManager(KafkaAdmin kafkaAdmin,
                                     ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
        this.kafkaAdmin = kafkaAdmin;
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }

    public void ensureConsumerGroupExists(String groupId) {
        // try-with-resources so the AdminClient is closed after the check
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            // Check whether the consumer group already exists on the brokers
            ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();

            boolean groupExists = consumerGroupListings.stream()
                    .anyMatch(listing -> listing.groupId().equals(groupId));

            if (!groupExists) {
                // Groups are created implicitly when the first consumer joins; here we
                // only set the group id on containers created from this factory
                kafkaListenerContainerFactory.getContainerProperties().setGroupId(groupId);
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to check consumer group existence", e);
        }
    }
}
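One way to wire this check into startup (an illustrative sketch, not from the original post) is an ApplicationRunner that verifies the group before traffic flows:

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class ConsumerGroupInitializer implements ApplicationRunner {

    private final KafkaConsumerGroupManager groupManager;

    public ConsumerGroupInitializer(KafkaConsumerGroupManager groupManager) {
        this.groupManager = groupManager;
    }

    @Override
    public void run(ApplicationArguments args) {
        // Group id matches kafka.consumer.group.log-data in kafka-log.yml
        groupManager.ensureConsumerGroupExists("log-data-group");
    }
}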
The listener itself writes each record to Elasticsearch:

import java.util.ArrayList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {

    private final ElasticsearchOperations elasticsearchOperations;

    public KafkaConsumer(ElasticsearchOperations elasticsearchOperations) {
        this.elasticsearchOperations = elasticsearchOperations;
    }

    /**
     * Consume log data and index it into Elasticsearch.
     *
     * @param message raw JSON log record from the topic
     */
    @KafkaListener(topics = {"${kafka.consumer.topic.log-data}"}, groupId = "${kafka.consumer.group.log-data}", autoStartup = "${kafka.consumer.auto-startup}")
    public void consumer(String message) {
        this.bulkIndexJsonData(message);
    }

    public void bulkIndexJsonData(String jsonData) {
        List<IndexQuery> queries = new ArrayList<>();
        IndexQuery query = new IndexQuery();
        query.setSource(jsonData);
        query.setOpType(IndexQuery.OpType.INDEX);
        queries.add(query);
        elasticsearchOperations.bulkIndex(queries, IndexCoordinates.of("log"));
    }
}
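Because autoStartup is driven by kafka.consumer.auto-startup, environments that boot with false need a way to start the listener at runtime. A sketch using KafkaListenerEndpointRegistry, assuming the @KafkaListener above is given id = "log-data-listener" (the id is not present in the original listener):

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class KafkaListenerSwitch {

    private final KafkaListenerEndpointRegistry registry;

    public KafkaListenerSwitch(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Start the log-data listener on demand, e.g. from an admin endpoint
    public void startLogListener() {
        MessageListenerContainer container = registry.getListenerContainer("log-data-listener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}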

OK, that wraps it up. After one cluster outage we found that logs could no longer be queried; investigation traced it to the initial auto-offset-reset: earliest setting, which caused the group to re-consume the topic from the beginning. Fortunately the ES writes had been made idempotent.
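One way to get that idempotency (a sketch of the idea, not the post's actual code; the "logId" field name is an assumption) is to derive the Elasticsearch document id from the message itself, so replayed records overwrite instead of duplicating:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.elasticsearch.core.query.IndexQuery;

public class LogIndexQueries {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Build an IndexQuery whose document id comes from the record, so replaying
    // the topic overwrites existing documents rather than creating duplicates
    public static IndexQuery idempotentQuery(String jsonData) throws Exception {
        JsonNode node = MAPPER.readTree(jsonData);
        IndexQuery query = new IndexQuery();
        query.setId(node.get("logId").asText()); // assumes each record carries a unique "logId"
        query.setSource(jsonData);
        query.setOpType(IndexQuery.OpType.INDEX);
        return query;
    }
}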

