Spring Boot 与 Kafka 的深度集成实践(二)

发布于:2025-06-10 ⋅ 阅读:(25) ⋅ 点赞:(0)

3. 生产者实现

3.1 生产者配置

在 Spring Boot 项目中,配置 Kafka 生产者主要是配置生产者工厂(ProducerFactory)和 KafkaTemplate 。生产者工厂负责创建 Kafka 生产者实例,而 KafkaTemplate 则是用于发送消息的核心组件,它封装了生产者的发送逻辑,提供了简洁易用的方法来发送消息到 Kafka 集群 。

首先,创建一个配置类,用于配置生产者工厂和 KafkaTemplate。在 Spring Boot 中,可以通过@Configuration注解将一个类标记为配置类,然后使用@Bean注解来定义需要创建的 Bean。以下是配置生产者的 Java 代码示例:


import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public Map<String, Object> producerConfigs() {

Map<String, Object> props = new HashMap<>();

// Kafka服务器地址

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

// 生产者在收到服务器的确认之前需要等待的时间,0表示不等待,1表示等待leader确认,all表示等待所有副本确认

props.put(ProducerConfig.ACKS_CONFIG, "1");

// 消息发送失败时的重试次数

props.put(ProducerConfig.RETRIES_CONFIG, 0);

// 批量发送消息时,缓冲区的大小,单位是字节

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// 生产者在发送消息之前等待更多消息进入缓冲区的时间,单位是毫秒

props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

// 生产者用于缓存消息的总内存大小,单位是字节

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

// 键的序列化器

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// 值的序列化器

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return props;

}

@Bean

public ProducerFactory<String, String> producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public KafkaTemplate<String, String> kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

}

在上述代码中:

  • producerConfigs方法用于配置 Kafka 生产者的属性。其中,bootstrapServers从application.properties或application.yml文件中读取 Kafka 服务器的地址;acks配置为1,表示生产者在发送消息后,只要收到 Kafka 集群中 leader 节点的确认,就认为消息发送成功;retries设置为0,表示消息发送失败时不进行重试;batchSize设置为16384字节,即当缓冲区达到这个大小后,生产者会将消息批量发送出去;lingerMs设置为1毫秒,生产者会在等待 1 毫秒后,即使缓冲区未满,也会将消息发送出去;bufferMemory设置为33554432字节,即生产者用于缓存消息的总内存大小;KEY_SERIALIZER_CLASS_CONFIG和VALUE_SERIALIZER_CLASS_CONFIG分别指定了键和值的序列化器为StringSerializer,用于将字符串类型的键和值序列化为字节数组,以便在网络中传输。
  • producerFactory方法通过DefaultKafkaProducerFactory创建了一个生产者工厂,它使用producerConfigs方法返回的配置属性来创建 Kafka 生产者实例 。
  • kafkaTemplate方法创建了一个 KafkaTemplate 实例,它依赖于producerFactory创建的生产者工厂,通过 KafkaTemplate,我们可以方便地发送消息到 Kafka 集群 。

3.2 发送消息

配置好生产者后,就可以使用 KafkaTemplate 来发送消息了。KafkaTemplate 提供了多种发送消息的方法,支持同步发送和异步发送,并且可以通过回调机制来处理消息发送的结果 。

同步发送消息

同步发送消息会阻塞当前线程,直到消息被成功发送或发送失败。以下是同步发送消息的代码示例:


import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaProducerService {

private static final String TOPIC = "my-topic";

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessageSync(String message) {

kafkaTemplate.send(TOPIC, message).get();

}

}

在上述代码中,sendMessageSync方法使用kafkaTemplate.send(TOPIC, message)发送消息到指定的主题my-topic,然后调用get()方法阻塞当前线程,等待 Kafka 集群返回消息发送的结果。如果消息发送成功,get()方法会返回一个包含消息元数据(如分区号、偏移量等)的RecordMetadata对象;如果发送失败,get()方法会抛出异常,开发者可以通过捕获异常来处理发送失败的情况 。

异步发送消息

异步发送消息不会阻塞当前线程,生产者会立即返回,消息将在后台被发送到 Kafka 集群 。为了处理消息发送的结果,可以使用ListenableFuture和回调函数。以下是异步发送消息的代码示例:


import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Service;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

@Service

public class KafkaProducerService {

private static final String TOPIC = "my-topic";

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessageAsync(String message) {

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

@Override

public void onSuccess(SendResult<String, String> result) {

System.out.println("消息发送成功:" + result.getRecordMetadata());

}

@Override

public void onFailure(Throwable ex) {

System.out.println("消息发送失败:" + ex.getMessage());

}

});

}

}

