简介
rdkafka 是 Rust 语言的 Apache Kafka 客户端库,它基于 librdkafka(一个用 C 语言编写的高性能 Kafka 客户端库)构建。rdkafka 提供了与 Kafka 集群交互的完整功能,包括生产者、消费者、管理员操作等。
本文档将详细介绍如何使用 rdkafka 进行各种 Kafka 操作,并提供清晰的代码示例。
安装与配置
添加依赖
在 Cargo.toml 文件中添加 rdkafka 依赖:
[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "ssl", "sasl"] }
tokio = { version = "1.0", features = ["full"] }
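添加依赖后,可以先用一个最小程序确认 rdkafka 能正常编译并链接到 librdkafka(示意代码,输出内容仅供参考):

```rust
use rdkafka::util::get_rdkafka_version;

fn main() {
    // 返回 (十六进制版本号, 版本字符串),可用来确认链接到的 librdkafka 版本
    let (version_int, version_str) = get_rdkafka_version();
    println!("librdkafka version: 0x{:08x} ({})", version_int, version_str);
}
```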
系统依赖
在 Windows 上,需要安装以下依赖:
GNU toolchain:使用 MSYS2 安装 MinGW
- 下载并安装 MSYS2:https://www.msys2.org/
- 更新系统软件包:
pacman -Syu
- 安装工具链:
pacman -S --needed base-devel mingw-w64-x86_64-toolchain
CMake:从 https://cmake.org/download/ 下载并安装
其他可选依赖:
- zlib:压缩支持
- libssl-dev:SSL 支持
- libsasl2-dev:SASL 支持
- libzstd-dev:Zstandard 压缩支持
在 Linux 上,可以使用包管理器安装:
# Ubuntu/Debian
sudo apt-get install build-essential cmake libssl-dev libsasl2-dev libzstd-dev
# CentOS/RHEL
sudo yum install gcc-c++ cmake openssl-devel cyrus-sasl-devel libzstd-devel
基本概念
Kafka 架构
- Producer:消息生产者,向 Kafka broker 发送消息的客户端
- Consumer:消息消费者,从 Kafka broker 读取消息的客户端
- Consumer Group:消费者组,由多个 consumer 组成
- Broker:一台 Kafka 服务器就是一个 broker
- Topic:消息类别,可以理解为一个队列
- Partition:为了实现扩展性,一个 topic 可以分为多个 partition
- Replica:副本,一个 topic 的每个分区都有若干个副本
- Leader:每个分区多个副本的"主",生产者发送数据的对象
- Follower:每个分区多个副本中的"从",实时从 Leader 中同步数据
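上述概念可以直接通过 rdkafka 的元数据接口观察到。下面是一个示意性的小例子(假设本地 localhost:9092 有可用的 broker,函数名为演示而取),列出集群中的 broker、topic、partition,以及每个分区的 leader 与副本:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use std::time::Duration;

fn print_cluster_metadata(brokers: &str) {
    // 任何客户端都可以拉取集群元数据,这里用 BaseConsumer
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Consumer creation failed");

    let metadata = consumer
        .fetch_metadata(None, Duration::from_secs(5))
        .expect("Failed to fetch metadata");

    for broker in metadata.brokers() {
        println!("Broker {} -> {}:{}", broker.id(), broker.host(), broker.port());
    }
    for topic in metadata.topics() {
        println!("Topic: {}", topic.name());
        for partition in topic.partitions() {
            // leader 是该分区主副本所在的 broker id,replicas 是全部副本所在的 broker id
            println!(
                "  Partition {}: leader={}, replicas={:?}",
                partition.id(),
                partition.leader(),
                partition.replicas()
            );
        }
    }
}
```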
rdkafka 组件
rdkafka 提供了以下主要组件:
- FutureProducer:异步生产者
- BaseProducer:同步生产者
- StreamConsumer:基于流的消费者
- BaseConsumer:基础消费者
- AdminClient:管理员客户端,用于管理 Kafka 集群
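这些组件分布在不同的模块中,下面按 0.36 版本的模块布局给出常用类型的导入路径速览(仅作参考):

```rust
// 生产者相关
use rdkafka::producer::{FutureProducer, BaseProducer, FutureRecord, BaseRecord};
// 消费者相关
use rdkafka::consumer::{StreamConsumer, BaseConsumer, Consumer, CommitMode};
// 管理员客户端相关
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic};
// 公共配置入口
use rdkafka::config::ClientConfig;
```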
生产者 (Producer)
创建生产者
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, BaseProducer};
use rdkafka::producer::FutureRecord;
use std::time::Duration;
async fn create_producer(brokers: &str) -> FutureProducer {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.set("acks", "all")
.create()
.expect("Producer creation error");
producer
}
发送消息(异步)
use rdkafka::producer::FutureRecord;
use rdkafka::message::ToBytes;
async fn send_message_async(
producer: &FutureProducer,
topic: &str,
key: Option<&str>,
payload: Option<&[u8]>,
) -> Result<(), rdkafka::error::KafkaError> {
// key 和 payload 都是可选的,只有在 Some 时才设置到记录上
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
let delivery_status = producer.send(record, Duration::from_secs(0)).await;
match delivery_status {
Ok(_) => {
println!("Message sent successfully");
Ok(())
}
Err((e, _)) => {
eprintln!("Failed to send message: {}", e);
Err(e)
}
}
}
发送消息(同步)
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;
fn send_message_sync(
producer: &BaseProducer,
topic: &str,
key: Option<&str>,
payload: Option<&[u8]>,
) -> Result<(), rdkafka::error::KafkaError> {
let mut record: BaseRecord<'_, str, [u8]> = BaseRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
// BaseProducer::send 失败时会连同记录一起返回,这里只保留错误
producer.send(record).map_err(|(e, _)| e)?;
// 确保所有消息都已发送
producer.flush(Duration::from_secs(1))?;
Ok(())
}
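需要注意,BaseProducer 的 send 只是把消息放进本地队列,网络 I/O 与投递回调都由 poll() 驱动,退出前还应调用 flush()。下面是一个示意性的发送循环(broker 地址、主题名和消息内容均为假设的示例值):

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

fn produce_with_poll(brokers: &str, topic: &str) {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Producer creation error");

    for i in 0..100 {
        let payload = format!("message {}", i);
        if let Err((e, _)) = producer.send(BaseRecord::to(topic).key("demo-key").payload(&payload)) {
            eprintln!("Failed to enqueue message: {}", e);
        }
        // 驱动内部事件循环,处理投递回调
        producer.poll(Duration::from_millis(0));
    }
    // 退出前把本地队列里的消息全部发出去
    producer.flush(Duration::from_secs(5)).expect("Flush failed");
}
```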
批量发送消息
use rdkafka::producer::FutureRecord;
use futures::future::try_join_all;
async fn send_messages_batch(
producer: &FutureProducer,
topic: &str,
messages: Vec<(Option<&str>, Option<&[u8]>)>,
) -> Result<(), rdkafka::error::KafkaError> {
let futures = messages.into_iter().map(|(key, payload)| {
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
producer.send(record, Duration::from_secs(0))
});
let results = try_join_all(futures).await;
match results {
Ok(_) => {
println!("All messages sent successfully");
Ok(())
}
Err((e, _)) => {
eprintln!("Failed to send some messages: {}", e);
Err(e)
}
}
}
带消息头的消息发送
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::FutureRecord;
async fn send_message_with_headers(
producer: &FutureProducer,
topic: &str,
key: Option<&str>,
payload: Option<&[u8]>,
headers: Vec<(String, Vec<u8>)>,
) -> Result<(), rdkafka::error::KafkaError> {
let mut owned_headers = OwnedHeaders::new();
for (name, value) in &headers {
// 0.36 版本的 API:用 insert 插入一个 Header 结构体
owned_headers = owned_headers.insert(Header {
key: name.as_str(),
value: Some(value),
});
}
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
let record = record.headers(owned_headers);
let delivery_status = producer.send(record, Duration::from_secs(0)).await;
match delivery_status {
Ok(_) => {
println!("Message with headers sent successfully");
Ok(())
}
Err((e, _)) => {
eprintln!("Failed to send message with headers: {}", e);
Err(e)
}
}
}
指定分区的消息发送
use rdkafka::producer::FutureRecord;
async fn send_message_to_partition(
producer: &FutureProducer,
topic: &str,
partition: i32,
key: Option<&str>,
payload: Option<&[u8]>,
) -> Result<(), rdkafka::error::KafkaError> {
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic).partition(partition);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
let delivery_status = producer.send(record, Duration::from_secs(0)).await;
match delivery_status {
Ok(_) => {
println!("Message sent to partition {} successfully", partition);
Ok(())
}
Err((e, _)) => {
eprintln!("Failed to send message to partition {}: {}", partition, e);
Err(e)
}
}
}
消费者 (Consumer)
创建消费者
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{StreamConsumer, BaseConsumer, CommitMode, Consumer};
use rdkafka::consumer::stream_consumer::StreamConsumer as KafkaStreamConsumer;
use std::time::Duration;
async fn create_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> StreamConsumer {
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&topics.to_vec())
.expect("Can't subscribe to specified topics");
consumer
}
基础消费者(同步)
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::message::Message;
use std::time::Duration;
fn consume_messages_sync(consumer: &BaseConsumer, timeout_ms: u64) {
loop {
match consumer.poll(Duration::from_millis(timeout_ms)) {
Some(Ok(msg)) => {
match msg.payload() {
Some(payload) => {
println!("Received message: {:?}", payload);
// 手动提交偏移量
consumer.commit_message(&msg, CommitMode::Sync)
.expect("Failed to commit message");
}
None => {
println!("Received empty message");
}
}
}
Some(Err(e)) => {
eprintln!("Error while consuming: {}", e);
}
None => {
// 超时,没有消息
}
}
}
}
流式消费者(异步)
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{CommitMode, Consumer};
use rdkafka::message::Message;
use futures::StreamExt;
async fn consume_messages_stream(consumer: &StreamConsumer) {
let mut message_stream = consumer.stream();
while let Some(result) = message_stream.next().await {
match result {
Ok(msg) => {
match msg.payload() {
Some(payload) => {
println!("Received message: {:?}", payload);
// 手动提交偏移量
consumer.commit_message(&msg, CommitMode::Async)
.expect("Failed to commit message");
}
None => {
println!("Received empty message");
}
}
}
Err(e) => {
eprintln!("Error while consuming: {}", e);
}
}
}
}
从特定分区和偏移量消费
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::TopicPartitionList;
use rdkafka::Offset;
use std::time::Duration;
fn consume_from_offset(
consumer: &BaseConsumer,
topic: &str,
partition: i32,
offset: i64,
) {
let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(topic, partition, Offset::Offset(offset))
.expect("Failed to add partition offset");
consumer.assign(&tpl)
.expect("Failed to assign partition");
loop {
match consumer.poll(Duration::from_millis(100)) {
Some(Ok(msg)) => {
match msg.payload() {
Some(payload) => {
println!("Received message from partition {} at offset {}: {:?}",
partition, msg.offset(), payload);
}
None => {
println!("Received empty message");
}
}
}
Some(Err(e)) => {
eprintln!("Error while consuming: {}", e);
}
None => {
// 超时,没有消息
}
}
}
}
获取消费者位置
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::TopicPartitionList;
fn get_consumer_position(consumer: &BaseConsumer, topic: &str, partition: i32) -> i64 {
// position() 不接受参数,返回当前分配到的所有分区的消费位置
let position = consumer.position()
.expect("Failed to get consumer position");
for elem in position.elements() {
if elem.topic() == topic && elem.partition() == partition {
return elem.offset().to_raw().unwrap_or(-1);
}
}
-1
}
获取水印偏移量
use rdkafka::consumer::{BaseConsumer, Consumer};
use std::time::Duration;
fn get_watermark_offsets(
consumer: &BaseConsumer,
topic: &str,
partition: i32,
) -> (i64, i64) {
// fetch_watermarks 直接按 topic + partition 查询低/高水印
let (low, high) = consumer.fetch_watermarks(topic, partition, Duration::from_secs(1))
.expect("Failed to get watermark offsets");
(low, high)
}
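把上面两个函数组合起来,可以粗略估算某个分区的消费堆积(lag):高水印减去当前消费位置。下面是一个示意性的小函数(假设返回的 offset 均为有效值,实际使用时还应处理 Offset::Invalid 等情况):

```rust
use rdkafka::consumer::BaseConsumer;

fn estimate_partition_lag(consumer: &BaseConsumer, topic: &str, partition: i32) -> i64 {
    // 当前消费位置(使用上文定义的 get_consumer_position)
    let position = get_consumer_position(consumer, topic, partition);
    // 低/高水印(使用上文定义的 get_watermark_offsets)
    let (_low, high) = get_watermark_offsets(consumer, topic, partition);
    if position < 0 {
        // 还没有有效的消费位置时,整个分区都算作堆积
        return high;
    }
    high - position
}
```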
消费者组管理
use rdkafka::consumer::{Consumer, ConsumerGroupMetadata, StreamConsumer};
fn get_consumer_group_metadata(consumer: &StreamConsumer) -> ConsumerGroupMetadata {
consumer.group_metadata()
.expect("Failed to get consumer group metadata")
}
管理员 (Admin)
创建管理员客户端
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use std::time::Duration;
async fn create_admin_client(brokers: &str) -> AdminClient<DefaultClientContext> {
// AdminClient 需要显式的上下文类型参数,默认场景使用 DefaultClientContext
let admin_client: AdminClient<DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.create()
.expect("Admin client creation failed");
admin_client
}
创建主题
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
async fn create_topic(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
topic_name: &str,
num_partitions: i32,
replication_factor: i32,
) -> Result<(), rdkafka::error::KafkaError> {
let new_topic = NewTopic::new(topic_name, num_partitions, TopicReplication::Fixed(replication_factor));
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let result = admin_client.create_topics(&[new_topic], &options).await;
match result {
Ok(_) => {
println!("Topic '{}' created successfully", topic_name);
Ok(())
}
Err(e) => {
eprintln!("Failed to create topic '{}': {}", topic_name, e);
Err(e)
}
}
}
创建主题时的参数配置
在创建 Kafka 主题时,除了基本的分区数和复制因子外,还可以设置许多其他配置参数。这些参数可以通过 NewTopic 的 set 方法来设置。
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use std::collections::HashMap;
use std::time::Duration;
async fn create_topic_with_configs(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
topic_name: &str,
num_partitions: i32,
replication_factor: i32,
configs: &HashMap<String, String>,
) -> Result<(), rdkafka::error::KafkaError> {
let mut new_topic = NewTopic::new(topic_name, num_partitions, TopicReplication::Fixed(replication_factor));
// 设置主题配置参数
for (key, value) in configs {
new_topic = new_topic.set(key, value);
}
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let result = admin_client.create_topics(&[new_topic], &options).await;
match result {
Ok(_) => {
println!("Topic '{}' created successfully with custom configs", topic_name);
Ok(())
}
Err(e) => {
eprintln!("Failed to create topic '{}': {}", topic_name, e);
Err(e)
}
}
}
主题配置参数详解
以下是创建 Kafka 主题时可以设置的所有参数及其说明:
1. 日志清理策略参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| cleanup.policy | delete | 日志清理策略,决定如何处理旧数据 | delete(基于时间或大小删除)、compact(日志压缩)、compact,delete(同时启用压缩和删除) |
| delete.retention.ms | 86400000(24小时) | 已删除消息(墓碑标记)的保留时间(仅当 cleanup.policy 包含 compact 时有效) | 正整数(毫秒) |
| min.compaction.lag.ms | 0 | 消息被压缩前的最小等待时间(仅当 cleanup.policy 包含 compact 时有效) | 正整数(毫秒) |
| max.compaction.lag.ms | 9223372036854775807 | 消息被压缩前的最大等待时间(仅当 cleanup.policy 包含 compact 时有效) | 正整数(毫秒) |
| min.cleanable.dirty.ratio | 0.5 | 日志清理的触发阈值,当日志中未清理消息的比例超过此值时触发清理 | 0 到 1 之间的浮点数 |
2. 日志保留策略参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| retention.ms | 604800000(7天) | 消息保留时间,超过此时间的消息将被删除 | 正整数(毫秒),-1 表示永不删除 |
| retention.bytes | -1 | 主题的最大大小,超过此大小的消息将被删除 | 正整数(字节),-1 表示无限制 |
| retention.check.interval.ms | 300000(5分钟) | 日志保留策略的检查间隔 | 正整数(毫秒) |
| segment.ms | 604800000(7天) | 日志段滚动的时间间隔,超过此时间将创建新的日志段 | 正整数(毫秒) |
| segment.bytes | 1073741824(1GB) | 单个日志段的最大大小 | 正整数(字节) |
| segment.index.bytes | 10485760(10MB) | 日志索引文件的最大大小 | 正整数(字节) |
| segment.jitter.ms | 0 | 日志段滚动时间的最大随机抖动,用于避免同时滚动多个日志段 | 正整数(毫秒) |
3. 消息大小和时间戳参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| max.message.bytes | 1048588(1MB) | 单个消息的最大大小 | 正整数(字节) |
| message.timestamp.type | CreateTime | 消息时间戳类型 | CreateTime(消息创建时间)、LogAppendTime(日志追加时间) |
| message.timestamp.difference.max.ms | 9223372036854775807 | 允许的消息时间戳与 broker 时间戳之间的最大差异 | 正整数(毫秒) |
| message.downconversion.enable | true | 是否启用消息格式降级转换 | true、false |
4. 日志刷新和同步参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| flush.messages | 9223372036854775807 | 日志刷新的消息数量阈值,超过此数量的消息将刷新到磁盘 | 正整数,-1 表示禁用 |
| flush.ms | 9223372036854775807 | 日志刷新的时间间隔,超过此时间将刷新到磁盘 | 正整数(毫秒),-1 表示禁用 |
| unclean.leader.election.enable | false | 是否允许在数据丢失的情况下选举新的 leader | true、false |
5. 索引和缓存参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| index.interval.bytes | 4096(4KB) | 索引项之间的字节数 | 正整数(字节) |
| preallocate | false | 是否预分配日志文件 | true、false |
6. 压缩参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| compression.type | producer | 消息压缩类型 | producer(使用生产者指定的压缩类型)、none、gzip、snappy、lz4、zstd |
7. 副本和领导者参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| min.insync.replicas | 1 | 当生产者设置 acks=all 时,必须成功写入的最小副本数 | 正整数(1 ≤ 值 ≤ 副本因子) |
| leader.replication.throttled.replicas | 无 | 被限制复制的副本列表 | 副本ID列表,例如:0:1,1:2 |
8. 远程日志存储参数
| 参数名 | 默认值 | 说明 | 可选值 |
|---|---|---|---|
| remote.log.storage.enable | false | 是否启用远程日志存储 | true、false |
| local.log.retention.ms | -2 | 本地日志保留时间 | 正整数(毫秒),-2 表示使用 retention.ms 的值 |
| local.log.retention.bytes | -2 | 本地日志保留大小 | 正整数(字节),-2 表示使用 retention.bytes 的值 |
创建带配置的主题示例
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use std::collections::HashMap;
use std::time::Duration;
async fn create_topic_with_example_configs(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
topic_name: &str,
) -> Result<(), rdkafka::error::KafkaError> {
let mut configs = HashMap::new();
// 设置日志清理策略为删除和压缩
configs.insert("cleanup.policy".to_string(), "delete,compact".to_string());
// 设置消息保留时间为3天
configs.insert("retention.ms".to_string(), "259200000".to_string());
// 设置单个日志段的最大大小为512MB
configs.insert("segment.bytes".to_string(), "536870912".to_string());
// 设置单个消息的最大大小为5MB
configs.insert("max.message.bytes".to_string(), "5242880".to_string());
// 设置消息时间戳类型为日志追加时间
configs.insert("message.timestamp.type".to_string(), "LogAppendTime".to_string());
// 设置压缩类型为lz4
configs.insert("compression.type".to_string(), "lz4".to_string());
// 设置必须成功写入的最小副本数为2
configs.insert("min.insync.replicas".to_string(), "2".to_string());
let mut new_topic = NewTopic::new(topic_name, 6, TopicReplication::Fixed(3));
// 应用配置
for (key, value) in &configs {
new_topic = new_topic.set(key, value);
}
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(10)));
let result = admin_client.create_topics(&[new_topic], &options).await;
match result {
Ok(_) => {
println!("Topic '{}' created successfully with custom configs", topic_name);
Ok(())
}
Err(e) => {
eprintln!("Failed to create topic '{}': {}", topic_name, e);
Err(e)
}
}
}
删除主题
use rdkafka::admin::{AdminClient, AdminOptions};
async fn delete_topic(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
topic_name: &str,
) -> Result<(), rdkafka::error::KafkaError> {
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let result = admin_client.delete_topics(&[topic_name], &options).await;
match result {
Ok(_) => {
println!("Topic '{}' deleted successfully", topic_name);
Ok(())
}
Err(e) => {
eprintln!("Failed to delete topic '{}': {}", topic_name, e);
Err(e)
}
}
}
列出所有主题
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::metadata::Metadata;
async fn list_topics(admin_client: &AdminClient<rdkafka::client::DefaultClientContext>) -> Result<Vec<String>, rdkafka::error::KafkaError> {
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let metadata: Metadata = admin_client.inner().fetch_metadata(None, Duration::from_secs(5))?;
let topics: Vec<String> = metadata.topics()
.iter()
.map(|topic| topic.name().to_string())
.collect();
Ok(topics)
}
获取主题详情
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::metadata::Metadata;
async fn get_topic_details(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
topic_name: &str,
) -> Result<(i32, i32), rdkafka::error::KafkaError> {
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let metadata: Metadata = admin_client.inner().fetch_metadata(Some(topic_name), Duration::from_secs(5))?;
for topic in metadata.topics() {
if topic.name() == topic_name {
let num_partitions = topic.partitions().len() as i32;
let replication_factor = topic.partitions()
.first()
.map(|p| p.replicas().len() as i32)
.unwrap_or(0);
return Ok((num_partitions, replication_factor));
}
}
Err(rdkafka::error::KafkaError::MetadataFetch(rdkafka::error::RDKafkaErrorCode::UnknownTopicOrPartition))
}
修改主题配置
use rdkafka::admin::{AdminClient, AdminOptions, AlterConfig, ResourceSpecifier};
use std::time::Duration;
async fn alter_topic_config(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
topic_name: &str,
config_updates: Vec<(String, String)>,
) -> Result<(), rdkafka::error::KafkaError> {
// 目标资源用 ResourceSpecifier 指定,这里是主题
let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
for (key, value) in &config_updates {
alter_config = alter_config.set(key, value);
}
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let result = admin_client.alter_configs(&[alter_config], &options).await;
match result {
Ok(_) => {
println!("Topic '{}' config updated successfully", topic_name);
Ok(())
}
Err(e) => {
eprintln!("Failed to update topic '{}' config: {}", topic_name, e);
Err(e)
}
}
}
创建分区
use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions};
async fn create_partitions(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
topic_name: &str,
new_total_count: i32,
) -> Result<(), rdkafka::error::KafkaError> {
let new_partitions = NewPartitions::new(topic_name, new_total_count as usize);
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let result = admin_client.create_partitions(&[new_partitions], &options).await;
match result {
Ok(_) => {
println!("Partitions for topic '{}' created successfully", topic_name);
Ok(())
}
Err(e) => {
eprintln!("Failed to create partitions for topic '{}': {}", topic_name, e);
Err(e)
}
}
}
列出消费者组
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use std::time::Duration;
// rdkafka 的 AdminClient 目前没有列出消费者组的接口,
// 可以通过底层 Client 的 fetch_group_list 拉取消费者组列表
async fn list_consumer_groups(
admin_client: &AdminClient<DefaultClientContext>,
) -> Result<Vec<String>, rdkafka::error::KafkaError> {
let group_list = admin_client.inner().fetch_group_list(None, Duration::from_secs(5))?;
let groups = group_list.groups().iter().map(|g| g.name().to_string()).collect();
Ok(groups)
}
描述消费者组
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use std::time::Duration;
// rdkafka 的 AdminClient 目前没有 describe_consumer_groups 接口,
// 这里同样借助底层 Client 的 fetch_group_list,并按组名过滤
async fn describe_consumer_group(
admin_client: &AdminClient<DefaultClientContext>,
group_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
let group_list = admin_client.inner().fetch_group_list(Some(group_id), Duration::from_secs(5))?;
for group in group_list.groups() {
println!("Group ID: {}", group.name());
println!("State: {}", group.state());
println!("Protocol: {}", group.protocol());
for member in group.members() {
println!("Member ID: {}", member.id());
println!("Client ID: {}", member.client_id());
println!("Host: {}", member.client_host());
// member.assignment() 返回未解析的二进制分配数据,这里不展开
}
}
Ok(())
}
删除消费者组
use rdkafka::admin::{AdminClient, AdminOptions};
async fn delete_consumer_group(
admin_client: &AdminClient<rdkafka::client::DefaultClientContext>,
group_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
// AdminClient 中对应的方法名是 delete_groups
let result = admin_client.delete_groups(&[group_id], &options).await;
match result {
Ok(_) => {
println!("Consumer group '{}' deleted successfully", group_id);
Ok(())
}
Err(e) => {
eprintln!("Failed to delete consumer group '{}': {}", group_id, e);
Err(e)
}
}
}
高级功能
事务支持
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;
// rdkafka 没有单独的 TransactionalProducer 类型:
// 只要配置了 transactional.id,普通的 FutureProducer/BaseProducer 就具备事务能力
async fn create_transactional_producer(
brokers: &str,
transactional_id: &str,
) -> FutureProducer {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("transactional.id", transactional_id)
.set("enable.idempotence", "true")
.create()
.expect("Transactional producer creation failed");
producer
}
async fn send_messages_in_transaction(
producer: &FutureProducer,
topic: &str,
messages: Vec<(Option<&str>, Option<&[u8]>)>,
) -> Result<(), rdkafka::error::KafkaError> {
// 初始化事务(同步调用,带超时)
producer.init_transactions(Duration::from_secs(5))?;
// 开始事务
producer.begin_transaction()?;
// 发送消息
for (key, payload) in messages {
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
// 发送失败时中止事务
producer.abort_transaction(Duration::from_secs(5))?;
return Err(e);
}
}
// 提交事务
producer.commit_transaction(Duration::from_secs(5))?;
Ok(())
}
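下面是调用上述事务函数的一个示意(broker 地址、transactional.id 与主题名均为示例值):

```rust
async fn transaction_demo() -> Result<(), rdkafka::error::KafkaError> {
    let producer = create_transactional_producer("localhost:9092", "example-tx-id").await;
    let messages = vec![
        (Some("key-1"), Some("first".as_bytes())),
        (Some("key-2"), Some("second".as_bytes())),
    ];
    // 两条消息要么都对下游可见,要么都不可见(下游消费者使用 read_committed 隔离级别时)
    send_messages_in_transaction(&producer, "example-topic", messages).await?;
    Ok(())
}
```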
精确一次语义 (Exactly-Once Semantics)
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::{Offset, TopicPartitionList};
use futures::StreamExt;
use std::time::Duration;
async fn exactly_once_processing(
input_topic: &str,
output_topic: &str,
brokers: &str,
group_id: &str,
transactional_id: &str,
) -> Result<(), rdkafka::error::KafkaError> {
// 创建事务性生产者(配置了 transactional.id 的 FutureProducer)
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("transactional.id", transactional_id)
.set("enable.idempotence", "true")
.create()
.expect("Transactional producer creation failed");
// 创建消费者
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.auto.commit", "false")
.set("isolation.level", "read_committed")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&[input_topic])
.expect("Can't subscribe to input topic");
// 初始化事务
producer.init_transactions(Duration::from_secs(5))?;
let mut message_stream = consumer.stream();
while let Some(result) = message_stream.next().await {
match result {
Ok(input_msg) => {
// 开始事务
producer.begin_transaction()?;
// 处理消息
let processed_payload = process_message(input_msg.payload().unwrap_or(&[]));
// 发送处理后的消息
let mut record: FutureRecord<'_, [u8], Vec<u8>> = FutureRecord::to(output_topic)
.payload(&processed_payload);
if let Some(key) = input_msg.key() {
record = record.key(key);
}
if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
producer.abort_transaction(Duration::from_secs(5))?;
return Err(e);
}
// 把消费者偏移量一并提交到事务中(偏移量应指向下一条待消费的消息)
let mut offsets = TopicPartitionList::new();
offsets.add_partition_offset(
input_msg.topic(),
input_msg.partition(),
Offset::Offset(input_msg.offset() + 1),
)?;
// group_metadata 需要消费者已成功加入消费者组
producer.send_offsets_to_transaction(
&offsets,
&consumer.group_metadata().expect("missing group metadata"),
Duration::from_secs(5),
)?;
// 提交事务
producer.commit_transaction(Duration::from_secs(5))?;
}
Err(e) => {
eprintln!("Error while consuming: {}", e);
}
}
}
Ok(())
}
fn process_message(input: &[u8]) -> Vec<u8> {
// 这里实现消息处理逻辑
// 示例:简单地将消息(按 ASCII)转换为大写
input.to_ascii_uppercase()
}
自定义分区器
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
// 下面演示一种不依赖特定回调接口的做法:
// 在发送前自行计算分区号,并用 FutureRecord::partition 显式指定;
// 也可以通过 "partitioner" 配置项选用 librdkafka 内置分区器(见文末函数)。
fn compute_partition(key: Option<&[u8]>, partition_count: i32) -> i32 {
// 自定义分区逻辑
// 示例:基于键的简单哈希进行分区
if let Some(key) = key {
let hash = key.iter().fold(0usize, |acc, &x| acc.wrapping_add(x as usize));
(hash % partition_count as usize) as i32
} else {
// 如果没有键,使用基于时间的轮询策略
let current_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
(current_time % partition_count as u64) as i32
}
}
async fn send_with_custom_partitioning(
producer: &FutureProducer,
topic: &str,
partition_count: i32,
key: &[u8],
payload: &[u8],
) -> Result<(), rdkafka::error::KafkaError> {
let partition = compute_partition(Some(key), partition_count);
let record = FutureRecord::to(topic)
.partition(partition)
.key(key)
.payload(payload);
producer.send(record, Duration::from_secs(0)).await
.map(|_| ())
.map_err(|(e, _)| e)
}
async fn create_producer_with_builtin_partitioner(brokers: &str) -> FutureProducer {
// 也可以直接选用 librdkafka 内置的 murmur2_random 分区器(与 Java 客户端默认行为一致)
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.set("partitioner", "murmur2_random")
.create()
.expect("Producer creation error");
producer
}
自定义序列化器
use serde::{Serialize, Deserialize};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::error::Error;
use std::time::Duration;
#[derive(Serialize, Deserialize, Debug)]
struct MyData {
id: u32,
name: String,
timestamp: i64,
}
trait MessageSerializer<T> {
fn serialize(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>;
}
trait MessageDeserializer<T> {
fn deserialize(&self, bytes: &[u8]) -> Result<T, Box<dyn Error>>;
}
struct JsonSerializer;
impl MessageSerializer<MyData> for JsonSerializer {
fn serialize(&self, data: &MyData) -> Result<Vec<u8>, Box<dyn Error>> {
Ok(serde_json::to_vec(data)?)
}
}
struct JsonDeserializer;
impl MessageDeserializer<MyData> for JsonDeserializer {
fn deserialize(&self, bytes: &[u8]) -> Result<MyData, Box<dyn Error>> {
Ok(serde_json::from_slice(bytes)?)
}
}
async fn send_serialized_message(
producer: &FutureProducer,
topic: &str,
key: Option<&str>,
data: &MyData,
serializer: &impl MessageSerializer<MyData>,
) -> Result<(), Box<dyn Error>> {
let payload = serializer.serialize(data)?;
let mut record: FutureRecord<'_, str, Vec<u8>> = FutureRecord::to(topic)
.payload(&payload);
if let Some(key) = key {
record = record.key(key);
}
let delivery_status = producer.send(record, Duration::from_secs(0)).await;
match delivery_status {
Ok(_) => {
println!("Serialized message sent successfully");
Ok(())
}
Err((e, _)) => {
eprintln!("Failed to send serialized message: {}", e);
Err(Box::new(e))
}
}
}
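上面只演示了 JsonSerializer 的用法;对应地,消费端可以用 JsonDeserializer 把字节还原为 MyData。下面是一个示意性的消费端写法(沿用上文定义的 trait 与类型):

```rust
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;
use futures::StreamExt;
use std::error::Error;

async fn consume_serialized_messages(
    consumer: &StreamConsumer,
    deserializer: &impl MessageDeserializer<MyData>,
) -> Result<(), Box<dyn Error>> {
    let mut stream = consumer.stream();
    while let Some(result) = stream.next().await {
        let msg = result?;
        if let Some(payload) = msg.payload() {
            // 反序列化失败时打印错误并继续处理下一条
            match deserializer.deserialize(payload) {
                Ok(data) => println!("Received MyData: {:?}", data),
                Err(e) => eprintln!("Failed to deserialize message: {}", e),
            }
        }
    }
    Ok(())
}
```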
消息拦截器
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, ConsumerContext, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::Message;
use rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};
use rdkafka::TopicPartitionList;
struct LoggingProducerContext;
// ProducerContext 必须同时实现 ClientContext(这里全部采用默认实现)
impl ClientContext for LoggingProducerContext {}
impl ProducerContext for LoggingProducerContext {
// 不需要为每条消息携带额外数据,DeliveryOpaque 用 () 即可
type DeliveryOpaque = ();
// 每条消息投递成功或失败时都会回调这里
fn delivery(&self, delivery_result: &DeliveryResult<'_>, _delivery_opaque: ()) {
match delivery_result {
Ok(message) => {
println!("Message delivered to topic {}, partition {} at offset {}",
message.topic(), message.partition(), message.offset());
}
Err((e, message)) => {
eprintln!("Failed to deliver message to topic {}, partition {}: {}",
message.topic(), message.partition(), e);
}
}
}
}
struct LoggingConsumerContext;
impl ClientContext for LoggingConsumerContext {}
impl ConsumerContext for LoggingConsumerContext {
// ConsumerContext 没有"每条消息"的回调,这里以提交偏移量的回调为例
fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
match result {
Ok(_) => println!("Offsets committed: {:?}", offsets),
Err(e) => eprintln!("Failed to commit offsets: {}", e),
}
}
}
// 注意:FutureProducer 的投递结果通过 send 返回的 Future 传递;
// 要使用 delivery 回调式的"拦截器",应搭配 BaseProducer(或 ThreadedProducer)和自定义 ProducerContext
fn create_producer_with_interceptor(brokers: &str) -> BaseProducer<LoggingProducerContext> {
let producer: BaseProducer<LoggingProducerContext> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create_with_context(LoggingProducerContext)
.expect("Producer creation error");
producer
}
async fn create_consumer_with_interceptor(brokers: &str, group_id: &str, topics: &[&str]) -> StreamConsumer<LoggingConsumerContext> {
let consumer: StreamConsumer<LoggingConsumerContext> = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create_with_context(LoggingConsumerContext)
.expect("Consumer creation failed");
consumer.subscribe(topics)
.expect("Can't subscribe to specified topics");
consumer
}
}
消息过滤
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::message::Message;
use futures::StreamExt;
async fn filter_messages(
consumer: &StreamConsumer,
filter_fn: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
let mut message_stream = consumer.stream();
while let Some(result) = message_stream.next().await {
match result {
Ok(msg) => {
match msg.payload() {
Some(payload) => {
if filter_fn(payload) {
println!("Filtered message: {:?}", payload);
// 处理符合条件的消息
}
}
None => {
println!("Received empty message");
}
}
}
Err(e) => {
eprintln!("Error while consuming: {}", e);
}
}
}
}
// 示例过滤函数
fn example_filter(payload: &[u8]) -> bool {
// 只处理包含特定字符串的消息
let payload_str = std::str::from_utf8(payload).unwrap_or("");
payload_str.contains("important")
}
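filter_messages 的过滤条件既可以传入上面的 example_filter 函数,也可以直接传入闭包。下面是一个示意性的调用方式(假设 consumer 已按前文方式创建并订阅了主题;实际中通常只会选用其中一种过滤条件):

```rust
async fn run_filter_example(consumer: &rdkafka::consumer::StreamConsumer) {
    // 传入普通函数作为过滤条件
    filter_messages(consumer, example_filter).await;
    // 或者传入闭包,例如只保留长度超过 16 字节的消息
    filter_messages(consumer, |payload: &[u8]| payload.len() > 16).await;
}
```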
消息转换
use rdkafka::producer::{FutureProducer, Producer};
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::producer::FutureRecord;
use futures::StreamExt;
use std::time::Duration;
async fn transform_messages(
input_consumer: &StreamConsumer,
output_producer: &FutureProducer,
input_topic: &str,
output_topic: &str,
transform_fn: impl Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
) {
let mut message_stream = input_consumer.stream();
while let Some(result) = message_stream.next().await {
match result {
Ok(msg) => {
match msg.payload() {
Some(payload) => {
// 转换消息
let transformed_payload = transform_fn(payload);
// 发送转换后的消息
let mut record: FutureRecord<'_, [u8], Vec<u8>> = FutureRecord::to(output_topic)
.payload(&transformed_payload);
if let Some(key) = msg.key() {
record = record.key(key);
}
match output_producer.send(record, Duration::from_secs(0)).await {
Ok(_) => {
println!("Transformed message sent successfully");
}
Err((e, _)) => {
eprintln!("Failed to send transformed message: {}", e);
}
}
}
None => {
println!("Received empty message");
}
}
}
Err(e) => {
eprintln!("Error while consuming: {}", e);
}
}
}
}
// 示例转换函数
fn example_transform(payload: &[u8]) -> Vec<u8> {
// 将消息转换为大写
let payload_str = std::str::from_utf8(payload).unwrap_or("");
payload_str.to_uppercase().into_bytes()
}
错误处理
基本错误处理
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;
use std::time::Duration;
async fn send_message_with_error_handling(
producer: &FutureProducer,
topic: &str,
key: Option<&str>,
payload: Option<&[u8]>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
match producer.send(record, Duration::from_secs(0)).await {
Ok(_) => {
println!("Message sent successfully");
Ok(())
}
Err((e, _)) => {
match e {
KafkaError::MessageProduction(RDKafkaErrorCode::BrokerTransportFailure) => {
eprintln!("Broker transport failure: {}", e);
// 可以尝试重新连接或使用备用 broker
}
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
eprintln!("Producer queue is full: {}", e);
// 可以等待一段时间后重试
}
KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge) => {
eprintln!("Message size too large: {}", e);
// 可以尝试压缩消息或减小消息大小
}
KafkaError::MessageProduction(RDKafkaErrorCode::UnknownTopicOrPartition) => {
eprintln!("Unknown topic or partition: {}", e);
// 可以尝试创建主题或检查主题名称
}
_ => {
eprintln!("Failed to send message: {}", e);
}
}
Err(Box::new(e))
}
}
}
重试机制
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;
use std::time::Duration;
use tokio::time::sleep;
async fn send_message_with_retry(
producer: &FutureProducer,
topic: &str,
key: Option<&str>,
payload: Option<&[u8]>,
max_retries: u32,
retry_delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
let mut retries = 0;
loop {
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
match producer.send(record, Duration::from_secs(0)).await {
Ok(_) => {
println!("Message sent successfully");
return Ok(());
}
Err((e, _)) => {
retries += 1;
if retries >= max_retries {
eprintln!("Failed to send message after {} retries: {}", max_retries, e);
return Err(Box::new(e));
}
eprintln!("Failed to send message (attempt {}/{}): {}, retrying in {:?}...",
retries, max_retries, e, retry_delay);
sleep(retry_delay).await;
}
}
}
}
死信队列处理
use rdkafka::producer::FutureProducer;
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::producer::FutureRecord;
use futures::StreamExt;
use std::time::Duration;
async fn dead_letter_queue_handler(
main_consumer: &StreamConsumer,
dlq_producer: &FutureProducer,
main_topic: &str,
dlq_topic: &str,
error_handler: impl Fn(&[u8]) -> bool + Send + Sync + 'static,
) {
let mut message_stream = main_consumer.stream();
while let Some(result) = message_stream.next().await {
match result {
Ok(msg) => {
match msg.payload() {
Some(payload) => {
// 尝试处理消息
if error_handler(payload) {
// 处理成功
println!("Message processed successfully");
} else {
// 处理失败,发送到死信队列
let mut record: FutureRecord<'_, [u8], [u8]> = FutureRecord::to(dlq_topic)
.payload(payload);
if let Some(key) = msg.key() {
record = record.key(key);
}
match dlq_producer.send(record, Duration::from_secs(0)).await {
Ok(_) => {
println!("Failed message sent to dead letter queue");
}
Err((e, _)) => {
eprintln!("Failed to send message to dead letter queue: {}", e);
}
}
}
}
None => {
println!("Received empty message");
}
}
}
Err(e) => {
eprintln!("Error while consuming: {}", e);
}
}
}
}
// 示例错误处理函数
fn example_error_handler(payload: &[u8]) -> bool {
// 模拟处理消息,有时会失败
let payload_str = std::str::from_utf8(payload).unwrap_or("");
// 如果消息包含 "error",则处理失败
if payload_str.contains("error") {
return false;
}
// 否则处理成功
true
}
性能优化
批量处理
use rdkafka::producer::{FutureProducer, Producer};
use rdkafka::producer::FutureRecord;
use rdkafka::message::ToBytes;
use futures::future::try_join_all;
use std::time::Duration;
async fn batch_send_messages(
producer: &FutureProducer,
topic: &str,
messages: Vec<(Option<&str>, Option<&[u8]>)>,
batch_size: usize,
) -> Result<(), rdkafka::error::KafkaError> {
for chunk in messages.chunks(batch_size) {
let futures = chunk.iter().map(|(key, payload)| {
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = *key {
record = record.key(key);
}
if let Some(payload) = *payload {
record = record.payload(payload);
}
producer.send(record, Duration::from_secs(0))
});
let results = try_join_all(futures).await;
match results {
Ok(_) => {
println!("Batch of {} messages sent successfully", chunk.len());
}
Err((e, _)) => {
eprintln!("Failed to send batch of messages: {}", e);
return Err(e);
}
}
}
Ok(())
}
异步并发处理
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::message::Message;
use futures::StreamExt;
use tokio::task;
async fn concurrent_message_processing(
consumer: &StreamConsumer,
concurrency_level: usize,
process_fn: impl Fn(&[u8]) + Send + Sync + 'static,
) {
// 用 Arc 共享处理函数,便于把它 move 进多个 spawn 出来的任务
let process_fn = std::sync::Arc::new(process_fn);
let mut message_stream = consumer.stream();
let mut tasks = Vec::with_capacity(concurrency_level);
while let Some(result) = message_stream.next().await {
match result {
Ok(msg) => {
match msg.payload() {
Some(payload) => {
let payload = payload.to_vec();
let process_fn = process_fn.clone();
// 如果已经达到并发限制,先等待最早启动的任务完成
if tasks.len() >= concurrency_level {
let oldest = tasks.remove(0);
let _ = oldest.await;
}
// 启动新的处理任务
let task = task::spawn(async move {
process_fn(&payload);
});
tasks.push(task);
}
None => {
println!("Received empty message");
}
}
}
Err(e) => {
eprintln!("Error while consuming: {}", e);
}
}
}
// 等待所有剩余任务完成
for task in tasks {
let _ = task.await;
}
}
// 示例处理函数
fn example_process_fn(payload: &[u8]) {
// 模拟耗时处理
let payload_str = std::str::from_utf8(payload).unwrap_or("");
println!("Processing message: {}", payload_str);
// 模拟处理耗时
std::thread::sleep(std::time::Duration::from_millis(100));
}
连接池管理
use rdkafka::producer::FutureProducer;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::config::ClientConfig;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
struct KafkaConnectionPool {
producers: Arc<Mutex<HashMap<String, FutureProducer>>>,
// StreamConsumer 没有实现 Clone,这里用 Arc 共享
consumers: Arc<Mutex<HashMap<String, Arc<StreamConsumer>>>>,
brokers: String,
}
impl KafkaConnectionPool {
fn new(brokers: &str) -> Self {
Self {
producers: Arc::new(Mutex::new(HashMap::new())),
consumers: Arc::new(Mutex::new(HashMap::new())),
brokers: brokers.to_string(),
}
}
fn get_producer(&self, client_id: &str) -> FutureProducer {
let mut producers = self.producers.lock().unwrap();
if let Some(producer) = producers.get(client_id) {
return producer.clone();
}
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &self.brokers)
.set("client.id", client_id)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
producers.insert(client_id.to_string(), producer.clone());
producer
}
fn get_consumer(&self, group_id: &str, topics: &[&str]) -> Arc<StreamConsumer> {
let key = format!("{}:{}", group_id, topics.join(","));
let mut consumers = self.consumers.lock().unwrap();
if let Some(consumer) = consumers.get(&key) {
return consumer.clone();
}
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", &self.brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");
consumer.subscribe(topics)
.expect("Can't subscribe to specified topics");
let consumer = Arc::new(consumer);
consumers.insert(key, consumer.clone());
consumer
}
}
缓冲区优化
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use std::time::Duration;
async fn optimized_send_messages(
brokers: &str,
topic: &str,
messages: Vec<(Option<&str>, Option<&[u8]>)>,
linger_ms: u32,
batch_size: usize,
) -> Result<(), rdkafka::error::KafkaError> {
// 现有生产者无法取回它的 ClientConfig,
// 因此直接基于 broker 地址构造一个针对批量发送优化的生产者
let optimized_producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("linger.ms", linger_ms.to_string())
.set("batch.size", batch_size.to_string())
.create()
.expect("Optimized producer creation error");
// 发送消息
for (key, payload) in messages {
let mut record: FutureRecord<'_, str, [u8]> = FutureRecord::to(topic);
if let Some(key) = key {
record = record.key(key);
}
if let Some(payload) = payload {
record = record.payload(payload);
}
if let Err((e, _)) = optimized_producer.send(record, Duration::from_secs(0)).await {
eprintln!("Failed to send message: {}", e);
return Err(e);
}
}
// 刷新缓冲区,确保所有消息都已发送
optimized_producer.flush(Duration::from_secs(5))?;
Ok(())
}
最佳实践
资源管理
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use std::sync::Arc;
struct KafkaClient {
producer: Arc<FutureProducer>,
consumer: Arc<StreamConsumer>,
admin: Arc<AdminClient<DefaultClientContext>>,
}
impl KafkaClient {
fn new(brokers: &str, group_id: &str, topics: &[&str]) -> Self {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&topics.to_vec())
.expect("Can't subscribe to specified topics");
let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.create()
.expect("Admin client creation failed");
Self {
producer: Arc::new(producer),
consumer: Arc::new(consumer),
admin: Arc::new(admin),
}
}
fn producer(&self) -> Arc<FutureProducer> {
self.producer.clone()
}
fn consumer(&self) -> Arc<StreamConsumer> {
self.consumer.clone()
}
fn admin(&self) -> Arc<AdminClient<DefaultClientContext>> {
self.admin.clone()
}
}
// 使用示例
async fn use_kafka_client() {
let brokers = "localhost:9092";
let group_id = "my-group";
let topics = &["my-topic"];
let kafka_client = KafkaClient::new(brokers, group_id, topics);
// 使用生产者
let producer = kafka_client.producer();
// ... 使用生产者发送消息
// 使用消费者
let consumer = kafka_client.consumer();
// ... 使用消费者接收消息
// 使用管理员
let admin = kafka_client.admin();
// ... 使用管理员执行管理操作
}
配置管理
use serde::{Deserialize, Serialize};
use std::env;
#[derive(Debug, Serialize, Deserialize)]
struct KafkaConfig {
brokers: String,
group_id: String,
topics: Vec<String>,
producer_config: ProducerConfig,
consumer_config: ConsumerConfig,
admin_config: AdminConfig,
}
#[derive(Debug, Serialize, Deserialize)]
struct ProducerConfig {
linger_ms: u32,
batch_size: usize,
message_timeout_ms: u32,
acks: String,
retries: u32,
}
#[derive(Debug, Serialize, Deserialize)]
struct ConsumerConfig {
session_timeout_ms: u32,
auto_commit: bool,
auto_offset_reset: String,
max_poll_records: i32,
}
#[derive(Debug, Serialize, Deserialize)]
struct AdminConfig {
request_timeout_ms: u32,
}
impl KafkaConfig {
fn from_env() -> Result<Self, env::VarError> {
Ok(Self {
brokers: env::var("KAFKA_BROKERS")?,
group_id: env::var("KAFKA_GROUP_ID")?,
topics: env::var("KAFKA_TOPICS")?
.split(',')
.map(|s| s.to_string())
.collect(),
producer_config: ProducerConfig {
linger_ms: env::var("KAFKA_PRODUCER_LINGER_MS")?
.parse()
.unwrap_or(10),
batch_size: env::var("KAFKA_PRODUCER_BATCH_SIZE")?
.parse()
.unwrap_or(16384),
message_timeout_ms: env::var("KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS")?
.parse()
.unwrap_or(5000),
acks: env::var("KAFKA_PRODUCER_ACKS")?
.parse()
.unwrap_or_else(|_| "all".to_string()),
retries: env::var("KAFKA_PRODUCER_RETRIES")?
.parse()
.unwrap_or(3),
},
consumer_config: ConsumerConfig {
session_timeout_ms: env::var("KAFKA_CONSUMER_SESSION_TIMEOUT_MS")?
.parse()
.unwrap_or(10000),
auto_commit: env::var("KAFKA_CONSUMER_AUTO_COMMIT")?
.parse()
.unwrap_or(false),
auto_offset_reset: env::var("KAFKA_CONSUMER_AUTO_OFFSET_RESET")?
.parse()
.unwrap_or_else(|_| "earliest".to_string()),
max_poll_records: env::var("KAFKA_CONSUMER_MAX_POLL_RECORDS")?
.parse()
.unwrap_or(500),
},
admin_config: AdminConfig {
request_timeout_ms: env::var("KAFKA_ADMIN_REQUEST_TIMEOUT_MS")?
.parse()
.unwrap_or(5000),
},
})
}
fn to_producer_config(&self) -> rdkafka::config::ClientConfig {
let mut config = rdkafka::config::ClientConfig::new();
config.set("bootstrap.servers", &self.brokers);
config.set("linger.ms", self.producer_config.linger_ms.to_string());
config.set("batch.size", self.producer_config.batch_size.to_string());
config.set("message.timeout.ms", self.producer_config.message_timeout_ms.to_string());
config.set("acks", &self.producer_config.acks);
config.set("retries", self.producer_config.retries.to_string());
config
}
fn to_consumer_config(&self) -> rdkafka::config::ClientConfig {
let mut config = rdkafka::config::ClientConfig::new();
config.set("bootstrap.servers", &self.brokers);
config.set("group.id", &self.group_id);
config.set("session.timeout.ms", self.consumer_config.session_timeout_ms.to_string());
config.set("enable.auto.commit", self.consumer_config.auto_commit.to_string());
config.set("auto.offset.reset", &self.consumer_config.auto_offset_reset);
config.set("max.poll.records", self.consumer_config.max_poll_records.to_string());
config
}
fn to_admin_config(&self) -> rdkafka::config::ClientConfig {
let mut config = rdkafka::config::ClientConfig::new();
config.set("bootstrap.servers", &self.brokers);
config.set("request.timeout.ms", self.admin_config.request_timeout_ms.to_string());
config
}
}
// 使用示例
async fn use_config() -> Result<(), Box<dyn std::error::Error>> {
// 从环境变量加载配置
let kafka_config = KafkaConfig::from_env()?;
// 创建生产者
let producer: rdkafka::producer::FutureProducer = kafka_config.to_producer_config()
.create()?;
// 创建消费者
let consumer: rdkafka::consumer::StreamConsumer = kafka_config.to_consumer_config()
.create()?;
// 创建管理员
let admin: rdkafka::admin::AdminClient = kafka_config.to_admin_config()
.create()?;
// 使用客户端...
Ok(())
}
监控和指标
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::StreamExt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
struct KafkaMetrics {
messages_produced: u64,
messages_consumed: u64,
bytes_produced: u64,
bytes_consumed: u64,
produce_errors: u64,
consume_errors: u64,
last_update: Instant,
}
impl KafkaMetrics {
fn new() -> Self {
Self {
messages_produced: 0,
messages_consumed: 0,
bytes_produced: 0,
bytes_consumed: 0,
produce_errors: 0,
consume_errors: 0,
last_update: Instant::now(),
}
}
fn record_message_produced(&mut self, size: usize) {
self.messages_produced += 1;
self.bytes_produced += size as u64;
self.last_update = Instant::now();
}
fn record_message_consumed(&mut self, size: usize) {
self.messages_consumed += 1;
self.bytes_consumed += size as u64;
self.last_update = Instant::now();
}
fn record_produce_error(&mut self) {
self.produce_errors += 1;
self.last_update = Instant::now();
}
fn record_consume_error(&mut self) {
self.consume_errors += 1;
self.last_update = Instant::now();
}
fn print_metrics(&self) {
println!("Kafka Metrics:");
println!(" Messages produced: {}", self.messages_produced);
println!(" Messages consumed: {}", self.messages_consumed);
println!(" Bytes produced: {}", self.bytes_produced);
println!(" Bytes consumed: {}", self.bytes_consumed);
println!(" Produce errors: {}", self.produce_errors);
println!(" Consume errors: {}", self.consume_errors);
println!(" Last update: {:?}", self.last_update.elapsed());
}
}
struct MonitoredProducer {
producer: FutureProducer,
metrics: Arc<Mutex<KafkaMetrics>>,
}
impl MonitoredProducer {
fn new(producer: FutureProducer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
Self { producer, metrics }
}
async fn send(
&self,
record: FutureRecord<'_, str, [u8]>,
timeout: Duration,
) -> Result<(), (rdkafka::error::KafkaError, rdkafka::message::OwnedMessage)> {
let size = record.payload.map(|p| p.len()).unwrap_or(0);
match self.producer.send(record, timeout).await {
Ok(_) => {
let mut metrics = self.metrics.lock().unwrap();
metrics.record_message_produced(size);
Ok(())
}
Err(e) => {
let mut metrics = self.metrics.lock().unwrap();
metrics.record_produce_error();
Err(e)
}
}
}
}
struct MonitoredConsumer {
consumer: StreamConsumer,
metrics: Arc<Mutex<KafkaMetrics>>,
}
impl MonitoredConsumer {
fn new(consumer: StreamConsumer, metrics: Arc<Mutex<KafkaMetrics>>) -> Self {
Self { consumer, metrics }
}
async fn consume(&self) {
let mut message_stream = self.consumer.stream();
while let Some(result) = message_stream.next().await {
match result {
Ok(msg) => {
let size = msg.payload().map(|p| p.len()).unwrap_or(0);
let mut metrics = self.metrics.lock().unwrap();
metrics.record_message_consumed(size);
// 处理消息...
}
Err(e) => {
let mut metrics = self.metrics.lock().unwrap();
metrics.record_consume_error();
eprintln!("Error while consuming: {}", e);
}
}
}
}
}
// 使用示例
async fn use_monitored_clients() {
let metrics = Arc::new(Mutex::new(KafkaMetrics::new()));
// 创建生产者
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let monitored_producer = MonitoredProducer::new(producer, metrics.clone());
// 创建消费者
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", "my-group")
.set("bootstrap.servers", "localhost:9092")
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&["my-topic"])
.expect("Can't subscribe to specified topics");
let monitored_consumer = MonitoredConsumer::new(consumer, metrics.clone());
// 启动消费者任务
tokio::spawn(async move {
monitored_consumer.consume().await;
});
// 定期打印指标
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let metrics = metrics.lock().unwrap();
metrics.print_metrics();
}
});
// 使用生产者发送消息...
}
完整示例
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, Producer};
use rdkafka::consumer::{StreamConsumer, Consumer, CommitMode};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::message::{Message, ToBytes};
use rdkafka::producer::FutureRecord;
use futures::StreamExt;
use std::time::Duration;
use tokio;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Kafka 配置
let brokers = "localhost:9092";
let topic = "example-topic";
let group_id = "example-group";
// 创建管理员客户端
let admin_client: AdminClient<rdkafka::client::DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.create()
.expect("Admin client creation failed");
// 创建主题(如果不存在)
let new_topic = NewTopic::new(topic, 3, TopicReplication::Fixed(1));
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
match admin_client.create_topics(&[new_topic], &options).await {
Ok(_) => {
println!("Topic '{}' created or already exists", topic);
}
Err(e) => {
eprintln!("Failed to create topic '{}': {}", topic, e);
}
}
// 创建生产者
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.set("acks", "all")
.create()
.expect("Producer creation error");
// 创建消费者
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&[topic])
.expect("Can't subscribe to specified topics");
// 启动消费者任务
let consumer_handle = tokio::spawn(async move {
let mut message_stream = consumer.stream();
while let Some(result) = message_stream.next().await {
match result {
Ok(msg) => {
match msg.payload() {
Some(payload) => {
let payload_str = std::str::from_utf8(payload).unwrap_or("");
println!("Received message: {}", payload_str);
// 手动提交偏移量
consumer.commit_message(&msg, CommitMode::Async)
.expect("Failed to commit message");
}
None => {
println!("Received empty message");
}
}
}
Err(e) => {
eprintln!("Error while consuming: {}", e);
}
}
}
});
// 发送一些消息
for i in 0..10 {
let payload = format!("Message {}", i);
let record = FutureRecord::to(topic)
.key(Some(&format!("key-{}", i)))
.payload(payload.as_bytes());
match producer.send(record, Duration::from_secs(0)).await {
Ok(_) => {
println!("Message {} sent successfully", i);
}
Err((e, _)) => {
eprintln!("Failed to send message {}: {}", i, e);
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
// 等待消费者处理完所有消息
tokio::time::sleep(Duration::from_secs(5)).await;
// 取消消费者任务
consumer_handle.abort();
println!("Example completed");
Ok(())
}
总结
本文档详细介绍了如何使用 rdkafka 进行各种 Kafka 操作,包括:
- 生产者操作:发送消息(同步/异步)、批量发送、带消息头的消息发送、指定分区的消息发送等。
- 消费者操作:基础消费、流式消费、从特定分区和偏移量消费、获取消费者位置和水印偏移量等。
- 管理员操作:创建/删除主题、列出主题、获取主题详情、修改主题配置、创建分区、管理消费者组等。
- 高级功能:事务支持、精确一次语义、自定义分区器、自定义序列化器、消息拦截器、消息过滤和转换等。
- 错误处理:基本错误处理、重试机制、死信队列处理等。
- 性能优化:批量处理、异步并发处理、连接池管理、缓冲区优化等。
- 最佳实践:资源管理、配置管理、监控和指标等。
通过遵循本文档中的示例和最佳实践,您可以有效地使用 rdkafka 在 Rust 应用程序中与 Kafka 集群进行交互,构建高性能、可靠的消息处理系统。
许可证
本文档基于 MIT 许可证发布。