(七)消息队列-Kafka 序列化avro(传递)
客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》
本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)
前言
多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。
问题背景
在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:
- 数据冗余:字段名重复存储,占用带宽;
- 兼容性差:新增或删除字段时容易导致上下游解析失败;
- 类型安全缺失:动态解析易引发运行时错误。
而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储、动态兼容性和强类型校验,成为Kafka生态中推荐的序列化方案27。
核心原理
Schema驱动
Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:{ "type": "record", "name": "User", "namespace": "com.example.avro", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"} ] }
然后使用
avro-maven-plugin
生成 Java 类:<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.0</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> </execution> </executions> </plugin>
执行
mvn clean compile
后,com.example.avro.User
类会被自动生成。生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。
二进制编码
Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。Schema Registry
为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:- Schema版本控制与兼容性检查;
- 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。
实现步骤
以下以Java代码为例,展示Kafka集成Avro的配置方法:
1. 添加依赖
<dependencies>
<!-- Spring Kafka 依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Avro 依赖 -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<!-- Schema Registry 依赖 -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.1</version>
</dependency>
</dependencies>
运行 HTML
2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");
producer.send(new ProducerRecord<>("user-topic", user));
------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
schema.registry.url: http://localhost:8081
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
@Service
public class UserProducer {
private final KafkaTemplate<String, User> kafkaTemplate;
@Value("${kafka.topic.user}")
private String topic;
public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendUser(User user) {
kafkaTemplate.send(topic, user.getId().toString(), user);
}
}
在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder()
.setId(1)
.setName("Alice")
.build();
userProducer.sendUser(user);
控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.println("Received: " + record.value().get("name"));
}
}
------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:
spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
specific.avro.reader: true
然后编写 Kafka 消费者代码:
@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {
@KafkaHandler
public void consume(User user) {
System.out.println("Received user: " + user.getName());
}
}
常见问题与解决方案
- Schema兼容性错误
- 现象:生产者更新Schema后消费者无法解析旧数据。
- 解决:在Schema Registry中配置兼容性策略(如
BACKWARD
),允许新增字段并设置默认值7。
- ClassNotFoundException
- 现象:反序列化时提示Avro生成的类不存在。
- 解决:通过Maven插件
avro-maven-plugin
自动生成Java类,并确保生成路径在编译范围内2。
- 性能瓶颈
- 现象:高吞吐场景下序列化延迟较高。
- 优化:复用
DatumWriter
和DatumReader
实例,避免重复初始化开销7。
总结
Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。
引用说明
- 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
- Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。
后续
下期预告,敬请关注:
(八)消息队列-Kafka 生产者