在上述代码中,sendMessageAsync方法使用kafkaTemplate.send(TOPIC, message)发送消息到指定主题,该方法返回一个ListenableFuture<SendResult<String, String>>对象 。通过调用future.addCallback方法,添加了一个回调函数,当消息发送成功时,会调用onSuccess方法,在方法中可以获取到消息的元数据,如分区号、偏移量等;当消息发送失败时,会调用onFailure方法,在方法中可以获取到发送失败的异常信息,以便进行相应的处理,比如记录日志、进行重试等 。

3.3 高级特性

Kafka 生产者还提供了一些高级特性,如事务管理、自定义分区、消息序列化优化等,这些特性可以满足更复杂的业务需求 。

事务管理

Kafka 的事务功能可以确保在一个事务中发送的所有消息要么全部成功,要么全部失败,从而保证数据的一致性 。在 Spring Boot 中使用 Kafka 的事务管理,需要进行以下配置:

  1. 在生产者配置中开启事务支持,将ProducerConfig.TRANSACTIONAL_ID_CONFIG属性设置为一个唯一的事务 ID 。
  1. 使用@EnableKafkaTransaction注解开启事务支持 。
  1. 在发送消息的方法上使用@Transactional注解来定义事务边界 。

以下是配置事务管理的代码示例:


import org.apache.kafka.clients.producer.ProducerConfig;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafkaTransaction;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.util.HashMap;

import java.util.Map;

@Configuration

@EnableKafkaTransaction

@EnableTransactionManagement

public class KafkaTransactionConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Value("${spring.kafka.producer.transaction-id-prefix}")

private String transactionIdPrefix;

@Bean

public Map<String, Object> producerConfigs() {

Map<String, Object> props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ProducerConfig.ACKS_CONFIG, "all");

props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// 开启事务支持,设置事务ID前缀

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix + "-" + System.currentTimeMillis());

return props;

}

@Bean

public ProducerFactory<String, String> producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public KafkaTemplate<String, String> kafkaTemplate() {

KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());

kafkaTemplate.setTransactionIdPrefix(transactionIdPrefix);

return kafkaTemplate;

}

}

在上述代码中:

  • ProducerConfig.TRANSACTIONAL_ID_CONFIG属性设置为transactionIdPrefix + "-" + System.currentTimeMillis(),其中transactionIdPrefix从配置文件中读取,用于生成唯一的事务 ID 。
  • 使用@EnableKafkaTransaction和@EnableTransactionManagement注解分别开启 Kafka 事务支持和 Spring 的事务管理 。

在发送消息的服务类中,可以使用@Transactional注解来定义事务边界,如下所示:


import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;

@Service

public class KafkaTransactionService {

private static final String TOPIC = "my-topic";

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

@Transactional

public void sendTransactionalMessage(String message1, String message2) {

kafkaTemplate.send(TOPIC, message1);

// 模拟一个可能会失败的操作,比如抛出异常

if (Math.random() > 0.5) {

throw new RuntimeException("模拟发送失败");

}

kafkaTemplate.send(TOPIC, message2);

}

}

在sendTransactionalMessage方法中,使用@Transactional注解定义了事务边界。如果在事务中发送消息时发生异常,比如Math.random() > 0.5时抛出RuntimeException,则事务会回滚,之前发送的消息也会被撤销,从而保证了数据的一致性 。

自定义分区

默认情况下,Kafka 生产者会根据消息的键(Key)的哈希值来决定消息发送到哪个分区 。在某些场景下,可能需要根据业务逻辑来自定义分区策略,比如按照用户 ID、订单 ID 等进行分区 。

要实现自定义分区,需要实现org.apache.kafka.clients.producer.Partitioner接口,并在生产者配置中指定自定义的分区器 。以下是自定义分区器的代码示例:


import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import org.apache.kafka.common.PartitionInfo;

import java.util.List;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

@Override

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 根据业务逻辑自定义分区策略,这里简单示例根据key的第一个字符进行分区

if (key != null) {

String keyStr = (String) key;

int partition = Math.abs(keyStr.charAt(0)) % cluster.partitionsForTopic(topic).size();

return partition;

}

// 如果key为null,则使用默认分区策略

return 0;

}

@Override

public void close() {

// 关闭分区器时的操作,一般为空

}

@Override

public void configure(Map<String, ?> configs) {

// 配置分区器时的操作,一般为空

}

}

在上述代码中,CustomPartitioner实现了Partitioner接口,重写了partition方法。在partition方法中,根据消息的键的第一个字符的绝对值对分区数量取模,来决定消息发送到哪个分区。如果键为null,则返回默认分区0 。

然后,在生产者配置中指定自定义的分区器:


import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public Map<String, Object> producerConfigs() {

Map<String, Object> props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ProducerConfig.ACKS_CONFIG, "1");

props.put(ProducerConfig.RETRIES_CONFIG, 0);

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// 指定自定义分区器

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

return props;

}

@Bean

public ProducerFactory<String, String> producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public KafkaTemplate<String, String> kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

}

在producerConfigs方法中,通过props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class)指定了自定义的分区器为CustomPartitioner 。这样,生产者在发送消息时就会使用自定义的分区策略 。

消息序列化优化

Kafka 默认提供了一些序列化器,如StringSerializer、ByteArraySerializer等,但在处理复杂对象时,可能需要自定义序列化器来提高序列化和反序列化的效率 。例如,使用 Apache Avro、Protobuf 等序列化框架,可以实现更高效的数据序列化和反序列化,同时减少数据传输的大小 。

以使用 Apache Avro 为例,首先需要定义 Avro 的 Schema 文件,例如定义一个用户信息的 Schema:


{

"namespace": "com.example",

"type": "record",

"name": "User",

"fields": [

{"name": "id", "type": "int"},

{"name": "name", "type": "string"},

{"name": "age", "type": "int"}

]

}

然后,使用 Avro 的工具生成 Java 类:


avro-tools compile schema user.avsc .

生成的 Java 类包含了用户信息的字段和对应的 getter、setter 方法 。

接下来,实现自定义的 Avro 序列化器:


import org.apache.avro.Schema;

import org.apache.avro.generic.GenericDatumWriter;

import org.apache.avro.generic.GenericRecord;

import org.apache.avro.io.DatumWriter;

import org.apache.avro.io.Encoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.kafka.common.errors.SerializationException;

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;

import java.io.IOException;

import java.util.Map;

public class AvroSerializer implements Serializer<GenericRecord> {

private Schema schema;

@Override

public void configure(Map<String, ?> configs, boolean isKey) {

// 从配置中获取Schema字符串,并解析为Schema对象

String schemaStr = (String) configs.get("schema");

this.schema = new Schema.Parser().parse(schemaStr);

## 4. 消费者实现

### 4.1 消费者配置

在Spring Boot中配置Kafka消费者,主要涉及配置消费者工厂(ConsumerFactory)和Kafka监听器容器工厂(KafkaListenerContainerFactory)。消费者工厂负责创建Kafka消费者实例,而监听器容器工厂则用于创建监听Kafka主题的容器 。

首先,创建一个配置类,用于配置消费者工厂和监听器容器工厂。以下是配置消费者的Java代码示例:

```java

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;

import java.util.Map;

@Configuration

@EnableKafka

public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")

private String groupId;

@Bean

public Map<String, Object> consumerConfigs() {

Map<String, Object> props = new HashMap<>();

// Kafka服务器地址

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

// 消费者组ID

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

// 自动提交偏移量,true表示自动提交,false表示手动提交

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

// 自动提交偏移量的时间间隔,单位是毫秒

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

// 当消费者首次启动或找不到上次的消费偏移量时,决定从哪里开始消费消息,earliest表示从最早的消息开始消费,latest表示从最新的消息开始消费

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 消费者键的反序列化器

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// 消费者值的反序列化器

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return props;

}

@Bean

public ConsumerFactory<String, String> consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

@Bean

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

// 设置并发消费者数量,默认为1,可以根据需要调整

factory.setConcurrency(3);

return factory;

}

}

在上述代码中:

  • consumerConfigs方法用于配置 Kafka 消费者的属性。其中,bootstrapServers和groupId分别从配置文件中读取 Kafka 服务器地址和消费者组 ID;ENABLE_AUTO_COMMIT_CONFIG设置为true,表示开启自动提交偏移量功能,消费者在消费消息后会自动将偏移量提交到 Kafka 集群;AUTO_COMMIT_INTERVAL_MS_CONFIG设置为1000毫秒,即每隔 1 秒自动提交一次偏移量;AUTO_OFFSET_RESET_CONFIG设置为earliest,表示当消费者首次启动或找不到上次的消费偏移量时,从最早的消息开始消费;KEY_DESERIALIZER_CLASS_CONFIG和VALUE_DESERIALIZER_CLASS_CONFIG分别指定了键和值的反序列化器为StringDeserializer,用于将字节数组反序列化为字符串 。
  • consumerFactory方法通过DefaultKafkaConsumerFactory创建了一个消费者工厂,它使用consumerConfigs方法返回的配置属性来创建 Kafka 消费者实例 。
  • kafkaListenerContainerFactory方法创建了一个并发的 Kafka 监听器容器工厂,它依赖于consumerFactory创建的消费者工厂。通过setConcurrency(3)设置了并发消费者的数量为 3,这意味着可以同时有 3 个消费者线程从 Kafka 主题中拉取消息进行消费,提高消费效率 。